Quellcode durchsuchen

优化Redis队列实现逻辑

woody vor 9 Monaten
Ursprung
Commit
36e3fe9465

+ 50 - 7
framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisQueue.java

@@ -1,14 +1,19 @@
 package com.chelvc.framework.redis.queue;
 
+import java.time.Duration;
 import java.util.AbstractQueue;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.redis.context.RedisContextHolder;
 import lombok.NonNull;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.data.redis.core.script.RedisScript;
 
 /**
  * Redis队列实现
@@ -18,16 +23,35 @@ import org.springframework.data.redis.core.RedisTemplate;
  * @date 2024/8/19
  */
 public class RedisQueue<E> extends AbstractQueue<E> {
+    /**
+     * 带过期时间Redis队列添加元素脚本
+     */
+    private static final RedisScript<Long> ADD_DURATION_SCRIPT = new DefaultRedisScript<>(
+            "local value = redis.call('RPUSH', KEYS[1], unpack(ARGV, 2)) " +
+                    "if value > 0 and redis.call('TTL', KEYS[1]) < 0 then redis.call('EXPIRE', KEYS[1], ARGV[1]) end " +
+                    "return value", Long.class
+    );
+
     private final String name;
+    private final long duration;
     private final RedisTemplate<String, E> template;
 
     public RedisQueue(@NonNull String name) {
-        this(name, null);
+        this(name, Duration.ofSeconds(-1));
+    }
+
+    public RedisQueue(@NonNull String name, @NonNull Duration duration) {
+        this(name, null, duration);
     }
 
     public RedisQueue(@NonNull String name, RedisTemplate<String, E> template) {
+        this(name, template, Duration.ofSeconds(-1));
+    }
+
+    public RedisQueue(@NonNull String name, RedisTemplate<String, E> template, @NonNull Duration duration) {
         this.name = name;
         this.template = template;
+        this.duration = Math.max(duration.getSeconds(), -1);
     }
 
     /**
@@ -69,8 +93,8 @@ public class RedisQueue<E> extends AbstractQueue<E> {
 
     @Override
     public int size() {
-        Long size = this.template().opsForList().size(this.name);
-        return size == null ? 0 : size.intValue();
+        Long count = this.template().opsForList().size(this.name);
+        return count == null ? 0 : count.intValue();
     }
 
     @Override
@@ -78,8 +102,15 @@ public class RedisQueue<E> extends AbstractQueue<E> {
         if (e == null) {
             return false;
         }
-        Long size = this.template().opsForList().rightPush(this.name, e);
-        return size != null && size > 0;
+
+        Long count;
+        if (this.duration > 0) {
+            List<String> keys = Collections.singletonList(this.name);
+            count = this.template().execute(ADD_DURATION_SCRIPT, keys, this.duration, e);
+        } else {
+            count = this.template().opsForList().rightPush(this.name, e);
+        }
+        return count != null && count > 0;
     }
 
     @Override
@@ -103,7 +134,19 @@ public class RedisQueue<E> extends AbstractQueue<E> {
         if (ObjectUtils.isEmpty(c)) {
             return false;
         }
-        Long size = this.template().opsForList().rightPushAll(this.name, (Collection<E>) c);
-        return size != null && size > 0;
+
+        Long count;
+        if (this.duration > 0) {
+            int i = 0;
+            Object[] args = new Object[c.size() + 1];
+            args[i++] = this.duration;
+            for (E e : c) {
+                args[i++] = e;
+            }
+            count = this.template().execute(ADD_DURATION_SCRIPT, Collections.singletonList(this.name), args);
+        } else {
+            count = this.template().opsForList().rightPushAll(this.name, (Collection<E>) c);
+        }
+        return count != null && count > 0;
     }
 }

+ 44 - 16
framework-redis/src/main/java/com/chelvc/framework/redis/queue/TemporalUniqueRedisQueue.java

@@ -31,6 +31,15 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
             "return redis.call('ZADD', KEYS[1], 'NX', 'CH', unpack(ARGV))", Long.class
     );
 
+    /**
+     * 带过期时间Redis队列添加元素脚本(如果存在则忽略)
+     */
+    private static final RedisScript<Long> ADD_DURATION_SCRIPT = new DefaultRedisScript<>(
+            "local value = redis.call('ZADD', KEYS[1], 'NX', 'CH', unpack(ARGV, 2)) " +
+                    "if value > 0 and redis.call('TTL', KEYS[1]) < 0 then redis.call('EXPIRE', KEYS[1], ARGV[1]) end " +
+                    "return value", Long.class
+    );
+
     /**
      * Redis队列弹出元素脚本
      */
@@ -42,16 +51,27 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
 
     private final long idle;
     private final String name;
+    private final long duration;
     private final RedisTemplate<String, E> template;
 
     public TemporalUniqueRedisQueue(@NonNull String name, @NonNull Duration idle) {
-        this(name, null, idle);
+        this(name, idle, Duration.ofSeconds(-1));
+    }
+
+    public TemporalUniqueRedisQueue(@NonNull String name, @NonNull Duration idle, @NonNull Duration duration) {
+        this(name, null, idle, duration);
     }
 
     public TemporalUniqueRedisQueue(@NonNull String name, RedisTemplate<String, E> template, @NonNull Duration idle) {
+        this(name, template, idle, Duration.ofSeconds(-1));
+    }
+
+    public TemporalUniqueRedisQueue(@NonNull String name, RedisTemplate<String, E> template, @NonNull Duration idle,
+                                    @NonNull Duration duration) {
         this.name = name;
-        this.idle = idle.toMillis();
+        this.idle = Math.max(idle.toMillis(), 0);
         this.template = template;
+        this.duration = Math.max(duration.getSeconds(), -1);
     }
 
     /**
@@ -105,8 +125,8 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
     @Override
     public int size() {
         long min = 0, max = System.currentTimeMillis();
-        Long size = this.template().opsForZSet().count(this.name, min, max);
-        return size == null ? 0 : size.intValue();
+        Long count = this.template().opsForZSet().count(this.name, min, max);
+        return count == null ? 0 : count.intValue();
     }
 
     @Override
@@ -114,13 +134,16 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
         if (e == null) {
             return false;
         }
-        long timestamp = System.currentTimeMillis();
-        RedisTemplate<String, E> template = this.template();
-        Object[] args = new Object[]{
-                RedisContextHolder.serialize(template.getValueSerializer(), timestamp),
-                RedisContextHolder.serialize(template.getValueSerializer(), e)
-        };
-        Long count = template.execute(ADD_SCRIPT, Collections.singletonList(this.name), args);
+
+        int i = 0;
+        Object[] args = new Object[this.duration > 0 ? 3 : 2];
+        if (this.duration > 0) {
+            args[i++] = this.duration;
+        }
+        args[i++] = System.currentTimeMillis();
+        args[i] = e;
+        RedisScript<Long> script = this.duration > 0 ? ADD_DURATION_SCRIPT : ADD_SCRIPT;
+        Long count = this.template().execute(script, Collections.singletonList(this.name), args);
         return count != null && count > 0;
     }
 
@@ -155,14 +178,17 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
         }
 
         int i = 0;
-        Object[] args = new Object[c.size() * 2];
+        Object[] args = new Object[c.size() * 2 + (this.duration > 0 ? 1 : 0)];
+        if (this.duration > 0) {
+            args[i++] = this.duration;
+        }
         long timestamp = System.currentTimeMillis();
-        RedisTemplate<String, E> template = this.template();
         for (E e : c) {
-            args[i++] = RedisContextHolder.serialize(template.getValueSerializer(), timestamp++);
-            args[i++] = RedisContextHolder.serialize(template.getValueSerializer(), e);
+            args[i++] = timestamp++;
+            args[i++] = e;
         }
-        Long count = template.execute(ADD_SCRIPT, Collections.singletonList(this.name), args);
+        RedisScript<Long> script = this.duration > 0 ? ADD_DURATION_SCRIPT : ADD_SCRIPT;
+        Long count = this.template().execute(script, Collections.singletonList(this.name), args);
         return count != null && count > 0;
     }
 
@@ -171,6 +197,7 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
         if (o == null) {
             return false;
         }
+
         Long count = this.template().opsForZSet().remove(this.name, o);
         return count != null && count > 0;
     }
@@ -180,6 +207,7 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
         if (ObjectUtils.isEmpty(c)) {
             return false;
         }
+
         Long count = this.template().opsForZSet().remove(this.name, c.toArray());
         return count != null && count > 0;
     }