Эх сурвалжийг харах

优化Redis分布式对列处理逻辑

woody 2 сар өмнө
parent
commit
a183e2d52e

+ 3 - 42
framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisQueue.java

@@ -1,6 +1,5 @@
 package com.chelvc.framework.redis.queue;
 
-import java.time.Duration;
 import java.util.AbstractQueue;
 import java.util.Collection;
 import java.util.Collections;
@@ -12,8 +11,6 @@ import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.redis.context.RedisContextHolder;
 import lombok.NonNull;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.script.DefaultRedisScript;
-import org.springframework.data.redis.core.script.RedisScript;
 
 /**
  * Redis队列实现
@@ -23,35 +20,16 @@ import org.springframework.data.redis.core.script.RedisScript;
  * @date 2024/8/19
  */
 public class RedisQueue<E> extends AbstractQueue<E> {
-    /**
-     * 带过期时间Redis队列添加元素脚本
-     */
-    private static final RedisScript<Long> ADD_DURATION_SCRIPT = new DefaultRedisScript<>(
-            "local value = redis.call('RPUSH', KEYS[1], unpack(ARGV, 2)) " +
-                    "if value > 0 and redis.call('TTL', KEYS[1]) < 0 then redis.call('EXPIRE', KEYS[1], ARGV[1]) end " +
-                    "return value", Long.class
-    );
-
     private final String name;
-    private final long duration;
     private final RedisTemplate<String, E> template;
 
     public RedisQueue(@NonNull String name) {
-        this(name, Duration.ofSeconds(-1));
-    }
-
-    public RedisQueue(@NonNull String name, @NonNull Duration duration) {
-        this(name, null, duration);
+        this(name, null);
     }
 
     public RedisQueue(@NonNull String name, RedisTemplate<String, E> template) {
-        this(name, template, Duration.ofSeconds(-1));
-    }
-
-    public RedisQueue(@NonNull String name, RedisTemplate<String, E> template, @NonNull Duration duration) {
         this.name = name;
         this.template = template;
-        this.duration = Math.max(duration.getSeconds(), -1);
     }
 
     /**
@@ -112,13 +90,7 @@ public class RedisQueue<E> extends AbstractQueue<E> {
             return false;
         }
 
-        Long count;
-        if (this.duration > 0) {
-            List<String> keys = Collections.singletonList(this.name);
-            count = this.template().execute(ADD_DURATION_SCRIPT, keys, this.duration, e);
-        } else {
-            count = this.template().opsForList().rightPush(this.name, e);
-        }
+        Long count = this.template().opsForList().rightPush(this.name, e);
         return count != null && count > 0;
     }
 
@@ -144,18 +116,7 @@ public class RedisQueue<E> extends AbstractQueue<E> {
             return false;
         }
 
-        Long count;
-        if (this.duration > 0) {
-            int i = 0;
-            Object[] args = new Object[c.size() + 1];
-            args[i++] = this.duration;
-            for (E e : c) {
-                args[i++] = e;
-            }
-            count = this.template().execute(ADD_DURATION_SCRIPT, Collections.singletonList(this.name), args);
-        } else {
-            count = this.template().opsForList().rightPushAll(this.name, (Collection<E>) c);
-        }
+        Long count = this.template().opsForList().rightPushAll(this.name, (Collection<E>) c);
         return count != null && count > 0;
     }
 

+ 19 - 47
framework-redis/src/main/java/com/chelvc/framework/redis/queue/TemporalUniqueRedisQueue.java → framework-redis/src/main/java/com/chelvc/framework/redis/queue/TemporalRedisQueue.java

@@ -23,13 +23,13 @@ import org.springframework.data.redis.core.script.DefaultRedisScript;
 import org.springframework.data.redis.core.script.RedisScript;
 
 /**
- * 带时间的唯一性Redis队列
+ * 带时性Redis队列
  *
  * @param <E> 元素类型
  * @author Woody
  * @date 2024/8/29
  */
-public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
+public class TemporalRedisQueue<E> extends AbstractQueue<E> {
     /**
      * Redis队列添加元素脚本(如果存在则忽略)
      */
@@ -37,56 +37,36 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
             "return redis.call('ZADD', KEYS[1], 'NX', 'CH', unpack(ARGV))", Long.class
     );
 
-    /**
-     * 带过期时间Redis队列添加元素脚本(如果存在则忽略)
-     */
-    private static final RedisScript<Long> ADD_DURATION_SCRIPT = new DefaultRedisScript<>(
-            "local value = redis.call('ZADD', KEYS[1], 'NX', 'CH', unpack(ARGV, 2)) " +
-                    "if value > 0 and redis.call('TTL', KEYS[1]) < 0 then redis.call('EXPIRE', KEYS[1], ARGV[1]) end " +
-                    "return value", Long.class
-    );
-
     /**
      * Redis队列弹出元素脚本
      */
     @SuppressWarnings("rawtypes")
     private static final RedisScript<List> POLL_SCRIPT = new DefaultRedisScript<>(
             "local value = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, 1) " +
-                    "if value[1] then redis.call('ZADD', KEYS[1], ARGV[3], value[1]) end return value", List.class
+                    "if value[1] then redis.call('ZADD', KEYS[1], 'XX', ARGV[3], value[1]) end return value", List.class
     );
 
     private final long idle;
     private final String name;
-    private final long duration;
     private final RedisTemplate<String, E> template;
 
-    public TemporalUniqueRedisQueue(@NonNull String name) {
+    public TemporalRedisQueue(@NonNull String name) {
         this(name, Duration.ofMinutes(1));
     }
 
-    public TemporalUniqueRedisQueue(@NonNull String name, @NonNull Duration idle) {
-        this(name, idle, Duration.ofSeconds(-1));
+    public TemporalRedisQueue(@NonNull String name, @NonNull Duration idle) {
+        this(name, null, idle);
     }
 
-    public TemporalUniqueRedisQueue(@NonNull String name, RedisTemplate<String, E> template) {
+    public TemporalRedisQueue(@NonNull String name, RedisTemplate<String, E> template) {
         this(name, template, Duration.ofMinutes(1));
     }
 
-    public TemporalUniqueRedisQueue(@NonNull String name, @NonNull Duration idle, @NonNull Duration duration) {
-        this(name, null, idle, duration);
-    }
-
-    public TemporalUniqueRedisQueue(@NonNull String name, RedisTemplate<String, E> template, @NonNull Duration idle) {
-        this(name, template, idle, Duration.ofSeconds(-1));
-    }
-
-    public TemporalUniqueRedisQueue(@NonNull String name, RedisTemplate<String, E> template, @NonNull Duration idle,
-                                    @NonNull Duration duration) {
+    public TemporalRedisQueue(@NonNull String name, RedisTemplate<String, E> template, @NonNull Duration idle) {
         this.idle = idle.toMillis();
         AssertUtils.check(this.idle > 0, () -> "idle must be greater than 0");
         this.name = name;
         this.template = template;
-        this.duration = Math.max(duration.getSeconds(), -1);
     }
 
     /**
@@ -132,12 +112,12 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
     }
 
     /**
-     * 获取消费元素集合
+     * 判断元素是否在临时状态
      *
      * @param c 元素集合
-     * @return 元素/是否正在消费映射表
+     * @return 元素/是否在临时状态映射表
      */
-    public Map<E, Boolean> consumings(Collection<E> c) {
+    public Map<E, Boolean> temporally(Collection<E> c) {
         if (ObjectUtils.isEmpty(c)) {
             return Collections.emptyMap();
         }
@@ -190,15 +170,8 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
             return false;
         }
 
-        int i = 0;
-        Object[] args = new Object[this.duration > 0 ? 3 : 2];
-        if (this.duration > 0) {
-            args[i++] = this.duration;
-        }
-        args[i++] = System.currentTimeMillis();
-        args[i] = e;
-        RedisScript<Long> script = this.duration > 0 ? ADD_DURATION_SCRIPT : ADD_SCRIPT;
-        Long count = this.template().execute(script, Collections.singletonList(this.name), args);
+        Object[] args = new Object[]{System.currentTimeMillis(), e};
+        Long count = this.template().execute(ADD_SCRIPT, Collections.singletonList(this.name), args);
         return count != null && count > 0;
     }
 
@@ -208,7 +181,10 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
         List<String> keys = Collections.singletonList(this.name);
         long min = 0, max = System.currentTimeMillis(), score = max + this.idle;
         List<E> values = this.template().execute(POLL_SCRIPT, keys, min, max, score);
-        return ObjectUtils.isEmpty(values) ? null : values.get(0);
+        if (ObjectUtils.isEmpty(values)) {
+            return null;
+        }
+        return values.get(0);
     }
 
     @Override
@@ -230,17 +206,13 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
         }
 
         int i = 0;
-        Object[] args = new Object[c.size() * 2 + (this.duration > 0 ? 1 : 0)];
-        if (this.duration > 0) {
-            args[i++] = this.duration;
-        }
+        Object[] args = new Object[c.size() * 2];
         long timestamp = System.currentTimeMillis();
         for (E e : c) {
             args[i++] = timestamp++;
             args[i++] = e;
         }
-        RedisScript<Long> script = this.duration > 0 ? ADD_DURATION_SCRIPT : ADD_SCRIPT;
-        Long count = this.template().execute(script, Collections.singletonList(this.name), args);
+        Long count = this.template().execute(ADD_SCRIPT, Collections.singletonList(this.name), args);
         return count != null && count > 0;
     }