Przeglądaj źródła

优化多线程逻辑

woody 1 rok temu
rodzic
commit
4648411bad

+ 10 - 4
framework-base/src/main/java/com/chelvc/framework/base/context/ThreadContextHolder.java

@@ -259,8 +259,11 @@ public final class ThreadContextHolder {
             for (CompletableFuture<T> future : futures) {
                 try {
                     consumer.accept(future.get());
-                } catch (InterruptedException | ExecutionException e) {
+                } catch (ExecutionException e) {
                     throw new RuntimeException(e);
+                } catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt();
+                    break;
                 }
             }
         }
@@ -276,13 +279,16 @@ public final class ThreadContextHolder {
     public static <T> void results(@NonNull Collection<CompletableFuture<T>> futures, @NonNull Consumer<T> consumer) {
         if (!CollectionUtils.isEmpty(futures)) {
             join(futures);
-            futures.forEach(future -> {
+            for (CompletableFuture<T> future : futures) {
                 try {
                     consumer.accept(future.get());
-                } catch (InterruptedException | ExecutionException e) {
+                } catch (ExecutionException e) {
                     throw new RuntimeException(e);
+                } catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt();
+                    break;
                 }
-            });
+            }
         }
     }
 }

+ 2 - 2
framework-common/src/main/java/com/chelvc/framework/common/util/IdentityUtils.java

@@ -226,8 +226,8 @@ public final class IdentityUtils {
                 // 时间回退timeOffset毫秒内,则允许等待2倍的偏移量后重新获取,解决小范围的时间回拨问题
                 try {
                     this.wait(offset << 1);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
+                } catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt();
                 }
 
                 // 再次获取并校验

+ 59 - 97
framework-common/src/main/java/com/chelvc/framework/common/util/ThreadUtils.java

@@ -1,12 +1,12 @@
 package com.chelvc.framework.common.util;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -35,6 +35,7 @@ public final class ThreadUtils {
         try {
             Thread.sleep(millis);
         } catch (InterruptedException ignored) {
+            Thread.currentThread().interrupt();
             return false;
         }
         return true;
@@ -79,6 +80,8 @@ public final class ThreadUtils {
             try {
                 thread.join();
             } catch (InterruptedException ignored) {
+                Thread.currentThread().interrupt();
+                break;
             }
         }
     }
@@ -93,34 +96,32 @@ public final class ThreadUtils {
             try {
                 thread.join();
             } catch (InterruptedException ignored) {
+                Thread.currentThread().interrupt();
+                break;
             }
         }
     }
 
     /**
-     * 异步消费消息
+     * 中断线程
      *
-     * @param queue    阻塞队列
-     * @param consumer 消息消费者
-     * @param <T>      消息类型
-     * @return 消费线程列表
+     * @param threads 线程对象数组
      */
