|
@@ -12,16 +12,21 @@ import java.util.Map;
|
|
import java.util.Objects;
|
|
import java.util.Objects;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
+import java.util.function.Consumer;
|
|
import java.util.function.Supplier;
|
|
import java.util.function.Supplier;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import com.chelvc.framework.base.context.ApplicationContextHolder;
|
|
import com.chelvc.framework.base.context.ApplicationContextHolder;
|
|
import com.chelvc.framework.base.context.JacksonContextHolder;
|
|
import com.chelvc.framework.base.context.JacksonContextHolder;
|
|
|
|
+import com.chelvc.framework.base.context.SessionContextHolder;
|
|
|
|
+import com.chelvc.framework.base.model.Session;
|
|
import com.chelvc.framework.base.util.ObjectUtils;
|
|
import com.chelvc.framework.base.util.ObjectUtils;
|
|
import com.chelvc.framework.base.util.StringUtils;
|
|
import com.chelvc.framework.base.util.StringUtils;
|
|
import com.chelvc.framework.rocketmq.annotation.MessageDelayer;
|
|
import com.chelvc.framework.rocketmq.annotation.MessageDelayer;
|
|
|
|
+import com.chelvc.framework.rocketmq.interceptor.SessionRocketMQListener;
|
|
import com.chelvc.framework.rocketmq.model.Delay;
|
|
import com.chelvc.framework.rocketmq.model.Delay;
|
|
import com.chelvc.framework.rocketmq.model.DelayMessage;
|
|
import com.chelvc.framework.rocketmq.model.DelayMessage;
|
|
|
|
+import com.chelvc.framework.rocketmq.model.MessageContext;
|
|
import com.chelvc.framework.rocketmq.producer.RocketMQTransactionChecker;
|
|
import com.chelvc.framework.rocketmq.producer.RocketMQTransactionChecker;
|
|
import com.chelvc.framework.rocketmq.producer.RocketMQTransactionExecutor;
|
|
import com.chelvc.framework.rocketmq.producer.RocketMQTransactionExecutor;
|
|
import com.chelvc.framework.rocketmq.producer.RocketMQTransactionHandler;
|
|
import com.chelvc.framework.rocketmq.producer.RocketMQTransactionHandler;
|
|
@@ -63,6 +68,7 @@ import org.springframework.context.ApplicationContextAware;
|
|
import org.springframework.core.env.Environment;
|
|
import org.springframework.core.env.Environment;
|
|
import org.springframework.messaging.Message;
|
|
import org.springframework.messaging.Message;
|
|
import org.springframework.messaging.converter.MessageConverter;
|
|
import org.springframework.messaging.converter.MessageConverter;
|
|
|
|
+import org.springframework.messaging.support.GenericMessage;
|
|
import org.springframework.messaging.support.MessageBuilder;
|
|
import org.springframework.messaging.support.MessageBuilder;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.util.Assert;
|
|
import org.springframework.util.Assert;
|
|
@@ -72,14 +78,14 @@ import org.springframework.util.CollectionUtils;
|
|
* RocketMQ上下文工具类
|
|
* RocketMQ上下文工具类
|
|
*
|
|
*
|
|
* @author Woody
|
|
* @author Woody
|
|
- * @date 2023/4/5
|
|
|
|
|
|
+ * @date 2021/9/6
|
|
*/
|
|
*/
|
|
@Slf4j
|
|
@Slf4j
|
|
@Component
|
|
@Component
|
|
@RocketMQTransactionListener
|
|
@RocketMQTransactionListener
|
|
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
|
|
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
|
|
-public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
|
|
- ApplicationContextAware, SmartInitializingSingleton {
|
|
|
|
|
|
+public class RocketMQContextHolder implements RocketMQLocalTransactionListener, ApplicationContextAware,
|
|
|
|
+ SmartInitializingSingleton {
|
|
/**
|
|
/**
|
|
* RocketMQ主题消息头标识
|
|
* RocketMQ主题消息头标识
|
|
*/
|
|
*/
|
|
@@ -121,6 +127,11 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
*/
|
|
*/
|
|
private static RocketMQTemplate ROCKETMQ_TEMPLATE;
|
|
private static RocketMQTemplate ROCKETMQ_TEMPLATE;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 消息转换器
|
|
|
|
+ */
|
|
|
|
+ private static MessageConverter MESSAGE_CONVERTER;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* 消息类型/本地事务检测器映射表
|
|
* 消息类型/本地事务检测器映射表
|
|
*/
|
|
*/
|
|
@@ -135,40 +146,222 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
CLASS_POOL.appendClassPath(new LoaderClassPath(Thread.currentThread().getContextClassLoader()));
|
|
CLASS_POOL.appendClassPath(new LoaderClassPath(Thread.currentThread().getContextClassLoader()));
|
|
}
|
|
}
|
|
|
|
|
|
- private final RocketMQMessageConverter rocketmqMessageConverter;
|
|
|
|
|
|
+ @Override
|
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
|
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
|
|
+ // 初始化环境名称
|
|
|
|
+ PROFILE = applicationContext.getEnvironment().getProperty("spring.profiles.active");
|
|
|
|
+
|
|
|
|
+ // 初始化RocketMQ模版实例
|
|
|
|
+ ROCKETMQ_TEMPLATE = applicationContext.getBean(RocketMQTemplate.class);
|
|
|
|
+
|
|
|
|
+ // 初始化消息转换器
|
|
|
|
+ MESSAGE_CONVERTER = applicationContext.getBean(RocketMQMessageConverter.class).getMessageConverter();
|
|
|
|
+
|
|
|
|
+ // 初始化消息类型/本地事务检测实例映射表
|
|
|
|
+ Collection<RocketMQTransactionHandler> handlers =
|
|
|
|
+ applicationContext.getBeansOfType(RocketMQTransactionHandler.class).values();
|
|
|
|
+ List<RocketMQTransactionChecker> checkers = handlers.stream()
|
|
|
|
+ .filter(handler -> handler instanceof RocketMQTransactionChecker)
|
|
|
|
+ .map(handler -> (RocketMQTransactionChecker) handler).collect(Collectors.toList());
|
|
|
|
+ if (!CollectionUtils.isEmpty(checkers)) {
|
|
|
|
+ log.info("Loading rocketmq transaction checkers: {}", checkers);
|
|
|
|
+ MESSAGE_CHECKER_MAPPING = Maps.newHashMapWithExpectedSize(checkers.size());
|
|
|
|
+ checkers.forEach(checker -> {
|
|
|
|
+ Class<?> clazz = checker.getPayloadClass();
|
|
|
|
+ Assert.notNull(clazz, "Rocketmq transaction checker payload class unspecified: " + checker);
|
|
|
|
+ String type = clazz.getName();
|
|
|
|
+ Assert.isTrue(!MESSAGE_CHECKER_MAPPING.containsKey(type),
|
|
|
|
+ "Rocketmq transaction checker payload class already exists: " + checker);
|
|
|
|
+ MESSAGE_CHECKER_MAPPING.put(type, checker);
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 初始化消息类型/本地事务执行实例映射表
|
|
|
|
+ List<RocketMQTransactionExecutor> executors = handlers.stream()
|
|
|
|
+ .filter(handler -> handler instanceof RocketMQTransactionExecutor)
|
|
|
|
+ .map(handler -> (RocketMQTransactionExecutor) handler).collect(Collectors.toList());
|
|
|
|
+ if (!CollectionUtils.isEmpty(executors)) {
|
|
|
|
+ log.info("Loading rocketmq transaction executors: {}", executors);
|
|
|
|
+ MESSAGE_EXECUTOR_MAPPING = Maps.newHashMapWithExpectedSize(executors.size());
|
|
|
|
+ executors.forEach(executor -> {
|
|
|
|
+ Class<?> clazz = executor.getPayloadClass();
|
|
|
|
+ Assert.notNull(clazz, "Rocketmq transaction executor payload class unspecified: " + executor);
|
|
|
|
+ String type = clazz.getName();
|
|
|
|
+ Assert.isTrue(!MESSAGE_EXECUTOR_MAPPING.containsKey(type),
|
|
|
|
+ "Rocketmq transaction executor payload class already exists: " + executor);
|
|
|
|
+ MESSAGE_EXECUTOR_MAPPING.put(type, executor);
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void afterSingletonsInstantiated() {
|
|
|
|
+ // 初始化延时消息延时器
|
|
|
|
+ ApplicationContext context = ApplicationContextHolder.getApplicationContext();
|
|
|
|
+ Map<String, Object> consumers = context.getBeansWithAnnotation(MessageDelayer.class);
|
|
|
|
+ consumers.values().forEach(this::registerMessageDelayerContainer);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ @SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
|
+ public RocketMQLocalTransactionState checkLocalTransaction(@NonNull Message message) {
|
|
|
|
+ RocketMQTransactionChecker checker = Objects.requireNonNull(
|
|
|
|
+ MESSAGE_CHECKER_MAPPING.get(getMessageType(message)), "Rocketmq transaction checker not found"
|
|
|
|
+ );
|
|
|
|
+ return checker.check(message, message2payload(message, checker.getPayloadClass()));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ @SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
|
+ public RocketMQLocalTransactionState executeLocalTransaction(@NonNull Message message, Object arg) {
|
|
|
|
+ try {
|
|
|
|
+ if (arg instanceof Supplier) {
|
|
|
|
+ return ((Supplier<RocketMQLocalTransactionState>) arg).get();
|
|
|
|
+ }
|
|
|
|
+ RocketMQTransactionExecutor executor = Objects.requireNonNull(
|
|
|
|
+ MESSAGE_EXECUTOR_MAPPING.get(getMessageType(message)), "Rocketmq transaction executor not found"
|
|
|
|
+ );
|
|
|
|
+ return executor.execute(message, message2payload(message, executor.getPayloadClass()), arg);
|
|
|
|
+ } catch (RuntimeException e) {
|
|
|
|
+ // 将异常存入上下文
|
|
|
|
+ EXCEPTION_CONTEXT.set(e);
|
|
|
|
+ throw e;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
- * 获取消息对象类型
|
|
|
|
|
|
+ * 查找消费者消息类型
|
|
*
|
|
*
|
|
- * @param clazz 消费者对象类型
|
|
|
|
|
|
+ * @param target 消费者对象类型
|
|
* @return 消息对象类型
|
|
* @return 消息对象类型
|
|
*/
|
|
*/
|
|
- private static Class<?> getMessageType(Class<?> clazz) {
|
|
|
|
- Type genericInterface = null;
|
|
|
|
- while (Objects.nonNull(clazz)) {
|
|
|
|
- Type[] interfaces = clazz.getGenericInterfaces();
|
|
|
|
|
|
+ public static Type lookupConsumerMessageType(@NonNull Class<?> target) {
|
|
|
|
+ Type parameterized = null;
|
|
|
|
+ while (Objects.nonNull(target)) {
|
|
|
|
+ Type[] interfaces = target.getGenericInterfaces();
|
|
for (Type type : interfaces) {
|
|
for (Type type : interfaces) {
|
|
- if (type instanceof ParameterizedType && (
|
|
|
|
- Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class)
|
|
|
|
- || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))
|
|
|
|
- ) {
|
|
|
|
- genericInterface = type;
|
|
|
|
- break;
|
|
|
|
|
|
+ if (type instanceof ParameterizedType) {
|
|
|
|
+ Type raw = ((ParameterizedType) type).getRawType();
|
|
|
|
+ if (Objects.equals(raw, SessionRocketMQListener.class)
|
|
|
|
+ || Objects.equals(raw, RocketMQListener.class)
|
|
|
|
+ || Objects.equals(raw, RocketMQReplyListener.class)) {
|
|
|
|
+ parameterized = type;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- clazz = clazz.getSuperclass();
|
|
|
|
|
|
+ target = target.getSuperclass();
|
|
}
|
|
}
|
|
- if (Objects.isNull(genericInterface)) {
|
|
|
|
|
|
+ if (Objects.isNull(parameterized)) {
|
|
return Object.class;
|
|
return Object.class;
|
|
}
|
|
}
|
|
|
|
|
|
- Type[] arguments = ((ParameterizedType) genericInterface).getActualTypeArguments();
|
|
|
|
|
|
+ Type[] arguments = ((ParameterizedType) parameterized).getActualTypeArguments();
|
|
if (Objects.nonNull(arguments) && arguments.length > 0) {
|
|
if (Objects.nonNull(arguments) && arguments.length > 0) {
|
|
- return (Class<?>) arguments[0];
|
|
|
|
|
|
+ return arguments[0];
|
|
}
|
|
}
|
|
return Object.class;
|
|
return Object.class;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 注册消息延时器消费者容器
|
|
|
|
+ *
|
|
|
|
+ * @param consumer 消费者实例
|
|
|
|
+ */
|
|
|
|
+ private void registerMessageDelayerContainer(Object consumer) {
|
|
|
|
+ // 获取延时消息主题
|
|
|
|
+ Class<?> clazz = AopProxyUtils.ultimateTargetClass(consumer);
|
|
|
|
+ MessageDelayer annotation = clazz.getAnnotation(MessageDelayer.class);
|
|
|
|
+ String topic = annotation.topic();
|
|
|
|
+ if (StringUtils.isEmpty(topic)) {
|
|
|
|
+ RocketMQMessageListener listener = clazz.getAnnotation(RocketMQMessageListener.class);
|
|
|
|
+ topic = ObjectUtils.ifNull(listener, RocketMQMessageListener::topic);
|
|
|
|
+ }
|
|
|
|
+ Assert.isTrue(StringUtils.nonEmpty(topic), "Message delayer topic unassigned: " + clazz.getName());
|
|
|
|
+ Environment environment = ApplicationContextHolder.getApplicationContext().getEnvironment();
|
|
|
|
+ topic = environment.resolvePlaceholders(topic);
|
|
|
|
+ if (!DELAYER_TOPIC_NAME.add(topic)) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 获取延时消息类型
|
|
|
|
+ Class<?> type = annotation.type();
|
|
|
|
+ if (type == Object.class) {
|
|
|
|
+ type = (Class<?>) lookupConsumerMessageType(clazz);
|
|
|
|
+ }
|
|
|
|
+ Assert.isTrue(type != Object.class, "Message delayer type unassigned: " + clazz.getName());
|
|
|
|
+
|
|
|
|
+ // 注册消息延时器消费者容器
|
|
|
|
+ try {
|
|
|
|
+ Class<?> delayer = this.generateMessageDelayerClass(topic, type);
|
|
|
|
+ ListenerContainerConfiguration configuration =
|
|
|
|
+ ApplicationContextHolder.getApplicationContext().getBean(ListenerContainerConfiguration.class);
|
|
|
|
+ Method method = ListenerContainerConfiguration.class.getDeclaredMethod(
|
|
|
|
+ "registerContainer", String.class, Object.class
|
|
|
|
+ );
|
|
|
|
+ method.setAccessible(true);
|
|
|
|
+ method.invoke(configuration, delayer.getName(), delayer.newInstance());
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 动态生成消息延时器类对象
|
|
|
|
+ *
|
|
|
|
+ * @param topic 延时器消息主题
|
|
|
|
+ * @param type 消息类型
|
|
|
|
+ * @return 消息延时器类对象
|
|
|
|
+ * @throws Exception 处理异常
|
|
|
|
+ */
|
|
|
|
+ private Class<?> generateMessageDelayerClass(String topic, Class<?> type) throws Exception {
|
|
|
|
+ // 构建Class及实现接口
|
|
|
|
+ CtClass delayer = CLASS_POOL.makeClass(
|
|
|
|
+ String.format("%s.CustomMessageDelayer_%d", DELAYER_PACKAGE, DELAYER_COUNTER.incrementAndGet())
|
|
|
|
+ );
|
|
|
|
+ delayer.setInterfaces(new CtClass[]{CLASS_POOL.get(RocketMQListener.class.getName())});
|
|
|
|
+ delayer.setGenericSignature(new SignatureAttribute.ClassSignature(null, null,
|
|
|
|
+ new SignatureAttribute.ClassType[]{new SignatureAttribute.ClassType(
|
|
|
|
+ RocketMQListener.class.getName(), new SignatureAttribute.TypeArgument[]{
|
|
|
|
+ new SignatureAttribute.TypeArgument(
|
|
|
|
+ new SignatureAttribute.ClassType(DelayMessage.class.getName())
|
|
|
|
+ )})}).encode()
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ // 添加无参构造方法
|
|
|
|
+ CtConstructor constructor = new CtConstructor(new CtClass[]{}, delayer);
|
|
|
|
+ constructor.setBody("{}");
|
|
|
|
+ delayer.addConstructor(constructor);
|
|
|
|
+
|
|
|
|
+ // 添加自定义延时消息消费方法实现
|
|
|
|
+ delayer.addMethod(CtMethod.make(String.format(
|
|
|
|
+ "public void onMessage(%s message){%s.processing(\"%s\", %s.class, message);}",
|
|
|
|
+ DelayMessage.class.getName(),
|
|
|
|
+ RocketMQContextHolder.class.getName(),
|
|
|
|
+ topic,
|
|
|
|
+ type.getName()
|
|
|
|
+ ), delayer));
|
|
|
|
+
|
|
|
|
+ // 添加泛型参数接口方法桥接
|
|
|
|
+ delayer.addMethod(CtMethod.make(String.format(
|
|
|
|
+ "public void onMessage(Object message){this.onMessage((%s) message);}",
|
|
|
|
+ DelayMessage.class.getName()
|
|
|
|
+ ), delayer));
|
|
|
|
+
|
|
|
|
+ // 设置@RocketMQMessageListener注解
|
|
|
|
+ String unique = getDelayerTopic(topic);
|
|
|
|
+ ClassFile file = delayer.getClassFile();
|
|
|
|
+ ConstPool constPool = file.getConstPool();
|
|
|
|
+ AnnotationsAttribute attribute = new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
|
|
|
|
+ Annotation annotation = new Annotation(RocketMQMessageListener.class.getName(), constPool);
|
|
|
|
+ annotation.addMemberValue("topic", new StringMemberValue(unique, constPool));
|
|
|
|
+ annotation.addMemberValue("consumerGroup", new StringMemberValue(unique, constPool));
|
|
|
|
+ attribute.addAnnotation(annotation);
|
|
|
|
+ file.addAttribute(attribute);
|
|
|
|
+ return delayer.toClass();
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* 根据当前环境名称获取实际消息主题
|
|
* 根据当前环境名称获取实际消息主题
|
|
*
|
|
*
|
|
@@ -176,7 +369,7 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
* @return 消息主题
|
|
* @return 消息主题
|
|
*/
|
|
*/
|
|
public static String getProfileTopic(@NonNull String topic) {
|
|
public static String getProfileTopic(@NonNull String topic) {
|
|
- return PROFILE + "-" + topic;
|
|
|
|
|
|
+ return Objects.requireNonNull(PROFILE, "Profile has not been initialized") + "-" + topic;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -198,6 +391,15 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
return Objects.requireNonNull(ROCKETMQ_TEMPLATE, "Rocketmq template has not been initialized");
|
|
return Objects.requireNonNull(ROCKETMQ_TEMPLATE, "Rocketmq template has not been initialized");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 获取消息转换器实例
|
|
|
|
+ *
|
|
|
|
+ * @return 消息转换器实例
|
|
|
|
+ */
|
|
|
|
+ public static MessageConverter getMessageConverter() {
|
|
|
|
+ return Objects.requireNonNull(MESSAGE_CONVERTER, "Message converter has not been initialized");
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* 将消息内容转换成消息对象
|
|
* 将消息内容转换成消息对象
|
|
*
|
|
*
|
|
@@ -205,7 +407,30 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
* @return 消息对象实例
|
|
* @return 消息对象实例
|
|
*/
|
|
*/
|
|
public static Message<?> payload2message(@NonNull Object payload) {
|
|
public static Message<?> payload2message(@NonNull Object payload) {
|
|
- return MessageBuilder.withPayload(payload).setHeader(ROCKETMQ_TYPE_HEADER, payload.getClass().getName()).build();
|
|
|
|
|
|
+ Object target = JacksonContextHolder.serialize(payload);
|
|
|
|
+ Session session = SessionContextHolder.getSession(false);
|
|
|
|
+ MessageContext<?> context = MessageContext.builder().target(target).session(session).build();
|
|
|
|
+ return MessageBuilder.withPayload(context).setHeader(ROCKETMQ_TYPE_HEADER, payload.getClass().getName()).build();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 将消息对象转换成消息体
|
|
|
|
+ *
|
|
|
|
+ * @param message 消息对象
|
|
|
|
+ * @param type 消息体类型
|
|
|
|
+ * @param <T> 消息体对象类型
|
|
|
|
+ * @return 消息体实例
|
|
|
|
+ */
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
|
+ public static <T> T message2payload(@NonNull Message<?> message, @NonNull Class<T> type) {
|
|
|
|
+ MessageConverter converter = getMessageConverter();
|
|
|
|
+ MessageContext<?> context =
|
|
|
|
+ (MessageContext<?>) Objects.requireNonNull(converter.fromMessage(message, MessageContext.class));
|
|
|
|
+ Object payload = context.getTarget();
|
|
|
|
+ if (payload == null || (payload instanceof String && type == String.class)) {
|
|
|
|
+ return (T) payload;
|
|
|
|
+ }
|
|
|
|
+ return (T) converter.fromMessage(new GenericMessage<>(payload), type);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -343,6 +568,16 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 异步发送消息
|
|
|
|
+ *
|
|
|
|
+ * @param topic 消息主题
|
|
|
|
+ * @param payload 消息内容
|
|
|
|
+ */
|
|
|
|
+ public static void asyncSend(@NonNull String topic, @NonNull Object payload) {
|
|
|
|
+ asyncSend(topic, payload, true);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* 异步发送消息
|
|
* 异步发送消息
|
|
*
|
|
*
|
|
@@ -573,179 +808,19 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
|
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
|
|
- // 初始化环境名称
|
|
|
|
- PROFILE = applicationContext.getEnvironment().getProperty("spring.profiles.active");
|
|
|
|
-
|
|
|
|
- // 初始化RocketMQ模版实例
|
|
|
|
- ROCKETMQ_TEMPLATE = applicationContext.getBean(RocketMQTemplate.class);
|
|
|
|
-
|
|
|
|
- // 初始化消息类型/本地事务检测实例映射表
|
|
|
|
- Collection<RocketMQTransactionHandler> handlers =
|
|
|
|
- applicationContext.getBeansOfType(RocketMQTransactionHandler.class).values();
|
|
|
|
- List<RocketMQTransactionChecker> checkers = handlers.stream()
|
|
|
|
- .filter(handler -> handler instanceof RocketMQTransactionChecker)
|
|
|
|
- .map(handler -> (RocketMQTransactionChecker) handler).collect(Collectors.toList());
|
|
|
|
- if (!CollectionUtils.isEmpty(checkers)) {
|
|
|
|
- log.info("Loading rocketmq transaction checkers: {}", checkers);
|
|
|
|
- MESSAGE_CHECKER_MAPPING = Maps.newHashMapWithExpectedSize(checkers.size());
|
|
|
|
- checkers.forEach(checker -> {
|
|
|
|
- Class<?> clazz = checker.getPayloadClass();
|
|
|
|
- Assert.notNull(clazz, "Rocketmq transaction checker payload class unspecified: " + checker);
|
|
|
|
- String type = clazz.getName();
|
|
|
|
- Assert.isTrue(!MESSAGE_CHECKER_MAPPING.containsKey(type),
|
|
|
|
- "Rocketmq transaction checker payload class already exists: " + checker);
|
|
|
|
- MESSAGE_CHECKER_MAPPING.put(type, checker);
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // 初始化消息类型/本地事务执行实例映射表
|
|
|
|
- List<RocketMQTransactionExecutor> executors = handlers.stream()
|
|
|
|
- .filter(handler -> handler instanceof RocketMQTransactionExecutor)
|
|
|
|
- .map(handler -> (RocketMQTransactionExecutor) handler).collect(Collectors.toList());
|
|
|
|
- if (!CollectionUtils.isEmpty(executors)) {
|
|
|
|
- log.info("Loading rocketmq transaction executors: {}", executors);
|
|
|
|
- MESSAGE_EXECUTOR_MAPPING = Maps.newHashMapWithExpectedSize(executors.size());
|
|
|
|
- executors.forEach(executor -> {
|
|
|
|
- Class<?> clazz = executor.getPayloadClass();
|
|
|
|
- Assert.notNull(clazz, "Rocketmq transaction executor payload class unspecified: " + executor);
|
|
|
|
- String type = clazz.getName();
|
|
|
|
- Assert.isTrue(!MESSAGE_EXECUTOR_MAPPING.containsKey(type),
|
|
|
|
- "Rocketmq transaction executor payload class already exists: " + executor);
|
|
|
|
- MESSAGE_EXECUTOR_MAPPING.put(type, executor);
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void afterSingletonsInstantiated() {
|
|
|
|
- // 初始化延时消息延时器
|
|
|
|
- ApplicationContext context = ApplicationContextHolder.getApplicationContext();
|
|
|
|
- Map<String, Object> consumers = context.getBeansWithAnnotation(MessageDelayer.class);
|
|
|
|
- consumers.values().forEach(this::registerMessageDelayerContainer);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- @SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
|
- public RocketMQLocalTransactionState checkLocalTransaction(@NonNull Message message) {
|
|
|
|
- RocketMQTransactionChecker checker = MESSAGE_CHECKER_MAPPING.get(getMessageType(message));
|
|
|
|
- return checker.check(message, checker.getPayload(message, this.rocketmqMessageConverter.getMessageConverter()));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- @SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
|
- public RocketMQLocalTransactionState executeLocalTransaction(@NonNull Message message, Object arg) {
|
|
|
|
- try {
|
|
|
|
- if (arg instanceof Supplier) {
|
|
|
|
- return ((Supplier<RocketMQLocalTransactionState>) arg).get();
|
|
|
|
- }
|
|
|
|
- MessageConverter messageConverter = this.rocketmqMessageConverter.getMessageConverter();
|
|
|
|
- RocketMQTransactionExecutor executor = MESSAGE_EXECUTOR_MAPPING.get(getMessageType(message));
|
|
|
|
- return executor.execute(message, executor.getPayload(message, messageConverter), arg);
|
|
|
|
- } catch (RuntimeException e) {
|
|
|
|
- // 将异常存入上下文
|
|
|
|
- EXCEPTION_CONTEXT.set(e);
|
|
|
|
- throw e;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
- * 注册消息延时器消费者容器
|
|
|
|
|
|
+ * 消费消息上下文消息
|
|
*
|
|
*
|
|
- * @param consumer 消费者实例
|
|
|
|
|
|
+ * @param context 消息上下文对象
|
|
|
|
+ * @param consumer 消息消费者
|
|
|
|
+ * @param <T> 消息类型
|
|
*/
|
|
*/
|
|
- private void registerMessageDelayerContainer(Object consumer) {
|
|
|
|
- // 获取延时消息主题
|
|
|
|
- Class<?> clazz = AopProxyUtils.ultimateTargetClass(consumer);
|
|
|
|
- MessageDelayer annotation = clazz.getAnnotation(MessageDelayer.class);
|
|
|
|
- String topic = annotation.topic();
|
|
|
|
- if (StringUtils.isEmpty(topic)) {
|
|
|
|
- RocketMQMessageListener listener = clazz.getAnnotation(RocketMQMessageListener.class);
|
|
|
|
- topic = ObjectUtils.ifNull(listener, RocketMQMessageListener::topic);
|
|
|
|
- }
|
|
|
|
- Assert.isTrue(StringUtils.nonEmpty(topic), "Message delayer topic unassigned: " + clazz.getName());
|
|
|
|
- Environment environment = ApplicationContextHolder.getApplicationContext().getEnvironment();
|
|
|
|
- topic = environment.resolvePlaceholders(topic);
|
|
|
|
- if (!DELAYER_TOPIC_NAME.add(topic)) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // 获取延时消息类型
|
|
|
|
- Class<?> type = annotation.type();
|
|
|
|
- if (type == Object.class) {
|
|
|
|
- type = getMessageType(clazz);
|
|
|
|
- }
|
|
|
|
- Assert.isTrue(type != Object.class, "Message delayer type unassigned: " + clazz.getName());
|
|
|
|
-
|
|
|
|
- // 注册消息延时器消费者容器
|
|
|
|
|
|
+ public static <T> void consumeMessageContext(@NonNull MessageContext<T> context, @NonNull Consumer<T> consumer) {
|
|
|
|
+ SessionContextHolder.setSession(context.getSession());
|
|
try {
|
|
try {
|
|
- Class<?> delayer = this.generateMessageDelayerClass(topic, type);
|
|
|
|
- ListenerContainerConfiguration configuration =
|
|
|
|
- ApplicationContextHolder.getApplicationContext().getBean(ListenerContainerConfiguration.class);
|
|
|
|
- Method method = ListenerContainerConfiguration.class.getDeclaredMethod(
|
|
|
|
- "registerContainer", String.class, Object.class
|
|
|
|
- );
|
|
|
|
- method.setAccessible(true);
|
|
|
|
- method.invoke(configuration, delayer.getName(), delayer.newInstance());
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- throw new RuntimeException(e);
|
|
|
|
|
|
+ consumer.accept(context.getTarget());
|
|
|
|
+ } finally {
|
|
|
|
+ SessionContextHolder.clearSessionContext();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 动态生成消息延时器类对象
|
|
|
|
- *
|
|
|
|
- * @param topic 延时器消息主题
|
|
|
|
- * @param type 消息类型
|
|
|
|
- * @return 消息延时器类对象
|
|
|
|
- * @throws Exception 处理异常
|
|
|
|
- */
|
|
|
|
- private Class<?> generateMessageDelayerClass(String topic, Class<?> type) throws Exception {
|
|
|
|
- // 构建Class及实现接口
|
|
|
|
- CtClass delayer = CLASS_POOL.makeClass(
|
|
|
|
- String.format("%s.CustomMessageDelayer_%d", DELAYER_PACKAGE, DELAYER_COUNTER.incrementAndGet())
|
|
|
|
- );
|
|
|
|
- delayer.setInterfaces(new CtClass[]{CLASS_POOL.get(RocketMQListener.class.getName())});
|
|
|
|
- delayer.setGenericSignature(new SignatureAttribute.ClassSignature(null, null,
|
|
|
|
- new SignatureAttribute.ClassType[]{new SignatureAttribute.ClassType(
|
|
|
|
- RocketMQListener.class.getName(), new SignatureAttribute.TypeArgument[]{
|
|
|
|
- new SignatureAttribute.TypeArgument(
|
|
|
|
- new SignatureAttribute.ClassType(DelayMessage.class.getName())
|
|
|
|
- )})}).encode()
|
|
|
|
- );
|
|
|
|
-
|
|
|
|
- // 添加无参构造方法
|
|
|
|
- CtConstructor constructor = new CtConstructor(new CtClass[]{}, delayer);
|
|
|
|
- constructor.setBody("{}");
|
|
|
|
- delayer.addConstructor(constructor);
|
|
|
|
-
|
|
|
|
- // 添加自定义延时消息消费方法实现
|
|
|
|
- delayer.addMethod(CtMethod.make(String.format(
|
|
|
|
- "public void onMessage(%s message){%s.processing(\"%s\", %s.class, message);}",
|
|
|
|
- DelayMessage.class.getName(),
|
|
|
|
- RocketMQContextHolder.class.getName(),
|
|
|
|
- topic,
|
|
|
|
- type.getName()
|
|
|
|
- ), delayer));
|
|
|
|
-
|
|
|
|
- // 添加泛型参数接口方法桥接
|
|
|
|
- delayer.addMethod(CtMethod.make(String.format(
|
|
|
|
- "public void onMessage(Object message){this.onMessage((%s) message);}",
|
|
|
|
- DelayMessage.class.getName()
|
|
|
|
- ), delayer));
|
|
|
|
-
|
|
|
|
- // 设置@RocketMQMessageListener注解
|
|
|
|
- String unique = getDelayerTopic(topic);
|
|
|
|
- ClassFile file = delayer.getClassFile();
|
|
|
|
- ConstPool constPool = file.getConstPool();
|
|
|
|
- AnnotationsAttribute attribute = new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
|
|
|
|
- Annotation annotation = new Annotation(RocketMQMessageListener.class.getName(), constPool);
|
|
|
|
- annotation.addMemberValue("topic", new StringMemberValue(unique, constPool));
|
|
|
|
- annotation.addMemberValue("consumerGroup", new StringMemberValue(unique, constPool));
|
|
|
|
- attribute.addAnnotation(annotation);
|
|
|
|
- file.addAttribute(attribute);
|
|
|
|
- return delayer.toClass();
|
|
|
|
- }
|
|
|
|
}
|
|
}
|