|
@@ -10,6 +10,7 @@ import java.nio.charset.StandardCharsets;
|
|
import java.util.Objects;
|
|
import java.util.Objects;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
+import com.chelvc.framework.base.context.ApplicationContextHolder;
|
|
import com.chelvc.framework.base.context.JacksonContextHolder;
|
|
import com.chelvc.framework.base.context.JacksonContextHolder;
|
|
import com.chelvc.framework.base.util.ObjectUtils;
|
|
import com.chelvc.framework.base.util.ObjectUtils;
|
|
import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
|
|
import com.chelvc.framework.rocketmq.context.RocketMQContextHolder;
|
|
@@ -34,6 +35,7 @@ import org.springframework.aop.framework.AopProxyUtils;
|
|
import org.springframework.beans.BeansException;
|
|
import org.springframework.beans.BeansException;
|
|
import org.springframework.context.ApplicationContext;
|
|
import org.springframework.context.ApplicationContext;
|
|
import org.springframework.core.MethodParameter;
|
|
import org.springframework.core.MethodParameter;
|
|
|
|
+import org.springframework.core.env.Environment;
|
|
import org.springframework.messaging.Message;
|
|
import org.springframework.messaging.Message;
|
|
import org.springframework.messaging.converter.MessageConverter;
|
|
import org.springframework.messaging.converter.MessageConverter;
|
|
import org.springframework.messaging.converter.SmartMessageConverter;
|
|
import org.springframework.messaging.converter.SmartMessageConverter;
|
|
@@ -105,11 +107,20 @@ public class SessionRocketMQListenerContainer extends DefaultRocketMQListenerCon
|
|
|
|
|
|
// 初始化会话消息监听器
|
|
// 初始化会话消息监听器
|
|
if (this.getRocketMQListener() instanceof SessionRocketMQListener) {
|
|
if (this.getRocketMQListener() instanceof SessionRocketMQListener) {
|
|
|
|
+ // 获取MQ消费者组及主体
|
|
|
|
+ Environment environment = ApplicationContextHolder.getEnvironment();
|
|
|
|
+ RocketMQMessageListener annotation = target.getAnnotation(RocketMQMessageListener.class);
|
|
|
|
+ String topic = ObjectUtils.ifNull(
|
|
|
|
+ ObjectUtils.ifNull(annotation, RocketMQMessageListener::topic), environment::resolvePlaceholders
|
|
|
|
+ );
|
|
|
|
+ String group = ObjectUtils.ifNull(
|
|
|
|
+ ObjectUtils.ifNull(annotation, RocketMQMessageListener::consumerGroup),
|
|
|
|
+ environment::resolvePlaceholders
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ // 根据消费模式初始化消息监听器
|
|
DefaultMQPushConsumer consumer = this.getConsumer();
|
|
DefaultMQPushConsumer consumer = this.getConsumer();
|
|
SessionRocketMQListener listener = (SessionRocketMQListener) this.getRocketMQListener();
|
|
SessionRocketMQListener listener = (SessionRocketMQListener) this.getRocketMQListener();
|
|
- RocketMQMessageListener annotation = target.getAnnotation(RocketMQMessageListener.class);
|
|
|
|
- String topic = ObjectUtils.ifNull(annotation, RocketMQMessageListener::topic);
|
|
|
|
- String group = ObjectUtils.ifNull(annotation, RocketMQMessageListener::consumerGroup);
|
|
|
|
ConsumeMode mode = ObjectUtils.ifNull(annotation, RocketMQMessageListener::consumeMode);
|
|
ConsumeMode mode = ObjectUtils.ifNull(annotation, RocketMQMessageListener::consumeMode);
|
|
if (mode == ConsumeMode.ORDERLY) {
|
|
if (mode == ConsumeMode.ORDERLY) {
|
|
consumer.setMessageListener((MessageListenerOrderly) (messages, context) -> {
|
|
consumer.setMessageListener((MessageListenerOrderly) (messages, context) -> {
|
|
@@ -117,7 +128,7 @@ public class SessionRocketMQListenerContainer extends DefaultRocketMQListenerCon
|
|
listener.onMessageContexts(messages.stream().map(this::convert).collect(Collectors.toList()));
|
|
listener.onMessageContexts(messages.stream().map(this::convert).collect(Collectors.toList()));
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
String ids = messages.stream().map(MessageExt::getMsgId).collect(Collectors.joining(","));
|
|
String ids = messages.stream().map(MessageExt::getMsgId).collect(Collectors.joining(","));
|
|
- log.error("Orderly message consume failed [{}][{}]: {}", topic, group, ids, e);
|
|
|
|
|
|
+ log.error("Orderly message consume failed [{}][{}]: {}", group, topic, ids, e);
|
|
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
|
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
|
} finally {
|
|
} finally {
|
|
consumer.setConsumeMessageBatchMaxSize(listener.getBatchConsumeSize());
|
|
consumer.setConsumeMessageBatchMaxSize(listener.getBatchConsumeSize());
|
|
@@ -130,7 +141,7 @@ public class SessionRocketMQListenerContainer extends DefaultRocketMQListenerCon
|
|
listener.onMessageContexts(messages.stream().map(this::convert).collect(Collectors.toList()));
|
|
listener.onMessageContexts(messages.stream().map(this::convert).collect(Collectors.toList()));
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
String ids = messages.stream().map(MessageExt::getMsgId).collect(Collectors.joining(","));
|
|
String ids = messages.stream().map(MessageExt::getMsgId).collect(Collectors.joining(","));
|
|
- log.error("Concurrently message consume failed [{}][{}]: {}", topic, group, ids, e);
|
|
|
|
|
|
+ log.error("Concurrently message consume failed [{}][{}]: {}", group, topic, ids, e);
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
|
} finally {
|
|
} finally {
|
|
consumer.setConsumeMessageBatchMaxSize(listener.getBatchConsumeSize());
|
|
consumer.setConsumeMessageBatchMaxSize(listener.getBatchConsumeSize());
|