|
@@ -8,9 +8,12 @@ import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
+import java.util.function.Function;
|
|
|
|
|
|
+import com.chelvc.framework.base.context.ApplicationContextHolder;
|
|
|
import com.chelvc.framework.common.util.AssertUtils;
|
|
|
import com.chelvc.framework.common.util.ObjectUtils;
|
|
|
+import com.chelvc.framework.common.util.StringUtils;
|
|
|
import com.chelvc.framework.redis.context.RedisContextHolder;
|
|
|
import com.google.common.collect.ImmutableMap;
|
|
|
import com.google.common.collect.Maps;
|
|
@@ -30,6 +33,12 @@ import org.springframework.data.redis.core.script.RedisScript;
|
|
|
* @date 2024/8/29
|
|
|
*/
|
|
|
public class TemporalRedisQueue<E> extends AbstractQueue<E> {
|
|
|
+ /**
|
|
|
+ * 队列名称/实例映射表
|
|
|
+ */
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
+ private static final Map<String, TemporalRedisQueue> INSTANCES = Maps.newConcurrentMap();
|
|
|
+
|
|
|
/**
|
|
|
* Redis队列添加元素脚本(如果存在则忽略)
|
|
|
*/
|
|
@@ -47,26 +56,25 @@ public class TemporalRedisQueue<E> extends AbstractQueue<E> {
|
|
|
);
|
|
|
|
|
|
private final long idle;
|
|
|
+ private final long timeout;
|
|
|
private final String name;
|
|
|
- private final RedisTemplate<String, E> template;
|
|
|
+ private final List<String> keys;
|
|
|
|
|
|
public TemporalRedisQueue(@NonNull String name) {
|
|
|
- this(name, Duration.ofMinutes(1));
|
|
|
+ this(name, Duration.ofMinutes(1), Duration.ZERO);
|
|
|
}
|
|
|
|
|
|
public TemporalRedisQueue(@NonNull String name, @NonNull Duration idle) {
|
|
|
- this(name, null, idle);
|
|
|
+ this(name, idle, Duration.ZERO);
|
|
|
}
|
|
|
|
|
|
- public TemporalRedisQueue(@NonNull String name, RedisTemplate<String, E> template) {
|
|
|
- this(name, template, Duration.ofMinutes(1));
|
|
|
- }
|
|
|
-
|
|
|
- public TemporalRedisQueue(@NonNull String name, RedisTemplate<String, E> template, @NonNull Duration idle) {
|
|
|
+ public TemporalRedisQueue(@NonNull String name, @NonNull Duration idle, @NonNull Duration timeout) {
|
|
|
+ String profile = ApplicationContextHolder.getProfile();
|
|
|
+ this.name = StringUtils.isEmpty(profile) ? name : (profile + "-" + name);
|
|
|
this.idle = idle.toMillis();
|
|
|
AssertUtils.check(this.idle > 0, () -> "idle must be greater than 0");
|
|
|
- this.name = name;
|
|
|
- this.template = template;
|
|
|
+ this.timeout = timeout.toMillis();
|
|
|
+ this.keys = Collections.singletonList(this.name);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -102,13 +110,65 @@ public class TemporalRedisQueue<E> extends AbstractQueue<E> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 构建消息队列
|
|
|
+ *
|
|
|
+ * @param name 队列名称
|
|
|
+ * @param <E> 消息类型
|
|
|
+ * @return 队列实例
|
|
|
+ */
|
|
|
+ public static <E> TemporalRedisQueue<E> build(@NonNull String name) {
|
|
|
+ return build(name, TemporalRedisQueue::new);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构建消息队列
|
|
|
+ *
|
|
|
+ * @param name 队列名称
|
|
|
+ * @param function 队列构建函数
|
|
|
+ * @param <E> 消息类型
|
|
|
+ * @return 队列实例
|
|
|
+ */
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public static <E> TemporalRedisQueue<E> build(@NonNull String name,
|
|
|
+ @NonNull Function<String, TemporalRedisQueue<E>> function) {
|
|
|
+ TemporalRedisQueue<E> queue = INSTANCES.get(name);
|
|
|
+ return queue == null ? INSTANCES.computeIfAbsent(name, function) : queue;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 添加元素
|
|
|
+ *
|
|
|
+ * @param args 添加参数
|
|
|
+ * @return true/false
|
|
|
+ */
|
|
|
+ private boolean add(Object[] args) {
|
|
|
+ Long count;
|
|
|
+ if (this.timeout > 0) {
|
|
|
+ long max = System.currentTimeMillis() - this.timeout - 1000;
|
|
|
+ List<Object> values = this.template().executePipelined(new SessionCallback<Object>() {
|
|
|
+ @Override
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
|
|
|
+ operations.execute(ADD_SCRIPT, (List<K>) keys, args);
|
|
|
+ operations.opsForZSet().removeRangeByScore((K) name, 0, max);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ count = ObjectUtils.isEmpty(values) ? null : (Long) values.get(0);
|
|
|
+ } else {
|
|
|
+ count = this.template().execute(ADD_SCRIPT, this.keys, args);
|
|
|
+ }
|
|
|
+ return count != null && count > 0;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 获取RedisTemplate实例
|
|
|
*
|
|
|
* @return RedisTemplate实例
|
|
|
*/
|
|
|
- private RedisTemplate<String, E> template() {
|
|
|
- return ObjectUtils.ifNull(this.template, RedisContextHolder::getDefaultTemplate);
|
|
|
+ protected RedisTemplate<String, E> template() {
|
|
|
+ return RedisContextHolder.getDefaultTemplate();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -179,21 +239,15 @@ public class TemporalRedisQueue<E> extends AbstractQueue<E> {
|
|
|
|
|
|
@Override
|
|
|
public boolean offer(E e) {
|
|
|
- if (e == null) {
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- Object[] args = new Object[]{System.currentTimeMillis(), e};
|
|
|
- Long count = this.template().execute(ADD_SCRIPT, Collections.singletonList(this.name), args);
|
|
|
- return count != null && count > 0;
|
|
|
+ return e != null && this.add(new Object[]{System.currentTimeMillis(), e});
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public E poll() {
|
|
|
- List<String> keys = Collections.singletonList(this.name);
|
|
|
- long min = 0, max = System.currentTimeMillis(), score = max + this.idle;
|
|
|
- List<E> values = this.template().execute(POLL_SCRIPT, keys, min, max, score);
|
|
|
+ long max = System.currentTimeMillis();
|
|
|
+ long min = this.timeout > 0 ? max - this.timeout : 0, score = max + this.idle;
|
|
|
+ List<E> values = this.template().execute(POLL_SCRIPT, this.keys, min, max, score);
|
|
|
return ObjectUtils.isEmpty(values) ? null : values.get(0);
|
|
|
}
|
|
|
|
|
@@ -222,8 +276,7 @@ public class TemporalRedisQueue<E> extends AbstractQueue<E> {
|
|
|
args[i++] = timestamp++;
|
|
|
args[i++] = e;
|
|
|
}
|
|
|
- Long count = this.template().execute(ADD_SCRIPT, Collections.singletonList(this.name), args);
|
|
|
- return count != null && count > 0;
|
|
|
+ return this.add(args);
|
|
|
}
|
|
|
|
|
|
@Override
|