浏览代码

优化线程处理逻辑

woody 2 年之前
父节点
当前提交
7c6955dcc5
共有 1 个文件被更改,包括 23 次插入25 次删除
  1. 23 25
      framework-base/src/main/java/com/chelvc/framework/base/util/ThreadUtils.java

+ 23 - 25
framework-base/src/main/java/com/chelvc/framework/base/util/ThreadUtils.java

@@ -78,7 +78,6 @@ public final class ThreadUtils {
             try {
                 thread.join();
             } catch (InterruptedException ignored) {
-                break;
             }
         }
     }
@@ -93,7 +92,6 @@ public final class ThreadUtils {
             try {
                 thread.join();
             } catch (InterruptedException ignored) {
-                break;
             }
         }
     }
@@ -121,7 +119,7 @@ public final class ThreadUtils {
      */
     public static <T> List<Thread> consume(@NonNull BlockingQueue<T> queue,
                                            @NonNull Consumer<T> consumer, int concurrency) {
-        return consume(queue, consumer, () -> false, concurrency);
+        return consume(queue, consumer, () -> true, concurrency);
     }
 
     /**
@@ -129,14 +127,14 @@ public final class ThreadUtils {
      *
      * @param queue    阻塞队列
      * @param consumer 消息消费者
-     * @param breaker  中断方法(返回true表示中断)
+     * @param running  消费线程运行开关
      * @param <T>      消息类型
      * @return 消费线程列表
      */
     public static <T> List<Thread> consume(@NonNull BlockingQueue<T> queue,
                                            @NonNull Consumer<T> consumer,
-                                           @NonNull Supplier<Boolean> breaker) {
-        return consume(queue, consumer, breaker, 3);
+                                           @NonNull Supplier<Boolean> running) {
+        return consume(queue, consumer, running, 3);
     }
 
     /**
@@ -144,15 +142,15 @@ public final class ThreadUtils {
      *
      * @param queue       阻塞队列
      * @param consumer    消息消费者
-     * @param breaker     中断方法(返回true表示中断)
+     * @param running     消费线程运行开关
      * @param concurrency 并发数量
      * @param <T>         消息类型
      * @return 消费线程列表
      */
     public static <T> List<Thread> consume(@NonNull BlockingQueue<T> queue, @NonNull Consumer<T> consumer,
-                                           @NonNull Supplier<Boolean> breaker, int concurrency) {
+                                           @NonNull Supplier<Boolean> running, int concurrency) {
         return IntStream.range(0, concurrency).mapToObj(i -> run(() -> {
-            while (!Thread.interrupted() && !Boolean.TRUE.equals(breaker.get())) {
+            while (Boolean.TRUE.equals(running.get()) && !Thread.interrupted()) {
                 T message;
                 try {
                     message = queue.poll(1, TimeUnit.SECONDS);
@@ -178,19 +176,19 @@ public final class ThreadUtils {
      * @return 消息队列
      */
     public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer) {
-        return queue(consumer, () -> false);
+        return queue(consumer, () -> true);
     }
 
     /**
      * 异步消费消息
      *
      * @param consumer 消息消费者
-     * @param breaker  中断方法(返回true表示中断)
+     * @param running  消费线程运行开关
      * @param <T>      消息类型
      * @return 消息队列
      */
-    public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer, @NonNull Supplier<Boolean> breaker) {
-        return queue(1000, consumer, breaker);
+    public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer, @NonNull Supplier<Boolean> running) {
+        return queue(1000, consumer, running);
     }
 
     /**
@@ -202,7 +200,7 @@ public final class ThreadUtils {
      * @return 消息队列
      */
     public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<T> consumer) {
-        return queue(capacity, consumer, () -> false);
+        return queue(capacity, consumer, () -> true);
     }
 
     /**
@@ -210,13 +208,13 @@ public final class ThreadUtils {
      *
      * @param capacity 队列大小
      * @param consumer 消息消费者
-     * @param breaker  中断方法(返回true表示中断)
+     * @param running  消费线程运行开关
      * @param <T>      消息类型
      * @return 消息队列
      */
     public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<T> consumer,
-                                             @NonNull Supplier<Boolean> breaker) {
-        return queue(capacity, consumer, breaker, 3);
+                                             @NonNull Supplier<Boolean> running) {
+        return queue(capacity, consumer, running, 3);
     }
 
     /**
@@ -228,21 +226,21 @@ public final class ThreadUtils {
      * @return 消息队列
      */
     public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer, int concurrency) {
-        return queue(consumer, () -> false, concurrency);
+        return queue(consumer, () -> true, concurrency);
     }
 
     /**
      * 异步消费消息
      *
      * @param consumer    消息消费者
-     * @param breaker     中断方法(返回true表示中断)
+     * @param running     消费线程运行开关
      * @param concurrency 并发数量
      * @param <T>         消息类型
      * @return 消息队列
      */
-    public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer, @NonNull Supplier<Boolean> breaker,
+    public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer, @NonNull Supplier<Boolean> running,
                                              int concurrency) {
-        return queue(1000, consumer, breaker, concurrency);
+        return queue(1000, consumer, running, concurrency);
     }
 
     /**
@@ -255,7 +253,7 @@ public final class ThreadUtils {
      * @return 消息队列
      */
     public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<T> consumer, int concurrency) {
-        return queue(capacity, consumer, () -> false, concurrency);
+        return queue(capacity, consumer, () -> true, concurrency);
     }
 
     /**
@@ -263,15 +261,15 @@ public final class ThreadUtils {
      *
      * @param capacity    队列大小
      * @param consumer    消息消费者
-     * @param breaker     中断方法(返回true表示中断)
+     * @param running     消费线程运行开关
      * @param concurrency 并发数量
      * @param <T>         消息类型
      * @return 消息队列
      */
     public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<T> consumer,
-                                             @NonNull Supplier<Boolean> breaker, int concurrency) {
+                                             @NonNull Supplier<Boolean> running, int concurrency) {
         BlockingQueue<T> queue = new ArrayBlockingQueue<>(Math.max(capacity, 1));
-        consume(queue, consumer, breaker, concurrency);
+        consume(queue, consumer, running, concurrency);
         return queue;
     }
 }