Преглед на файлове

优化Kafka、RocketMQ降级处理逻辑

woody преди 5 месеца
родител
ревизия
3526a2cedf
променени са 22 файла, в които са добавени 422 реда и са изтрити 314 реда
  1. 1 1
      framework-group/src/main/java/com/chelvc/framework/group/GroupClient.java
  2. 1 1
      framework-kafka/pom.xml
  3. 22 2
      framework-kafka/src/main/java/com/chelvc/framework/kafka/config/KafkaConfigurer.java
  4. 2 2
      framework-kafka/src/main/java/com/chelvc/framework/kafka/config/KafkaProperties.java
  5. 19 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java
  6. 7 2
      framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaFallbackPolicy.java
  7. 28 11
      framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaMemorySender.java
  8. 40 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaMessage.java
  9. 47 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaPersistentSender.java
  10. 22 0
      framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaStoreClient.java
  11. 1 8
      framework-rocketmq/pom.xml
  12. 22 2
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/config/RocketMQConfigurer.java
  13. 2 2
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/config/RocketMQProperties.java
  14. 42 35
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/RocketMQContextHolder.java
  15. 5 5
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQFallbackPolicy.java
  16. 0 56
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQKafkaConfigurer.java
  17. 0 46
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQKafkaProducer.java
  18. 0 127
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQListenerKafkaProcessor.java
  19. 29 14
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQMemoryProducer.java
  20. 40 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQMessage.java
  21. 70 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQPersistentProducer.java
  22. 22 0
      framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQStoreClient.java

+ 1 - 1
framework-group/src/main/java/com/chelvc/framework/group/GroupClient.java

@@ -17,7 +17,7 @@ import org.springframework.web.bind.annotation.RequestParam;
  * @author Woody
  * @date 2024/12/5
  */
