Ver Fonte

优化Kafka事务消息处理逻辑

Woody há 13 horas atrás
pai
commit
3c2102792b

+ 2 - 16
framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java

@@ -267,22 +267,8 @@ public final class KafkaContextHolder {
         DelayRedisQueue<String> queue = getTransactionQueue();
         queue.offer(message, Duration.ofSeconds(60));
 
-        // 处理本地事务
-        boolean committable;
-        try {
-            committable = callback.test(id);
-        } catch (Exception e) {
-            // 本地事务处理异常则删除事务延时消息
-            try {
-                queue.remove(message);
-            } catch (Throwable t) {
-                log.error("Kafka transaction message remove failed: {}", message, t);
-            }
-            throw e;
-        }
-
-        // 本地事务处理成功则发送Kafka消息
-        if (committable) {
+        // 处理本地事务并发送消息
+        if (callback.test(id)) {
             KafkaTemplate<K, V> template = getKafkaTemplate();
             try {
                 template.send(topic, partition, ordering, payload).get();

+ 1 - 1
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/RocketMQContextHolder.java

@@ -341,7 +341,7 @@ public final class RocketMQContextHolder {
             throw new RuntimeException(e);
         }
 
-        // 处理本地事务
+        // 处理本地事务并执行事务消息提交或回滚
         try {
             if (callback.test(id)) {
                 transaction.commit();