|
@@ -108,13 +108,16 @@ public class SessionRocketMQListenerContainer extends DefaultRocketMQListenerCon
|
|
DefaultMQPushConsumer consumer = this.getConsumer();
|
|
DefaultMQPushConsumer consumer = this.getConsumer();
|
|
SessionRocketMQListener listener = (SessionRocketMQListener) this.getRocketMQListener();
|
|
SessionRocketMQListener listener = (SessionRocketMQListener) this.getRocketMQListener();
|
|
RocketMQMessageListener annotation = target.getAnnotation(RocketMQMessageListener.class);
|
|
RocketMQMessageListener annotation = target.getAnnotation(RocketMQMessageListener.class);
|
|
|
|
+ String topic = ObjectUtils.ifNull(annotation, RocketMQMessageListener::topic);
|
|
|
|
+ String group = ObjectUtils.ifNull(annotation, RocketMQMessageListener::consumerGroup);
|
|
ConsumeMode mode = ObjectUtils.ifNull(annotation, RocketMQMessageListener::consumeMode);
|
|
ConsumeMode mode = ObjectUtils.ifNull(annotation, RocketMQMessageListener::consumeMode);
|
|
if (mode == ConsumeMode.ORDERLY) {
|
|
if (mode == ConsumeMode.ORDERLY) {
|
|
consumer.setMessageListener((MessageListenerOrderly) (messages, context) -> {
|
|
consumer.setMessageListener((MessageListenerOrderly) (messages, context) -> {
|
|
try {
|
|
try {
|
|
listener.onMessageContexts(messages.stream().map(this::convert).collect(Collectors.toList()));
|
|
listener.onMessageContexts(messages.stream().map(this::convert).collect(Collectors.toList()));
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- log.warn("consume message failed: {}", e.getMessage(), e);
|
|
|
|
|
|
+ String ids = messages.stream().map(MessageExt::getMsgId).collect(Collectors.joining(","));
|
|
|
|
+ log.error("Orderly message consume failed [{}][{}]: {}", topic, group, ids, e);
|
|
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
|
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
|
} finally {
|
|
} finally {
|
|
consumer.setConsumeMessageBatchMaxSize(listener.getBatchConsumeSize());
|
|
consumer.setConsumeMessageBatchMaxSize(listener.getBatchConsumeSize());
|
|
@@ -126,7 +129,8 @@ public class SessionRocketMQListenerContainer extends DefaultRocketMQListenerCon
|
|
try {
|
|
try {
|
|
listener.onMessageContexts(messages.stream().map(this::convert).collect(Collectors.toList()));
|
|
listener.onMessageContexts(messages.stream().map(this::convert).collect(Collectors.toList()));
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- log.warn("consume message failed: {}", e.getMessage(), e);
|
|
|
|
|
|
+ String ids = messages.stream().map(MessageExt::getMsgId).collect(Collectors.joining(","));
|
|
|
|
+ log.error("Concurrently message consume failed [{}][{}]: {}", topic, group, ids, e);
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
|
} finally {
|
|
} finally {
|
|
consumer.setConsumeMessageBatchMaxSize(listener.getBatchConsumeSize());
|
|
consumer.setConsumeMessageBatchMaxSize(listener.getBatchConsumeSize());
|