Explorar el Código

Kafka处理逻辑优化

woody hace 1 año
padre
commit
5e9f78e1f5

+ 19 - 3
framework-common/src/main/java/com/chelvc/framework/common/util/ObjectUtils.java

@@ -10,6 +10,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.ParameterizedType;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -25,6 +26,9 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import com.chelvc.framework.common.function.Getter;
+import com.chelvc.framework.common.function.Setter;
+import com.chelvc.framework.common.model.Modification;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import io.protostuff.LinkedBuffer;
@@ -37,9 +41,6 @@ import ma.glasnost.orika.MappingContext;
 import ma.glasnost.orika.converter.BidirectionalConverter;
 import ma.glasnost.orika.impl.DefaultMapperFactory;
 import ma.glasnost.orika.metadata.Type;
-import com.chelvc.framework.common.function.Getter;
-import com.chelvc.framework.common.function.Setter;
-import com.chelvc.framework.common.model.Modification;
 
 /**
  * 对象工具类
@@ -1070,4 +1071,19 @@ public final class ObjectUtils {
             }
         }
     }
+
+    /**
+     * 获取从缓存中获取字节数组
+     *
+     * @param buffer 直接缓存
+     * @return 字节数组
+     */
+    public static byte[] getBytes(ByteBuffer buffer) {
+        if (buffer == null) {
+            return null;
+        }
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes, 0, bytes.length);
+        return bytes;
+    }
 }

+ 20 - 5
framework-kafka/src/main/java/com/chelvc/framework/kafka/config/KafkaConfigurer.java

@@ -2,13 +2,16 @@ package com.chelvc.framework.kafka.config;
 
 import java.lang.reflect.Type;
 import java.util.Collection;
+import java.util.Objects;
 import java.util.Set;
+import java.util.regex.Pattern;
 
-import lombok.RequiredArgsConstructor;
 import com.chelvc.framework.common.util.JacksonUtils;
 import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.kafka.context.KafkaContextHolder;
 import com.chelvc.framework.kafka.interceptor.KafkaSessionInterceptor;
+import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -82,14 +85,26 @@ public class KafkaConfigurer implements KafkaListenerConfigurer {
             );
 
             /**
-             * 初始化消息监听器端点主题
+             * 隔离消息监听器端点环境
              *
              * @param endpoint 消息监听器端点实例
              */
-            private void initializeListenerEndpointTopic(MethodKafkaListenerEndpoint<?, ?> endpoint) {
+            private void isolateListenerEndpointEnvironment(MethodKafkaListenerEndpoint<?, ?> endpoint) {
+                if (StringUtils.nonEmpty(endpoint.getGroup())) {
+                    endpoint.setGroup(KafkaContextHolder.isolate(endpoint.getGroup()));
+                }
+                if (StringUtils.nonEmpty(endpoint.getGroupId())) {
+                    endpoint.setGroupId(KafkaContextHolder.isolate(endpoint.getGroupId()));
+                }
+                if (Objects.nonNull(endpoint.getTopicPattern())) {
+                    String pattern = endpoint.getTopicPattern().pattern();
+                    if (StringUtils.nonEmpty(pattern)) {
+                        endpoint.setTopicPattern(Pattern.compile(KafkaContextHolder.isolate(pattern)));
+                    }
+                }
                 Collection<String> topics = endpoint.getTopics();
                 if (ObjectUtils.nonEmpty(topics)) {
-                    endpoint.setTopics(topics.stream().map(KafkaContextHolder::topic).toArray(String[]::new));
+                    endpoint.setTopics(topics.stream().map(KafkaContextHolder::isolate).toArray(String[]::new));
                 }
             }
 
@@ -128,7 +143,7 @@ public class KafkaConfigurer implements KafkaListenerConfigurer {
             public void registerListenerContainer(KafkaListenerEndpoint endpoint,
                                                   KafkaListenerContainerFactory<?> factory, boolean
                                                           startImmediately) {
-                this.initializeListenerEndpointTopic((MethodKafkaListenerEndpoint<?, ?>) endpoint);
+                this.isolateListenerEndpointEnvironment((MethodKafkaListenerEndpoint<?, ?>) endpoint);
                 this.proxy.registerListenerContainer(endpoint, factory, startImmediately);
             }
 

+ 5 - 5
framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java

@@ -3,13 +3,13 @@ package com.chelvc.framework.kafka.context;
 import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 
-import lombok.NonNull;
 import com.chelvc.framework.base.context.ApplicationContextHolder;
 import com.chelvc.framework.base.context.SessionContextHolder;
 import com.chelvc.framework.base.model.Session;
 import com.chelvc.framework.common.util.JacksonUtils;
 import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.kafka.config.KafkaProperties;
+import lombok.NonNull;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.springframework.kafka.core.KafkaTemplate;
@@ -72,12 +72,12 @@ public class KafkaContextHolder {
     }
 
     /**
-     * 消息主题转换
+     * 消息环境隔离
      *
-     * @param original 原始消息主题
-     * @return 消息主题
+     * @param original 原始消息标识
+     * @return 消息标识
      */
-    public static String topic(@NonNull String original) {
+    public static String isolate(@NonNull String original) {
         String namespace = StringUtils.ifEmpty(getProperties().getNamespace(), StringUtils.EMPTY);
         return ApplicationContextHolder.getEnvironment().resolvePlaceholders(namespace + original);
     }

+ 2 - 2
framework-kafka/src/main/java/com/chelvc/framework/kafka/interceptor/KafkaSessionInterceptor.java

@@ -24,8 +24,8 @@ public class KafkaSessionInterceptor implements ProducerInterceptor<Object, Obje
         // 初始化会话信息到消息头
         KafkaContextHolder.initializeSession(record.headers());
 
-        // 重新组装消息主题
-        String topic = KafkaContextHolder.topic(record.topic());
+        // 隔离消息主题环境
+        String topic = KafkaContextHolder.isolate(record.topic());
 
         // 构建新的消息生产记录
         return new ProducerRecord<>(topic, record.partition(), record.timestamp(), record.key(),