-@FeignClient(value = "${group.store.server}")
+@FeignClient(value = "${group.store-server}")
 public interface GroupClient {
     /**
      * 获取分组标识

+ 1 - 1
framework-kafka/pom.xml

@@ -6,7 +6,7 @@
 
     <parent>
         <groupId>com.chelvc.framework</groupId>
-        <artifactId>framework-dependencies</artifactId>
+        <artifactId>framework-cloud-feign-client</artifactId>
         <version>1.0.0-RELEASE</version>
         <relativePath/>
     </parent>

+ 22 - 2
framework-kafka/src/main/java/com/chelvc/framework/kafka/config/KafkaConfigurer.java

@@ -5,8 +5,10 @@ import java.util.List;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
 import com.chelvc.framework.kafka.context.KafkaContextHolder;
-import com.chelvc.framework.kafka.fallback.KafkaFallback;
+import com.chelvc.framework.kafka.fallback.KafkaFallbackPolicy;
 import com.chelvc.framework.kafka.fallback.KafkaMemorySender;
+import com.chelvc.framework.kafka.fallback.KafkaPersistentSender;
+import com.chelvc.framework.kafka.fallback.KafkaStoreClient;
 import com.chelvc.framework.kafka.interceptor.KafkaSessionInterceptor;
 import com.chelvc.framework.kafka.producer.KafkaMessageSender;
 import com.chelvc.framework.kafka.producer.KafkaSenderWrapper;
@@ -104,7 +106,7 @@ public class KafkaConfigurer implements KafkaListenerConfigurer {
     @ConditionalOnExpression("'${spring.kafka.fallbacks}'.contains('MEMORY')")
     public KafkaSenderWrapper memorySenderWrapper() {
         return new KafkaSenderWrapper() {
-            private final int order = properties.getFallbacks().indexOf(KafkaFallback.MEMORY);
+            private final int order = properties.getFallbacks().indexOf(KafkaFallbackPolicy.MEMORY);
 
             @Override
             public int getOrder() {
@@ -118,6 +120,24 @@ public class KafkaConfigurer implements KafkaListenerConfigurer {
         };
     }
 
+    @Bean
+    @ConditionalOnExpression("'${spring.kafka.fallbacks}'.contains('PERSISTENT')")
+    public KafkaSenderWrapper persistentSenderWrapper(KafkaStoreClient client) {
+        return new KafkaSenderWrapper() {
+            private final int order = properties.getFallbacks().indexOf(KafkaFallbackPolicy.PERSISTENT);
+
+            @Override
+            public int getOrder() {
+                return this.order;
+            }
+
+            @Override
+            public <K, V> KafkaMessageSender<K, V> wrap(KafkaMessageSender<K, V> sender) {
+                return new KafkaPersistentSender<K, V>(client, sender);
+            }
+        };
+    }
+
     @Bean
     @ConditionalOnBean(KafkaSenderWrapper.class)
     public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> producerFactory,

+ 2 - 2
framework-kafka/src/main/java/com/chelvc/framework/kafka/config/KafkaProperties.java

@@ -5,7 +5,7 @@ import java.util.Collections;
 import java.util.List;
 
 import com.chelvc.framework.common.util.JacksonUtils;
-import com.chelvc.framework.kafka.fallback.KafkaFallback;
+import com.chelvc.framework.kafka.fallback.KafkaFallbackPolicy;
 import com.chelvc.framework.kafka.interceptor.KafkaSessionInterceptor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -47,7 +47,7 @@ public class KafkaProperties extends org.springframework.boot.autoconfigure.kafk
     /**
      * 降级策略列表
      */
-    private List<KafkaFallback> fallbacks = Collections.emptyList();
+    private List<KafkaFallbackPolicy> fallbacks = Collections.emptyList();
 
     {
         // 初始化生产者默认配置

+ 19 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java

@@ -213,4 +213,23 @@ public final class KafkaContextHolder {
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * 发送消息
+     *
+     * @param topic     消息主题
+     * @param payload   消息内容
+     * @param ordering  顺序消息标识
+     * @param partition 指定分区
+     * @param <K>       顺序标识类型
+     * @param <V>       消息载体类型
+     */
+    public static <K, V> void send(@NonNull String topic, @NonNull V payload, K ordering, Integer partition) {
+        KafkaTemplate<K, V> template = getKafkaTemplate();
+        try {
+            template.send(topic, partition, ordering, payload).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }

+ 7 - 2
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaFallback.java → framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaFallbackPolicy.java

@@ -6,9 +6,14 @@ package com.chelvc.framework.kafka.fallback;
  * @author Woody
  * @date 2024/7/14
  */
-public enum KafkaFallback {
+public enum KafkaFallbackPolicy {
     /**
      * 内存降级策略
      */
-    MEMORY;
+    MEMORY,
+
+    /**
+     * 持久化降级策略
+     */
+    PERSISTENT;
 }

+ 28 - 11
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaMemorySender.java

@@ -3,9 +3,14 @@ package com.chelvc.framework.kafka.fallback;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
+import com.chelvc.framework.base.context.Session;
+import com.chelvc.framework.base.context.SessionContextHolder;
+import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ThreadUtils;
 import com.chelvc.framework.kafka.producer.KafkaMessageSender;
+import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
 /**
@@ -18,28 +23,43 @@ import org.apache.kafka.clients.producer.ProducerRecord;
  */
 @Slf4j
 public class KafkaMemorySender<K, V> extends KafkaFallbackSender<K, V> {
+    private final int capacity;
     private volatile Thread thread;
-    private volatile BlockingQueue<ProducerRecord<K, V>> queue;
+    private volatile BlockingQueue<Pair<ProducerRecord<K, V>, Session>> queue;
 
-    public KafkaMemorySender(KafkaMessageSender<K, V> delegate) {
+    public KafkaMemorySender(@NonNull KafkaMessageSender<K, V> delegate) {
+        this(10000, delegate);
+    }
+
+    public KafkaMemorySender(int capacity, @NonNull KafkaMessageSender<K, V> delegate) {
         super(delegate);
+        AssertUtils.check(capacity > 0, () -> "Kafka fallback queue capacity must be greater than 0");
+        this.capacity = capacity;
     }
 
     @Override
     protected void fallback(ProducerRecord<K, V> record) {
+        if (log.isDebugEnabled()) {
+            log.debug("Kafka fallback message: {}, {}", record.topic(), record.value());
+        }
+
         // 初始化消息队列及重试线程
         if (this.queue == null) {
             synchronized (this) {
                 if (this.queue == null) {
-                    this.queue = new ArrayBlockingQueue<>(10000);
-                    this.thread = ThreadUtils.consume(this.queue, message -> {
+                    this.queue = new ArrayBlockingQueue<>(this.capacity);
+                    this.thread = ThreadUtils.consume(this.queue, pair -> {
+                        ProducerRecord<K, V> message = pair.getLeft();
+                        SessionContextHolder.setSession(pair.getRight());
                         try {
                             this.delegate.send(message);
                             return true;
                         } catch (Exception e) {
                             log.error("Kafka message send failed: {}", message.topic(), e);
-                            ThreadUtils.sleep(5000);
+                        } finally {
+                            SessionContextHolder.clearSessionContext();
                         }
+                        ThreadUtils.sleep(5000);
                         return false;
                     });
                 }
@@ -47,13 +67,10 @@ public class KafkaMemorySender<K, V> extends KafkaFallbackSender<K, V> {
         }
 
         // 将消息放入内存消息队列
-        if (this.queue.offer(record)) {
-            if (log.isDebugEnabled()) {
-                log.debug("Kafka fallback memory message: {}, {}", record.topic(), record.value());
-            }
-            return;
+        Session session = SessionContextHolder.getSession(false);
+        if (!this.queue.offer(Pair.of(record, session))) {
+            throw new RuntimeException("Kafka fallback queue is full: " + record.topic() + ", " + record.value());
         }
-        throw new RuntimeException("Kafka fallback memory queue is full: " + record.topic() + ", " + record.value());
     }
 
     @Override

+ 40 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaMessage.java

@@ -0,0 +1,40 @@
+package com.chelvc.framework.kafka.fallback;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Kafka消息
+ *
+ * @author Woody
+ * @date 2024/12/20
+ */
+@Data
+@SuperBuilder
+@NoArgsConstructor
+@AllArgsConstructor
+public class KafkaMessage implements Serializable {
+    /**
+     * 消息主题
+     */
+    private String topic;
+
+    /**
+     * 消息载体
+     */
+    private Object payload;
+
+    /**
+     * 排序标识
+     */
+    private Object ordering;
+
+    /**
+     * 消息分区
+     */
+    private Integer partition;
+}

+ 47 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaPersistentSender.java

@@ -0,0 +1,47 @@
+package com.chelvc.framework.kafka.fallback;
+
+import com.chelvc.framework.base.context.Session;
+import com.chelvc.framework.base.context.SessionContextHolder;
+import com.chelvc.framework.kafka.context.KafkaContextHolder;
+import com.chelvc.framework.kafka.producer.KafkaMessageSender;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/**
+ * Kafka持久化降级发送处理器实现
+ *
+ * @param <K> 顺序标识类型
+ * @param <V> 消息载体类型
+ * @author Woody
+ * @date 2024/12/19
+ */
+@Slf4j
+public class KafkaPersistentSender<K, V> extends KafkaFallbackSender<K, V> {
+    private final KafkaStoreClient client;
+
+    public KafkaPersistentSender(@NonNull KafkaStoreClient client, @NonNull KafkaMessageSender<K, V> delegate) {
+        super(delegate);
+        this.client = client;
+    }
+
+    @Override
+    protected void fallback(ProducerRecord<K, V> record) {
+        if (log.isDebugEnabled()) {
+            log.debug("Kafka fallback message: {}, {}", record.topic(), record.value());
+        }
+
+        KafkaMessage message = new KafkaMessage(record.topic(), record.value(), record.key(), record.partition());
+        Session session = SessionContextHolder.getSession(false);
+        if (session == null && (session = KafkaContextHolder.getSession(record.headers())) != null) {
+            SessionContextHolder.setSession(session);
+            try {
+                this.client.save(message);
+            } finally {
+                SessionContextHolder.clearSessionContext();
+            }
+        } else {
+            this.client.save(message);
+        }
+    }
+}

+ 22 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaStoreClient.java

@@ -0,0 +1,22 @@
+package com.chelvc.framework.kafka.fallback;
+
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+
+/**
+ * Kafka消息存储服务客户端
+ *
+ * @author Woody
+ * @date 2024/12/19
+ */
+@FeignClient(value = "${spring.kafka.store-server:kafka}")
+public interface KafkaStoreClient {
+    /**
+     * 保存Kafka消息
+     *
+     * @param message 消息对象
+     */
+    @PostMapping("/kafka/message")
+    void save(@RequestBody KafkaMessage message);
+}

+ 1 - 8
framework-rocketmq/pom.xml

@@ -6,7 +6,7 @@
 
     <parent>
         <groupId>com.chelvc.framework</groupId>
-        <artifactId>framework-dependencies</artifactId>
+        <artifactId>framework-cloud-feign-client</artifactId>
         <version>1.0.0-RELEASE</version>
         <relativePath/>
     </parent>
@@ -16,7 +16,6 @@
 
     <properties>
         <framework-base.version>1.0.0-RELEASE</framework-base.version>
-        <framework-kafka.version>1.0.0-RELEASE</framework-kafka.version>
     </properties>
 
     <dependencies>
@@ -25,12 +24,6 @@
             <artifactId>framework-base</artifactId>
             <version>${framework-base.version}</version>
         </dependency>
-        <dependency>
-            <groupId>com.chelvc.framework</groupId>
-            <artifactId>framework-kafka</artifactId>
-            <version>${framework-kafka.version}</version>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client-java</artifactId>

+ 22 - 2
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/config/RocketMQConfigurer.java

@@ -1,7 +1,9 @@
 package com.chelvc.framework.rocketmq.config;
 
-import com.chelvc.framework.rocketmq.fallback.RocketMQFallback;
+import com.chelvc.framework.rocketmq.fallback.RocketMQFallbackPolicy;
 import com.chelvc.framework.rocketmq.fallback.RocketMQMemoryProducer;
+import com.chelvc.framework.rocketmq.fallback.RocketMQPersistentProducer;
+import com.chelvc.framework.rocketmq.fallback.RocketMQStoreClient;
 import com.chelvc.framework.rocketmq.producer.CommonTransactionChecker;
 import com.chelvc.framework.rocketmq.producer.RocketMQProducerWrapper;
 import lombok.RequiredArgsConstructor;
@@ -63,7 +65,7 @@ public class RocketMQConfigurer {
     @ConditionalOnExpression("'${rocketmq.fallbacks}'.contains('MEMORY')")
     public RocketMQProducerWrapper memoryProducerWrapper() {
         return new RocketMQProducerWrapper() {
-            private final int order = properties.getFallbacks().indexOf(RocketMQFallback.MEMORY);
+            private final int order = properties.getFallbacks().indexOf(RocketMQFallbackPolicy.MEMORY);
 
             @Override
             public int getOrder() {
@@ -76,4 +78,22 @@ public class RocketMQConfigurer {
             }
         };
     }
+
+    @Bean
+    @ConditionalOnExpression("'${rocketmq.fallbacks}'.contains('PERSISTENT')")
+    public RocketMQProducerWrapper persistentProducerWrapper(RocketMQStoreClient client) {
+        return new RocketMQProducerWrapper() {
+            private final int order = properties.getFallbacks().indexOf(RocketMQFallbackPolicy.PERSISTENT);
+
+            @Override
+            public int getOrder() {
+                return this.order;
+            }
+
+            @Override
+            public Producer wrap(Producer producer) {
+                return new RocketMQPersistentProducer(client, producer);
+            }
+        };
+    }
 }

+ 2 - 2
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/config/RocketMQProperties.java

@@ -3,7 +3,7 @@ package com.chelvc.framework.rocketmq.config;
 import java.util.Collections;
 import java.util.List;
 
-import com.chelvc.framework.rocketmq.fallback.RocketMQFallback;
+import com.chelvc.framework.rocketmq.fallback.RocketMQFallbackPolicy;
 import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
@@ -41,5 +41,5 @@ public class RocketMQProperties {
     /**
      * 降级策略列表
      */
-    private List<RocketMQFallback> fallbacks = Collections.emptyList();
+    private List<RocketMQFallbackPolicy> fallbacks = Collections.emptyList();
 }

+ 42 - 35
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/RocketMQContextHolder.java

@@ -34,6 +34,11 @@ import org.apache.rocketmq.client.apis.producer.Transaction;
  * @date 2024/1/30
  */
 public final class RocketMQContextHolder {
+    /**
+     * topic、tag分隔符
+     */
+    public static final char TOPIC_TAG_DELIMITER = ':';
+
     /**
      * 消息生产者
      */
@@ -107,7 +112,12 @@ public final class RocketMQContextHolder {
      * @return 消息主题
      */
     public static String topic(@NonNull Message message) {
-        return topic(message, ':');
+        String namespace = getProperties().getNamespace();
+        String topic = message.getTopic(), tag = message.getTag().orElse(null);
+        if (StringUtils.notEmpty(topic) && topic.startsWith(namespace)) {
+            topic = topic.substring(namespace.length());
+        }
+        return StringUtils.isEmpty(tag) ? topic : (topic + TOPIC_TAG_DELIMITER + tag);
     }
 
     /**
@@ -117,33 +127,12 @@ public final class RocketMQContextHolder {
      * @return 消息主题
      */
     public static String topic(@NonNull MessageView message) {
-        return topic(message, ':');
-    }
-
-    /**
-     * 获取消息主题
-     *
-     * @param message   消息实例
-     * @param delimiter TOPIC与TAG分隔符
-     * @return 消息主题
-     */
-    public static String topic(@NonNull Message message, char delimiter) {
         String namespace = getProperties().getNamespace();
-        String topic = message.getTopic() + delimiter + message.getTag().orElse(null);
-        return topic.startsWith(namespace) ? topic.substring(namespace.length()) : topic;
-    }
-
-    /**
-     * 获取消息主题
-     *
-     * @param message   消息实例
-     * @param delimiter TOPIC与TAG分隔符
-     * @return 消息主题
-     */
-    public static String topic(@NonNull MessageView message, char delimiter) {
-        String namespace = getProperties().getNamespace();
-        String topic = message.getTopic() + delimiter + message.getTag().orElse(null);
-        return topic.startsWith(namespace) ? topic.substring(namespace.length()) : topic;
+        String topic = message.getTopic(), tag = message.getTag().orElse(null);
+        if (StringUtils.notEmpty(topic) && topic.startsWith(namespace)) {
+            topic = topic.substring(namespace.length());
+        }
+        return StringUtils.isEmpty(tag) ? topic : (topic + TOPIC_TAG_DELIMITER + tag);
     }
 
     /**
@@ -177,6 +166,28 @@ public final class RocketMQContextHolder {
         return StringUtils.isEmpty(namespace) ? original : (namespace + original);
     }
 
+    /**
+     * 获取会话信息
+     *
+     * @param message 消息实例
+     * @return 会话实例
+     */
+    public static Session getSession(@NonNull Message message) {
+        String json = message.getProperties().get(Session.NAMING);
+        return StringUtils.isEmpty(json) ? null : JacksonUtils.deserialize(json, Session.class);
+    }
+
+    /**
+     * 获取会话信息
+     *
+     * @param message 消息实例
+     * @return 会话实例
+     */
+    public static Session getSession(@NonNull MessageView message) {
+        String json = message.getProperties().get(Session.NAMING);
+        return StringUtils.isEmpty(json) ? null : JacksonUtils.deserialize(json, Session.class);
+    }
+
     /**
      * 消息序列化
      *
@@ -189,7 +200,7 @@ public final class RocketMQContextHolder {
         byte[] body = JacksonUtils.serialize(payload).getBytes(StandardCharsets.UTF_8);
         String session = JacksonUtils.serialize(SessionContextHolder.getSession(false));
         MessageBuilder builder = getServiceProvider().newMessageBuilder().setBody(body);
-        int delimiter = topic.indexOf(':');
+        int delimiter = topic.indexOf(TOPIC_TAG_DELIMITER);
         if (delimiter <= 0) {
             builder.setTopic(isolate(topic));
         } else {
@@ -225,8 +236,7 @@ public final class RocketMQContextHolder {
      * @return 消息处理结果
      */
     public static <T> T deserialize(@NonNull MessageView message, @NonNull Function<MessageView, T> function) {
-        Session session = JacksonUtils.deserialize(message.getProperties().get(Session.NAMING), Session.class);
-        SessionContextHolder.setSession(session);
+        SessionContextHolder.setSession(getSession(message));
         try {
             return function.apply(message);
         } finally {
@@ -262,11 +272,8 @@ public final class RocketMQContextHolder {
         if (ObjectUtils.isEmpty(messages)) {
             return Collections.emptyList();
         }
-        return messages.stream().map(message -> {
-            Object payload = deserialize(message, type);
-            Session session = JacksonUtils.deserialize(message.getProperties().get(Session.NAMING), Session.class);
-            return function.apply(payload, session);
-        }).filter(Objects::nonNull).collect(Collectors.toList());
+        return messages.stream().map(message -> function.apply(deserialize(message, type), getSession(message)))
+                .filter(Objects::nonNull).collect(Collectors.toList());
     }
 
     /**

+ 5 - 5
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQFallback.java → framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQFallbackPolicy.java

@@ -6,14 +6,14 @@ package com.chelvc.framework.rocketmq.fallback;
  * @author Woody
  * @date 2024/7/14
  */
-public enum RocketMQFallback {
+public enum RocketMQFallbackPolicy {
     /**
-     * Kafka降级策略
+     * 内存降级策略
      */
-    KAFKA,
+    MEMORY,
 
     /**
-     * 内存降级策略
+     * 持久化降级策略
      */
-    MEMORY;
+    PERSISTENT;
 }

+ 0 - 56
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQKafkaConfigurer.java

@@ -1,56 +0,0 @@
-package com.chelvc.framework.rocketmq.fallback;
-
-import com.chelvc.framework.kafka.context.KafkaContextHolder;
-import com.chelvc.framework.rocketmq.config.RocketMQProperties;
-import com.chelvc.framework.rocketmq.producer.RocketMQProducerWrapper;
-import lombok.RequiredArgsConstructor;
-import org.apache.rocketmq.client.apis.producer.Producer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
-import org.springframework.kafka.config.KafkaListenerConfigUtils;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-
-/**
- * RocketMQ降级Kafka配置
- *
- * @author Woody
- * @date 2024/7/13
- */
-@Configuration
-@ConditionalOnClass(KafkaContextHolder.class)
-@ConditionalOnExpression("'${rocketmq.fallbacks}'.contains('KAFKA')")
-@RequiredArgsConstructor(onConstructor = @__(@Autowired))
-public class RocketMQKafkaConfigurer {
-    private final RocketMQProperties properties;
-
-    @Bean
-    public RocketMQProducerWrapper kafkaProducerWrapper() {
-        return new RocketMQProducerWrapper() {
-            private final int order = properties.getFallbacks().indexOf(RocketMQFallback.KAFKA);
-
-            @Override
-            public int getOrder() {
-                return this.order;
-            }
-
-            @Override
-            public Producer wrap(Producer producer) {
-                return new RocketMQKafkaProducer(producer);
-            }
-        };
-    }
-
-    @Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
-    public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
-        return new KafkaListenerEndpointRegistry();
-    }
-
-    @Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
-    public KafkaListenerAnnotationBeanPostProcessor<?, ?> kafkaListenerAnnotationBeanPostProcessor() {
-        return new RocketMQListenerKafkaProcessor();
-    }
-}

+ 0 - 46
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQKafkaProducer.java

@@ -1,46 +0,0 @@
-package com.chelvc.framework.rocketmq.fallback;
-
-import com.chelvc.framework.base.context.Session;
-import com.chelvc.framework.base.context.SessionContextHolder;
-import com.chelvc.framework.common.util.JacksonUtils;
-import com.chelvc.framework.kafka.context.KafkaContextHolder;
-import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.rocketmq.client.apis.message.Message;
-import org.apache.rocketmq.client.apis.producer.Producer;
-import org.apache.rocketmq.client.apis.producer.Transaction;
-
-/**
- * RocketMQ降级Kafka消息生产者实现
- *
- * @author Woody
- * @date 2024/7/13
- */
-@Slf4j
-public class RocketMQKafkaProducer extends RocketMQFallbackProducer {
-    public RocketMQKafkaProducer(Producer delegate) {
-        super(delegate);
-    }
-
-    @Override
-    protected void fallback(Message message, Transaction transaction) {
-        if (log.isDebugEnabled()) {
-            String topic = RocketMQContextHolder.topic(message), body = RocketMQContextHolder.body(message);
-            log.debug("RocketMQ fallback kafka message: {}, {}", topic, body);
-        }
-        String topic = RocketMQContextHolder.topic(message, '-');
-        Object payload = JacksonUtils.deserialize(RocketMQContextHolder.body(message));
-        String ordering = message.getMessageGroup().orElse(null);
-        Session session = JacksonUtils.deserialize(message.getProperties().get(Session.NAMING), Session.class);
-        SessionContextHolder.setSession(session);
-        try {
-            if (ordering == null) {
-                KafkaContextHolder.send(topic, payload);
-            } else {
-                KafkaContextHolder.send(topic, payload, ordering);
-            }
-        } finally {
-            SessionContextHolder.clearSessionContext();
-        }
-    }
-}

+ 0 - 127
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQListenerKafkaProcessor.java

@@ -1,127 +0,0 @@
-package com.chelvc.framework.rocketmq.fallback;
-
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Stream;
-
-import com.beust.jcommander.internal.Maps;
-import com.chelvc.framework.base.context.ApplicationContextHolder;
-import com.chelvc.framework.common.util.ObjectUtils;
-import com.chelvc.framework.common.util.StringUtils;
-import com.chelvc.framework.rocketmq.annotation.RocketMQConsumer;
-import com.chelvc.framework.rocketmq.consumer.RocketMQListener;
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.aop.framework.AopProxyUtils;
-import org.springframework.boot.context.event.ApplicationStartedEvent;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationListener;
-import org.springframework.core.env.Environment;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
-import org.springframework.kafka.annotation.TopicPartition;
-import sun.reflect.annotation.AnnotationParser;
-
-/**
- * RocketMQ降级Kafka消息监听器处理器
- *
- * @author Woody
- * @date 2024/7/13
- */
-@Slf4j
-public class RocketMQListenerKafkaProcessor extends KafkaListenerAnnotationBeanPostProcessor<Object, Object>
-        implements ApplicationListener<ApplicationStartedEvent> {
-    /**
-     * 创建Kafka消息监听器注解实例
-     *
-     * @param topic    消息主题
-     * @param consumer RocketMQ消息监听器注解实例
-     * @return Kafka消息监听器注解实例
-     */
-    protected KafkaListener createKafkaListener(@NonNull String topic, @NonNull RocketMQConsumer consumer) {
-        Map<String, Object> properties = Maps.newHashMap();
-        properties.put("id", StringUtils.EMPTY);
-        properties.put("containerFactory", consumer.batch() > 1 ? "batchListenerContainerFactory" : StringUtils.EMPTY);
-        properties.put("topics", new String[]{topic});
-        properties.put("topicPattern", StringUtils.EMPTY);
-        properties.put("topicPartitions", new TopicPartition[0]);
-        properties.put("containerGroup", StringUtils.EMPTY);
-        properties.put("errorHandler", StringUtils.EMPTY);
-        properties.put("groupId", consumer.group());
-        properties.put("idIsGroup", true);
-        properties.put("clientIdPrefix", StringUtils.EMPTY);
-        properties.put("beanRef", "__listener");
-        properties.put("concurrency", StringUtils.EMPTY);
-        properties.put("autoStartup", StringUtils.EMPTY);
-        properties.put("properties", StringUtils.EMPTY_ARRAY);
-        properties.put("splitIterables", true);
-        properties.put("contentTypeConverter", StringUtils.EMPTY);
-        properties.put("batch", String.valueOf(consumer.batch()));
-        return (KafkaListener) AnnotationParser.annotationForMap(KafkaListener.class, properties);
-    }
-
-    /**
-     * 注册消息监听器容器
-     *
-     * @param applicationContext 应用上下文
-     * @param name               消息监听器名称
-     * @param listener           消息监听器实例
-     * @param <T>                消息对象类型
-     */
-    protected <T> void registerListenerContainer(@NonNull ApplicationContext applicationContext, @NonNull String name,
-                                                 @NonNull RocketMQListener<T> listener) {
-        // 获取RocketMQ消息监听器注解实例
-        Class<?> clazz = AopProxyUtils.ultimateTargetClass(listener);
-        RocketMQConsumer consumer = clazz.getAnnotation(RocketMQConsumer.class);
-        if (consumer == null) {
-            return;
-        }
-
-        // 判断类对象是否存在相同topic @KafkaListener注解,如果存在则忽略
-        Environment environment = applicationContext.getEnvironment();
-        String topic = ObjectUtils.ifNull(consumer.topic(), environment::resolvePlaceholders);
-        String tag = ObjectUtils.ifNull(consumer.tag(), environment::resolvePlaceholders);
-        if (StringUtils.notEmpty(tag) && !Objects.equals(tag, "*")) {
-            topic = topic + "-" + tag;
-        }
-        KafkaListener annotation = clazz.getAnnotation(KafkaListener.class);
-        if (annotation != null && Stream.concat(Stream.of(annotation.topics()), Stream.of(annotation.topicPattern()))
-                .filter(StringUtils::notEmpty).map(environment::resolvePlaceholders).anyMatch(topic::equals)) {
-            return;
-        }
-
-        // 判断消息消费方法是否存在相同topic @KafkaListener注解,如果存在则忽略
-        Method method;
-        Class<?> type = ObjectUtils.type2class(ObjectUtils.lookupSuperclassParameterized(
-                clazz, RocketMQListener.class, Object.class
-        ));
-        try {
-            method = clazz.getMethod("consume", type);
-        } catch (NoSuchMethodException e) {
-            log.warn("Kafka fallback method is missing: {}.({})", clazz.getName(), type.getName());
-            return;
-        }
-        annotation = method.getAnnotation(KafkaListener.class);
-        if (annotation != null && Stream.concat(Stream.of(annotation.topics()), Stream.of(annotation.topicPattern()))
-                .filter(StringUtils::notEmpty).map(environment::resolvePlaceholders).anyMatch(topic::equals)) {
-            return;
-        }
-
-        // 注册Kafka消息监听器
-        try {
-            annotation = this.createKafkaListener(topic, consumer);
-            this.processKafkaListener(annotation, method, listener, name);
-        } catch (Exception e) {
-            log.error("Kafka listener process failed", e);
-        }
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void onApplicationEvent(ApplicationStartedEvent event) {
-        ApplicationContext applicationContext = event.getApplicationContext();
-        ApplicationContextHolder.getBeans(RocketMQListener.class)
-                .forEach((name, listener) -> this.registerListenerContainer(applicationContext, name, listener));
-    }
-}

+ 29 - 14
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQMemoryProducer.java

@@ -4,8 +4,12 @@ import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
+import com.chelvc.framework.base.context.Session;
+import com.chelvc.framework.base.context.SessionContextHolder;
+import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ThreadUtils;
 import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
+import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.client.apis.message.Message;
@@ -20,23 +24,36 @@ import org.apache.rocketmq.client.apis.producer.Transaction;
  */
 @Slf4j
 public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
+    private final int capacity;
     private volatile Thread thread;
-    private volatile BlockingQueue<Pair<Message, Transaction>> queue;
+    private volatile BlockingQueue<Pair<Pair<Message, Transaction>, Session>> queue;
 
-    public RocketMQMemoryProducer(Producer delegate) {
+    public RocketMQMemoryProducer(@NonNull Producer delegate) {
+        this(10000, delegate);
+    }
+
+    public RocketMQMemoryProducer(int capacity, @NonNull Producer delegate) {
         super(delegate);
+        AssertUtils.check(capacity > 0, () -> "RocketMQ fallback queue capacity must be greater than 0");
+        this.capacity = capacity;
     }
 
     @Override
     protected void fallback(Message message, Transaction transaction) {
+        if (log.isDebugEnabled()) {
+            String topic = RocketMQContextHolder.topic(message), body = RocketMQContextHolder.body(message);
+            log.debug("RocketMQ fallback message: {}, {}", topic, body);
+        }
+
         // 初始化消息队列及重试线程
         if (this.queue == null) {
             synchronized (this) {
                 if (this.queue == null) {
-                    this.queue = new ArrayBlockingQueue<>(10000);
+                    this.queue = new ArrayBlockingQueue<>(this.capacity);
                     this.thread = ThreadUtils.consume(this.queue, pair -> {
-                        Message _message = pair.getLeft();
-                        Transaction _transaction = pair.getRight();
+                        Message _message = pair.getLeft().getLeft();
+                        Transaction _transaction = pair.getLeft().getRight();
+                        SessionContextHolder.setSession(pair.getRight());
                         try {
                             if (_transaction == null) {
                                 this.delegate.send(_message);
@@ -47,8 +64,10 @@ public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
                             return true;
                         } catch (Exception e) {
                             log.error("RocketMQ message send failed: {}", RocketMQContextHolder.topic(_message), e);
-                            ThreadUtils.sleep(5000);
+                        } finally {
+                            SessionContextHolder.clearSessionContext();
                         }
+                        ThreadUtils.sleep(5000);
                         return false;
                     });
                 }
@@ -56,15 +75,11 @@ public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
         }
 
         // 将消息放入内存消息队列
-        if (this.queue.offer(Pair.of(message, transaction))) {
-            if (log.isDebugEnabled()) {
-                String topic = RocketMQContextHolder.topic(message), body = RocketMQContextHolder.body(message);
-                log.debug("RocketMQ fallback memory message: {}, {}", topic, body);
-            }
-            return;
+        Session session = SessionContextHolder.getSession(false);
+        if (!this.queue.offer(Pair.of(Pair.of(message, transaction), session))) {
+            String topic = RocketMQContextHolder.topic(message), body = RocketMQContextHolder.body(message);
+            throw new RuntimeException("RocketMQ fallback queue is full: " + topic + ", " + body);
         }
-        String topic = RocketMQContextHolder.topic(message), body = RocketMQContextHolder.body(message);
-        throw new RuntimeException("RocketMQ fallback memory queue is full: " + topic + ", " + body);
     }
 
     @Override

+ 40 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQMessage.java

@@ -0,0 +1,40 @@
+package com.chelvc.framework.rocketmq.fallback;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * RocketMQ消息
+ *
+ * @author Woody
+ * @date 2024/12/20
+ */
+@Data
+@SuperBuilder
+@NoArgsConstructor
+@AllArgsConstructor
+public class RocketMQMessage implements Serializable {
+    /**
+     * 消息主题
+     */
+    private String topic;
+
+    /**
+     * 消息载体
+     */
+    private Object payload;
+
+    /**
+     * 顺序标识
+     */
+    private String ordering;
+
+    /**
+     * 是否是事务消息
+     */
+    private boolean transactional;
+}

+ 70 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQPersistentProducer.java

@@ -0,0 +1,70 @@
+package com.chelvc.framework.rocketmq.fallback;
+
+import java.util.Objects;
+
+import com.chelvc.framework.base.context.Session;
+import com.chelvc.framework.base.context.SessionContextHolder;
+import com.chelvc.framework.common.util.JacksonUtils;
+import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.Transaction;
+
+/**
+ * RocketMQ持久化降级消息生产者实现
+ *
+ * @author Woody
+ * @date 2024/12/20
+ */
+@Slf4j
+public class RocketMQPersistentProducer extends RocketMQFallbackProducer {
+    private final RocketMQStoreClient client;
+
+    public RocketMQPersistentProducer(@NonNull RocketMQStoreClient client, @NonNull Producer delegate) {
+        super(delegate);
+        this.client = client;
+    }
+
+    /**
+     * 保存消息
+     *
+     * @param message     消息实例
+     * @param transaction 消息处理事务
+     */
+    private void save(Message message, Transaction transaction) {
+        String topic = RocketMQContextHolder.topic(message);
+        Object payload = JacksonUtils.deserialize(RocketMQContextHolder.body(message));
+        String ordering = message.getMessageGroup().orElse(null);
+        this.client.save(new RocketMQMessage(topic, payload, ordering, Objects.nonNull(transaction)));
+        if (transaction != null) {
+            try {
+                transaction.commit();
+            } catch (ClientException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    protected void fallback(Message message, Transaction transaction) {
+        if (log.isDebugEnabled()) {
+            String topic = RocketMQContextHolder.topic(message), body = RocketMQContextHolder.body(message);
+            log.debug("RocketMQ fallback message: {}, {}", topic, body);
+        }
+
+        Session session = SessionContextHolder.getSession(false);
+        if (session == null && (session = RocketMQContextHolder.getSession(message)) != null) {
+            SessionContextHolder.setSession(session);
+            try {
+                this.save(message, transaction);
+            } finally {
+                SessionContextHolder.clearSessionContext();
+            }
+        } else {
+            this.save(message, transaction);
+        }
+    }
+}

+ 22 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQStoreClient.java

@@ -0,0 +1,22 @@
+package com.chelvc.framework.rocketmq.fallback;
+
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+
+/**
+ * RocketMQ消息存储服务客户端
+ *
+ * @author Woody
+ * @date 2024/12/19
+ */
+@FeignClient(value = "${rocketmq.store-server:rocketmq}")
+public interface RocketMQStoreClient {
+    /**
+     * 保存RocketMQ消息
+     *
+     * @param message 消息对象
+     */
+    @PostMapping("/rocketmq/message")
+    void save(@RequestBody RocketMQMessage message);
+}