@@ -3,6 +3,7 @@ package redis
3
3
import (
4
4
"context"
5
5
"encoding/json"
6
+ "errors"
6
7
"fmt"
7
8
"strings"
8
9
"time"
@@ -86,7 +87,7 @@ func (q *taskQueue[T]) Prepare(ctx context.Context, rdb redis.UniversalClient, q
86
87
}
87
88
88
89
_ , err := prepareCmd .Run (ctx , rdb , keys , q .groupName ).Result ()
89
- if err != nil && err != redis .Nil {
90
+ if err != nil && ! errors . Is ( err , redis .Nil ) {
90
91
return fmt .Errorf ("preparing queues: %w" , err )
91
92
}
92
93
@@ -164,11 +165,11 @@ func (q *taskQueue[T]) Dequeue(ctx context.Context, rdb redis.UniversalClient, q
164
165
Count : 1 ,
165
166
Block : timeout ,
166
167
}).Result ()
167
- if err != nil && err != redis .Nil {
168
+ if err != nil && ! errors . Is ( err , redis .Nil ) {
168
169
return nil , fmt .Errorf ("dequeueing task: %w" , err )
169
170
}
170
171
171
- if len (ids ) == 0 || len (ids [0 ].Messages ) == 0 || err == redis .Nil {
172
+ if len (ids ) == 0 || len (ids [0 ].Messages ) == 0 || errors . Is ( err , redis .Nil ) {
172
173
return nil , nil
173
174
}
174
175
@@ -186,7 +187,7 @@ func (q *taskQueue[T]) Extend(ctx context.Context, p redis.Pipeliner, queue work
186
187
Messages : []string {taskID },
187
188
MinIdle : 0 , // Always claim this message
188
189
}).Result ()
189
- if err != nil && err != redis .Nil {
190
+ if err != nil && ! errors . Is ( err , redis .Nil ) {
190
191
return fmt .Errorf ("extending lease: %w" , err )
191
192
}
192
193
@@ -198,7 +199,7 @@ func (q *taskQueue[T]) Complete(ctx context.Context, p redis.Pipeliner, queue wo
198
199
q .Keys (queue ).SetKey ,
199
200
q .Keys (queue ).StreamKey ,
200
201
}, taskID , q .groupName )
201
- if err := cmd .Err (); err != nil && err != redis .Nil {
202
+ if err := cmd .Err (); err != nil && ! errors . Is ( err , redis .Nil ) {
202
203
return nil , fmt .Errorf ("completing task: %w" , err )
203
204
}
204
205
@@ -207,7 +208,7 @@ func (q *taskQueue[T]) Complete(ctx context.Context, p redis.Pipeliner, queue wo
207
208
208
209
func (q * taskQueue [T ]) Data (ctx context.Context , p redis.Pipeliner , queue workflow.Queue , taskID string ) (* TaskItem [T ], error ) {
209
210
msg , err := p .XRange (ctx , q .Keys (queue ).StreamKey , taskID , taskID ).Result ()
210
- if err != nil && err != redis .Nil {
211
+ if err != nil && ! errors . Is ( err , redis .Nil ) {
211
212
return nil , fmt .Errorf ("finding task: %w" , err )
212
213
}
213
214
@@ -230,7 +231,7 @@ func (q *taskQueue[T]) recover(ctx context.Context, rdb redis.UniversalClient, q
230
231
"0" ,
231
232
).Slice ()
232
233
if err != nil {
233
- if err == redis .Nil {
234
+ if errors . Is ( err , redis .Nil ) {
234
235
return nil , nil
235
236
}
236
237
0 commit comments