Просмотр исходного кода

修复RedisMQ服务重连导致数据消费失败问题

woody 8 месяцев назад
Родитель
Сommit
04dfd45c0d

+ 6 - 26
framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisStreamHolder.java

@@ -7,6 +7,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
@@ -33,7 +34,6 @@ import org.springframework.data.redis.core.script.RedisScript;
 import org.springframework.data.redis.serializer.RedisSerializer;
 import org.springframework.data.redis.stream.StreamListener;
 import org.springframework.data.redis.stream.StreamMessageListenerContainer;
-import org.springframework.util.ErrorHandler;
 
 /**
  * Redis Stream操作工具类
@@ -457,38 +457,18 @@ public final class RedisStreamHolder {
     public static StreamMessageListenerContainer<String, MapRecord<String, String, String>>
     initializeMessageListenerContainer(@NonNull StreamListener<String, MapRecord<String, String, String>> listener,
                                        @NonNull org.springframework.data.redis.connection.stream.Consumer consumer,
-                                       @NonNull StreamOffset<String> offset, int batch,
-                                       @NonNull java.util.concurrent.Executor executor) {
-        return initializeMessageListenerContainer(listener, consumer, offset, batch, executor, null);
-    }
-
-    /**
-     * 初始化消息监听器容器
-     *
-     * @param listener     消息监听器
-     * @param consumer     消费者信息
-     * @param offset       消息偏移信息
-     * @param batch        批量获取消息数量
-     * @param executor     消息获取执行器
-     * @param errorHandler 异常处理器
-     * @return 消息监听器容器
-     */
-    public static StreamMessageListenerContainer<String, MapRecord<String, String, String>>
-    initializeMessageListenerContainer(@NonNull StreamListener<String, MapRecord<String, String, String>> listener,
-                                       @NonNull org.springframework.data.redis.connection.stream.Consumer consumer,
-                                       @NonNull StreamOffset<String> offset, int batch,
-                                       @NonNull java.util.concurrent.Executor executor, ErrorHandler errorHandler) {
+                                       @NonNull StreamOffset<String> offset, int batch, @NonNull Executor executor) {
         StreamMessageListenerContainer.
                 StreamMessageListenerContainerOptionsBuilder<String, MapRecord<String, String, String>>
                 builder = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                 .batchSize(batch).pollTimeout(Duration.ZERO).executor(executor).serializer(RedisSerializer.string());
-        if (errorHandler != null) {
-            builder.errorHandler(errorHandler);
-        }
         RedisConnectionFactory connectionFactory = RedisContextHolder.getDefaultConnectionFactory();
         StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                 StreamMessageListenerContainer.create(connectionFactory, builder.build());
-        container.receive(consumer, offset, listener);
+        StreamMessageListenerContainer.StreamReadRequest<String> request =
+                StreamMessageListenerContainer.StreamReadRequest.builder(offset).consumer(consumer)
+                        .cancelOnError(t -> false).build();
+        container.register(request, listener);
         return container;
     }
 }

+ 1 - 2
framework-redis/src/main/java/com/chelvc/framework/redis/queue/ConsumerPendingCleaner.java

@@ -62,8 +62,7 @@ public class ConsumerPendingCleaner<T> {
                 this.source,
                 StreamOffset.create(this.topic, ReadOffset.from("0")),
                 100,
-                this.executor,
-                t -> log.warn("RedisMQ consumer pending claim failed: {}, {}", this.source, t.getMessage())
+                this.executor
         );
         this.container.start();
 

+ 10 - 39
framework-redis/src/main/java/com/chelvc/framework/redis/queue/DefaultRedisMQListenerContainer.java

@@ -10,7 +10,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
-import com.chelvc.framework.common.function.Executor;
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.redis.annotation.RedisMQConsumer;
@@ -18,7 +17,6 @@ import com.chelvc.framework.redis.context.RedisStreamHolder;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.core.env.Environment;
-import org.springframework.data.redis.RedisSystemException;
 import org.springframework.data.redis.connection.stream.Consumer;
 import org.springframework.data.redis.connection.stream.MapRecord;
 import org.springframework.data.redis.connection.stream.ReadOffset;
