|
@@ -40,26 +40,28 @@ public class KafkaMemorySender<K, V> extends KafkaFallbackSender<K, V> {
|
|
|
/**
|
|
|
* 初始化消息队列
|
|
|
*/
|
|
|
- private synchronized void initializeMessageQueue() {
|
|
|
- if (this.queue != null) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- this.queue = new ArrayBlockingQueue<>(this.capacity);
|
|
|
- this.consumer = ThreadUtils.consume(this.queue, pair -> {
|
|
|
- ProducerRecord<K, V> message = pair.getLeft();
|
|
|
- SessionContextHolder.setSession(pair.getRight());
|
|
|
- try {
|
|
|
- this.delegate.send(message);
|
|
|
- return true;
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("Kafka message send failed: {}", message.topic(), e);
|
|
|
- } finally {
|
|
|
- SessionContextHolder.removeSessionContext();
|
|
|
+ private void initializeMessageQueue() {
|
|
|
+ if (this.queue == null) {
|
|
|
+ synchronized (this) {
|
|
|
+ if (this.queue == null) {
|
|
|
+ this.queue = new ArrayBlockingQueue<>(this.capacity);
|
|
|
+ this.consumer = ThreadUtils.consume(this.queue, pair -> {
|
|
|
+ ProducerRecord<K, V> message = pair.getLeft();
|
|
|
+ SessionContextHolder.setSession(pair.getRight());
|
|
|
+ try {
|
|
|
+ this.delegate.send(message);
|
|
|
+ return true;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Kafka message send failed: {}", message.topic(), e);
|
|
|
+ } finally {
|
|
|
+ SessionContextHolder.removeSessionContext();
|
|
|
+ }
|
|
|
+ ThreadUtils.sleep(5000);
|
|
|
+ return false;
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|
|
|
- ThreadUtils.sleep(5000);
|
|
|
- return false;
|
|
|
- });
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -68,12 +70,7 @@ public class KafkaMemorySender<K, V> extends KafkaFallbackSender<K, V> {
|
|
|
log.debug("Kafka fallback message: {}, {}", record.topic(), record.value());
|
|
|
}
|
|
|
|
|
|
- // 初始化消息队列
|
|
|
- if (this.queue == null) {
|
|
|
- this.initializeMessageQueue();
|
|
|
- }
|
|
|
-
|
|
|
- // 将消息放入内存消息队列
|
|
|
+ this.initializeMessageQueue();
|
|
|
Session session = SessionContextHolder.getSession(false);
|
|
|
if (!this.queue.offer(Pair.of(record, session))) {
|
|
|
throw new RuntimeException("Kafka fallback queue is full: " + record.topic() + ", " + record.value());
|