Ver código fonte

优化Redis分布式消息队列处理逻辑

Woody 6 dias atrás
pai
commit
3684f5f398

+ 17 - 14
framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisQueue.java

@@ -6,6 +6,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Function;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
@@ -71,28 +72,28 @@ public class RedisQueue<E> extends AbstractQueue<E> {
     }
 
     /**
-     * 构建消息队列
+     * 获取消息队列
      *
      * @param name 队列名称
      * @param <E>  消息类型
      * @return 队列实例
      */
-    public static <E> RedisQueue<E> build(@NonNull String name) {
-        return build(name, RedisQueue::new);
+    public static <E> RedisQueue<E> get(@NonNull String name) {
+        return get(name, RedisQueue::new);
     }
 
     /**
-     * 构建消息队列
+     * 获取消息队列
      *
-     * @param name     队列名称
-     * @param function 队列构建函数
-     * @param <E>      消息类型
+     * @param name    队列名称
+     * @param builder 队列构建函数
+     * @param <E>     消息类型
      * @return 队列实例
      */
     @SuppressWarnings("unchecked")
-    public static <E> RedisQueue<E> build(@NonNull String name, @NonNull Function<String, RedisQueue<E>> function) {
+    public static <E> RedisQueue<E> get(@NonNull String name, @NonNull Function<String, RedisQueue<E>> builder) {
         RedisQueue<E> queue = INSTANCES.get(name);
-        return queue == null ? INSTANCES.computeIfAbsent(name, function) : queue;
+        return queue == null ? INSTANCES.computeIfAbsent(name, builder) : queue;
     }
 
     /**
@@ -116,12 +117,13 @@ public class RedisQueue<E> extends AbstractQueue<E> {
     }
 
     @Override
-    public boolean offer(E e) {
-        if (e == null) {
-            return false;
-        }
+    public boolean add(E e) {
+        return this.offer(e);
+    }
 
-        Long count = this.template().opsForList().rightPush(this.name, e);
+    @Override
+    public boolean offer(E e) {
+        Long count = this.template().opsForList().rightPush(this.name, Objects.requireNonNull(e));
         return count != null && count > 0;
     }
 
@@ -147,6 +149,7 @@ public class RedisQueue<E> extends AbstractQueue<E> {
             return false;
         }
 
+        c.forEach(Objects::requireNonNull);
         Long count = this.template().opsForList().rightPushAll(this.name, (Collection<E>) c);
         return count != null && count > 0;
     }

+ 18 - 12
framework-redis/src/main/java/com/chelvc/framework/redis/queue/TemporalRedisQueue.java

@@ -7,6 +7,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
 
@@ -111,29 +112,29 @@ public class TemporalRedisQueue<E> extends AbstractQueue<E> {
     }
 
     /**
-     * 构建消息队列
+     * 获取消息队列
      *
      * @param name 队列名称
      * @param <E>  消息类型
      * @return 队列实例
      */
-    public static <E> TemporalRedisQueue<E> build(@NonNull String name) {
-        return build(name, TemporalRedisQueue::new);
+    public static <E> TemporalRedisQueue<E> get(@NonNull String name) {
+        return get(name, TemporalRedisQueue::new);
     }
 
     /**
-     * 构建消息队列
+     * 获取消息队列
      *
-     * @param name     队列名称
-     * @param function 队列构建函数
-     * @param <E>      消息类型
+     * @param name    队列名称
+     * @param builder 队列构建函数
+     * @param <E>     消息类型
      * @return 队列实例
      */
     @SuppressWarnings("unchecked")
-    public static <E> TemporalRedisQueue<E> build(@NonNull String name,
-                                                  @NonNull Function<String, TemporalRedisQueue<E>> function) {
+    public static <E> TemporalRedisQueue<E> get(@NonNull String name,
+                                                @NonNull Function<String, TemporalRedisQueue<E>> builder) {
         TemporalRedisQueue<E> queue = INSTANCES.get(name);
-        return queue == null ? INSTANCES.computeIfAbsent(name, function) : queue;
+        return queue == null ? INSTANCES.computeIfAbsent(name, builder) : queue;
     }
 
     /**
@@ -237,9 +238,14 @@ public class TemporalRedisQueue<E> extends AbstractQueue<E> {
         return size == null ? 0 : size.intValue();
     }
 
+    @Override
+    public boolean add(E e) {
+        return this.offer(e);
+    }
+
     @Override
     public boolean offer(E e) {
-        return e != null && this.add(new Object[]{System.currentTimeMillis(), e});
+        return this.add(new Object[]{System.currentTimeMillis(), Objects.requireNonNull(e)});
     }
 
     @Override
@@ -274,7 +280,7 @@ public class TemporalRedisQueue<E> extends AbstractQueue<E> {
         long timestamp = System.currentTimeMillis();
         for (E e : c) {
             args[i++] = timestamp++;
-            args[i++] = e;
+            args[i++] = Objects.requireNonNull(e);
         }
         return this.add(args);
     }