Bläddra i källkod

新增消息队列并发批量消费工具方法

woody 1 år sedan
förälder
incheckning
2c8cf9c8d9

+ 61 - 0
framework-base/src/main/java/com/chelvc/framework/base/util/ThreadUtils.java

@@ -10,8 +10,10 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import com.google.common.collect.Lists;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.CollectionUtils;
 
 /**
  * 线程工具类
@@ -168,6 +170,47 @@ public final class ThreadUtils {
         })).collect(Collectors.toList());
     }
 
+    /**
+     * 异步批量消费消息
+     *
+     * @param queue       阻塞队列
+     * @param consumer    消息消费者
+     * @param running     消费线程运行开关
+     * @param concurrency 并发数量
+     * @param batchSize   批处理数量
+     * @param <T>         消息类型
+     * @return 消费线程列表
+     */
+    public static <T> List<Thread> consume(@NonNull BlockingQueue<T> queue, @NonNull Consumer<List<T>> consumer,
+                                           @NonNull Supplier<Boolean> running, int concurrency, int batchSize) {
+        return IntStream.range(0, concurrency).mapToObj(i -> run(() -> {
+            List<T> buffer = Lists.newArrayListWithCapacity(batchSize);
+            while (Boolean.TRUE.equals(running.get()) && !Thread.interrupted()) {
+                T message;
+                try {
+                    message = queue.poll(1, TimeUnit.SECONDS);
+                } catch (InterruptedException ignored) {
+                    break;
+                }
+                if (message != null) {
+                    buffer.add(message);
+                    if (buffer.size() % batchSize == 0) {
+                        try {
+                            consumer.accept(buffer);
+                        } catch (Exception e) {
+                            log.error(e.getMessage(), e);
+                        } finally {
+                            buffer.clear();
+                        }
+                    }
+                }
+            }
+            if (!CollectionUtils.isEmpty(buffer)) {
+                consumer.accept(buffer);
+            }
+        })).collect(Collectors.toList());
+    }
+
     /**
      * 异步消费消息
      *
@@ -272,4 +315,22 @@ public final class ThreadUtils {
         consume(queue, consumer, running, concurrency);
         return queue;
     }
+
+    /**
+     * 异步批量消费消息
+     *
+     * @param capacity    队列大小
+     * @param consumer    消息消费者
+     * @param running     消费线程运行开关
+     * @param concurrency 并发数量
+     * @param batchSize   批处理数量
+     * @param <T>         消息类型
+     * @return 消息队列
+     */
+    public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<List<T>> consumer,
+                                             @NonNull Supplier<Boolean> running, int concurrency, int batchSize) {
+        BlockingQueue<T> queue = new ArrayBlockingQueue<>(Math.max(capacity, 1));
+        consume(queue, consumer, running, concurrency, batchSize);
+        return queue;
+    }
 }