-    public static <T> List<Thread> consume(@NonNull BlockingQueue<T> queue, @NonNull Consumer<T> consumer) {
-        return consume(queue, consumer, 3);
+    public static void interrupt(@NonNull Thread... threads) {
+        for (Thread thread : threads) {
+            thread.interrupt();
+        }
     }
 
     /**
-     * 异步消费消息
+     * 中断线程
      *
-     * @param queue       阻塞队列
-     * @param consumer    消息消费者
-     * @param concurrency 并发数量
-     * @param <T>         消息类型
-     * @return 消费线程列表
+     * @param threads 线程对象集合
      */
-    public static <T> List<Thread> consume(@NonNull BlockingQueue<T> queue,
-                                           @NonNull Consumer<T> consumer, int concurrency) {
-        return consume(queue, consumer, () -> true, concurrency);
+    public static void interrupt(@NonNull Collection<Thread> threads) {
+        for (Thread thread : threads) {
+            thread.interrupt();
+        }
     }
 
     /**
@@ -128,14 +129,11 @@ public final class ThreadUtils {
      *
      * @param queue    阻塞队列
      * @param consumer 消息消费者
-     * @param running  消费线程运行开关
      * @param <T>      消息类型
      * @return 消费线程列表
      */
-    public static <T> List<Thread> consume(@NonNull BlockingQueue<T> queue,
-                                           @NonNull Consumer<T> consumer,
-                                           @NonNull Supplier<Boolean> running) {
-        return consume(queue, consumer, running, 3);
+    public static <T> List<Thread> consume(@NonNull BlockingQueue<T> queue, @NonNull Consumer<T> consumer) {
+        return consume(queue, consumer, 3);
     }
 
     /**
@@ -143,21 +141,23 @@ public final class ThreadUtils {
      *
      * @param queue       阻塞队列
      * @param consumer    消息消费者
-     * @param running     消费线程运行开关
      * @param concurrency 并发数量
      * @param <T>         消息类型
      * @return 消费线程列表
      */
     public static <T> List<Thread> consume(@NonNull BlockingQueue<T> queue, @NonNull Consumer<T> consumer,
-                                           @NonNull Supplier<Boolean> running, int concurrency) {
+                                           int concurrency) {
         return IntStream.range(0, concurrency).mapToObj(i -> run(() -> {
-            while (Boolean.TRUE.equals(running.get()) && !Thread.interrupted()) {
-                T message;
+            while (!Thread.currentThread().isInterrupted()) {
+                // 从队列中获取消息
+                T message = null;
                 try {
                     message = queue.poll(1, TimeUnit.SECONDS);
                 } catch (InterruptedException ignored) {
-                    break;
+                    Thread.currentThread().interrupt();
                 }
+
+                // 消费消息
                 if (message != null) {
                     try {
                         consumer.accept(message);
@@ -174,38 +174,43 @@ public final class ThreadUtils {
      *
      * @param queue       阻塞队列
      * @param consumer    消息消费者
-     * @param running     消费线程运行开关
      * @param concurrency 并发数量
      * @param batchSize   批处理数量
      * @param <T>         消息类型
      * @return 消费线程列表
      */
     public static <T> List<Thread> consume(@NonNull BlockingQueue<T> queue, @NonNull Consumer<List<T>> consumer,
-                                           @NonNull Supplier<Boolean> running, int concurrency, int batchSize) {
+                                           int concurrency, int batchSize) {
         return IntStream.range(0, concurrency).mapToObj(i -> run(() -> {
             List<T> buffer = Lists.newArrayListWithCapacity(batchSize);
-            while (Boolean.TRUE.equals(running.get()) && !Thread.interrupted()) {
-                T message;
+            List<T> unmodifiable = Collections.unmodifiableList(buffer);
+            while (!Thread.currentThread().isInterrupted()) {
+                // 从队列中获取消息
+                T message = null;
                 try {
-                    message = queue.poll(1, TimeUnit.SECONDS);
+                    message = queue.poll(3, TimeUnit.SECONDS);
                 } catch (InterruptedException ignored) {
-                    break;
+                    Thread.currentThread().interrupt();
                 }
+
+                // 将消息加入缓冲区
                 if (message != null) {
                     buffer.add(message);
-                    if (buffer.size() % batchSize == 0) {
-                        try {
-                            consumer.accept(buffer);
-                        } catch (Exception e) {
-                            log.error(e.getMessage(), e);
-                        } finally {
-                            buffer.clear();
-                        }
+                }
+
+                // 如果获取消息超时或缓冲区消息数量达到batchSize则执行消费
+                if (buffer.size() > 0 && (message == null || buffer.size() % batchSize == 0)) {
+                    try {
+                        consumer.accept(unmodifiable);
+                    } catch (Exception e) {
+                        log.error(e.getMessage(), e);
+                    } finally {
+                        buffer.clear();
                     }
                 }
             }
             if (ObjectUtils.notEmpty(buffer)) {
-                consumer.accept(buffer);
+                consumer.accept(unmodifiable);
             }
         })).collect(Collectors.toList());
     }
@@ -218,19 +223,7 @@ public final class ThreadUtils {
      * @return 消息队列
      */
     public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer) {
-        return queue(consumer, () -> true);
-    }
-
-    /**
-     * 异步消费消息
-     *
-     * @param consumer 消息消费者
-     * @param running  消费线程运行开关
-     * @param <T>      消息类型
-     * @return 消息队列
-     */
-    public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer, @NonNull Supplier<Boolean> running) {
-        return queue(1000, consumer, running);
+        return queue(1000, consumer);
     }
 
     /**
@@ -242,21 +235,7 @@ public final class ThreadUtils {
      * @return 消息队列
      */
     public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<T> consumer) {
-        return queue(capacity, consumer, () -> true);
-    }
-
-    /**
-     * 异步消费消息
-     *
-     * @param capacity 队列大小
-     * @param consumer 消息消费者
-     * @param running  消费线程运行开关
-     * @param <T>      消息类型
-     * @return 消息队列
-     */
-    public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<T> consumer,
-                                             @NonNull Supplier<Boolean> running) {
-        return queue(capacity, consumer, running, 3);
+        return queue(capacity, consumer, 3);
     }
 
     /**
@@ -268,21 +247,7 @@ public final class ThreadUtils {
      * @return 消息队列
      */
     public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer, int concurrency) {
-        return queue(consumer, () -> true, concurrency);
-    }
-
-    /**
-     * 异步消费消息
-     *
-     * @param consumer    消息消费者
-     * @param running     消费线程运行开关
-     * @param concurrency 并发数量
-     * @param <T>         消息类型
-     * @return 消息队列
-     */
-    public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer, @NonNull Supplier<Boolean> running,
-                                             int concurrency) {
-        return queue(1000, consumer, running, concurrency);
+        return queue(1000, consumer, concurrency);
     }
 
     /**
@@ -295,24 +260,22 @@ public final class ThreadUtils {
      * @return 消息队列
      */
     public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<T> consumer, int concurrency) {
-        return queue(capacity, consumer, () -> true, concurrency);
+        BlockingQueue<T> queue = new ArrayBlockingQueue<>(Math.max(capacity, 1));
+        consume(queue, consumer, concurrency);
+        return queue;
     }
 
     /**
      * 异步消费消息
      *
-     * @param capacity    队列大小
      * @param consumer    消息消费者
-     * @param running     消费线程运行开关
      * @param concurrency 并发数量
+     * @param batchSize   批处理数量
      * @param <T>         消息类型
      * @return 消息队列
      */
-    public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<T> consumer,
-                                             @NonNull Supplier<Boolean> running, int concurrency) {
-        BlockingQueue<T> queue = new ArrayBlockingQueue<>(Math.max(capacity, 1));
-        consume(queue, consumer, running, concurrency);
-        return queue;
+    public static <T> BlockingQueue<T> queue(@NonNull Consumer<List<T>> consumer, int concurrency, int batchSize) {
+        return queue(1000, consumer, concurrency, batchSize);
     }
 
     /**
@@ -320,16 +283,15 @@ public final class ThreadUtils {
      *
      * @param capacity    队列大小
      * @param consumer    消息消费者
-     * @param running     消费线程运行开关
      * @param concurrency 并发数量
      * @param batchSize   批处理数量
      * @param <T>         消息类型
      * @return 消息队列
      */
-    public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<List<T>> consumer,
-                                             @NonNull Supplier<Boolean> running, int concurrency, int batchSize) {
+    public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<List<T>> consumer, int concurrency,
+                                             int batchSize) {
         BlockingQueue<T> queue = new ArrayBlockingQueue<>(Math.max(capacity, 1));
-        consume(queue, consumer, running, concurrency, batchSize);
+        consume(queue, consumer, concurrency, batchSize);
         return queue;
     }
 }

+ 2 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisContextHolder.java

@@ -593,7 +593,8 @@ public final class RedisContextHolder {
             if (StringUtils.isEmpty(token = tryLock(name, duration))) {
                 ThreadUtils.sleep(200);
             }
-        } while (StringUtils.isEmpty(token) && System.currentTimeMillis() <= expiration && !Thread.interrupted());
+        } while (StringUtils.isEmpty(token) && System.currentTimeMillis() <= expiration
+                && !Thread.currentThread().isInterrupted());
         if (StringUtils.isEmpty(token)) {
             throw new IllegalStateException("Redis lock timeout");
         }

+ 1 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/stream/MessageStreamListener.java

@@ -53,7 +53,7 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
         // 初始化延时消息消费服务
         this.executor.execute(() -> {
             List<String> keys = Collections.singletonList(this.delayed);
-            while (!Thread.interrupted()) {
+            while (!Thread.currentThread().isInterrupted()) {
                 // 批量获取延时消息(100条),为避免重复获取将消息延时时间增加1分钟
                 String timestamp = String.valueOf(System.currentTimeMillis());
                 List<String> messages = RedisContextHolder.getStringTemplate().execute(

+ 1 - 1
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/MultipleRocketMQListenerContainer.java

@@ -112,7 +112,7 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
      */
     private Thread startup() {
         return ThreadUtils.run(() -> {
-            while (this.running && !Thread.interrupted()) {
+            while (this.running && !Thread.currentThread().isInterrupted()) {
                 // 批量接收消息
                 List<MessageView> messages = this.receive();
                 if (ObjectUtils.isEmpty(messages)) {

+ 7 - 4
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/RocketMQContextHolder.java

@@ -87,7 +87,7 @@ public final class RocketMQContextHolder {
     static {
         // 初始化消息发送降级处理线程
         ThreadUtils.run(() -> {
-            while (!Thread.interrupted()) {
+            while (!Thread.currentThread().isInterrupted()) {
                 // 尝试从队列中获取降级处理信息,如果为空则等待1分钟后继续
                 MessageSending sending = MESSAGE_SENDING_QUEUE.peek();
                 if (sending == null) {
@@ -336,7 +336,8 @@ public final class RocketMQContextHolder {
             // 降级处理
             try {
                 MESSAGE_SENDING_QUEUE.put(new MessageSending(producer, message));
-            } catch (InterruptedException ignore) {
+            } catch (InterruptedException ignored) {
+                Thread.currentThread().interrupt();
             }
         } catch (ClientException e) {
             throw new RuntimeException(e);
@@ -447,7 +448,8 @@ public final class RocketMQContextHolder {
                 // 降级处理
                 try {
                     MESSAGE_SENDING_QUEUE.put(new MessageSending(producer, message));
-                } catch (InterruptedException ignore) {
+                } catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt();
                 }
             } else {
                 log.error("RocketMQ message async send failed: {}", message.getTopic(), e);
@@ -576,7 +578,8 @@ public final class RocketMQContextHolder {
             // 降级处理
             try {
                 MESSAGE_SENDING_QUEUE.put(new TransactionalMessageSending(producer, message, executor));
-            } catch (InterruptedException ignore) {
+            } catch (InterruptedException ignored) {
+                Thread.currentThread().interrupt();
             }
         } catch (ClientException e) {
             throw new RuntimeException(e);