Przeglądaj źródła

升级log4j版本;优化安全处理逻辑;优化Kafka、RedisMQ、RocketMQ处理逻辑;

Woody 1 tydzień temu
rodzic
commit
e95ecbc663
57 zmienionych plików z 1048 dodań i 587 usunięć
  1. 11 0
      framework-base/src/main/java/com/chelvc/framework/base/context/ApplicationContextHolder.java
  2. 30 0
      framework-base/src/main/java/com/chelvc/framework/base/context/Session.java
  3. 4 0
      framework-boot/pom.xml
  4. 18 4
      framework-common/pom.xml
  5. 2 2
      framework-common/src/main/java/com/chelvc/framework/common/model/Ip.java
  6. 4 4
      framework-common/src/main/java/com/chelvc/framework/common/util/JacksonUtils.java
  7. 1 1
      framework-database/src/main/java/com/chelvc/framework/database/jackson/SensitiveArrayDeserializer.java
  8. 1 1
      framework-database/src/main/java/com/chelvc/framework/database/jackson/SensitiveListDeserializer.java
  9. 1 1
      framework-database/src/main/java/com/chelvc/framework/database/jackson/SensitiveSetDeserializer.java
  10. 16 0
      framework-dependencies/pom.xml
  11. 1 2
      framework-kafka/src/main/java/com/chelvc/framework/kafka/config/KafkaListenerRegistryWrapper.java
  12. 2 3
      framework-kafka/src/main/java/com/chelvc/framework/kafka/config/KafkaProperties.java
  13. 2 75
      framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java
  14. 0 18
      framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/FallbackSendResult.java
  15. 5 5
      framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaFallbackSender.java
  16. 7 15
      framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaMemorySender.java
  17. 5 11
      framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaPersistentSender.java
  18. 21 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/interceptor/ConsumerRecordInterceptor.java
  19. 0 32
      framework-kafka/src/main/java/com/chelvc/framework/kafka/interceptor/KafkaRecordAdapter.java
  20. 74 42
      framework-kafka/src/main/java/com/chelvc/framework/kafka/interceptor/KafkaRecordInterceptor.java
  21. 21 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/interceptor/ProducerRecordInterceptor.java
  22. 118 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/EmptySendResult.java
  23. 21 3
      framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/StandardTransactionChecker.java
  24. 1 1
      framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionMessageProcessor.java
  25. 4 0
      framework-nacos/pom.xml
  26. 1 1
      framework-redis/src/main/java/com/chelvc/framework/redis/annotation/RedisMQConsumer.java
  27. 1 1
      framework-redis/src/main/java/com/chelvc/framework/redis/config/RedisMQListenerRegistry.java
  28. 6 17
      framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisStreamHolder.java
  29. 1 1
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/ConsumerPendingCleaner.java
  30. 3 3
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/DefaultRedisMQListenerContainer.java
  31. 18 4
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/MessageStreamListener.java
  32. 1 1
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisMQListener.java
  33. 1 1
      framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisMQListenerContainer.java
  34. 1 1
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/config/RocketMQProperties.java
  35. 43 18
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/MultipleRocketMQListenerContainer.java
  36. 3 18
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/RocketMQListener.java
  37. 1 1
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/RocketMQListenerContainer.java
  38. 51 10
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/SingleRocketMQListenerContainer.java
  39. 45 207
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/RocketMQContextHolder.java
  40. 10 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQFallbackMessage.java
  41. 10 10
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQFallbackProducer.java
  42. 2 2
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQFallbackTransaction.java
  43. 7 16
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQMemoryProducer.java
  44. 15 35
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQPersistentProducer.java
  45. 20 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/interceptor/ConsumerMessageInterceptor.java
  46. 19 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/interceptor/ProducerMessageInterceptor.java
  47. 100 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/interceptor/RocketMQProducerInterceptor.java
  48. 44 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/EmptySendReceipt.java
  49. 145 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/ProducerMessage.java
  50. 9 1
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/RocketMQProducerFactory.java
  51. 75 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/RocketMQProducerIsolator.java
  52. 29 13
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/StandardTransactionChecker.java
  53. 9 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/TransactionCheckProcessor.java
  54. 5 3
      framework-security/src/main/java/com/chelvc/framework/security/interceptor/SecurityFirewallInterceptor.java
  55. 1 1
      framework-security/src/main/java/com/chelvc/framework/security/jackson/JacksonArrayDecrypter.java
  56. 1 1
      framework-security/src/main/java/com/chelvc/framework/security/jackson/JacksonListDecrypter.java
  57. 1 1
      framework-security/src/main/java/com/chelvc/framework/security/jackson/JacksonSetDecrypter.java

+ 11 - 0
framework-base/src/main/java/com/chelvc/framework/base/context/ApplicationContextHolder.java

@@ -104,6 +104,17 @@ public class ApplicationContextHolder implements EnvironmentAware, ApplicationCo
         return getProfile(false);
     }
 
+    /**
+     * 获取环境标识
+     *
+     * @param suffix 标识后缀
+     * @return 环境标识
+     */
+    public static String getProfile(String suffix) {
+        String profile = getProfile();
+        return StringUtils.isEmpty(profile) ? suffix : StringUtils.isEmpty(suffix) ? profile : (profile + "-" + suffix);
+    }
+
     /**
      * 获取环境标识
      *

+ 30 - 0
framework-base/src/main/java/com/chelvc/framework/base/context/Session.java

@@ -8,6 +8,7 @@ import java.util.Set;
 import com.chelvc.framework.common.model.Caps;
 import com.chelvc.framework.common.model.Platform;
 import com.chelvc.framework.common.model.Terminal;
+import com.chelvc.framework.common.util.JacksonUtils;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.Maps;
 import lombok.AccessLevel;
@@ -106,6 +107,14 @@ public class Session implements Serializable {
      */
     private Set<String> authorities;
 
+    /**
+     * 序列化值(JSON字符串)
+     */
+    @JsonIgnore
+    @Getter(AccessLevel.NONE)
+    @Setter(AccessLevel.NONE)
+    private volatile String serialization;
+
     /**
      * 分组场景/标识映射表
      */
