فهرست منبع

新增Kafka消息记录适配处理器;其他代码优化;

Woody 1 هفته پیش
والد
کامیت
73c2a032be

+ 32 - 0
framework-kafka/src/main/java/com/chelvc/framework/kafka/interceptor/KafkaRecordAdapter.java

@@ -0,0 +1,32 @@
+package com.chelvc.framework.kafka.interceptor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/**
+ * Kafka消息记录适配器接口
+ *
+ * @author Woody
+ * @date 2025/8/28
+ */
+public interface KafkaRecordAdapter {
+    /**
+     * 生产记录适配
+     *
+     * @param record 生产记录
+     * @return 生产记录
+     */
+    default ProducerRecord<Object, Object> produce(ProducerRecord<Object, Object> record) {
+        return record;
+    }
+
+    /**
+     * 消费记录适配
+     *
+     * @param record 消费记录
+     * @return 消费记录
+     */
+    default ConsumerRecord<Object, Object> consume(ConsumerRecord<Object, Object> record) {
+        return record;
+    }
+}

+ 33 - 3
framework-kafka/src/main/java/com/chelvc/framework/kafka/interceptor/KafkaRecordInterceptor.java

@@ -1,10 +1,12 @@
 package com.chelvc.framework.kafka.interceptor;
 
+import java.util.List;
 import java.util.Map;
 
 import com.chelvc.framework.base.context.ApplicationContextHolder;
 import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.common.util.ErrorUtils;
+import com.chelvc.framework.common.util.ObjectUtils;
 import com.chelvc.framework.kafka.context.KafkaContextHolder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -25,6 +27,17 @@ import org.springframework.stereotype.Component;
 @Component
 public class KafkaRecordInterceptor implements ProducerInterceptor<Object, Object>,
         ConsumerAwareRecordInterceptor<Object, Object> {
+    /**
+     * Kafka消息记录适配器列表单例对象
+     */
+    private static class KafkaRecordAdapters {
+        /**
+         * 适配器实例列表
+         */
+        private static final List<KafkaRecordAdapter> INSTANCES =
+                ApplicationContextHolder.getSortBeans(KafkaRecordAdapter.class);
+    }
+
     @Override
     public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
         // 初始化会话信息到消息头
@@ -33,9 +46,17 @@ public class KafkaRecordInterceptor implements ProducerInterceptor<Object, Objec
         // 隔离消息主题环境
         String topic = KafkaContextHolder.isolate(record.topic());
 
-        // 构建新的消息生产记录
-        return new ProducerRecord<>(topic, record.partition(), record.timestamp(), record.key(),
-                record.value(), record.headers());
+        // 构建并适配消息记录
+        record = new ProducerRecord<>(topic, record.partition(), record.timestamp(), record.key(), record.value(),
+                record.headers());
+        if (ObjectUtils.notEmpty(KafkaRecordAdapters.INSTANCES)) {
+            for (KafkaRecordAdapter adapter : KafkaRecordAdapters.INSTANCES) {
+                if ((record = adapter.produce(record)) == null) {
+                    break;
+                }
+            }
+        }
+        return record;
     }
 
     @Override
@@ -63,6 +84,15 @@ public class KafkaRecordInterceptor implements ProducerInterceptor<Object, Objec
 
         // 设置当前会话信息
         SessionContextHolder.setSession(KafkaContextHolder.getSession(record.headers()));
+
+        // 适配消息记录
+        if (ObjectUtils.notEmpty(KafkaRecordAdapters.INSTANCES)) {
+            for (KafkaRecordAdapter adapter : KafkaRecordAdapters.INSTANCES) {
+                if ((record = adapter.consume(record)) == null) {
+                    break;
+                }
+            }
+        }
         return record;
     }
 

+ 27 - 29
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/fallback/RocketMQMemoryProducer.java

@@ -41,32 +41,34 @@ public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
     /**
      * 初始化消息队列
      */
