|
@@ -4,13 +4,12 @@ import java.time.Duration;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
|
|
|
import com.chelvc.framework.base.context.ApplicationContextHolder;
|
|
|
import com.chelvc.framework.common.util.AssertUtils;
|
|
|
import com.chelvc.framework.common.util.ObjectUtils;
|
|
|
-import com.google.common.collect.ImmutableMap;
|
|
|
-import com.google.common.collect.Maps;
|
|
|
+import com.google.common.collect.Sets;
|
|
|
import lombok.Getter;
|
|
|
import lombok.NonNull;
|
|
|
import org.springframework.dao.DataAccessException;
|
|
@@ -69,20 +68,20 @@ public class DelayRedisQueue<E> extends PriorityRedisQueue<E> {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 判断元素是否处于延时状态
|
|
|
+ * 获取处于延时状态的元素集合
|
|
|
*
|
|
|
* @param collection 元素集合
|
|
|
- * @return 元素/是否延时映射表
|
|
|
+ * @return 元素集合
|
|
|
*/
|
|
|
- public Map<E, Boolean> isDelaying(Collection<E> collection) {
|
|
|
+ public Set<E> isDelaying(Collection<E> collection) {
|
|
|
if (ObjectUtils.isEmpty(collection)) {
|
|
|
- return Collections.emptyMap();
|
|
|
+ return Collections.emptySet();
|
|
|
}
|
|
|
|
|
|
// 处理单个元素
|
|
|
if (collection.size() == 1) {
|
|
|
E e = collection.iterator().next();
|
|
|
- return ImmutableMap.of(e, this.isDelaying(e));
|
|
|
+ return this.isDelaying(e) ? Collections.singleton(e) : Collections.emptySet();
|
|
|
}
|
|
|
|
|
|
// 批量处理多个元素
|
|
@@ -97,16 +96,18 @@ public class DelayRedisQueue<E> extends PriorityRedisQueue<E> {
|
|
|
}
|
|
|
});
|
|
|
if (ObjectUtils.isEmpty(scores)) {
|
|
|
- return Collections.emptyMap();
|
|
|
+ return Collections.emptySet();
|
|
|
}
|
|
|
int i = 0;
|
|
|
long timestamp = System.currentTimeMillis();
|
|
|
- Map<E, Boolean> consumings = Maps.newHashMapWithExpectedSize(collection.size());
|
|
|
+ Set<E> delaying = Sets.newHashSetWithExpectedSize(collection.size());
|
|
|
for (E e : collection) {
|
|
|
Double score = (Double) scores.get(i++);
|
|
|
- consumings.put(e, score != null && score > timestamp);
|
|
|
+ if (score != null && score > timestamp) {
|
|
|
+ delaying.add(e);
|
|
|
+ }
|
|
|
}
|
|
|
- return consumings;
|
|
|
+ return delaying.isEmpty() ? Collections.emptySet() : Collections.unmodifiableSet(delaying);
|
|
|
}
|
|
|
|
|
|
/**
|