Parcourir la source

更新原生RocketMQ监听器

woody il y a 1 an
Parent
commit
ff8a74b0d9

+ 17 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/interceptor/NativeRocketMQListener.java

@@ -1,5 +1,6 @@
 package com.chelvc.framework.rocketmq.interceptor;
 
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
@@ -206,6 +207,20 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
         this.consumer.setConsumeMessageBatchMaxSize(size);
     }
 
+    /**
+     * 消费消息
+     *
+     * @param message 消息对象
+     */
+    protected void consume(MessageExt message) {
+        Method method = Objects.requireNonNull(this.parameter.getMethod());
+        try {
+            method.invoke(this, this.convert(message));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * 顺序消费消息
      *
@@ -214,6 +229,7 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
      * @return 消费状态
      */
     protected ConsumeOrderlyStatus consume(List<MessageExt> messages, ConsumeOrderlyContext context) {
+        messages.forEach(this::consume);
         return ConsumeOrderlyStatus.SUCCESS;
     }
 
@@ -225,6 +241,7 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
      * @return 消费状态
      */
     protected ConsumeConcurrentlyStatus consume(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
+        messages.forEach(this::consume);
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }