Browse Source

新增手动删除RedisMQ延时消息工具方法

Woody 2 days ago
parent
commit
66a4b5cc1f

+ 9 - 9
framework-jpush/src/main/java/com/chelvc/framework/jpush/config/JPushProperties.java

@@ -25,9 +25,14 @@ public class JPushProperties {
     private String secret;
 
     /**
-     * 华为配置
+     * VIVO配置
      */
-    private final Attribute huawei = new Attribute();
+    private final Attribute vivo = new Attribute();
+
+    /**
+     * OPPO配置
+     */
+    private final Attribute oppo = new Attribute();
 
     /**
      * 荣耀配置
@@ -40,14 +45,9 @@ public class JPushProperties {
     private final Attribute xiaomi = new Attribute();
 
     /**
-     * OPPO配置
-     */
-    private final Attribute oppo = new Attribute();
-
-    /**
-     * VIVO配置
+     * 华为配置
      */
-    private final Attribute vivo = new Attribute();
+    private final Attribute huawei = new Attribute();
 
     /**
      * 属性配置

+ 144 - 31
framework-jpush/src/main/java/com/chelvc/framework/jpush/context/JPushContextHolder.java

@@ -5,6 +5,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -34,6 +35,31 @@ import lombok.NonNull;
  * @date 2024/1/30
  */
 public final class JPushContextHolder {
+    /**
+     * VIVO标识
+     */
+    public static final String VIVO = "vivo";
+
+    /**
+     * OPPO标识
+     */
+    public static final String OPPO = "oppo";
+
+    /**
+     * 荣耀标识
+     */
+    public static final String HONOR = "honor";
+
+    /**
+     * 小米标识
+     */
+    public static final String XIAOMI = "xiaomi";
+
+    /**
+     * 华为标识
+     */
+    public static final String HUAWEI = "huawei";
+
     /**
      * 安卓通知消息工厂
      */
@@ -208,20 +234,37 @@ public final class JPushContextHolder {
         if (ANDROID_OPTIONS == null) {
             synchronized (Options.class) {
                 if (ANDROID_OPTIONS == null) {
-                    JPushProperties properties = ApplicationContextHolder.getBean(JPushProperties.class);
-                    ANDROID_OPTIONS = Options.newBuilder().setThirdPartyChannelV2(ImmutableMap.of(
-                            "huawei", initializeAndroidChannel(properties.getHuawei()),
-                            "honor", initializeAndroidChannel(properties.getHonor()),
-                            "xiaomi", initializeAndroidChannel(properties.getXiaomi()),
-                            "oppo", initializeAndroidChannel(properties.getOppo()),
-                            "vivo", initializeAndroidChannel(properties.getVivo())
-                    )).build();
+                    ANDROID_OPTIONS = getAndroidNotifyOptions((brand, channel) -> {
+                    });
                 }
             }
         }
         return ANDROID_OPTIONS;
     }
 
+    /**
+     * 获取安卓通知可选参数
+     *
+     * @param consumer 手机品牌/配置回调函数
+     * @return 推送可选项
+     */
+    public static Options getAndroidNotifyOptions(@NonNull BiConsumer<String, JsonObject> consumer) {
+        JPushProperties properties = ApplicationContextHolder.getBean(JPushProperties.class);
+        JsonObject vivo = initializeAndroidChannel(properties.getVivo());
+        consumer.accept(VIVO, vivo);
+        JsonObject oppo = initializeAndroidChannel(properties.getOppo());
+        consumer.accept(OPPO, oppo);
+        JsonObject honor = initializeAndroidChannel(properties.getHonor());
+        consumer.accept(HONOR, honor);
+        JsonObject xiaomi = initializeAndroidChannel(properties.getXiaomi());
+        consumer.accept(XIAOMI, xiaomi);
+        JsonObject huawei = initializeAndroidChannel(properties.getHuawei());
+        consumer.accept(HUAWEI, huawei);
+        return Options.newBuilder().setThirdPartyChannelV2(ImmutableMap.of(
+                VIVO, vivo, OPPO, oppo, HONOR, honor, XIAOMI, xiaomi, HUAWEI, huawei
+        )).build();
+    }
+
     /**
      * 初始化安卓渠道
      *
@@ -248,6 +291,20 @@ public final class JPushContextHolder {
         return channel;
     }
 
+    /**
+     * 构建消息推送载体对象
+     *
+     * @param platform 目标平台
+     * @param message  透传消息
+     * @param audience 目标群体
+     * @return 消息推送载体
+     */
+    public static PushPayload payload(@NonNull Platform platform, @NonNull Message message,
+                                      @NonNull Audience audience) {
+        return new PushPayload.Builder().setPlatform(adapter(platform)).setMessage(message)
+                .setAudience(audience).build();
+    }
+
     /**
      * 构建消息推送载体对象
      *
@@ -258,8 +315,7 @@ public final class JPushContextHolder {
      */
     public static PushPayload payload(@NonNull Platform platform, @NonNull Message message,
                                       @NonNull String... aliases) {
-        return new PushPayload.Builder().setPlatform(adapter(platform)).setAudience(Audience.alias(aliases))
-                .setMessage(message).build();
+        return payload(platform, message, Audience.alias(aliases));
     }
 
     /**
@@ -272,8 +328,36 @@ public final class JPushContextHolder {
      */
     public static PushPayload payload(@NonNull Platform platform, @NonNull Message message,
                                       @NonNull Collection<String> aliases) {
-        return new PushPayload.Builder().setPlatform(adapter(platform)).setAudience(Audience.alias(aliases))
-                .setMessage(message).build();
+        return payload(platform, message, Audience.alias(aliases));
+    }
+
+    /**
+     * 构建消息推送载体对象
+     *
+     * @param platform     目标平台
+     * @param notification 推送通知
+     * @param audience     目标群体
+     * @return 消息推送载体
+     */
+    public static PushPayload payload(@NonNull Platform platform, @NonNull Notification notification,
+                                      @NonNull Audience audience) {
+        Options options = platform == Platform.ANDROID ? getAndroidNotifyOptions() : null;
+        return payload(platform, notification, audience, options);
+    }
+
+    /**
+     * 构建消息推送载体对象
+     *
+     * @param platform     目标平台
+     * @param notification 推送通知
+     * @param audience     目标群体
+     * @param options      通知可选项
+     * @return 消息推送载体
+     */
+    public static PushPayload payload(@NonNull Platform platform, @NonNull Notification notification,
+                                      @NonNull Audience audience, Options options) {
+        return new PushPayload.Builder().setPlatform(adapter(platform)).setNotification(notification)
+                .setAudience(audience).setOptions(options).build();
     }
 
     /**
@@ -286,12 +370,21 @@ public final class JPushContextHolder {
      */
     public static PushPayload payload(@NonNull Platform platform, @NonNull Notification notification,
                                       @NonNull String... aliases) {
-        PushPayload.Builder builder = new PushPayload.Builder().setPlatform(adapter(platform))
-                .setAudience(Audience.alias(aliases)).setNotification(notification);
-        if (platform == Platform.ANDROID) {
-            builder.setOptions(getAndroidNotifyOptions());
-        }
-        return builder.build();
+        return payload(platform, notification, Audience.alias(aliases));
+    }
+
+    /**
+     * 构建消息推送载体对象
+     *
+     * @param platform     目标平台
+     * @param notification 推送通知
+     * @param options      通知可选项
+     * @param aliases      用户别名数组
+     * @return 消息推送载体
+     */
+    public static PushPayload payload(@NonNull Platform platform, @NonNull Notification notification,
+                                      Options options, @NonNull String... aliases) {
+        return payload(platform, notification, Audience.alias(aliases), options);
     }
 
     /**
@@ -304,12 +397,21 @@ public final class JPushContextHolder {
      */
     public static PushPayload payload(@NonNull Platform platform, @NonNull Notification notification,
                                       @NonNull Collection<String> aliases) {
-        PushPayload.Builder builder = new PushPayload.Builder().setPlatform(adapter(platform))
-                .setAudience(Audience.alias(aliases)).setNotification(notification);
-        if (platform == Platform.ANDROID) {
-            builder.setOptions(getAndroidNotifyOptions());
-        }
-        return builder.build();
+        return payload(platform, notification, Audience.alias(aliases));
+    }
+
+    /**
+     * 构建消息推送载体对象
+     *
+     * @param platform     目标平台
+     * @param notification 推送通知
+     * @param options      通知可选项
+     * @param aliases      用户别名集合
+     * @return 消息推送载体
+     */
+    public static PushPayload payload(@NonNull Platform platform, @NonNull Notification notification,
+                                      Options options, @NonNull Collection<String> aliases) {
+        return payload(platform, notification, Audience.alias(aliases), options);
     }
 
     /**
@@ -321,7 +423,7 @@ public final class JPushContextHolder {
      * @return 消息推送载体
      */
     public static PushPayload target(@NonNull String target, @NonNull Platform platform, @NonNull Message message) {
-        return new PushPayload.Builder().setPlatform(adapter(platform)).setMessage(message).setTarget(target).build();
+        return new PushPayload.Builder().setTarget(target).setPlatform(adapter(platform)).setMessage(message).build();
     }
 
     /**
@@ -334,11 +436,22 @@ public final class JPushContextHolder {
      */
     public static PushPayload target(@NonNull String target, @NonNull Platform platform,
                                      @NonNull Notification notification) {
-        PushPayload.Builder builder = new PushPayload.Builder().setTarget(target).setPlatform(adapter(platform))
-                .setNotification(notification);
-        if (platform == Platform.ANDROID) {
-            builder.setOptions(getAndroidNotifyOptions());
-        }
-        return builder.build();
+        Options options = platform == Platform.ANDROID ? getAndroidNotifyOptions() : null;
+        return target(target, platform, notification, options);
+    }
+
+    /**
+     * 构建消息推送载体对象
+     *
+     * @param target       目标用户
+     * @param platform     目标平台
+     * @param notification 推送通知
+     * @param options      通知可选项
+     * @return 消息推送载体
+     */
+    public static PushPayload target(@NonNull String target, @NonNull Platform platform,
+                                     @NonNull Notification notification, Options options) {
+        return new PushPayload.Builder().setTarget(target).setPlatform(adapter(platform))
+                .setNotification(notification).setOptions(options).build();
     }
 }

+ 172 - 0
framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisStreamHolder.java

@@ -8,12 +8,16 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
 import com.chelvc.framework.base.context.Session;
 import com.chelvc.framework.base.context.SessionContextHolder;
+import com.chelvc.framework.common.model.Pair;
 import com.chelvc.framework.common.util.JacksonUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
@@ -341,6 +345,162 @@ public final class RedisStreamHolder {
         getStreamTemplate().opsForStream().deleteConsumer(topic, consumer);
     }
 
+    /**
+     * 删除延迟消息
+     *
+     * @param topic  消息主题
+     * @param group  消费者组
+     * @param type   消息类型
+     * @param filter 过滤函数
+     * @param <T>    数据类型
+     */
+    public static <T> void deleteDelayMessage(@NonNull String topic, @NonNull String group, @NonNull Class<T> type,
+                                              @NonNull Predicate<T> filter) {
+        deleteDelayMessage(topic, group, type, filter, true);
+    }
+
+    /**
+     * 删除延迟消息
+     *
+     * @param topic  消息主题
+     * @param group  消费者组
+     * @param type   消息类型
+     * @param filter 过滤函数
+     * @param single 是否单个删除
+     * @param <T>    数据类型
+     */
+    public static <T> void deleteDelayMessage(@NonNull String topic, @NonNull String group, @NonNull Class<T> type,
+                                              @NonNull Predicate<T> filter, boolean single) {
+        deleteDelayMessage(topic, group, 0, Long.MAX_VALUE, type, filter, single);
+    }
+
+    /**
+     * 删除延迟消息
+     *
+     * @param topic    消息主题
+     * @param group    消费者组
+     * @param delaying 延迟时间
+     * @param type     消息类型
+     * @param filter   过滤函数
+     * @param <T>      数据类型
+     */
+    public static <T> void deleteDelayMessage(@NonNull String topic, @NonNull String group, long delaying,
+                                              @NonNull Class<T> type, @NonNull Predicate<T> filter) {
+        deleteDelayMessage(topic, group, delaying, type, filter, true);
+    }
+
+    /**
+     * 删除延迟消息
+     *
+     * @param topic    消息主题
+     * @param group    消费者组
+     * @param delaying 延迟时间
+     * @param type     消息类型
+     * @param filter   过滤函数
+     * @param single   是否单个删除
+     * @param <T>      数据类型
+     */
+    public static <T> void deleteDelayMessage(@NonNull String topic, @NonNull String group, long delaying,
+                                              @NonNull Class<T> type, @NonNull Predicate<T> filter, boolean single) {
+        long min = delaying - 3000, max = delaying + 3000;
+        deleteDelayMessage(topic, group, () -> Pair.of(min, max), type, filter, single);
+    }
+
+    /**
+     * 删除延迟消息
+     *
+     * @param topic  消息主题
+     * @param group  消费者组
+     * @param min    最小时间
+     * @param max    最大时间
+     * @param type   消息类型
+     * @param filter 过滤函数
+     * @param <T>    数据类型
+     */
+    public static <T> void deleteDelayMessage(@NonNull String topic, @NonNull String group, long min, long max,
+                                              @NonNull Class<T> type, @NonNull Predicate<T> filter) {
+        deleteDelayMessage(topic, group, min, max, type, filter, true);
+    }
+
+    /**
+     * 删除延迟消息
+     *
+     * @param topic  消息主题
+     * @param group  消费者组
+     * @param min    最小时间
+     * @param max    最大时间
+     * @param type   消息类型
+     * @param filter 过滤函数
+     * @param single 是否单个删除
+     * @param <T>    数据类型
+     */
+    public static <T> void deleteDelayMessage(@NonNull String topic, @NonNull String group, long min, long max,
+                                              @NonNull Class<T> type, @NonNull Predicate<T> filter, boolean single) {
+        deleteDelayMessage(topic, group, () -> Pair.of(min, max), type, filter, single);
+    }
+
+    /**
+     * 删除延迟消息
+     *
+     * @param topic  消息主题
+     * @param group  消费者组
+     * @param period 时间周期
+     * @param type   消息类型
+     * @param filter 过滤函数
+     * @param single 是否单个删除
+     * @param <T>    数据类型
+     */
+    private static <T> void deleteDelayMessage(@NonNull String topic, @NonNull String group,
+                                               @NonNull Supplier<Pair<Long, Long>> period, @NonNull Class<T> type,
+                                               @NonNull Predicate<T> filter, boolean single) {
+        long offset = 0, count = 1000;
+        String name = isolate(topic) + ":" + isolate(group);
+        RedisTemplate<String, String> template = getStreamTemplate();
+
+        batch:
+        while (true) {
+            //  获取延迟时间范围,开始时间点最小值为当前时间(毫秒)+ 3秒;
+            //  为避免同时触发延迟消息的消费、删除逻辑,故延迟时间点在3秒内的消息不支持删除
+            Pair<Long, Long> range = Objects.requireNonNull(period.get());
+            long min = Math.max(range.getKey(), System.currentTimeMillis() + 3000), max = range.getValue();
+            if (min > max) {
+                break;
+            }
+
+            // 分批次删除延迟消息
+            Set<String> values = template.opsForZSet().rangeByScore(name, min, max, offset, count);
+            if (ObjectUtils.notEmpty(values)) {
+                List<String> matches = single ? null : Lists.newLinkedList();
+                for (String value : values) {
+                    MapRecord<String, String, String> record = deserialize(value);
+                    T message = JacksonUtils.deserialize(record.getValue().get(PAYLOAD), type);
+                    if (filter.test(message)) {
+                        if (single) {
+                            if (min - System.currentTimeMillis() < 1000) {
+                                throw new IllegalStateException("RedisMQ delay message delete timeout");
+                            }
+                            template.opsForZSet().remove(name, value);
+                            break batch;
+                        } else {
+                            matches.add(value);
+                        }
+                    }
+                }
+                if (ObjectUtils.notEmpty(matches)) {
+                    if (min - System.currentTimeMillis() < 1000) {
+                        throw new IllegalStateException("RedisMQ delay message delete timeout");
+                    }
+                    template.opsForZSet().remove(name, matches.toArray());
+                }
+            }
+            if (ObjectUtils.size(values) < count) {
+                break;
+            }
+
+            offset += count;
+        }
+    }
+
     /**
      * 发送心跳包
      *
@@ -383,6 +543,18 @@ public final class RedisStreamHolder {
         }
     }
 
+    /**
+     * 消费消息
+     *
+     * @param record   消息记录
+     * @param type     消息类型
+     * @param consumer 消息消费者
+     */
+    public static <T> void consume(@NonNull MapRecord<String, String, String> record, @NonNull Class<T> type,
+                                   @NonNull Consumer<T> consumer) {
+        consume(record, (Type) type, consumer);
+    }
+
     /**
      * 获取消费者信息
      *

+ 15 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/queue/PriorityRedisQueue.java

@@ -151,10 +151,24 @@ public class PriorityRedisQueue<E> extends NormalRedisQueue<E> {
      * @return 元素
      */
     public E peek(double min, double max) {
-        Set<E> values = this.template().opsForZSet().rangeByScore(this.getName(), min, max, 0, 1);
+        Set<E> values = this.peek(min, max, 0, 1);
         return ObjectUtils.isEmpty(values) ? null : values.iterator().next();
     }
 
+    /**
+     * 查看元素
+     *
+     * @param min    最小优先级
+     * @param max    最大优先级
+     * @param offset 元素偏移位置
+     * @param count  获取元素数量
+     * @return 元素集合
+     */
+    public Set<E> peek(double min, double max, long offset, long count) {
+        Set<E> values = this.template().opsForZSet().rangeByScore(this.getName(), min, max, offset, count);
+        return ObjectUtils.isEmpty(values) ? Collections.emptySet() : values;
+    }
+
     /**
      * 将元素放入队列
      *