@@ -36,16 +34,13 @@ import org.springframework.data.redis.stream.StreamMessageListenerContainer;
 @Slf4j
 public class DefaultRedisMQListenerContainer<T> implements RedisMQListenerContainer<T> {
     private final String name = StringUtils.uuid();
+    private int idle;
     private String topic;
     private String group;
-    private int idle;
-    private int batch;
     private Consumer consumer;
     private ExecutorService executor;
-    private StreamOffset<String> offset;
     private ScheduledExecutorService cleaner;
     private MessageStreamListener<T> listener;
-    private Executor starter;
     private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
 
     /**
@@ -93,49 +88,25 @@ public class DefaultRedisMQListenerContainer<T> implements RedisMQListenerContai
                            @NonNull RedisMQListener<T> listener, @NonNull ExecutorService executor) {
         AssertUtils.nonempty(topic, () -> "Consumer topic must not be empty");
         AssertUtils.nonempty(group, () -> "Consumer group must not be empty");
-        AssertUtils.check((this.batch = batch) > 0, () -> "Consumer batch must be greater than 0");
+        AssertUtils.check(batch > 0, () -> "Consumer batch must be greater than 0");
         AssertUtils.check((this.idle = RedisStreamHolder.getIdle()) > 0, () -> "Consumer idle must be greater than 0");
         Environment environment = ApplicationContextHolder.getEnvironment();
         this.topic = RedisStreamHolder.isolate(environment.resolvePlaceholders(topic));
         this.group = RedisStreamHolder.isolate(environment.resolvePlaceholders(group));
         this.consumer = Consumer.from(this.group, this.name);
-        this.offset = StreamOffset.create(this.topic, ReadOffset.lastConsumed());
         this.listener = new MessageStreamListener<>(
                 type, this.topic, this.consumer, listener, this.executor = executor
         );
 
-        // 初始化消费者容器启动函数,当检测到消费者组不存在则自动重新创建消费者并重启消费者容器
-        this.starter = () -> {
-            // 停止当前容器(如果存在)
-            if (this.container != null) {
-                try {
-                    this.container.stop();
-                } catch (Throwable t) {
-                    log.warn("RedisMQ consumer container stop failed: {}, {}", this.consumer, t.getMessage());
-                }
-            }
-
-            // 初始化消费者组
-            RedisStreamHolder.initializeConsumerGroup(this.topic, this.group);
+        // 初始化消费者组
+        RedisStreamHolder.initializeConsumerGroup(this.topic, this.group);
 
-            // 初始化消息监听器容器
-            this.container = RedisStreamHolder.initializeMessageListenerContainer(
-                    this.listener, this.consumer, this.offset, this.batch, this.executor,
-                    t -> {
-                        if (!(t instanceof RedisSystemException) || StringUtils.isEmpty(t.getMessage())
-                                || !t.getMessage().contains("NOGROUP")) {
-                            log.error("RedisMQ consumer handle exception: {}", this.consumer, t);
-                            return;
-                        }
-
-                        // 如果是消费者组不存在异常则重启容器
-                        log.warn("RedisMQ consumer group has been destroyed and the container will be restarted");
-                        this.starter.execute();
-                    }
-            );
-            this.container.start();
-        };
-        this.starter.execute();
+        // 初始化消息监听器容器
+        StreamOffset<String> offset = StreamOffset.create(this.topic, ReadOffset.lastConsumed());
+        this.container = RedisStreamHolder.initializeMessageListenerContainer(
+                this.listener, this.consumer, offset, batch, this.executor
+        );
+        this.container.start();
 
         // 初始化消费者清理定时器
         this.cleaner = Executors.newScheduledThreadPool(1);

+ 9 - 4
framework-redis/src/main/java/com/chelvc/framework/redis/queue/MessageStreamListener.java

@@ -11,6 +11,7 @@ import com.chelvc.framework.redis.context.RedisStreamHolder;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.connection.stream.Consumer;
 import org.springframework.data.redis.connection.stream.MapRecord;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.core.script.DefaultRedisScript;
 import org.springframework.data.redis.core.script.RedisScript;
 import org.springframework.data.redis.stream.StreamListener;
@@ -27,7 +28,7 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
      * ZSET批量获取同时增加分数值脚本
      */
     @SuppressWarnings("rawtypes")
-    private static final RedisScript<List> RANGE_AND_INCREMENT_SCRIPT = new DefaultRedisScript<>(
+    private static final RedisScript<List> RANGE_INCREMENT_SCRIPT = new DefaultRedisScript<>(
             "local records = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[4]) " +
                     "for i = 1, #records do redis.call('ZINCRBY', KEYS[1], ARGV[3], records[i]) end return records",
             List.class
@@ -55,10 +56,14 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
             List<String> keys = Collections.singletonList(this.delayed);
             while (!Thread.currentThread().isInterrupted()) {
                 // 批量获取延时消息(100条),为避免重复获取将消息延时时间增加1分钟
+                List<String> messages = null;
                 String timestamp = String.valueOf(System.currentTimeMillis());
-                List<String> messages = RedisStreamHolder.getStreamTemplate().execute(
-                        RANGE_AND_INCREMENT_SCRIPT, keys, "-1", timestamp, "60000", "100"
-                );
+                RedisTemplate<String, T> template = RedisStreamHolder.getStreamTemplate();
+                try {
+                    messages = template.execute(RANGE_INCREMENT_SCRIPT, keys, "-1", timestamp, "60000", "100");
+                } catch (Throwable t) {
+                    log.error("RedisMQ message consume failed: {}", this.consumer, t);
+                }
 
                 // 如果当前没有延时消息则暂停1秒后继续
                 if (ObjectUtils.isEmpty(messages)) {