Bläddra i källkod

新增Redis队列判断元素是否存在方法

woody 7 månader sedan
förälder
incheckning
abd58ece5c

+ 92 - 0
framework-redis/src/main/java/com/chelvc/framework/redis/queue/TemporalUniqueRedisQueue.java

@@ -6,13 +6,18 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.Set;
 
 
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.redis.context.RedisContextHolder;
 import com.chelvc.framework.redis.context.RedisContextHolder;
+import com.google.common.collect.Maps;
 import lombok.NonNull;
 import lombok.NonNull;
+import org.springframework.dao.DataAccessException;
+import org.springframework.data.redis.core.RedisOperations;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.SessionCallback;
 import org.springframework.data.redis.core.script.DefaultRedisScript;
 import org.springframework.data.redis.core.script.DefaultRedisScript;
 import org.springframework.data.redis.core.script.RedisScript;
 import org.springframework.data.redis.core.script.RedisScript;
 
 
@@ -49,6 +54,15 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
                     "if value[1] then redis.call('ZADD', KEYS[1], ARGV[3], value[1]) end return value", List.class
                     "if value[1] then redis.call('ZADD', KEYS[1], ARGV[3], value[1]) end return value", List.class
     );
     );
 
 
+    /**
+     * Redis队列弹出元素脚本
+     */
+    @SuppressWarnings("rawtypes")
+    private static final RedisScript<List> CONTAIN_SCRIPT = new DefaultRedisScript<>(
+            "local value = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, 1) " +
+                    "if value[1] then redis.call('ZADD', KEYS[1], ARGV[3], value[1]) end return value", List.class
+    );
+
     private final long idle;
     private final long idle;
     private final String name;
     private final String name;
     private final long duration;
     private final long duration;
@@ -117,6 +131,48 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
         return ObjectUtils.ifNull(this.template, RedisContextHolder::getDefaultTemplate);
         return ObjectUtils.ifNull(this.template, RedisContextHolder::getDefaultTemplate);
     }
     }
 
 
+    /**
+     * 判断元素是否存在
+     *
+     * @param c 元素集合
+     * @return 元素/是否存在映射表
+     */
+    public Map<E, Boolean> contains(Collection<E> c) {
+        if (ObjectUtils.isEmpty(c)) {
+            return Collections.emptyMap();
+        }
+
+        // 处理单个元素
+        if (c.size() == 1) {
+            E e = c.iterator().next();
+            Map<E, Boolean> contains = Maps.newHashMapWithExpectedSize(1);
+            contains.put(e, this.contains(e));
+            return contains;
+        }
+
+        // 批量处理多个元素
+        List<?> indexes = this.template().executePipelined(new SessionCallback<Object>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
+                for (E e : c) {
+                    operations.opsForZSet().rank((K) name, e);
+                }
+                return null;
+            }
+        });
+        if (ObjectUtils.isEmpty(indexes)) {
+            return Collections.emptyMap();
+        }
+        int i = 0;
+        Map<E, Boolean> contains = Maps.newHashMapWithExpectedSize(c.size());
+        for (E e : c) {
+            Long index = (Long) indexes.get(i++);
+            contains.put(e, index != null && index >= 0);
+        }
+        return contains;
+    }
+
     @Override
     @Override
     public Iterator<E> iterator() {
     public Iterator<E> iterator() {
         return new Iter(this.size());
         return new Iter(this.size());
@@ -192,6 +248,42 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
         return count != null && count > 0;
         return count != null && count > 0;
     }
     }
 
 
+    @Override
+    public boolean contains(Object o) {
+        if (o == null) {
+            return false;
+        }
+
+        Long index = this.template().opsForZSet().rank(this.name, o);
+        return index != null && index >= 0;
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        if (ObjectUtils.isEmpty(c)) {
+            return false;
+        }
+
+        // 处理单个元素
+        if (c.size() == 1) {
+            return this.contains(c.iterator().next());
+        }
+
+        // 批量处理多个元素
+        List<?> indexes = this.template().executePipelined(new SessionCallback<Object>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
+                for (Object o : c) {
+                    operations.opsForZSet().rank((K) name, o);
+                }
+                return null;
+            }
+        });
+        return ObjectUtils.notEmpty(indexes) && indexes.size() == c.size()
+                && indexes.stream().allMatch(index -> index != null && ((Long) index) >= 0);
+    }
+
     @Override
     @Override
     public boolean remove(Object o) {
     public boolean remove(Object o) {
         if (o == null) {
         if (o == null) {