ソースを参照

Redis消息队列优化

woody 8 ヶ月 前
コミット
2a14648c3d

+ 28 - 0
framework-common/src/main/java/com/chelvc/framework/common/util/ThreadUtils.java

@@ -146,6 +146,34 @@ public final class ThreadUtils {
         }
         }
     }
     }
 
 
+    /**
+     * 异步消费消息
+     *
+     * @param queue    消息队列
+     * @param consumer 消息消费者
+     * @param <T>      消息类型
+     * @return 消费线程
+     */
+    public static <T> Thread consume(@NonNull Queue<T> queue, @NonNull Consumer<T> consumer) {
+        return run(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                // 从队列中获取消息
+                T message = queue.poll();
+                if (message == null) {
+                    sleep(1000);
+                    continue;
+                }
+
+                // 消费消息
+                try {
+                    consumer.accept(message);
+                } catch (Exception e) {
+                    log.error(e.getMessage(), e);
+                }
+            }
+        });
+    }
+
     /**
     /**
      * 异步消费消息
      * 异步消费消息
      *
      *

+ 0 - 47
framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisContextHolder.java

@@ -4,7 +4,6 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatter;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -771,52 +770,6 @@ public final class RedisContextHolder {
         throw new RuntimeException("Generate identity attempts limit exceeded");
         throw new RuntimeException("Generate identity attempts limit exceeded");
     }
     }
 
 
-    /**
-     * 从队列中弹出数据
-     *
-     * @param queue    队列名称
-     * @param supplier 数据提供者
-     * @param <T>      数据类型
-     * @return 数据对象实例
-     */
-    public static <T> T pop(@NonNull String queue, @NonNull Supplier<List<T>> supplier) {
-        return pop(getRedisTemplate(), queue, supplier);
-    }
-
-    /**
-     * 从队列中弹出数据
-     *
-     * @param template Redis操作模版实例
-     * @param queue    队列名称
-     * @param supplier 数据提供者
-     * @param <K>      键类型
-     * @param <V>      值类型
-     * @return 数据对象实例
-     */
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public static <K, V> V pop(@NonNull RedisTemplate<K, ?> template, @NonNull K queue,
-                               @NonNull Supplier<List<V>> supplier) {
-        // 尝试从队列中获取数据
-        V object = (V) template.opsForList().rightPop(queue);
-        if (object == null) {
-            // 获取redis锁
-            object = lockAround(queue + ":lock", () -> {
-                V v = (V) template.opsForList().rightPop(queue);
-                if (v == null) {
-                    // 加载队列数据
-                    List<V> objects = supplier.get();
-                    if (ObjectUtils.isEmpty(objects)) {
-                        return null;
-                    }
-                    template.opsForList().leftPushAll(queue, (Collection) objects);
-                    v = (V) template.opsForList().rightPop(queue);
-                }
-                return v;
-            });
-        }
-        return object;
-    }
-
     /**
     /**
      * 获取键值
      * 获取键值
      *
      *

+ 106 - 0
framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisQueue.java

@@ -0,0 +1,106 @@
+package com.chelvc.framework.redis.queue;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+
+import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.redis.context.RedisContextHolder;
+import lombok.NonNull;
+import org.springframework.data.redis.core.ListOperations;
+import org.springframework.data.redis.core.RedisTemplate;
+
+/**
+ * Redis队列实现
+ *
+ * @param <E> 元素类型
+ * @author Woody
+ * @date 2024/8/19
+ */
+public class RedisQueue<E> extends AbstractQueue<E> {
+    private final String name;
+    private final RedisTemplate<String, E> template;
+
+    public RedisQueue(@NonNull String name) {
+        this(name, null);
+    }
+
+    public RedisQueue(@NonNull String name, RedisTemplate<String, E> template) {
+        this.name = name;
+        this.template = template;
+    }
+
+    /**
+     * 集合迭代器实现
+     */
+    private class Iter implements Iterator<E> {
+        private int index = 0;
+        private final int size;
+
+        public Iter(int size) {
+            this.size = size;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return this.index < this.size;
+        }
+
+        @Override
+        public E next() {
+            return handler().index(name, this.index++);
+        }
+    }
+
+    /**
+     * 获取Redis List处理器实例
+     *
+     * @return Redis List处理器实例
+     */
+    private ListOperations<String, E> handler() {
+        RedisTemplate<String, E> template = ObjectUtils.ifNull(this.template, RedisContextHolder::getDefaultTemplate);
+        return template.opsForList();
+    }
+
+    @Override
+    public Iterator<E> iterator() {
+        return new Iter(this.size());
+    }
+
+    @Override
+    public int size() {
+        Long size = this.handler().size(this.name);
+        return size == null ? 0 : size.intValue();
+    }
+
+    @Override
+    public boolean offer(E e) {
+        Long size = this.handler().rightPush(this.name, e);
+        return size != null && size > 0;
+    }
+
+    @Override
+    public E poll() {
+        return this.handler().leftPop(this.name);
+    }
+
+    @Override
+    public E peek() {
+        return this.handler().index(this.name, 0);
+    }
+
+    @Override
+    public void clear() {
+        this.handler().getOperations().delete(this.name);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean addAll(Collection<? extends E> c) {
+        if (ObjectUtils.isEmpty(c)) {
+            return false;
+        }
+        Long size = this.handler().rightPushAll(this.name, (Collection<E>) c);
+        return size != null && size > 0;
+    }
+}