Browse Source

RocketMQ消息发送降级处理优化

woody 1 year ago
parent
commit
6842bf31f6

+ 3 - 3
framework-common/src/main/java/com/chelvc/framework/common/model/Enumerable.java

@@ -46,11 +46,11 @@ public interface Enumerable extends Serializable {
     }
     }
 
 
     /**
     /**
-     * 获取枚举别名
+     * 获取枚举描述
      *
      *
-     * @return 枚举别名
+     * @return 枚举描述
      */
      */
-    default String getAlias() {
+    default String getDescription() {
         return this.name();
         return this.name();
     }
     }
 }
 }

+ 1 - 1
framework-common/src/main/java/com/chelvc/framework/common/serializer/JacksonEnumerateSerializer.java

@@ -24,7 +24,7 @@ public class JacksonEnumerateSerializer extends StdSerializer<Object> {
         if (value instanceof Enumerable) {
         if (value instanceof Enumerable) {
             Enumerable enumerable = (Enumerable) value;
             Enumerable enumerable = (Enumerable) value;
             generator.writeObject(ImmutableMap.of(
             generator.writeObject(ImmutableMap.of(
-                    String.valueOf(enumerable.getCode()), String.valueOf(enumerable.getAlias())
+                    String.valueOf(enumerable.getCode()), String.valueOf(enumerable.getDescription())
             ));
             ));
         } else if (value instanceof Enum) {
         } else if (value instanceof Enum) {
             Enum<?> enumerable = (Enum<?>) value;
             Enum<?> enumerable = (Enum<?>) value;

+ 4 - 0
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/consumer/MultipleRocketMQListenerContainer.java

@@ -25,6 +25,8 @@ import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
 import org.springframework.core.env.Environment;
 import org.springframework.core.env.Environment;
 
 
 /**
 /**
@@ -54,6 +56,8 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
     private List<MessageView> receive() {
     private List<MessageView> receive() {
         try {
         try {
             return this.consumer.receive(this.batch, this.duration);
             return this.consumer.receive(this.batch, this.duration);
+        } catch (ProxyTimeoutException | TooManyRequestsException e) {
+            log.warn("RocketMQ message receive failed: {}", e.getMessage());
         } catch (Throwable t) {
         } catch (Throwable t) {
             if (!(t.getCause() instanceof InterruptedException)) {
             if (!(t.getCause() instanceof InterruptedException)) {
                 log.error("Message receive failed: {}:{}", this.topic, this.group, t);
                 log.error("Message receive failed: {}:{}", this.topic, this.group, t);

+ 4 - 4
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/Demoting.java → framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/MessageSending.java

@@ -1,4 +1,4 @@
-package com.chelvc.framework.rocketmq.producer;
+package com.chelvc.framework.rocketmq.context;
 
 
 import lombok.NonNull;
 import lombok.NonNull;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientException;
@@ -6,12 +6,12 @@ import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.Producer;
 
 
 /**
 /**
- * 普通消息发送降级实现
+ * 普通消息发送包装器
  *
  *
  * @author Woody
  * @author Woody
  * @date 2024/1/30
  * @date 2024/1/30
  */
  */
-public class Demoting {
+public class MessageSending {
     /**
     /**
      * 消息生产者
      * 消息生产者
      */
      */
@@ -22,7 +22,7 @@ public class Demoting {
      */
      */
     protected final Message message;
     protected final Message message;
 
 
-    public Demoting(@NonNull Producer producer, @NonNull Message message) {
+    public MessageSending(@NonNull Producer producer, @NonNull Message message) {
         this.producer = producer;
         this.producer = producer;
         this.message = message;
         this.message = message;
     }
     }

+ 14 - 10
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/RocketMQContextHolder.java

@@ -25,9 +25,7 @@ import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.common.util.ThreadUtils;
 import com.chelvc.framework.common.util.ThreadUtils;
 import com.chelvc.framework.rocketmq.config.RocketMQProperties;
 import com.chelvc.framework.rocketmq.config.RocketMQProperties;
-import com.chelvc.framework.rocketmq.producer.Demoting;
 import com.chelvc.framework.rocketmq.producer.RocketMQProducerFactory;
 import com.chelvc.framework.rocketmq.producer.RocketMQProducerFactory;
-import com.chelvc.framework.rocketmq.producer.TransactionDemoting;
 import lombok.NonNull;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -69,7 +67,7 @@ public final class RocketMQContextHolder {
     /**
     /**
      * 消息发送降级队列
      * 消息发送降级队列
      */
      */
-    private static final BlockingQueue<Demoting> DEMOTING_QUEUE = new LinkedBlockingQueue<>();
+    private static final BlockingQueue<MessageSending> MESSAGE_SENDING_QUEUE = new LinkedBlockingQueue<>();
 
 
     /**
     /**
      * 配置属性
      * 配置属性
@@ -91,15 +89,15 @@ public final class RocketMQContextHolder {
         ThreadUtils.run(() -> {
         ThreadUtils.run(() -> {
             while (!Thread.interrupted()) {
             while (!Thread.interrupted()) {
                 // 尝试从队列中获取降级处理信息,如果为空则等待1分钟后继续
                 // 尝试从队列中获取降级处理信息,如果为空则等待1分钟后继续
-                Demoting demoting = DEMOTING_QUEUE.peek();
-                if (demoting == null) {
+                MessageSending sending = MESSAGE_SENDING_QUEUE.peek();
+                if (sending == null) {
                     ThreadUtils.sleep(60000);
                     ThreadUtils.sleep(60000);
                     continue;
                     continue;
                 }
                 }
 
 
                 // 降级处理,重试消息发送
                 // 降级处理,重试消息发送
                 try {
                 try {
-                    demoting.retry();
+                    sending.retry();
                 } catch (ProxyTimeoutException | TooManyRequestsException e) {
                 } catch (ProxyTimeoutException | TooManyRequestsException e) {
                     // 如果因网络超时或触发TPS限制则等待1分钟后继续
                     // 如果因网络超时或触发TPS限制则等待1分钟后继续
                     log.warn("RocketMQ message send failed: {}", e.getMessage());
                     log.warn("RocketMQ message send failed: {}", e.getMessage());
@@ -111,7 +109,7 @@ public final class RocketMQContextHolder {
                 }
                 }
 
 
                 // 降级处理完成后移除队列
                 // 降级处理完成后移除队列
-                DEMOTING_QUEUE.remove();
+                MESSAGE_SENDING_QUEUE.remove();
             }
             }
         });
         });
     }
     }
@@ -333,9 +331,11 @@ public final class RocketMQContextHolder {
         try {
         try {
             producer.send(message);
             producer.send(message);
         } catch (ProxyTimeoutException | TooManyRequestsException e) {
         } catch (ProxyTimeoutException | TooManyRequestsException e) {
+            log.warn("RocketMQ message send failed: {}", e.getMessage());
+
             // 降级处理
             // 降级处理
             try {
             try {
-                DEMOTING_QUEUE.put(new Demoting(producer, message));
+                MESSAGE_SENDING_QUEUE.put(new MessageSending(producer, message));
             } catch (InterruptedException ignore) {
             } catch (InterruptedException ignore) {
             }
             }
         } catch (ClientException e) {
         } catch (ClientException e) {
@@ -442,9 +442,11 @@ public final class RocketMQContextHolder {
         Producer producer = getProducerFactory().getProducer(message.getTopic());
         Producer producer = getProducerFactory().getProducer(message.getTopic());
         producer.sendAsync(message).exceptionally(e -> {
         producer.sendAsync(message).exceptionally(e -> {
             if (e instanceof ProxyTimeoutException || e instanceof TooManyRequestsException) {
             if (e instanceof ProxyTimeoutException || e instanceof TooManyRequestsException) {
+                log.warn("RocketMQ message async send failed: {}", e.getMessage());
+
                 // 降级处理
                 // 降级处理
                 try {
                 try {
-                    DEMOTING_QUEUE.put(new Demoting(producer, message));
+                    MESSAGE_SENDING_QUEUE.put(new MessageSending(producer, message));
                 } catch (InterruptedException ignore) {
                 } catch (InterruptedException ignore) {
                 }
                 }
             } else {
             } else {
@@ -569,9 +571,11 @@ public final class RocketMQContextHolder {
                 transaction.rollback();
                 transaction.rollback();
             }
             }
         } catch (ProxyTimeoutException | TooManyRequestsException e) {
         } catch (ProxyTimeoutException | TooManyRequestsException e) {
+            log.warn("RocketMQ message send failed: {}", e.getMessage());
+
             // 降级处理
             // 降级处理
             try {
             try {
-                DEMOTING_QUEUE.put(new TransactionDemoting(producer, message, executor));
+                MESSAGE_SENDING_QUEUE.put(new TransactionalMessageSending(producer, message, executor));
             } catch (InterruptedException ignore) {
             } catch (InterruptedException ignore) {
             }
             }
         } catch (ClientException e) {
         } catch (ClientException e) {

+ 5 - 5
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/producer/TransactionDemoting.java → framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/TransactionalMessageSending.java

@@ -1,4 +1,4 @@
-package com.chelvc.framework.rocketmq.producer;
+package com.chelvc.framework.rocketmq.context;
 
 
 import java.util.function.Function;
 import java.util.function.Function;
 
 
@@ -10,19 +10,19 @@ import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.Transaction;
 import org.apache.rocketmq.client.apis.producer.Transaction;
 
 
 /**
 /**
- * 事务消息发送降级实现
+ * 事务消息发送包装器
  *
  *
  * @author Woody
  * @author Woody
  * @date 2024/1/30
  * @date 2024/1/30
  */
  */
-public class TransactionDemoting extends Demoting {
+public class TransactionalMessageSending extends MessageSending {
     /**
     /**
      * 本地事务执行方法
      * 本地事务执行方法
      */
      */
     protected final Function<String, Boolean> executor;
     protected final Function<String, Boolean> executor;
 
 
-    public TransactionDemoting(@NonNull Producer producer, @NonNull Message message,
-                               @NonNull Function<String, Boolean> executor) {
+    public TransactionalMessageSending(@NonNull Producer producer, @NonNull Message message,
+                                       @NonNull Function<String, Boolean> executor) {
         super(producer, message);
         super(producer, message);
         this.executor = executor;
         this.executor = executor;
     }
     }