Skip to content

Commit a9e0f09

Browse files
Fix issue with delayed tasks for Redis brokers (#34)
* Reset delay on signature for Redis brokers * Address PR comments
1 parent 61086db commit a9e0f09

File tree

2 files changed

+12
-8
lines changed

2 files changed

+12
-8
lines changed

v1/brokers/redis/goredis.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,17 @@ func (b *BrokerGR) Publish(ctx context.Context, signature *tasks.Signature) erro
174174
// Adjust routing key (this decides which queue the message will be published to)
175175
b.Broker.AdjustRoutingKey(signature)
176176

177+
// We'll capture the delay here and set it to zero in the signature because otherwise
178+
// the task will be continously delayed.
179+
delay := signature.Delay
180+
signature.Delay = 0
177181
msg, err := json.Marshal(signature)
178182
if err != nil {
179183
return fmt.Errorf("JSON marshal error: %s", err)
180184
}
181185

182-
// Check the delay signature field, if it is set and it is in the future,
183-
// delay the task
184-
if signature.Delay > 0 {
185-
score := time.Now().Add(signature.Delay).UnixNano()
186+
if delay > 0 {
187+
score := time.Now().Add(delay).UnixNano()
186188
err = b.rclient.ZAdd(context.Background(), b.redisDelayedTasksKey, redis.Z{Score: float64(score), Member: msg}).Err()
187189
return err
188190
}

v1/brokers/redis/redis.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,10 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
184184
// Adjust routing key (this decides which queue the message will be published to)
185185
b.Broker.AdjustRoutingKey(signature)
186186

187+
// We'll capture the delay here and set it to zero in the signature because otherwise
188+
// the task will be continously delayed.
189+
delay := signature.Delay
190+
signature.Delay = 0
187191
msg, err := json.Marshal(signature)
188192
if err != nil {
189193
return fmt.Errorf("JSON marshal error: %s", err)
@@ -192,10 +196,8 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
192196
conn := b.open()
193197
defer conn.Close()
194198

195-
// Check the delay signature field, if it is set and it is in the future,
196-
// delay the task
197-
if signature.Delay > 0 {
198-
score := time.Now().Add(signature.Delay).UnixNano()
199+
if delay > 0 {
200+
score := time.Now().Add(delay).UnixNano()
199201
_, err = conn.Do("ZADD", b.redisDelayedTasksKey, score, msg)
200202
return err
201203
}

0 commit comments

Comments
 (0)