|
@@ -1,62 +1,43 @@
|
|
|
package com.chelvc.framework.redis.context;
|
|
|
|
|
|
-import java.lang.reflect.Type;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
import java.time.Duration;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
-import java.util.Date;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.UUID;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
-import java.util.function.Consumer;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
import com.chelvc.framework.base.context.ApplicationContextHolder;
|
|
|
-import com.chelvc.framework.base.context.Session;
|
|
|
-import com.chelvc.framework.base.context.SessionContextHolder;
|
|
|
import com.chelvc.framework.common.function.Executor;
|
|
|
import com.chelvc.framework.common.util.IdentityUtils;
|
|
|
-import com.chelvc.framework.common.util.JacksonUtils;
|
|
|
import com.chelvc.framework.common.util.ObjectUtils;
|
|
|
import com.chelvc.framework.common.util.StringUtils;
|
|
|
import com.chelvc.framework.common.util.ThreadUtils;
|
|
|
-import com.google.common.collect.ImmutableMap;
|
|
|
-import com.google.common.collect.Lists;
|
|
|
import com.google.common.collect.Maps;
|
|
|
import lombok.NonNull;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
|
|
|
-import org.springframework.data.redis.RedisSystemException;
|
|
|
import org.springframework.data.redis.connection.RedisConnection;
|
|
|
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
|
|
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
|
|
|
-import org.springframework.data.redis.connection.RedisStreamCommands;
|
|
|
import org.springframework.data.redis.connection.RedisStringCommands;
|
|
|
import org.springframework.data.redis.connection.ReturnType;
|
|
|
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
|
|
|
-import org.springframework.data.redis.connection.stream.ByteRecord;
|
|
|
-import org.springframework.data.redis.connection.stream.MapRecord;
|
|
|
-import org.springframework.data.redis.connection.stream.RecordId;
|
|
|
-import org.springframework.data.redis.connection.stream.StreamInfo;
|
|
|
-import org.springframework.data.redis.connection.stream.StreamOffset;
|
|
|
import org.springframework.data.redis.core.RedisConnectionUtils;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.data.redis.core.script.DefaultRedisScript;
|
|
|
import org.springframework.data.redis.core.script.DigestUtils;
|
|
|
import org.springframework.data.redis.core.script.RedisScript;
|
|
|
import org.springframework.data.redis.core.types.Expiration;
|
|
|
-import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
|
|
|
import org.springframework.data.redis.serializer.RedisSerializer;
|
|
|
-import org.springframework.data.redis.stream.StreamListener;
|
|
|
-import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
-import org.springframework.util.ErrorHandler;
|
|
|
|
|
|
/**
|
|
|
* Redis上下文工具类
|
|
@@ -66,26 +47,6 @@ import org.springframework.util.ErrorHandler;
|
|
|
*/
|
|
|
@Slf4j
|
|
|
public final class RedisContextHolder {
|
|
|
- /**
|
|
|
- * 消息ID标识
|
|
|
- */
|
|
|
- private static final String ID = "id";
|
|
|
-
|
|
|
- /**
|
|
|
- * 消息主题标识
|
|
|
- */
|
|
|
- private static final String TOPIC = "topic";
|
|
|
-
|
|
|
- /**
|
|
|
- * 消息载体标识
|
|
|
- */
|
|
|
- private static final String PAYLOAD = "payload";
|
|
|
-
|
|
|
- /**
|
|
|
- * 延时时间标识
|
|
|
- */
|
|
|
- private static final String DELAYING = "delaying";
|
|
|
-
|
|
|
/**
|
|
|
* 默认锁超时时间(秒)
|
|
|
*/
|
|
@@ -224,11 +185,6 @@ public final class RedisContextHolder {
|
|
|
"else redis.call('SET', KEYS[1], 0) return 0 end", Long.class
|
|
|
);
|
|
|
|
|
|
- /**
|
|
|
- * 新增Stream消息流脚本
|
|
|
- */
|
|
|
- private static RedisScript<String> STREAM_ADD_SCRIPT;
|
|
|
-
|
|
|
/**
|
|
|
* Redis连接工厂
|
|
|
*/
|
|
@@ -249,21 +205,11 @@ public final class RedisContextHolder {
|
|
|
*/
|
|
|
private static RedisTemplate<String, Object> REDIS_TEMPLATE;
|
|
|
|
|
|
- /**
|
|
|
- * 字符串RedisTemplate实例
|
|
|
- */
|
|
|
- private static RedisTemplate<String, Object> STRING_TEMPLATE;
|
|
|
-
|
|
|
/**
|
|
|
* 默认RedisTemplate实例
|
|
|
*/
|
|
|
private static RedisTemplate<String, Object> DEFAULT_TEMPLATE;
|
|
|
|
|
|
- /**
|
|
|
- * 配置属性
|
|
|
- */
|
|
|
- private static com.chelvc.framework.redis.config.RedisProperties PROPERTIES;
|
|
|
-
|
|
|
private RedisContextHolder() {
|
|
|
}
|
|
|
|
|
@@ -343,49 +289,6 @@ public final class RedisContextHolder {
|
|
|
return REDIS_TEMPLATE;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 获取当前RedisTemplate实例
|
|
|
- *
|
|
|
- * @param database 数据库下标
|
|
|
- * @return RedisTemplate实例
|
|
|
- */
|
|
|
- public static RedisTemplate<String, Object> getRedisTemplate(int database) {
|
|
|
- RedisSerializer<?> valueSerializer =
|
|
|
- ApplicationContextHolder.getBean(Jackson2JsonRedisSerializer.class);
|
|
|
- RedisTemplate<String, Object> template = new RedisTemplate<>();
|
|
|
- template.setConnectionFactory(getConnectionFactory(getConfiguration(database)));
|
|
|
- template.setKeySerializer(RedisSerializer.string());
|
|
|
- template.setValueSerializer(valueSerializer);
|
|
|
- template.setHashKeySerializer(RedisSerializer.string());
|
|
|
- template.setHashValueSerializer(valueSerializer);
|
|
|
- template.afterPropertiesSet();
|
|
|
- return template;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取基于字符串配置的0号库RedisTemplate实例
|
|
|
- *
|
|
|
- * @return RedisTemplate实例
|
|
|
- */
|
|
|
- public static RedisTemplate<String, Object> getStringTemplate() {
|
|
|
- if (STRING_TEMPLATE == null) {
|
|
|
- synchronized (RedisTemplate.class) {
|
|
|
- if (STRING_TEMPLATE == null) {
|
|
|
- RedisTemplate<String, Object> template = new RedisTemplate<>();
|
|
|
- template.setConnectionFactory(getDefaultConnectionFactory());
|
|
|
- template.setKeySerializer(RedisSerializer.string());
|
|
|
- template.setValueSerializer(RedisSerializer.string());
|
|
|
- template.setHashKeySerializer(RedisSerializer.string());
|
|
|
- template.setHashValueSerializer(RedisSerializer.string());
|
|
|
- template.afterPropertiesSet();
|
|
|
-
|
|
|
- STRING_TEMPLATE = template;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return STRING_TEMPLATE;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 获取基于默认配置的0号库RedisTemplate实例
|
|
|
*
|
|
@@ -403,39 +306,21 @@ public final class RedisContextHolder {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 获取新增Stream消息流脚本
|
|
|
- *
|
|
|
- * @return Lua脚本
|
|
|
- */
|
|
|
- private static RedisScript<String> getStreamAddScript() {
|
|
|
- if (STREAM_ADD_SCRIPT == null) {
|
|
|
- synchronized (RedisContextHolder.class) {
|
|
|
- if (STREAM_ADD_SCRIPT == null) {
|
|
|
- int capacity = getProperties().getStream().getCapacity();
|
|
|
- if (capacity > 0) {
|
|
|
- STREAM_ADD_SCRIPT = new DefaultRedisScript<>(String.format(
|
|
|
- "return redis.call('XADD', KEYS[1], 'MAXLEN', '~', %d, '*', unpack(ARGV))", capacity
|
|
|
- ), String.class);
|
|
|
- } else {
|
|
|
- STREAM_ADD_SCRIPT = new DefaultRedisScript<>(
|
|
|
- "return redis.call('XADD', KEYS[1], '*', unpack(ARGV))", String.class
|
|
|
- );
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return STREAM_ADD_SCRIPT;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 环境隔离
|
|
|
+ * 获取当前RedisTemplate实例
|
|
|
*
|
|
|
- * @param original 原始标记
|
|
|
- * @return 环境隔离标记
|
|
|
+ * @param database 数据库下标
|
|
|
+ * @return RedisTemplate实例
|
|
|
*/
|
|
|
- public static String isolate(@NonNull String original) {
|
|
|
- String namespace = getProperties().getNamespace();
|
|
|
- return StringUtils.isEmpty(namespace) ? original : (namespace + original);
|
|
|
+ public static RedisTemplate<String, Object> getRedisTemplate(int database) {
|
|
|
+ RedisTemplate<?, ?> reference = getRedisTemplate();
|
|
|
+ RedisTemplate<String, Object> template = new RedisTemplate<>();
|
|
|
+ template.setConnectionFactory(getConnectionFactory(getConfiguration(database)));
|
|
|
+ template.setKeySerializer(reference.getKeySerializer());
|
|
|
+ template.setValueSerializer(reference.getValueSerializer());
|
|
|
+ template.setHashKeySerializer(reference.getHashKeySerializer());
|
|
|
+ template.setHashValueSerializer(reference.getHashValueSerializer());
|
|
|
+ template.afterPropertiesSet();
|
|
|
+ return template;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -479,33 +364,6 @@ public final class RedisContextHolder {
|
|
|
return IDENTITY_GENERATOR;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 获取消息流配置属性
|
|
|
- *
|
|
|
- * @return 配置属性
|
|
|
- */
|
|
|
- private static com.chelvc.framework.redis.config.RedisProperties getProperties() {
|
|
|
- if (PROPERTIES == null) {
|
|
|
- synchronized (com.chelvc.framework.redis.config.RedisProperties.class) {
|
|
|
- if (PROPERTIES == null) {
|
|
|
- PROPERTIES = ApplicationContextHolder.getBean(
|
|
|
- com.chelvc.framework.redis.config.RedisProperties.class
|
|
|
- );
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return PROPERTIES;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取MQ消费者空闲时间(秒)
|
|
|
- *
|
|
|
- * @return 空闲时间
|
|
|
- */
|
|
|
- public static int getStreamIdle() {
|
|
|
- return getProperties().getStream().getIdle();
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 执行Redis操作
|
|
|
*
|
|
@@ -570,6 +428,21 @@ public final class RedisContextHolder {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 对象序列化
|
|
|
+ *
|
|
|
+ * @param serializer 序列化处理器
|
|
|
+ * @param value 对象实例
|
|
|
+ * @return 字节数组
|
|
|
+ */
|
|
|
+ @SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
+ public static byte[] serialize(RedisSerializer serializer, Object value) {
|
|
|
+ if (serializer == null && value instanceof byte[]) {
|
|
|
+ return (byte[]) value;
|
|
|
+ }
|
|
|
+ return Objects.requireNonNull(serializer).serialize(value);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 加锁(阻塞),加锁成功则返回锁标识
|
|
|
*
|
|
@@ -881,7 +754,7 @@ public final class RedisContextHolder {
|
|
|
public static long sequence(@NonNull Duration duration) {
|
|
|
ThreadLocalRandom random = ThreadLocalRandom.current();
|
|
|
RedisConnectionFactory factory = getDefaultConnectionFactory();
|
|
|
- for (int i = 0; i < 10; i++) {
|
|
|
+ for (int i = 0; i < 3; i++) {
|
|
|
String time = IDENTITY_DATETIME_FORMATTER.format(LocalDateTime.now());
|
|
|
String value = StringUtils.rjust(String.valueOf(random.nextInt(100000)), 5, '0');
|
|
|
long identity = Long.parseLong(time + value);
|
|
@@ -1080,7 +953,7 @@ public final class RedisContextHolder {
|
|
|
* @param initialValue 初始化值
|
|
|
* @return 自增后的值
|
|
|
*/
|
|
|
- public static Long increment(@NonNull String key, long initialValue) {
|
|
|
+ public static long increment(@NonNull String key, long initialValue) {
|
|
|
return increment(getRedisTemplate(), key, initialValue);
|
|
|
}
|
|
|
|
|
@@ -1093,8 +966,9 @@ public final class RedisContextHolder {
|
|
|
* @param <K> 键类型
|
|
|
* @return 自增后的值
|
|
|
*/
|
|
|
- public static <K> Long increment(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long initialValue) {
|
|
|
- return template.execute(INCR_WITH_INITIAL_SCRIPT, Collections.singletonList(key), initialValue);
|
|
|
+ public static <K> long increment(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long initialValue) {
|
|
|
+ List<K> keys = Collections.singletonList(key);
|
|
|
+ return Objects.requireNonNull(template.execute(INCR_WITH_INITIAL_SCRIPT, keys, initialValue));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1105,7 +979,7 @@ public final class RedisContextHolder {
|
|
|
* @param initialValue 初始化值
|
|
|
* @return 自增后的值
|
|
|
*/
|
|
|
- public static Long increment(@NonNull String key, long delta, long initialValue) {
|
|
|
+ public static long increment(@NonNull String key, long delta, long initialValue) {
|
|
|
return increment(getRedisTemplate(), key, delta, initialValue);
|
|
|
}
|
|
|
|
|
@@ -1119,9 +993,10 @@ public final class RedisContextHolder {
|
|
|
* @param <K> 键类型
|
|
|
* @return 自增后的值
|
|
|
*/
|
|
|
- public static <K> Long increment(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long delta,
|
|
|
+ public static <K> long increment(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long delta,
|
|
|
long initialValue) {
|
|
|
- return template.execute(INCRBY_WITH_INITIAL_SCRIPT, Collections.singletonList(key), delta, initialValue);
|
|
|
+ List<K> keys = Collections.singletonList(key);
|
|
|
+ return Objects.requireNonNull(template.execute(INCRBY_WITH_INITIAL_SCRIPT, keys, delta, initialValue));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1132,7 +1007,7 @@ public final class RedisContextHolder {
|
|
|
* @param duration 有效时间
|
|
|
* @return 自增后的值
|
|
|
*/
|
|
|
- public static Long increment(@NonNull String key, long initialValue, @NonNull Duration duration) {
|
|
|
+ public static long increment(@NonNull String key, long initialValue, @NonNull Duration duration) {
|
|
|
return increment(getRedisTemplate(), key, initialValue, duration);
|
|
|
}
|
|
|
|
|
@@ -1146,10 +1021,12 @@ public final class RedisContextHolder {
|
|
|
* @param <K> 键类型
|
|
|
* @return 自增后的值
|
|
|
*/
|
|
|
- public static <K> Long increment(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long initialValue,
|
|
|
+ public static <K> long increment(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long initialValue,
|
|
|
@NonNull Duration duration) {
|
|
|
- return template.execute(INCR_WITH_INITIAL_DURATION_SCRIPT, Collections.singletonList(key), initialValue,
|
|
|
- duration.getSeconds());
|
|
|
+ List<K> keys = Collections.singletonList(key);
|
|
|
+ return Objects.requireNonNull(template.execute(
|
|
|
+ INCR_WITH_INITIAL_DURATION_SCRIPT, keys, initialValue, duration.getSeconds()
|
|
|
+ ));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1161,7 +1038,7 @@ public final class RedisContextHolder {
|
|
|
* @param duration 有效时间
|
|
|
* @return 自增后的值
|
|
|
*/
|
|
|
- public static Long increment(@NonNull String key, long delta, long initialValue, @NonNull Duration duration) {
|
|
|
+ public static long increment(@NonNull String key, long delta, long initialValue, @NonNull Duration duration) {
|
|
|
return increment(getRedisTemplate(), key, delta, initialValue, duration);
|
|
|
}
|
|
|
|
|
@@ -1176,10 +1053,12 @@ public final class RedisContextHolder {
|
|
|
* @param <K> 键类型
|
|
|
* @return 自增后的值
|
|
|
*/
|
|
|
- public static <K> Long increment(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long delta,
|
|
|
+ public static <K> long increment(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long delta,
|
|
|
long initialValue, @NonNull Duration duration) {
|
|
|
- return template.execute(INCRBY_WITH_INITIAL_DURATION_SCRIPT, Collections.singletonList(key), delta,
|
|
|
- initialValue, duration.getSeconds());
|
|
|
+ List<K> keys = Collections.singletonList(key);
|
|
|
+ return Objects.requireNonNull(template.execute(
|
|
|
+ INCRBY_WITH_INITIAL_DURATION_SCRIPT, keys, delta, initialValue, duration.getSeconds()
|
|
|
+ ));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1189,7 +1068,7 @@ public final class RedisContextHolder {
|
|
|
* @param initialValue 初始化值
|
|
|
* @return 自减后的值
|
|
|
*/
|
|
|
- public static Long decrement(@NonNull String key, long initialValue) {
|
|
|
+ public static long decrement(@NonNull String key, long initialValue) {
|
|
|
return decrement(getRedisTemplate(), key, initialValue);
|
|
|
}
|
|
|
|
|
@@ -1202,8 +1081,9 @@ public final class RedisContextHolder {
|
|
|
* @param <K> 键类型
|
|
|
* @return 自减后的值
|
|
|
*/
|
|
|
- public static <K> Long decrement(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long initialValue) {
|
|
|
- return template.execute(DECR_WITH_INITIAL_SCRIPT, Collections.singletonList(key), initialValue);
|
|
|
+ public static <K> long decrement(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long initialValue) {
|
|
|
+ List<K> keys = Collections.singletonList(key);
|
|
|
+ return Objects.requireNonNull(template.execute(DECR_WITH_INITIAL_SCRIPT, keys, initialValue));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1214,7 +1094,7 @@ public final class RedisContextHolder {
|
|
|
* @param initialValue 初始化值
|
|
|
* @return 自减后的值
|
|
|
*/
|
|
|
- public static Long decrement(@NonNull String key, long delta, long initialValue) {
|
|
|
+ public static long decrement(@NonNull String key, long delta, long initialValue) {
|
|
|
return decrement(getRedisTemplate(), key, delta, initialValue);
|
|
|
}
|
|
|
|
|
@@ -1228,9 +1108,10 @@ public final class RedisContextHolder {
|
|
|
* @param <K> 键类型
|
|
|
* @return 自减后的值
|
|
|
*/
|
|
|
- public static <K> Long decrement(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long delta,
|
|
|
+ public static <K> long decrement(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long delta,
|
|
|
long initialValue) {
|
|
|
- return template.execute(DECRBY_WITH_INITIAL_SCRIPT, Collections.singletonList(key), delta, initialValue);
|
|
|
+ List<K> keys = Collections.singletonList(key);
|
|
|
+ return Objects.requireNonNull(template.execute(DECRBY_WITH_INITIAL_SCRIPT, keys, delta, initialValue));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1241,7 +1122,7 @@ public final class RedisContextHolder {
|
|
|
* @param duration 有效时间
|
|
|
* @return 自减后的值
|
|
|
*/
|
|
|
- public static Long decrement(@NonNull String key, long initialValue, @NonNull Duration duration) {
|
|
|
+ public static long decrement(@NonNull String key, long initialValue, @NonNull Duration duration) {
|
|
|
return decrement(getRedisTemplate(), key, initialValue, duration);
|
|
|
}
|
|
|
|
|
@@ -1255,10 +1136,12 @@ public final class RedisContextHolder {
|
|
|
* @param <K> 键类型
|
|
|
* @return 自减后的值
|
|
|
*/
|
|
|
- public static <K> Long decrement(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long initialValue,
|
|
|
+ public static <K> long decrement(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long initialValue,
|
|
|
@NonNull Duration duration) {
|
|
|
- return template.execute(DECR_WITH_INITIAL_DURATION_SCRIPT, Collections.singletonList(key), initialValue,
|
|
|
- duration.getSeconds());
|
|
|
+ List<K> keys = Collections.singletonList(key);
|
|
|
+ return Objects.requireNonNull(template.execute(
|
|
|
+ DECR_WITH_INITIAL_DURATION_SCRIPT, keys, initialValue, duration.getSeconds()
|
|
|
+ ));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1270,7 +1153,7 @@ public final class RedisContextHolder {
|
|
|
* @param duration 有效时间
|
|
|
* @return 自减后的值
|
|
|
*/
|
|
|
- public static Long decrement(@NonNull String key, long delta, long initialValue, @NonNull Duration duration) {
|
|
|
+ public static long decrement(@NonNull String key, long delta, long initialValue, @NonNull Duration duration) {
|
|
|
return decrement(getRedisTemplate(), key, delta, initialValue, duration);
|
|
|
}
|
|
|
|
|
@@ -1285,10 +1168,12 @@ public final class RedisContextHolder {
|
|
|
* @param <K> 键类型
|
|
|
* @return 自减后的值
|
|
|
*/
|
|
|
- public static <K> Long decrement(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long delta,
|
|
|
+ public static <K> long decrement(@NonNull RedisTemplate<K, ?> template, @NonNull K key, long delta,
|
|
|
long initialValue, @NonNull Duration duration) {
|
|
|
- return template.execute(DECRBY_WITH_INITIAL_DURATION_SCRIPT, Collections.singletonList(key), delta,
|
|
|
- initialValue, duration.getSeconds());
|
|
|
+ List<K> keys = Collections.singletonList(key);
|
|
|
+ return Objects.requireNonNull(template.execute(
|
|
|
+ DECRBY_WITH_INITIAL_DURATION_SCRIPT, keys, delta, initialValue, duration.getSeconds()
|
|
|
+ ));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1373,341 +1258,4 @@ public final class RedisContextHolder {
|
|
|
}
|
|
|
throw new RuntimeException("Generate random attempts limit exceeded");
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取当前时间距过期时间的时长
|
|
|
- *
|
|
|
- * @param expiration 过期时间
|
|
|
- * @return 持续时长
|
|
|
- */
|
|
|
- public static Duration duration(@NonNull Date expiration) {
|
|
|
- return Duration.ofMillis(Math.max(expiration.getTime() - System.currentTimeMillis(), 1000));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 发送消息
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param payload 消息内容
|
|
|
- * @return 消息记录ID
|
|
|
- */
|
|
|
- public static RecordId send(@NonNull String topic, @NonNull Object payload) {
|
|
|
- return send(topic, payload, (Long) null);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 发送消息
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param payload 消息内容
|
|
|
- * @param delaying 延时消息时间
|
|
|
- * @return 消息记录ID
|
|
|
- */
|
|
|
- public static RecordId send(@NonNull String topic, @NonNull Object payload, Duration delaying) {
|
|
|
- return send(topic, payload, delaying == null ? null : System.currentTimeMillis() + delaying.toMillis());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 发送消息
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param payload 消息内容
|
|
|
- * @param delaying 延时消息时间
|
|
|
- * @return 消息记录ID
|
|
|
- */
|
|
|
- public static RecordId send(@NonNull String topic, @NonNull Object payload, Date delaying) {
|
|
|
- return send(topic, payload, ObjectUtils.ifNull(delaying, Date::getTime));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 发送消息
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param payload 消息内容
|
|
|
- * @param delaying 延时消息时间
|
|
|
- * @return 消息记录ID
|
|
|
- */
|
|
|
- public static RecordId send(@NonNull String topic, @NonNull Object payload, Long delaying) {
|
|
|
- List<String> args = Lists.newLinkedList();
|
|
|
- args.add(PAYLOAD);
|
|
|
- args.add(JacksonUtils.serialize(payload));
|
|
|
- if (delaying != null) {
|
|
|
- args.add(DELAYING);
|
|
|
- args.add(String.valueOf(delaying));
|
|
|
- }
|
|
|
- Session session = SessionContextHolder.getSession(false);
|
|
|
- if (session != null) {
|
|
|
- args.add(Session.NAMING);
|
|
|
- args.add(JacksonUtils.serialize(session));
|
|
|
- }
|
|
|
- List<String> keys = Collections.singletonList(isolate(topic));
|
|
|
- String id = getStringTemplate().execute(getStreamAddScript(), keys, args.toArray());
|
|
|
- return StringUtils.ifEmpty(id, RecordId::of);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 转移消息
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param consumer 目标消费者
|
|
|
- * @param ids 目标记录ID数组
|
|
|
- * @return 转移成功消息记录列表
|
|
|
- */
|
|
|
- public static List<ByteRecord> claim(@NonNull String topic,
|
|
|
- @NonNull org.springframework.data.redis.connection.stream.Consumer consumer,
|
|
|
- @NonNull RecordId... ids) {
|
|
|
- return claim(topic, consumer.getGroup(), consumer.getName(), ids);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 转移消息
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param group 消费者组
|
|
|
- * @param consumer 目标消费者名称
|
|
|
- * @param ids 目标记录ID数组
|
|
|
- * @return 转移成功消息记录列表
|
|
|
- */
|
|
|
- public static List<ByteRecord> claim(@NonNull String topic, @NonNull String group, @NonNull String consumer,
|
|
|
- @NonNull RecordId... ids) {
|
|
|
- return claim(topic, group, consumer, Duration.ZERO, ids);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 转移消息
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param consumer 目标消费者
|
|
|
- * @param idle 消息空闲时间
|
|
|
- * @param ids 目标ID数组
|
|
|
- * @return 转移成功消息记录列表
|
|
|
- */
|
|
|
- public static List<ByteRecord> claim(@NonNull String topic,
|
|
|
- @NonNull org.springframework.data.redis.connection.stream.Consumer consumer,
|
|
|
- @NonNull Duration idle, @NonNull RecordId... ids) {
|
|
|
- return claim(topic, consumer.getGroup(), consumer.getName(), idle, ids);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 转移消息
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param group 消费者组
|
|
|
- * @param consumer 目标消费者名称
|
|
|
- * @param idle 消息空闲时间
|
|
|
- * @param ids 目标ID数组
|
|
|
- * @return 转移成功消息记录列表
|
|
|
- */
|
|
|
- public static List<ByteRecord> claim(@NonNull String topic, @NonNull String group, @NonNull String consumer,
|
|
|
- @NonNull Duration idle, @NonNull RecordId... ids) {
|
|
|
- if (ObjectUtils.isEmpty(ids)) {
|
|
|
- return Collections.emptyList();
|
|
|
- }
|
|
|
- RedisStreamCommands.XClaimOptions options = RedisStreamCommands.XClaimOptions.minIdle(idle).ids(ids);
|
|
|
- return execute(getDefaultConnectionFactory(),
|
|
|
- connection -> connection.streamCommands().xClaim(topic.getBytes(), group, consumer, options));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 消息消费确认
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param group 消费者组
|
|
|
- * @param ids 消息记录ID数组
|
|
|
- */
|
|
|
- public static void ack(@NonNull String topic, @NonNull String group, @NonNull RecordId... ids) {
|
|
|
- if (ObjectUtils.notEmpty(ids)) {
|
|
|
- getStringTemplate().opsForStream().acknowledge(topic, group, ids);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 删除消费者
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param ids 消息记录ID数组
|
|
|
- */
|
|
|
- public static void delete(@NonNull String topic, @NonNull RecordId... ids) {
|
|
|
- if (ObjectUtils.notEmpty(ids)) {
|
|
|
- getStringTemplate().opsForStream().delete(topic, ids);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 删除消费者
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param consumer 消费者信息
|
|
|
- */
|
|
|
- public static void delete(@NonNull String topic,
|
|
|
- @NonNull org.springframework.data.redis.connection.stream.Consumer consumer) {
|
|
|
- getStringTemplate().opsForStream().deleteConsumer(topic, consumer);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 发送心跳包
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- */
|
|
|
- public static void heartbeat(@NonNull String topic) {
|
|
|
- List<String> keys = Collections.singletonList(topic);
|
|
|
- getStringTemplate().execute(getStreamAddScript(), keys, PAYLOAD, StringUtils.EMPTY);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 判断消息是否是心跳包
|
|
|
- *
|
|
|
- * @param record 消息记录
|
|
|
- * @return true/false
|
|
|
- */
|
|
|
- public static boolean isHeartbeat(MapRecord<String, String, String> record) {
|
|
|
- Map<String, String> value = ObjectUtils.ifNull(record, MapRecord::getValue);
|
|
|
- return ObjectUtils.isEmpty(value) || StringUtils.isEmpty(value.get(PAYLOAD));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 序列化消息记录
|
|
|
- *
|
|
|
- * @param record 消息记录
|
|
|
- * @return 消息内容
|
|
|
- */
|
|
|
- public static String serialize(@NonNull MapRecord<String, String, String> record) {
|
|
|
- Map<String, String> value = record.getValue();
|
|
|
- ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
|
|
|
- builder.put(ID, record.getId().getValue());
|
|
|
- builder.put(TOPIC, Objects.requireNonNull(record.getStream()));
|
|
|
- String payload = value.get(PAYLOAD);
|
|
|
- if (StringUtils.notEmpty(payload)) {
|
|
|
- builder.put(PAYLOAD, payload);
|
|
|
- }
|
|
|
- String session = value.get(Session.NAMING);
|
|
|
- if (StringUtils.notEmpty(session)) {
|
|
|
- builder.put(Session.NAMING, session);
|
|
|
- }
|
|
|
- String delaying = value.get(DELAYING);
|
|
|
- if (StringUtils.notEmpty(delaying)) {
|
|
|
- builder.put(DELAYING, delaying);
|
|
|
- }
|
|
|
- return JacksonUtils.serialize(builder.build());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 反序列化消息记录
|
|
|
- *
|
|
|
- * @param message 消息内容
|
|
|
- * @return 消息记录实例
|
|
|
- */
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- public static MapRecord<String, String, String> deserialize(@NonNull String message) {
|
|
|
- Map<String, String> content = JacksonUtils.deserialize(message, Map.class);
|
|
|
- String id = content.remove(ID);
|
|
|
- String topic = content.remove(TOPIC);
|
|
|
- return MapRecord.create(topic, content).withId(RecordId.of(id));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 消费消息
|
|
|
- *
|
|
|
- * @param record 消息记录
|
|
|
- * @param type 消息类型
|
|
|
- * @param consumer 消息消费者
|
|
|
- */
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- public static <T> void consume(@NonNull MapRecord<String, String, String> record, @NonNull Type type,
|
|
|
- @NonNull Consumer<T> consumer) {
|
|
|
- Map<String, String> value = record.getValue();
|
|
|
- T payload = (T) JacksonUtils.deserialize(value.get(PAYLOAD), type);
|
|
|
- Session session = JacksonUtils.deserialize(value.get(Session.NAMING), Session.class);
|
|
|
- SessionContextHolder.setSession(session);
|
|
|
- try {
|
|
|
- consumer.accept(payload);
|
|
|
- } finally {
|
|
|
- SessionContextHolder.clearSessionContext();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取消费者信息
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param group 消费者组
|
|
|
- * @return 消费者信息
|
|
|
- */
|
|
|
- public static StreamInfo.XInfoConsumers consumers(@NonNull String topic, @NonNull String group) {
|
|
|
- return getStringTemplate().opsForStream().consumers(topic, group);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取消息延时时间戳
|
|
|
- *
|
|
|
- * @param record 消息记录
|
|
|
- * @return 时间戳
|
|
|
- */
|
|
|
- public static Long getMessageDelaying(@NonNull MapRecord<String, String, String> record) {
|
|
|
- String delaying = ObjectUtils.ifNull(record.getValue(), value -> value.get(DELAYING));
|
|
|
- return StringUtils.ifEmpty(delaying, Long::parseLong);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 初始化消费者组
|
|
|
- *
|
|
|
- * @param topic 消息主题
|
|
|
- * @param group 消费者组
|
|
|
- */
|
|
|
- public static void initializeConsumerGroup(@NonNull String topic, @NonNull String group) {
|
|
|
- try {
|
|
|
- getStringTemplate().opsForStream().createGroup(topic, group);
|
|
|
- } catch (RedisSystemException e) {
|
|
|
- if (StringUtils.isEmpty(e.getMessage()) || !e.getMessage().contains("BUSYGROUP")) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 初始化消息监听器容器
|
|
|
- *
|
|
|
- * @param listener 消息监听器
|
|
|
- * @param consumer 消费者信息
|
|
|
- * @param offset 消息偏移信息
|
|
|
- * @param batch 批量获取消息数量
|
|
|
- * @param executor 消息获取执行器
|
|
|
- * @return 消息监听器容器
|
|
|
- */
|
|
|
- public static StreamMessageListenerContainer<String, MapRecord<String, String, String>>
|
|
|
- initializeMessageListenerContainer(@NonNull StreamListener<String, MapRecord<String, String, String>> listener,
|
|
|
- @NonNull org.springframework.data.redis.connection.stream.Consumer consumer,
|
|
|
- @NonNull StreamOffset<String> offset, int batch,
|
|
|
- @NonNull java.util.concurrent.Executor executor) {
|
|
|
- return initializeMessageListenerContainer(listener, consumer, offset, batch, executor, null);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 初始化消息监听器容器
|
|
|
- *
|
|
|
- * @param listener 消息监听器
|
|
|
- * @param consumer 消费者信息
|
|
|
- * @param offset 消息偏移信息
|
|
|
- * @param batch 批量获取消息数量
|
|
|
- * @param executor 消息获取执行器
|
|
|
- * @param errorHandler 异常处理器
|
|
|
- * @return 消息监听器容器
|
|
|
- */
|
|
|
- public static StreamMessageListenerContainer<String, MapRecord<String, String, String>>
|
|
|
- initializeMessageListenerContainer(@NonNull StreamListener<String, MapRecord<String, String, String>> listener,
|
|
|
- @NonNull org.springframework.data.redis.connection.stream.Consumer consumer,
|
|
|
- @NonNull StreamOffset<String> offset, int batch,
|
|
|
- @NonNull java.util.concurrent.Executor executor, ErrorHandler errorHandler) {
|
|
|
- StreamMessageListenerContainer.
|
|
|
- StreamMessageListenerContainerOptionsBuilder<String, MapRecord<String, String, String>>
|
|
|
- builder = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
|
|
|
- .batchSize(batch).pollTimeout(Duration.ZERO).executor(executor).serializer(RedisSerializer.string());
|
|
|
- if (errorHandler != null) {
|
|
|
- builder.errorHandler(errorHandler);
|
|
|
- }
|
|
|
- StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
|
|
|
- StreamMessageListenerContainer.create(getDefaultConnectionFactory(), builder.build());
|
|
|
- container.receive(consumer, offset, listener);
|
|
|
- return container;
|
|
|
- }
|
|
|
}
|