woody 2 лет назад
Родитель
Сommit
ef44e7dbc7

+ 293 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/interceptor/NativeRocketMQListener.java

@@ -0,0 +1,293 @@
+package com.chelvc.framework.rocketmq.interceptor;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Objects;
+
+import com.chelvc.framework.base.context.ApplicationContextHolder;
+import com.chelvc.framework.base.util.ObjectUtils;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.utils.MessageUtil;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.springframework.aop.framework.AopProxyUtils;
+import org.springframework.core.MethodParameter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.MessageConversionException;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.converter.SmartMessageConverter;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.MimeTypeUtils;
+
+/**
+ * 原生消息监听器实现
+ *
+ * @author Woody
+ * @date 2023/5/8
+ */
+@Slf4j
+public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleListener {
+    private final Class<?> target = AopProxyUtils.ultimateTargetClass(this);
+    private Type type;
+    private MethodParameter parameter;
+    private MessageConverter converter;
+    private DefaultMQPushConsumer consumer;
+
+    /**
+     * 获取消息类型
+     *
+     * @return 消息类型
+     */
+    private Type getType() {
+        Type genericInterface = null;
+        Class<?> clazz = this.target;
+        while (Objects.nonNull(clazz)) {
+            for (Type type : clazz.getGenericInterfaces()) {
+                if (type instanceof ParameterizedType && (Objects.equals(
+                        ((ParameterizedType) type).getRawType(), RocketMQListener.class
+                ) || Objects.equals(
+                        ((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))
+                ) {
+                    genericInterface = type;
+                    break;
+                }
+            }
+            clazz = clazz.getSuperclass();
+        }
+        if (Objects.isNull(genericInterface)) {
+            return Object.class;
+        }
+
+        Type[] actualTypeArguments = ((ParameterizedType) genericInterface).getActualTypeArguments();
+        if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
+            return actualTypeArguments[0];
+        }
+        return Object.class;
+    }
+
+    /**
+     * 获取消息处理方法参数
+     *
+     * @param type 参数类型
+     * @return 方法参数
+     */
+    private MethodParameter getParameter(Type type) {
+        Class<?> clazz;
+        if (type instanceof ParameterizedType && this.converter instanceof SmartMessageConverter) {
+            clazz = (Class<?>) ((ParameterizedType) type).getRawType();
+        } else if (type instanceof Class) {
+            clazz = (Class<?>) type;
+        } else {
+            throw new RuntimeException("parameterType:" + type + " of onMessage method is not supported");
+        }
+        try {
+            final Method method = this.target.getMethod("onMessage", clazz);
+            return new MethodParameter(method, 0);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException("parameterType:" + type + " of onMessage method is not supported");
+        }
+    }
+
+    /**
+     * 构建消息
+     *
+     * @param payload 消息体
+     * @param headers 消息头
+     * @return 消息对象
+     */
+    private Message<?> build(Object payload, MessageHeaders headers) {
+        Message<?> message = this.converter instanceof SmartMessageConverter ?
+                ((SmartMessageConverter) this.converter).toMessage(payload, headers, null) :
+                this.converter.toMessage(payload, headers);
+        if (message == null) {
+            String payloadType = payload.getClass().getName();
+            Object contentType = headers != null ? headers.get(MessageHeaders.CONTENT_TYPE) : null;
+            throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
+                    "', contentType='" + contentType + "', converter=[" + this.converter + "]");
+        }
+        MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
+        builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
+        return builder.build();
+    }
+
+    /**
+     * 将消息转换成字节数组
+     *
+     * @param message 消息对象
+     * @return 字节数组
+     */
+    private byte[] message2bytes(Message<?> message) {
+        Message<?> target = this.build(message.getPayload(), message.getHeaders());
+        Object payload = target.getPayload();
+        try {
+            if (payload instanceof String) {
+                return ((String) payload).getBytes(StandardCharsets.UTF_8);
+            } else if (payload instanceof byte[]) {
+                return (byte[]) payload;
+            } else {
+                String json = (String) this.converter.fromMessage(target, payload.getClass());
+                if (json == null) {
+                    throw new RuntimeException(String.format(
+                            "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
+                            this.converter.getClass(), payload.getClass(), payload));
+                }
+                return json.getBytes(StandardCharsets.UTF_8);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("convert to bytes failed.", e);
+        }
+    }
+
+    /**
+     * 消息转换
+     *
+     * @param message 消息
+     * @return 消息
+     */
+    @SuppressWarnings("unchecked")
+    protected T convert(@NonNull MessageExt message) {
+        if (Objects.equals(this.type, MessageExt.class)) {
+            return (T) message;
+        }
+
+        String str = new String(message.getBody(), StandardCharsets.UTF_8);
+        if (Objects.equals(this.type, String.class)) {
+            return (T) str;
+        }
+
+        // If msgType not string, use objectMapper change it.
+        try {
+            if (this.type instanceof Class) {
+                //if the messageType has not Generic Parameter
+                return (T) this.converter.fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) this.type);
+            }
+            //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third
+            // parameter "conversionHint".
+            //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
+            return (T) ((SmartMessageConverter) this.converter).fromMessage(
+                    MessageBuilder.withPayload(str).build(),
+                    (Class<?>) ((ParameterizedType) this.type).getRawType(),
+                    this.parameter
+            );
+        } catch (Exception e) {
+            throw new RuntimeException("cannot convert message to " + this.type, e);
+        }
+    }
+
+    /**
+     * 设置消息批量消费数量
+     *
+     * @param size 消息数量
+     */
+    protected void setBatchConsumeSize(int size) {
+        this.consumer.setConsumeMessageBatchMaxSize(size);
+    }
+
+    /**
+     * 顺序消费消息
+     *
+     * @param messages 消息列表
+     * @param context  上下文对象
+     * @return 消费状态
+     */
+    protected ConsumeOrderlyStatus consume(List<MessageExt> messages, ConsumeOrderlyContext context) {
+        return ConsumeOrderlyStatus.SUCCESS;
+    }
+
+    /**
+     * 并发消费消息
+     *
+     * @param messages 消息列表
+     * @param context  上下文对象
+     * @return 消费状态
+     */
+    protected ConsumeConcurrentlyStatus consume(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+    }
+
+    /**
+     * 重放消息
+     *
+     * @param message 原始消息
+     * @param payload 重放消息体
+     */
+    protected void reply(MessageExt message, Object payload) {
+        Message<?> wrapper = MessageBuilder.withPayload(payload).build();
+        DefaultMQProducer producer =
+                this.consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
+        try {
+            producer.send(MessageUtil.createReplyMessage(message, this.message2bytes(wrapper)), new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
+                        log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
+                    } else {
+                        log.info("Consumer replies message success.");
+                    }
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
+                }
+            });
+        } catch (MQClientException | RemotingException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void prepareStart(DefaultMQPushConsumer consumer) {
+        // 初始化消息类型及消费方法参数
+        this.consumer = consumer;
+        this.converter = ApplicationContextHolder.getBean(RocketMQMessageConverter.class).getMessageConverter();
+        this.type = this.getType();
+        this.parameter = this.getParameter(this.type);
+
+        // 重置消息监听器
+        RocketMQMessageListener annotation = target.getAnnotation(RocketMQMessageListener.class);
+        ConsumeMode mode = ObjectUtils.ifNull(annotation, RocketMQMessageListener::consumeMode);
+        if (mode == ConsumeMode.ORDERLY) {
+            consumer.setMessageListener((MessageListenerOrderly) (messages, context) -> {
+                try {
+                    return this.consume(messages, context);
+                } catch (Exception e) {
+                    log.warn("consume message failed", e);
+                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                }
+            });
+        } else {
+            consumer.setMessageListener((MessageListenerConcurrently) (messages, context) -> {
+                try {
+                    return this.consume(messages, context);
+                } catch (Exception e) {
+                    log.warn("consume message failed", e);
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                }
+            });
+        }
+    }
+}