|
@@ -33,7 +33,7 @@ public class DelayRedisQueue<E> extends PriorityRedisQueue<E> {
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
private static final RedisScript<List> POLL_DELAY_SCRIPT = new DefaultRedisScript<>(
|
|
|
"local values = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3]) " +
|
|
|
- "if #values > 0 then for i = 1, #values do redis.call('ZINCRBY', KEYS[1], ARGV[4], values[i]) " +
|
|
|
+ "if #values > 0 then for i = 1, #values do redis.call('ZADD', KEYS[1], 'XX', ARGV[4], values[i]) " +
|
|
|
"end end return values", List.class
|
|
|
);
|
|
|
|
|
@@ -135,8 +135,9 @@ public class DelayRedisQueue<E> extends PriorityRedisQueue<E> {
|
|
|
long now = System.currentTimeMillis();
|
|
|
long min = this.expiration > 0 ? now - this.expiration : Long.MIN_VALUE;
|
|
|
if (duration.compareTo(Duration.ZERO) > 0) {
|
|
|
+ long score = now + duration.toMillis();
|
|
|
List<String> keys = Collections.singletonList(this.getName());
|
|
|
- List<E> values = this.template().execute(POLL_DELAY_SCRIPT, keys, min, now, size, duration.toMillis());
|
|
|
+ List<E> values = this.template().execute(POLL_DELAY_SCRIPT, keys, min, now, size, score);
|
|
|
return ObjectUtils.ifEmpty(values, Collections::emptyList);
|
|
|
}
|
|
|
return super.poll(min, now, size);
|