Bladeren bron

代码优化

Woody 3 weken geleden
bovenliggende
commit
1c56cf3c40

+ 2 - 2
framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java

@@ -22,7 +22,7 @@ import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.kafka.producer.TransactionMessage;
 import com.chelvc.framework.redis.queue.DelayRedisQueue;
-import com.chelvc.framework.redis.queue.Queues;
+import com.chelvc.framework.redis.queue.RedisQueues;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -71,7 +71,7 @@ public final class KafkaContextHolder {
      * @return 队列实例
      */
     public static DelayRedisQueue<String> getTransactionQueue() {
-        return Queues.getDelayRedisQueue(
+        return RedisQueues.getDelayRedisQueue(
                 ApplicationContextHolder.getApplicationName(true) + "-kafka-transaction-message",
                 name -> new DelayRedisQueue<>(name, Duration.ofSeconds(60), Duration.ZERO)
         );

+ 2 - 2
framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionMessageProcessor.java

@@ -6,7 +6,7 @@ import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.kafka.context.KafkaContextHolder;
 import com.chelvc.framework.redis.context.RedisContextHolder;
 import com.chelvc.framework.redis.queue.DelayRedisQueue;
-import com.chelvc.framework.redis.queue.Queues;
+import com.chelvc.framework.redis.queue.RedisQueues;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -62,7 +62,7 @@ public class TransactionMessageProcessor implements ApplicationRunner {
     @Override
     public void run(ApplicationArguments args) throws Exception {
         DelayRedisQueue<String> queue = KafkaContextHolder.getTransactionQueue();
-        Queues.consume(queue, 100, messages -> messages.forEach(message -> {
+        RedisQueues.consume(queue, 100, messages -> messages.forEach(message -> {
             try {
                 this.processing(queue, message);
             } catch (Exception e) {

+ 1 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/queue/MessageStreamListener.java

@@ -42,7 +42,7 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
                 return RedisStreamHolder.getStreamTemplate();
             }
         };
-        Queues.consume(this.delayQueue, 100, messages -> messages.forEach(this::processing));
+        RedisQueues.consume(this.delayQueue, 100, messages -> messages.forEach(this::processing));
     }
 
     /**

+ 4 - 4
framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisQueue.java → framework-redis/src/main/java/com/chelvc/framework/redis/queue/NormalRedisQueue.java

@@ -16,21 +16,21 @@ import lombok.NonNull;
 import org.springframework.data.redis.core.RedisTemplate;
 
 /**
- * Redis队列实现
+ * Redis普通队列实现
  *
  * @param <E> 元素类型
  * @author Woody
  * @date 2024/8/19
  */
 @Getter
-public class RedisQueue<E> extends AbstractQueue<E> {
+public class NormalRedisQueue<E> extends AbstractQueue<E> {
     private final String name;
 
-    public RedisQueue(@NonNull String name) {
+    public NormalRedisQueue(@NonNull String name) {
         this(ApplicationContextHolder.getProfile(), name);
     }
 
-    RedisQueue(String namespace, @NonNull String name) {
+    NormalRedisQueue(String namespace, @NonNull String name) {
         this.name = StringUtils.isEmpty(namespace) ? name : (namespace + "-" + name);
     }
 

+ 13 - 2
framework-redis/src/main/java/com/chelvc/framework/redis/queue/PriorityRedisQueue.java

@@ -25,7 +25,7 @@ import org.springframework.data.redis.core.script.RedisScript;
  * @author Woody
  * @date 2025/4/28
  */
-public class PriorityRedisQueue<E> extends RedisQueue<E> {
+public class PriorityRedisQueue<E> extends NormalRedisQueue<E> {
     /**
      * 添加或忽略脚本
      */
@@ -166,6 +166,17 @@ public class PriorityRedisQueue<E> extends RedisQueue<E> {
         return this.offer(e, priority, true);
     }
 
+    /**
+     * 将元素放入队列
+     *
+     * @param e        元素
+     * @param override 是否可覆盖
+     * @return true/false
+     */
+    public boolean offer(@NonNull E e, boolean override) {
+        return this.offer(e, System.currentTimeMillis(), override);
+    }
+
     /**
      * 将元素放入队列
      *
@@ -208,7 +219,7 @@ public class PriorityRedisQueue<E> extends RedisQueue<E> {
 
     @Override
     public boolean offer(E e) {
-        return this.offer(e, System.currentTimeMillis());
+        return this.offer(e, true);
     }
 
     @Override

+ 34 - 34
framework-redis/src/main/java/com/chelvc/framework/redis/queue/Queues.java → framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisQueues.java

@@ -13,47 +13,47 @@ import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 /**
- * 分布式队列处理工具类
+ * Redis队列处理工具类
  *
  * @author Woody
  * @date 2025/4/28
  */
 @Slf4j
-public final class Queues {
+public final class RedisQueues {
     /**
-     * 队列名称/实例映射表
+     * 普通队列名称/实例映射表
      */
     @SuppressWarnings("rawtypes")
-    private static final Map<String, RedisQueue> REDIS_QUEUE_INSTANCES = Maps.newConcurrentMap();
+    private static final Map<String, NormalRedisQueue> NORMAL_REDIS_QUEUE_INSTANCES = Maps.newConcurrentMap();
 
     /**
-     * 延时队列名称/实例映射表
+     * 优先级队列名称/实例映射表
      */
     @SuppressWarnings("rawtypes")
-    private static final Map<String, DelayRedisQueue> DELAY_REDIS_QUEUE_INSTANCES = Maps.newConcurrentMap();
+    private static final Map<String, PriorityRedisQueue> PRIORITY_REDIS_QUEUE_INSTANCES = Maps.newConcurrentMap();
 
     /**
-     * 优先级队列名称/实例映射表
+     * 延时队列名称/实例映射表
      */
     @SuppressWarnings("rawtypes")
-    private static final Map<String, PriorityRedisQueue> PRIORITY_REDIS_QUEUE_INSTANCES = Maps.newConcurrentMap();
+    private static final Map<String, DelayRedisQueue> DELAY_REDIS_QUEUE_INSTANCES = Maps.newConcurrentMap();
 
-    private Queues() {
+    private RedisQueues() {
     }
 
     /**
-     * 获取消息队列
+     * 获取普通消息队列
      *
      * @param name 队列名称
      * @param <E>  消息类型
      * @return 队列实例
      */
-    public static <E> RedisQueue<E> getRedisQueue(@NonNull String name) {
-        return getRedisQueue(name, RedisQueue::new);
+    public static <E> NormalRedisQueue<E> getNormalRedisQueue(@NonNull String name) {
+        return getNormalRedisQueue(name, NormalRedisQueue::new);
     }
 
     /**
-     * 获取消息队列
+     * 获取普通消息队列
      *
      * @param name    队列名称
      * @param builder 队列构建函数
@@ -61,25 +61,25 @@ public final class Queues {
      * @return 队列实例
      */
     @SuppressWarnings("unchecked")
-    public static <E> RedisQueue<E> getRedisQueue(@NonNull String name,
-                                                  @NonNull Function<String, RedisQueue<E>> builder) {
-        RedisQueue<E> queue = REDIS_QUEUE_INSTANCES.get(name);
-        return queue == null ? REDIS_QUEUE_INSTANCES.computeIfAbsent(name, builder) : queue;
+    public static <E> NormalRedisQueue<E> getNormalRedisQueue(@NonNull String name,
+                                                              @NonNull Function<String, NormalRedisQueue<E>> builder) {
+        NormalRedisQueue<E> queue = NORMAL_REDIS_QUEUE_INSTANCES.get(name);
+        return queue == null ? NORMAL_REDIS_QUEUE_INSTANCES.computeIfAbsent(name, builder) : queue;
     }
 
     /**
-     * 获取延时消息队列
+     * 获取优先级消息队列
      *
      * @param name 队列名称
      * @param <E>  消息类型
      * @return 队列实例
      */
-    public static <E> DelayRedisQueue<E> getDelayRedisQueue(@NonNull String name) {
-        return getDelayRedisQueue(name, DelayRedisQueue::new);
+    public static <E> PriorityRedisQueue<E> getPriorityRedisQueue(@NonNull String name) {
+        return getPriorityRedisQueue(name, PriorityRedisQueue::new);
     }
 
     /**
-     * 获取延时消息队列
+     * 获取优先级消息队列
      *
      * @param name    队列名称
      * @param builder 队列构建函数
@@ -87,25 +87,25 @@ public final class Queues {
      * @return 队列实例
      */
     @SuppressWarnings("unchecked")
-    public static <E> DelayRedisQueue<E> getDelayRedisQueue(@NonNull String name,
-                                                            @NonNull Function<String, DelayRedisQueue<E>> builder) {
-        DelayRedisQueue<E> queue = DELAY_REDIS_QUEUE_INSTANCES.get(name);
-        return queue == null ? DELAY_REDIS_QUEUE_INSTANCES.computeIfAbsent(name, builder) : queue;
+    public static <E> PriorityRedisQueue<E> getPriorityRedisQueue(@NonNull String name,
+                                                                  @NonNull Function<String, PriorityRedisQueue<E>> builder) {
+        PriorityRedisQueue<E> queue = PRIORITY_REDIS_QUEUE_INSTANCES.get(name);
+        return queue == null ? PRIORITY_REDIS_QUEUE_INSTANCES.computeIfAbsent(name, builder) : queue;
     }
 
     /**
-     * 获取优先级消息队列
+     * 获取延时消息队列
      *
      * @param name 队列名称
      * @param <E>  消息类型
      * @return 队列实例
      */
-    public static <E> PriorityRedisQueue<E> getPriorityRedisQueue(@NonNull String name) {
-        return getPriorityRedisQueue(name, PriorityRedisQueue::new);
+    public static <E> DelayRedisQueue<E> getDelayRedisQueue(@NonNull String name) {
+        return getDelayRedisQueue(name, DelayRedisQueue::new);
     }
 
     /**
-     * 获取优先级消息队列
+     * 获取延时消息队列
      *
      * @param name    队列名称
      * @param builder 队列构建函数
@@ -113,10 +113,10 @@ public final class Queues {
      * @return 队列实例
      */
     @SuppressWarnings("unchecked")
-    public static <E> PriorityRedisQueue<E> getPriorityRedisQueue(@NonNull String name,
-                                                                  @NonNull Function<String, PriorityRedisQueue<E>> builder) {
-        PriorityRedisQueue<E> queue = PRIORITY_REDIS_QUEUE_INSTANCES.get(name);
-        return queue == null ? PRIORITY_REDIS_QUEUE_INSTANCES.computeIfAbsent(name, builder) : queue;
+    public static <E> DelayRedisQueue<E> getDelayRedisQueue(@NonNull String name,
+                                                            @NonNull Function<String, DelayRedisQueue<E>> builder) {
+        DelayRedisQueue<E> queue = DELAY_REDIS_QUEUE_INSTANCES.get(name);
+        return queue == null ? DELAY_REDIS_QUEUE_INSTANCES.computeIfAbsent(name, builder) : queue;
     }
 
     /**
@@ -126,7 +126,7 @@ public final class Queues {
      * @param consumer 消息消费者
      * @param <T>      消息类型
      */
-    public static <T> void consume(@NonNull RedisQueue<T> queue, @NonNull Consumer<T> consumer) {
+    public static <T> void consume(@NonNull NormalRedisQueue<T> queue, @NonNull Consumer<T> consumer) {
         ThreadUtils.run(() -> {
             while (!Thread.currentThread().isInterrupted()) {
                 // 拉取消息