Woody 3 недель назад
Родитель
Сommit
704bccb1db

+ 4 - 4
framework-feign-circuitbreaker/src/main/java/com/chelvc/framework/feign/circuitbreaker/CircuitBreakerInterceptor.java

@@ -25,23 +25,23 @@ public class CircuitBreakerInterceptor extends DefaultFallbackDecorator implemen
     }
 
     @Override
-    public CheckedFunction0<Object> decorate(FallbackMethod fallbackMethod, CheckedFunction0<Object> supplier) {
+    public CheckedFunction0<Object> decorate(FallbackMethod method, CheckedFunction0<Object> supplier) {
         return () -> {
             try {
                 return supplier.apply();
             } catch (IllegalReturnTypeException | FrameworkException e) {
                 throw e;
             } catch (Throwable t) {
-                return fallbackMethod.fallback(t);
+                return method.fallback(t);
             }
         };
     }
 
     @Override
-    public Object handle(ProceedingJoinPoint point, CircuitBreaker circuitBreaker, String method) throws Throwable {
+    public Object handle(ProceedingJoinPoint point, CircuitBreaker breaker, String method) throws Throwable {
         boolean enabled = ApplicationContextHolder.getProperty("circuit.breaker.enabled", boolean.class, true);
         if (enabled) {
-            return circuitBreaker.executeCheckedSupplier(point::proceed);
+            return breaker.executeCheckedSupplier(point::proceed);
         }
         return point.proceed();
     }

+ 10 - 3
framework-feign-circuitbreaker/src/main/java/com/chelvc/framework/feign/circuitbreaker/EnableFeignCircuitBreaker.java → framework-feign-circuitbreaker/src/main/java/com/chelvc/framework/feign/circuitbreaker/FeignCircuitBreaker.java

@@ -7,15 +7,22 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 /**
- * Feign调用熔断启用注解
+ * Feign接口调用熔断注解
  *
  * @author Woody
  * @date 2025/4/14
  */
 @Documented
-@Target(ElementType.TYPE)
 @Retention(RetentionPolicy.RUNTIME)
-public @interface EnableFeignCircuitBreaker {
+@Target({ElementType.TYPE, ElementType.METHOD})
+public @interface FeignCircuitBreaker {
+    /**
+     * 是否启用
+     *
+     * @return true/false
+     */
+    boolean enabled() default true;
+
     /**
      * 获取启用熔断服务名称数组
      *

+ 44 - 28
framework-feign-circuitbreaker/src/main/java/com/chelvc/framework/feign/circuitbreaker/FeignCircuitBreakerInitializer.java

@@ -1,7 +1,7 @@
 package com.chelvc.framework.feign.circuitbreaker;
 
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.Map;
@@ -9,8 +9,8 @@ import java.util.Set;
 
 import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
-import com.google.common.collect.Sets;
 import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
+import javassist.CannotCompileException;
 import javassist.ClassPool;
 import javassist.CtClass;
 import javassist.CtMethod;
@@ -22,10 +22,10 @@ import javassist.bytecode.ConstPool;
 import javassist.bytecode.MethodInfo;
 import javassist.bytecode.annotation.Annotation;
 import javassist.bytecode.annotation.StringMemberValue;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.support.BeanDefinitionRegistry;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.SpringApplicationRunListener;
-import org.springframework.cloud.openfeign.EnableFeignClients;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.core.type.AnnotationMetadata;
 import org.springframework.util.ClassUtils;
@@ -36,6 +36,7 @@ import org.springframework.util.ClassUtils;
  * @author Woody
  * @date 2025/4/14
  */
+@Slf4j
 public class FeignCircuitBreakerInitializer implements SpringApplicationRunListener {
     /**
      * 是否已初始化
@@ -43,20 +44,13 @@ public class FeignCircuitBreakerInitializer implements SpringApplicationRunListe
     private static volatile boolean INITIALIZED;
 
     /**
-     * 服务名称集合
+     * 服务熔断器注解
      */
-    private static volatile Set<String> SERVERS;
-
-    private final boolean enabled;
+    private static FeignCircuitBreaker MAIN_BREAKER;
 
     public FeignCircuitBreakerInitializer(SpringApplication application, String[] args) {
         Class<?> main = application.getMainApplicationClass();
-        this.enabled = main != null && main.isAnnotationPresent(EnableFeignClients.class)
-                && main.isAnnotationPresent(EnableFeignCircuitBreaker.class);
-        if (this.enabled && !INITIALIZED) {
-            EnableFeignCircuitBreaker configuration = main.getAnnotation(EnableFeignCircuitBreaker.class);
-            SERVERS = ObjectUtils.ifEmpty(configuration.servers(), Sets::newHashSet, Collections::emptySet);
-        }
+        MAIN_BREAKER = main.getAnnotation(FeignCircuitBreaker.class);
     }
 
     /**
@@ -156,6 +150,21 @@ public class FeignCircuitBreakerInitializer implements SpringApplicationRunListe
         return CtNewMethod.make(body, clazz);
     }
 
+    /**
+     * 查找接口@EnableFeignCircuitBreaker注解
+     *
+     * @param method 接口方法
+     * @return @EnableFeignCircuitBreaker注解实例
+     * @throws Exception 操作异常
+     */
+    private static FeignCircuitBreaker findFeignCircuitBreaker(CtMethod method) throws Exception {
+        Object breaker = method.getAnnotation(FeignCircuitBreaker.class);
+        if (breaker == null) {
+            breaker = method.getDeclaringClass().getAnnotation(FeignCircuitBreaker.class);
+        }
+        return breaker == null ? MAIN_BREAKER : (FeignCircuitBreaker) breaker;
+    }
+
     /**
      * 初始化Feign调用方法@CircuitBreaker注解
      *
@@ -196,7 +205,8 @@ public class FeignCircuitBreakerInitializer implements SpringApplicationRunListe
 
         // 忽略不需要熔断服务调用
         String server = getFeignClientName(attributes);
-        if (StringUtils.isEmpty(server) || (ObjectUtils.notEmpty(SERVERS) && !SERVERS.contains(server))) {
+        if (MAIN_BREAKER != null && ObjectUtils.notEmpty(MAIN_BREAKER.servers())
+                && !Arrays.asList(MAIN_BREAKER.servers()).contains(server)) {
             return;
         }
 
@@ -208,7 +218,9 @@ public class FeignCircuitBreakerInitializer implements SpringApplicationRunListe
             CtMethod[] methods = clazz.getDeclaredMethods();
             if (ObjectUtils.notEmpty(methods)) {
                 for (CtMethod method : methods) {
-                    if (Modifier.isAbstract(method.getModifiers()) && !method.hasAnnotation(CircuitBreaker.class)) {
+                    FeignCircuitBreaker breaker = findFeignCircuitBreaker(method);
+                    if (Modifier.isAbstract(method.getModifiers()) && !method.hasAnnotation(CircuitBreaker.class)
+                            && breaker != null && breaker.enabled()) {
                         initializeFeignCircuitBreaker(server, method);
                         bound = true;
                     }
@@ -217,6 +229,8 @@ public class FeignCircuitBreakerInitializer implements SpringApplicationRunListe
             if (bound) {
                 clazz.toClass();
             }
+        } catch (CannotCompileException e) {
+            log.warn("Feign circuit breaker initialize failed: {}", e.getMessage());
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -224,19 +238,21 @@ public class FeignCircuitBreakerInitializer implements SpringApplicationRunListe
 
     @Override
     public void contextLoaded(ConfigurableApplicationContext context) {
-        if (this.enabled && !INITIALIZED) {
-            synchronized (FeignCircuitBreakerInitializer.class) {
-                if (!INITIALIZED) {
-                    INITIALIZED = true;
-
-                    ClassPool pool = ClassPool.getDefault();
-                    pool.appendClassPath(new LoaderClassPath(Thread.currentThread().getContextClassLoader()));
-
-                    try {
-                        this.initializeFeignClientRegisterListener(pool);
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
+        if (INITIALIZED || (MAIN_BREAKER != null && !MAIN_BREAKER.enabled())) {
+            return;
+        }
+
+        synchronized (FeignCircuitBreakerInitializer.class) {
+            if (!INITIALIZED) {
+                INITIALIZED = true;
+
+                ClassPool pool = ClassPool.getDefault();
+                pool.appendClassPath(new LoaderClassPath(Thread.currentThread().getContextClassLoader()));
+
+                try {
+                    this.initializeFeignClientRegisterListener(pool);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
                 }
             }
         }

+ 3 - 5
framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java

@@ -71,10 +71,8 @@ public final class KafkaContextHolder {
      * @return 队列实例
      */
     public static DelayRedisQueue<String> getTransactionQueue() {
-        return RedisQueues.getDelayRedisQueue(
-                ApplicationContextHolder.getApplicationName(true) + "-kafka-transaction-message",
-                name -> new DelayRedisQueue<>(name, Duration.ofSeconds(60), Duration.ZERO)
-        );
+        String application = ApplicationContextHolder.getApplicationName(true);
+        return RedisQueues.getDelayRedisQueue(application + "-kafka-transaction-message", DelayRedisQueue::new);
     }
 
     /**
@@ -267,7 +265,7 @@ public final class KafkaContextHolder {
                 .ordering(ordering).partition(partition).session(session).build();
         String message = JacksonUtils.serialize(transaction);
         DelayRedisQueue<String> queue = getTransactionQueue();
-        queue.offer(message, Duration.ofMillis(queue.getTimeout()));
+        queue.offer(message, Duration.ofSeconds(60));
 
         // 处理本地事务
         boolean committable;

+ 3 - 1
framework-kafka/src/main/java/com/chelvc/framework/kafka/producer/TransactionMessageProcessor.java

@@ -1,5 +1,7 @@
 package com.chelvc.framework.kafka.producer;
 
+import java.time.Duration;
+
 import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.common.util.JacksonUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
@@ -62,7 +64,7 @@ public class TransactionMessageProcessor implements ApplicationRunner {
     @Override
     public void run(ApplicationArguments args) throws Exception {
         DelayRedisQueue<String> queue = KafkaContextHolder.getTransactionQueue();
-        RedisQueues.consume(queue, 100, messages -> messages.forEach(message -> {
+        RedisQueues.consume(queue, 100, Duration.ofSeconds(60), messages -> messages.forEach(message -> {
             try {
                 this.processing(queue, message);
             } catch (Exception e) {

+ 37 - 17
framework-redis/src/main/java/com/chelvc/framework/redis/queue/DelayRedisQueue.java

@@ -38,20 +38,18 @@ public class DelayRedisQueue<E> extends PriorityRedisQueue<E> {
                     "end end return values", List.class
     );
 
-    private final long timeout;
     private final long expiration;
 
     public DelayRedisQueue(@NonNull String name) {
-        this(name, Duration.ZERO, Duration.ZERO);
+        this(name, Duration.ZERO);
     }
 
-    public DelayRedisQueue(@NonNull String name, @NonNull Duration timeout, @NonNull Duration expiration) {
-        this(ApplicationContextHolder.getProfile(), name, timeout, expiration);
+    public DelayRedisQueue(@NonNull String name, @NonNull Duration expiration) {
+        this(ApplicationContextHolder.getProfile(), name, expiration);
     }
 
-    DelayRedisQueue(String namespace, @NonNull String name, @NonNull Duration timeout, @NonNull Duration expiration) {
+    DelayRedisQueue(String namespace, @NonNull String name, @NonNull Duration expiration) {
         super(namespace, name);
-        this.timeout = timeout.toMillis();
         this.expiration = expiration.toMillis();
     }
 
@@ -111,6 +109,38 @@ public class DelayRedisQueue<E> extends PriorityRedisQueue<E> {
         return consumings;
     }
 
+    /**
+     * 弹出元素
+     *
+     * @param duration 数据有效期
+     * @return 元素信息
+     */
+    public E poll(@NonNull Duration duration) {
+        List<E> values = this.poll(1, duration);
+        return ObjectUtils.isEmpty(values) ? null : values.get(0);
+    }
+
+    /**
+     * 弹出元素
+     *
+     * @param size     元素个数
+     * @param duration 数据有效期
+     * @return 元素列表
+     */
+    @SuppressWarnings("unchecked")
+    public List<E> poll(int size, @NonNull Duration duration) {
+        AssertUtils.arg(size > 0, () -> "size must be greater than 0");
+
+        long now = System.currentTimeMillis();
+        long min = this.expiration > 0 ? now - this.expiration : Long.MIN_VALUE;
+        if (duration.compareTo(Duration.ZERO) > 0) {
+            List<String> keys = Collections.singletonList(this.getName());
+            List<E> values = this.template().execute(POLL_DELAY_SCRIPT, keys, min, now, size, duration.toMillis());
+            return ObjectUtils.ifEmpty(values, Collections::emptyList);
+        }
+        return super.poll(min, now, size);
+    }
+
     /**
      * 将元素放入队列
      *
@@ -163,17 +193,7 @@ public class DelayRedisQueue<E> extends PriorityRedisQueue<E> {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public List<E> poll(int size) {
-        AssertUtils.arg(size > 0, () -> "size must be greater than 0");
-
-        long now = System.currentTimeMillis();
-        long min = this.expiration > 0 ? now - this.expiration : Long.MIN_VALUE;
-        if (this.timeout > 0) {
-            List<String> keys = Collections.singletonList(this.getName());
-            List<E> values = this.template().execute(POLL_DELAY_SCRIPT, keys, min, now, size, this.timeout);
-            return ObjectUtils.ifEmpty(values, Collections::emptyList);
-        }
-        return super.poll(min, now, size);
+        return this.poll(size, Duration.ZERO);
     }
 }

+ 7 - 6
framework-redis/src/main/java/com/chelvc/framework/redis/queue/MessageStreamListener.java

@@ -2,6 +2,7 @@ package com.chelvc.framework.redis.queue;
 
 import java.lang.reflect.Type;
 import java.time.Duration;
+import java.util.List;
 import java.util.concurrent.Executor;
 
 import com.chelvc.framework.redis.context.RedisStreamHolder;
@@ -36,22 +37,22 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
 
         // 初始化延时消息队列
         String name = this.topic + ":" + this.consumer.getGroup();
-        this.delayQueue = new DelayRedisQueue<String>(null, name, Duration.ofSeconds(60), Duration.ZERO) {
+        this.delayQueue = new DelayRedisQueue<String>(null, name, Duration.ZERO) {
             @Override
             protected RedisTemplate<String, String> template() {
                 return RedisStreamHolder.getStreamTemplate();
             }
         };
-        RedisQueues.consume(this.delayQueue, 100, messages -> messages.forEach(this::processing));
+        RedisQueues.consume(this.delayQueue, 100, Duration.ofSeconds(60), this::processing);
     }
 
     /**
      * 消息消费处理
      *
-     * @param message 消息内容(JSON格式)
+     * @param messages 消息列表(JSON格式)
      */
-    private void processing(String message) {
-        this.executor.execute(() -> {
+    private void processing(List<String> messages) {
+        messages.forEach(message -> this.executor.execute(() -> {
             try {
                 MapRecord<String, String, String> record = RedisStreamHolder.deserialize(message);
                 if (RedisStreamHolder.isHeartbeat(record)) {
@@ -64,7 +65,7 @@ public class MessageStreamListener<T> implements StreamListener<String, MapRecor
             } catch (Throwable t) {
                 log.error("RedisMQ message consume failed: {}, {}", this.consumer, message, t);
             }
-        });
+        }));
     }
 
     /**

+ 91 - 5
framework-redis/src/main/java/com/chelvc/framework/redis/queue/RedisQueues.java

@@ -1,5 +1,6 @@
 package com.chelvc.framework.redis.queue;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
@@ -134,7 +135,7 @@ public final class RedisQueues {
                 try {
                     message = queue.poll();
                 } catch (Throwable t) {
-                    log.error("RedisMQ message poll failed: {}", queue.getName(), t);
+                    log.error("Redis normal queue poll failed: {}", queue.getName(), t);
                 }
 
                 // 如果当前没有消息则暂停1秒后继续
@@ -147,14 +148,25 @@ public final class RedisQueues {
                 try {
                     consumer.accept(message);
                 } catch (Throwable t) {
-                    log.error("RedisMQ message consume failed: {}, {}", queue.getName(), message, t);
+                    log.error("Redis normal queue consume failed: {}, {}", queue.getName(), message, t);
                 }
             }
         });
     }
 
     /**
-     * 消费队列消息
+     * 消费优先级队列消息
+     *
+     * @param queue    消息队列
+     * @param consumer 消息消费者
+     * @param <T>      消息类型
+     */
+    public static <T> void consume(@NonNull PriorityRedisQueue<T> queue, @NonNull Consumer<T> consumer) {
+        consume(queue, 1, messages -> consumer.accept(messages.get(0)));
+    }
+
+    /**
+     * 消费优先级队列消息
      *
      * @param queue    消息队列
      * @param size     拉取数量
@@ -171,7 +183,81 @@ public final class RedisQueues {
                 try {
                     messages = queue.poll(size);
                 } catch (Throwable t) {
-                    log.error("RedisMQ message poll failed: {}", queue.getName(), t);
+                    log.error("Redis priority queue poll failed: {}", queue.getName(), t);
+                }
+
+                // 如果当前没有消息则暂停1秒后继续
+                if (ObjectUtils.isEmpty(messages)) {
+                    ThreadUtils.sleep(1000);
+                    continue;
+                }
+
+                // 消费消息
+                try {
+                    consumer.accept(messages);
+                } catch (Throwable t) {
+                    log.error("Redis priority queue consume failed: {}, {}", queue.getName(), messages, t);
+                }
+            }
+        });
+    }
+
+    /**
+     * 消费延时队列消息
+     *
+     * @param queue    消息队列
+     * @param consumer 消息消费者
+     * @param <T>      消息类型
+     */
+    public static <T> void consume(@NonNull DelayRedisQueue<T> queue, @NonNull Consumer<T> consumer) {
+        consume(queue, Duration.ZERO, consumer);
+    }
+
+    /**
+     * 消费延时队列消息
+     *
+     * @param queue    消息队列
+     * @param duration 数据有效期
+     * @param consumer 消息消费者
+     * @param <T>      消息类型
+     */
+    public static <T> void consume(@NonNull DelayRedisQueue<T> queue, @NonNull Duration duration,
+                                   @NonNull Consumer<T> consumer) {
+        consume(queue, 1, duration, messages -> consumer.accept(messages.get(0)));
+    }
+
+    /**
+     * 消费延时队列消息
+     *
+     * @param queue    消息队列
+     * @param size     拉取数量
+     * @param consumer 消息消费者
+     * @param <T>      消息类型
+     */
+    public static <T> void consume(@NonNull DelayRedisQueue<T> queue, int size, @NonNull Consumer<List<T>> consumer) {
+        consume(queue, size, Duration.ZERO, consumer);
+    }
+
+    /**
+     * 消费延时队列消息
+     *
+     * @param queue    消息队列
+     * @param size     拉取数量
+     * @param duration 数据有效期
+     * @param consumer 消息消费者
+     * @param <T>      消息类型
+     */
+    public static <T> void consume(@NonNull DelayRedisQueue<T> queue, int size, @NonNull Duration duration,
+                                   @NonNull Consumer<List<T>> consumer) {
+        AssertUtils.arg(size > 0, () -> "size must be greater than 0");
+        ThreadUtils.run(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                // 拉取消息
+                List<T> messages = null;
+                try {
+                    messages = queue.poll(size, duration);
+                } catch (Throwable t) {
+                    log.error("Redis delay queue poll failed: {}", queue.getName(), t);
                 }
 
                 // 如果当前没有消息则暂停1秒后继续
@@ -184,7 +270,7 @@ public final class RedisQueues {
                 try {
                     consumer.accept(messages);
                 } catch (Throwable t) {
-                    log.error("RedisMQ message consume failed: {}, {}", queue.getName(), messages, t);
+                    log.error("Redis delay queue consume failed: {}, {}", queue.getName(), messages, t);
                 }
             }
         });