Browse Source

RocketMQ消息消费增加手动控制消费状态及重试策略逻辑

woody 1 year ago
parent
commit
4abb0acaee

+ 13 - 4
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/config/SessionRocketMQListenerContainer.java

@@ -15,6 +15,7 @@ import com.chelvc.framework.base.context.JacksonContextHolder;
 import com.chelvc.framework.base.util.ObjectUtils;
 import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
 import com.chelvc.framework.rocketmq.interceptor.SessionRocketMQListener;
+import com.chelvc.framework.rocketmq.model.ConsumeStatus;
 import com.chelvc.framework.rocketmq.model.MessageContext;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.AccessChannel;
@@ -124,29 +125,37 @@ public class SessionRocketMQListenerContainer extends DefaultRocketMQListenerCon
             ConsumeMode mode = ObjectUtils.ifNull(annotation, RocketMQMessageListener::consumeMode);
             if (mode == ConsumeMode.ORDERLY) {
                 consumer.setMessageListener((MessageListenerOrderly) (messages, context) -> {
+                    ConsumeStatus status;
                     try {
-                        listener.onMessageContexts(messages.stream().map(this::convert).collect(Collectors.toList()));
+                        status = listener.onMessageContexts(
+                                messages.stream().map(this::convert).collect(Collectors.toList())
+                        );
                     } catch (Exception e) {
                         String ids = messages.stream().map(MessageExt::getMsgId).collect(Collectors.joining(","));
                         log.error("Orderly message consume failed [{}][{}]: {}", group, topic, ids, e);
                         return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                     } finally {
                         consumer.setConsumeMessageBatchMaxSize(listener.getBatchConsumeSize());
+                        context.setSuspendCurrentQueueTimeMillis(listener.getOrderlyRetryInterval());
                     }
-                    return ConsumeOrderlyStatus.SUCCESS;
+                    return status == null ? ConsumeOrderlyStatus.SUCCESS : status.orderly();
                 });
             } else {
                 consumer.setMessageListener((MessageListenerConcurrently) (messages, context) -> {
+                    ConsumeStatus status;
                     try {
-                        listener.onMessageContexts(messages.stream().map(this::convert).collect(Collectors.toList()));
+                        status = listener.onMessageContexts(
+                                messages.stream().map(this::convert).collect(Collectors.toList())
+                        );
                     } catch (Exception e) {
                         String ids = messages.stream().map(MessageExt::getMsgId).collect(Collectors.joining(","));
                         log.error("Concurrently message consume failed [{}][{}]: {}", group, topic, ids, e);
                         return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                     } finally {
                         consumer.setConsumeMessageBatchMaxSize(listener.getBatchConsumeSize());
+                        context.setDelayLevelWhenNextConsume(listener.getConcurrentlyRetryInterval());
                     }
-                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    return status == null ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS : status.concurrently();
                 });
             }
         }

+ 26 - 1
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/interceptor/SessionRocketMQListener.java

@@ -3,7 +3,10 @@ package com.chelvc.framework.rocketmq.interceptor;
 import java.util.List;
 
 import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
+import com.chelvc.framework.rocketmq.model.ConsumeStatus;
 import com.chelvc.framework.rocketmq.model.MessageContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.springframework.util.CollectionUtils;
 
@@ -23,14 +26,36 @@ public interface SessionRocketMQListener<T> extends RocketMQListener<T> {
         return 1;
     }
 
+    /**
+     * 获取顺序消费消息重试时间间隔
+     *
+     * @return 时间间隔
+     * @see ConsumeOrderlyContext
+     */
+    default long getOrderlyRetryInterval() {
+        return -1;
+    }
+
+    /**
+     * 获取并发消费消息重试时间间隔
+     *
+     * @return 时间间隔
+     * @see ConsumeConcurrentlyContext
+     */
+    default int getConcurrentlyRetryInterval() {
+        return 0;
+    }
+
     /**
      * 批量处理消息上下文信息
      *
      * @param contexts 消息上下文对象列表
+     * @return 消费状态
      */
-    default void onMessageContexts(List<MessageContext<T>> contexts) {
+    default ConsumeStatus onMessageContexts(List<MessageContext<T>> contexts) {
         if (!CollectionUtils.isEmpty(contexts)) {
             contexts.forEach(context -> RocketMQContextHolder.consumeMessageContext(context, this::onMessage));
         }
+        return ConsumeStatus.SUCCESS;
     }
 }

+ 46 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/model/ConsumeStatus.java

@@ -0,0 +1,46 @@
+package com.chelvc.framework.rocketmq.model;
+
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+
+/**
+ * 消费状态枚举
+ *
+ * @author Woody
+ * @date 2023/7/5
+ */
+public enum ConsumeStatus {
+    /**
+     * 消费成功
+     */
+    SUCCESS,
+
+    /**
+     * 稍后重试
+     */
+    RETRY_LATER;
+
+    /**
+     * 获取顺序消费状态
+     *
+     * @return 顺序消费状态
+     */
+    public ConsumeOrderlyStatus orderly() {
+        if (this == ConsumeStatus.RETRY_LATER) {
+            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+        }
+        return ConsumeOrderlyStatus.SUCCESS;
+    }
+
+    /**
+     * 获取并发消费状态
+     *
+     * @return 并发消费状态
+     */
+    public ConsumeConcurrentlyStatus concurrently() {
+        if (this == ConsumeStatus.RETRY_LATER) {
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+    }
+}