Skip to content

Commit d0cf01d

Browse files
authored
Track all meta cancelation records in one hash (#220)
so as not to dramatically increase the number of keys used. Part of RUNT-369 which broadly addresses cancelation of work that is considered "in the queue".
1 parent b976526 commit d0cf01d

File tree

4 files changed

+109
-10
lines changed

4 files changed

+109
-10
lines changed

queue/client.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ var (
1919
ErrInvalidReadArgs = errors.New("queue: invalid read arguments")
2020
ErrInvalidWriteArgs = errors.New("queue: invalid write arguments")
2121
ErrNoMatchingMessageInStream = errors.New("queue: no matching message in stream")
22+
ErrInvalidMetaCancelation = errors.New("queue: invalid meta cancelation")
2223

2324
streamSuffixPattern = regexp.MustCompile(`\A:s(\d+)\z`)
2425
)
@@ -51,6 +52,16 @@ func (c *Client) Prepare(ctx context.Context) error {
5152
return prepare(ctx, c.rdb)
5253
}
5354

55+
// GC performs all garbage collection operations that cannot be automatically
56+
// performed via key expiry.
57+
func (c *Client) GC(ctx context.Context) error {
58+
if _, err := gcMetaCancelation(ctx, c.rdb); err != nil {
59+
return err
60+
}
61+
62+
return nil
63+
}
64+
5465
// Len calculates the aggregate length (XLEN) of the queue. It adds up the
5566
// lengths of all the streams in the queue.
5667
func (c *Client) Len(ctx context.Context, name string) (int64, error) {
@@ -286,9 +297,9 @@ type metaCancelation struct {
286297
// Del supports removal of a message when the given `fieldValue` matches a "meta
287298
// cancelation" key as written when using a client with tracking support.
288299
func (c *Client) Del(ctx context.Context, fieldValue string) error {
289-
metaCancelationKey := fmt.Sprintf("_meta:cancelation:%x", sha1.Sum([]byte(fieldValue)))
300+
metaCancelationKey := fmt.Sprintf("%x", sha1.Sum([]byte(fieldValue)))
290301

291-
msgBytes, err := c.rdb.Get(ctx, metaCancelationKey).Bytes()
302+
msgBytes, err := c.rdb.HGet(ctx, MetaCancelationHash, metaCancelationKey).Bytes()
292303
if err != nil {
293304
return err
294305
}
@@ -298,6 +309,14 @@ func (c *Client) Del(ctx context.Context, fieldValue string) error {
298309
return err
299310
}
300311

312+
if msg.StreamID == "" {
313+
return fmt.Errorf("empty stream_id: %w", ErrInvalidMetaCancelation)
314+
}
315+
316+
if msg.MsgID == "" {
317+
return fmt.Errorf("empty msg_id: %w", ErrInvalidMetaCancelation)
318+
}
319+
301320
n, err := c.rdb.XDel(ctx, msg.StreamID, msg.MsgID).Result()
302321
if err != nil {
303322
return err

queue/client_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -439,19 +439,32 @@ func TestClientDelIntegration(t *testing.T) {
439439
require.Error(t, client.Del(ctx, trackIDs[0]+"oops"))
440440
require.Error(t, client.Del(ctx, "bogustown"))
441441

442-
metaCancelationKey := "_meta:cancelation:" + fmt.Sprintf("%x", sha1.Sum([]byte(trackIDs[1])))
442+
metaCancelationKey := fmt.Sprintf("%x", sha1.Sum([]byte(trackIDs[1])))
443443

444-
metaCancel, err := rdb.Get(ctx, metaCancelationKey).Result()
444+
metaCancel, err := rdb.HGet(ctx, queue.MetaCancelationHash, metaCancelationKey).Result()
445445
require.NoError(t, err)
446446

447-
rdb.SetEx(ctx, metaCancelationKey, "{{[,"+metaCancel, 5*time.Second)
447+
rdb.HSet(ctx, queue.MetaCancelationHash, metaCancelationKey, "{{[,bogus"+metaCancel)
448448

449449
require.Error(t, client.Del(ctx, trackIDs[1]))
450450

451451
require.NoError(t, client.Del(ctx, trackIDs[2]))
452452
require.ErrorIs(t, client.Del(ctx, trackIDs[2]), queue.ErrNoMatchingMessageInStream)
453453
}
454454

455+
func TestClientGCIntegration(t *testing.T) {
456+
ctx := test.Context(t)
457+
rdb := test.Redis(ctx, t)
458+
459+
ttl := 24 * time.Hour
460+
client := queue.NewTrackingClient(rdb, ttl, "tracketytrack")
461+
require.NoError(t, client.Prepare(ctx))
462+
463+
runClientWriteIntegrationTest(ctx, t, rdb, client, true)
464+
465+
require.NoError(t, client.GC(ctx))
466+
}
467+
455468
// TestPickupLatencyIntegration runs a test with a mostly-empty queue -- by
456469
// running artificially slow producers and full-speed consumers -- to ensure
457470
// that the blocking read operation has low latency.

queue/queue.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ package queue
3535
import (
3636
"context"
3737
_ "embed" // to provide go:embed support
38+
"strconv"
39+
"strings"
40+
"time"
3841

3942
"github.com/redis/go-redis/v9"
4043
)
@@ -62,7 +65,19 @@ var (
6265

6366
//go:embed writetracking.lua
6467
writeTrackingCmd string
65-
writeTrackingScript = redis.NewScript(writeTrackingCmd)
68+
writeTrackingScript = redis.NewScript(
69+
strings.ReplaceAll(
70+
writeTrackingCmd,
71+
"__META_CANCELATION_HASH__",
72+
MetaCancelationHash,
73+
),
74+
)
75+
)
76+
77+
const (
78+
MetaCancelationHash = "meta:cancelation"
79+
80+
metaCancelationGCBatchSize = 100
6681
)
6782

6883
func prepare(ctx context.Context, rdb redis.Cmdable) error {
@@ -86,3 +101,48 @@ func prepare(ctx context.Context, rdb redis.Cmdable) error {
86101
}
87102
return nil
88103
}
104+
105+
func gcMetaCancelation(ctx context.Context, rdb redis.Cmdable) (int, error) {
106+
now := time.Now().UTC().Unix()
107+
keysToDelete := []string{}
108+
iter := rdb.HScan(ctx, MetaCancelationHash, 0, "*:expiry:*", 0).Iterator()
109+
110+
for iter.Next(ctx) {
111+
key := iter.Val()
112+
113+
keyParts := strings.Split(key, ":")
114+
if len(keyParts) != 3 {
115+
continue
116+
}
117+
118+
keyTime, err := strconv.ParseInt(keyParts[2], 0, 64)
119+
if err != nil {
120+
continue
121+
}
122+
123+
if keyTime > now {
124+
keysToDelete = append(keysToDelete, key, keyParts[0])
125+
}
126+
}
127+
128+
if err := iter.Err(); err != nil {
129+
return 0, err
130+
}
131+
132+
for i := 0; i < len(keysToDelete); i += metaCancelationGCBatchSize {
133+
sliceEnd := i + metaCancelationGCBatchSize
134+
if sliceEnd > len(keysToDelete) {
135+
sliceEnd = len(keysToDelete)
136+
}
137+
138+
if err := rdb.HDel(
139+
ctx,
140+
MetaCancelationHash,
141+
keysToDelete[i:sliceEnd]...,
142+
).Err(); err != nil {
143+
return 0, err
144+
}
145+
}
146+
147+
return len(keysToDelete), nil
148+
}

queue/writetracking.lua

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,15 +116,22 @@ local id = redis.call('XADD', key_stream, '*', unpack(fields))
116116
redis.call('XADD', key_notifications, 'MAXLEN', '1', '*', 's', selected_sid)
117117

118118
if track_value ~= '' then
119+
local cancelation_key = redis.sha1hex(track_value)
120+
local server_time = redis.call('TIME')
121+
local expiry_unixtime = tonumber(server_time[1]) + 90000 -- 25 hours
122+
local cancelation_expiry_key = cancelation_key .. ':expiry:' .. tostring(expiry_unixtime)
123+
119124
redis.call(
120-
'SETEX',
121-
'_meta:cancelation:' .. redis.sha1hex(track_value),
122-
90000, -- 25 hours
125+
'HSET',
126+
'__META_CANCELATION_HASH__',
127+
cancelation_key,
123128
cjson.encode({
124129
['stream_id'] = key_stream,
125130
['track_value'] = track_value,
126131
['msg_id'] = id,
127-
})
132+
}),
133+
cancelation_expiry_key,
134+
'1'
128135
)
129136
end
130137

0 commit comments

Comments
 (0)