Ver Fonte

优化Redis队列逻辑

woody há 7 meses atrás
pai
commit
1182a3c2d3

+ 8 - 8
framework-redis/src/main/java/com/chelvc/framework/redis/cache/RedisZSetCaching.java

@@ -122,7 +122,7 @@ public class RedisZSetCaching implements Caching {
      * 缓存升序批量获取脚本
      */
     @SuppressWarnings("rawtypes")
-    private static final RedisScript<List> AES_LIST_SCRIPT = new DefaultRedisScript<>(
+    private static final RedisScript<List> ASC_LIST_SCRIPT = new DefaultRedisScript<>(
             "if redis.call('EXISTS', KEYS[2]) == 0 then return nil end " +
                     "local ids = redis.call('ZRANGEBYSCORE', KEYS[2], '('..ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3]) " +
                     "local values = {} for i = 1, #ids do local value = redis.call('GET', KEYS[1]..':'..ids[i]) " +
@@ -133,7 +133,7 @@ public class RedisZSetCaching implements Caching {
      * 缓存升序批量获取并设置过期时间脚本
      */
     @SuppressWarnings("rawtypes")
-    private static final RedisScript<List> AES_LIST_DEFER_SCRIPT = new DefaultRedisScript<>(
+    private static final RedisScript<List> ASC_LIST_DEFER_SCRIPT = new DefaultRedisScript<>(
             "if redis.call('EXISTS', KEYS[2]) == 0 then return nil end " +
                     "local ids = redis.call('ZRANGEBYSCORE', KEYS[2], '('..ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3]) " +
                     "local values = {} for i = 1, #ids do local unique = KEYS[1]..':'..ids[i] " +
@@ -148,7 +148,7 @@ public class RedisZSetCaching implements Caching {
      * 缓存降序批量获取脚本
      */
     @SuppressWarnings("rawtypes")
-    private static final RedisScript<List> DES_LIST_SCRIPT = new DefaultRedisScript<>(
+    private static final RedisScript<List> DESC_LIST_SCRIPT = new DefaultRedisScript<>(
             "if redis.call('EXISTS', KEYS[2]) == 0 then return nil end " +
                     "local ids = redis.call('ZREVRANGEBYSCORE', KEYS[2], '('..ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3]) " +
                     "local values = {} for i = 1, #ids do local value = redis.call('GET', KEYS[1]..':'..ids[i]) " +
@@ -159,7 +159,7 @@ public class RedisZSetCaching implements Caching {
      * 缓存降序批量获取并设置过期时间脚本
      */
     @SuppressWarnings("rawtypes")
-    private static final RedisScript<List> DES_LIST_DEFER_SCRIPT = new DefaultRedisScript<>(
+    private static final RedisScript<List> DESC_LIST_DEFER_SCRIPT = new DefaultRedisScript<>(
             "if redis.call('EXISTS', KEYS[2]) == 0 then return nil end " +
                     "local ids = redis.call('ZREVRANGEBYSCORE', KEYS[2], '('..ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3]) " +
                     "local values = {} for i = 1, #ids do local unique = KEYS[1]..':'..ids[i] " +
@@ -338,15 +338,15 @@ public class RedisZSetCaching implements Caching {
         try {
             if (naming.isReversed()) {
                 if (naming.getDuration() > 0) {
-                    values = this.template.execute(DES_LIST_DEFER_SCRIPT, keys, max, min, size, naming.getDuration());
+                    values = this.template.execute(DESC_LIST_DEFER_SCRIPT, keys, max, min, size, naming.getDuration());
                 } else {
-                    values = this.template.execute(DES_LIST_SCRIPT, keys, max, min, size);
+                    values = this.template.execute(DESC_LIST_SCRIPT, keys, max, min, size);
                 }
             } else {
                 if (naming.getDuration() > 0) {
-                    values = this.template.execute(AES_LIST_DEFER_SCRIPT, keys, min, max, size, naming.getDuration());
+                    values = this.template.execute(ASC_LIST_DEFER_SCRIPT, keys, min, max, size, naming.getDuration());
                 } else {
-                    values = this.template.execute(AES_LIST_SCRIPT, keys, min, max, size);
+                    values = this.template.execute(ASC_LIST_SCRIPT, keys, min, max, size);
                 }
             }
         } catch (Throwable t) {

+ 16 - 13
framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisQueue.java

@@ -4,10 +4,10 @@ import java.util.AbstractQueue;
 import java.util.Collection;
 import java.util.Iterator;
 
+import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.redis.context.RedisContextHolder;
 import lombok.NonNull;
-import org.springframework.data.redis.core.ListOperations;
 import org.springframework.data.redis.core.RedisTemplate;
 
 /**
@@ -38,6 +38,7 @@ public class RedisQueue<E> extends AbstractQueue<E> {
         private final int size;
 
         public Iter(int size) {
+            AssertUtils.check(size > -1, () -> "size must be greater than -1");
             this.size = size;
         }
 
@@ -48,18 +49,17 @@ public class RedisQueue<E> extends AbstractQueue<E> {
 
         @Override
         public E next() {
-            return handler().index(name, this.index++);
+            return template().opsForList().index(name, this.index++);
         }
     }
 
     /**
-     * 获取Redis List处理器实例
+     * 获取RedisTemplate实例
      *
-     * @return Redis List处理器实例
+     * @return RedisTemplate实例
      */
-    private ListOperations<String, E> handler() {
-        RedisTemplate<String, E> template = ObjectUtils.ifNull(this.template, RedisContextHolder::getDefaultTemplate);
-        return template.opsForList();
+    private RedisTemplate<String, E> template() {
+        return ObjectUtils.ifNull(this.template, RedisContextHolder::getDefaultTemplate);
     }
 
     @Override
@@ -69,29 +69,32 @@ public class RedisQueue<E> extends AbstractQueue<E> {
 
     @Override
     public int size() {
-        Long size = this.handler().size(this.name);
+        Long size = this.template().opsForList().size(this.name);
         return size == null ? 0 : size.intValue();
     }
 
     @Override
     public boolean offer(E e) {
-        Long size = this.handler().rightPush(this.name, e);
+        if (e == null) {
+            return false;
+        }
+        Long size = this.template().opsForList().rightPush(this.name, e);
         return size != null && size > 0;
     }
 
     @Override
     public E poll() {
-        return this.handler().leftPop(this.name);
+        return this.template().opsForList().leftPop(this.name);
     }
 
     @Override
     public E peek() {
-        return this.handler().index(this.name, 0);
+        return this.template().opsForList().index(this.name, 0);
     }
 
     @Override
     public void clear() {
-        this.handler().getOperations().delete(this.name);
+        this.template().opsForList().getOperations().delete(this.name);
     }
 
     @Override
@@ -100,7 +103,7 @@ public class RedisQueue<E> extends AbstractQueue<E> {
         if (ObjectUtils.isEmpty(c)) {
             return false;
         }
-        Long size = this.handler().rightPushAll(this.name, (Collection<E>) c);
+        Long size = this.template().opsForList().rightPushAll(this.name, (Collection<E>) c);
         return size != null && size > 0;
     }
 }

+ 186 - 0
framework-redis/src/main/java/com/chelvc/framework/redis/queue/TemporalUniqueRedisQueue.java

@@ -0,0 +1,186 @@
+package com.chelvc.framework.redis.queue;
+
+import java.time.Duration;
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import com.chelvc.framework.common.util.AssertUtils;
+import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.redis.context.RedisContextHolder;
+import lombok.NonNull;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.data.redis.core.script.RedisScript;
+
+/**
+ * 带时间的唯一性Redis队列
+ *
+ * @param <E> 元素类型
+ * @author Woody
+ * @date 2024/8/29
+ */
+public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
+    /**
+     * Redis队列添加元素脚本(如果存在则忽略)
+     */
+    private static final RedisScript<Long> ADD_SCRIPT = new DefaultRedisScript<>(
+            "return redis.call('ZADD', KEYS[1], 'NX', 'CH', unpack(ARGV))", Long.class
+    );
+
+    /**
+     * Redis队列弹出元素脚本
+     */
+    @SuppressWarnings("rawtypes")
+    private static final RedisScript<List> POLL_SCRIPT = new DefaultRedisScript<>(
+            "local value = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, 1) " +
+                    "if value[1] then redis.call('ZADD', KEYS[1], ARGV[3], value[1]) end return value", List.class
+    );
+
+    private final long idle;
+    private final String name;
+    private final RedisTemplate<String, E> template;
+
+    public TemporalUniqueRedisQueue(@NonNull String name, @NonNull Duration idle) {
+        this(name, null, idle);
+    }
+
+    public TemporalUniqueRedisQueue(@NonNull String name, RedisTemplate<String, E> template, @NonNull Duration idle) {
+        this.name = name;
+        this.idle = idle.toMillis();
+        this.template = template;
+    }
+
+    /**
+     * 集合迭代器实现
+     */
+    private class Iter implements Iterator<E> {
+        private E element;
+        private int index = 0;
+        private final int size;
+        private final long max = System.currentTimeMillis();
+
+        public Iter(int size) {
+            AssertUtils.check(size > -1, () -> "size must be greater than -1");
+            this.size = size;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return this.index < this.size;
+        }
+
+        @Override
+        public E next() {
+            Set<E> values = template().opsForZSet().rangeByScore(name, 0, this.max, this.index++, 1);
+            return ObjectUtils.isEmpty(values) ? null : (this.element = values.iterator().next());
+        }
+
+        @Override
+        public void remove() {
+            if (this.element != null) {
+                template().opsForZSet().remove(name, this.element);
+                this.element = null;
+            }
+        }
+    }
+
+    /**
+     * 获取RedisTemplate实例
+     *
+     * @return RedisTemplate实例
+     */
+    private RedisTemplate<String, E> template() {
+        return ObjectUtils.ifNull(this.template, RedisContextHolder::getDefaultTemplate);
+    }
+
+    @Override
+    public Iterator<E> iterator() {
+        return new Iter(this.size());
+    }
+
+    @Override
+    public int size() {
+        long min = 0, max = System.currentTimeMillis();
+        Long size = this.template().opsForZSet().count(this.name, min, max);
+        return size == null ? 0 : size.intValue();
+    }
+
+    @Override
+    public boolean offer(E e) {
+        if (e == null) {
+            return false;
+        }
+        long timestamp = System.currentTimeMillis();
+        RedisTemplate<String, E> template = this.template();
+        Object[] args = new Object[]{
+                RedisContextHolder.serialize(template.getValueSerializer(), timestamp),
+                RedisContextHolder.serialize(template.getValueSerializer(), e)
+        };
+        Long count = template.execute(ADD_SCRIPT, Collections.singletonList(this.name), args);
+        return count != null && count > 0;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public E poll() {
+        List<String> keys = Collections.singletonList(this.name);
+        long min = 0, max = System.currentTimeMillis(), score = max + this.idle;
+        List<E> values = this.template().execute(POLL_SCRIPT, keys, min, max, score);
+        if (ObjectUtils.isEmpty(values)) {
+            return null;
+        }
+        return values.get(0);
+    }
+
+    @Override
+    public E peek() {
+        long min = 0, max = System.currentTimeMillis();
+        Set<E> values = this.template().opsForZSet().rangeByScore(this.name, min, max, 0, 1);
+        return ObjectUtils.isEmpty(values) ? null : values.iterator().next();
+    }
+
+    @Override
+    public void clear() {
+        this.template().delete(this.name);
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends E> c) {
+        if (ObjectUtils.isEmpty(c)) {
+            return false;
+        }
+
+        int i = 0;
+        Object[] args = new Object[c.size() * 2];
+        long timestamp = System.currentTimeMillis();
+        RedisTemplate<String, E> template = this.template();
+        for (E e : c) {
+            args[i++] = RedisContextHolder.serialize(template.getValueSerializer(), timestamp++);
+            args[i++] = RedisContextHolder.serialize(template.getValueSerializer(), e);
+        }
+        Long count = template.execute(ADD_SCRIPT, Collections.singletonList(this.name), args);
+        return count != null && count > 0;
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        if (o == null) {
+            return false;
+        }
+        Long count = this.template().opsForZSet().remove(this.name, o);
+        return count != null && count > 0;
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        if (ObjectUtils.isEmpty(c)) {
+            return false;
+        }
+        Long count = this.template().opsForZSet().remove(this.name, c.toArray());
+        return count != null && count > 0;
+    }
+}