ソースを参照

Redis、RocketMQ工具优化

woody 1 年間 前
コミット
642a71ae71

+ 45 - 10
framework-redis/src/main/java/com/chelvc/framework/redis/util/RedisUtils.java

@@ -19,6 +19,7 @@ import com.chelvc.framework.base.function.Executor;
 import com.chelvc.framework.base.util.ObjectUtils;
 import com.chelvc.framework.base.util.StringUtils;
 import com.chelvc.framework.base.util.ThreadUtils;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -53,9 +54,9 @@ import org.springframework.util.CollectionUtils;
 @Component
 public class RedisUtils implements ApplicationContextAware {
     /**
-     * 默认有效时间(秒)
+     * 默认锁超时时间(秒)
      */
-    private static final long DEFAULT_DURATION = 10 * 60;
+    private static final long DEFAULT_LOCK_TIMEOUT = 60;
 
     /**
      * 随机串Rdids Key前缀
@@ -155,6 +156,14 @@ public class RedisUtils implements ApplicationContextAware {
                     "else redis.call('SETEX', KEYS[1], ARGV[3], ARGV[2]) " +
                     "return redis.call('DECRBY', KEYS[1], ARGV[1]) end";
 
+    /**
+     * Redis批量设值脚本(如果成功则更新过期时间)
+     */
+    private static final String MSETNX_WITH_DURATION_SCRIPT =
+            "local args = {} for i = 1, #KEYS do table.insert(args, KEYS[i]) table.insert(args, ARGV[i]) end " +
+                    "if redis.call('MSETNX', unpack(args)) == 1 then for i = 1, #KEYS do " +
+                    "redis.call('EXPIRE', KEYS[i], ARGV[#ARGV]) end return 1 else return 0 end";
+
     /**
      * Redis连接工厂
      */
@@ -219,7 +228,7 @@ public class RedisUtils implements ApplicationContextAware {
      * @return 锁标识
      */
     public static String lock(String name) {
-        return lock(name, Duration.ofSeconds(DEFAULT_DURATION));
+        return lock(name, Duration.ofSeconds(DEFAULT_LOCK_TIMEOUT));
     }
 
     /**
@@ -261,7 +270,7 @@ public class RedisUtils implements ApplicationContextAware {
      * @return 锁标识
      */
     public static String tryLock(String name) {
-        return tryLock(name, Duration.ofSeconds(DEFAULT_DURATION));
+        return tryLock(name, Duration.ofSeconds(DEFAULT_LOCK_TIMEOUT));
     }
 
     /**
@@ -315,7 +324,7 @@ public class RedisUtils implements ApplicationContextAware {
      * @param executor 回调方法
      */
     public static void lockAround(String name, @NonNull Executor executor) {
-        lockAround(name, Duration.ofSeconds(DEFAULT_DURATION), executor);
+        lockAround(name, Duration.ofSeconds(DEFAULT_LOCK_TIMEOUT), executor);
     }
 
     /**
@@ -345,7 +354,7 @@ public class RedisUtils implements ApplicationContextAware {
      * @return 执行结果
      */
     public static <T> T lockAround(String name, Supplier<T> executor) {
-        return lockAround(name, Duration.ofSeconds(DEFAULT_DURATION), executor);
+        return lockAround(name, Duration.ofSeconds(DEFAULT_LOCK_TIMEOUT), executor);
     }
 
     /**
@@ -371,7 +380,7 @@ public class RedisUtils implements ApplicationContextAware {
      * @return 执行结果
      */
     public static <T> T lockAround(String name, @NonNull Supplier<T> executor, @NonNull Supplier<T> failure) {
-        return lockAround(name, Duration.ofSeconds(DEFAULT_DURATION), executor, failure);
+        return lockAround(name, Duration.ofSeconds(DEFAULT_LOCK_TIMEOUT), executor, failure);
     }
 
     /**
@@ -404,7 +413,7 @@ public class RedisUtils implements ApplicationContextAware {
      * @param executor 回调方法
      */
     public static void tryLockAround(String name, @NonNull Executor executor) {
-        tryLockAround(name, Duration.ofSeconds(DEFAULT_DURATION), executor);
+        tryLockAround(name, Duration.ofSeconds(DEFAULT_LOCK_TIMEOUT), executor);
     }
 
     /**
@@ -434,7 +443,7 @@ public class RedisUtils implements ApplicationContextAware {
      * @return 执行结果
      */
     public static <T> T tryLockAround(String name, Supplier<T> supplier) {
-        return tryLockAround(name, Duration.ofSeconds(DEFAULT_DURATION), supplier);
+        return tryLockAround(name, Duration.ofSeconds(DEFAULT_LOCK_TIMEOUT), supplier);
     }
 
     /**
@@ -460,7 +469,7 @@ public class RedisUtils implements ApplicationContextAware {
      * @return 执行结果
      */
     public static <T> T tryLockAround(String name, @NonNull Supplier<T> supplier, @NonNull Supplier<T> failure) {
-        return tryLockAround(name, Duration.ofSeconds(DEFAULT_DURATION), supplier, failure);
+        return tryLockAround(name, Duration.ofSeconds(DEFAULT_LOCK_TIMEOUT), supplier, failure);
     }
 
     /**
@@ -635,6 +644,32 @@ public class RedisUtils implements ApplicationContextAware {
         return template.execute(script, Collections.singletonList(key), initialValue, duration.getSeconds());
     }
 
+    /**
+     * 批量设置带过期时间的值,只有在全部设置成功的情况才更新过期时间
+     *
+     * @param template Redis操作模版实例
+     * @param mapping  键/值映射表
+     * @param duration 有效时间
+     * @param <K>      键类型
+     * @return true/false
+     */
+    public static <K> boolean msetnx(@NonNull RedisTemplate<K, ?> template, @NonNull Map<K, ?> mapping,
+                                     @NonNull Duration duration) {
+        if (CollectionUtils.isEmpty(mapping)) {
+            return false;
+        }
+        int i = 0;
+        List<K> keys = Lists.newArrayListWithCapacity(mapping.size());
+        Object[] values = new Object[mapping.size() + 1];
+        for (Map.Entry<K, ?> entry : mapping.entrySet()) {
+            keys.add(entry.getKey());
+            values[i++] = entry.getValue();
+        }
+        values[i] = duration.getSeconds();
+        RedisScript<Boolean> script = new DefaultRedisScript<>(MSETNX_WITH_DURATION_SCRIPT, Boolean.class);
+        return Boolean.TRUE.equals(template.execute(script, keys, values));
+    }
+
     /**
      * 数字自增1,如果键不存在则对值初始化
      *

+ 26 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/RocketMQContextHolder.java

@@ -342,6 +342,32 @@ public class RocketMQContextHolder implements RocketMQLocalTransactionListener,
         }
     }
 
+    /**
+     * 发送顺序消息
+     *
+     * @param topic   消息主题
+     * @param payload 消息内容
+     * @param key     唯一标识
+     */
+    public static void sendOrderly(@NonNull String topic, @NonNull Object payload, @NonNull Object key) {
+        sendOrderly(topic, payload, key, true);
+    }
+
+    /**
+     * 发送顺序消息
+     *
+     * @param topic   消息主题
+     * @param payload 消息内容
+     * @param key     唯一标识
+     * @param isolate 是否需要环境隔离
+     */
+    public static void sendOrderly(@NonNull String topic, @NonNull Object payload, @NonNull Object key,
+                                   boolean isolate) {
+        getRocketMQTemplate().syncSendOrderly(
+                isolate ? getProfileTopic(topic) : topic, payload2message(payload), String.valueOf(key)
+        );
+    }
+
     /**
      * 发送事务消息
      *

+ 3 - 3
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/interceptor/NativeRocketMQListener.java

@@ -182,10 +182,10 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
         this.initializeMethodParameter();
 
         // 重置消息监听器
-        RocketMQMessageListener annotation = target.getAnnotation(RocketMQMessageListener.class);
+        RocketMQMessageListener annotation = this.target.getAnnotation(RocketMQMessageListener.class);
         ConsumeMode mode = ObjectUtils.ifNull(annotation, RocketMQMessageListener::consumeMode);
         if (mode == ConsumeMode.ORDERLY) {
-            consumer.setMessageListener((MessageListenerOrderly) (messages, context) -> {
+            this.consumer.setMessageListener((MessageListenerOrderly) (messages, context) -> {
                 try {
                     return this.consume(messages, context);
                 } catch (Exception e) {
@@ -196,7 +196,7 @@ public class NativeRocketMQListener<T> implements RocketMQPushConsumerLifecycleL
                 }
             });
         } else {
-            consumer.setMessageListener((MessageListenerConcurrently) (messages, context) -> {
+            this.consumer.setMessageListener((MessageListenerConcurrently) (messages, context) -> {
                 try {
                     return this.consume(messages, context);
                 } catch (Exception e) {