Browse Source

优化Redis队列处理逻辑

woody 7 months ago
parent
commit
7e3af19183

+ 6 - 39
framework-redis/src/main/java/com/chelvc/framework/redis/queue/TemporalUniqueRedisQueue.java

@@ -6,13 +6,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.redis.context.RedisContextHolder;
-import com.google.common.collect.Maps;
 import lombok.NonNull;
 import org.springframework.dao.DataAccessException;
 import org.springframework.data.redis.core.RedisOperations;
@@ -132,45 +130,14 @@ public class TemporalUniqueRedisQueue<E> extends AbstractQueue<E> {
     }
 
     /**
-     * 判断元素是否存在
+     * 获取消费中元素集合
      *
-     * @param c 元素集合
-     * @return 元素/是否存在映射表
+     * @return 元素集合
      */
-    public Map<E, Boolean> contains(Collection<E> c) {
-        if (ObjectUtils.isEmpty(c)) {
-            return Collections.emptyMap();
-        }
-
-        // 处理单个元素
-        if (c.size() == 1) {
-            E e = c.iterator().next();
-            Map<E, Boolean> contains = Maps.newHashMapWithExpectedSize(1);
-            contains.put(e, this.contains(e));
-            return contains;
-        }
-
-        // 批量处理多个元素
-        List<?> indexes = this.template().executePipelined(new SessionCallback<Object>() {
-            @Override
-            @SuppressWarnings("unchecked")
-            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
-                for (E e : c) {
-                    operations.opsForZSet().rank((K) name, e);
-                }
-                return null;
-            }
-        });
-        if (ObjectUtils.isEmpty(indexes)) {
-            return Collections.emptyMap();
-        }
-        int i = 0;
-        Map<E, Boolean> contains = Maps.newHashMapWithExpectedSize(c.size());
-        for (E e : c) {
-            Long index = (Long) indexes.get(i++);
-            contains.put(e, index != null && index >= 0);
-        }
-        return contains;
+    public Set<E> consuming() {
+        long min = System.currentTimeMillis() + 100, max = Long.MAX_VALUE;
+        Set<E> polled = this.template().opsForZSet().rangeByScore(this.name, min, max);
+        return ObjectUtils.ifNull(polled, Collections::emptySet);
     }
 
     @Override