woody преди 6 месеца
родител
ревизия
1ecb0e40fe

+ 16 - 6
framework-base/src/main/java/com/chelvc/framework/base/context/SessionContextHolder.java

@@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.JsonEncoding;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
 import org.springframework.http.MediaType;
 import org.springframework.stereotype.Component;
 import org.springframework.web.context.request.RequestContextHolder;
@@ -471,18 +472,27 @@ public class SessionContextHolder implements ServletRequestListener {
      */
     public static boolean isVersion(@NonNull com.chelvc.framework.base.annotation.Version... versions) {
         return ObjectUtils.notEmpty(versions) && Stream.of(versions).anyMatch(version -> {
+            String value = version.value();
+            if (StringUtils.isEmpty(value)) {
+                return false;
+            }
+            Environment environment = ApplicationContextHolder.getEnvironment(false);
+            if (environment != null) {
+                value = environment.resolvePlaceholders(value);
+            }
+            value = ApplicationContextHolder.getProperty(value, value);
             if (version.compare() == Compare.EQ) {
-                return isVersion(version.terminal(), version.value());
+                return isVersion(version.terminal(), value);
             } else if (version.compare() == Compare.NE) {
-                return !isVersion(version.terminal(), version.value());
+                return !isVersion(version.terminal(), value);
             } else if (version.compare() == Compare.GT) {
-                return isAfterVersion(version.terminal(), version.value());
+                return isAfterVersion(version.terminal(), value);
             } else if (version.compare() == Compare.GE) {
-                return isAfterVersion(version.terminal(), version.value(), true);
+                return isAfterVersion(version.terminal(), value, true);
             } else if (version.compare() == Compare.LT) {
-                return isBeforeVersion(version.terminal(), version.value());
+                return isBeforeVersion(version.terminal(), value);
             } else if (version.compare() == Compare.LE) {
-                return isBeforeVersion(version.terminal(), version.value(), true);
+                return isBeforeVersion(version.terminal(), value, true);
             }
             return false;
         });

+ 4 - 2
framework-redis/src/main/java/com/chelvc/framework/redis/queue/DefaultRedisMQListenerContainer.java

@@ -2,9 +2,10 @@ package com.chelvc.framework.redis.queue;
 
 import java.lang.reflect.Type;
 import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -77,8 +78,9 @@ public class DefaultRedisMQListenerContainer<T> implements RedisMQListenerContai
                            @NonNull RedisMQConsumer annotation) {
         int concurrency = annotation.concurrency();
         AssertUtils.check(concurrency > 0, () -> "Consumer concurrency must be greater than 0");
+        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>((int) (concurrency / 0.1) * 2);
         ExecutorService executor = new ThreadPoolExecutor(
-                concurrency, concurrency, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()
+                concurrency, concurrency, 60L, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy()
         );
         this.initialize(type, annotation.topic(), annotation.group(), annotation.batch(), listener, executor);
     }

+ 9 - 7
framework-redis/src/main/java/com/chelvc/framework/redis/queue/MessageStreamListener.java

@@ -25,13 +25,14 @@ import org.springframework.data.redis.stream.StreamListener;
 @Slf4j
 public class MessageStreamListener<T> implements StreamListener<String, MapRecord<String, String, String>> {
     /**
-     * ZSET批量获取同时增加分数值脚本
+     * 批量获取延迟消息脚本
      */
     @SuppressWarnings("rawtypes")
-    private static final RedisScript<List> RANGE_INCREMENT_SCRIPT = new DefaultRedisScript<>(
-            "local records = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[4]) " +
-                    "for i = 1, #records do redis.call('ZINCRBY', KEYS[1], ARGV[3], records[i]) end return records",
-            List.class
+    private static final RedisScript<List> DELAY_RANGE_SCRIPT = new DefaultRedisScript<>(
+            "local records = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3]) " +
+                    "if #records > 0 then local members = {} for i = 1, #records " +
+                    "do table.insert(members, ARGV[4]) table.insert(members, records[i]) end " +
+                    "redis.call('ZADD', KEYS[1], 'XX', unpack(members)) end return records", List.class
     );
 
     private final Type type;
@@ -57,10 +58,11 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
             while (!Thread.currentThread().isInterrupted()) {
                 // 批量获取延时消息(100条),为避免重复获取将消息延时时间增加1分钟
                 List<String> messages = null;
-                String timestamp = String.valueOf(System.currentTimeMillis());
+                long now = System.currentTimeMillis(), delaying = now + 60000;
                 RedisTemplate<String, T> template = RedisStreamHolder.getStreamTemplate();
+                Object[] args = new Object[]{"-1", String.valueOf(now), "100", String.valueOf(delaying)};
                 try {
-                    messages = template.execute(RANGE_INCREMENT_SCRIPT, keys, "-1", timestamp, "60000", "100");
+                    messages = template.execute(DELAY_RANGE_SCRIPT, keys, args);
                 } catch (Throwable t) {
                     log.error("RedisMQ message consume failed: {}", this.consumer, t);
                 }