-    private synchronized void initializeMessageQueue() {
-        if (this.queue != null) {
-            return;
-        }
-
-        this.queue = new ArrayBlockingQueue<>(this.capacity);
-        this.consumer = ThreadUtils.consume(this.queue, pair -> {
-            Message message = pair.getLeft().getLeft();
-            Transaction transaction = pair.getLeft().getRight();
-            SessionContextHolder.setSession(pair.getRight());
-            try {
-                if (transaction == null) {
-                    this.delegate.send(message);
-                } else {
-                    this.delegate.send(message, transaction);
-                    transaction.commit();
+    private void initializeMessageQueue() {
+        if (this.queue == null) {
+            synchronized (this) {
+                if (this.queue == null) {
+                    this.queue = new ArrayBlockingQueue<>(this.capacity);
+                    this.consumer = ThreadUtils.consume(this.queue, pair -> {
+                        Message message = pair.getLeft().getLeft();
+                        Transaction transaction = pair.getLeft().getRight();
+                        SessionContextHolder.setSession(pair.getRight());
+                        try {
+                            if (transaction == null) {
+                                this.delegate.send(message);
+                            } else {
+                                this.delegate.send(message, transaction);
+                                transaction.commit();
+                            }
+                            return true;
+                        } catch (Exception e) {
+                            log.error("RocketMQ message send failed: {}", RocketMQContextHolder.topic(message), e);
+                        } finally {
+                            SessionContextHolder.removeSessionContext();
+                        }
+                        ThreadUtils.sleep(5000);
+                        return false;
+                    });
                 }
-                return true;
-            } catch (Exception e) {
-                log.error("RocketMQ message send failed: {}", RocketMQContextHolder.topic(message), e);
-            } finally {
-                SessionContextHolder.removeSessionContext();
             }
-            ThreadUtils.sleep(5000);
-            return false;
-        });
+        }
     }
 
     @Override
@@ -76,12 +78,8 @@ public class RocketMQMemoryProducer extends RocketMQFallbackProducer {
             log.debug("RocketMQ fallback message: {}, {}", topic, body);
         }
 
-        // 初始化消息队列
-        if (this.queue == null) {
-            this.initializeMessageQueue();
-        }
-
         // 将消息放入内存消息队列
+        this.initializeMessageQueue();
         Session session = SessionContextHolder.getSession(false);
         if (!this.queue.offer(Pair.of(Pair.of(message, transaction), session))) {
             String topic = RocketMQContextHolder.topic(message), body = RocketMQContextHolder.body(message);

+ 9 - 15
framework-security/src/main/java/com/chelvc/framework/security/firewall/DefaultFirewallProcessor.java

@@ -31,7 +31,6 @@ import org.springframework.data.redis.core.SessionCallback;
 public class DefaultFirewallProcessor implements FirewallProcessor {
     private final VariableParser parser;
     private final ActionExecutor executor;
-    private volatile IdentityUtils.Generator sequenceGenerator;
 
     public DefaultFirewallProcessor(@NonNull VariableParser parser, @NonNull ActionExecutor executor) {
         this.parser = parser;
@@ -39,19 +38,13 @@ public class DefaultFirewallProcessor implements FirewallProcessor {
     }
 
     /**
-     * 获取序列号
-     *
-     * @return 序列号数字
+     * 序列号生成器单例对象
      */
-    private Long getSequence() {
-        if (this.sequenceGenerator == null) {
-            synchronized (this) {
-                if (this.sequenceGenerator == null) {
-                    this.sequenceGenerator = RedisContextHolder.getIdentityGenerator().clone();
-                }
-            }
-        }
-        return this.sequenceGenerator.next();
+    private static class SequenceGenerator {
+        /**
+         * 序列号生成器实例
+         */
+        private static final IdentityUtils.Generator INSTANCE = RedisContextHolder.getIdentityGenerator().clone();
     }
 
     /**
@@ -92,7 +85,8 @@ public class DefaultFirewallProcessor implements FirewallProcessor {
      */
     protected void refreshAccessControls(List<Pair<String, Rule>> pairs) {
         // 批量刷新时间窗口数据
-        Long sequence = this.getSequence(), timestamp = System.currentTimeMillis();
+        Long id = SequenceGenerator.INSTANCE.next();
+        long timestamp = System.currentTimeMillis();
         RedisTemplate<String, Object> template = RedisContextHolder.getDefaultTemplate();
         List<Object> results = template.executePipelined(new SessionCallback<Object>() {
             @Override
@@ -104,7 +98,7 @@ public class DefaultFirewallProcessor implements FirewallProcessor {
                     long mark = timestamp - rule.getCycle() * 1000;
 
                     // 添加访问记录
-                    operations.opsForZSet().add(key, (V) sequence, timestamp);
+                    operations.opsForZSet().add(key, (V) id, timestamp);
 
                     // 移除时间窗口外记录
                     operations.opsForZSet().removeRangeByScore(key, 0, mark - 1000);