Sfoglia il codice sorgente

优化Redis Stream处理逻辑

Woody 1 settimana fa
parent
commit
a2df874402

+ 1 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisStreamHolder.java

@@ -431,7 +431,7 @@ public final class RedisStreamHolder {
         StreamMessageListenerContainer.
                 StreamMessageListenerContainerOptionsBuilder<String, MapRecord<String, String, String>>
                 builder = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
-                .batchSize(batch).pollTimeout(Duration.ZERO).executor(executor).serializer(RedisSerializer.string());
+                .batchSize(batch).executor(executor).serializer(RedisSerializer.string());
         RedisConnectionFactory connectionFactory = RedisContextHolder.getDefaultConnectionFactory();
         StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                 StreamMessageListenerContainer.create(connectionFactory, builder.build());

+ 8 - 8
framework-redis/src/main/java/com/chelvc/framework/redis/queue/DefaultRedisMQListenerContainer.java

@@ -41,14 +41,14 @@ public class DefaultRedisMQListenerContainer<T> implements RedisMQListenerContai
     private String group;
     private Consumer consumer;
     private ExecutorService executor;
-    private ScheduledExecutorService cleaner;
     private MessageStreamListener<T> listener;
+    private ScheduledExecutorService heartbeat;
     private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
 
     /**
-     * 清理消息
+     * 消费者心跳处理
      */
-    private void clear() {
+    private void heartbeat() {
         // 发送心跳包
         try {
             RedisStreamHolder.heartbeat(this.topic);
@@ -112,16 +112,16 @@ public class DefaultRedisMQListenerContainer<T> implements RedisMQListenerContai
         );
         this.container.start();
 
-        // 初始化消费者清理定时器
-        this.cleaner = Executors.newScheduledThreadPool(1);
-        this.cleaner.scheduleAtFixedRate(this::clear, 0, this.idle, TimeUnit.SECONDS);
+        // 初始化消费者心跳定时器
+        this.heartbeat = Executors.newScheduledThreadPool(1);
+        this.heartbeat.scheduleAtFixedRate(this::heartbeat, 0, this.idle, TimeUnit.SECONDS);
     }
 
     @Override
     public void destroy() throws Exception {
         try {
-            if (this.cleaner != null) {
-                this.cleaner.shutdown();
+            if (this.heartbeat != null) {
+                this.heartbeat.shutdown();
             }
         } finally {
             try {