|
@@ -4,6 +4,7 @@ import java.util.Map;
|
|
|
|
|
|
import com.chelvc.framework.base.context.SessionContextHolder;
|
|
|
import com.chelvc.framework.kafka.context.KafkaContextHolder;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.consumer.Consumer;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.apache.kafka.clients.producer.ProducerInterceptor;
|
|
@@ -17,6 +18,7 @@ import org.springframework.kafka.listener.RecordInterceptor;
|
|
|
* @author Woody
|
|
|
* @date 2024/1/30
|
|
|
*/
|
|
|
+@Slf4j
|
|
|
public class KafkaSessionInterceptor implements ProducerInterceptor<Object, Object>, RecordInterceptor<Object, Object> {
|
|
|
@Override
|
|
|
public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
|
|
@@ -55,4 +57,10 @@ public class KafkaSessionInterceptor implements ProducerInterceptor<Object, Obje
|
|
|
// 清空当前会话
|
|
|
SessionContextHolder.removeSessionContext();
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void failure(ConsumerRecord<Object, Object> record, Exception exception, Consumer<Object, Object> consumer) {
|
|
|
+ String topic = record.topic(), group = consumer.groupMetadata().groupId();
|
|
|
+ log.error("Kafka message consume failed: {}:{}, {}", topic, group, record.value(), exception);
|
|
|
+ }
|
|
|
}
|