Woody 3 týždňov pred
rodič
commit
262cfe9dbd
56 zmenil súbory, kde vykonal 1305 pridanie a 769 odobranie
  1. 24 4
      framework-base/src/main/java/com/chelvc/framework/base/context/ApplicationContextHolder.java
  2. 1 1
      framework-base/src/main/java/com/chelvc/framework/base/context/SessionContextHolder.java
  3. 3 3
      framework-base/src/main/java/com/chelvc/framework/base/context/ThreadContextHolder.java
  4. 4 4
      framework-common/src/main/java/com/chelvc/framework/common/model/Paging.java
  5. 1 1
      framework-common/src/main/java/com/chelvc/framework/common/model/Period.java
  6. 1 1
      framework-common/src/main/java/com/chelvc/framework/common/model/Pool.java
  7. 30 6
      framework-common/src/main/java/com/chelvc/framework/common/util/AssertUtils.java
  8. 2 2
      framework-common/src/main/java/com/chelvc/framework/common/util/BarcodeUtils.java
  9. 1 1
      framework-common/src/main/java/com/chelvc/framework/common/util/DateUtils.java
  10. 6 6
      framework-common/src/main/java/com/chelvc/framework/common/util/DesensitizeUtils.java
  11. 3 3
      framework-common/src/main/java/com/chelvc/framework/common/util/ExcelUtils.java
  12. 2 2
      framework-common/src/main/java/com/chelvc/framework/common/util/IdentityUtils.java
  13. 2 2
      framework-common/src/main/java/com/chelvc/framework/common/util/ObjectUtils.java
  14. 5 5
      framework-common/src/main/java/com/chelvc/framework/common/util/StringUtils.java
  15. 0 106
      framework-common/src/main/java/com/chelvc/framework/common/util/ThreadUtils.java
  16. 1 1
      framework-database/src/main/java/com/chelvc/framework/database/support/Updates.java
  17. 2 2
      framework-jpush/src/main/java/com/chelvc/framework/jpush/DefaultJPushHandler.java
  18. 7 0
      framework-kafka/pom.xml
  19. 10 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/config/KafkaConfigurer.java
  20. 102 12
      framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java
  21. 1 1
      framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaMemorySender.java
  22. 17 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/KafkaTransactionChecker.java
  23. 40 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/StandardTransactionChecker.java
  24. 31 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionCheckProcessor.java
  25. 51 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionMessage.java
  26. 73 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionMessageProcessor.java
  27. 24 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionResolution.java
  28. 1 1
      framework-location/src/main/java/com/chelvc/framework/location/support/CacheableLocationHandler.java
  29. 1 6
      framework-nacos/src/main/java/com/chelvc/framework/nacos/context/NacosContextHolder.java
  30. 2 4
      framework-redis/src/main/java/com/chelvc/framework/redis/config/RedisMQListenerRegistry.java
  31. 1 1
      framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisHashHolder.java
  32. 19 2
      framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisStreamHolder.java
  33. 5 5
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/DefaultRedisMQListenerContainer.java
  34. 179 0
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/DelayRedisQueue.java
  35. 16 52
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/MessageStreamListener.java
  36. 303 0
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/PriorityRedisQueue.java
  37. 192 0
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/Queues.java
  38. 7 38
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisQueue.java
  39. 0 361
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/TemporalRedisQueue.java
  40. 6 4
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/config/RocketMQConfigurer.java
  41. 2 4
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/config/RocketMQListenerRegistry.java
  42. 17 35
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/MultipleRocketMQListenerContainer.java
  43. 12 20
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/SingleRocketMQListenerContainer.java
  44. 1 1
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQMemoryProducer.java
  45. 0 47
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/CommonTransactionChecker.java
  46. 52 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/StandardTransactionChecker.java
  47. 4 4
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/TransactionCheckProcessor.java
  48. 1 1
      framework-sms/src/main/java/com/chelvc/framework/sms/support/DefaultCaptchaSmsHandler.java
  49. 1 1
      framework-sms/src/main/java/com/chelvc/framework/sms/support/DelegatingNormalSmsHandler.java
  50. 1 1
      framework-sms/src/main/java/com/chelvc/framework/sms/support/DelegatingTemplateSmsHandler.java
  51. 14 6
      framework-upload/src/main/java/com/chelvc/framework/upload/support/AliyunUploadHandler.java
  52. 5 2
      framework-upload/src/main/java/com/chelvc/framework/upload/support/LocalUploadHandler.java
  53. 11 5
      framework-upload/src/main/java/com/chelvc/framework/upload/support/TencentUploadHandler.java
  54. 4 2
      framework-wechat/src/main/java/com/chelvc/framework/wechat/context/WechatContextHolder.java
  55. 2 2
      framework-wechat/src/main/java/com/chelvc/framework/wechat/support/DefaultWechatPaymentHandler.java
  56. 2 1
      framework-wechat/src/main/java/com/chelvc/framework/wechat/support/DefaultWechatPublicHandler.java

+ 24 - 4
framework-base/src/main/java/com/chelvc/framework/base/context/ApplicationContextHolder.java

@@ -101,7 +101,17 @@ public class ApplicationContextHolder implements EnvironmentAware, ApplicationCo
      * @return 环境标识
      */
     public static String getProfile() {
-        Environment environment = getEnvironment(false);
+        return getProfile(false);
+    }
+
+    /**
+     * 获取环境标识
+     *
+     * @param required 是否必须
+     * @return 环境标识
+     */
+    public static String getProfile(boolean required) {
+        Environment environment = getEnvironment(required);
         return environment == null ? null : getProfile(environment);
     }
 
@@ -142,7 +152,17 @@ public class ApplicationContextHolder implements EnvironmentAware, ApplicationCo
      * @return 应用名称
      */
     public static String getApplicationName() {
-        Environment environment = getEnvironment(false);
+        return getApplicationName(false);
+    }
+
+    /**
+     * 获取应用名称
+     *
+     * @param required 是否必须
+     * @return 应用名称
+     */
+    public static String getApplicationName(boolean required) {
+        Environment environment = getEnvironment(required);
         return environment == null ? null : getApplicationName(environment);
     }
 
@@ -184,7 +204,7 @@ public class ApplicationContextHolder implements EnvironmentAware, ApplicationCo
      */
     public static Environment getEnvironment(boolean required) {
         if (required) {
-            return AssertUtils.nonnull(ENVIRONMENT, () -> "Application environment has not been initialized");
+            AssertUtils.state(ENVIRONMENT != null, () -> "Application environment has not been initialized");
         }
         return ENVIRONMENT;
     }
@@ -206,7 +226,7 @@ public class ApplicationContextHolder implements EnvironmentAware, ApplicationCo
      */
     public static ApplicationContext getApplicationContext(boolean required) {
         if (required) {
-            return AssertUtils.nonnull(APPLICATION_CONTEXT, () -> "Application context has not been initialized");
+            AssertUtils.state(APPLICATION_CONTEXT != null, () -> "Application context has not been initialized");
         }
         return APPLICATION_CONTEXT;
     }

+ 1 - 1
framework-base/src/main/java/com/chelvc/framework/base/context/SessionContextHolder.java

@@ -133,7 +133,7 @@ public class SessionContextHolder implements ServletRequestListener {
             session = null;
         }
         if (required) {
-            return AssertUtils.nonnull(session, () -> "Session has not been initialized");
+            AssertUtils.state(session != null, () -> "Session has not been initialized");
         }
         return session;
     }

+ 3 - 3
framework-base/src/main/java/com/chelvc/framework/base/context/ThreadContextHolder.java

