|
@@ -1,6 +1,5 @@
|
|
|
package com.chelvc.framework.rocketmq.context;
|
|
|
|
|
|
-import java.lang.reflect.Method;
|
|
|
import java.lang.reflect.ParameterizedType;
|
|
|
import java.lang.reflect.Type;
|
|
|
import java.time.Duration;
|
|
@@ -10,8 +9,6 @@ import java.util.Date;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
-import java.util.Set;
|
|
|
-import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.Supplier;
|
|
|
import java.util.stream.Collectors;
|
|
@@ -20,53 +17,29 @@ import com.chelvc.framework.base.context.ApplicationContextHolder;
|
|
|
import com.chelvc.framework.base.context.JacksonContextHolder;
|
|
|
import com.chelvc.framework.base.context.SessionContextHolder;
|
|
|
import com.chelvc.framework.base.model.Session;
|
|
|
-import com.chelvc.framework.common.model.Delayer;
|
|
|
import com.chelvc.framework.common.util.AssertUtils;
|
|
|
-import com.chelvc.framework.common.util.ObjectUtils;
|
|
|
-import com.chelvc.framework.common.util.StringUtils;
|
|
|
-import com.chelvc.framework.rocketmq.annotation.DelayConsumer;
|
|
|
+import com.chelvc.framework.common.util.DateUtils;
|
|
|
import com.chelvc.framework.rocketmq.interceptor.SessionRocketMQListener;
|
|
|
-import com.chelvc.framework.rocketmq.model.Delay;
|
|
|
import com.chelvc.framework.rocketmq.model.MessageContext;
|
|
|
import com.chelvc.framework.rocketmq.producer.RocketMQTransactionChecker;
|
|
|
import com.chelvc.framework.rocketmq.producer.RocketMQTransactionExecutor;
|
|
|
import com.chelvc.framework.rocketmq.producer.RocketMQTransactionHandler;
|
|
|
import com.google.common.collect.Maps;
|
|
|
-import com.google.common.collect.Sets;
|
|
|
-import javassist.ClassPool;
|
|
|
-import javassist.CtClass;
|
|
|
-import javassist.CtConstructor;
|
|
|
-import javassist.CtMethod;
|
|
|
-import javassist.LoaderClassPath;
|
|
|
-import javassist.bytecode.AnnotationsAttribute;
|
|
|
-import javassist.bytecode.ClassFile;
|
|
|
-import javassist.bytecode.ConstPool;
|
|
|
-import javassist.bytecode.SignatureAttribute;
|
|
|
-import javassist.bytecode.annotation.Annotation;
|
|
|
-import javassist.bytecode.annotation.StringMemberValue;
|
|
|
import lombok.NonNull;
|
|
|
-import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.rocketmq.client.producer.SendCallback;
|
|
|
import org.apache.rocketmq.client.producer.SendResult;
|
|
|
-import org.apache.rocketmq.client.producer.SendStatus;
|
|
|
import org.apache.rocketmq.client.producer.TransactionSendResult;
|
|
|
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
|
|
|
-import org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration;
|
|
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
|
|
|
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
|
|
|
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
|
|
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
|
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
|
|
|
-import org.springframework.aop.framework.AopProxyUtils;
|
|
|
import org.springframework.beans.BeansException;
|
|
|
-import org.springframework.beans.factory.SmartInitializingSingleton;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.context.ApplicationContext;
|
|
|
import org.springframework.context.ApplicationContextAware;
|
|
|
-import org.springframework.core.env.Environment;
|
|
|
import org.springframework.messaging.Message;
|
|
|
import org.springframework.messaging.converter.MessageConverter;
|
|
|
import org.springframework.messaging.support.GenericMessage;
|
|
@@ -78,49 +51,31 @@ import org.springframework.util.CollectionUtils;
|
|
|
* RocketMQ上下文工具类
|
|
|
*
|
|
|
* @author Woody
|
|
|
- * @date 2021/9/6
|
|
|
+ * @date 2023/9/9
|
|
|
*/
|
|
|
@Slf4j
|
|
|
@Component
|
|
|
@RocketMQTransactionListener
|
|
|
-@RequiredArgsConstructor(onConstructor = @__(@Autowired))
|
|
|
-public class RocketMQContextHolder implements RocketMQLocalTransactionListener, ApplicationContextAware,
|
|
|
- SmartInitializingSingleton {
|
|
|
+public class RocketMQContextHolder implements RocketMQLocalTransactionListener, ApplicationContextAware {
|
|
|
/**
|
|
|
* RocketMQ主题消息头标识
|
|
|
*/
|
|
|
private static final String ROCKETMQ_TYPE_HEADER = "type";
|
|
|
|
|
|
- /**
|
|
|
- * 对象池
|
|
|
- */
|
|
|
- private static final ClassPool CLASS_POOL = ClassPool.getDefault();
|
|
|
-
|
|
|
- /**
|
|
|
- * 自定义延时消息延时器计数器
|
|
|
- */
|
|
|
- private static final AtomicLong DELAYER_COUNTER = new AtomicLong(0);
|
|
|
-
|
|
|
- /**
|
|
|
- * 自定义延时消息主题名称集合
|
|
|
- */
|
|
|
- private static final Set<String> DELAYER_TOPIC_NAME = Sets.newConcurrentHashSet();
|
|
|
-
|
|
|
/**
|
|
|
* 异常上下文
|
|
|
*/
|
|
|
private static final ThreadLocal<RuntimeException> EXCEPTION_CONTEXT = new ThreadLocal<>();
|
|
|
|
|
|
/**
|
|
|
- * 自定义延时消息延时器包名
|
|
|
+ * 消息类型/本地事务检测器映射表
|
|
|
*/
|
|
|
- private static final String DELAYER_PACKAGE =
|
|
|
- RocketMQContextHolder.class.getPackage().getName().replace("context", "delayer");
|
|
|
+ private static Map<String, RocketMQTransactionChecker<?>> MESSAGE_CHECKER_MAPPING = Collections.emptyMap();
|
|
|
|
|
|
/**
|
|
|
- * 环境名称
|
|
|
+ * 消息类型/本地事务执行器映射表
|
|
|
*/
|
|
|
- private static String PROFILE;
|
|
|
+ private static Map<String, RocketMQTransactionExecutor<?>> MESSAGE_EXECUTOR_MAPPING = Collections.emptyMap();
|
|
|
|
|
|
/**
|
|
|
* RocketMQ模版实例
|
|
@@ -132,26 +87,9 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
|
*/
|
|
|
private static MessageConverter MESSAGE_CONVERTER;
|
|
|
|
|
|
- /**
|
|
|
- * 消息类型/本地事务检测器映射表
|
|
|
- */
|
|
|
- private static Map<String, RocketMQTransactionChecker<?>> MESSAGE_CHECKER_MAPPING = Collections.emptyMap();
|
|
|
-
|
|
|
- /**
|
|
|
- * 消息类型/本地事务执行器映射表
|
|
|
- */
|
|
|
- private static Map<String, RocketMQTransactionExecutor<?>> MESSAGE_EXECUTOR_MAPPING = Collections.emptyMap();
|
|
|
-
|
|
|
- static {
|
|
|
- CLASS_POOL.appendClassPath(new LoaderClassPath(Thread.currentThread().getContextClassLoader()));
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
|
- // 初始化环境名称
|
|
|
- PROFILE = applicationContext.getEnvironment().getProperty("spring.profiles.active");
|
|
|
-
|
|
|
// 初始化RocketMQ模版实例
|
|
|
ROCKETMQ_TEMPLATE = applicationContext.getBean(RocketMQTemplate.class);
|
|
|
|
|
@@ -195,14 +133,6 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void afterSingletonsInstantiated() {
|
|
|
- // 初始化延时消息延时器
|
|
|
- ApplicationContext context = ApplicationContextHolder.getApplicationContext();
|
|
|
- Map<String, Object> consumers = context.getBeansWithAnnotation(DelayConsumer.class);
|
|
|
- consumers.values().forEach(this::registerMessageDelayerContainer);
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
public RocketMQLocalTransactionState checkLocalTransaction(@NonNull Message message) {
|
|
@@ -230,6 +160,24 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 获取RocketMQ模版实例
|
|
|
+ *
|
|
|
+ * @return RocketMQ模版实例
|
|
|
+ */
|
|
|
+ public static RocketMQTemplate getRocketMQTemplate() {
|
|
|
+ return Objects.requireNonNull(ROCKETMQ_TEMPLATE, "Rocketmq template has not been initialized");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取消息转换器实例
|
|
|
+ *
|
|
|
+ * @return 消息转换器实例
|
|
|
+ */
|
|
|
+ public static MessageConverter getMessageConverter() {
|
|
|
+ return Objects.requireNonNull(MESSAGE_CONVERTER, "Message converter has not been initialized");
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 查找消费者消息类型
|
|
|
*
|
|
@@ -264,104 +212,6 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
|
return Object.class;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 注册消息延时器消费者容器
|
|
|
- *
|
|
|
- * @param consumer 消费者实例
|
|
|
- */
|
|
|
- private void registerMessageDelayerContainer(Object consumer) {
|
|
|
- // 获取延时消息主题
|
|
|
- Class<?> clazz = AopProxyUtils.ultimateTargetClass(consumer);
|
|
|
- DelayConsumer annotation = clazz.getAnnotation(DelayConsumer.class);
|
|
|
- String topic = annotation.topic();
|
|
|
- if (StringUtils.isEmpty(topic)) {
|
|
|
- RocketMQMessageListener listener = clazz.getAnnotation(RocketMQMessageListener.class);
|
|
|
- topic = ObjectUtils.ifNull(listener, RocketMQMessageListener::topic);
|
|
|
- }
|
|
|
- AssertUtils.check(StringUtils.nonEmpty(topic), "Message delayer topic unassigned: " + clazz.getName());
|
|
|
- Environment environment = ApplicationContextHolder.getApplicationContext().getEnvironment();
|
|
|
- topic = environment.resolvePlaceholders(topic);
|
|
|
- if (!DELAYER_TOPIC_NAME.add(topic)) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // 获取延时消息类型
|
|
|
- Class<?> type = annotation.type();
|
|
|
- if (type == Object.class) {
|
|
|
- type = (Class<?>) lookupConsumerMessageType(clazz);
|
|
|
- }
|
|
|
- AssertUtils.check(type != Object.class, "Message delayer type unassigned: " + clazz.getName());
|
|
|
-
|
|
|
- // 注册消息延时器消费者容器
|
|
|
- try {
|
|
|
- Class<?> delayer = this.generateMessageDelayerClass(topic, type);
|
|
|
- ListenerContainerConfiguration configuration =
|
|
|
- ApplicationContextHolder.getApplicationContext().getBean(ListenerContainerConfiguration.class);
|
|
|
- Method method = ListenerContainerConfiguration.class.getDeclaredMethod(
|
|
|
- "registerContainer", String.class, Object.class
|
|
|
- );
|
|
|
- method.setAccessible(true);
|
|
|
- method.invoke(configuration, delayer.getName(), delayer.newInstance());
|
|
|
- } catch (Exception e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 动态生成消息延时器类对象
|
|
|
- *
|
|
|
- * @param topic 延时器消息主题
|
|
|
- * @param type 消息类型
|
|
|
- * @return 消息延时器类对象
|
|
|
- * @throws Exception 处理异常
|
|
|
- */
|
|
|
- private Class<?> generateMessageDelayerClass(String topic, Class<?> type) throws Exception {
|
|
|
- // 构建Class及实现接口
|
|
|
- CtClass delayer = CLASS_POOL.makeClass(
|
|
|
- String.format("%s.CustomMessageDelayer_%d", DELAYER_PACKAGE, DELAYER_COUNTER.incrementAndGet())
|
|
|
- );
|
|
|
- delayer.setInterfaces(new CtClass[]{CLASS_POOL.get(RocketMQListener.class.getName())});
|
|
|
- delayer.setGenericSignature(new SignatureAttribute.ClassSignature(null, null,
|
|
|
- new SignatureAttribute.ClassType[]{new SignatureAttribute.ClassType(
|
|
|
- RocketMQListener.class.getName(), new SignatureAttribute.TypeArgument[]{
|
|
|
- new SignatureAttribute.TypeArgument(
|
|
|
- new SignatureAttribute.ClassType(Delayer.class.getName())
|
|
|
- )})}).encode()
|
|
|
- );
|
|
|
-
|
|
|
- // 添加无参构造方法
|
|
|
- CtConstructor constructor = new CtConstructor(new CtClass[]{}, delayer);
|
|
|
- constructor.setBody("{}");
|
|
|
- delayer.addConstructor(constructor);
|
|
|
-
|
|
|
- // 添加自定义延时消息消费方法实现
|
|
|
- delayer.addMethod(CtMethod.make(String.format(
|
|
|
- "public void onMessage(%s message){%s.processing(\"%s\", %s.class, message);}",
|
|
|
- Delayer.class.getName(),
|
|
|
- RocketMQContextHolder.class.getName(),
|
|
|
- topic,
|
|
|
- type.getName()
|
|
|
- ), delayer));
|
|
|
-
|
|
|
- // 添加泛型参数接口方法桥接
|
|
|
- delayer.addMethod(CtMethod.make(String.format(
|
|
|
- "public void onMessage(Object message){this.onMessage((%s) message);}",
|
|
|
- Delayer.class.getName()
|
|
|
- ), delayer));
|
|
|
-
|
|
|
- // 设置@RocketMQMessageListener注解
|
|
|
- String unique = getDelayerTopic(topic);
|
|
|
- ClassFile file = delayer.getClassFile();
|
|
|
- ConstPool constPool = file.getConstPool();
|
|
|
- AnnotationsAttribute attribute = new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
|
|
|
- Annotation annotation = new Annotation(RocketMQMessageListener.class.getName(), constPool);
|
|
|
- annotation.addMemberValue("topic", new StringMemberValue(unique, constPool));
|
|
|
- annotation.addMemberValue("consumerGroup", new StringMemberValue(unique, constPool));
|
|
|
- attribute.addAnnotation(annotation);
|
|
|
- file.addAttribute(attribute);
|
|
|
- return delayer.toClass();
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 根据当前环境名称获取实际消息主题
|
|
|
*
|
|
@@ -369,35 +219,19 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
|
* @return 消息主题
|
|
|
*/
|
|
|
public static String getProfileTopic(@NonNull String topic) {
|
|
|
- return Objects.requireNonNull(PROFILE, "Profile has not been initialized") + "-" + topic;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取自定义延时消息实际消息主题
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @return 消息主题
|
|
|
- */
|
|
|
- public static String getDelayerTopic(@NonNull String topic) {
|
|
|
- return topic + "-delay";
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取RocketMQ模版实例
|
|
|
- *
|
|
|
- * @return RocketMQ模版实例
|
|
|
- */
|
|
|
- public static RocketMQTemplate getRocketMQTemplate() {
|
|
|
- return Objects.requireNonNull(ROCKETMQ_TEMPLATE, "Rocketmq template has not been initialized");
|
|
|
+ return ApplicationContextHolder.getProfile() + "-" + topic;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 获取消息转换器实例
|
|
|
+ * 获取消息类型
|
|
|
*
|
|
|
- * @return 消息转换器实例
|
|
|
+ * @param message 消息对象实例
|
|
|
+ * @return 消息类型
|
|
|
*/
|
|
|
- public static MessageConverter getMessageConverter() {
|
|
|
- return Objects.requireNonNull(MESSAGE_CONVERTER, "Message converter has not been initialized");
|
|
|
+ public static String getMessageType(@NonNull Message<?> message) {
|
|
|
+ String type = message.getHeaders().get(ROCKETMQ_TYPE_HEADER, String.class);
|
|
|
+ AssertUtils.nonnull(type, "Rocketmq message type not found: " + message);
|
|
|
+ return type;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -433,18 +267,6 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
|
return (T) converter.fromMessage(new GenericMessage<>(payload), type);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 获取消息类型
|
|
|
- *
|
|
|
- * @param message 消息对象实例
|
|
|
- * @return 消息类型
|
|
|
- */
|
|
|
- public static String getMessageType(@NonNull Message<?> message) {
|
|
|
- String type = message.getHeaders().get(ROCKETMQ_TYPE_HEADER, String.class);
|
|
|
- AssertUtils.nonnull(type, "Rocketmq message type not found: " + message);
|
|
|
- return type;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 发送消息
|
|
|
*
|
|
@@ -467,93 +289,51 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 发送延时消息
|
|
|
+ * 发送自定义延时消息
|
|
|
*
|
|
|
* @param topic 消息主题
|
|
|
* @param payload 消息内容
|
|
|
- * @param delay 延时类型
|
|
|
+ * @param delay 延时时间
|
|
|
*/
|
|
|
- public static void send(@NonNull String topic, @NonNull Object payload, @NonNull Delay delay) {
|
|
|
+ public static void send(@NonNull String topic, @NonNull Object payload, @NonNull Date delay) {
|
|
|
send(topic, payload, delay, true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 发送延时消息
|
|
|
+ * 发送自定义延时消息
|
|
|
*
|
|
|
* @param topic 消息主题
|
|
|
* @param payload 消息内容
|
|
|
- * @param delay 延时类型
|
|
|
+ * @param delay 延时时间
|
|
|
* @param isolate 是否需要环境隔离
|
|
|
*/
|
|
|
- private static void send(@NonNull String topic, @NonNull Object payload, @NonNull Delay delay, boolean isolate) {
|
|
|
- RocketMQTemplate template = getRocketMQTemplate();
|
|
|
- int timeout = template.getProducer().getSendMsgTimeout();
|
|
|
- SendResult result = template.syncSend(
|
|
|
- isolate ? getProfileTopic(topic) : topic, payload2message(payload), timeout, delay.level()
|
|
|
+ public static void send(@NonNull String topic, @NonNull Object payload, @NonNull Date delay, boolean isolate) {
|
|
|
+ getRocketMQTemplate().syncSendDeliverTimeMills(
|
|
|
+ isolate ? getProfileTopic(topic) : topic, payload2message(payload), delay.getTime()
|
|
|
);
|
|
|
- SendStatus status = ObjectUtils.ifNull(result, SendResult::getSendStatus);
|
|
|
- if (status != SendStatus.SEND_OK) {
|
|
|
- throw new IllegalStateException("Send delay message failed: " + status);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 发送自定义延时消息
|
|
|
*
|
|
|
- * @param topic 消息主题
|
|
|
- * @param payload 消息内容
|
|
|
- * @param delaying 延时时长
|
|
|
- */
|
|
|
- public static void send(@NonNull String topic, @NonNull Object payload, @NonNull Duration delaying) {
|
|
|
- Delay delay = Delay.close(delaying);
|
|
|
- if (delay == Delay.S1) {
|
|
|
- send(topic, payload);
|
|
|
- } else if (delay.time() == delaying.getSeconds()) {
|
|
|
- send(topic, payload, delay);
|
|
|
- } else {
|
|
|
- String json = JacksonContextHolder.serialize(payload);
|
|
|
- long expiration = System.currentTimeMillis() + Math.max(delaying.toMillis(), 0);
|
|
|
- send(getDelayerTopic(getProfileTopic(topic)), new Delayer<>(json, expiration), delay, false);
|
|
|
- }
|
|
|
+ * @param topic 消息主题
|
|
|
+ * @param payload 消息内容
|
|
|
+ * @param delay 延时时长
|
|
|
+ */
|
|
|
+ public static void send(@NonNull String topic, @NonNull Object payload, @NonNull Duration delay) {
|
|
|
+ send(topic, payload, delay, true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 发送自定义延时消息
|
|
|
*
|
|
|
- * @param topic 消息主题
|
|
|
- * @param payload 消息内容
|
|
|
- * @param delaying 延时时间
|
|
|
- */
|
|
|
- public static void send(@NonNull String topic, @NonNull Object payload, @NonNull Date delaying) {
|
|
|
- long duration = (delaying.getTime() - System.currentTimeMillis()) / 1000;
|
|
|
- Delay delay = Delay.close(duration);
|
|
|
- if (delay == Delay.S1) {
|
|
|
- send(topic, payload);
|
|
|
- } else if (delay.time() == duration) {
|
|
|
- send(topic, payload, delay);
|
|
|
- } else {
|
|
|
- String json = JacksonContextHolder.serialize(payload);
|
|
|
- send(getDelayerTopic(getProfileTopic(topic)), new Delayer<>(json, delaying.getTime()), delay, false);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 延时消息处理,供消息延时器调用
|
|
|
- *
|
|
|
* @param topic 消息主题
|
|
|
- * @param type 消息类型
|
|
|
- * @param message 延时消息实例
|
|
|
- */
|
|
|
- public static void processing(@NonNull String topic, @NonNull Class<?> type, @NonNull Delayer<String> message) {
|
|
|
- long duration = (message.getExpiration() - System.currentTimeMillis()) / 1000;
|
|
|
- Delay delay = Delay.close(duration);
|
|
|
- if (delay == Delay.S1) {
|
|
|
- send(topic, JacksonContextHolder.deserialize(message.getPayload(), type), false);
|
|
|
- } else if (delay.time() == duration) {
|
|
|
- send(topic, JacksonContextHolder.deserialize(message.getPayload(), type), delay, false);
|
|
|
- } else {
|
|
|
- send(getDelayerTopic(topic), message, delay, false);
|
|
|
- }
|
|
|
+ * @param payload 消息内容
|
|
|
+ * @param delay 延时时长
|
|
|
+ * @param isolate 是否需要环境隔离
|
|
|
+ */
|
|
|
+ public static void send(@NonNull String topic, @NonNull Object payload, @NonNull Duration delay, boolean isolate) {
|
|
|
+ send(topic, payload, DateUtils.add(delay), isolate);
|
|
|
}
|
|
|
|
|
|
/**
|