소스 검색

优化MQ降级处理逻辑

woody 4 달 전
부모
커밋
779fc2e17c

+ 2 - 2
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaMessage.java → framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaFallbackMessage.java

@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
 import lombok.experimental.SuperBuilder;
 
 /**
- * Kafka消息
+ * Kafka降级消息
  *
  * @author Woody
  * @date 2024/12/20
@@ -17,7 +17,7 @@ import lombok.experimental.SuperBuilder;
 @SuperBuilder
 @NoArgsConstructor
 @AllArgsConstructor
-public class KafkaMessage implements Serializable {
+public class KafkaFallbackMessage implements Serializable {
     /**
      * 消息主题
      */

+ 4 - 3
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaPersistentSender.java

@@ -31,17 +31,18 @@ public class KafkaPersistentSender<K, V> extends KafkaFallbackSender<K, V> {
             log.debug("Kafka fallback message: {}, {}", record.topic(), record.value());
         }
 
-        KafkaMessage message = new KafkaMessage(record.topic(), record.value(), record.key(), record.partition());
+        KafkaFallbackMessage fallback =
+                new KafkaFallbackMessage(record.topic(), record.value(), record.key(), record.partition());
         Session session = SessionContextHolder.getSession(false);
         if (session == null && (session = KafkaContextHolder.getSession(record.headers())) != null) {
             SessionContextHolder.setSession(session);
             try {
-                this.client.save(message);
+                this.client.save(fallback);
             } finally {
                 SessionContextHolder.clearSessionContext();
             }
         } else {
-            this.client.save(message);
+            this.client.save(fallback);
         }
     }
 }

+ 5 - 5
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaStoreClient.java

@@ -10,13 +10,13 @@ import org.springframework.web.bind.annotation.RequestBody;
  * @author Woody
  * @date 2024/12/19
  */
-@FeignClient(value = "${spring.kafka.store-server:kafka}")
+@FeignClient(value = "${spring.kafka.fallback-store-server:kafka}")
 public interface KafkaStoreClient {
     /**
-     * 保存Kafka消息
+     * 保存Kafka降级消息
      *
-     * @param message 消息对象
+     * @param message 降级消息
      */
-    @PostMapping("/kafka/message")
-    void save(@RequestBody KafkaMessage message);
+    @PostMapping("/kafka/fallback/message")
+    void save(@RequestBody KafkaFallbackMessage message);
 }

+ 2 - 2
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQMessage.java → framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQFallbackMessage.java

@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
 import lombok.experimental.SuperBuilder;
 
 /**
- * RocketMQ消息
+ * RocketMQ降级消息
  *
  * @author Woody
  * @date 2024/12/20
@@ -17,7 +17,7 @@ import lombok.experimental.SuperBuilder;
 @SuperBuilder
 @NoArgsConstructor
 @AllArgsConstructor
-public class RocketMQMessage implements Serializable {
+public class RocketMQFallbackMessage implements Serializable {
     /**
      * 消息主题
      */

+ 3 - 1
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQPersistentProducer.java

@@ -38,7 +38,9 @@ public class RocketMQPersistentProducer extends RocketMQFallbackProducer {
         String topic = RocketMQContextHolder.topic(message);
         Object payload = JacksonUtils.deserialize(RocketMQContextHolder.body(message));
         String ordering = message.getMessageGroup().orElse(null);
-        this.client.save(new RocketMQMessage(topic, payload, ordering, Objects.nonNull(transaction)));
+        RocketMQFallbackMessage fallback =
+                new RocketMQFallbackMessage(topic, payload, ordering, Objects.nonNull(transaction));
+        this.client.save(fallback);
         if (transaction != null) {
             try {
                 transaction.commit();

+ 5 - 5
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQStoreClient.java

@@ -10,13 +10,13 @@ import org.springframework.web.bind.annotation.RequestBody;
  * @author Woody
  * @date 2024/12/19
  */
-@FeignClient(value = "${rocketmq.store-server:rocketmq}")
+@FeignClient(value = "${rocketmq.fallback-store-server:rocketmq}")
 public interface RocketMQStoreClient {
     /**
-     * 保存RocketMQ消息
+     * 保存RocketMQ降级消息
      *
-     * @param message 消息对象
+     * @param message 降级消息
      */
-    @PostMapping("/rocketmq/message")
-    void save(@RequestBody RocketMQMessage message);
+    @PostMapping("/rocketmq/fallback/message")
+    void save(@RequestBody RocketMQFallbackMessage message);
 }