@@ -147,10 +147,10 @@ public final class ThreadContextHolder {
     /**
      * 异步执行方法(等待执行完成)
      *
-     * @param array Runnable实例数组
+     * @param runs Runnable实例数组
      */
-    public static void execute(@NonNull Runnable... array) {
-        execute(getDefaultExecutor(), array);
+    public static void execute(@NonNull Runnable... runs) {
+        execute(getDefaultExecutor(), runs);
     }
 
     /**

+ 4 - 4
framework-common/src/main/java/com/chelvc/framework/common/model/Paging.java

@@ -91,8 +91,8 @@ public final class Paging implements Serializable {
      * @return 分页实例
      */
     public static Paging of(int number, int size, boolean counting) {
-        AssertUtils.check(number > 0, () -> "Page's number must be greater than 0");
-        AssertUtils.check(size > 0, () -> "Page's size must be greater than 0");
+        AssertUtils.arg(number > 0, () -> "Page's number must be greater than 0");
+        AssertUtils.arg(size > 0, () -> "Page's size must be greater than 0");
         Paging paging = new Paging();
         paging.number = number;
         paging.size = size;
@@ -124,8 +124,8 @@ public final class Paging implements Serializable {
      * @return 分页后的对象列表
      */
     public static <T> List<T> list(List<T> objects, int page, int size) {
-        AssertUtils.check(page > 0, () -> "page must be greater than 0");
-        AssertUtils.check(size > 0, () -> "size must be greater than 0");
+        AssertUtils.arg(page > 0, () -> "page must be greater than 0");
+        AssertUtils.arg(size > 0, () -> "size must be greater than 0");
         if (ObjectUtils.isEmpty(objects)) {
             return objects;
         }

+ 1 - 1
framework-common/src/main/java/com/chelvc/framework/common/model/Period.java

@@ -166,7 +166,7 @@ public final class Period implements Serializable {
      * @return 时间周期实例
      */
     public static Period of(Date begin, Date end) {
-        AssertUtils.check(begin != null || end != null, () -> "Period's begin or end must not be null");
+        AssertUtils.arg(begin != null || end != null, () -> "Period's begin or end must not be null");
         Period period = new Period();
         if (begin != null && end != null && begin.after(end)) {
             period.begin = end;

+ 1 - 1
framework-common/src/main/java/com/chelvc/framework/common/model/Pool.java

@@ -68,7 +68,7 @@ public class Pool<T> {
     }
 
     public Pool(long idle, @NonNull Supplier<T> supplier) {
-        AssertUtils.check(idle > 0, () -> "Pool idle must be greater than 0");
+        AssertUtils.arg(idle > 0, () -> "Pool idle must be greater than 0");
         this.idle = idle;
         this.root = new Node<>((this.supplier = supplier).get(), null, -1);
     }

+ 30 - 6
framework-common/src/main/java/com/chelvc/framework/common/util/AssertUtils.java

@@ -21,29 +21,53 @@ public final class AssertUtils {
     }
 
     /**
-     * 条件真假检测
+     * 参数真假检测
      *
      * @param expression 真假表达式
      * @param message    异常消息
      */
-    public static void check(boolean expression, String message) {
+    public static void arg(boolean expression, String message) {
         if (!expression) {
             throw new IllegalArgumentException(message);
         }
     }
 
     /**
-     * 条件真假检测
+     * 参数真假检测
      *
      * @param expression 真假表达式
      * @param message    异常消息回调函数
      */
-    public static void check(boolean expression, @NonNull Supplier<String> message) {
+    public static void arg(boolean expression, @NonNull Supplier<String> message) {
         if (!expression) {
             throw new IllegalArgumentException(message.get());
         }
     }
 
+    /**
+     * 状态真假检测
+     *
+     * @param expression 真假表达式
+     * @param message    异常消息
+     */
+    public static void state(boolean expression, String message) {
+        if (!expression) {
+            throw new IllegalStateException(message);
+        }
+    }
+
+    /**
+     * 状态真假检测
+     *
+     * @param expression 真假表达式
+     * @param message    异常消息回调函数
+     */
+    public static void state(boolean expression, @NonNull Supplier<String> message) {
+        if (!expression) {
+            throw new IllegalStateException(message.get());
+        }
+    }
+
     /**
      * 参数有效性校验
      *
@@ -53,7 +77,7 @@ public final class AssertUtils {
      * @param <T>        函数调用对象类型
      * @param <R>        函数调用返回类型
      */
-    public static <T, R> void check(boolean expression, @NonNull Getter<T, R> getter, @NonNull String message) {
+    public static <T, R> void param(boolean expression, @NonNull Getter<T, R> getter, @NonNull String message) {
         if (!expression) {
             throw new ParameterInvalidException(ObjectUtils.lookupGetterProperty(getter), message);
         }
@@ -68,7 +92,7 @@ public final class AssertUtils {
      * @param <T>        函数调用对象类型
      * @param <R>        函数调用返回类型
      */
-    public static <T, R> void check(boolean expression, @NonNull Getter<T, R> getter,
+    public static <T, R> void param(boolean expression, @NonNull Getter<T, R> getter,
                                     @NonNull Supplier<String> message) {
         if (!expression) {
             throw new ParameterInvalidException(ObjectUtils.lookupGetterProperty(getter), message.get());

+ 2 - 2
framework-common/src/main/java/com/chelvc/framework/common/util/BarcodeUtils.java

@@ -75,8 +75,8 @@ public final class BarcodeUtils {
      * @return 图片对象
      */
     public static BufferedImage encode(@NonNull String content, @NonNull BarcodeFormat format, int width, int height) {
-        AssertUtils.check(width > 0, () -> "width must be greater than 0");
-        AssertUtils.check(height > 0, () -> "height must be greater than 0");
+        AssertUtils.arg(width > 0, () -> "width must be greater than 0");
+        AssertUtils.arg(height > 0, () -> "height must be greater than 0");
         Map<EncodeHintType, Object> hints =
                 ImmutableMap.of(EncodeHintType.CHARACTER_SET, StandardCharsets.UTF_8.name());
         try {

+ 1 - 1
framework-common/src/main/java/com/chelvc/framework/common/util/DateUtils.java

@@ -1200,7 +1200,7 @@ public final class DateUtils {
      */
     public static Duration countdown(int type) {
         int index = Arrays.binarySearch(COUNTDOWN_SUPPORT_FIELDS, type);
-        AssertUtils.check(index > -1, () -> "Not support countdown type: " + type);
+        AssertUtils.arg(index > -1, () -> "Not support countdown type: " + type);
         Calendar calendar = Calendar.getInstance();
         calendar.add(COUNTDOWN_SUPPORT_FIELDS[index], 1);
         for (int i = index + 1; i < COUNTDOWN_SUPPORT_FIELDS.length; i++) {

+ 6 - 6
framework-common/src/main/java/com/chelvc/framework/common/util/DesensitizeUtils.java

@@ -175,8 +175,8 @@ public final class DesensitizeUtils {
         private final int ent;
 
         public Index(int start, int end) {
-            AssertUtils.check(start > -1, () -> "start must be greater than -1");
-            AssertUtils.check(end > -1, () -> "end must be greater than -1");
+            AssertUtils.arg(start > -1, () -> "start must be greater than -1");
+            AssertUtils.arg(end > -1, () -> "end must be greater than -1");
             this.start = start;
             this.ent = end;
         }
@@ -235,7 +235,7 @@ public final class DesensitizeUtils {
         private final int offset;
 
         public Fixed(@NonNull Position position, int number, int offset) {
-            AssertUtils.check(number > 0, () -> "number must be greater than 0");
+            AssertUtils.arg(number > 0, () -> "number must be greater than 0");
             this.position = position;
             this.number = number;
             this.offset = offset;
@@ -280,7 +280,7 @@ public final class DesensitizeUtils {
         private final int offset;
 
         public Ratio(@NonNull Position position, float number, int offset) {
-            AssertUtils.check(number > 0, () -> "number must be greater than 0");
+            AssertUtils.arg(number > 0, () -> "number must be greater than 0");
             this.position = position;
             this.number = number;
             this.offset = offset;
@@ -322,8 +322,8 @@ public final class DesensitizeUtils {
         private final int after;
 
         public Reverse(int before, int after) {
-            AssertUtils.check(before > -1, () -> "before must be greater than -1");
-            AssertUtils.check(after > -1, () -> "after must be greater than -1");
+            AssertUtils.arg(before > -1, () -> "before must be greater than -1");
+            AssertUtils.arg(after > -1, () -> "after must be greater than -1");
             this.before = before;
             this.after = after;
         }

+ 3 - 3
framework-common/src/main/java/com/chelvc/framework/common/util/ExcelUtils.java

@@ -347,7 +347,7 @@ public final class ExcelUtils {
      * @return 读取数量
      */
     public static int read(@NonNull Workbook workbook, int index, @NonNull BiConsumer<Row, Integer> reader) {
-        AssertUtils.check(index > -1, () -> "index must be greater than -1");
+        AssertUtils.arg(index > -1, () -> "index must be greater than -1");
         int count = 0;
         for (int i = 0, n = workbook.getNumberOfSheets(); i < n; i++) {
             Sheet sheet = workbook.getSheetAt(i);
@@ -756,7 +756,7 @@ public final class ExcelUtils {
         private Object value;
 
         public XMLCell(@NonNull Row row, int column, CellType type) {
-            AssertUtils.check(column > -1, () -> "column must be greater than -1");
+            AssertUtils.arg(column > -1, () -> "column must be greater than -1");
             this.row = row;
             this.column = column;
             this.type = type;
@@ -1008,7 +1008,7 @@ public final class ExcelUtils {
         private String value;
 
         public AbstractExcelReader(@NonNull OPCPackage pkg, int index) {
-            AssertUtils.check(index > -1, () -> "index must be greater than -1");
+            AssertUtils.arg(index > -1, () -> "index must be greater than -1");
             this.pkg = pkg;
             this.index = index;
             this.value = StringUtils.EMPTY;

+ 2 - 2
framework-common/src/main/java/com/chelvc/framework/common/util/IdentityUtils.java

@@ -180,9 +180,9 @@ public final class IdentityUtils {
          * @param datacenterId 序列号
          */
         public Generator(long workerId, long datacenterId) {
-            AssertUtils.check(workerId >= 0 && workerId <= MAX_WORKER_ID,
+            AssertUtils.arg(workerId >= 0 && workerId <= MAX_WORKER_ID,
                     () -> String.format("worker Id can't be greater than %d or less than 0", MAX_WORKER_ID));
-            AssertUtils.check(datacenterId >= 0 && datacenterId <= MAX_DATACENTER_ID,
+            AssertUtils.arg(datacenterId >= 0 && datacenterId <= MAX_DATACENTER_ID,
                     () -> String.format("datacenter Id can't be greater than %d or less than 0", MAX_DATACENTER_ID));
             this.workerId = workerId;
             this.datacenterId = datacenterId;

+ 2 - 2
framework-common/src/main/java/com/chelvc/framework/common/util/ObjectUtils.java

@@ -482,7 +482,7 @@ public final class ObjectUtils {
      */
     public static Field getClassField(@NonNull Class<?> clazz, @NonNull String name) {
         Field field = findClassField(clazz, name);
-        AssertUtils.check(Objects.nonNull(field), () -> "No such field: " + clazz.getName() + "." + name);
+        AssertUtils.arg(field != null, () -> "No such field: " + clazz.getName() + "." + name);
         return field;
     }
 
@@ -1633,7 +1633,7 @@ public final class ObjectUtils {
      * @return 元素
      */
     public static <T, R> R get(List<T> list, int index, @NonNull Function<T, R> function) {
-        AssertUtils.check(index > -1, () -> "index must be greater than -1");
+        AssertUtils.arg(index > -1, () -> "index must be greater than -1");
         return size(list) > index ? ifNull(list.get(index), function) : null;
     }
 

+ 5 - 5
framework-common/src/main/java/com/chelvc/framework/common/util/StringUtils.java

@@ -622,7 +622,7 @@ public final class StringUtils {
      * @return 子字符串
      */
     public static String substring(CharSequence original, int index, int length) {
-        AssertUtils.check(length > -1, () -> "length must be greater than -1");
+        AssertUtils.arg(length > -1, () -> "length must be greater than -1");
         if (isEmpty(original)) {
             return original == null ? null : EMPTY;
         } else if (index < 0) {
@@ -749,7 +749,7 @@ public final class StringUtils {
      * @return 对齐后的字符串
      */
     private static String just(CharSequence original, int length, char c, boolean right) {
-        AssertUtils.check(length > -1, () -> "length must be greater than -1");
+        AssertUtils.arg(length > -1, () -> "length must be greater than -1");
         if (length == 0 || (notEmpty(original) && length <= original.length())) {
             return ObjectUtils.ifNull(original, Object::toString);
         }
@@ -1448,8 +1448,8 @@ public final class StringUtils {
      * @return 随机字符串
      */
     public static String random(@NonNull char[] chars, int length) {
-        AssertUtils.check(chars.length > 0, () -> "chars must not be empty");
-        AssertUtils.check(length > 0, () -> "length must be greater than 0");
+        AssertUtils.arg(chars.length > 0, () -> "chars must not be empty");
+        AssertUtils.arg(length > 0, () -> "length must be greater than 0");
         char[] array = new char[length];
         ThreadLocalRandom random = ThreadLocalRandom.current();
         for (int i = 0; i < length; i++) {
@@ -1525,7 +1525,7 @@ public final class StringUtils {
      * @return 随机序列号
      */
     public static String sequence(int length) {
-        AssertUtils.check(length > 0, () -> "length must be greater than 0");
+        AssertUtils.arg(length > 0, () -> "length must be greater than 0");
         char[] sequence = new char[length];
         for (int i = 0, count = ObjectUtils.group(length, 8); i < count; i++) {
             String uuid = uuid();

+ 0 - 106
framework-common/src/main/java/com/chelvc/framework/common/util/ThreadUtils.java

@@ -4,7 +4,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -295,109 +294,4 @@ public final class ThreadUtils {
             }
         })).collect(Collectors.toList());
     }
-
-    /**
-     * 异步消费消息
-     *
-     * @param consumer 消息消费者
-     * @param <T>      消息类型
-     * @return 消息队列
-     */
-    public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer) {
-        return queue(1000, consumer);
-    }
-
-    /**
-     * 异步消费消息
-     *
-     * @param consumer 消息消费者
-     * @param <T>      消息类型
-     * @return 消息队列
-     */
-    public static <T> BlockingQueue<T> queue(@NonNull Predicate<T> consumer) {
-        return queue(1000, consumer);
-    }
-
-    /**
-     * 异步消费消息
-     *
-     * @param capacity 队列大小
-     * @param consumer 消息消费者
-     * @param <T>      消息类型
-     * @return 消息队列
-     */
-    public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<T> consumer) {
-        return queue(capacity, consumer, 3);
-    }
-
-    /**
-     * 异步消费消息
-     *
-     * @param consumer    消息消费者
-     * @param concurrency 并发数量
-     * @param <T>         消息类型
-     * @return 消息队列
-     */
-    public static <T> BlockingQueue<T> queue(@NonNull Consumer<T> consumer, int concurrency) {
-        return queue(1000, consumer, concurrency);
-    }
-
-    /**
-     * 异步消费消息
-     *
-     * @param capacity 队列大小
-     * @param consumer 消息消费者
-     * @param <T>      消息类型
-     * @return 消息队列
-     */
-    public static <T> BlockingQueue<T> queue(int capacity, @NonNull Predicate<T> consumer) {
-        BlockingQueue<T> queue = new ArrayBlockingQueue<>(Math.max(capacity, 1));
-        consume(queue, consumer);
-        return queue;
-    }
-
-    /**
-     * 异步消费消息
-     *
-     * @param capacity    队列大小
-     * @param consumer    消息消费者
-     * @param concurrency 并发数量
-     * @param <T>         消息类型
-     * @return 消息队列
-     */
-    public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<T> consumer, int concurrency) {
-        BlockingQueue<T> queue = new ArrayBlockingQueue<>(Math.max(capacity, 1));
-        consume(queue, consumer, concurrency);
-        return queue;
-    }
-
-    /**
-     * 异步消费消息
-     *
-     * @param consumer    消息消费者
-     * @param concurrency 并发数量
-     * @param batchSize   批处理数量
-     * @param <T>         消息类型
-     * @return 消息队列
-     */
-    public static <T> BlockingQueue<T> queue(@NonNull Consumer<List<T>> consumer, int concurrency, int batchSize) {
-        return queue(1000, consumer, concurrency, batchSize);
-    }
-
-    /**
-     * 异步批量消费消息
-     *
-     * @param capacity    队列大小
-     * @param consumer    消息消费者
-     * @param concurrency 并发数量
-     * @param batchSize   批处理数量
-     * @param <T>         消息类型
-     * @return 消息队列
-     */
-    public static <T> BlockingQueue<T> queue(int capacity, @NonNull Consumer<List<T>> consumer, int concurrency,
-                                             int batchSize) {
-        BlockingQueue<T> queue = new ArrayBlockingQueue<>(Math.max(capacity, 1));
-        consume(queue, consumer, concurrency, batchSize);
-        return queue;
-    }
 }

+ 1 - 1
framework-database/src/main/java/com/chelvc/framework/database/support/Updates.java

@@ -376,7 +376,7 @@ public final class Updates {
         Map<String, ColumnCache> columns = LambdaUtils.getColumnMap(clazz);
         String property = PropertyNamer.methodToProperty(lambda.getImplMethodName());
         ColumnCache column = ObjectUtils.ifNull(columns, cs -> cs.get(LambdaUtils.formatKey(property)));
-        AssertUtils.nonnull(column, () -> "Column not found for property: " + clazz.getName() + "." + property);
+        AssertUtils.state(column != null, () -> "Column not found for property: " + clazz.getName() + "." + property);
         return column.getColumn();
     }
 

+ 2 - 2
framework-jpush/src/main/java/com/chelvc/framework/jpush/DefaultJPushHandler.java

@@ -207,13 +207,13 @@ public class DefaultJPushHandler implements JPushHandler {
 
     @Override
     public BatchPushResult push(@NonNull PushPayload... payloads) {
-        AssertUtils.check(payloads.length > 0, () -> "payloads must not be empty");
+        AssertUtils.arg(payloads.length > 0, () -> "payloads must not be empty");
         return this.push(Arrays.asList(payloads));
     }
 
     @Override
     public BatchPushResult push(@NonNull List<PushPayload> payloads) {
-        AssertUtils.nonempty(payloads, () -> "payloads must not be empty");
+        AssertUtils.arg(!payloads.isEmpty(), () -> "payloads must not be empty");
         // 针对生产环境IOS推送,需要设置apnsProduction为true
         if (ApplicationContextHolder.isProduction()) {
             payloads.forEach(payload -> payload.resetOptionsApnsProduction(true));

+ 7 - 0
framework-kafka/pom.xml

@@ -17,6 +17,7 @@
     <properties>
         <spring-retry.version>1.3.4</spring-retry.version>
         <framework-base.version>1.0.0-RELEASE</framework-base.version>
+        <framework-redis.version>1.0.0-RELEASE</framework-redis.version>
     </properties>
 
     <dependencies>
@@ -25,6 +26,12 @@
             <artifactId>framework-base</artifactId>
             <version>${framework-base.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.chelvc.framework</groupId>
+            <artifactId>framework-redis</artifactId>
+            <version>${framework-redis.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>

+ 10 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/config/KafkaConfigurer.java

@@ -12,6 +12,9 @@ import com.chelvc.framework.kafka.fallback.KafkaStoreClient;
 import com.chelvc.framework.kafka.interceptor.KafkaSessionInterceptor;
 import com.chelvc.framework.kafka.producer.KafkaMessageSender;
 import com.chelvc.framework.kafka.producer.KafkaSenderWrapper;
+import com.chelvc.framework.kafka.producer.KafkaTransactionChecker;
+import com.chelvc.framework.kafka.producer.StandardTransactionChecker;
+import com.chelvc.framework.kafka.producer.TransactionCheckProcessor;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -19,6 +22,7 @@ import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.Bean;
@@ -102,6 +106,12 @@ public class KafkaConfigurer implements KafkaListenerConfigurer {
         registrar.setEndpointRegistry(new KafkaListenerRegistryWrapper(registry));
     }
 
+    @Bean
+    @ConditionalOnMissingBean(KafkaTransactionChecker.class)
+    public KafkaTransactionChecker kafkaTransactionChecker(List<TransactionCheckProcessor<?>> processors) {
+        return new StandardTransactionChecker(processors);
+    }
+
     @Bean
     @ConditionalOnExpression("'${spring.kafka.fallbacks}'.contains('MEMORY')")
     public KafkaSenderWrapper memorySenderWrapper() {

+ 102 - 12
framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java

@@ -2,6 +2,7 @@ package com.chelvc.framework.kafka.context;
 
 import java.lang.reflect.Type;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -10,6 +11,7 @@ import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
@@ -18,7 +20,11 @@ import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.common.util.JacksonUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
+import com.chelvc.framework.kafka.producer.TransactionMessage;
+import com.chelvc.framework.redis.queue.DelayRedisQueue;
+import com.chelvc.framework.redis.queue.Queues;
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -30,6 +36,7 @@ import org.springframework.kafka.core.KafkaTemplate;
  * @author Woody
  * @date 2024/1/30
  */
+@Slf4j
 public final class KafkaContextHolder {
     /**
      * 模版实例
@@ -58,6 +65,18 @@ public final class KafkaContextHolder {
         return (KafkaTemplate<K, V>) KAFKA_TEMPLATE;
     }
 
+    /**
+     * 获取事务消息队列
+     *
+     * @return 队列实例
+     */
+    public static DelayRedisQueue<String> getTransactionQueue() {
+        return Queues.getDelayRedisQueue(
+                ApplicationContextHolder.getApplicationName(true) + "-kafka-transaction-message",
+                name -> new DelayRedisQueue<>(name, Duration.ofSeconds(60), Duration.ZERO)
+        );
+    }
+
     /**
      * 环境隔离
      *
@@ -166,12 +185,7 @@ public final class KafkaContextHolder {
      * @param <V>     消息载体类型
      */
     public static <V> void send(@NonNull String topic, @NonNull V payload) {
-        KafkaTemplate<?, V> template = getKafkaTemplate();
-        try {
-            template.send(topic, payload).get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
+        send(topic, payload, null, (Integer) null);
     }
 
     /**
@@ -184,12 +198,7 @@ public final class KafkaContextHolder {
      * @param <V>      消息载体类型
      */
     public static <K, V> void send(@NonNull String topic, @NonNull V payload, K ordering) {
-        KafkaTemplate<K, V> template = getKafkaTemplate();
-        try {
-            template.send(topic, ordering, payload).get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
+        send(topic, payload, ordering, (Integer) null);
     }
 
     /**
@@ -210,4 +219,85 @@ public final class KafkaContextHolder {
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * 发送事务消息
+     *
+     * @param topic    消息主题
+     * @param payload  消息内容
+     * @param callback 本地事务回调函数
+     * @param <V>      消息载体类型
+     */
+    public static <V> void send(@NonNull String topic, @NonNull V payload, @NonNull Predicate<String> callback) {
+        send(topic, payload, null, null, callback);
+    }
+
+    /**
+     * 发送事务消息
+     *
+     * @param topic    消息主题
+     * @param payload  消息内容
+     * @param ordering 顺序消息标识
+     * @param callback 本地事务回调函数
+     * @param <K>      顺序标识类型
+     * @param <V>      消息载体类型
+     */
+    public static <K, V> void send(@NonNull String topic, @NonNull V payload, K ordering,
+                                   @NonNull Predicate<String> callback) {
+        send(topic, payload, ordering, null, callback);
+    }
+
+    /**
+     * 发送事务消息
+     *
+     * @param topic     消息主题
+     * @param payload   消息内容
+     * @param ordering  顺序消息标识
+     * @param partition 指定分区
+     * @param callback  本地事务回调函数
+     * @param <K>       顺序标识类型
+     * @param <V>       消息载体类型
+     */
+    public static <K, V> void send(@NonNull String topic, @NonNull V payload, K ordering, Integer partition,
+                                   @NonNull Predicate<String> callback) {
+        // 将事务消息放入延时队列
+        String id = StringUtils.uuid();
+        Session session = SessionContextHolder.getSession(false);
+        TransactionMessage transaction = TransactionMessage.builder().id(id).topic(topic).payload(payload)
+                .ordering(ordering).partition(partition).session(session).build();
+        String message = JacksonUtils.serialize(transaction);
+        DelayRedisQueue<String> queue = getTransactionQueue();
+        queue.offer(message, Duration.ofMillis(queue.getTimeout()));
+
+        // 处理本地事务
+        boolean committable;
+        try {
+            committable = callback.test(id);
+        } catch (Exception e) {
+            // 本地事务处理异常则删除事务延时消息
+            try {
+                queue.remove(message);
+            } catch (Throwable t) {
+                log.error("Kafka transaction message remove failed: {}", message, t);
+            }
+            throw e;
+        }
+
+        // 本地事务处理成功则发送Kafka消息
+        if (committable) {
+            KafkaTemplate<K, V> template = getKafkaTemplate();
+            try {
+                template.send(topic, partition, ordering, payload).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // 处理完成后删除事务延时消息
+        try {
+            queue.remove(message);
+        } catch (Throwable t) {
+            log.error("Kafka transaction message remove failed: {}", message, t);
+        }
+    }
 }

+ 1 - 1
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaMemorySender.java

@@ -33,7 +33,7 @@ public class KafkaMemorySender<K, V> extends KafkaFallbackSender<K, V> {
 
     public KafkaMemorySender(int capacity, @NonNull KafkaMessageSender<K, V> delegate) {
         super(delegate);
-        AssertUtils.check(capacity > 0, () -> "Kafka fallback queue capacity must be greater than 0");
+        AssertUtils.arg(capacity > 0, () -> "Kafka fallback queue capacity must be greater than 0");
         this.capacity = capacity;
     }
 

+ 17 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/KafkaTransactionChecker.java

@@ -0,0 +1,17 @@
+package com.chelvc.framework.kafka.producer;
+
+/**
+ * Kafka事务消息检测接口
+ *
+ * @author Woody
+ * @date 2024/10/29
+ */
+public interface KafkaTransactionChecker {
+    /**
+     * 事务检测
+     *
+     * @param message 事务消息
+     * @return 事务处理决策
+     */
+    TransactionResolution check(TransactionMessage message);
+}

+ 40 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/StandardTransactionChecker.java

@@ -0,0 +1,40 @@
+package com.chelvc.framework.kafka.producer;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.chelvc.framework.common.util.JacksonUtils;
+import com.chelvc.framework.common.util.ObjectUtils;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Kafka事务消息检测器标准实现
+ *
+ * @author Woody
+ * @date 2025/5/9
+ */
+@Slf4j
+public class StandardTransactionChecker implements KafkaTransactionChecker {
+    private final Map<String, TransactionCheckProcessor<?>> processors;
+
+    public StandardTransactionChecker(@NonNull List<TransactionCheckProcessor<?>> processors) {
+        this.processors = ObjectUtils.isEmpty(processors) ? Collections.emptyMap() :
+                processors.stream().collect(Collectors.toMap(TransactionCheckProcessor::topic, processor -> processor));
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public TransactionResolution check(TransactionMessage message) {
+        TransactionCheckProcessor processor = this.processors.get(message.getTopic());
+        if (processor == null) {
+            log.error("Transaction message check processor is missing: {}", message.getTopic());
+            return TransactionResolution.UNKNOWN;
+        }
+
+        Object payload = JacksonUtils.convert(message.getPayload(), processor.model());
+        return ObjectUtils.ifNull(processor.check(payload), TransactionResolution.UNKNOWN);
+    }
+}

+ 31 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionCheckProcessor.java

@@ -0,0 +1,31 @@
+package com.chelvc.framework.kafka.producer;
+
+/**
+ * Kafka事务消息检测处理接口
+ *
+ * @author Woody
+ * @date 2025/5/9
+ */
+public interface TransactionCheckProcessor<T> {
+    /**
+     * 获取消息主题
+     *
+     * @return 消息主题
+     */
+    String topic();
+
+    /**
+     * 获取消息类型
+     *
+     * @return 消息模型对象
+     */
+    Class<T> model();
+
+    /**
+     * 本地事务检测
+     *
+     * @param message 事务消息
+     * @return 本地事务检测处理决策
+     */
+    TransactionResolution check(T message);
+}

+ 51 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionMessage.java

@@ -0,0 +1,51 @@
+package com.chelvc.framework.kafka.producer;
+
+import java.io.Serializable;
+
+import com.chelvc.framework.base.context.Session;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Kafka事务消息模型
+ *
+ * @author Woody
+ * @date 2025/5/7
+ */
+@Data
+@SuperBuilder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TransactionMessage implements Serializable {
+    /**
+     * 消息ID
+     */
+    private String id;
+
+    /**
+     * 消息主体
+     */
+    private String topic;
+
+    /**
+     * 消息载体
+     */
+    private Object payload;
+
+    /**
+     * 顺序标识
+     */
+    private Object ordering;
+
+    /**
+     * 所属分区
+     */
+    private Integer partition;
+
+    /**
+     * 会话信息
+     */
+    private Session session;
+}

+ 73 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionMessageProcessor.java

@@ -0,0 +1,73 @@
+package com.chelvc.framework.kafka.producer;
+
+import com.chelvc.framework.base.context.SessionContextHolder;
+import com.chelvc.framework.common.util.JacksonUtils;
+import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.kafka.context.KafkaContextHolder;
+import com.chelvc.framework.redis.context.RedisContextHolder;
+import com.chelvc.framework.redis.queue.DelayRedisQueue;
+import com.chelvc.framework.redis.queue.Queues;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.stereotype.Component;
+
+/**
+ * Kafka事务延时消息处理器
+ *
+ * @author Woody
+ * @date 2025/5/7
+ */
+@Slf4j
+@Component
+@ConditionalOnClass(RedisContextHolder.class)
+@RequiredArgsConstructor(onConstructor = @__(@Autowired))
+public class TransactionMessageProcessor implements ApplicationRunner {
+    private final KafkaTransactionChecker checker;
+
+    /**
+     * 处理事务延时消息
+     *
+     * @param queue   延时队列
+     * @param message 延时消息
+     */
+    private void processing(DelayRedisQueue<String> queue, String message) {
+        // 获取事务处理决策
+        TransactionResolution resolution;
+        TransactionMessage instance = JacksonUtils.deserialize(message, TransactionMessage.class);
+        SessionContextHolder.setSession(instance.getSession());
+        try {
+            resolution = ObjectUtils.ifNull(this.checker.check(instance), TransactionResolution.UNKNOWN);
+            if (resolution == TransactionResolution.COMMIT) {
+                // 本地事务处理成功则重新发送消息
+                String topic = instance.getTopic();
+                Object payload = instance.getPayload();
+                Object ordering = instance.getOrdering();
+                Integer partition = instance.getPartition();
+                KafkaContextHolder.send(topic, payload, ordering, partition);
+            }
+        } finally {
+            SessionContextHolder.removeSessionContext();
+        }
+
+        // 如果事务处理决策是非未知则删除事务延时消息
+        if (resolution != TransactionResolution.UNKNOWN) {
+            queue.remove(message);
+        }
+    }
+
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+        DelayRedisQueue<String> queue = KafkaContextHolder.getTransactionQueue();
+        Queues.consume(queue, 100, messages -> messages.forEach(message -> {
+            try {
+                this.processing(queue, message);
+            } catch (Exception e) {
+                log.error("Transaction message processing failed: {}", message);
+            }
+        }));
+    }
+}

+ 24 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionResolution.java

@@ -0,0 +1,24 @@
+package com.chelvc.framework.kafka.producer;
+
+/**
+ * Kafka事务决策枚举
+ *
+ * @author Woody
+ * @date 2025/5/7
+ */
+public enum TransactionResolution {
+    /**
+     * 提交
+     */
+    COMMIT,
+
+    /**
+     * 回滚
+     */
+    ROLLBACK,
+
+    /**
+     * 未知
+     */
+    UNKNOWN
+}

+ 1 - 1
framework-location/src/main/java/com/chelvc/framework/location/support/CacheableLocationHandler.java

@@ -46,7 +46,7 @@ public class CacheableLocationHandler implements LocationHandler {
             log.warn("Location cache handle failed: {}", e.getMessage());
         }
         if (t == null && (t = supplier.get()) != null) {
-            // 为避免缓存雪崩,缓存过期时间增加随机数
+            // 为避免缓存穿透,缓存过期时间增加随机数
             int random = ThreadLocalRandom.current().nextInt(60);
             Duration duration = Duration.ofSeconds(this.expiration + random);
             try {

+ 1 - 6
framework-nacos/src/main/java/com/chelvc/framework/nacos/context/NacosContextHolder.java

@@ -12,7 +12,6 @@ import com.alibaba.nacos.api.common.Constants;
 import com.alibaba.nacos.api.config.ConfigService;
 import com.alibaba.nacos.api.exception.NacosException;
 import com.chelvc.framework.base.context.ApplicationContextHolder;
-import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.common.util.ThreadUtils;
@@ -49,11 +48,7 @@ public final class NacosContextHolder {
      */
     public static String getConfigId() {
         String id = ApplicationContextHolder.getProperty("nacos.config.id");
-        if (StringUtils.isEmpty(id)) {
-            String name = ApplicationContextHolder.getApplicationName();
-            return AssertUtils.nonempty(name, () -> "Application name is missing");
-        }
-        return id;
+        return StringUtils.notEmpty(id) ? id : ApplicationContextHolder.getApplicationName(true);
     }
 
     /**

+ 2 - 4
framework-redis/src/main/java/com/chelvc/framework/redis/config/RedisMQListenerRegistry.java

@@ -49,10 +49,8 @@ public class RedisMQListenerRegistry implements ApplicationListener<ApplicationS
      */
     protected <T> void registerListenerContainer(@NonNull String name, @NonNull RedisMQListener<T> listener) {
         Class<?> clazz = AopProxyUtils.ultimateTargetClass(listener);
-        RedisMQConsumer annotation = AssertUtils.nonnull(
-                clazz.getAnnotation(RedisMQConsumer.class),
-                () -> "@RedisMQConsumer annotation is missing: " + clazz.getName()
-        );
+        RedisMQConsumer annotation = clazz.getAnnotation(RedisMQConsumer.class);
+        AssertUtils.state(annotation != null, () -> "@RedisMQConsumer annotation is missing: " + clazz.getName());
         RedisMQListenerContainer<T> container = this.createListenerContainer();
         Type type = ObjectUtils.lookupSuperclassParameterized(clazz, RedisMQListener.class, Object.class);
         try {

+ 1 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisHashHolder.java

@@ -229,7 +229,7 @@ public final class RedisHashHolder {
             return false;
         }
 
-        AssertUtils.check(array.length % 2 == 0, () -> "Invalid key/value array");
+        AssertUtils.arg(array.length % 2 == 0, () -> "Invalid key/value array");
         byte[][] args = new byte[array.length + 2][];
         args[0] = RedisContextHolder.serialize(template.getKeySerializer(), name);
         args[1] = RedisContextHolder.serialize(template.getValueSerializer(), duration.getSeconds());

+ 19 - 2
framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisStreamHolder.java

@@ -1,6 +1,7 @@
 package com.chelvc.framework.redis.context;
 
 import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Date;
@@ -32,6 +33,7 @@ import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.core.script.DefaultRedisScript;
 import org.springframework.data.redis.core.script.RedisScript;
 import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.SerializationException;
 import org.springframework.data.redis.stream.StreamListener;
 import org.springframework.data.redis.stream.StreamMessageListenerContainer;
 
@@ -69,6 +71,21 @@ public final class RedisStreamHolder {
             "return redis.call('XADD', KEYS[1], 'MINID', '~', ARGV[1], '*', unpack(ARGV, 2))", String.class
     );
 
+    /**
+     * 对象字符串序列化处理器
+     */
+    private static final RedisSerializer<Object> OBJECT_STRING_SERIALIZER = new RedisSerializer<Object>() {
+        @Override
+        public byte[] serialize(Object o) throws SerializationException {
+            return o == null ? null : o.toString().getBytes(StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public Object deserialize(byte[] bytes) throws SerializationException {
+            return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
+        }
+    };
+
     /**
      * 字符串RedisTemplate实例
      */
@@ -103,9 +120,9 @@ public final class RedisStreamHolder {
                     RedisTemplate<String, Object> template = new RedisTemplate<>();
                     template.setConnectionFactory(RedisContextHolder.getDefaultConnectionFactory());
                     template.setKeySerializer(RedisSerializer.string());
-                    template.setValueSerializer(RedisSerializer.string());
+                    template.setValueSerializer(OBJECT_STRING_SERIALIZER);
                     template.setHashKeySerializer(RedisSerializer.string());
-                    template.setHashValueSerializer(RedisSerializer.string());
+                    template.setHashValueSerializer(OBJECT_STRING_SERIALIZER);
                     template.afterPropertiesSet();
 
                     STRING_TEMPLATE = template;

+ 5 - 5
framework-redis/src/main/java/com/chelvc/framework/redis/queue/DefaultRedisMQListenerContainer.java

@@ -78,7 +78,7 @@ public class DefaultRedisMQListenerContainer<T> implements RedisMQListenerContai
     public void initialize(@NonNull Type type, @NonNull RedisMQListener<T> listener,
                            @NonNull RedisMQConsumer annotation) {
         int concurrency = annotation.concurrency();
-        AssertUtils.check(concurrency > 0, () -> "Consumer concurrency must be greater than 0");
+        AssertUtils.arg(concurrency > 0, () -> "Consumer concurrency must be greater than 0");
         BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>((int) (concurrency / 0.1) * 2);
         ExecutorService executor = new ThreadPoolExecutor(
                 concurrency, concurrency, 60L, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy()
@@ -89,11 +89,11 @@ public class DefaultRedisMQListenerContainer<T> implements RedisMQListenerContai
     @Override
     public void initialize(@NonNull Type type, @NonNull String topic, @NonNull String group, int batch,
                            @NonNull RedisMQListener<T> listener, @NonNull ExecutorService executor) {
-        AssertUtils.nonempty(topic, () -> "Consumer topic must not be empty");
-        AssertUtils.nonempty(group, () -> "Consumer group must not be empty");
-        AssertUtils.check(batch > 0, () -> "Consumer batch must be greater than 0");
+        AssertUtils.arg(StringUtils.notEmpty(topic), () -> "Consumer topic must not be empty");
+        AssertUtils.arg(StringUtils.notEmpty(group), () -> "Consumer group must not be empty");
+        AssertUtils.arg(batch > 0, () -> "Consumer batch must be greater than 0");
         int idle = ApplicationContextHolder.getBean(RedisProperties.class).getStream().getIdle();
-        AssertUtils.check((this.idle = idle) > 0, () -> "Consumer idle must be greater than 0");
+        AssertUtils.arg((this.idle = idle) > 0, () -> "Consumer idle must be greater than 0");
         Environment environment = ApplicationContextHolder.getEnvironment();
         this.topic = RedisStreamHolder.isolate(environment.resolvePlaceholders(topic));
         this.group = RedisStreamHolder.isolate(environment.resolvePlaceholders(group));

+ 179 - 0
framework-redis/src/main/java/com/chelvc/framework/redis/queue/DelayRedisQueue.java

@@ -0,0 +1,179 @@
+package com.chelvc.framework.redis.queue;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.chelvc.framework.base.context.ApplicationContextHolder;
+import com.chelvc.framework.common.util.AssertUtils;
+import com.chelvc.framework.common.util.ObjectUtils;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import lombok.Getter;
+import lombok.NonNull;
+import org.springframework.dao.DataAccessException;
+import org.springframework.data.redis.core.RedisOperations;
+import org.springframework.data.redis.core.SessionCallback;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.data.redis.core.script.RedisScript;
+
+/**
+ * Redis延时队列实现
+ *
+ * @param <E> 元素类型
+ * @author Woody
+ * @date 2024/8/29
+ */
+@Getter
+public class DelayRedisQueue<E> extends PriorityRedisQueue<E> {
+    /**
+     * 弹出并延迟脚本
+     */
+    @SuppressWarnings("rawtypes")
+    private static final RedisScript<List> POLL_DELAY_SCRIPT = new DefaultRedisScript<>(
+            "local values = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3]) " +
+                    "if #values > 0 then for i = 1, #values do redis.call('ZINCRBY', KEYS[1], ARGV[4], values[i]) " +
+                    "end end return values", List.class
+    );
+
+    private final long timeout;
+    private final long expiration;
+
+    public DelayRedisQueue(@NonNull String name) {
+        this(name, Duration.ZERO, Duration.ZERO);
+    }
+
+    public DelayRedisQueue(@NonNull String name, @NonNull Duration timeout, @NonNull Duration expiration) {
+        this(ApplicationContextHolder.getProfile(), name, timeout, expiration);
+    }
+
+    DelayRedisQueue(String namespace, @NonNull String name, @NonNull Duration timeout, @NonNull Duration expiration) {
+        super(namespace, name);
+        this.timeout = timeout.toMillis();
+        this.expiration = expiration.toMillis();
+    }
+
+    /**
+     * 判断元素是否处于延时状态
+     *
+     * @param e 元素对象
+     * @return true/false
+     */
+    public boolean isDelaying(E e) {
+        if (e == null) {
+            return false;
+        }
+
+        Double score = this.template().opsForZSet().score(this.getName(), e);
+        return score != null && score > System.currentTimeMillis();
+    }
+
+    /**
+     * 判断元素是否处于延时状态
+     *
+     * @param collection 元素集合
+     * @return 元素/是否延时映射表
+     */
+    public Map<E, Boolean> isDelaying(Collection<E> collection) {
+        if (ObjectUtils.isEmpty(collection)) {
+            return Collections.emptyMap();
+        }
+
+        // 处理单个元素
+        if (collection.size() == 1) {
+            E e = collection.iterator().next();
+            return ImmutableMap.of(e, this.isDelaying(e));
+        }
+
+        // 批量处理多个元素
+        List<Object> scores = this.template().executePipelined(new SessionCallback<Object>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
+                for (E e : collection) {
+                    operations.opsForZSet().score((K) getName(), e);
+                }
+                return null;
+            }
+        });
+        if (ObjectUtils.isEmpty(scores)) {
+            return Collections.emptyMap();
+        }
+        int i = 0;
+        long timestamp = System.currentTimeMillis();
+        Map<E, Boolean> consumings = Maps.newHashMapWithExpectedSize(collection.size());
+        for (E e : collection) {
+            Double score = (Double) scores.get(i++);
+            consumings.put(e, score != null && score > timestamp);
+        }
+        return consumings;
+    }
+
+    /**
+     * 将元素放入队列
+     *
+     * @param e        元素
+     * @param duration 延时周期
+     * @return true/false
+     */
+    public boolean offer(@NonNull E e, @NonNull Duration duration) {
+        return this.offer(e, duration, true);
+    }
+
+    /**
+     * 将元素放入队列
+     *
+     * @param e        元素
+     * @param duration 延时周期
+     * @param override 是否可覆盖
+     * @return true/false
+     */
+    public boolean offer(@NonNull E e, @NonNull Duration duration, boolean override) {
+        return super.offer(e, System.currentTimeMillis() + duration.toMillis(), override);
+    }
+
+    @Override
+    protected Long offer(RedisOperations<String, E> operations, Collection<? extends E> objects, double priority,
+                         boolean override) {
+        if (this.expiration > 0) {
+            long max = System.currentTimeMillis() - this.expiration - 1000;
+            List<Object> values = this.template().executePipelined(new SessionCallback<Object>() {
+                @Override
+                @SuppressWarnings("unchecked")
+                public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
+                    DelayRedisQueue.super.offer((RedisOperations<String, E>) operations, objects, priority, override);
+                    operations.opsForZSet().removeRangeByScore((K) getName(), 0, max);
+                    return null;
+                }
+            });
+            return ObjectUtils.isEmpty(values) ? null : (Long) values.get(0);
+        }
+        return super.offer(operations, objects, priority, override);
+    }
+
+    @Override
+    public E peek() {
+        long now = System.currentTimeMillis();
+        if (this.expiration > 0) {
+            return super.peek(now - this.expiration, now);
+        }
+        return super.peek(Long.MIN_VALUE, now);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public List<E> poll(int size) {
+        AssertUtils.arg(size > 0, () -> "size must be greater than 0");
+
+        long now = System.currentTimeMillis();
+        long min = this.expiration > 0 ? now - this.expiration : Long.MIN_VALUE;
+        if (this.timeout > 0) {
+            List<String> keys = Collections.singletonList(this.getName());
+            List<E> values = this.template().execute(POLL_DELAY_SCRIPT, keys, min, now, size, this.timeout);
+            return ObjectUtils.ifEmpty(values, Collections::emptyList);
+        }
+        return super.poll(min, now, size);
+    }
+}

+ 16 - 52
framework-redis/src/main/java/com/chelvc/framework/redis/queue/MessageStreamListener.java

@@ -1,82 +1,48 @@
 package com.chelvc.framework.redis.queue;
 
 import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.List;
+import java.time.Duration;
 import java.util.concurrent.Executor;
 
-import com.chelvc.framework.common.util.ObjectUtils;
-import com.chelvc.framework.common.util.ThreadUtils;
 import com.chelvc.framework.redis.context.RedisStreamHolder;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.connection.stream.Consumer;
 import org.springframework.data.redis.connection.stream.MapRecord;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.script.DefaultRedisScript;
-import org.springframework.data.redis.core.script.RedisScript;
 import org.springframework.data.redis.stream.StreamListener;
 
 /**
  * 消息流监听器实现
  *
  * @author Woody
- * @date 2024/1/30
+ * @date 2024/4/3
  */
 @Slf4j
 public class MessageStreamListener<T> implements StreamListener<String, MapRecord<String, String, String>> {
-    /**
-     * 批量获取延迟消息脚本
-     */
-    @SuppressWarnings("rawtypes")
-    private static final RedisScript<List> DELAY_RANGE_SCRIPT = new DefaultRedisScript<>(
-            "local records = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3]) " +
-                    "if #records > 0 then local members = {} for i = 1, #records " +
-                    "do table.insert(members, ARGV[4]) table.insert(members, records[i]) end " +
-                    "redis.call('ZADD', KEYS[1], 'XX', unpack(members)) end return records", List.class
-    );
-
     private final Type type;
     private final String topic;
-    private final String delayed;
+    private final Executor executor;
     private final Consumer consumer;
     private final RedisMQListener<T> listener;
-    private final Executor executor;
+    private final DelayRedisQueue<String> delayQueue;
 
-    @SuppressWarnings("unchecked")
     public MessageStreamListener(Type type, String topic, Consumer consumer, RedisMQListener<T> listener,
                                  Executor executor) {
         this.type = type;
         this.topic = topic;
+        this.executor = executor;
         this.consumer = consumer;
         this.listener = listener;
-        this.executor = executor;
-        this.delayed = this.topic + ":" + this.consumer.getGroup();
 
-        // 初始化延时消息消费服务
-        this.executor.execute(() -> {
-            List<String> keys = Collections.singletonList(this.delayed);
-            while (!Thread.currentThread().isInterrupted()) {
-                // 批量获取延时消息(100条),为避免重复获取将消息延时时间增加1分钟
-                List<String> messages = null;
-                long now = System.currentTimeMillis(), delaying = now + 60000;
-                RedisTemplate<String, T> template = RedisStreamHolder.getStreamTemplate();
-                Object[] args = new Object[]{"-1", String.valueOf(now), "100", String.valueOf(delaying)};
-                try {
-                    messages = template.execute(DELAY_RANGE_SCRIPT, keys, args);
-                } catch (Throwable t) {
-                    log.error("RedisMQ message consume failed: {}", this.consumer, t);
-                }
-
-                // 如果当前没有延时消息则暂停1秒后继续
-                if (ObjectUtils.isEmpty(messages)) {
-                    ThreadUtils.sleep(1000);
-                    continue;
-                }
-
-                // 延时消息处理
-                messages.forEach(this::processing);
+        // 初始化延时消息队列
+        String name = this.topic + ":" + this.consumer.getGroup();
+        this.delayQueue = new DelayRedisQueue<String>(null, name, Duration.ofSeconds(60), Duration.ZERO) {
+            @Override
+            protected RedisTemplate<String, String> template() {
+                return RedisStreamHolder.getStreamTemplate();
             }
-        });
+        };
+        Queues.consume(this.delayQueue, 100, messages -> messages.forEach(this::processing));
     }
 
     /**
@@ -90,10 +56,10 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
                 MapRecord<String, String, String> record = RedisStreamHolder.deserialize(message);
                 if (RedisStreamHolder.isHeartbeat(record)) {
                     // 如果是心跳消息则删除相关数据
+                    this.delayQueue.remove(message);
                     RedisStreamHolder.delete(this.topic, record.getId());
-                    RedisStreamHolder.getStreamTemplate().opsForZSet().remove(this.delayed, message);
                 } else if (this.processing(record)) {
-                    RedisStreamHolder.getStreamTemplate().opsForZSet().remove(this.delayed, message);
+                    this.delayQueue.remove(message);
                 }
             } catch (Throwable t) {
                 log.error("RedisMQ message consume failed: {}, {}", this.consumer, message, t);
@@ -108,7 +74,6 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
      * @return true/false
      */
     private boolean processing(MapRecord<String, String, String> record) {
-        // 消费消息
         try {
             RedisStreamHolder.consume(record, this.type, this.listener::consume);
         } catch (Throwable t) {
@@ -127,8 +92,7 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
      */
     private boolean delaying(MapRecord<String, String, String> record, long timestamp) {
         try {
-            String message = RedisStreamHolder.serialize(record);
-            RedisStreamHolder.getStreamTemplate().opsForZSet().add(this.delayed, message, timestamp);
+            this.delayQueue.offer(RedisStreamHolder.serialize(record), timestamp);
         } catch (Throwable t) {
             log.error("RedisMQ message delaying failed: {}, {}", this.consumer, record, t);
             return false;

+ 303 - 0
framework-redis/src/main/java/com/chelvc/framework/redis/queue/PriorityRedisQueue.java

@@ -0,0 +1,303 @@
+package com.chelvc.framework.redis.queue;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import com.chelvc.framework.common.util.AssertUtils;
+import com.chelvc.framework.common.util.ObjectUtils;
+import com.google.common.collect.Sets;
+import lombok.NonNull;
+import org.springframework.dao.DataAccessException;
+import org.springframework.data.redis.core.DefaultTypedTuple;
+import org.springframework.data.redis.core.RedisOperations;
+import org.springframework.data.redis.core.SessionCallback;
+import org.springframework.data.redis.core.ZSetOperations;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.data.redis.core.script.RedisScript;
+
+/**
+ * Redis优先级队列实现
+ *
+ * @author Woody
+ * @date 2025/4/28
+ */
+public class PriorityRedisQueue<E> extends RedisQueue<E> {
+    /**
+     * 添加或忽略脚本
+     */
+    private static final RedisScript<Long> ADD_IGNORE_SCRIPT = new DefaultRedisScript<>(
+            "return redis.call('ZADD', KEYS[1], 'NX', 'CH', unpack(ARGV))", Long.class
+    );
+
+    /**
+     * 弹出并移除脚本
+     */
+    @SuppressWarnings("rawtypes")
+    private static final RedisScript<List> POLL_REMOVE_SCRIPT = new DefaultRedisScript<>(
+            "local values = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3]) " +
+                    "if #values > 0 then redis.call('ZREM', KEYS[1], unpack(values)) end return values", List.class
+    );
+
+    public PriorityRedisQueue(@NonNull String name) {
+        super(name);
+    }
+
+    PriorityRedisQueue(String namespace, @NonNull String name) {
+        super(namespace, name);
+    }
+
+    /**
+     * 集合迭代器实现
+     */
+    private class Iter implements Iterator<E> {
+        private E element;
+        private int index = 0;
+        private final int size;
+
+        public Iter(int size) {
+            this.size = size;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return this.index < this.size;
+        }
+
+        @Override
+        public E next() {
+            Set<E> values = template().opsForZSet().range(getName(), this.index, this.index++);
+            return ObjectUtils.isEmpty(values) ? null : (this.element = values.iterator().next());
+        }
+
+        @Override
+        public void remove() {
+            if (this.element != null) {
+                template().opsForZSet().remove(getName(), this.element);
+                this.element = null;
+            }
+        }
+    }
+
+    /**
+     * 将元素放入队列
+     *
+     * @param operations Redis操作接口
+     * @param objects    元素列表
+     * @param priority   优先级数字
+     * @param override   是否可覆盖
+     * @return 元素写入成功个数
+     */
+    protected Long offer(RedisOperations<String, E> operations, Collection<? extends E> objects, double priority,
+                         boolean override) {
+        if (ObjectUtils.isEmpty(objects)) {
+            return 0L;
+        }
+
+        // 可覆盖模式
+        if (override) {
+            Set<ZSetOperations.TypedTuple<E>> tuples = Sets.newHashSetWithExpectedSize(objects.size());
+            for (E e : objects) {
+                tuples.add(new DefaultTypedTuple<>(e, priority++));
+            }
+            return operations.opsForZSet().add(this.getName(), tuples);
+        }
+
+        // 不可覆盖模式
+        int i = 0;
+        Object[] args = new Object[objects.size() * 2];
+        for (E e : objects) {
+            args[i++] = priority++;
+            args[i++] = Objects.requireNonNull(e);
+        }
+        return operations.execute(ADD_IGNORE_SCRIPT, Collections.singletonList(this.getName()), args);
+    }
+
+    /**
+     * 弹出元素
+     *
+     * @param size 元素个数
+     * @return 元素列表
+     */
+    public List<E> poll(int size) {
+        return this.poll(Long.MIN_VALUE, Long.MAX_VALUE, size);
+    }
+
+    /**
+     * 弹出元素
+     *
+     * @param min  最小优先级
+     * @param max  最大优先级
+     * @param size 元素个数
+     * @return 元素列表
+     */
+    @SuppressWarnings("unchecked")
+    public List<E> poll(double min, double max, int size) {
+        AssertUtils.arg(size > 0, () -> "size must be greater than 0");
+
+        List<String> keys = Collections.singletonList(this.getName());
+        List<E> values = this.template().execute(POLL_REMOVE_SCRIPT, keys, min, max, size);
+        return ObjectUtils.ifEmpty(values, Collections::emptyList);
+    }
+
+    /**
+     * 查看元素
+     *
+     * @param min 最小优先级
+     * @param max 最大优先级
+     * @return 元素
+     */
+    public E peek(double min, double max) {
+        Set<E> values = this.template().opsForZSet().rangeByScore(this.getName(), min, max, 0, 1);
+        return ObjectUtils.isEmpty(values) ? null : values.iterator().next();
+    }
+
+    /**
+     * 将元素放入队列
+     *
+     * @param e        元素
+     * @param priority 优先级数字
+     * @return true/false
+     */
+    public boolean offer(@NonNull E e, double priority) {
+        return this.offer(e, priority, true);
+    }
+
+    /**
+     * 将元素放入队列
+     *
+     * @param e        元素
+     * @param priority 优先级数字
+     * @param override 是否可覆盖
+     * @return true/false
+     */
+    public boolean offer(@NonNull E e, double priority, boolean override) {
+        Long count = this.offer(this.template(), Collections.singleton(e), priority, override);
+        return count != null && count > 0;
+    }
+
+    /**
+     * 将元素放入队列
+     *
+     * @param objects  元素列表
+     * @param override 是否可覆盖
+     * @return true/false
+     */
+    public boolean offer(@NonNull Collection<? extends E> objects, boolean override) {
+        if (ObjectUtils.isEmpty(objects)) {
+            return false;
+        }
+
+        Long count = this.offer(this.template(), objects, System.currentTimeMillis(), override);
+        return count != null && count > 0;
+    }
+
+    @Override
+    public Iterator<E> iterator() {
+        return new Iter(this.size());
+    }
+
+    @Override
+    public int size() {
+        Long size = this.template().opsForZSet().size(this.getName());
+        return size == null ? 0 : size.intValue();
+    }
+
+    @Override
+    public boolean offer(E e) {
+        return this.offer(e, System.currentTimeMillis());
+    }
+
+    @Override
+    public E poll() {
+        List<E> values = this.poll(1);
+        return ObjectUtils.isEmpty(values) ? null : values.get(0);
+    }
+
+    @Override
+    public E peek() {
+        return this.peek(Long.MIN_VALUE, Long.MAX_VALUE);
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends E> c) {
+        return this.offer(c, true);
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        if (o == null) {
+            return false;
+        }
+
+        Long index = this.template().opsForZSet().rank(this.getName(), o);
+        return index != null && index >= 0;
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        if (ObjectUtils.isEmpty(c)) {
+            return false;
+        }
+
+        // 处理单个元素
+        if (c.size() == 1) {
+            return this.contains(c.iterator().next());
+        }
+
+        // 批量处理多个元素
+        List<Object> indexes = this.template().executePipelined(new SessionCallback<Object>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
+                for (Object o : c) {
+                    operations.opsForZSet().rank((K) getName(), o);
+                }
+                return null;
+            }
+        });
+        return ObjectUtils.notEmpty(indexes) && indexes.size() == c.size()
+                && indexes.stream().allMatch(index -> index != null && ((Long) index) >= 0);
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        if (o == null) {
+            return false;
+        }
+
+        Long count = this.template().opsForZSet().remove(this.getName(), o);
+        return count != null && count > 0;
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        if (ObjectUtils.isEmpty(c)) {
+            return false;
+        }
+
+        Long count = this.template().opsForZSet().remove(this.getName(), c.toArray());
+        return count != null && count > 0;
+    }
+
+    @Override
+    public Object[] toArray() {
+        Set<E> values = this.template().opsForZSet().range(this.getName(), 0, -1);
+        return ObjectUtils.ifNull(values, Collections::emptySet).toArray();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        Set<E> values = this.template().opsForZSet().range(this.getName(), 0, -1);
+        return ObjectUtils.ifNull(values, Collections::emptySet).toArray(a);
+    }
+
+    @Override
+    public String toString() {
+        Set<E> values = this.template().opsForZSet().range(this.getName(), 0, -1);
+        return ObjectUtils.ifNull(values, Collections::emptySet).toString();
+    }
+}

+ 192 - 0
framework-redis/src/main/java/com/chelvc/framework/redis/queue/Queues.java

@@ -0,0 +1,192 @@
+package com.chelvc.framework.redis.queue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import com.chelvc.framework.common.util.AssertUtils;
+import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.common.util.ThreadUtils;
+import com.google.common.collect.Maps;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 分布式队列处理工具类
+ *
+ * @author Woody
+ * @date 2025/4/28
+ */
+@Slf4j
+public final class Queues {
+    /**
+     * 队列名称/实例映射表
+     */
+    @SuppressWarnings("rawtypes")
+    private static final Map<String, RedisQueue> REDIS_QUEUE_INSTANCES = Maps.newConcurrentMap();
+
+    /**
+     * 延时队列名称/实例映射表
+     */
+    @SuppressWarnings("rawtypes")
+    private static final Map<String, DelayRedisQueue> DELAY_REDIS_QUEUE_INSTANCES = Maps.newConcurrentMap();
+
+    /**
+     * 优先级队列名称/实例映射表
+     */
+    @SuppressWarnings("rawtypes")
+    private static final Map<String, PriorityRedisQueue> PRIORITY_REDIS_QUEUE_INSTANCES = Maps.newConcurrentMap();
+
+    private Queues() {
+    }
+
+    /**
+     * 获取消息队列
+     *
+     * @param name 队列名称
+     * @param <E>  消息类型
+     * @return 队列实例
+     */
+    public static <E> RedisQueue<E> getRedisQueue(@NonNull String name) {
+        return getRedisQueue(name, RedisQueue::new);
+    }
+
+    /**
+     * 获取消息队列
+     *
+     * @param name    队列名称
+     * @param builder 队列构建函数
+     * @param <E>     消息类型
+     * @return 队列实例
+     */
+    @SuppressWarnings("unchecked")
+    public static <E> RedisQueue<E> getRedisQueue(@NonNull String name,
+                                                  @NonNull Function<String, RedisQueue<E>> builder) {
+        RedisQueue<E> queue = REDIS_QUEUE_INSTANCES.get(name);
+        return queue == null ? REDIS_QUEUE_INSTANCES.computeIfAbsent(name, builder) : queue;
+    }
+
+    /**
+     * 获取延时消息队列
+     *
+     * @param name 队列名称
+     * @param <E>  消息类型
+     * @return 队列实例
+     */
+    public static <E> DelayRedisQueue<E> getDelayRedisQueue(@NonNull String name) {
+        return getDelayRedisQueue(name, DelayRedisQueue::new);
+    }
+
+    /**
+     * 获取延时消息队列
+     *
+     * @param name    队列名称
+     * @param builder 队列构建函数
+     * @param <E>     消息类型
+     * @return 队列实例
+     */
+    @SuppressWarnings("unchecked")
+    public static <E> DelayRedisQueue<E> getDelayRedisQueue(@NonNull String name,
+                                                            @NonNull Function<String, DelayRedisQueue<E>> builder) {
+        DelayRedisQueue<E> queue = DELAY_REDIS_QUEUE_INSTANCES.get(name);
+        return queue == null ? DELAY_REDIS_QUEUE_INSTANCES.computeIfAbsent(name, builder) : queue;
+    }
+
+    /**
+     * 获取优先级消息队列
+     *
+     * @param name 队列名称
+     * @param <E>  消息类型
+     * @return 队列实例
+     */
+    public static <E> PriorityRedisQueue<E> getPriorityRedisQueue(@NonNull String name) {
+        return getPriorityRedisQueue(name, PriorityRedisQueue::new);
+    }
+
+    /**
+     * 获取优先级消息队列
+     *
+     * @param name    队列名称
+     * @param builder 队列构建函数
+     * @param <E>     消息类型
+     * @return 队列实例
+     */
+    @SuppressWarnings("unchecked")
+    public static <E> PriorityRedisQueue<E> getPriorityRedisQueue(@NonNull String name,
+                                                                  @NonNull Function<String, PriorityRedisQueue<E>> builder) {
+        PriorityRedisQueue<E> queue = PRIORITY_REDIS_QUEUE_INSTANCES.get(name);
+        return queue == null ? PRIORITY_REDIS_QUEUE_INSTANCES.computeIfAbsent(name, builder) : queue;
+    }
+
+    /**
+     * 消费队列消息
+     *
+     * @param queue    消息队列
+     * @param consumer 消息消费者
+     * @param <T>      消息类型
+     */
+    public static <T> void consume(@NonNull RedisQueue<T> queue, @NonNull Consumer<T> consumer) {
+        ThreadUtils.run(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                // 拉取消息
+                T message = null;
+                try {
+                    message = queue.poll();
+                } catch (Throwable t) {
+                    log.error("RedisMQ message poll failed: {}", queue.getName(), t);
+                }
+
+                // 如果当前没有消息则暂停1秒后继续
+                if (message == null) {
+                    ThreadUtils.sleep(1000);
+                    continue;
+                }
+
+                // 消费消息
+                try {
+                    consumer.accept(message);
+                } catch (Throwable t) {
+                    log.error("RedisMQ message consume failed: {}, {}", queue.getName(), message, t);
+                }
+            }
+        });
+    }
+
+    /**
+     * 消费队列消息
+     *
+     * @param queue    消息队列
+     * @param size     拉取数量
+     * @param consumer 消息消费者
+     * @param <T>      消息类型
+     */
+    public static <T> void consume(@NonNull PriorityRedisQueue<T> queue, int size,
+                                   @NonNull Consumer<List<T>> consumer) {
+        AssertUtils.arg(size > 0, () -> "size must be greater than 0");
+        ThreadUtils.run(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                // 拉取消息
+                List<T> messages = null;
+                try {
+                    messages = queue.poll(size);
+                } catch (Throwable t) {
+                    log.error("RedisMQ message poll failed: {}", queue.getName(), t);
+                }
+
+                // 如果当前没有消息则暂停1秒后继续
+                if (ObjectUtils.isEmpty(messages)) {
+                    ThreadUtils.sleep(1000);
+                    continue;
+                }
+
+                // 消费消息
+                try {
+                    consumer.accept(messages);
+                } catch (Throwable t) {
+                    log.error("RedisMQ message consume failed: {}, {}", queue.getName(), messages, t);
+                }
+            }
+        });
+    }
+}

+ 7 - 38
framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisQueue.java

@@ -5,16 +5,13 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.function.Function;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
-import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.redis.context.RedisContextHolder;
-import com.google.common.collect.Maps;
+import lombok.Getter;
 import lombok.NonNull;
 import org.springframework.data.redis.core.RedisTemplate;
 
@@ -25,18 +22,16 @@ import org.springframework.data.redis.core.RedisTemplate;
  * @author Woody
  * @date 2024/8/19
  */
+@Getter
 public class RedisQueue<E> extends AbstractQueue<E> {
-    /**
-     * 队列名称/实例映射表
-     */
-    @SuppressWarnings("rawtypes")
-    private static final Map<String, RedisQueue> INSTANCES = Maps.newConcurrentMap();
-
     private final String name;
 
     public RedisQueue(@NonNull String name) {
-        String profile = ApplicationContextHolder.getProfile();
-        this.name = StringUtils.isEmpty(profile) ? name : (profile + "-" + name);
+        this(ApplicationContextHolder.getProfile(), name);
+    }
+
+    RedisQueue(String namespace, @NonNull String name) {
+        this.name = StringUtils.isEmpty(namespace) ? name : (namespace + "-" + name);
     }
 
     /**
@@ -48,7 +43,6 @@ public class RedisQueue<E> extends AbstractQueue<E> {
         private final int size;
 
         public Iter(int size) {
-            AssertUtils.check(size > -1, () -> "size must be greater than -1");
             this.size = size;
         }
 
@@ -71,31 +65,6 @@ public class RedisQueue<E> extends AbstractQueue<E> {
         }
     }
 
-    /**
-     * 获取消息队列
-     *
-     * @param name 队列名称
-     * @param <E>  消息类型
-     * @return 队列实例
-     */
-    public static <E> RedisQueue<E> get(@NonNull String name) {
-        return get(name, RedisQueue::new);
-    }
-
-    /**
-     * 获取消息队列
-     *
-     * @param name    队列名称
-     * @param builder 队列构建函数
-     * @param <E>     消息类型
-     * @return 队列实例
-     */
-    @SuppressWarnings("unchecked")
-    public static <E> RedisQueue<E> get(@NonNull String name, @NonNull Function<String, RedisQueue<E>> builder) {
-        RedisQueue<E> queue = INSTANCES.get(name);
-        return queue == null ? INSTANCES.computeIfAbsent(name, builder) : queue;
-    }
-
     /**
      * 获取RedisTemplate实例
      *

+ 0 - 361
framework-redis/src/main/java/com/chelvc/framework/redis/queue/TemporalRedisQueue.java

@@ -1,361 +0,0 @@
-package com.chelvc.framework.redis.queue;
-
-import java.time.Duration;
-import java.util.AbstractQueue;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.function.Function;
-
-import com.chelvc.framework.base.context.ApplicationContextHolder;
-import com.chelvc.framework.common.util.AssertUtils;
-import com.chelvc.framework.common.util.ObjectUtils;
-import com.chelvc.framework.common.util.StringUtils;
-import com.chelvc.framework.redis.context.RedisContextHolder;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import lombok.NonNull;
-import org.springframework.dao.DataAccessException;
-import org.springframework.data.redis.core.RedisOperations;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.SessionCallback;
-import org.springframework.data.redis.core.script.DefaultRedisScript;
-import org.springframework.data.redis.core.script.RedisScript;
-
-/**
- * 带临时性的Redis队列
- *
- * @param <E> 元素类型
- * @author Woody
- * @date 2024/8/29
- */
-public class TemporalRedisQueue<E> extends AbstractQueue<E> {
-    /**
-     * 队列名称/实例映射表
-     */
-    @SuppressWarnings("rawtypes")
-    private static final Map<String, TemporalRedisQueue> INSTANCES = Maps.newConcurrentMap();
-
-    /**
-     * Redis队列添加元素脚本(如果存在则忽略)
-     */
-    private static final RedisScript<Long> ADD_SCRIPT = new DefaultRedisScript<>(
-            "return redis.call('ZADD', KEYS[1], 'NX', 'CH', unpack(ARGV))", Long.class
-    );
-
-    /**
-     * Redis队列弹出元素脚本
-     */
-    @SuppressWarnings("rawtypes")
-    private static final RedisScript<List> POLL_SCRIPT = new DefaultRedisScript<>(
-            "local value = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, 1) " +
-                    "if value[1] then redis.call('ZADD', KEYS[1], 'XX', ARGV[3], value[1]) end return value", List.class
-    );
-
-    private final long idle;
-    private final long timeout;
-    private final String name;
-    private final List<String> keys;
-
-    public TemporalRedisQueue(@NonNull String name) {
-        this(name, Duration.ofMinutes(1), Duration.ZERO);
-    }
-
-    public TemporalRedisQueue(@NonNull String name, @NonNull Duration idle) {
-        this(name, idle, Duration.ZERO);
-    }
-
-    public TemporalRedisQueue(@NonNull String name, @NonNull Duration idle, @NonNull Duration timeout) {
-        String profile = ApplicationContextHolder.getProfile();
-        this.name = StringUtils.isEmpty(profile) ? name : (profile + "-" + name);
-        this.idle = idle.toMillis();
-        AssertUtils.check(this.idle > 0, () -> "idle must be greater than 0");
-        this.timeout = timeout.toMillis();
-        this.keys = Collections.singletonList(this.name);
-    }
-
-    /**
-     * 集合迭代器实现
-     */
-    private class Iter implements Iterator<E> {
-        private E element;
-        private int index = 0;
-        private final int size;
-
-        public Iter(int size) {
-            AssertUtils.check(size > -1, () -> "size must be greater than -1");
-            this.size = size;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return this.index < this.size;
-        }
-
-        @Override
-        public E next() {
-            Set<E> values = template().opsForZSet().range(name, this.index, this.index++);
-            return ObjectUtils.isEmpty(values) ? null : (this.element = values.iterator().next());
-        }
-
-        @Override
-        public void remove() {
-            if (this.element != null) {
-                template().opsForZSet().remove(name, this.element);
-                this.element = null;
-            }
-        }
-    }
-
-    /**
-     * 获取消息队列
-     *
-     * @param name 队列名称
-     * @param <E>  消息类型
-     * @return 队列实例
-     */
-    public static <E> TemporalRedisQueue<E> get(@NonNull String name) {
-        return get(name, TemporalRedisQueue::new);
-    }
-
-    /**
-     * 获取消息队列
-     *
-     * @param name    队列名称
-     * @param builder 队列构建函数
-     * @param <E>     消息类型
-     * @return 队列实例
-     */
-    @SuppressWarnings("unchecked")
-    public static <E> TemporalRedisQueue<E> get(@NonNull String name,
-                                                @NonNull Function<String, TemporalRedisQueue<E>> builder) {
-        TemporalRedisQueue<E> queue = INSTANCES.get(name);
-        return queue == null ? INSTANCES.computeIfAbsent(name, builder) : queue;
-    }
-
-    /**
-     * 添加元素
-     *
-     * @param args 添加参数
-     * @return true/false
-     */
-    private boolean add(Object[] args) {
-        Long count;
-        if (this.timeout > 0) {
-            long max = System.currentTimeMillis() - this.timeout - 1000;
-            List<Object> values = this.template().executePipelined(new SessionCallback<Object>() {
-                @Override
-                @SuppressWarnings("unchecked")
-                public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
-                    operations.execute(ADD_SCRIPT, (List<K>) keys, args);
-                    operations.opsForZSet().removeRangeByScore((K) name, 0, max);
-                    return null;
-                }
-            });
-            count = ObjectUtils.isEmpty(values) ? null : (Long) values.get(0);
-        } else {
-            count = this.template().execute(ADD_SCRIPT, this.keys, args);
-        }
-        return count != null && count > 0;
-    }
-
-    /**
-     * 获取RedisTemplate实例
-     *
-     * @return RedisTemplate实例
-     */
-    protected RedisTemplate<String, E> template() {
-        return RedisContextHolder.getDefaultTemplate();
-    }
-
-    /**
-     * 判断元素是否在临时状态
-     *
-     * @param e 元素对象
-     * @return true/false
-     */
-    public boolean temporally(E e) {
-        if (e == null) {
-            return false;
-        }
-        Double score = this.template().opsForZSet().score(this.name, e);
-        return score != null && score > System.currentTimeMillis();
-    }
-
-    /**
-     * 判断元素是否在临时状态
-     *
-     * @param collection 元素集合
-     * @return 元素/是否在临时状态映射表
-     */
-    public Map<E, Boolean> temporally(Collection<E> collection) {
-        if (ObjectUtils.isEmpty(collection)) {
-            return Collections.emptyMap();
-        }
-
-        // 处理单个元素
-        if (collection.size() == 1) {
-            E e = collection.iterator().next();
-            return ImmutableMap.of(e, this.temporally(e));
-        }
-
-        // 批量处理多个元素
-        List<Object> scores = this.template().executePipelined(new SessionCallback<Object>() {
-            @Override
-            @SuppressWarnings("unchecked")
-            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
-                for (E e : collection) {
-                    operations.opsForZSet().score((K) name, e);
-                }
-                return null;
-            }
-        });
-        if (ObjectUtils.isEmpty(scores)) {
-            return Collections.emptyMap();
-        }
-        int i = 0;
-        long timestamp = System.currentTimeMillis();
-        Map<E, Boolean> consumings = Maps.newHashMapWithExpectedSize(collection.size());
-        for (E e : collection) {
-            Double score = (Double) scores.get(i++);
-            consumings.put(e, score != null && score > timestamp);
-        }
-        return consumings;
-    }
-
-    @Override
-    public Iterator<E> iterator() {
-        return new Iter(this.size());
-    }
-
-    @Override
-    public int size() {
-        Long size = this.template().opsForZSet().size(this.name);
-        return size == null ? 0 : size.intValue();
-    }
-
-    @Override
-    public boolean add(E e) {
-        return this.offer(e);
-    }
-
-    @Override
-    public boolean offer(E e) {
-        return this.add(new Object[]{System.currentTimeMillis(), Objects.requireNonNull(e)});
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public E poll() {
-        long max = System.currentTimeMillis();
-        long min = this.timeout > 0 ? max - this.timeout : 0, score = max + this.idle;
-        List<E> values = this.template().execute(POLL_SCRIPT, this.keys, min, max, score);
-        return ObjectUtils.isEmpty(values) ? null : values.get(0);
-    }
-
-    @Override
-    public E peek() {
-        long min = 0, max = System.currentTimeMillis();
-        Set<E> values = this.template().opsForZSet().rangeByScore(this.name, min, max, 0, 1);
-        return ObjectUtils.isEmpty(values) ? null : values.iterator().next();
-    }
-
-    @Override
-    public void clear() {
-        this.template().delete(this.name);
-    }
-
-    @Override
-    public boolean addAll(Collection<? extends E> c) {
-        if (ObjectUtils.isEmpty(c)) {
-            return false;
-        }
-
-        int i = 0;
-        Object[] args = new Object[c.size() * 2];
-        long timestamp = System.currentTimeMillis();
-        for (E e : c) {
-            args[i++] = timestamp++;
-            args[i++] = Objects.requireNonNull(e);
-        }
-        return this.add(args);
-    }
-
-    @Override
-    public boolean contains(Object o) {
-        if (o == null) {
-            return false;
-        }
-
-        Long index = this.template().opsForZSet().rank(this.name, o);
-        return index != null && index >= 0;
-    }
-
-    @Override
-    public boolean containsAll(Collection<?> c) {
-        if (ObjectUtils.isEmpty(c)) {
-            return false;
-        }
-
-        // 处理单个元素
-        if (c.size() == 1) {
-            return this.contains(c.iterator().next());
-        }
-
-        // 批量处理多个元素
-        List<Object> indexes = this.template().executePipelined(new SessionCallback<Object>() {
-            @Override
-            @SuppressWarnings("unchecked")
-            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
-                for (Object o : c) {
-                    operations.opsForZSet().rank((K) name, o);
-                }
-                return null;
-            }
-        });
-        return ObjectUtils.notEmpty(indexes) && indexes.size() == c.size()
-                && indexes.stream().allMatch(index -> index != null && ((Long) index) >= 0);
-    }
-
-    @Override
-    public boolean remove(Object o) {
-        if (o == null) {
-            return false;
-        }
-
-        Long count = this.template().opsForZSet().remove(this.name, o);
-        return count != null && count > 0;
-    }
-
-    @Override
-    public boolean removeAll(Collection<?> c) {
-        if (ObjectUtils.isEmpty(c)) {
-            return false;
-        }
-
-        Long count = this.template().opsForZSet().remove(this.name, c.toArray());
-        return count != null && count > 0;
-    }
-
-    @Override
-    public Object[] toArray() {
-        Set<E> values = this.template().opsForZSet().range(this.name, 0, -1);
-        return ObjectUtils.ifNull(values, Collections::emptySet).toArray();
-    }
-
-    @Override
-    public <T> T[] toArray(T[] a) {
-        Set<E> values = this.template().opsForZSet().range(this.name, 0, -1);
-        return ObjectUtils.ifNull(values, Collections::emptySet).toArray(a);
-    }
-
-    @Override
-    public String toString() {
-        Set<E> values = this.template().opsForZSet().range(this.name, 0, -1);
-        return ObjectUtils.ifNull(values, Collections::emptySet).toString();
-    }
-}

+ 6 - 4
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/config/RocketMQConfigurer.java

@@ -1,11 +1,14 @@
 package com.chelvc.framework.rocketmq.config;
 
+import java.util.List;
+
 import com.chelvc.framework.rocketmq.fallback.RocketMQFallbackPolicy;
 import com.chelvc.framework.rocketmq.fallback.RocketMQMemoryProducer;
 import com.chelvc.framework.rocketmq.fallback.RocketMQPersistentProducer;
 import com.chelvc.framework.rocketmq.fallback.RocketMQStoreClient;
-import com.chelvc.framework.rocketmq.producer.CommonTransactionChecker;
 import com.chelvc.framework.rocketmq.producer.RocketMQProducerWrapper;
+import com.chelvc.framework.rocketmq.producer.StandardTransactionChecker;
+import com.chelvc.framework.rocketmq.producer.TransactionCheckProcessor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
@@ -17,7 +20,6 @@ import org.apache.rocketmq.client.apis.producer.TransactionChecker;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
-import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -50,8 +52,8 @@ public class RocketMQConfigurer {
 
     @Bean
     @ConditionalOnMissingBean(TransactionChecker.class)
-    public TransactionChecker transactionChecker(ApplicationContext applicationContext) {
-        return new CommonTransactionChecker(applicationContext);
+    public TransactionChecker transactionChecker(List<TransactionCheckProcessor<?>> processors) {
+        return new StandardTransactionChecker(processors);
     }
 
     @Bean

+ 2 - 4
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/config/RocketMQListenerRegistry.java

@@ -63,10 +63,8 @@ public class RocketMQListenerRegistry implements ApplicationListener<Application
      */
     protected <T> void registerListenerContainer(@NonNull String name, @NonNull RocketMQListener<T> listener) {
         Class<?> clazz = AopProxyUtils.ultimateTargetClass(listener);
-        RocketMQConsumer annotation = AssertUtils.nonnull(
-                clazz.getAnnotation(RocketMQConsumer.class),
-                () -> "@RocketMQConsumer annotation is missing: " + clazz.getName()
-        );
+        RocketMQConsumer annotation = clazz.getAnnotation(RocketMQConsumer.class);
+        AssertUtils.state(annotation != null, () -> "@RocketMQConsumer annotation is missing: " + clazz.getName());
         RocketMQListenerContainer<T> container = this.createListenerContainer(annotation.batch() > 1);
         Type type = ObjectUtils.lookupSuperclassParameterized(clazz, RocketMQListener.class, Object.class);
         try {

+ 17 - 35
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/MultipleRocketMQListenerContainer.java

@@ -10,6 +10,7 @@ import com.chelvc.framework.base.context.ApplicationContextHolder;
 import com.chelvc.framework.base.context.Session;
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.common.util.ThreadUtils;
 import com.chelvc.framework.rocketmq.annotation.RocketMQConsumer;
 import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
@@ -129,44 +130,17 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
         });
     }
 
-    /**
-     * 关闭消息消费者线程
-     *
-     * @param thread 消息消费者线程
-     */
-    private void shutdown(Thread thread) {
-        if (thread != null) {
-            thread.interrupt();
-            ThreadUtils.join(thread);
-        }
-    }
-
-    /**
-     * 关闭消息消费者
-     *
-     * @param consumer 消息消费者实例
-     */
-    private void shutdown(SimpleConsumer consumer) {
-        if (consumer != null) {
-            try {
-                consumer.close();
-            } catch (Throwable t) {
-                log.warn("RocketMQ consumer shutdown failed: {}:{}, {}", this.topic, this.group, t.getMessage());
-            }
-        }
-    }
-
     @Override
     public void initialize(@NonNull Type type, @NonNull ClientServiceProvider provider,
                            @NonNull ClientConfiguration configuration, @NonNull RocketMQListener<T> listener,
                            @NonNull RocketMQConsumer annotation) {
         // 参数配置有效性校验
-        AssertUtils.nonempty(annotation.topic(), () -> "Consumer topic must not be empty");
-        AssertUtils.nonempty(annotation.group(), () -> "Consumer group must not be empty");
-        AssertUtils.check(annotation.batch() > 0, () -> "Consumer batch must be greater than 0");
-        AssertUtils.check(annotation.duration() > 0, () -> "Consumer duration must be greater than 0");
-        AssertUtils.check(annotation.interval() > 0, () -> "Consumer interval must be greater than 0");
-        AssertUtils.nonempty(annotation.tag(), () -> "Consumer tag must not be empty");
+        AssertUtils.arg(StringUtils.notEmpty(annotation.topic()), () -> "Consumer topic must not be empty");
+        AssertUtils.arg(StringUtils.notEmpty(annotation.group()), () -> "Consumer group must not be empty");
+        AssertUtils.arg(annotation.batch() > 0, () -> "Consumer batch must be greater than 0");
+        AssertUtils.arg(annotation.duration() > 0, () -> "Consumer duration must be greater than 0");
+        AssertUtils.arg(annotation.interval() > 0, () -> "Consumer interval must be greater than 0");
+        AssertUtils.arg(StringUtils.notEmpty(annotation.tag()), () -> "Consumer tag must not be empty");
 
         // 初始化消费者实例
         this.type = type;
@@ -195,9 +169,17 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
     public void destroy() throws Exception {
         this.running = false;
         try {
-            this.shutdown(this.thread);
+            if (this.thread != null) {
+                ThreadUtils.stop(this.thread);
+            }
         } finally {
-            this.shutdown(this.consumer);
+            if (this.consumer != null) {
+                try {
+                    this.consumer.close();
+                } catch (Throwable t) {
+                    log.warn("RocketMQ consumer shutdown failed: {}:{}, {}", this.topic, this.group, t.getMessage());
+                }
+            }
         }
     }
 }

+ 12 - 20
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/SingleRocketMQListenerContainer.java

@@ -5,6 +5,7 @@ import java.util.Collections;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
 import com.chelvc.framework.common.util.AssertUtils;
+import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.rocketmq.annotation.RocketMQConsumer;
 import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
 import lombok.NonNull;
@@ -55,30 +56,15 @@ public class SingleRocketMQListenerContainer<T> implements RocketMQListenerConta
         }
     }
 
-    /**
-     * 关闭消息消费者
-     *
-     * @param consumer 消息消费者实例
-     */
-    private void shutdown(PushConsumer consumer) {
-        if (consumer != null) {
-            try {
-                consumer.close();
-            } catch (Throwable t) {
-                log.warn("RocketMQ consumer shutdown failed: {}:{}, {}", this.topic, this.group, t.getMessage());
-            }
-        }
-    }
-
     @Override
     public void initialize(@NonNull Type type, @NonNull ClientServiceProvider provider,
                            @NonNull ClientConfiguration configuration, @NonNull RocketMQListener<T> listener,
                            @NonNull RocketMQConsumer annotation) {
         // 参数配置有效性校验
-        AssertUtils.nonempty(annotation.topic(), () -> "Consumer topic must not be empty");
-        AssertUtils.nonempty(annotation.group(), () -> "Consumer group must not be empty");
-        AssertUtils.nonempty(annotation.tag(), () -> "Consumer tag must not be empty");
-        AssertUtils.check(annotation.concurrency() > 0, () -> "Consumer concurrency must be greater than 0");
+        AssertUtils.arg(StringUtils.notEmpty(annotation.topic()), () -> "Consumer topic must not be empty");
+        AssertUtils.arg(StringUtils.notEmpty(annotation.group()), () -> "Consumer group must not be empty");
+        AssertUtils.arg(StringUtils.notEmpty(annotation.tag()), () -> "Consumer tag must not be empty");
+        AssertUtils.arg(annotation.concurrency() > 0, () -> "Consumer concurrency must be greater than 0");
 
         // 初始化消费者实例
         this.type = type;
@@ -100,6 +86,12 @@ public class SingleRocketMQListenerContainer<T> implements RocketMQListenerConta
 
     @Override
     public void destroy() throws Exception {
-        this.shutdown(this.consumer);
+        if (this.consumer != null) {
+            try {
+                this.consumer.close();
+            } catch (Throwable t) {
+                log.warn("RocketMQ consumer shutdown failed: {}:{}, {}", this.topic, this.group, t.getMessage());
+            }
+        }
     }
 }

+ 1 - 1
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQMemoryProducer.java

@@ -34,7 +34,7 @@ public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
 
     public RocketMQMemoryProducer(int capacity, @NonNull Producer delegate) {
         super(delegate);
-        AssertUtils.check(capacity > 0, () -> "RocketMQ fallback queue capacity must be greater than 0");
+        AssertUtils.arg(capacity > 0, () -> "RocketMQ fallback queue capacity must be greater than 0");
         this.capacity = capacity;
     }
 

+ 0 - 47
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/CommonTransactionChecker.java

@@ -1,47 +0,0 @@
-package com.chelvc.framework.rocketmq.producer;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import com.chelvc.framework.common.util.JacksonUtils;
-import com.chelvc.framework.common.util.ObjectUtils;
-import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.rocketmq.client.apis.message.MessageView;
-import org.apache.rocketmq.client.apis.producer.TransactionChecker;
-import org.apache.rocketmq.client.apis.producer.TransactionResolution;
-import org.springframework.context.ApplicationContext;
-
-/**
- * RocketMQ事务本地检测公共处理器实现
- *
- * @author Woody
- * @date 2024/10/29
- */
-@Slf4j
-public class CommonTransactionChecker implements TransactionChecker {
-    private final Map<String, TransactionMessageChecker<?>> checkers;
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    public CommonTransactionChecker(@NonNull ApplicationContext applicationContext) {
-        Map<String, TransactionMessageChecker> checkers =
-                applicationContext.getBeansOfType(TransactionMessageChecker.class);
-        this.checkers = ObjectUtils.isEmpty(checkers) ? Collections.emptyMap() :
-                checkers.values().stream().collect(Collectors.toMap(TransactionMessageChecker::topic, c -> c));
-    }
-
-    @Override
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    public TransactionResolution check(MessageView message) {
-        String body = RocketMQContextHolder.body(message);
-        String topic = RocketMQContextHolder.topic(message);
-        TransactionMessageChecker checker = this.checkers.get(topic);
-        if (checker == null) {
-            log.error("Transaction message checker is missing: {}, {}", topic, body);
-            return TransactionResolution.UNKNOWN;
-        }
-        return checker.check(JacksonUtils.deserialize(body, checker.model()));
-    }
-}

+ 52 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/StandardTransactionChecker.java

@@ -0,0 +1,52 @@
+package com.chelvc.framework.rocketmq.producer;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.chelvc.framework.base.context.SessionContextHolder;
+import com.chelvc.framework.common.util.JacksonUtils;
+import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.apis.producer.TransactionChecker;
+import org.apache.rocketmq.client.apis.producer.TransactionResolution;
+
+/**
+ * RocketMQ事务消息检测器标准实现
+ *
+ * @author Woody
+ * @date 2024/10/29
+ */
+@Slf4j
+public class StandardTransactionChecker implements TransactionChecker {
+    private final Map<String, TransactionCheckProcessor<?>> processors;
+
+    public StandardTransactionChecker(@NonNull List<TransactionCheckProcessor<?>> processors) {
+        this.processors = ObjectUtils.isEmpty(processors) ? Collections.emptyMap() :
+                processors.stream().collect(Collectors.toMap(TransactionCheckProcessor::topic, processor -> processor));
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public TransactionResolution check(MessageView message) {
+        String topic = RocketMQContextHolder.topic(message);
+        TransactionCheckProcessor processor = this.processors.get(topic);
+        if (processor == null) {
+            log.error("Transaction message check processor is missing: {}", topic);
+            return TransactionResolution.UNKNOWN;
+        }
+
+        String body = RocketMQContextHolder.body(message);
+        Object payload = JacksonUtils.deserialize(body, processor.model());
+        SessionContextHolder.setSession(RocketMQContextHolder.getSession(message));
+        try {
+            return ObjectUtils.ifNull(processor.check(payload), TransactionResolution.UNKNOWN);
+        } finally {
+            SessionContextHolder.removeSessionContext();
+        }
+    }
+}

+ 4 - 4
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/TransactionMessageChecker.java → framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/TransactionCheckProcessor.java

@@ -3,13 +3,13 @@ package com.chelvc.framework.rocketmq.producer;
 import org.apache.rocketmq.client.apis.producer.TransactionResolution;
 
 /**
- * RocketMQ事务消息本地事务检测接口
+ * RocketMQ事务消息检测处理接口
  *
  * @param <T> 消息类型
  * @author Woody
  * @date 2024/10/29
  */
-public interface TransactionMessageChecker<T> {
+public interface TransactionCheckProcessor<T> {
     /**
      * 获取消息主题
      *
@@ -25,10 +25,10 @@ public interface TransactionMessageChecker<T> {
     Class<T> model();
 
     /**
-     * 本地事务检测
+     * 事务检测
      *
      * @param message 事务消息
-     * @return 本地事务检测处理决策
+     * @return 事务处理决策
      */
     TransactionResolution check(T message);
 }

+ 1 - 1
framework-sms/src/main/java/com/chelvc/framework/sms/support/DefaultCaptchaSmsHandler.java

@@ -77,7 +77,7 @@ public class DefaultCaptchaSmsHandler implements CaptchaSmsHandler {
 
     @Override
     public SmsSession send(@NonNull String mobile, int length) {
-        AssertUtils.check(length > 0, () -> "Captcha length must be greater than 0");
+        AssertUtils.arg(length > 0, () -> "Captcha length must be greater than 0");
 
         // 短信验证码发送频率限制
         String lock = "sms:captcha:interval:" + mobile;

+ 1 - 1
framework-sms/src/main/java/com/chelvc/framework/sms/support/DelegatingNormalSmsHandler.java

@@ -27,7 +27,7 @@ public class DelegatingNormalSmsHandler implements NormalSmsHandler {
     private final List<NormalSmsHandler> handlers;
 
     public DelegatingNormalSmsHandler(@NonNull List<NormalSmsHandler> handlers) {
-        AssertUtils.nonempty(handlers, () -> "Delegating handlers must not be empty");
+        AssertUtils.arg(!handlers.isEmpty(), () -> "Delegating handlers must not be empty");
         this.handlers = Lists.newArrayList(handlers);
     }
 

+ 1 - 1
framework-sms/src/main/java/com/chelvc/framework/sms/support/DelegatingTemplateSmsHandler.java

@@ -22,7 +22,7 @@ public class DelegatingTemplateSmsHandler implements TemplateSmsHandler {
     private final List<TemplateSmsHandler> handlers;
 
     public DelegatingTemplateSmsHandler(@NonNull List<TemplateSmsHandler> handlers) {
-        AssertUtils.nonempty(handlers, () -> "Delegating handlers must not be empty");
+        AssertUtils.arg(!handlers.isEmpty(), () -> "Delegating handlers must not be empty");
         this.handlers = Lists.newArrayList(handlers);
     }
 

+ 14 - 6
framework-upload/src/main/java/com/chelvc/framework/upload/support/AliyunUploadHandler.java

@@ -21,6 +21,7 @@ import com.aliyuncs.profile.DefaultProfile;
 import com.chelvc.framework.base.util.HttpUtils;
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.upload.UploadHandler;
 import com.chelvc.framework.upload.config.UploadProperties;
 import lombok.NonNull;
@@ -38,12 +39,19 @@ public class AliyunUploadHandler implements UploadHandler {
     private final String domain;
 
     public AliyunUploadHandler(@NonNull UploadProperties properties) {
-        String appid = AssertUtils.nonempty(properties.getAppid(), () -> "Aliyun upload appid is missing");
-        String secret = AssertUtils.nonempty(properties.getSecret(), () -> "Aliyun upload secret is missing");
-        String region = AssertUtils.nonempty(properties.getRegion(), () -> "Aliyun upload region is missing");
-        String endpoint = AssertUtils.nonempty(properties.getEndpoint(), () -> "Aliyun upload endpoint is missing");
-        this.bucket = AssertUtils.nonempty(properties.getBucket(), () -> "Aliyun upload bucket is missing");
-        this.domain = AssertUtils.nonempty(properties.getDomain(), () -> "Aliyun upload access domain is missing");
+        this.bucket = properties.getBucket();
+        this.domain = properties.getDomain();
+        AssertUtils.arg(StringUtils.notEmpty(this.bucket), () -> "Aliyun upload bucket is missing");
+        AssertUtils.arg(StringUtils.notEmpty(this.domain), () -> "Aliyun upload access domain is missing");
+
+        String appid = properties.getAppid();
+        String secret = properties.getSecret();
+        String region = properties.getRegion();
+        String endpoint = properties.getEndpoint();
+        AssertUtils.arg(StringUtils.notEmpty(appid), () -> "Aliyun upload appid is missing");
+        AssertUtils.arg(StringUtils.notEmpty(secret), () -> "Aliyun upload secret is missing");
+        AssertUtils.arg(StringUtils.notEmpty(region), () -> "Aliyun upload region is missing");
+        AssertUtils.arg(StringUtils.notEmpty(endpoint), () -> "Aliyun upload endpoint is missing");
 
         // 初始化OSS客户端
         ClientBuilderConfiguration configuration = new ClientBuilderConfiguration();

+ 5 - 2
framework-upload/src/main/java/com/chelvc/framework/upload/support/LocalUploadHandler.java

@@ -7,6 +7,7 @@ import java.io.InputStream;
 import com.chelvc.framework.base.util.HttpUtils;
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.FileUtils;
+import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.upload.UploadHandler;
 import com.chelvc.framework.upload.config.UploadProperties;
 import lombok.NonNull;
@@ -22,8 +23,10 @@ public class LocalUploadHandler implements UploadHandler {
     private final String domain;
 
     public LocalUploadHandler(@NonNull UploadProperties properties) {
-        this.bucket = AssertUtils.nonempty(properties.getBucket(), () -> "Local upload bucket is missing");
-        this.domain = AssertUtils.nonempty(properties.getDomain(), () -> "Local upload access domain is missing");
+        this.bucket = properties.getBucket();
+        this.domain = properties.getDomain();
+        AssertUtils.arg(StringUtils.notEmpty(this.bucket), () -> "Local upload bucket is missing");
+        AssertUtils.arg(StringUtils.notEmpty(this.domain), () -> "Local upload access domain is missing");
     }
 
     @Override

+ 11 - 5
framework-upload/src/main/java/com/chelvc/framework/upload/support/TencentUploadHandler.java

@@ -37,11 +37,17 @@ public class TencentUploadHandler implements UploadHandler {
     private final CdnClient cdn;
 
     public TencentUploadHandler(@NonNull UploadProperties properties) {
-        String appid = AssertUtils.nonempty(properties.getAppid(), () -> "Tencent upload appid is missing");
-        String secret = AssertUtils.nonempty(properties.getSecret(), () -> "Tencent upload secret is missing");
-        String region = AssertUtils.nonempty(properties.getRegion(), () -> "Tencent upload region is missing");
-        this.bucket = AssertUtils.nonempty(properties.getBucket(), () -> "Tencent upload bucket is missing");
-        this.domain = AssertUtils.nonempty(properties.getDomain(), () -> "Tencent upload access domain is missing");
+        this.bucket = properties.getBucket();
+        this.domain = properties.getDomain();
+        AssertUtils.arg(StringUtils.notEmpty(this.bucket), () -> "Tencent upload bucket is missing");
+        AssertUtils.arg(StringUtils.notEmpty(this.domain), () -> "Tencent upload access domain is missing");
+
+        String appid = properties.getAppid();
+        String secret = properties.getSecret();
+        String region = properties.getRegion();
+        AssertUtils.arg(StringUtils.notEmpty(appid), () -> "Tencent upload appid is missing");
+        AssertUtils.arg(StringUtils.notEmpty(secret), () -> "Tencent upload secret is missing");
+        AssertUtils.arg(StringUtils.notEmpty(region), () -> "Tencent upload region is missing");
 
         // 初始化COS客户端
         ClientConfig config = new ClientConfig();

+ 4 - 2
framework-wechat/src/main/java/com/chelvc/framework/wechat/context/WechatContextHolder.java

@@ -122,9 +122,10 @@ public final class WechatContextHolder implements ApplicationListener<Applicatio
      * @return 访问令牌/过期时间戳
      */
     private static Pair<String, Long> invokeAccessToken(String appid) {
+        String secret = APPID_SECRET_MAPPING.get(appid);
+        AssertUtils.state(StringUtils.notEmpty(secret), () -> "Wechat secret not found: " + appid);
         HttpHeaders headers = new HttpHeaders();
         headers.setContentType(MediaType.APPLICATION_JSON);
-        String secret = AssertUtils.nonnull(APPID_SECRET_MAPPING.get(appid), () -> "Wechat secret not found: " + appid);
         Map<?, ?> body = ImmutableMap.of("grant_type", "client_credential", "appid", appid, "secret", secret);
         HttpEntity<?> entity = new HttpEntity<>(body, headers);
         WechatAccessToken response = exchange(ACCESS_TOKEN_URL, HttpMethod.POST, entity, WechatAccessToken.class);
@@ -180,7 +181,8 @@ public final class WechatContextHolder implements ApplicationListener<Applicatio
      */
     public static WechatPublicHandler getWechatPublicHandler(@NonNull String id) {
         WechatPublicHandler handler = getWechatPublicHandlers().get(id);
-        return AssertUtils.nonnull(handler, () -> "Wechat public handler not found: " + id);
+        AssertUtils.state(handler != null, () -> "Wechat public handler not found: " + id);
+        return handler;
     }
 
     /**

+ 2 - 2
framework-wechat/src/main/java/com/chelvc/framework/wechat/support/DefaultWechatPaymentHandler.java

@@ -24,8 +24,8 @@ import lombok.extern.slf4j.Slf4j;
 public class DefaultWechatPaymentHandler implements WechatPaymentHandler {
     private final List<WechatPaymentProcessor> processors;
 
-    public DefaultWechatPaymentHandler(List<WechatPaymentProcessor> processors) {
-        AssertUtils.nonempty(processors, () -> "Wechat payment processors must not be empty");
+    public DefaultWechatPaymentHandler(@NonNull List<WechatPaymentProcessor> processors) {
+        AssertUtils.arg(!processors.isEmpty(), () -> "Wechat payment processors must not be empty");
         this.processors = Lists.newArrayList(processors);
     }
 

+ 2 - 1
framework-wechat/src/main/java/com/chelvc/framework/wechat/support/DefaultWechatPublicHandler.java

@@ -97,7 +97,8 @@ public class DefaultWechatPublicHandler implements WechatPublicHandler {
 
     @Override
     public String decrypt(@NonNull String ciphertext) {
-        String secret = AssertUtils.nonempty(this.properties.getKey(), () -> "Wechat secret is missing");
+        String secret = this.properties.getKey();
+        AssertUtils.state(StringUtils.notEmpty(secret), () -> "Wechat secret is missing");
         return WechatContextHolder.decrypt(ciphertext, secret);
     }