|
@@ -7,12 +7,9 @@ import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
-import java.util.Objects;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
-import java.util.function.BiFunction;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.Predicate;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
import com.chelvc.framework.base.context.ApplicationContextHolder;
|
|
|
import com.chelvc.framework.base.context.Session;
|
|
@@ -23,6 +20,7 @@ import com.chelvc.framework.common.util.StringUtils;
|
|
|
import com.chelvc.framework.kafka.producer.TransactionMessage;
|
|
|
import com.chelvc.framework.redis.queue.DelayRedisQueue;
|
|
|
import com.chelvc.framework.redis.queue.RedisQueues;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
import lombok.NonNull;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
@@ -116,7 +114,7 @@ public final class KafkaContextHolder {
|
|
|
* 消息反序列化
|
|
|
*
|
|
|
* @param record 消息记录
|
|
|
- * @param type 消息类型
|
|
|
+ * @param type 消息体类型
|
|
|
* @param <K> 顺序标识类型
|
|
|
* @param <V> 消息载体类型
|
|
|
* @return 消息体实例
|
|
@@ -135,7 +133,7 @@ public final class KafkaContextHolder {
|
|
|
* 消息反序列化
|
|
|
*
|
|
|
* @param record 消息记录
|
|
|
- * @param type 消息类型
|
|
|
+ * @param type 消息体类型
|
|
|
* @param consumer 消息消费者
|
|
|
* @param <K> 顺序标识类型
|
|
|
* @param <V> 消息载体类型
|
|
@@ -155,24 +153,20 @@ public final class KafkaContextHolder {
|
|
|
/**
|
|
|
* 消息反序列化
|
|
|
*
|
|
|
- * @param records 消息记录集合
|
|
|
- * @param type 消息类型
|
|
|
- * @param function 消息处理函数
|
|
|
- * @param <K> 顺序标识类型
|
|
|
- * @param <V> 消息载体类型
|
|
|
- * @param <T> 消息体类型
|
|
|
+ * @param records 消息记录集合
|
|
|
+ * @param type 消息体类型
|
|
|
+ * @param <K> 顺序标识类型
|
|
|
+ * @param <V> 消息载体类型
|
|
|
* @return 消息列表
|
|
|
*/
|
|
|
- public static <K, V, T> List<T> deserialize(@NonNull Collection<ConsumerRecord<K, V>> records, @NonNull Type type,
|
|
|
- @NonNull BiFunction<Object, Session, T> function) {
|
|
|
+ public static <K, V> List<Object> deserialize(@NonNull Collection<ConsumerRecord<K, V>> records,
|
|
|
+ @NonNull Type type) {
|
|
|
if (ObjectUtils.isEmpty(records)) {
|
|
|
return Collections.emptyList();
|
|
|
}
|
|
|
- return records.stream().map(record -> {
|
|
|
- Object payload = deserialize(record, type);
|
|
|
- Session session = getSession(record.headers());
|
|
|
- return function.apply(payload, session);
|
|
|
- }).filter(Objects::nonNull).collect(Collectors.toList());
|
|
|
+ List<Object> messages = Lists.newArrayListWithCapacity(records.size());
|
|
|
+ records.forEach(record -> messages.add(deserialize(record, type)));
|
|
|
+ return messages;
|
|
|
}
|
|
|
|
|
|
/**
|