Skip to content

Commit e8996c9

Browse files
authored
Add deadline API for tracked messages (#224)
* Add deadline API for tracked messages to get a slice of tracked message values that have exceeded their deadline within a given time range into the past. * Switch name and impl to "cordon" track values that have exceeded deadline so that the `GC` can delete them without double checking.
1 parent 1723049 commit e8996c9

File tree

4 files changed

+412
-7
lines changed

4 files changed

+412
-7
lines changed

queue/client.go

Lines changed: 101 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,36 @@ type OnGCFunc func(ctx context.Context, trackValues []string) error
6969
// of the current server time as a way of limiting the keyspace scanned. As a special
7070
// case, any value <= -1 will result in all keys being scanned.
7171
func (c *Client) GC(ctx context.Context, nTimeDigits int, f OnGCFunc) (uint64, uint64, error) {
72+
pipe := c.rdb.Pipeline()
73+
presortedGarbageCmd := pipe.SMembers(ctx, MetaPresortedGarbageSet)
74+
pipe.Del(ctx, MetaPresortedGarbageSet)
75+
76+
if _, err := pipe.Exec(ctx); err != nil {
77+
return 0, 0, err
78+
}
79+
80+
presortedGarbage := presortedGarbageCmd.Val()
81+
82+
if len(presortedGarbage) > 0 {
83+
toDelete := []string{}
84+
for _, key := range presortedGarbage {
85+
trackValue, deadline, ok := strings.Cut(key, ":")
86+
if !ok {
87+
continue
88+
}
89+
90+
toDelete = append(
91+
toDelete,
92+
trackValue,
93+
fmt.Sprintf("%s:expiry:%s", trackValue, deadline),
94+
)
95+
}
96+
97+
if err := c.rdb.HDel(ctx, MetaCancelationHash, toDelete...).Err(); err != nil {
98+
return 0, 0, err
99+
}
100+
}
101+
72102
now, err := c.rdb.Time(ctx).Result()
73103
if err != nil {
74104
return 0, 0, err
@@ -171,11 +201,23 @@ func (c *Client) gcProcessBatch(ctx context.Context, f OnGCFunc, idsToDelete, ke
171201
}
172202
}
173203

174-
return c.rdb.HDel(
175-
ctx,
176-
MetaCancelationHash,
177-
keysToDelete...,
178-
).Result()
204+
nDeleted, err := c.rdb.HDel(ctx, MetaCancelationHash, keysToDelete...).Result()
205+
if err != nil {
206+
return nDeleted, err
207+
}
208+
209+
// NOTE: ZRem requires an explicit []any which cannot be automatically
210+
// converted from a []string.
211+
zremArgs := make([]any, len(idsToDelete))
212+
for i, id := range idsToDelete {
213+
zremArgs[i] = id
214+
}
215+
216+
if err := c.rdb.ZRem(ctx, MetaDeadlinesZSet, zremArgs...).Err(); err != nil {
217+
return nDeleted, err
218+
}
219+
220+
return nDeleted, nil
179221
}
180222

181223
func (c *Client) callOnGC(ctx context.Context, f OnGCFunc, idsToDelete []string) error {
@@ -213,6 +255,60 @@ func (c *Client) callOnGC(ctx context.Context, f OnGCFunc, idsToDelete []string)
213255
return f(ctx, trackValues)
214256
}
215257

258+
// CordonDeadlineExceeded selects a chunk of "track values" that have exceeded their
259+
// deadline within a given duration into the past, moves them into the "presorted garbage"
260+
// set for use with `GC`, and returns them as a slice. The times are truncated to the
261+
// second because the deadlines scored set uses unix timestamps as scores.
262+
func (c *Client) CordonDeadlineExceeded(ctx context.Context, within time.Duration) ([]string, error) {
263+
start, err := c.rdb.Time(ctx).Result()
264+
if err != nil {
265+
return []string{}, err
266+
}
267+
268+
pipe := c.rdb.Pipeline()
269+
zRangeOpts := &redis.ZRangeBy{
270+
Min: strconv.Itoa(int(start.Add(-within).Unix())),
271+
Max: strconv.Itoa(int(start.Add(1 * time.Second).Unix())),
272+
}
273+
274+
zRangeCmd := c.rdb.ZRangeByScoreWithScores(
275+
ctx,
276+
MetaDeadlinesZSet,
277+
zRangeOpts,
278+
)
279+
280+
c.rdb.ZRemRangeByScore(
281+
ctx,
282+
MetaDeadlinesZSet,
283+
zRangeOpts.Min,
284+
zRangeOpts.Max,
285+
)
286+
287+
if _, err := pipe.Exec(ctx); err != nil {
288+
return []string{}, err
289+
}
290+
291+
trackValues := zRangeCmd.Val()
292+
293+
if len(trackValues) == 0 {
294+
return []string{}, nil
295+
}
296+
297+
ret := make([]string, len(trackValues))
298+
sMembers := make([]any, len(trackValues))
299+
300+
for i, trackValue := range trackValues {
301+
sMembers[i] = fmt.Sprintf("%s:%d", trackValue.Member, int(trackValue.Score))
302+
ret[i] = fmt.Sprintf("%v", trackValue.Member)
303+
}
304+
305+
if err := c.rdb.SAdd(ctx, MetaPresortedGarbageSet, sMembers...).Err(); err != nil {
306+
return ret, err
307+
}
308+
309+
return ret, nil
310+
}
311+
216312
// Len calculates the aggregate length (XLEN) of the queue. It adds up the
217313
// lengths of all the streams in the queue.
218314
func (c *Client) Len(ctx context.Context, name string) (int64, error) {

0 commit comments

Comments
 (0)