|
@@ -19,29 +19,17 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
|
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
|
|
-import org.apache.rocketmq.client.exception.MQClientException;
|
|
|
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
|
|
-import org.apache.rocketmq.client.producer.SendCallback;
|
|
|
-import org.apache.rocketmq.client.producer.SendResult;
|
|
|
-import org.apache.rocketmq.client.producer.SendStatus;
|
|
|
-import org.apache.rocketmq.client.utils.MessageUtil;
|
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
|
-import org.apache.rocketmq.remoting.exception.RemotingException;
|
|
|
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
|
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
|
|
-import org.apache.rocketmq.spring.core.RocketMQReplyListener;
|
|
|
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
|
|
|
import org.springframework.aop.framework.AopProxyUtils;
|
|
|
import org.springframework.core.MethodParameter;
|
|
|
-import org.springframework.messaging.Message;
|
|
|
-import org.springframework.messaging.MessageHeaders;
|
|
|
-import org.springframework.messaging.converter.MessageConversionException;
|
|
|
import org.springframework.messaging.converter.MessageConverter;
|
|
|
import org.springframework.messaging.converter.SmartMessageConverter;
|
|
|
import org.springframework.messaging.support.MessageBuilder;
|
|
|
-import org.springframework.util.MimeTypeUtils;
|
|
|
|
|
|
/**
|
|
|
* 原生消息监听器实现
|
|
@@ -58,45 +46,35 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
|
|
|
private DefaultMQPushConsumer consumer;
|
|
|
|
|
|
/**
|
|
|
- * 获取消息类型
|
|
|
- *
|
|
|
- * @return 消息类型
|
|
|
+ * 初始化消息类型
|
|
|
*/
|
|
|
- private Type getType() {
|
|
|
- Type genericInterface = null;
|
|
|
+ private void initializeMessageType() {
|
|
|
+ Type parameterized = null;
|
|
|
Class<?> clazz = this.target;
|
|
|
while (Objects.nonNull(clazz)) {
|
|
|
for (Type type : clazz.getGenericInterfaces()) {
|
|
|
- if (type instanceof ParameterizedType && (Objects.equals(
|
|
|
- ((ParameterizedType) type).getRawType(), RocketMQListener.class
|
|
|
- ) || Objects.equals(
|
|
|
- ((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))
|
|
|
- ) {
|
|
|
- genericInterface = type;
|
|
|
+ if (type instanceof ParameterizedType
|
|
|
+ && Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class)) {
|
|
|
+ parameterized = type;
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
clazz = clazz.getSuperclass();
|
|
|
}
|
|
|
- if (Objects.isNull(genericInterface)) {
|
|
|
- return Object.class;
|
|
|
- }
|
|
|
-
|
|
|
- Type[] actualTypeArguments = ((ParameterizedType) genericInterface).getActualTypeArguments();
|
|
|
- if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
|
|
|
- return actualTypeArguments[0];
|
|
|
+ if (Objects.isNull(parameterized)) {
|
|
|
+ this.type = Object.class;
|
|
|
+ } else {
|
|
|
+ Type[] arguments = ((ParameterizedType) parameterized).getActualTypeArguments();
|
|
|
+ this.type = Objects.nonNull(arguments) && arguments.length > 0 ? arguments[0] : Object.class;
|
|
|
}
|
|
|
- return Object.class;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 获取消息处理方法参数
|
|
|
- *
|
|
|
- * @param type 参数类型
|
|
|
- * @return 方法参数
|
|
|
+ * 初始化消息处理方法参数
|
|
|
*/
|
|
|
- private MethodParameter getParameter(Type type) {
|
|
|
+ private void initializeMethodParameter() {
|
|
|
Class<?> clazz;
|
|
|
+ Type type = Objects.requireNonNull(this.type, "Message type has not been initialized");
|
|
|
if (type instanceof ParameterizedType && this.converter instanceof SmartMessageConverter) {
|
|
|
clazz = (Class<?>) ((ParameterizedType) type).getRawType();
|
|
|
} else if (type instanceof Class) {
|
|
@@ -105,63 +83,12 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
|
|
|
throw new RuntimeException("parameterType:" + type + " of onMessage method is not supported");
|
|
|
}
|
|
|
try {
|
|
|
- final Method method = this.target.getMethod("onMessage", clazz);
|
|
|
- return new MethodParameter(method, 0);
|
|
|
+ this.parameter = new MethodParameter(this.target.getMethod("onMessage", clazz), 0);
|
|
|
} catch (NoSuchMethodException e) {
|
|
|
throw new RuntimeException("parameterType:" + type + " of onMessage method is not supported");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 构建消息
|
|
|
- *
|
|
|
- * @param payload 消息体
|
|
|
- * @param headers 消息头
|
|
|
- * @return 消息对象
|
|
|
- */
|
|
|
- private Message<?> build(Object payload, MessageHeaders headers) {
|
|
|
- Message<?> message = this.converter instanceof SmartMessageConverter ?
|
|
|
- ((SmartMessageConverter) this.converter).toMessage(payload, headers, null) :
|
|
|
- this.converter.toMessage(payload, headers);
|
|
|
- if (message == null) {
|
|
|
- String payloadType = payload.getClass().getName();
|
|
|
- Object contentType = headers != null ? headers.get(MessageHeaders.CONTENT_TYPE) : null;
|
|
|
- throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
|
|
|
- "', contentType='" + contentType + "', converter=[" + this.converter + "]");
|
|
|
- }
|
|
|
- MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
|
|
|
- builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
|
|
|
- return builder.build();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 将消息转换成字节数组
|
|
|
- *
|
|
|
- * @param message 消息对象
|
|
|
- * @return 字节数组
|
|
|
- */
|
|
|
- private byte[] message2bytes(Message<?> message) {
|
|
|
- Message<?> target = this.build(message.getPayload(), message.getHeaders());
|
|
|
- Object payload = target.getPayload();
|
|
|
- try {
|
|
|
- if (payload instanceof String) {
|
|
|
- return ((String) payload).getBytes(StandardCharsets.UTF_8);
|
|
|
- } else if (payload instanceof byte[]) {
|
|
|
- return (byte[]) payload;
|
|
|
- } else {
|
|
|
- String json = (String) this.converter.fromMessage(target, payload.getClass());
|
|
|
- if (json == null) {
|
|
|
- throw new RuntimeException(String.format(
|
|
|
- "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
|
|
|
- this.converter.getClass(), payload.getClass(), payload));
|
|
|
- }
|
|
|
- return json.getBytes(StandardCharsets.UTF_8);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- throw new RuntimeException("convert to bytes failed.", e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 获取消息批量消费数量
|
|
|
*
|
|
@@ -245,45 +172,14 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
|
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 重放消息
|
|
|
- *
|
|
|
- * @param message 原始消息
|
|
|
- * @param payload 重放消息体
|
|
|
- */
|
|
|
- protected void reply(MessageExt message, Object payload) {
|
|
|
- Message<?> wrapper = MessageBuilder.withPayload(payload).build();
|
|
|
- DefaultMQProducer producer =
|
|
|
- this.consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
|
|
|
- try {
|
|
|
- producer.send(MessageUtil.createReplyMessage(message, this.message2bytes(wrapper)), new SendCallback() {
|
|
|
- @Override
|
|
|
- public void onSuccess(SendResult sendResult) {
|
|
|
- if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
|
|
|
- log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
|
|
|
- } else {
|
|
|
- log.info("Consumer replies message success.");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onException(Throwable e) {
|
|
|
- log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
|
|
|
- }
|
|
|
- });
|
|
|
- } catch (MQClientException | RemotingException | InterruptedException e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public void prepareStart(DefaultMQPushConsumer consumer) {
|
|
|
// 初始化消息类型及消费方法参数
|
|
|
this.consumer = consumer;
|
|
|
- this.converter = ApplicationContextHolder.getBean(RocketMQMessageConverter.class).getMessageConverter();
|
|
|
- this.type = this.getType();
|
|
|
- this.parameter = this.getParameter(this.type);
|
|
|
this.consumer.setConsumeMessageBatchMaxSize(this.getBatchConsumeSize());
|
|
|
+ this.converter = ApplicationContextHolder.getBean(RocketMQMessageConverter.class).getMessageConverter();
|
|
|
+ this.initializeMessageType();
|
|
|
+ this.initializeMethodParameter();
|
|
|
|
|
|
// 重置消息监听器
|
|
|
RocketMQMessageListener annotation = target.getAnnotation(RocketMQMessageListener.class);
|