Ver Fonte

新增RocketMQ消息发送工具方法

woody há 2 anos atrás
pai
commit
ca71e8a5ff

+ 156 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/RocketMQContextHolder.java

@@ -41,6 +41,7 @@ import javassist.bytecode.annotation.StringMemberValue;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.client.producer.TransactionSendResult;
@@ -342,6 +343,52 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
         }
     }
 
+    /**
+     * 异步发送消息
+     *
+     * @param topic   消息主题
+     * @param payload 消息内容
+     * @param isolate 是否需要环境隔离
+     */
+    public static void asyncSend(@NonNull String topic, @NonNull Object payload, boolean isolate) {
+        asyncSend(topic, payload, isolate, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult result) {
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                log.warn("RocketMQ message send failed: {}", e.getMessage(), e);
+            }
+        });
+    }
+
+    /**
+     * 异步发送消息
+     *
+     * @param topic    消息主题
+     * @param payload  消息内容
+     * @param callback 发送结果回调接口
+     */
+    public static void asyncSend(@NonNull String topic, @NonNull Object payload, @NonNull SendCallback callback) {
+        asyncSend(topic, payload, true, callback);
+    }
+
+    /**
+     * 异步发送消息
+     *
+     * @param topic    消息主题
+     * @param payload  消息内容
+     * @param isolate  是否需要环境隔离
+     * @param callback 发送结果回调接口
+     */
+    public static void asyncSend(@NonNull String topic, @NonNull Object payload, boolean isolate,
+                                 @NonNull SendCallback callback) {
+        getRocketMQTemplate().asyncSend(
+                isolate ? getProfileTopic(topic) : topic, payload2message(payload), callback
+        );
+    }
+
     /**
      * 发送顺序消息
      *
@@ -368,6 +415,115 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
         );
     }
 
+    /**
+     * 异步发送顺序消息
+     *
+     * @param topic   消息主题
+     * @param payload 消息内容
+     * @param key     唯一标识
+     */
+    public static void asyncSendOrderly(@NonNull String topic, @NonNull Object payload, @NonNull Object key) {
+        asyncSendOrderly(topic, payload, key, true);
+    }
+
+    /**
+     * 异步发送顺序消息
+     *
+     * @param topic   消息主题
+     * @param payload 消息内容
+     * @param key     唯一标识
+     * @param isolate 是否需要环境隔离
+     */
+    public static void asyncSendOrderly(@NonNull String topic, @NonNull Object payload, @NonNull Object key,
+                                        boolean isolate) {
+        asyncSendOrderly(topic, payload, key, isolate, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult result) {
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                log.warn("RocketMQ message send failed: {}", e.getMessage(), e);
+            }
+        });
+    }
+
+    /**
+     * 异步发送顺序消息
+     *
+     * @param topic    消息主题
+     * @param payload  消息内容
+     * @param key      唯一标识
+     * @param callback 发送结果回调接口
+     */
+    public static void asyncSendOrderly(@NonNull String topic, @NonNull Object payload, @NonNull Object key,
+                                        @NonNull SendCallback callback) {
+        asyncSendOrderly(topic, payload, key, true, callback);
+    }
+
+    /**
+     * 异步发送顺序消息
+     *
+     * @param topic    消息主题
+     * @param payload  消息内容
+     * @param key      唯一标识
+     * @param isolate  是否需要环境隔离
+     * @param callback 发送结果回调接口
+     */
+    public static void asyncSendOrderly(@NonNull String topic, @NonNull Object payload, @NonNull Object key,
+                                        boolean isolate, @NonNull SendCallback callback) {
+        getRocketMQTemplate().asyncSendOrderly(
+                isolate ? getProfileTopic(topic) : topic, payload2message(payload), String.valueOf(key), callback
+        );
+    }
+
+    /**
+     * 发送单向消息
+     *
+     * @param topic   消息主题
+     * @param payload 消息内容
+     */
+    public static void sendOneWay(@NonNull String topic, @NonNull Object payload) {
+        sendOneWay(topic, payload, true);
+    }
+
+    /**
+     * 发送单向消息
+     *
+     * @param topic   消息主题
+     * @param payload 消息内容
+     * @param isolate 是否需要环境隔离
+     */
+    public static void sendOneWay(@NonNull String topic, @NonNull Object payload, boolean isolate) {
+        getRocketMQTemplate().sendOneWay(isolate ? getProfileTopic(topic) : topic, payload2message(payload));
+    }
+
+    /**
+     * 发送单向顺序消息
+     *
+     * @param topic   消息主题
+     * @param payload 消息内容
+     * @param key     唯一标识
+     */
+    public static void sendOneWayOrderly(@NonNull String topic, @NonNull Object payload, @NonNull Object key) {
+        sendOneWayOrderly(topic, payload, key, true);
+    }
+
+    /**
+     * 发送单向顺序消息
+     *
+     * @param topic   消息主题
+     * @param payload 消息内容
+     * @param key     唯一标识
+     * @param isolate 是否需要环境隔离
+     */
+    public static void sendOneWayOrderly(@NonNull String topic, @NonNull Object payload, @NonNull Object key,
+                                         boolean isolate) {
+        getRocketMQTemplate().sendOneWayOrderly(
+                isolate ? getProfileTopic(topic) : topic, payload2message(payload), String.valueOf(key)
+        );
+    }
+
     /**
      * 发送事务消息
      *