Prechádzať zdrojové kódy

优化原生RocketMQ批量消费消息数量设置逻辑

woody 2 rokov pred
rodič
commit
a8e2199fef

+ 14 - 9
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/interceptor/NativeRocketMQListener.java

@@ -162,6 +162,15 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
         }
     }
 
+    /**
+     * 获取消息批量消费数量
+     *
+     * @return 消息数量
+     */
+    protected int getBatchConsumeSize() {
+        return 1;
+    }
+
     /**
      * 消息转换
      *
@@ -198,15 +207,6 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
         }
     }
 
-    /**
-     * 设置消息批量消费数量
-     *
-     * @param size 消息数量
-     */
-    protected void setBatchConsumeSize(int size) {
-        this.consumer.setConsumeMessageBatchMaxSize(size);
-    }
-
     /**
      * 消费消息
      *
@@ -283,6 +283,7 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
         this.converter = ApplicationContextHolder.getBean(RocketMQMessageConverter.class).getMessageConverter();
         this.type = this.getType();
         this.parameter = this.getParameter(this.type);
+        this.consumer.setConsumeMessageBatchMaxSize(this.getBatchConsumeSize());
 
         // 重置消息监听器
         RocketMQMessageListener annotation = target.getAnnotation(RocketMQMessageListener.class);
@@ -294,6 +295,8 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
                 } catch (Exception e) {
                     log.warn("consume message failed", e);
                     return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                } finally {
+                    this.consumer.setConsumeMessageBatchMaxSize(this.getBatchConsumeSize());
                 }
             });
         } else {
@@ -303,6 +306,8 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
                 } catch (Exception e) {
                     log.warn("consume message failed", e);
                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                } finally {
+                    this.consumer.setConsumeMessageBatchMaxSize(this.getBatchConsumeSize());
                 }
             });
         }