|
@@ -4,7 +4,6 @@ import java.lang.reflect.Type;
|
|
|
import java.time.Duration;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
import com.chelvc.framework.base.context.ApplicationContextHolder;
|
|
|
import com.chelvc.framework.base.context.Session;
|
|
@@ -36,12 +35,11 @@ import org.springframework.core.env.Environment;
|
|
|
*/
|
|
|
@Slf4j
|
|
|
public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerContainer<T> {
|
|
|
+ private int batch;
|
|
|
private Type type;
|
|
|
- private String topic;
|
|
|
private String group;
|
|
|
- private int batch;
|
|
|
- private Duration duration;
|
|
|
private Thread thread;
|
|
|
+ private Duration duration;
|
|
|
private SimpleConsumer consumer;
|
|
|
private RocketMQListener<T> listener;
|
|
|
private volatile boolean running = true;
|
|
@@ -58,7 +56,7 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
|
|
|
if (t.getCause() instanceof InterruptedException) {
|
|
|
Thread.currentThread().interrupt();
|
|
|
} else {
|
|
|
- log.error("RocketMQ message receive failed: {}:{}", this.topic, this.group, t);
|
|
|
+ log.error("RocketMQ message receive failed: {}", this.group, t);
|
|
|
}
|
|
|
}
|
|
|
return Collections.emptyList();
|
|
@@ -72,15 +70,22 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
|
|
|
*/
|
|
|
@SuppressWarnings("unchecked")
|
|
|
private boolean consume(List<MessageView> messages) {
|
|
|
+ // 如果消费者组关闭则忽略该消息
|
|
|
+ if (!ApplicationContextHolder.getProperty(this.group, boolean.class, true)) {
|
|
|
+ if (log.isDebugEnabled()) {
|
|
|
+ log.debug("RocketMQ message skipping: {}, {}", this.group, messages);
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 消息消费处理
|
|
|
try {
|
|
|
List<Pair<T, Session>> payloads = RocketMQContextHolder.deserialize(
|
|
|
messages, this.type, (payload, session) -> Pair.of((T) payload, session)
|
|
|
);
|
|
|
return this.listener.consume(payloads);
|
|
|
} catch (Throwable t) {
|
|
|
- String ids = messages.stream().map(MessageView::getMessageId).map(String::valueOf)
|
|
|
- .collect(Collectors.joining(","));
|
|
|
- log.error("RocketMQ message consume failed: {}:{}, {}", this.topic, this.group, ids, t);
|
|
|
+ log.error("RocketMQ message consume failed: {}, {}", this.group, messages, t);
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
@@ -98,8 +103,7 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
|
|
|
if (t.getCause() instanceof InterruptedException) {
|
|
|
Thread.currentThread().interrupt();
|
|
|
} else {
|
|
|
- String id = String.valueOf(message.getMessageId());
|
|
|
- log.error("RocketMQ message acknowledge failed: {}:{}, {}", this.topic, this.group, id, t);
|
|
|
+ log.error("RocketMQ message acknowledge failed: {}, {}", this.group, message, t);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -113,17 +117,8 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
|
|
|
private Thread startup() {
|
|
|
return ThreadUtils.run(() -> {
|
|
|
while (this.running && !Thread.currentThread().isInterrupted()) {
|
|
|
- // 批量接收消息
|
|
|
List<MessageView> messages = this.receive();
|
|
|
- if (ObjectUtils.isEmpty(messages)) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- // 消费消息
|
|
|
- boolean success = this.consume(messages);
|
|
|
-
|
|
|
- // 回执消息
|
|
|
- if (success) {
|
|
|
+ if (ObjectUtils.notEmpty(messages) && this.consume(messages)) {
|
|
|
this.acknowledge(messages);
|
|
|
}
|
|
|
}
|
|
@@ -144,17 +139,17 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
|
|
|
|
|
|
// 初始化消费者实例
|
|
|
this.type = type;
|
|
|
- Environment environment = ApplicationContextHolder.getEnvironment();
|
|
|
- this.topic = RocketMQContextHolder.isolate(environment.resolvePlaceholders(annotation.topic()));
|
|
|
- this.group = RocketMQContextHolder.isolate(environment.resolvePlaceholders(annotation.group()));
|
|
|
this.batch = annotation.batch();
|
|
|
- this.duration = Duration.ofSeconds(annotation.duration());
|
|
|
this.listener = listener;
|
|
|
+ this.duration = Duration.ofSeconds(annotation.duration());
|
|
|
+ Environment environment = ApplicationContextHolder.getEnvironment();
|
|
|
+ this.group = RocketMQContextHolder.isolate(environment.resolvePlaceholders(annotation.group()));
|
|
|
+ String topic = RocketMQContextHolder.isolate(environment.resolvePlaceholders(annotation.topic()));
|
|
|
Duration interval = Duration.ofSeconds(annotation.interval());
|
|
|
FilterExpression filter = new FilterExpression(annotation.tag(), FilterExpressionType.TAG);
|
|
|
SimpleConsumerBuilder builder = provider.newSimpleConsumerBuilder().setClientConfiguration(configuration)
|
|
|
.setConsumerGroup(this.group).setAwaitDuration(interval)
|
|
|
- .setSubscriptionExpressions(Collections.singletonMap(this.topic, filter));
|
|
|
+ .setSubscriptionExpressions(Collections.singletonMap(topic, filter));
|
|
|
try {
|
|
|
this.consumer = builder.build();
|
|
|
} catch (ClientException e) {
|
|
@@ -177,7 +172,7 @@ public class MultipleRocketMQListenerContainer<T> implements RocketMQListenerCon
|
|
|
try {
|
|
|
this.consumer.close();
|
|
|
} catch (Throwable t) {
|
|
|
- log.warn("RocketMQ consumer shutdown failed: {}:{}, {}", this.topic, this.group, t.getMessage());
|
|
|
+ log.warn("RocketMQ consumer shutdown failed: {}, {}", this.group, t.getMessage());
|
|
|
}
|
|
|
}
|
|
|
}
|