Selaa lähdekoodia

重载Kafka、RocketMQ事务消息发送方法

Woody 1 kuukausi sitten
vanhempi
commit
7a80d7bb2c

+ 46 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java

@@ -218,6 +218,18 @@ public final class KafkaContextHolder {
         }
     }
 
+    /**
+     * 发送事务消息
+     *
+     * @param topic    消息主题
+     * @param payload  消息内容
+     * @param callback 本地事务回调函数
+     * @param <V>      消息载体类型
+     */
+    public static <V> void send(@NonNull String topic, @NonNull V payload, @NonNull Consumer<String> callback) {
+        send(topic, payload, null, null, callback);
+    }
+
     /**
      * 发送事务消息
      *
@@ -230,6 +242,21 @@ public final class KafkaContextHolder {
         send(topic, payload, null, null, callback);
     }
 
+    /**
+     * 发送事务消息
+     *
+     * @param topic    消息主题
+     * @param payload  消息内容
+     * @param ordering 顺序消息标识
+     * @param callback 本地事务回调函数
+     * @param <K>      顺序标识类型
+     * @param <V>      消息载体类型
+     */
+    public static <K, V> void send(@NonNull String topic, @NonNull V payload, K ordering,
+                                   @NonNull Consumer<String> callback) {
+        send(topic, payload, ordering, null, callback);
+    }
+
     /**
      * 发送事务消息
      *
@@ -245,6 +272,25 @@ public final class KafkaContextHolder {
         send(topic, payload, ordering, null, callback);
     }
 
+    /**
+     * 发送事务消息
+     *
+     * @param topic     消息主题
+     * @param payload   消息内容
+     * @param ordering  顺序消息标识
+     * @param partition 指定分区
+     * @param callback  本地事务回调函数
+     * @param <K>       顺序标识类型
+     * @param <V>       消息载体类型
+     */
+    public static <K, V> void send(@NonNull String topic, @NonNull V payload, K ordering, Integer partition,
+                                   @NonNull Consumer<String> callback) {
+        send(topic, payload, ordering, partition, id -> {
+            callback.accept(id);
+            return true;
+        });
+    }
+
     /**
      * 发送事务消息
      *

+ 30 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/RocketMQContextHolder.java

@@ -7,6 +7,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -306,6 +307,18 @@ public final class RocketMQContextHolder {
         getProducer().sendAsync(message);
     }
 
+    /**
+     * 发送事务消息
+     *
+     * @param topic    消息主题
+     * @param payload  消息内容
+     * @param callback 本地事务回调函数
+     * @param <T>      消息类型
+     */
+    public static <T> void send(@NonNull String topic, @NonNull T payload, @NonNull Consumer<MessageId> callback) {
+        send(topic, payload, null, callback);
+    }
+
     /**
      * 发送事务消息
      *
@@ -318,6 +331,23 @@ public final class RocketMQContextHolder {
         send(topic, payload, null, callback);
     }
 
+    /**
+     * 发送事务消息
+     *
+     * @param topic    消息主题
+     * @param payload  消息内容
+     * @param ordering 顺序消息标识
+     * @param callback 本地事务回调函数
+     * @param <T>      消息类型
+     */
+    public static <T> void send(@NonNull String topic, @NonNull T payload, String ordering,
+                                @NonNull Consumer<MessageId> callback) {
+        send(topic, payload, ordering, id -> {
+            callback.accept(id);
+            return true;
+        });
+    }
+
     /**
      * 发送事务消息
      *