woody 4 ماه پیش
والد
کامیت
f63e3749f0

+ 31 - 24
framework-kafka/src/main/java/com/chelvc/framework/kafka/fallback/KafkaMemorySender.java

@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 @Slf4j
 public class KafkaMemorySender<K, V> extends KafkaFallbackSender<K, V> {
     private final int capacity;
-    private volatile Thread thread;
+    private volatile Thread consumer;
     private volatile BlockingQueue<Pair<ProducerRecord<K, V>, Session>> queue;
 
     public KafkaMemorySender(@NonNull KafkaMessageSender<K, V> delegate) {
@@ -37,33 +37,40 @@ public class KafkaMemorySender<K, V> extends KafkaFallbackSender<K, V> {
         this.capacity = capacity;
     }
 
+    /**
+     * 初始化消息队列
+     */
+    private synchronized void initializeMessageQueue() {
+        if (this.queue != null) {
+            return;
+        }
+
+        this.queue = new ArrayBlockingQueue<>(this.capacity);
+        this.consumer = ThreadUtils.consume(this.queue, pair -> {
+            ProducerRecord<K, V> message = pair.getLeft();
+            SessionContextHolder.setSession(pair.getRight());
+            try {
+                this.delegate.send(message);
+                return true;
+            } catch (Exception e) {
+                log.error("Kafka message send failed: {}", message.topic(), e);
+            } finally {
+                SessionContextHolder.clearSessionContext();
+            }
+            ThreadUtils.sleep(5000);
+            return false;
+        });
+    }
+
     @Override
     protected void fallback(ProducerRecord<K, V> record) {
         if (log.isDebugEnabled()) {
             log.debug("Kafka fallback message: {}, {}", record.topic(), record.value());
         }
 
-        // 初始化消息队列及重试线程
+        // 初始化消息队列
         if (this.queue == null) {
-            synchronized (this) {
-                if (this.queue == null) {
-                    this.queue = new ArrayBlockingQueue<>(this.capacity);
-                    this.thread = ThreadUtils.consume(this.queue, pair -> {
-                        ProducerRecord<K, V> message = pair.getLeft();
-                        SessionContextHolder.setSession(pair.getRight());
-                        try {
-                            this.delegate.send(message);
-                            return true;
-                        } catch (Exception e) {
-                            log.error("Kafka message send failed: {}", message.topic(), e);
-                        } finally {
-                            SessionContextHolder.clearSessionContext();
-                        }
-                        ThreadUtils.sleep(5000);
-                        return false;
-                    });
-                }
-            }
+            this.initializeMessageQueue();
         }
 
         // 将消息放入内存消息队列
@@ -75,9 +82,9 @@ public class KafkaMemorySender<K, V> extends KafkaFallbackSender<K, V> {
 
     @Override
     public void destroy() throws Exception {
-        if (this.thread != null) {
-            this.thread.interrupt();
-            ThreadUtils.join(this.thread);
+        if (this.consumer != null) {
+            this.consumer.interrupt();
+            ThreadUtils.join(this.consumer);
         }
     }
 }

+ 37 - 30
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQMemoryProducer.java

@@ -25,7 +25,7 @@ import org.apache.rocketmq.client.apis.producer.Transaction;
 @Slf4j
 public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
     private final int capacity;
-    private volatile Thread thread;
+    private volatile Thread consumer;
     private volatile BlockingQueue<Pair<Pair<Message, Transaction>, Session>> queue;
 
     public RocketMQMemoryProducer(@NonNull Producer delegate) {
@@ -38,6 +38,37 @@ public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
         this.capacity = capacity;
     }
 
+    /**
+     * 初始化消息队列
+     */
+    private synchronized void initializeMessageQueue() {
+        if (this.queue != null) {
+            return;
+        }
+
+        this.queue = new ArrayBlockingQueue<>(this.capacity);
+        this.consumer = ThreadUtils.consume(this.queue, pair -> {
+            Message message = pair.getLeft().getLeft();
+            Transaction transaction = pair.getLeft().getRight();
+            SessionContextHolder.setSession(pair.getRight());
+            try {
+                if (transaction == null) {
+                    this.delegate.send(message);
+                } else {
+                    this.delegate.send(message, transaction);
+                    transaction.commit();
+                }
+                return true;
+            } catch (Exception e) {
+                log.error("RocketMQ message send failed: {}", RocketMQContextHolder.topic(message), e);
+            } finally {
+                SessionContextHolder.clearSessionContext();
+            }
+            ThreadUtils.sleep(5000);
+            return false;
+        });
+    }
+
     @Override
     protected void fallback(Message message, Transaction transaction) {
         if (log.isDebugEnabled()) {
@@ -45,33 +76,9 @@ public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
             log.debug("RocketMQ fallback message: {}, {}", topic, body);
         }
 
-        // 初始化消息队列及重试线程
+        // 初始化消息队列
         if (this.queue == null) {
-            synchronized (this) {
-                if (this.queue == null) {
-                    this.queue = new ArrayBlockingQueue<>(this.capacity);
-                    this.thread = ThreadUtils.consume(this.queue, pair -> {
-                        Message _message = pair.getLeft().getLeft();
-                        Transaction _transaction = pair.getLeft().getRight();
-                        SessionContextHolder.setSession(pair.getRight());
-                        try {
-                            if (_transaction == null) {
-                                this.delegate.send(_message);
-                            } else {
-                                this.delegate.send(_message, _transaction);
-                                _transaction.commit();
-                            }
-                            return true;
-                        } catch (Exception e) {
-                            log.error("RocketMQ message send failed: {}", RocketMQContextHolder.topic(_message), e);
-                        } finally {
-                            SessionContextHolder.clearSessionContext();
-                        }
-                        ThreadUtils.sleep(5000);
-                        return false;
-                    });
-                }
-            }
+            this.initializeMessageQueue();
         }
 
         // 将消息放入内存消息队列
@@ -85,9 +92,9 @@ public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
     @Override
     public void close() throws IOException {
         try {
-            if (this.thread != null) {
-                this.thread.interrupt();
-                ThreadUtils.join(this.thread);
+            if (this.consumer != null) {
+                this.consumer.interrupt();
+                ThreadUtils.join(this.consumer);
             }
         } finally {
             super.close();