Skip to content

Commit c36f5b1

Browse files
authored
Accept number of digits of server time to include in GC match (#223)
as a way of limiting the keyspace scanned. Part of RUNT-381
1 parent abdebd5 commit c36f5b1

File tree

2 files changed

+51
-12
lines changed

2 files changed

+51
-12
lines changed

queue/client.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ var (
2121
ErrInvalidWriteArgs = errors.New("queue: invalid write arguments")
2222
ErrNoMatchingMessageInStream = errors.New("queue: no matching message in stream")
2323
ErrInvalidMetaCancelation = errors.New("queue: invalid meta cancelation")
24+
ErrInvalidNTimeDigits = errors.New("queue: invalid number of timestamp digits")
2425
ErrStopGC = errors.New("queue: stop garbage collection")
2526

2627
streamSuffixPattern = regexp.MustCompile(`\A:s(\d+)\z`)
@@ -62,10 +63,12 @@ func (c *Client) Prepare(ctx context.Context) error {
6263
// argument given is the "track values" as extracted from the meta cancelation key.
6364
type OnGCFunc func(ctx context.Context, trackValues []string) error
6465

65-
// GC performs all garbage collection operations that cannot be automatically
66-
// performed via key expiry, which is the "meta:cancelation" hash at the time of this
67-
// writing.
68-
func (c *Client) GC(ctx context.Context, f OnGCFunc) (uint64, uint64, error) {
66+
// GC performs all garbage collection operations that cannot be automatically performed
67+
// via key expiry, which is the "meta:cancelation" hash at the time of this writing. The
68+
// nTimeDigits argument is used to construct the key match to include that many digits of
69+
// of the current server time as a way of limiting the keyspace scanned. As a special
70+
// case, any value <= -1 will result in all keys being scanned.
71+
func (c *Client) GC(ctx context.Context, nTimeDigits int, f OnGCFunc) (uint64, uint64, error) {
6972
now, err := c.rdb.Time(ctx).Result()
7073
if err != nil {
7174
return 0, 0, err
@@ -78,7 +81,18 @@ func (c *Client) GC(ctx context.Context, f OnGCFunc) (uint64, uint64, error) {
7881
idsToDelete := []string{}
7982
keysToDelete := []string{}
8083

81-
iter := c.rdb.HScanNoValues(ctx, MetaCancelationHash, 0, "*:expiry:*", 0).Iterator()
84+
match := "*:expiry:*"
85+
if nTimeDigits > -1 {
86+
nowUnixString := strconv.Itoa(int(nowUnix))
87+
88+
if nTimeDigits > len(nowUnixString) {
89+
return 0, 0, ErrInvalidNTimeDigits
90+
}
91+
92+
match = fmt.Sprintf("*:expiry:%s*", nowUnixString[:nTimeDigits])
93+
}
94+
95+
iter := c.rdb.HScanNoValues(ctx, MetaCancelationHash, 0, match, 0).Iterator()
8296
total := uint64(0)
8397
twiceDeleted := uint64(0)
8498

queue/client_test.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -458,8 +458,6 @@ func TestClientGCIntegration(t *testing.T) {
458458
client := queue.NewTrackingClient(rdb, ttl, "tracketytrack")
459459
require.NoError(t, client.Prepare(ctx))
460460

461-
runClientWriteIntegrationTest(ctx, t, rdb, client, true)
462-
463461
gcTrackedFields := []string{}
464462

465463
onGCFunc := func(_ context.Context, trackedFields []string) error {
@@ -468,12 +466,39 @@ func TestClientGCIntegration(t *testing.T) {
468466
return nil
469467
}
470468

471-
total, nDeleted, err := client.GC(ctx, onGCFunc)
472-
require.NoError(t, err)
473-
require.Equal(t, uint64(15), total)
474-
require.Equal(t, uint64(10), nDeleted)
469+
t.Run("full scan", func(t *testing.T) {
470+
gcTrackedFields = []string{}
471+
472+
require.NoError(t, rdb.FlushAll(t.Context()).Err())
473+
474+
runClientWriteIntegrationTest(ctx, t, rdb, client, true)
475+
476+
total, nDeleted, err := client.GC(ctx, -1, onGCFunc)
477+
require.NoError(t, err)
478+
require.Equal(t, uint64(15), total)
479+
require.Equal(t, uint64(10), nDeleted)
480+
481+
require.Len(t, gcTrackedFields, 10)
482+
})
483+
484+
t.Run("scoped scan", func(t *testing.T) {
485+
require.NoError(t, rdb.FlushAll(t.Context()).Err())
475486

476-
require.Len(t, gcTrackedFields, 10)
487+
runClientWriteIntegrationTest(ctx, t, rdb, client, true)
488+
489+
total, _, err := client.GC(ctx, 6, onGCFunc)
490+
require.NoError(t, err)
491+
require.Equal(t, uint64(10), total)
492+
})
493+
494+
t.Run("invalid nTimeDigits", func(t *testing.T) {
495+
require.NoError(t, rdb.FlushAll(t.Context()).Err())
496+
497+
runClientWriteIntegrationTest(ctx, t, rdb, client, true)
498+
499+
_, _, err := client.GC(ctx, 11, onGCFunc)
500+
require.Error(t, err)
501+
})
477502
}
478503

479504
// TestPickupLatencyIntegration runs a test with a mostly-empty queue -- by

0 commit comments

Comments
 (0)