Jelajahi Sumber

优化Redis队列处理逻辑

woody 7 bulan lalu
induk
melakukan
0ce2adbc30

+ 31 - 4
framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisQueue.java

@@ -58,6 +58,7 @@ public class RedisQueue<E> extends AbstractQueue<E> {
      * 集合迭代器实现
      */
     private class Iter implements Iterator<E> {
+        private E element;
         private int index = 0;
         private final int size;
 
@@ -73,7 +74,15 @@ public class RedisQueue<E> extends AbstractQueue<E> {
 
         @Override
         public E next() {
-            return template().opsForList().index(name, this.index++);
+            return this.element = template().opsForList().index(name, this.index++);
+        }
+
+        @Override
+        public void remove() {
+            if (this.element != null) {
+                template().opsForList().remove(name, 1, this.element);
+                this.element = null;
+            }
         }
     }
 
@@ -93,8 +102,8 @@ public class RedisQueue<E> extends AbstractQueue<E> {
 
     @Override
     public int size() {
-        Long count = this.template().opsForList().size(this.name);
-        return count == null ? 0 : count.intValue();
+        Long size = this.template().opsForList().size(this.name);
+        return size == null ? 0 : size.intValue();
     }
 
     @Override
@@ -125,7 +134,7 @@ public class RedisQueue<E> extends AbstractQueue<E> {
 
     @Override
     public void clear() {
-        this.template().opsForList().getOperations().delete(this.name);
+        this.template().delete(this.name);
     }
 
     @Override
@@ -149,4 +158,22 @@ public class RedisQueue<E> extends AbstractQueue<E> {
         }
         return count != null && count > 0;
     }
+
+    @Override
+    public Object[] toArray() {
+        List<E> values = this.template().opsForList().range(this.name, 0, -1);
+        return ObjectUtils.ifNull(values, Collections::emptyList).toArray();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        List<E> values = this.template().opsForList().range(this.name, 0, -1);
+        return ObjectUtils.ifNull(values, Collections::emptyList).toArray(a);
+    }
+
+    @Override
+    public String toString() {
+        List<E> values = this.template().opsForList().range(this.name, 0, -1);
+        return ObjectUtils.ifNull(values, Collections::emptyList).toString();
+    }
 }

+ 71 - 21
framework-redis/src/main/java/com/chelvc/framework/redis/queue/TemporalUniqueRedisQueue.java

@@ -6,11 +6,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.redis.context.RedisContextHolder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import lombok.NonNull;
 import org.springframework.dao.DataAccessException;
 import org.springframework.data.redis.core.RedisOperations;
@@ -52,24 +55,23 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
                     "if value[1] then redis.call('ZADD', KEYS[1], ARGV[3], value[1]) end return value", List.class
     );
 
-    /**
-     * Redis队列弹出元素脚本
-     */
-    @SuppressWarnings("rawtypes")
-    private static final RedisScript<List> CONTAIN_SCRIPT = new DefaultRedisScript<>(
-            "local value = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, 1) " +
-                    "if value[1] then redis.call('ZADD', KEYS[1], ARGV[3], value[1]) end return value", List.class
-    );
-
     private final long idle;
     private final String name;
     private final long duration;
     private final RedisTemplate<String, E> template;
 
+    public TemporalUniqueRedisQueue(@NonNull String name) {
+        this(name, Duration.ofMinutes(1));
+    }
+
     public TemporalUniqueRedisQueue(@NonNull String name, @NonNull Duration idle) {
         this(name, idle, Duration.ofSeconds(-1));
     }
 
+    public TemporalUniqueRedisQueue(@NonNull String name, RedisTemplate<String, E> template) {
+        this(name, template, Duration.ofMinutes(1));
+    }
+
     public TemporalUniqueRedisQueue(@NonNull String name, @NonNull Duration idle, @NonNull Duration duration) {
         this(name, null, idle, duration);
     }
@@ -80,8 +82,9 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
 
     public TemporalUniqueRedisQueue(@NonNull String name, RedisTemplate<String, E> template, @NonNull Duration idle,
                                     @NonNull Duration duration) {
+        this.idle = idle.toMillis();
+        AssertUtils.check(this.idle > 0, () -> "idle must be greater than 0");
         this.name = name;
-        this.idle = Math.max(idle.toMillis(), 0);
         this.template = template;
         this.duration = Math.max(duration.getSeconds(), -1);
     }
@@ -93,7 +96,6 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
         private E element;
         private int index = 0;
         private final int size;
-        private final long max = System.currentTimeMillis();
 
         public Iter(int size) {
             AssertUtils.check(size > -1, () -> "size must be greater than -1");
@@ -107,7 +109,7 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
 
         @Override
         public E next() {
-            Set<E> values = template().opsForZSet().rangeByScore(name, 0, this.max, this.index++, 1);
+            Set<E> values = template().opsForZSet().range(name, this.index, this.index++);
             return ObjectUtils.isEmpty(values) ? null : (this.element = values.iterator().next());
         }
 
@@ -130,14 +132,45 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
     }
 
     /**
-     * 获取消费元素集合
+     * 获取消费元素集合
      *
-     * @return 元素集合
+     * @param c 元素集合
+     * @return 元素/是否正在消费映射表
      */
-    public Set<E> consuming() {
-        long min = System.currentTimeMillis() + 100, max = Long.MAX_VALUE;
-        Set<E> polled = this.template().opsForZSet().rangeByScore(this.name, min, max);
-        return ObjectUtils.ifNull(polled, Collections::emptySet);
+    public Map<E, Boolean> consumings(Collection<E> c) {
+        if (ObjectUtils.isEmpty(c)) {
+            return Collections.emptyMap();
+        }
+
+        // 处理单个元素
+        if (c.size() == 1) {
+            E e = c.iterator().next();
+            Double score = this.template().opsForZSet().score(this.name, e);
+            return ImmutableMap.of(e, score != null && score > System.currentTimeMillis());
+        }
+
+        // 批量处理多个元素
+        List<?> scores = this.template().executePipelined(new SessionCallback<Object>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
+                for (E e : c) {
+                    operations.opsForZSet().score((K) name, e);
+                }
+                return null;
+            }
+        });
+        if (ObjectUtils.isEmpty(scores)) {
+            return Collections.emptyMap();
+        }
+        int i = 0;
+        long timestamp = System.currentTimeMillis();
+        Map<E, Boolean> consumings = Maps.newHashMapWithExpectedSize(c.size());
+        for (E e : c) {
+            Double score = (Double) scores.get(i++);
+            consumings.put(e, score != null && score > timestamp);
+        }
+        return consumings;
     }
 
     @Override
@@ -147,9 +180,8 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
 
     @Override
     public int size() {
-        long min = 0, max = System.currentTimeMillis();
-        Long count = this.template().opsForZSet().count(this.name, min, max);
-        return count == null ? 0 : count.intValue();
+        Long size = this.template().opsForZSet().size(this.name);
+        return size == null ? 0 : size.intValue();
     }
 
     @Override
@@ -270,4 +302,22 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
         Long count = this.template().opsForZSet().remove(this.name, c.toArray());
         return count != null && count > 0;
     }
+
+    @Override
+    public Object[] toArray() {
+        Set<E> values = this.template().opsForZSet().range(this.name, 0, -1);
+        return ObjectUtils.ifNull(values, Collections::emptySet).toArray();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        Set<E> values = this.template().opsForZSet().range(this.name, 0, -1);
+        return ObjectUtils.ifNull(values, Collections::emptySet).toArray(a);
+    }
+
+    @Override
+    public String toString() {
+        Set<E> values = this.template().opsForZSet().range(this.name, 0, -1);
+        return ObjectUtils.ifNull(values, Collections::emptySet).toString();
+    }
 }