@@ -115,6 +124,22 @@ public class Session implements Serializable {
     @Setter(AccessLevel.NONE)
     private final Map<String, Caps> groups = Maps.newConcurrentMap();
 
+    /**
+     * 序列化会话信息
+     *
+     * @return JSON字符串
+     */
+    public String serialize() {
+        if (this.serialization == null) {
+            synchronized (this) {
+                if (this.serialization == null) {
+                    this.serialization = JacksonUtils.serialize(this);
+                }
+            }
+        }
+        return this.serialization;
+    }
+
     /**
      * 获取场景分组
      *
@@ -153,5 +178,10 @@ public class Session implements Serializable {
         this.mobile = mobile;
         this.registering = registering;
         this.authorities = Collections.unmodifiableSet(authorities);
+
+        // 更新序列化值
+        if (this.serialization != null) {
+            this.serialization = JacksonUtils.serialize(this);
+        }
     }
 }

+ 4 - 0
framework-boot/pom.xml

@@ -31,6 +31,10 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
             <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-to-slf4j</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>com.fasterxml.jackson.module</groupId>
                     <artifactId>jackson-module-parameter-names</artifactId>

+ 18 - 4
framework-common/pom.xml

@@ -70,15 +70,29 @@
         <dependency>
             <groupId>org.apache.xmlbeans</groupId>
             <artifactId>xmlbeans</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.github.ben-manes.caffeine</groupId>
-            <artifactId>caffeine</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-api</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>xerces</groupId>
             <artifactId>xercesImpl</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.hibernate.validator</groupId>
             <artifactId>hibernate-validator</artifactId>

+ 2 - 2
framework-common/src/main/java/com/chelvc/framework/common/model/Ip.java

@@ -126,7 +126,7 @@ public class Ip implements CharSequence, Comparable<Ip>, Serializable {
             try {
                 return new Ip(value, delimiter).matches(this.number);
             } catch (Exception e) {
-                log.error("Address number convert failed", e);
+                log.error("IP address number convert failed: {}", e.getMessage());
             }
             return false;
         } else if (address instanceof Ip) {
@@ -145,7 +145,7 @@ public class Ip implements CharSequence, Comparable<Ip>, Serializable {
             try {
                 return this.number == (getAddressNumber(value) & this.mask);
             } catch (Exception e) {
-                log.error("Address number convert failed", e);
+                log.error("IP address number convert failed: {}", e.getMessage());
             }
             return false;
         }

+ 4 - 4
framework-common/src/main/java/com/chelvc/framework/common/util/JacksonUtils.java

@@ -56,25 +56,25 @@ public final class JacksonUtils {
     /**
      * 字符串数组类型引用
      */
-    public static final TypeReference<String[]> STRING_ARRAY_TYPE = new TypeReference<String[]>() {
+    public static final TypeReference<String[]> ARRAY_STRING_TYPE = new TypeReference<String[]>() {
     };
 
     /**
      * 字符串集合类型引用
      */
-    public static final TypeReference<Set<String>> STRING_SET_TYPE = new TypeReference<Set<String>>() {
+    public static final TypeReference<Set<String>> SET_STRING_TYPE = new TypeReference<Set<String>>() {
     };
 
     /**
      * 字符串列表类型引用
      */
-    public static final TypeReference<List<String>> STRING_LIST_TYPE = new TypeReference<List<String>>() {
+    public static final TypeReference<List<String>> LIST_STRING_TYPE = new TypeReference<List<String>>() {
     };
 
     /**
      * 字符串键/值映射类型引用
      */
-    public static final TypeReference<Map<String, String>> STRING_MAP_TYPE = new TypeReference<Map<String, String>>() {
+    public static final TypeReference<Map<String, String>> MAP_STRING_TYPE = new TypeReference<Map<String, String>>() {
     };
 
     /**

+ 1 - 1
framework-database/src/main/java/com/chelvc/framework/database/jackson/SensitiveArrayDeserializer.java

@@ -17,7 +17,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer;
 public class SensitiveArrayDeserializer extends JsonDeserializer<String[]> {
     @Override
     public String[] deserialize(JsonParser parser, DeserializationContext context) throws IOException {
-        String[] values = parser.readValueAs(JacksonUtils.STRING_ARRAY_TYPE);
+        String[] values = parser.readValueAs(JacksonUtils.ARRAY_STRING_TYPE);
         return DatabaseContextHolder.getDatabaseCipherHandler().decrypt(values, true);
     }
 }

+ 1 - 1
framework-database/src/main/java/com/chelvc/framework/database/jackson/SensitiveListDeserializer.java

@@ -18,7 +18,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer;
 public class SensitiveListDeserializer extends JsonDeserializer<List<String>> {
     @Override
     public List<String> deserialize(JsonParser parser, DeserializationContext context) throws IOException {
-        List<String> values = parser.readValueAs(JacksonUtils.STRING_LIST_TYPE);
+        List<String> values = parser.readValueAs(JacksonUtils.LIST_STRING_TYPE);
         return DatabaseContextHolder.getDatabaseCipherHandler().decrypt(values, true);
     }
 }

+ 1 - 1
framework-database/src/main/java/com/chelvc/framework/database/jackson/SensitiveSetDeserializer.java

@@ -18,7 +18,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer;
 public class SensitiveSetDeserializer extends JsonDeserializer<Set<String>> {
     @Override
     public Set<String> deserialize(JsonParser parser, DeserializationContext context) throws IOException {
-        Set<String> values = parser.readValueAs(JacksonUtils.STRING_SET_TYPE);
+        Set<String> values = parser.readValueAs(JacksonUtils.SET_STRING_TYPE);
         return DatabaseContextHolder.getDatabaseCipherHandler().decrypt(values, true);
     }
 }

+ 16 - 0
framework-dependencies/pom.xml

@@ -21,6 +21,7 @@
         <mail.version>1.4.7</mail.version>
         <guava.version>23.0</guava.version>
         <zxing.version>3.4.1</zxing.version>
+        <log4j.version>2.25.1</log4j.version>
         <xerces.version>2.12.0</xerces.version>
         <bcpkix-jdk15on.version>1.64</bcpkix-jdk15on.version>
         <pinyin4j.version>2.5.1</pinyin4j.version>
@@ -87,6 +88,21 @@
                 <artifactId>xercesImpl</artifactId>
                 <version>${xerces.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-api</artifactId>
+                <version>${log4j.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-core</artifactId>
+                <version>${log4j.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-to-slf4j</artifactId>
+                <version>${log4j.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.bouncycastle</groupId>
                 <artifactId>bcpkix-jdk15on</artifactId>

+ 1 - 2
framework-kafka/src/main/java/com/chelvc/framework/kafka/config/KafkaListenerRegistryWrapper.java

@@ -8,7 +8,6 @@ import java.util.regex.Pattern;
 import com.chelvc.framework.base.context.ApplicationContextHolder;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
-import com.chelvc.framework.kafka.context.KafkaContextHolder;
 import lombok.NonNull;
 import org.springframework.beans.BeansException;
 import org.springframework.context.ApplicationContext;
@@ -41,7 +40,7 @@ public class KafkaListenerRegistryWrapper extends KafkaListenerEndpointRegistry
      */
     private String isolate(String original) {
         Environment environment = ApplicationContextHolder.getEnvironment();
-        return KafkaContextHolder.isolate(environment.resolvePlaceholders(original));
+        return ApplicationContextHolder.getProfile(environment.resolvePlaceholders(original));
     }
 
     /**

+ 2 - 3
framework-kafka/src/main/java/com/chelvc/framework/kafka/config/KafkaProperties.java

@@ -49,9 +49,8 @@ public class KafkaProperties extends org.springframework.boot.autoconfigure.kafk
         this.getProducer().setAcks("1");
         this.getProducer().setKeySerializer(StringSerializer.class);
         this.getProducer().setValueSerializer(JacksonSerializer.class);
-        this.getProducer().getProperties().put(
-                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaRecordInterceptor.class.getName()
-        );
+        String interceptor = KafkaRecordInterceptor.class.getName();
+        this.getProducer().getProperties().put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptor);
 
         // 初始化消费者默认配置
         this.getConsumer().setEnableAutoCommit(true);

+ 2 - 75
framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java

@@ -1,12 +1,8 @@
 package com.chelvc.framework.kafka.context;
 
 import java.lang.reflect.Type;
-import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -15,12 +11,10 @@ import com.chelvc.framework.base.context.ApplicationContextHolder;
 import com.chelvc.framework.base.context.Session;
 import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.common.util.JacksonUtils;
-import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.kafka.producer.TransactionMessage;
 import com.chelvc.framework.redis.queue.DelayRedisQueue;
 import com.chelvc.framework.redis.queue.RedisQueues;
-import com.google.common.collect.Lists;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -73,17 +67,6 @@ public final class KafkaContextHolder {
         return RedisQueues.getDelayRedisQueue(application + "-kafka-transaction-message", DelayRedisQueue::new);
     }
 
-    /**
-     * 环境隔离
-     *
-     * @param original 原始标记
-     * @return 环境隔离标记
-     */
-    public static String isolate(@NonNull String original) {
-        String namespace = ApplicationContextHolder.getProfile();
-        return StringUtils.isEmpty(namespace) ? original : (namespace + "-" + original);
-    }
-
     /**
      * 从消息头中获取会话信息
      *
@@ -92,22 +75,7 @@ public final class KafkaContextHolder {
      */
     public static Session getSession(@NonNull Headers headers) {
         Iterator<Header> iterator = headers.headers(Session.NAMING).iterator();
-        if (iterator.hasNext()) {
-            return JacksonUtils.deserialize(iterator.next().value(), Session.class);
-        }
-        return null;
-    }
-
-    /**
-     * 初始化会话信息
-     *
-     * @param headers 消息头对象实例
-     */
-    public static void initializeSession(@NonNull Headers headers) {
-        Session session = SessionContextHolder.getSession(false);
-        if (session != null) {
-            headers.add(Session.NAMING, JacksonUtils.serialize(session).getBytes(StandardCharsets.UTF_8));
-        }
+        return iterator.hasNext() ? JacksonUtils.deserialize(iterator.next().value(), Session.class) : null;
     }
 
     /**
@@ -129,46 +97,6 @@ public final class KafkaContextHolder {
         return JacksonUtils.deserialize(String.valueOf(value), type);
     }
 
-    /**
-     * 消息反序列化
-     *
-     * @param record   消息记录
-     * @param type     消息体类型
-     * @param consumer 消息消费者
-     * @param <K>      顺序标识类型
-     * @param <V>      消息载体类型
-     */
-    public static <K, V> void deserialize(@NonNull ConsumerRecord<K, V> record, @NonNull Type type,
-                                          @NonNull Consumer<Object> consumer) {
-        Object payload = deserialize(record, type);
-        Session session = getSession(record.headers());
-        SessionContextHolder.setSession(session);
-        try {
-            consumer.accept(payload);
-        } finally {
-            SessionContextHolder.removeSessionContext();
-        }
-    }
-
-    /**
-     * 消息反序列化
-     *
-     * @param records 消息记录集合
-     * @param type    消息体类型
-     * @param <K>     顺序标识类型
-     * @param <V>     消息载体类型
-     * @return 消息列表
-     */
-    public static <K, V> List<Object> deserialize(@NonNull Collection<ConsumerRecord<K, V>> records,
-                                                  @NonNull Type type) {
-        if (ObjectUtils.isEmpty(records)) {
-            return Collections.emptyList();
-        }
-        List<Object> messages = Lists.newArrayListWithCapacity(records.size());
-        records.forEach(record -> messages.add(deserialize(record, type)));
-        return messages;
-    }
-
     /**
      * 发送消息
      *
@@ -301,8 +229,7 @@ public final class KafkaContextHolder {
         // 将事务消息放入延时队列
         String id = StringUtils.uuid();
         Session session = SessionContextHolder.getSession(false);
-        TransactionMessage transaction = TransactionMessage.builder().id(id).topic(topic).payload(payload)
-                .ordering(ordering).partition(partition).session(session).build();
+        TransactionMessage transaction = new TransactionMessage(id, topic, payload, ordering, partition, session);
         String message = JacksonUtils.serialize(transaction);
         DelayRedisQueue<String> queue = getTransactionQueue();
         queue.offer(message, Duration.ofSeconds(60));

+ 0 - 18
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/FallbackSendResult.java

@@ -1,18 +0,0 @@
-package com.chelvc.framework.kafka.fallback;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.kafka.support.SendResult;
-
-/**
- * Kafka消息降级发送结果
- *
- * @param <K> 顺序标识类型
- * @param <V> 消息载体类型
- * @author Woody
- * @date 2024/7/16
- */
-public class FallbackSendResult<K, V> extends SendResult<K, V> {
-    public FallbackSendResult(ProducerRecord<K, V> record) {
-        super(record, null);
-    }
-}

+ 5 - 5
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaFallbackSender.java

@@ -1,16 +1,18 @@
 package com.chelvc.framework.kafka.fallback;
 
+import com.chelvc.framework.kafka.producer.EmptySendResult;
 import com.chelvc.framework.kafka.producer.KafkaMessageSender;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.SettableListenableFuture;
 
 /**
  * Kafka消息发送降级处理器抽象实现
  *
+ * @param <K> 顺序标识类型
+ * @param <V> 消息载体类型
  * @author Woody
  * @date 2024/7/23
  */
@@ -34,12 +36,10 @@ public abstract class KafkaFallbackSender<K, V> implements KafkaMessageSender<K,
         try {
             return this.delegate.send(record);
         } catch (Exception e) {
-            log.error("Kafka message send failed: {}", record.topic(), e);
+            log.error("Kafka message send failed: {}", record, e);
         }
         this.fallback(record);
-        SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
-        future.set(new FallbackSendResult<K, V>(record));
-        return future;
+        return EmptySendResult.<K, V>fallback().future();
     }
 
     @Override

+ 7 - 15
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaMemorySender.java

@@ -3,14 +3,11 @@ package com.chelvc.framework.kafka.fallback;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
-import com.chelvc.framework.base.context.Session;
-import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ThreadUtils;
 import com.chelvc.framework.kafka.producer.KafkaMessageSender;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
 /**
@@ -25,7 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 public class KafkaMemorySender<K, V> extends KafkaFallbackSender<K, V> {
     private final int capacity;
     private volatile Thread consumer;
-    private volatile BlockingQueue<Pair<ProducerRecord<K, V>, Session>> queue;
+    private volatile BlockingQueue<ProducerRecord<K, V>> queue;
 
     public KafkaMemorySender(@NonNull KafkaMessageSender<K, V> delegate) {
         this(10000, delegate);
@@ -45,16 +42,12 @@ public class KafkaMemorySender<K, V> extends KafkaFallbackSender<K, V> {
             synchronized (this) {
                 if (this.queue == null) {
                     this.queue = new ArrayBlockingQueue<>(this.capacity);
-                    this.consumer = ThreadUtils.consume(this.queue, pair -> {
-                        ProducerRecord<K, V> message = pair.getLeft();
-                        SessionContextHolder.setSession(pair.getRight());
+                    this.consumer = ThreadUtils.consume(this.queue, record -> {
                         try {
-                            this.delegate.send(message);
+                            this.delegate.send(record);
                             return true;
                         } catch (Exception e) {
-                            log.error("Kafka message send failed: {}", message.topic(), e);
-                        } finally {
-                            SessionContextHolder.removeSessionContext();
+                            log.error("Kafka message send failed: {}", record, e);
                         }
                         ThreadUtils.sleep(5000);
                         return false;
@@ -67,13 +60,12 @@ public class KafkaMemorySender<K, V> extends KafkaFallbackSender<K, V> {
     @Override
     protected void fallback(ProducerRecord<K, V> record) {
         if (log.isDebugEnabled()) {
-            log.debug("Kafka fallback message: {}, {}", record.topic(), record.value());
+            log.debug("Kafka fallback message: {}", record);
         }
 
         this.initializeMessageQueue();
-        Session session = SessionContextHolder.getSession(false);
-        if (!this.queue.offer(Pair.of(record, session))) {
-            throw new RuntimeException("Kafka fallback queue is full: " + record.topic() + ", " + record.value());
+        if (!this.queue.offer(record)) {
+            throw new RuntimeException("Kafka fallback queue is full: " + record);
         }
     }
 

+ 5 - 11
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaPersistentSender.java

@@ -1,6 +1,5 @@
 package com.chelvc.framework.kafka.fallback;
 
-import com.chelvc.framework.base.context.Session;
 import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.kafka.context.KafkaContextHolder;
 import com.chelvc.framework.kafka.producer.KafkaMessageSender;
@@ -28,21 +27,16 @@ public class KafkaPersistentSender<K, V> extends KafkaFallbackSender<K, V> {
     @Override
     protected void fallback(ProducerRecord<K, V> record) {
         if (log.isDebugEnabled()) {
-            log.debug("Kafka fallback message: {}, {}", record.topic(), record.value());
+            log.debug("Kafka fallback message: {}", record);
         }
 
         KafkaFallbackMessage fallback =
                 new KafkaFallbackMessage(record.topic(), record.value(), record.key(), record.partition());
-        Session session = SessionContextHolder.getSession(false);
-        if (session == null && (session = KafkaContextHolder.getSession(record.headers())) != null) {
-            SessionContextHolder.setSession(session);
-            try {
-                this.client.save(fallback);
-            } finally {
-                SessionContextHolder.removeSessionContext();
-            }
-        } else {
+        SessionContextHolder.setSession(KafkaContextHolder.getSession(record.headers()));
+        try {
             this.client.save(fallback);
+        } finally {
+            SessionContextHolder.removeSessionContext();
         }
     }
 }

+ 21 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/interceptor/ConsumerRecordInterceptor.java

@@ -0,0 +1,21 @@
+package com.chelvc.framework.kafka.interceptor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Kafka消费记录拦截器接口
+ *
+ * @author Woody
+ * @date 2025/9/2
+ */
+public interface ConsumerRecordInterceptor {
+    /**
+     * 拦截消费记录
+     *
+     * @param record 消费记录
+     * @param <K>    顺序标识类型
+     * @param <V>    消息载体类型
+     * @return 消费记录
+     */
+    <K, V> ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);
+}

+ 0 - 32
framework-kafka/src/main/java/com/chelvc/framework/kafka/interceptor/KafkaRecordAdapter.java

@@ -1,32 +0,0 @@
-package com.chelvc.framework.kafka.interceptor;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-/**
- * Kafka消息记录适配器接口
- *
- * @author Woody
- * @date 2025/8/28
- */
-public interface KafkaRecordAdapter {
-    /**
-     * 生产记录适配
-     *
-     * @param record 生产记录
-     * @return 生产记录
-     */
-    default ProducerRecord<Object, Object> produce(ProducerRecord<Object, Object> record) {
-        return record;
-    }
-
-    /**
-     * 消费记录适配
-     *
-     * @param record 消费记录
-     * @return 消费记录
-     */
-    default ConsumerRecord<Object, Object> consume(ConsumerRecord<Object, Object> record) {
-        return record;
-    }
-}

+ 74 - 42
framework-kafka/src/main/java/com/chelvc/framework/kafka/interceptor/KafkaRecordInterceptor.java

@@ -1,13 +1,20 @@
 package com.chelvc.framework.kafka.interceptor;
 
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
+import com.chelvc.framework.base.context.Session;
 import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.common.util.ErrorUtils;
+import com.chelvc.framework.common.util.JacksonUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.kafka.context.KafkaContextHolder;
+import com.chelvc.framework.kafka.producer.EmptySendResult;
+import com.chelvc.framework.kafka.producer.KafkaMessageSender;
+import com.chelvc.framework.kafka.producer.KafkaSenderWrapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -15,7 +22,9 @@ import org.apache.kafka.clients.producer.ProducerInterceptor;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.springframework.kafka.listener.ConsumerAwareRecordInterceptor;
+import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
 
 /**
  * Kafka消息处理拦截器
@@ -25,38 +34,56 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class KafkaRecordInterceptor implements ProducerInterceptor<Object, Object>,
-        ConsumerAwareRecordInterceptor<Object, Object> {
+public class KafkaRecordInterceptor implements KafkaSenderWrapper,
+        ProducerInterceptor<Object, Object>, ConsumerAwareRecordInterceptor<Object, Object> {
     /**
-     * Kafka消息记录适配器列表单例对象
+     * Kafka消息记录拦截器实例对象
      */
-    private static class KafkaRecordAdapters {
+    private static class Interceptors {
         /**
-         * 适配器实例列表
+         * 生产记录拦截器实例列表
          */
-        private static final List<KafkaRecordAdapter> INSTANCES =
-                ApplicationContextHolder.getSortBeans(KafkaRecordAdapter.class);
+        private static final List<ProducerRecordInterceptor> PRODUCER_RECORD_INTERCEPTORS =
+                ApplicationContextHolder.getSortBeans(ProducerRecordInterceptor.class);
+
+        /**
+         * 消费记录拦截器实例列表
+         */
+        private static final List<ConsumerRecordInterceptor> CONSUMER_RECORD_INTERCEPTORS =
+                ApplicationContextHolder.getSortBeans(ConsumerRecordInterceptor.class);
     }
 
     @Override
-    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
-        // 初始化会话信息到消息头
-        KafkaContextHolder.initializeSession(record.headers());
-
-        // 隔离消息主题环境
-        String topic = KafkaContextHolder.isolate(record.topic());
-
-        // 构建并适配消息记录
-        record = new ProducerRecord<>(topic, record.partition(), record.timestamp(), record.key(), record.value(),
-                record.headers());
-        if (ObjectUtils.notEmpty(KafkaRecordAdapters.INSTANCES)) {
-            for (KafkaRecordAdapter adapter : KafkaRecordAdapters.INSTANCES) {
-                if ((record = adapter.produce(record)) == null) {
-                    break;
+    public <K, V> KafkaMessageSender<K, V> wrap(KafkaMessageSender<K, V> sender) {
+        return new KafkaMessageSender<K, V>() {
+            private final KafkaMessageSender<K, V> delegate = sender;
+
+            @Override
+            public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
+                // 若消息拦截处理结果为null则不发送消息
+                if (ObjectUtils.notEmpty(Interceptors.PRODUCER_RECORD_INTERCEPTORS)) {
+                    for (ProducerRecordInterceptor interceptor : Interceptors.PRODUCER_RECORD_INTERCEPTORS) {
+                        if ((record = interceptor.intercept(record)) == null) {
+                            return EmptySendResult.<K, V>none().future();
+                        }
+                    }
+                }
+
+                // 设置消息会话信息(在其他拦截处理逻辑之前)
+                Session session = SessionContextHolder.getSession(false);
+                if (session != null) {
+                    record.headers().add(Session.NAMING, session.serialize().getBytes(StandardCharsets.UTF_8));
                 }
+                return this.delegate.send(record);
             }
-        }
-        return record;
+        };
+    }
+
+    @Override
+    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
+        // 构建环境隔离的消息记录
+        return new ProducerRecord<>(ApplicationContextHolder.getProfile(record.topic()),
+                record.partition(), record.timestamp(), record.key(), record.value(), record.headers());
     }
 
     @Override
@@ -74,32 +101,31 @@ public class KafkaRecordInterceptor implements ProducerInterceptor<Object, Objec
     @Override
     public ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record,
                                                     Consumer<Object, Object> consumer) {
-        // 如果消费者组关闭则忽略该消息,返回null跳过该消息且不会执行afterRecord方法
-        if (!ApplicationContextHolder.getProperty(consumer.groupMetadata().groupId(), boolean.class, true)) {
-            if (log.isDebugEnabled()) {
-                log.debug("Kafka message skipping: {}, {}", consumer.groupMetadata().groupId(), record);
+        // 如果当前消费组存在消息忽略配置,则返回null跳过该消息且不会执行afterRecord方法
+        Set<String> ignores =
+                ApplicationContextHolder.getProperty("kafka.message.ignore.group", JacksonUtils.SET_STRING_TYPE);
+        if (ObjectUtils.notEmpty(ignores)) {
+            String group = consumer.groupMetadata().groupId();
+            if (ignores.contains(group)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Kafka message ignored: {}, {}", group, record);
+                }
+                return null;
             }
-            return null;
         }
 
-        // 设置当前会话信息
-        SessionContextHolder.setSession(KafkaContextHolder.getSession(record.headers()));
-
-        // 适配消息记录
-        if (ObjectUtils.notEmpty(KafkaRecordAdapters.INSTANCES)) {
-            for (KafkaRecordAdapter adapter : KafkaRecordAdapters.INSTANCES) {
-                if ((record = adapter.consume(record)) == null) {
-                    break;
+        // 若消息拦截处理结果为null则不发送消息
+        if (ObjectUtils.notEmpty(Interceptors.CONSUMER_RECORD_INTERCEPTORS)) {
+            for (ConsumerRecordInterceptor interceptor : Interceptors.CONSUMER_RECORD_INTERCEPTORS) {
+                if ((record = interceptor.intercept(record)) == null) {
+                    return null;
                 }
             }
         }
-        return record;
-    }
 
-    @Override
-    public void afterRecord(ConsumerRecord<Object, Object> record, Consumer<Object, Object> consumer) {
-        // 清空当前会话
-        SessionContextHolder.removeSessionContext();
+        // 设置当前会话信息
+        SessionContextHolder.setSession(KafkaContextHolder.getSession(record.headers()));
+        return record;
     }
 
     @Override
@@ -107,4 +133,10 @@ public class KafkaRecordInterceptor implements ProducerInterceptor<Object, Objec
         String group = consumer.groupMetadata().groupId();
         log.error("Kafka message consume failed: {}, {}", group, record, ErrorUtils.root(exception));
     }
+
+    @Override
+    public void afterRecord(ConsumerRecord<Object, Object> record, Consumer<Object, Object> consumer) {
+        // 清空当前会话
+        SessionContextHolder.removeSessionContext();
+    }
 }

+ 21 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/interceptor/ProducerRecordInterceptor.java

@@ -0,0 +1,21 @@
+package com.chelvc.framework.kafka.interceptor;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/**
+ * Kafka生产记录拦截器接口
+ *
+ * @author Woody
+ * @date 2025/8/28
+ */
+public interface ProducerRecordInterceptor {
+    /**
+     * 拦截生产记录
+     *
+     * @param record 生产记录
+     * @param <K>    顺序标识类型
+     * @param <V>    消息载体类型
+     * @return 生产记录
+     */
+    <K, V> ProducerRecord<K, V> intercept(ProducerRecord<K, V> record);
+}

+ 118 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/EmptySendResult.java

@@ -0,0 +1,118 @@
+package com.chelvc.framework.kafka.producer;
+
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.FailureCallback;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+import org.springframework.util.concurrent.SuccessCallback;
+
+/**
+ * Kafka消息发送结果空实现
+ *
+ * @param <K> 顺序标识类型
+ * @param <V> 消息载体类型
+ * @author Woody
+ * @date 2025/9/3
+ */
+public class EmptySendResult<K, V> extends SendResult<K, V> {
+    /**
+     * 无发送结果实例
+     */
+    @SuppressWarnings("rawtypes")
+    public static final EmptySendResult NONE = new EmptySendResult<>();
+
+    /**
+     * 降级发送结果实例
+     */
+    @SuppressWarnings("rawtypes")
+    public static final EmptySendResult FALLBACK = new EmptySendResult<>();
+
+    private final ListenableFuture<SendResult<K, V>> future;
+
+    private EmptySendResult() {
+        super(null, null);
+        this.future = new SendResultFuture<K, V>(this);
+    }
+
+    /**
+     * 获取无发送结果实例
+     *
+     * @param <K> 顺序标识类型
+     * @param <V> 消息载体类型
+     * @return 发送结果实例
+     */
+    @SuppressWarnings("unchecked")
+    public static <K, V> EmptySendResult<K, V> none() {
+        return (EmptySendResult<K, V>) NONE;
+    }
+
+    /**
+     * 获取降级发送结果实例
+     *
+     * @param <K> 顺序标识类型
+     * @param <V> 消息载体类型
+     * @return 发送结果实例
+     */
+    @SuppressWarnings("unchecked")
+    public static <K, V> EmptySendResult<K, V> fallback() {
+        return (EmptySendResult<K, V>) FALLBACK;
+    }
+
+    /**
+     * 获取发送结果监听器实例
+     *
+     * @return 发送结果监听器实例
+     */
+    public ListenableFuture<SendResult<K, V>> future() {
+        return this.future;
+    }
+
+    /**
+     * 空发送结果监听器实现
+     *
+     * @param <K> 顺序标识类型
+     * @param <V> 消息载体类型
+     */
+    private static class SendResultFuture<K, V> implements ListenableFuture<SendResult<K, V>> {
+        private final SendResult<K, V> value;
+
+        public SendResultFuture(SendResult<K, V> value) {
+            this.value = value;
+        }
+
+        @Override
+        public void addCallback(ListenableFutureCallback<? super SendResult<K, V>> callback) {
+        }
+
+        @Override
+        public void addCallback(SuccessCallback<? super SendResult<K, V>> success, FailureCallback failure) {
+        }
+
+        @Override
+        public boolean cancel(boolean interruptable) {
+            return true;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return true;
+        }
+
+        @Override
+        public boolean isDone() {
+            return true;
+        }
+
+        @Override
+        public SendResult<K, V> get() {
+            return this.value;
+        }
+
+        @Override
+        public SendResult<K, V> get(long timeout, TimeUnit unit) {
+            return this.value;
+        }
+    }
+}

+ 21 - 3
framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/StandardTransactionChecker.java

@@ -3,10 +3,12 @@ package com.chelvc.framework.kafka.producer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import com.chelvc.framework.common.util.JacksonUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.common.util.StringUtils;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
@@ -21,8 +23,24 @@ public class StandardTransactionChecker implements KafkaTransactionChecker {
     private final Map<String, TransactionCheckProcessor<?>> processors;
 
     public StandardTransactionChecker(@NonNull List<TransactionCheckProcessor<?>> processors) {
-        this.processors = ObjectUtils.isEmpty(processors) ? Collections.emptyMap() :
-                processors.stream().collect(Collectors.toMap(TransactionCheckProcessor::topic, processor -> processor));
+        if (ObjectUtils.isEmpty(processors)) {
+            this.processors = Collections.emptyMap();
+        } else {
+            this.processors = processors.stream().collect(Collectors.toMap(
+                    processor -> {
+                        String topic = processor.topic();
+                        if (StringUtils.isEmpty(topic)) {
+                            throw new IllegalStateException("Transaction check processor topic must not be empty: " +
+                                    processor.getClass().getName());
+                        } else if (Objects.isNull(processor.model())) {
+                            throw new IllegalStateException("Transaction check processor model must not be null: " +
+                                    processor.getClass().getName());
+                        }
+                        return topic;
+                    },
+                    processor -> processor
+            ));
+        }
     }
 
     @Override
@@ -30,7 +48,7 @@ public class StandardTransactionChecker implements KafkaTransactionChecker {
     public TransactionResolution check(TransactionMessage message) {
         TransactionCheckProcessor processor = this.processors.get(message.getTopic());
         if (processor == null) {
-            log.error("Transaction message check processor is missing: {}", message.getTopic());
+            log.error("Kafka transaction message check processor is missing: {}", message);
             return TransactionResolution.UNKNOWN;
         }
 

+ 1 - 1
framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionMessageProcessor.java

@@ -68,7 +68,7 @@ public class TransactionMessageProcessor implements ApplicationRunner {
             try {
                 this.processing(queue, message);
             } catch (Exception e) {
-                log.error("Transaction message processing failed: {}", message);
+                log.error("Kafka transaction message processing failed: {}", message);
             }
         }));
     }

+ 4 - 0
framework-nacos/pom.xml

@@ -31,6 +31,10 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
             <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-to-slf4j</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>com.fasterxml.jackson.module</groupId>
                     <artifactId>jackson-module-parameter-names</artifactId>

+ 1 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/annotation/RedisMQConsumer.java

@@ -7,7 +7,7 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 /**
- * Redis Stream 消费者注解
+ * RedisMQ消费者注解
  *
  * @author Woody
  * @date 2024/1/30

+ 1 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/config/RedisMQListenerRegistry.java

@@ -18,7 +18,7 @@ import org.springframework.context.ApplicationListener;
 import org.springframework.context.support.GenericApplicationContext;
 
 /**
- * Redis消息监听器注册器
+ * RedisMQ消息监听器注册器
  *
  * @author Woody
  * @date 2024/7/13

+ 6 - 17
framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisStreamHolder.java

@@ -103,7 +103,7 @@ public final class RedisStreamHolder {
      *
      * @return 消息ID
      */
-    public static String getMinid() {
+    public static String getMinimum() {
         RedisProperties properties = ApplicationContextHolder.getBean(RedisProperties.class);
         // 消息过期时间最短1分钟,最长24小时
         int expiration = Math.min(Math.max(properties.getStream().getExpiration(), 60), 86400);
@@ -136,17 +136,6 @@ public final class RedisStreamHolder {
         return (RedisTemplate<String, T>) STRING_TEMPLATE;
     }
 
-    /**
-     * 环境隔离
-     *
-     * @param original 原始标记
-     * @return 环境隔离标记
-     */
-    public static String isolate(@NonNull String original) {
-        String namespace = ApplicationContextHolder.getProfile();
-        return StringUtils.isEmpty(namespace) ? original : (namespace + "-" + original);
-    }
-
     /**
      * 序列化消息记录
      *
@@ -228,7 +217,7 @@ public final class RedisStreamHolder {
      */
     public static void send(@NonNull String topic, @NonNull Object payload, Long delaying) {
         List<String> args = Lists.newLinkedList();
-        args.add(getMinid());
+        args.add(getMinimum());
         args.add(PAYLOAD);
         args.add(JacksonUtils.serialize(payload));
         if (delaying != null) {
@@ -238,9 +227,9 @@ public final class RedisStreamHolder {
         Session session = SessionContextHolder.getSession(false);
         if (session != null) {
             args.add(Session.NAMING);
-            args.add(JacksonUtils.serialize(session));
+            args.add(session.serialize());
         }
-        List<String> keys = Collections.singletonList(isolate(topic));
+        List<String> keys = Collections.singletonList(ApplicationContextHolder.getProfile(topic));
         getStreamTemplate().execute(XADD_MINID_SCRIPT, keys, args.toArray());
     }
 
@@ -454,7 +443,7 @@ public final class RedisStreamHolder {
                                                @NonNull Supplier<Pair<Long, Long>> period, @NonNull Class<T> type,
                                                @NonNull Predicate<T> filter, boolean single) {
         long offset = 0, count = 1000;
-        String name = isolate(topic) + ":" + isolate(group);
+        String name = ApplicationContextHolder.getProfile(topic) + ":" + ApplicationContextHolder.getProfile(group);
         RedisTemplate<String, String> template = getStreamTemplate();
 
         batch:
@@ -508,7 +497,7 @@ public final class RedisStreamHolder {
      */
     public static void heartbeat(@NonNull String topic) {
         List<String> keys = Collections.singletonList(topic);
-        getStreamTemplate().execute(XADD_MINID_SCRIPT, keys, getMinid(), PAYLOAD, StringUtils.EMPTY);
+        getStreamTemplate().execute(XADD_MINID_SCRIPT, keys, getMinimum(), PAYLOAD, StringUtils.EMPTY);
     }
 
     /**

+ 1 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/queue/ConsumerPendingCleaner.java

@@ -16,7 +16,7 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
 import org.springframework.data.redis.stream.StreamMessageListenerContainer;
 
 /**
- * 消费者获取未确认消息清理处理器
+ * RedisMQ消费者获取未确认消息清理处理器
  *
  * @author Woody
  * @date 2024/1/30

+ 3 - 3
framework-redis/src/main/java/com/chelvc/framework/redis/queue/DefaultRedisMQListenerContainer.java

@@ -28,7 +28,7 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
 import org.springframework.data.redis.stream.StreamMessageListenerContainer;
 
 /**
- * 消息监听器容器默认实现
+ * RedisMQ消息监听器容器默认实现
  *
  * @param <T> 监听消息对象类型
  * @author Woody
@@ -96,8 +96,8 @@ public class DefaultRedisMQListenerContainer<T> implements RedisMQListenerContai
         int idle = ApplicationContextHolder.getBean(RedisProperties.class).getStream().getIdle();
         AssertUtils.arg((this.idle = idle) > 0, () -> "Consumer idle must be greater than 0");
         Environment environment = ApplicationContextHolder.getEnvironment();
-        this.topic = RedisStreamHolder.isolate(environment.resolvePlaceholders(topic));
-        this.group = RedisStreamHolder.isolate(environment.resolvePlaceholders(group));
+        this.topic = ApplicationContextHolder.getProfile(environment.resolvePlaceholders(topic));
+        this.group = ApplicationContextHolder.getProfile(environment.resolvePlaceholders(group));
         this.consumer = Consumer.from(this.group, this.name);
         this.listener = new MessageStreamListener<>(
                 type, this.topic, this.consumer, listener, this.executor = executor

+ 18 - 4
framework-redis/src/main/java/com/chelvc/framework/redis/queue/MessageStreamListener.java

@@ -3,9 +3,12 @@ package com.chelvc.framework.redis.queue;
 import java.lang.reflect.Type;
 import java.time.Duration;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Executor;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
+import com.chelvc.framework.common.util.JacksonUtils;
+import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.redis.context.RedisStreamHolder;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.connection.stream.Consumer;
@@ -14,7 +17,7 @@ import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.stream.StreamListener;
 
 /**
- * 消息流监听器实现
+ * RedisMQ消息流监听器实现
  *
  * @author Woody
  * @date 2024/4/3
@@ -47,6 +50,17 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
         RedisQueues.consume(this.delayQueue, 100, Duration.ofSeconds(60), this::processing);
     }
 
+    /**
+     * 判断消息是否忽略
+     *
+     * @return true/false
+     */
+    private boolean isIgnore() {
+        Set<String> ignores =
+                ApplicationContextHolder.getProperty("redismq.message.ignore.group", JacksonUtils.SET_STRING_TYPE);
+        return ObjectUtils.notEmpty(ignores) && ignores.contains(this.consumer.getGroup());
+    }
+
     /**
      * 消息消费处理
      *
@@ -76,10 +90,10 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
      * @return true/false
      */
     private boolean processing(MapRecord<String, String, String> record) {
-        // 如果消费者组关闭则忽略该消息
-        if (!ApplicationContextHolder.getProperty(this.consumer.getGroup(), boolean.class, true)) {
+        // 如果当前消费组存在消息忽略配置则直接返回成功状态
+        if (this.isIgnore()) {
             if (log.isDebugEnabled()) {
-                log.debug("RedisMQ message skipping: {}, {}", this.consumer.getGroup(), record);
+                log.debug("RedisMQ message ignored: {}, {}", this.consumer.getGroup(), record);
             }
             return true;
         }

+ 1 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisMQListener.java

@@ -1,7 +1,7 @@
 package com.chelvc.framework.redis.queue;
 
 /**
- * Redis Stream 消息消费监听接口
+ * RedisMQ消息消费监听接口
  *
  * @param <T> 消息类型
  * @author Woody

+ 1 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisMQListenerContainer.java

@@ -6,7 +6,7 @@ import java.util.concurrent.ExecutorService;
 import com.chelvc.framework.redis.annotation.RedisMQConsumer;
 
 /**
- * 消息监听器容器接口
+ * RedisMQ消息监听器容器接口
  *
  * @param <T> 监听消息对象类型
  * @author Woody

+ 1 - 1
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/config/RocketMQProperties.java

@@ -9,7 +9,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 
 /**
- * RokcetMQ配置属性
+ * RocketMQ配置属性
  *
  * @author Woody
  * @date 2024/1/30

+ 43 - 18
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/MultipleRocketMQListenerContainer.java

@@ -1,12 +1,14 @@
 package com.chelvc.framework.rocketmq.consumer;
 
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
-import com.chelvc.framework.base.context.Session;
+import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
@@ -15,7 +17,6 @@ import com.chelvc.framework.rocketmq.annotation.RocketMQConsumer;
 import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
@@ -28,7 +29,7 @@ import org.springframework.beans.factory.DisposableBean;
 import org.springframework.core.env.Environment;
 
 /**
- * 消息批量消费监听器容器
+ * RocketMQ消息批量消费监听器容器
  *
  * @param <T> 监听消息对象类型
  * @author Woody
@@ -43,6 +44,7 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
     private Duration duration;
     private SimpleConsumer consumer;
     private RocketMQListener<T> listener;
+    private boolean batchConsumeOverridden;
     private volatile boolean running = true;
 
     /**
@@ -71,20 +73,21 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
      */
     @SuppressWarnings("unchecked")
     private boolean consume(List<MessageView> messages) {
-        // 如果消费者组关闭则忽略该消息
-        if (!ApplicationContextHolder.getProperty(this.group, boolean.class, true)) {
-            if (log.isDebugEnabled()) {
-                log.debug("RocketMQ message skipping: {}, {}", this.group, messages);
+        try {
+            if (this.batchConsumeOverridden) {
+                this.listener.consume(messages);
+            } else {
+                messages.forEach(message -> {
+                    T payload = (T) RocketMQContextHolder.deserialize(message, this.type);
+                    SessionContextHolder.setSession(RocketMQContextHolder.getSession(message));
+                    try {
+                        this.listener.consume(payload);
+                    } finally {
+                        SessionContextHolder.removeSessionContext();
+                    }
+                });
             }
             return true;
-        }
-
-        // 消息消费处理
-        try {
-            List<Pair<T, Session>> payloads = RocketMQContextHolder.deserialize(
-                    messages, this.type, (payload, session) -> Pair.of((T) payload, session)
-            );
-            return this.listener.consume(payloads);
         } catch (Throwable t) {
             log.error("RocketMQ message consume failed: {}, {}", this.group, messages, t);
         }
@@ -126,6 +129,27 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
         });
     }
 
+    /**
+     * 判断消息监听器是否重写了批量消费方法
+     *
+     * @param listener 消息监听器实例
+     * @return true/false
+     */
+    private boolean isBatchConsumeOverridden(RocketMQListener<?> listener) {
+        Method method;
+        try {
+            method = listener.getClass().getMethod("consume", List.class);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException(e);
+        }
+        Type parameterType = ObjectUtils.first(method.getGenericParameterTypes());
+        if (!(parameterType instanceof ParameterizedType)) {
+            return false;
+        }
+        Type argumentType = ObjectUtils.first(((ParameterizedType) parameterType).getActualTypeArguments());
+        return argumentType == MessageView.class && method.getDeclaringClass() != RocketMQListener.class;
+    }
+
     @Override
     public void initialize(@NonNull Type type, @NonNull ClientServiceProvider provider,
                            @NonNull ClientConfiguration configuration, @NonNull RocketMQListener<T> listener,
@@ -141,11 +165,12 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
         // 初始化消费者实例
         this.type = type;
         this.batch = annotation.batch();
-        this.listener = listener;
         this.duration = Duration.ofSeconds(annotation.duration());
+        this.listener = listener;
+        this.batchConsumeOverridden = this.isBatchConsumeOverridden(listener);
         Environment environment = ApplicationContextHolder.getEnvironment();
-        this.group = RocketMQContextHolder.isolate(environment.resolvePlaceholders(annotation.group()));
-        String topic = RocketMQContextHolder.isolate(environment.resolvePlaceholders(annotation.topic()));
+        this.group = ApplicationContextHolder.getProfile(environment.resolvePlaceholders(annotation.group()));
+        String topic = ApplicationContextHolder.getProfile(environment.resolvePlaceholders(annotation.topic()));
         Duration interval = Duration.ofSeconds(annotation.interval());
         FilterExpression filter = new FilterExpression(annotation.tag(), FilterExpressionType.TAG);
         SimpleConsumerBuilder builder = provider.newSimpleConsumerBuilder().setClientConfiguration(configuration)

+ 3 - 18
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/RocketMQListener.java

@@ -2,10 +2,7 @@ package com.chelvc.framework.rocketmq.consumer;
 
 import java.util.List;
 
-import com.chelvc.framework.base.context.Session;
-import com.chelvc.framework.base.context.SessionContextHolder;
-import com.chelvc.framework.common.util.ObjectUtils;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.client.apis.message.MessageView;
 
 /**
  * RocketMQ消息消费监听接口
@@ -26,20 +23,8 @@ public interface RocketMQListener<T> {
     /**
      * 批量消费消息
      *
-     * @param messages 消息实例列表
-     * @return true/false
+     * @param messages 消息列表
      */
-    default boolean consume(List<Pair<T, Session>> messages) {
-        if (ObjectUtils.notEmpty(messages)) {
-            messages.forEach(message -> {
-                SessionContextHolder.setSession(message.getRight());
-                try {
-                    this.consume(message.getLeft());
-                } finally {
-                    SessionContextHolder.removeSessionContext();
-                }
-            });
-        }
-        return true;
+    default void consume(List<MessageView> messages) {
     }
 }

+ 1 - 1
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/RocketMQListenerContainer.java

@@ -7,7 +7,7 @@ import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
 
 /**
- * 消息监听器容器接口
+ * RocketMQ消息监听器容器接口
  *
  * @param <T> 监听消息对象类型
  * @author Woody

+ 51 - 10
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/SingleRocketMQListenerContainer.java

@@ -2,12 +2,18 @@ package com.chelvc.framework.rocketmq.consumer;
 
 import java.lang.reflect.Type;
 import java.util.Collections;
+import java.util.List;
+import java.util.Set;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
+import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.common.util.AssertUtils;
+import com.chelvc.framework.common.util.JacksonUtils;
+import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.rocketmq.annotation.RocketMQConsumer;
 import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
+import com.chelvc.framework.rocketmq.interceptor.ConsumerMessageInterceptor;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
@@ -23,7 +29,7 @@ import org.springframework.beans.factory.DisposableBean;
 import org.springframework.core.env.Environment;
 
 /**
- * 消息单个消费监听器容器
+ * RocketMQ消息单个消费监听器容器
  *
  * @param <T> 监听消息对象类型
  * @author Woody
@@ -31,11 +37,33 @@ import org.springframework.core.env.Environment;
  */
 @Slf4j
 public class SingleRocketMQListenerContainer<T> implements RocketMQListenerContainer<T>, DisposableBean {
+    /**
+     * RocketMQ消费消息拦截器对象
+     */
+    private static class Interceptors {
+        /**
+         * 拦截器实例列表
+         */
+        private static final List<ConsumerMessageInterceptor> INSTANCES =
+                ApplicationContextHolder.getSortBeans(ConsumerMessageInterceptor.class);
+    }
+
     private Type type;
     private String group;
     private PushConsumer consumer;
     private RocketMQListener<T> listener;
 
+    /**
+     * 判断消息是否忽略
+     *
+     * @return true/false
+     */
+    private boolean isIgnore() {
+        Set<String> ignores =
+                ApplicationContextHolder.getProperty("rocketmq.message.ignore.group", JacksonUtils.SET_STRING_TYPE);
+        return ObjectUtils.notEmpty(ignores) && ignores.contains(this.group);
+    }
+
     /**
      * 消费消息
      *
@@ -44,20 +72,33 @@ public class SingleRocketMQListenerContainer<T> implements RocketMQListenerConta
      */
     @SuppressWarnings("unchecked")
     private ConsumeResult consume(MessageView message) {
-        // 如果消费者组关闭则忽略该消息
-        if (!ApplicationContextHolder.getProperty(this.group, boolean.class, true)) {
+        // 如果当前消费组存在消息忽略配置则直接返回成功状态
+        if (this.isIgnore()) {
             if (log.isDebugEnabled()) {
-                log.debug("RocketMQ message skipping: {}, {}", this.group, message);
+                log.debug("RocketMQ message ignored: {}, {}", this.group, message);
             }
             return ConsumeResult.SUCCESS;
         }
 
+        // 若消息拦截处理结果为null则不发送消息
+        if (ObjectUtils.notEmpty(Interceptors.INSTANCES)) {
+            for (ConsumerMessageInterceptor interceptor : Interceptors.INSTANCES) {
+                if ((message = interceptor.intercept(this.group, message)) == null) {
+                    return ConsumeResult.SUCCESS;
+                }
+            }
+        }
+
         // 消息消费处理
         try {
-            return RocketMQContextHolder.deserialize(message, this.type, payload -> {
-                this.listener.consume((T) payload);
-                return ConsumeResult.SUCCESS;
-            });
+            T payload = (T) RocketMQContextHolder.deserialize(message, this.type);
+            SessionContextHolder.setSession(RocketMQContextHolder.getSession(message));
+            try {
+                this.listener.consume(payload);
+            } finally {
+                SessionContextHolder.removeSessionContext();
+            }
+            return ConsumeResult.SUCCESS;
         } catch (Throwable t) {
             log.error("RocketMQ message consume failed: {}, {}", this.group, message, t);
             return ConsumeResult.FAILURE;
@@ -78,8 +119,8 @@ public class SingleRocketMQListenerContainer<T> implements RocketMQListenerConta
         this.type = type;
         this.listener = listener;
         Environment environment = ApplicationContextHolder.getEnvironment();
-        this.group = RocketMQContextHolder.isolate(environment.resolvePlaceholders(annotation.group()));
-        String topic = RocketMQContextHolder.isolate(environment.resolvePlaceholders(annotation.topic()));
+        this.group = ApplicationContextHolder.getProfile(environment.resolvePlaceholders(annotation.group()));
+        String topic = ApplicationContextHolder.getProfile(environment.resolvePlaceholders(annotation.topic()));
         FilterExpression filter = new FilterExpression(annotation.tag(), FilterExpressionType.TAG);
         PushConsumerBuilder builder = provider.newPushConsumerBuilder().setClientConfiguration(configuration)
                 .setConsumerGroup(this.group).setConsumptionThreadCount(annotation.concurrency())

+ 45 - 207
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/RocketMQContextHolder.java

@@ -2,30 +2,23 @@ package com.chelvc.framework.rocketmq.context;
 
 import java.lang.reflect.Type;
 import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
 import com.chelvc.framework.base.context.Session;
-import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.common.util.JacksonUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
+import com.chelvc.framework.rocketmq.producer.EmptySendReceipt;
+import com.chelvc.framework.rocketmq.producer.ProducerMessage;
 import lombok.NonNull;
 import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.apis.message.Message;
-import org.apache.rocketmq.client.apis.message.MessageBuilder;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.client.apis.producer.Transaction;
 
 /**
@@ -35,21 +28,11 @@ import org.apache.rocketmq.client.apis.producer.Transaction;
  * @date 2024/1/30
  */
 public final class RocketMQContextHolder {
-    /**
-     * topic、tag分隔符
-     */
-    public static final char TOPIC_TAG_DELIMITER = ':';
-
     /**
      * 消息生产者
      */
     private static volatile Producer PRODUCER;
 
-    /**
-     * 客户端服务提供者
-     */
-    private static volatile ClientServiceProvider SERVICE_PROVIDER;
-
     private RocketMQContextHolder() {
     }
 
@@ -69,96 +52,16 @@ public final class RocketMQContextHolder {
         return PRODUCER;
     }
 
-    /**
-     * 获取服务提供者
-     *
-     * @return 服务提供者实例
-     */
-    public static ClientServiceProvider getServiceProvider() {
-        if (SERVICE_PROVIDER == null) {
-            synchronized (ClientServiceProvider.class) {
-                if (SERVICE_PROVIDER == null) {
-                    SERVICE_PROVIDER = ApplicationContextHolder.getBean(ClientServiceProvider.class);
-                }
-            }
-        }
-        return SERVICE_PROVIDER;
-    }
-
-    /**
-     * 获取消息主题
-     *
-     * @param message 消息实例
-     * @return 消息主题
-     */
-    public static String topic(@NonNull Message message) {
-        String namespace = ApplicationContextHolder.getProfile();
-        String topic = message.getTopic(), tag = message.getTag().orElse(null);
-        if (StringUtils.notEmpty(namespace) && StringUtils.notEmpty(topic) && topic.startsWith(namespace)
-                && topic.length() > namespace.length() && topic.charAt(namespace.length()) == '-') {
-            topic = topic.substring(namespace.length() + 1);
-        }
-        return StringUtils.isEmpty(tag) ? topic : (topic + TOPIC_TAG_DELIMITER + tag);
-    }
-
-    /**
-     * 获取消息主题
-     *
-     * @param message 消息实例
-     * @return 消息主题
-     */
-    public static String topic(@NonNull MessageView message) {
-        String namespace = ApplicationContextHolder.getProfile();
-        String topic = message.getTopic(), tag = message.getTag().orElse(null);
-        if (StringUtils.notEmpty(namespace) && StringUtils.notEmpty(topic) && topic.startsWith(namespace)
-                && topic.length() > namespace.length() && topic.charAt(namespace.length()) == '-') {
-            topic = topic.substring(namespace.length() + 1);
-        }
-        return StringUtils.isEmpty(tag) ? topic : (topic + TOPIC_TAG_DELIMITER + tag);
-    }
-
     /**
      * 获取消息体内容
      *
      * @param message 消息实例
      * @return 消息体内容
      */
-    public static String body(@NonNull Message message) {
+    public static String getBody(@NonNull MessageView message) {
         return StandardCharsets.UTF_8.decode(message.getBody()).toString();
     }
 
-    /**
-     * 获取消息体内容
-     *
-     * @param message 消息实例
-     * @return 消息体内容
-     */
-    public static String body(@NonNull MessageView message) {
-        return StandardCharsets.UTF_8.decode(message.getBody()).toString();
-    }
-
-    /**
-     * 环境隔离
-     *
-     * @param original 原始标记
-     * @return 环境隔离标记
-     */
-    public static String isolate(@NonNull String original) {
-        String namespace = ApplicationContextHolder.getProfile();
-        return StringUtils.isEmpty(namespace) ? original : (namespace + "-" + original);
-    }
-
-    /**
-     * 获取会话信息
-     *
-     * @param message 消息实例
-     * @return 会话实例
-     */
-    public static Session getSession(@NonNull Message message) {
-        String json = message.getProperties().get(Session.NAMING);
-        return StringUtils.isEmpty(json) ? null : JacksonUtils.deserialize(json, Session.class);
-    }
-
     /**
      * 获取会话信息
      *
@@ -170,34 +73,6 @@ public final class RocketMQContextHolder {
         return StringUtils.isEmpty(json) ? null : JacksonUtils.deserialize(json, Session.class);
     }
 
-    /**
-     * 消息序列化
-     *
-     * @param topic    消息主题
-     * @param payload  消息内容
-     * @param ordering 顺序消息标识
-     * @return 消息对象实例
-     */
-    public static Message serialize(@NonNull String topic, @NonNull Object payload, String ordering) {
-        byte[] body = JacksonUtils.serialize(payload).getBytes(StandardCharsets.UTF_8);
-        String session = JacksonUtils.serialize(SessionContextHolder.getSession(false));
-        MessageBuilder builder = getServiceProvider().newMessageBuilder().setBody(body);
-        int delimiter = topic.indexOf(TOPIC_TAG_DELIMITER);
-        if (delimiter <= 0) {
-            builder.setTopic(isolate(topic));
-        } else {
-            builder.setTopic(isolate(topic.substring(0, delimiter)));
-            builder.setTag(topic.substring(delimiter + 1));
-        }
-        if (Objects.nonNull(session)) {
-            builder.addProperty(Session.NAMING, session);
-        }
-        if (StringUtils.notEmpty(ordering)) {
-            builder.setMessageGroup(ordering);
-        }
-        return builder.build();
-    }
-
     /**
      * 消息反序列化
      *
@@ -206,78 +81,31 @@ public final class RocketMQContextHolder {
      * @return 消息体实例
      */
     public static Object deserialize(@NonNull MessageView message, @NonNull Type type) {
-        return JacksonUtils.deserialize(body(message), type);
-    }
-
-    /**
-     * 消息反序列化
-     *
-     * @param message  消息视图
-     * @param function 消息处理函数
-     * @param <T>      消息类型
-     * @return 消息处理结果
-     */
-    public static <T> T deserialize(@NonNull MessageView message, @NonNull Function<MessageView, T> function) {
-        SessionContextHolder.setSession(getSession(message));
-        try {
-            return function.apply(message);
-        } finally {
-            SessionContextHolder.removeSessionContext();
-        }
-    }
-
-    /**
-     * 消息反序列化
-     *
-     * @param message  消息视图
-     * @param type     消息类型
-     * @param function 消息处理函数
-     * @param <T>      消息类型
-     * @return 消息处理结果
-     */
-    public static <T> T deserialize(@NonNull MessageView message, @NonNull Type type,
-                                    @NonNull Function<Object, T> function) {
-        return deserialize(message, view -> function.apply(deserialize(message, type)));
-    }
-
-    /**
-     * 消息反序列化
-     *
-     * @param messages 消息视图集合
-     * @param type     消息类型
-     * @param function 消息处理函数
-     * @param <T>      消息类型
-     * @return 消息列表
-     */
-    public static <T> List<T> deserialize(@NonNull Collection<MessageView> messages, @NonNull Type type,
-                                          @NonNull BiFunction<Object, Session, T> function) {
-        if (ObjectUtils.isEmpty(messages)) {
-            return Collections.emptyList();
-        }
-        return messages.stream().map(message -> function.apply(deserialize(message, type), getSession(message)))
-                .filter(Objects::nonNull).collect(Collectors.toList());
+        return JacksonUtils.deserialize(getBody(message), type);
     }
 
     /**
      * 发送消息
      *
      * @param topic   消息主题
+     * @param tag     消息标签
      * @param payload 消息内容
      */
-    public static void send(@NonNull String topic, @NonNull Object payload) {
-        send(topic, payload, (String) null);
+    public static void send(@NonNull String topic, String tag, @NonNull Object payload) {
+        send(topic, tag, payload, (String) null);
     }
 
     /**
      * 发送消息
      *
      * @param topic    消息主题
+     * @param tag      消息标签
      * @param payload  消息内容
      * @param ordering 顺序消息标识
      */
-    public static void send(@NonNull String topic, @NonNull Object payload, String ordering) {
+    public static void send(@NonNull String topic, String tag, @NonNull Object payload, String ordering) {
         Producer producer = getProducer();
-        Message message = serialize(topic, payload, ordering);
+        Message message = new ProducerMessage(topic, tag, payload, ordering, null);
         try {
             producer.send(message);
         } catch (ClientException e) {
@@ -289,38 +117,43 @@ public final class RocketMQContextHolder {
      * 发送事务消息
      *
      * @param topic    消息主题
+     * @param tag      消息标签
      * @param payload  消息内容
      * @param callback 本地事务回调函数
      * @param <T>      消息类型
      */
-    public static <T> void send(@NonNull String topic, @NonNull T payload, @NonNull Consumer<MessageId> callback) {
-        send(topic, payload, null, callback);
+    public static <T> void send(@NonNull String topic, String tag, @NonNull T payload,
+                                @NonNull Consumer<String> callback) {
+        send(topic, tag, payload, null, callback);
     }
 
     /**
      * 发送事务消息
      *
      * @param topic    消息主题
+     * @param tag      消息标签
      * @param payload  消息内容
      * @param callback 本地事务回调函数
      * @param <T>      消息类型
      */
-    public static <T> void send(@NonNull String topic, @NonNull T payload, @NonNull Predicate<MessageId> callback) {
-        send(topic, payload, null, callback);
+    public static <T> void send(@NonNull String topic, String tag, @NonNull T payload,
+                                @NonNull Predicate<String> callback) {
+        send(topic, tag, payload, null, callback);
     }
 
     /**
      * 发送事务消息
      *
      * @param topic    消息主题
+     * @param tag      消息标签
      * @param payload  消息内容
      * @param ordering 顺序消息标识
      * @param callback 本地事务回调函数
      * @param <T>      消息类型
      */
-    public static <T> void send(@NonNull String topic, @NonNull T payload, String ordering,
-                                @NonNull Consumer<MessageId> callback) {
-        send(topic, payload, ordering, id -> {
+    public static <T> void send(@NonNull String topic, String tag, @NonNull T payload, String ordering,
+                                @NonNull Consumer<String> callback) {
+        send(topic, tag, payload, ordering, id -> {
             callback.accept(id);
             return true;
         });
@@ -330,34 +163,38 @@ public final class RocketMQContextHolder {
      * 发送事务消息
      *
      * @param topic    消息主题
+     * @param tag      消息标签
      * @param payload  消息内容
      * @param ordering 顺序消息标识
      * @param callback 本地事务回调函数
      * @param <T>      消息类型
      */
-    public static <T> void send(@NonNull String topic, @NonNull T payload, String ordering,
-                                @NonNull Predicate<MessageId> callback) {
+    public static <T> void send(@NonNull String topic, String tag, @NonNull T payload, String ordering,
+                                @NonNull Predicate<String> callback) {
         // 发送事务半消息
-        MessageId id;
+        SendReceipt receipt;
         Transaction transaction;
         Producer producer = getProducer();
-        Message message = serialize(topic, payload, ordering);
+        Message message = new ProducerMessage(topic, tag, payload, ordering, null);
         try {
             transaction = producer.beginTransaction();
-            id = producer.send(message, transaction).getMessageId();
+            receipt = producer.send(message, transaction);
         } catch (ClientException e) {
             throw new RuntimeException(e);
         }
 
         // 处理本地事务并执行事务消息提交或回滚
-        try {
-            if (callback.test(id)) {
-                transaction.commit();
-            } else {
-                transaction.rollback();
+        if (receipt != EmptySendReceipt.NONE) {
+            String id = ObjectUtils.ifNull(receipt.getMessageId(), MessageId::toString);
+            try {
+                if (callback.test(id)) {
+                    transaction.commit();
+                } else {
+                    transaction.rollback();
+                }
+            } catch (ClientException e) {
+                throw new RuntimeException(e);
             }
-        } catch (ClientException e) {
-            throw new RuntimeException(e);
         }
     }
 
@@ -365,21 +202,22 @@ public final class RocketMQContextHolder {
      * 发送异步消息
      *
      * @param topic   消息主题
+     * @param tag     消息标签
      * @param payload 消息内容
      */
-    public static void sendAsync(@NonNull String topic, @NonNull Object payload) {
-        sendAsync(topic, payload, null);
+    public static void sendAsync(@NonNull String topic, String tag, @NonNull Object payload) {
+        sendAsync(topic, tag, payload, null);
     }
 
     /**
      * 发送异步消息
      *
      * @param topic    消息主题
+     * @param tag      消息标签
      * @param payload  消息内容
      * @param ordering 顺序消息标识
      */
-    public static void sendAsync(@NonNull String topic, @NonNull Object payload, String ordering) {
-        Message message = serialize(topic, payload, ordering);
-        getProducer().sendAsync(message);
+    public static void sendAsync(@NonNull String topic, String tag, @NonNull Object payload, String ordering) {
+        getProducer().sendAsync(new ProducerMessage(topic, tag, payload, ordering, null));
     }
 }

+ 10 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQFallbackMessage.java

@@ -18,6 +18,11 @@ import lombok.experimental.SuperBuilder;
 @NoArgsConstructor
 @AllArgsConstructor
 public class RocketMQFallbackMessage implements Serializable {
+    /**
+     * 消息标签
+     */
+    private String tag;
+
     /**
      * 消息主题
      */
@@ -33,6 +38,11 @@ public class RocketMQFallbackMessage implements Serializable {
      */
     private String ordering;
 
+    /**
+     * 延迟时间戳
+     */
+    private Long delaying;
+
     /**
      * 是否是事务消息
      */

+ 10 - 10
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQFallbackProducer.java

@@ -3,7 +3,7 @@ package com.chelvc.framework.rocketmq.fallback;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
-import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
+import com.chelvc.framework.rocketmq.producer.EmptySendReceipt;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.apis.ClientException;
@@ -39,38 +39,38 @@ public abstract class RocketMQFallbackProducer implements Producer {
         try {
             return this.delegate.send(message);
         } catch (Exception e) {
-            log.error("RocketMQ message send failed: {}", RocketMQContextHolder.topic(message), e);
+            log.error("RocketMQ message send failed: {}", message, e);
         }
         this.fallback(message, null);
-        return FallbackSendReceipt.INSTANCE;
+        return EmptySendReceipt.FALLBACK;
     }
 
     @Override
     public SendReceipt send(Message message, Transaction transaction) throws ClientException {
         // 执行原生逻辑
-        if (!(transaction instanceof FallbackTransaction)) {
+        if (!(transaction instanceof RocketMQFallbackTransaction)) {
             return this.delegate.send(message, transaction);
         }
 
         // 降级处理
-        FallbackTransaction proxy = (FallbackTransaction) transaction;
+        RocketMQFallbackTransaction proxy = (RocketMQFallbackTransaction) transaction;
         if (!proxy.isDegraded()) {
             try {
                 return this.delegate.send(message, proxy.getTransaction());
             } catch (Exception e) {
-                log.error("RocketMQ transactional message send failed: {}", RocketMQContextHolder.topic(message), e);
+                log.error("RocketMQ transactional message send failed: {}", message, e);
             }
         }
         proxy.delegate(message);
-        return FallbackSendReceipt.INSTANCE;
+        return EmptySendReceipt.FALLBACK;
     }
 
     @Override
     public CompletableFuture<SendReceipt> sendAsync(Message message) {
         return this.delegate.sendAsync(message).exceptionally(e -> {
-            log.error("RocketMQ message async send failed: {}", RocketMQContextHolder.topic(message), e);
+            log.error("RocketMQ message async send failed: {}", message, e);
             this.fallback(message, null);
-            return FallbackSendReceipt.INSTANCE;
+            return EmptySendReceipt.FALLBACK;
         });
     }
 
@@ -82,7 +82,7 @@ public abstract class RocketMQFallbackProducer implements Producer {
         } catch (Exception e) {
             log.error("RocketMQ transaction open failed", e);
         }
-        return new FallbackTransaction(transaction, this::fallback);
+        return new RocketMQFallbackTransaction(transaction, this::fallback);
     }
 
     @Override

+ 2 - 2
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/FallbackTransaction.java → framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQFallbackTransaction.java

@@ -15,13 +15,13 @@ import org.apache.rocketmq.client.apis.producer.Transaction;
  * @date 2024/7/14
  */
 @Slf4j
-public class FallbackTransaction implements Transaction {
+public class RocketMQFallbackTransaction implements Transaction {
     private volatile Message message;
     private volatile boolean degraded;
     private final Transaction delegate;
     private final BiConsumer<Message, Transaction> callback;
 
-    public FallbackTransaction(Transaction delegate, @NonNull BiConsumer<Message, Transaction> callback) {
+    public RocketMQFallbackTransaction(Transaction delegate, @NonNull BiConsumer<Message, Transaction> callback) {
         this.delegate = delegate;
         this.callback = callback;
         this.degraded = Objects.isNull(delegate);

+ 7 - 16
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQMemoryProducer.java

@@ -4,11 +4,8 @@ import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
-import com.chelvc.framework.base.context.Session;
-import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ThreadUtils;
-import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
@@ -26,7 +23,7 @@ import org.apache.rocketmq.client.apis.producer.Transaction;
 public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
     private final int capacity;
     private volatile Thread consumer;
-    private volatile BlockingQueue<Pair<Pair<Message, Transaction>, Session>> queue;
+    private volatile BlockingQueue<Pair<Message, Transaction>> queue;
 
     public RocketMQMemoryProducer(@NonNull Producer delegate) {
         this(10000, delegate);
@@ -47,9 +44,8 @@ public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
                 if (this.queue == null) {
                     this.queue = new ArrayBlockingQueue<>(this.capacity);
                     this.consumer = ThreadUtils.consume(this.queue, pair -> {
-                        Message message = pair.getLeft().getLeft();
-                        Transaction transaction = pair.getLeft().getRight();
-                        SessionContextHolder.setSession(pair.getRight());
+                        Message message = pair.getLeft();
+                        Transaction transaction = pair.getRight();
                         try {
                             if (transaction == null) {
                                 this.delegate.send(message);
@@ -59,9 +55,7 @@ public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
                             }
                             return true;
                         } catch (Exception e) {
-                            log.error("RocketMQ message send failed: {}", RocketMQContextHolder.topic(message), e);
-                        } finally {
-                            SessionContextHolder.removeSessionContext();
+                            log.error("RocketMQ message send failed: {}", message, e);
                         }
                         ThreadUtils.sleep(5000);
                         return false;
@@ -74,16 +68,13 @@ public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
     @Override
     protected void fallback(Message message, Transaction transaction) {
         if (log.isDebugEnabled()) {
-            String topic = RocketMQContextHolder.topic(message), body = RocketMQContextHolder.body(message);
-            log.debug("RocketMQ fallback message: {}, {}", topic, body);
+            log.debug("RocketMQ fallback message: {}", message);
         }
 
         // 将消息放入内存消息队列
         this.initializeMessageQueue();
-        Session session = SessionContextHolder.getSession(false);
-        if (!this.queue.offer(Pair.of(Pair.of(message, transaction), session))) {
-            String topic = RocketMQContextHolder.topic(message), body = RocketMQContextHolder.body(message);
-            throw new RuntimeException("RocketMQ fallback queue is full: " + topic + ", " + body);
+        if (!this.queue.offer(Pair.of(message, transaction))) {
+            throw new RuntimeException("RocketMQ fallback queue is full: " + message);
         }
     }
 

+ 15 - 35
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQPersistentProducer.java

@@ -2,10 +2,8 @@ package com.chelvc.framework.rocketmq.fallback;
 
 import java.util.Objects;
 
-import com.chelvc.framework.base.context.Session;
 import com.chelvc.framework.base.context.SessionContextHolder;
-import com.chelvc.framework.common.util.JacksonUtils;
-import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
+import com.chelvc.framework.rocketmq.producer.ProducerMessage;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.apis.ClientException;
@@ -28,45 +26,27 @@ public class RocketMQPersistentProducer extends RocketMQFallbackProducer {
         this.client = client;
     }
 
-    /**
-     * 保存消息
-     *
-     * @param message     消息实例
-     * @param transaction 消息处理事务
-     */
-    private void save(Message message, Transaction transaction) {
-        String topic = RocketMQContextHolder.topic(message);
-        Object payload = JacksonUtils.deserialize(RocketMQContextHolder.body(message));
-        String ordering = message.getMessageGroup().orElse(null);
-        RocketMQFallbackMessage fallback =
-                new RocketMQFallbackMessage(topic, payload, ordering, Objects.nonNull(transaction));
-        this.client.save(fallback);
-        if (transaction != null) {
-            try {
-                transaction.commit();
-            } catch (ClientException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
     @Override
     protected void fallback(Message message, Transaction transaction) {
         if (log.isDebugEnabled()) {
-            String topic = RocketMQContextHolder.topic(message), body = RocketMQContextHolder.body(message);
-            log.debug("RocketMQ fallback message: {}, {}", topic, body);
+            log.debug("RocketMQ fallback message: {}", message);
         }
 
-        Session session = SessionContextHolder.getSession(false);
-        if (session == null && (session = RocketMQContextHolder.getSession(message)) != null) {
-            SessionContextHolder.setSession(session);
+        ProducerMessage original = (ProducerMessage) message;
+        RocketMQFallbackMessage fallback = new RocketMQFallbackMessage(original.getLabel(), original.getTopic(),
+                original.getPayload(), original.getOrdering(), original.getDelaying(), Objects.nonNull(transaction));
+        SessionContextHolder.setSession(original.getSession());
+        try {
+            this.client.save(fallback);
+        } finally {
+            SessionContextHolder.removeSessionContext();
+        }
+        if (transaction != null) {
             try {
-                this.save(message, transaction);
-            } finally {
-                SessionContextHolder.removeSessionContext();
+                transaction.commit();
+            } catch (ClientException e) {
+                throw new RuntimeException(e);
             }
-        } else {
-            this.save(message, transaction);
         }
     }
 }

+ 20 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/interceptor/ConsumerMessageInterceptor.java

@@ -0,0 +1,20 @@
+package com.chelvc.framework.rocketmq.interceptor;
+
+import org.apache.rocketmq.client.apis.message.MessageView;
+
+/**
+ * RocketMQ消费消息拦截器接口
+ *
+ * @author Woody
+ * @date 2025/9/2
+ */
+public interface ConsumerMessageInterceptor {
+    /**
+     * 拦截消费消息
+     *
+     * @param group   消费组
+     * @param message 消息实例
+     * @return 消息实例
+     */
+    MessageView intercept(String group, MessageView message);
+}

+ 19 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/interceptor/ProducerMessageInterceptor.java

@@ -0,0 +1,19 @@
+package com.chelvc.framework.rocketmq.interceptor;
+
+import org.apache.rocketmq.client.apis.message.Message;
+
+/**
+ * RocketMQ生产消息拦截器接口
+ *
+ * @author Woody
+ * @date 2025/9/2
+ */
+public interface ProducerMessageInterceptor {
+    /**
+     * 拦截生产消息
+     *
+     * @param message 消息实例
+     * @return 消息实例
+     */
+    Message intercept(Message message);
+}

+ 100 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/interceptor/RocketMQProducerInterceptor.java

@@ -0,0 +1,100 @@
+package com.chelvc.framework.rocketmq.interceptor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import com.chelvc.framework.base.context.ApplicationContextHolder;
+import com.chelvc.framework.base.context.Session;
+import com.chelvc.framework.base.context.SessionContextHolder;
+import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.rocketmq.producer.EmptySendReceipt;
+import com.chelvc.framework.rocketmq.producer.ProducerMessage;
+import com.chelvc.framework.rocketmq.producer.RocketMQProducerWrapper;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.apis.producer.Transaction;
+import org.springframework.stereotype.Component;
+
+/**
+ * RocketMQ生产消息处理拦截器
+ *
+ * @author Woody
+ * @date 2025/9/2
+ */
+@Component
+public class RocketMQProducerInterceptor implements RocketMQProducerWrapper {
+    /**
+     * RocketMQ生产消息拦截器对象
+     */
+    private static class Interceptors {
+        /**
+         * 拦截器实例列表
+         */
+        private static final List<ProducerMessageInterceptor> INSTANCES =
+                ApplicationContextHolder.getSortBeans(ProducerMessageInterceptor.class);
+    }
+
+    @Override
+    public Producer wrap(Producer producer) {
+        return new Producer() {
+            private final Producer delegate = producer;
+
+            /**
+             * 消息拦截处理
+             *
+             * @param message 消息实例
+             * @return 消息实例
+             */
+            private Message intercept(Message message) {
+                // 若消息拦截处理结果为null则不发送消息
+                if (ObjectUtils.notEmpty(Interceptors.INSTANCES)) {
+                    for (ProducerMessageInterceptor interceptor : Interceptors.INSTANCES) {
+                        if ((message = interceptor.intercept(message)) == null) {
+                            return null;
+                        }
+                    }
+                }
+
+                Session session = SessionContextHolder.getSession(false);
+                if (session != null) {
+                    ProducerMessage original = (ProducerMessage) message;
+                    message = new ProducerMessage(original.getTopic(), original.getLabel(), original.getPayload(),
+                            original.getOrdering(), original.getDelaying(), session, original.getKeys(),
+                            original.getProperties(), original.getBody());
+                }
+                return message;
+            }
+
+            @Override
+            public SendReceipt send(Message message) throws ClientException {
+                message = this.intercept(message);
+                return message == null ? EmptySendReceipt.NONE : this.delegate.send(message);
+            }
+
+            @Override
+            public SendReceipt send(Message message, Transaction transaction) throws ClientException {
+                message = this.intercept(message);
+                return message == null ? EmptySendReceipt.NONE : this.delegate.send(message, transaction);
+            }
+
+            @Override
+            public CompletableFuture<SendReceipt> sendAsync(Message message) {
+                message = this.intercept(message);
+                return message == null ? EmptySendReceipt.NONE.future() : this.delegate.sendAsync(message);
+            }
+
+            @Override
+            public Transaction beginTransaction() throws ClientException {
+                return this.delegate.beginTransaction();
+            }
+
+            @Override
+            public void close() throws IOException {
+                this.delegate.close();
+            }
+        };
+    }
+}

+ 44 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/EmptySendReceipt.java

@@ -0,0 +1,44 @@
+package com.chelvc.framework.rocketmq.producer;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+
+/**
+ * RocketMQ消息发送凭据空实现
+ *
+ * @author Woody
+ * @date 2024/7/14
+ */
+public class EmptySendReceipt implements SendReceipt {
+    /**
+     * 无发送凭据实例
+     */
+    public static final EmptySendReceipt NONE = new EmptySendReceipt();
+
+    /**
+     * 降级发送凭据实例
+     */
+    public static final EmptySendReceipt FALLBACK = new EmptySendReceipt();
+
+    private final CompletableFuture<SendReceipt> future;
+
+    private EmptySendReceipt() {
+        this.future = CompletableFuture.completedFuture(this);
+    }
+
+    /**
+     * 获取发送凭据监听器实例
+     *
+     * @return 发送凭据监听器实例
+     */
+    public CompletableFuture<SendReceipt> future() {
+        return this.future;
+    }
+
+    @Override
+    public MessageId getMessageId() {
+        return null;
+    }
+}

+ 145 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/ProducerMessage.java

@@ -0,0 +1,145 @@
+package com.chelvc.framework.rocketmq.producer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import com.chelvc.framework.base.context.Session;
+import com.chelvc.framework.common.util.JacksonUtils;
+import lombok.NonNull;
+import org.apache.rocketmq.client.apis.message.Message;
+
+/**
+ * RocketMQ生产消息
+ *
+ * @author Woody
+ * @date 2025/9/2
+ */
+public class ProducerMessage implements Message {
+    private final String topic;
+    private final String label;
+    private final Long delaying;
+    private final Object payload;
+    private final String ordering;
+    private final Session session;
+    private final ByteBuffer body;
+    private final Collection<String> keys;
+    private final Map<String, String> properties;
+
+    public ProducerMessage(@NonNull String topic, String label, @NonNull Object payload, String ordering,
+                           Long delaying) {
+        this(topic, label, payload, ordering, delaying, null, Collections.emptySet(), Collections.emptyMap());
+    }
+
+    public ProducerMessage(@NonNull String topic, String label, @NonNull Object payload, String ordering, Long delaying,
+                           Session session, @NonNull Collection<String> keys, @NonNull Map<String, String> properties) {
+        this(topic, label, payload, ordering, delaying, session, keys, properties, body(payload));
+    }
+
+    public ProducerMessage(@NonNull String topic, String label, @NonNull Object payload, String ordering, Long delaying,
+                           Session session, @NonNull Collection<String> keys, @NonNull Map<String, String> properties,
+                           @NonNull ByteBuffer body) {
+        this.topic = topic;
+        this.label = label;
+        this.payload = payload;
+        this.ordering = ordering;
+        this.delaying = delaying;
+        this.session = session;
+        this.body = body;
+        this.keys = Collections.unmodifiableCollection(keys);
+        this.properties = Collections.unmodifiableMap(properties);
+    }
+
+    /**
+     * 获取消息载体字节流
+     *
+     * @param payload 消息载体
+     * @return 字节流
+     */
+    public static ByteBuffer body(@NonNull Object payload) {
+        return ByteBuffer.wrap(JacksonUtils.serialize(payload).getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
+    }
+
+    /**
+     * 获取消息标签
+     *
+     * @return 消息标签
+     */
+    public String getLabel() {
+        return this.label;
+    }
+
+    /**
+     * 获取延迟时间戳
+     *
+     * @return 时间戳
+     */
+    public Long getDelaying() {
+        return this.delaying;
+    }
+
+    /**
+     * 获取消息载体
+     *
+     * @return 消息载体
+     */
+    public Object getPayload() {
+        return this.payload;
+    }
+
+    /**
+     * 获取排序标识
+     *
+     * @return 排序标识
+     */
+    public String getOrdering() {
+        return this.ordering;
+    }
+
+    /**
+     * 获取会话信息
+     *
+     * @return 会话信息
+     */
+    public Session getSession() {
+        return this.session;
+    }
+
+    @Override
+    public String getTopic() {
+        return this.topic;
+    }
+
+    @Override
+    public ByteBuffer getBody() {
+        return this.body;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+    @Override
+    public Optional<String> getTag() {
+        return Optional.ofNullable(this.label);
+    }
+
+    @Override
+    public Collection<String> getKeys() {
+        return this.keys;
+    }
+
+    @Override
+    public Optional<String> getMessageGroup() {
+        return Optional.ofNullable(this.ordering);
+    }
+
+    @Override
+    public Optional<Long> getDeliveryTimestamp() {
+        return Optional.ofNullable(this.delaying);
+    }
+}

+ 9 - 1
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/RocketMQProducerFactory.java

@@ -3,6 +3,7 @@ package com.chelvc.framework.rocketmq.producer;
 import java.util.List;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
+import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
 import lombok.RequiredArgsConstructor;
@@ -54,7 +55,13 @@ public class RocketMQProducerFactory implements FactoryBean<Producer>, Disposabl
 
         // 设置事务消息本地事务检测回调方法
         builder.setTransactionChecker(message -> {
-            TransactionResolution resolution = RocketMQContextHolder.deserialize(message, checker::check);
+            TransactionResolution resolution;
+            SessionContextHolder.setSession(RocketMQContextHolder.getSession(message));
+            try {
+                resolution = checker.check(message);
+            } finally {
+                SessionContextHolder.removeSessionContext();
+            }
             return ObjectUtils.ifNull(resolution, TransactionResolution.UNKNOWN);
         });
 
@@ -67,6 +74,7 @@ public class RocketMQProducerFactory implements FactoryBean<Producer>, Disposabl
         }
 
         // 消息生产者包装处理
+        producer = new RocketMQProducerIsolator(producer);
         List<RocketMQProducerWrapper> wrappers = ApplicationContextHolder.getSortBeans(RocketMQProducerWrapper.class);
         for (RocketMQProducerWrapper wrapper : wrappers) {
             producer = wrapper.wrap(producer);

+ 75 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/RocketMQProducerIsolator.java

@@ -0,0 +1,75 @@
+package com.chelvc.framework.rocketmq.producer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import com.chelvc.framework.base.context.ApplicationContextHolder;
+import com.chelvc.framework.base.context.Session;
+import com.google.common.collect.Maps;
+import lombok.NonNull;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.apis.producer.Transaction;
+
+/**
+ * RocketMQ生产消息环境隔离处理器
+ *
+ * @author Woody
+ * @date 2025/9/2
+ */
+public class RocketMQProducerIsolator implements Producer {
+    private final Producer delegate;
+
+    public RocketMQProducerIsolator(@NonNull Producer delegate) {
+        this.delegate = delegate;
+    }
+
+    /**
+     * 消息环境隔离
+     *
+     * @param message 消息实例
+     * @return 消息实例
+     */
+    private Message isolate(Message message) {
+        // 初始化消息会话上下文
+        ProducerMessage original = (ProducerMessage) message;
+        Map<String, String> properties = original.getProperties();
+        if (original.getSession() != null) {
+            properties = Maps.newHashMap(properties);
+            properties.put(Session.NAMING, original.getSession().serialize());
+        }
+
+        // 构建环境隔离的消息记录
+        return new ProducerMessage(ApplicationContextHolder.getProfile(original.getTopic()), original.getLabel(),
+                original.getPayload(), original.getOrdering(), original.getDelaying(), original.getSession(),
+                original.getKeys(), properties, original.getBody());
+    }
+
+    @Override
+    public SendReceipt send(Message message) throws ClientException {
+        return this.delegate.send(this.isolate(message));
+    }
+
+    @Override
+    public SendReceipt send(Message message, Transaction transaction) throws ClientException {
+        return this.delegate.send(this.isolate(message), transaction);
+    }
+
+    @Override
+    public CompletableFuture<SendReceipt> sendAsync(Message message) {
+        return this.delegate.sendAsync(this.isolate(message));
+    }
+
+    @Override
+    public Transaction beginTransaction() throws ClientException {
+        return this.delegate.beginTransaction();
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.delegate.close();
+    }
+}

+ 29 - 13
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/StandardTransactionChecker.java

@@ -3,11 +3,13 @@ package com.chelvc.framework.rocketmq.producer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
-import com.chelvc.framework.base.context.SessionContextHolder;
+import com.chelvc.framework.base.context.ApplicationContextHolder;
 import com.chelvc.framework.common.util.JacksonUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -26,27 +28,41 @@ public class StandardTransactionChecker implements TransactionChecker {
     private final Map<String, TransactionCheckProcessor<?>> processors;
 
     public StandardTransactionChecker(@NonNull List<TransactionCheckProcessor<?>> processors) {
-        this.processors = ObjectUtils.isEmpty(processors) ? Collections.emptyMap() :
-                processors.stream().collect(Collectors.toMap(TransactionCheckProcessor::topic, processor -> processor));
+        if (ObjectUtils.isEmpty(processors)) {
+            this.processors = Collections.emptyMap();
+        } else {
+            this.processors = processors.stream().collect(Collectors.toMap(
+                    processor -> {
+                        String topic = processor.topic();
+                        if (StringUtils.isEmpty(topic)) {
+                            throw new IllegalStateException("Transaction check processor topic must not be empty: " +
+                                    processor.getClass().getName());
+                        } else if (Objects.isNull(processor.model())) {
+                            throw new IllegalStateException("Transaction check processor model must not be null: " +
+                                    processor.getClass().getName());
+                        }
+                        String tag = processor.tag();
+                        topic = ApplicationContextHolder.getProfile(topic);
+                        return StringUtils.isEmpty(tag) ? topic : (topic + ":" + tag);
+                    },
+                    processor -> processor
+            ));
+        }
     }
 
     @Override
     @SuppressWarnings({"unchecked", "rawtypes"})
     public TransactionResolution check(MessageView message) {
-        String topic = RocketMQContextHolder.topic(message);
-        TransactionCheckProcessor processor = this.processors.get(topic);
+        String topic = message.getTopic(), tag = message.getTag().orElse(null);
+        String key = StringUtils.isEmpty(tag) ? topic : (topic + ":" + tag);
+        TransactionCheckProcessor processor = this.processors.get(key);
         if (processor == null) {
-            log.error("Transaction message check processor is missing: {}", topic);
+            log.error("RocketMQ transaction message check processor is missing: {}", message);
             return TransactionResolution.UNKNOWN;
         }
 
-        String body = RocketMQContextHolder.body(message);
+        String body = RocketMQContextHolder.getBody(message);
         Object payload = JacksonUtils.deserialize(body, processor.model());
-        SessionContextHolder.setSession(RocketMQContextHolder.getSession(message));
-        try {
-            return ObjectUtils.ifNull(processor.check(payload), TransactionResolution.UNKNOWN);
-        } finally {
-            SessionContextHolder.removeSessionContext();
-        }
+        return ObjectUtils.ifNull(processor.check(payload), TransactionResolution.UNKNOWN);
     }
 }

+ 9 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/TransactionCheckProcessor.java

@@ -10,6 +10,15 @@ import org.apache.rocketmq.client.apis.producer.TransactionResolution;
  * @date 2024/10/29
  */
 public interface TransactionCheckProcessor<T> {
+    /**
+     * 获取消息标签
+     *
+     * @return 消息标签
+     */
+    default String tag() {
+        return null;
+    }
+
     /**
      * 获取消息主题
      *

+ 5 - 3
framework-security/src/main/java/com/chelvc/framework/security/interceptor/SecurityFirewallInterceptor.java

@@ -45,7 +45,7 @@ public class SecurityFirewallInterceptor implements HandlerInterceptor, WebMvcCo
     /**
      * 规则列表类型定义
      */
-    private static final TypeReference<List<Rule>> RULE_LIST_TYPE = new TypeReference<List<Rule>>() {
+    private static final TypeReference<List<Rule>> LIST_RULE_TYPE = new TypeReference<List<Rule>>() {
     };
 
     private final FirewallProcessor processor;
@@ -66,7 +66,7 @@ public class SecurityFirewallInterceptor implements HandlerInterceptor, WebMvcCo
      */
     private List<Rule> getRules() {
         return ApplicationContextHolder.getProperty("security.firewall.rules", json -> {
-            List<Rule> rules = JacksonUtils.deserialize(json, RULE_LIST_TYPE);
+            List<Rule> rules = JacksonUtils.deserialize(json, LIST_RULE_TYPE);
             if (ObjectUtils.notEmpty(rules)) {
                 rules.removeIf(rule -> !this.matches(rule));
             }
@@ -103,7 +103,9 @@ public class SecurityFirewallInterceptor implements HandlerInterceptor, WebMvcCo
             try {
                 return Ip.getAddressNumber(host);
             } catch (Exception e) {
-                log.error("Address number convert failed", e);
+                log.error("IP address number convert failed: {}", e.getMessage());
+                String message = ApplicationContextHolder.getMessage("Forbidden");
+                throw new FrameworkException(HttpStatus.FORBIDDEN.name(), null, message);
             }
         }
         return 0;

+ 1 - 1
framework-security/src/main/java/com/chelvc/framework/security/jackson/JacksonArrayDecrypter.java

@@ -17,7 +17,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer;
 public class JacksonArrayDecrypter extends JsonDeserializer<String[]> {
     @Override
     public String[] deserialize(JsonParser parser, DeserializationContext context) throws IOException {
-        String[] value = parser.readValueAs(JacksonUtils.STRING_ARRAY_TYPE);
+        String[] value = parser.readValueAs(JacksonUtils.ARRAY_STRING_TYPE);
         return SecurityContextHolder.getSecurityCipherHandler().decrypt(value);
     }
 }

+ 1 - 1
framework-security/src/main/java/com/chelvc/framework/security/jackson/JacksonListDecrypter.java

@@ -18,7 +18,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer;
 public class JacksonListDecrypter extends JsonDeserializer<List<String>> {
     @Override
     public List<String> deserialize(JsonParser parser, DeserializationContext context) throws IOException {
-        List<String> value = parser.readValueAs(JacksonUtils.STRING_LIST_TYPE);
+        List<String> value = parser.readValueAs(JacksonUtils.LIST_STRING_TYPE);
         return SecurityContextHolder.getSecurityCipherHandler().decrypt(value);
     }
 }

+ 1 - 1
framework-security/src/main/java/com/chelvc/framework/security/jackson/JacksonSetDecrypter.java

@@ -18,7 +18,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer;
 public class JacksonSetDecrypter extends JsonDeserializer<Set<String>> {
     @Override
     public Set<String> deserialize(JsonParser parser, DeserializationContext context) throws IOException {
-        Set<String> value = parser.readValueAs(JacksonUtils.STRING_SET_TYPE);
+        Set<String> value = parser.readValueAs(JacksonUtils.SET_STRING_TYPE);
         return SecurityContextHolder.getSecurityCipherHandler().decrypt(value);
     }
 }