@@ -2,15 +2,16 @@ package queue
22
33import (
44 "context"
5- "crypto/sha1"
65 "encoding/json"
76 "errors"
87 "fmt"
98 "regexp"
9+ "strconv"
1010 "strings"
1111 "time"
1212
1313 "github.com/redis/go-redis/v9"
14+ "go.uber.org/multierr"
1415
1516 "github.com/replicate/go/shuffleshard"
1617)
@@ -20,10 +21,15 @@ var (
2021 ErrInvalidWriteArgs = errors .New ("queue: invalid write arguments" )
2122 ErrNoMatchingMessageInStream = errors .New ("queue: no matching message in stream" )
2223 ErrInvalidMetaCancelation = errors .New ("queue: invalid meta cancelation" )
24+ ErrStopGC = errors .New ("queue: stop garbage collection" )
2325
2426 streamSuffixPattern = regexp .MustCompile (`\A:s(\d+)\z` )
2527)
2628
29+ const (
30+ metaCancelationGCBatchSize = 100
31+ )
32+
2733type Client struct {
2834 rdb redis.Cmdable
2935 ttl time.Duration // ttl for all keys in queue
@@ -52,14 +58,145 @@ func (c *Client) Prepare(ctx context.Context) error {
5258 return prepare (ctx , c .rdb )
5359}
5460
61+ // OnGCFunc is called periodically during GC *before* deleting the expired keys. The
62+ // argument given is the "track values" as extracted from the meta cancelation key.
63+ type OnGCFunc func (ctx context.Context , trackValues []string ) error
64+
5565// GC performs all garbage collection operations that cannot be automatically
56- // performed via key expiry.
57- func (c * Client ) GC (ctx context.Context ) error {
58- if _ , err := gcMetaCancelation (ctx , c .rdb ); err != nil {
66+ // performed via key expiry, which is the "meta:cancelation" hash at the time of this
67+ // writing.
68+ func (c * Client ) GC (ctx context.Context , f OnGCFunc ) (uint64 , uint64 , error ) {
69+ now , err := c .rdb .Time (ctx ).Result ()
70+ if err != nil {
71+ return 0 , 0 , err
72+ }
73+
74+ nowUnix := now .Unix ()
75+
76+ nonFatalErrors := []error {}
77+
78+ idsToDelete := []string {}
79+ keysToDelete := []string {}
80+
81+ iter := c .rdb .HScanNoValues (ctx , MetaCancelationHash , 0 , "*:expiry:*" , 0 ).Iterator ()
82+ total := uint64 (0 )
83+ twiceDeleted := uint64 (0 )
84+
85+ for iter .Next (ctx ) {
86+ key := iter .Val ()
87+ total ++
88+
89+ if len (idsToDelete ) >= metaCancelationGCBatchSize {
90+ n , err := c .gcProcessBatch (ctx , f , idsToDelete , keysToDelete )
91+ if err != nil {
92+ if errors .Is (err , ErrStopGC ) {
93+ return total , twiceDeleted / 2 , err
94+ }
95+
96+ nonFatalErrors = append (nonFatalErrors , err )
97+ }
98+
99+ twiceDeleted += uint64 (n )
100+
101+ idsToDelete = []string {}
102+ keysToDelete = []string {}
103+
104+ now , err = c .rdb .Time (ctx ).Result ()
105+ if err != nil {
106+ return total , twiceDeleted / 2 , err
107+ }
108+
109+ nowUnix = now .Unix ()
110+ }
111+
112+ keyParts := strings .Split (key , ":" )
113+ if len (keyParts ) != 3 {
114+ continue
115+ }
116+
117+ keyTime , err := strconv .ParseInt (keyParts [2 ], 0 , 64 )
118+ if err != nil {
119+ nonFatalErrors = append (nonFatalErrors , err )
120+ continue
121+ }
122+
123+ if nowUnix > keyTime {
124+ keysToDelete = append (keysToDelete , key , keyParts [0 ])
125+ idsToDelete = append (idsToDelete , keyParts [0 ])
126+ }
127+ }
128+
129+ n , err := c .gcProcessBatch (ctx , f , idsToDelete , keysToDelete )
130+ if err != nil {
131+ if errors .Is (err , ErrStopGC ) {
132+ return total , twiceDeleted / 2 , err
133+ }
134+
135+ nonFatalErrors = append (nonFatalErrors , err )
136+ }
137+
138+ twiceDeleted += uint64 (n )
139+
140+ if err := iter .Err (); err != nil {
141+ return total , twiceDeleted / 2 , err
142+ }
143+
144+ return total , twiceDeleted / 2 , multierr .Combine (nonFatalErrors ... )
145+ }
146+
147+ func (c * Client ) gcProcessBatch (ctx context.Context , f OnGCFunc , idsToDelete , keysToDelete []string ) (int64 , error ) {
148+ if len (idsToDelete ) == 0 || len (keysToDelete ) == 0 {
149+ return 0 , nil
150+ }
151+
152+ if err := c .callOnGC (ctx , f , idsToDelete ); err != nil {
153+ // NOTE: The client `OnGCFunc` may request interruption via the `ErrStopGC`
154+ // error as a way to prevent the `HDel`.
155+ if errors .Is (err , ErrStopGC ) {
156+ return 0 , err
157+ }
158+ }
159+
160+ return c .rdb .HDel (
161+ ctx ,
162+ MetaCancelationHash ,
163+ keysToDelete ... ,
164+ ).Result ()
165+ }
166+
167+ func (c * Client ) callOnGC (ctx context.Context , f OnGCFunc , idsToDelete []string ) error {
168+ if f == nil {
169+ return nil
170+ }
171+
172+ pipe := c .rdb .Pipeline ()
173+ hValCmds := make ([]* redis.StringCmd , len (idsToDelete ))
174+
175+ for i , idToDelete := range idsToDelete {
176+ hValCmds [i ] = pipe .HGet (ctx , MetaCancelationHash , idToDelete )
177+ }
178+
179+ if _ , err := pipe .Exec (ctx ); err != nil {
59180 return err
60181 }
61182
62- return nil
183+ trackValues := make ([]string , len (idsToDelete ))
184+
185+ for i , hValCmd := range hValCmds {
186+ msgBytes , err := hValCmd .Bytes ()
187+ if err != nil {
188+ return err
189+ }
190+
191+ msg := & metaCancelation {}
192+ if err := json .Unmarshal (msgBytes , msg ); err != nil {
193+ return err
194+ }
195+
196+ trackValues [i ] = msg .TrackValue
197+ }
198+
199+ return f (ctx , trackValues )
63200}
64201
65202// Len calculates the aggregate length (XLEN) of the queue. It adds up the
@@ -267,12 +404,27 @@ func (c *Client) write(ctx context.Context, args *WriteArgs) (string, error) {
267404 // Capacity: 3 (for seconds, streams, n) + len(shard) + 2*len(values)
268405 cmdArgs := make ([]any , 0 , 3 + len (shard )+ 2 * len (args .Values ))
269406
270- cmdArgs = append (cmdArgs , int (c .ttl .Seconds ()))
271- cmdArgs = append (cmdArgs , args .Streams )
272- cmdArgs = append (cmdArgs , len (shard ))
407+ cmdArgs = append (
408+ cmdArgs ,
409+ int (c .ttl .Seconds ()),
410+ args .Streams ,
411+ len (shard ),
412+ )
273413
274414 if c .trackField != "" {
275- cmdArgs = append (cmdArgs , c .trackField )
415+ deadlineUnix := int64 (0 )
416+ if ! args .Deadline .IsZero () {
417+ deadlineUnix = args .Deadline .Unix ()
418+ }
419+
420+ cmdArgs = append (
421+ cmdArgs ,
422+ c .trackField ,
423+ // NOTE: Deadline is an optional field in WriteArgs, so the Unix value may be
424+ // passed as zero so that the writeTrackingScript uses a default value of the
425+ // server time + ttl.
426+ deadlineUnix ,
427+ )
276428 }
277429
278430 for _ , s := range shard {
@@ -290,16 +442,16 @@ func (c *Client) write(ctx context.Context, args *WriteArgs) (string, error) {
290442}
291443
292444type metaCancelation struct {
293- StreamID string `json:"stream_id"`
294- MsgID string `json:"msg_id"`
445+ StreamID string `json:"stream_id"`
446+ MsgID string `json:"msg_id"`
447+ TrackValue string `json:"track_value"`
448+ Deadline int64 `json:"deadline"`
295449}
296450
297451// Del supports removal of a message when the given `fieldValue` matches a "meta
298452// cancelation" key as written when using a client with tracking support.
299453func (c * Client ) Del (ctx context.Context , fieldValue string ) error {
300- metaCancelationKey := fmt .Sprintf ("%x" , sha1 .Sum ([]byte (fieldValue )))
301-
302- msgBytes , err := c .rdb .HGet (ctx , MetaCancelationHash , metaCancelationKey ).Bytes ()
454+ msgBytes , err := c .rdb .HGet (ctx , MetaCancelationHash , fieldValue ).Bytes ()
303455 if err != nil {
304456 return err
305457 }
@@ -324,8 +476,7 @@ func (c *Client) Del(ctx context.Context, fieldValue string) error {
324476
325477 if n == 0 {
326478 return fmt .Errorf (
327- "key=%q field-value=%q stream=%q message-id=%q: %w" ,
328- metaCancelationKey ,
479+ "field-value=%q stream=%q message-id=%q: %w" ,
329480 fieldValue ,
330481 msg .StreamID ,
331482 msg .MsgID ,
0 commit comments