Skip to content

Commit e401f9e

Browse files
committed
TMP
1 parent 509c5ee commit e401f9e

File tree

7 files changed

+97
-37
lines changed

7 files changed

+97
-37
lines changed

api/frontend.swagger.json

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,46 @@
424424
]
425425
}
426426
},
427+
"/v1/frontendservice/tickets/expired": {
428+
"get": {
429+
"summary": "GetExpiredTickets returns the ticket Ids corresponding to tickets outside the valid window",
430+
"operationId": "FrontendService_GetExpiredTickets",
431+
"responses": {
432+
"200": {
433+
"description": "A successful response.",
434+
"schema": {
435+
"$ref": "#/definitions/openmatchGetExpiredTicketsResponse"
436+
}
437+
},
438+
"404": {
439+
"description": "Returned when the resource does not exist.",
440+
"schema": {
441+
"type": "string",
442+
"format": "string"
443+
}
444+
},
445+
"default": {
446+
"description": "An unexpected error response.",
447+
"schema": {
448+
"$ref": "#/definitions/rpcStatus"
449+
}
450+
}
451+
},
452+
"parameters": [
453+
{
454+
"name": "limit",
455+
"description": "The limit on how many ticket Ids to return",
456+
"in": "query",
457+
"required": false,
458+
"type": "integer",
459+
"format": "int32"
460+
}
461+
],
462+
"tags": [
463+
"FrontendService"
464+
]
465+
}
466+
},
427467
"/v1/frontendservice/tickets/{ticket_id}": {
428468
"get": {
429469
"summary": "GetTicket get the Ticket associated with the specified TicketId.",
@@ -671,6 +711,18 @@
671711
}
672712
}
673713
},
714+
"openmatchGetExpiredTicketsResponse": {
715+
"type": "object",
716+
"properties": {
717+
"ticket_ids": {
718+
"type": "array",
719+
"items": {
720+
"type": "string"
721+
},
722+
"description": "Expired TicketIds of generated Tickets to be deleted."
723+
}
724+
}
725+
},
674726
"openmatchGetIndexedTicketCountResponse": {
675727
"type": "object",
676728
"properties": {

internal/app/query/cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func updateTicketCache(store statestore.Service, value interface{}) error {
148148
t := time.Now()
149149
previousCount := len(tickets)
150150
// get all indexed tickets within the valid time window
151-
currentAll, err := store.GetIndexedIDSetWithTTL(context.Background())
151+
currentAll, err := store.GetIndexedIDSetWithTTL(context.Background(), 0)
152152
if err != nil {
153153
return err
154154
}

internal/app/synchronizer/synchronizer_service.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -303,12 +303,6 @@ Registration:
303303
if err != nil {
304304
logger.Errorf("Failed to clean up backfills, %s", err.Error())
305305
}
306-
307-
// TODO: this should probably be enabled much later like after a day of running without it for the transition for Prod
308-
err = s.store.CleanupTickets(ctx)
309-
if err != nil {
310-
logger.Errorf("Failed to clean up tickets, %s", err.Error())
311-
}
312306
}
313307

314308
///////////////////////////////////////

internal/statestore/instrumented.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,15 +232,15 @@ func (is *instrumentedService) DeleteTicketCompletely(ctx context.Context, id st
232232
}
233233

234234
// GetExpiredTicketIDs gets all ticket IDs which are expired
235-
func (is *instrumentedService) GetExpiredTicketIDs(ctx context.Context) ([]string, error) {
235+
func (is *instrumentedService) GetExpiredTicketIDs(ctx context.Context, limit int) ([]string, error) {
236236
ctx, span := trace.StartSpan(ctx, "statestore/instrumented.GetExpiredTicketIDs")
237237
defer span.End()
238-
return is.s.GetExpiredTicketIDs(ctx)
238+
return is.s.GetExpiredTicketIDs(ctx, 0)
239239
}
240240

241241
// GetIndexedIDSetWithTTL returns the ids of all tickets currently indexed but within a given TTL.
242-
func (is *instrumentedService) GetIndexedIDSetWithTTL(ctx context.Context) (map[string]struct{}, error) {
242+
func (is *instrumentedService) GetIndexedIDSetWithTTL(ctx context.Context, limit int) (map[string]struct{}, error) {
243243
ctx, span := trace.StartSpan(ctx, "statestore/instrumented.GetIndexedIDSetWithTTL")
244244
defer span.End()
245-
return is.s.GetIndexedIDSetWithTTL(ctx)
245+
return is.s.GetIndexedIDSetWithTTL(ctx, 0)
246246
}

internal/statestore/public.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,10 @@ type Service interface {
135135
DeleteTicketCompletely(ctx context.Context, id string) error
136136

137137
// GetExpiredTicketIDs gets all ticket IDs which are expired
138-
GetExpiredTicketIDs(ctx context.Context) ([]string, error)
138+
GetExpiredTicketIDs(ctx context.Context, limit int) ([]string, error)
139139

140140
// GetIndexedIDSetWithTTL returns the ids of all tickets currently indexed but within a given TTL.
141-
GetIndexedIDSetWithTTL(ctx context.Context) (map[string]struct{}, error)
141+
GetIndexedIDSetWithTTL(ctx context.Context, limit int) (map[string]struct{}, error)
142142
}
143143

144144
// New creates a Service based on the configuration.

internal/statestore/ticket.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ import (
1818
"context"
1919

2020
"fmt"
21-
"github.com/sirupsen/logrus"
2221
"sync"
2322
"time"
2423

24+
"github.com/sirupsen/logrus"
25+
2526
"github.com/cenkalti/backoff"
2627
"github.com/gomodule/redigo/redis"
2728
"github.com/pkg/errors"
@@ -359,7 +360,7 @@ func (rb *redisBackend) GetIndexedIDSet(ctx context.Context) (map[string]struct{
359360
}
360361

361362
// GetIndexedIDSetWithTTL returns the ids of all tickets currently indexed but within a given TTL.
362-
func (rb *redisBackend) GetIndexedIDSetWithTTL(ctx context.Context) (map[string]struct{}, error) {
363+
func (rb *redisBackend) GetIndexedIDSetWithTTL(ctx context.Context, limit int) (map[string]struct{}, error) {
363364
redisConn, err := rb.redisPool.GetContext(ctx)
364365
if err != nil {
365366
return nil, status.Errorf(codes.Unavailable, "GetIndexedIDSetWithTTL, failed to connect to redis: %v", err)
@@ -378,10 +379,16 @@ func (rb *redisBackend) GetIndexedIDSetWithTTL(ctx context.Context) (map[string]
378379
}
379380

380381
curTimeUnix := curTime.UnixNano()
382+
args := redis.Args{
383+
allTicketsWithTTL, curTimeUnix, "+inf",
384+
}
385+
if limit > 0 {
386+
args = args.Add("LIMIT", 0, limit)
387+
}
381388
// fetch only tickets with a score or ttl ahead of or equal to current time
382-
idsIndexed, err := redis.Strings(redisConn.Do("ZRANGEBYSCORE", allTicketsWithTTL, curTimeUnix, "+inf"))
389+
idsIndexed, err := redis.Strings(redisConn.Do("ZRANGEBYSCORE", args...))
383390
if err != nil {
384-
return nil, status.Errorf(codes.Internal, "error getting all indexed ticket ids %v", err)
391+
return nil, status.Errorf(codes.Internal, "error getting indexed ticket ids %v", err)
385392
}
386393

387394
r := make(map[string]struct{}, len(idsIndexed))
@@ -696,7 +703,7 @@ func (rb *redisBackend) newConstantBackoffStrategy() backoff.BackOff {
696703
}
697704

698705
// GetExpiredTicketIDs gets all ticket IDs which are expired
699-
func (rb *redisBackend) GetExpiredTicketIDs(ctx context.Context) ([]string, error) {
706+
func (rb *redisBackend) GetExpiredTicketIDs(ctx context.Context, limit int) ([]string, error) {
700707
redisConn, err := rb.redisPool.GetContext(ctx)
701708
if err != nil {
702709
return nil, status.Errorf(codes.Unavailable, "GetExpiredBackfillIDs, failed to connect to redis: %v", err)
@@ -708,8 +715,15 @@ func (rb *redisBackend) GetExpiredTicketIDs(ctx context.Context) ([]string, erro
708715
endTimeInt := curTime.Add(-ticketTTL).UnixNano() // anything before the now - ttl
709716
startTimeInt := 0 // unix epoc start time
710717

718+
args := redis.Args{
719+
allTicketsWithTTL, startTimeInt, endTimeInt,
720+
}
721+
if limit > 0 {
722+
args = append(args, "LIMIT", 0, limit)
723+
}
724+
711725
// Filter out ticket IDs that are fetched but not assigned within TTL time (ms).
712-
expiredTicketIds, err := redis.Strings(redisConn.Do("ZRANGEBYSCORE", allTicketsWithTTL, startTimeInt, endTimeInt))
726+
expiredTicketIds, err := redis.Strings(redisConn.Do("ZRANGEBYSCORE", args...))
713727
if err != nil {
714728
return nil, status.Errorf(codes.Internal, "error getting expired tickets %v", err)
715729
}
@@ -778,7 +792,7 @@ func (rb *redisBackend) cleanupTicketsWorker(ctx context.Context, ticketIDsCh <-
778792
}
779793

780794
func (rb *redisBackend) CleanupTickets(ctx context.Context) error {
781-
expiredTicketIDs, err := rb.GetExpiredTicketIDs(ctx)
795+
expiredTicketIDs, err := rb.GetExpiredTicketIDs(ctx, 0)
782796
if err != nil {
783797
return err
784798
}

internal/statestore/ticket_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ func TestGetTicketWithTTLRefresh(t *testing.T) {
549549

550550
// sleep for less than expiry, should not expire the tickets
551551
time.Sleep(cfg.GetDuration("ticketDeleteTimeout") - 100*time.Millisecond)
552-
idsIndexedMap, err := service.GetIndexedIDSetWithTTL(ctx)
552+
idsIndexedMap, err := service.GetIndexedIDSetWithTTL(ctx, 0)
553553
require.NoError(t, err)
554554
require.Len(t, idsIndexedMap, 1)
555555
_, exists := idsIndexedMap[ticket.Id]
@@ -559,7 +559,7 @@ func TestGetTicketWithTTLRefresh(t *testing.T) {
559559
time.Sleep(cfg.GetDuration("ticketDeleteTimeout") + 100*time.Millisecond)
560560

561561
// check that all the tickets expired even before clean up
562-
idsIndexedMap, err = service.GetIndexedIDSetWithTTL(ctx)
562+
idsIndexedMap, err = service.GetIndexedIDSetWithTTL(ctx, 0)
563563
require.Nil(t, err)
564564
require.Empty(t, idsIndexedMap)
565565

@@ -745,7 +745,7 @@ func TestIndexTicket(t *testing.T) {
745745
}
746746

747747
// also check that there is only 1 ticket in the returned map
748-
idsIndexedMap, err := service.GetIndexedIDSetWithTTL(ctx)
748+
idsIndexedMap, err := service.GetIndexedIDSetWithTTL(ctx, 0)
749749
require.NoError(t, err)
750750
require.Len(t, idsIndexedMap, 2)
751751
checkExists("mockTicketID-0", idsIndexedMap)
@@ -784,7 +784,7 @@ func TestDeindexTicket(t *testing.T) {
784784
require.Equal(t, "mockTicketID-1", idsIndexed[1])
785785

786786
// also check that both tickets are indexed correctly
787-
idsIndexedMap, err := service.GetIndexedIDSetWithTTL(ctx)
787+
idsIndexedMap, err := service.GetIndexedIDSetWithTTL(ctx, 0)
788788
require.NoError(t, err)
789789
require.Len(t, idsIndexedMap, 2)
790790

@@ -797,7 +797,7 @@ func TestDeindexTicket(t *testing.T) {
797797
require.Equal(t, "mockTicketID-0", idsIndexed[0])
798798

799799
// also check that there is only 1 ticket in the returned map
800-
idsIndexedMap, err = service.GetIndexedIDSetWithTTL(ctx)
800+
idsIndexedMap, err = service.GetIndexedIDSetWithTTL(ctx, 0)
801801
require.NoError(t, err)
802802
require.Len(t, idsIndexedMap, 1)
803803
_, exists := idsIndexedMap["mockTicketID-0"]
@@ -833,7 +833,7 @@ func TestDeindexTickets(t *testing.T) {
833833
require.Equal(t, "mockTicketID-1", idsIndexed[1])
834834

835835
// also check that both tickets exist
836-
idsIndexedMap, err := service.GetIndexedIDSetWithTTL(ctx)
836+
idsIndexedMap, err := service.GetIndexedIDSetWithTTL(ctx, 0)
837837
require.NoError(t, err)
838838
require.Len(t, idsIndexedMap, 2)
839839

@@ -846,7 +846,7 @@ func TestDeindexTickets(t *testing.T) {
846846
require.Equal(t, "mockTicketID-0", idsIndexed[0])
847847

848848
// also check that there is only 1 ticket in the returned map
849-
idsIndexedMap, err = service.GetIndexedIDSetWithTTL(ctx)
849+
idsIndexedMap, err = service.GetIndexedIDSetWithTTL(ctx, 0)
850850
require.NoError(t, err)
851851
require.Len(t, idsIndexedMap, 1)
852852
_, exists := idsIndexedMap["mockTicketID-0"]
@@ -916,7 +916,7 @@ func TestGetIndexedIDSetWithTTL(t *testing.T) {
916916
ctx := utilTesting.NewContext(t)
917917

918918
verifyTickets := func(service Service, tickets []*pb.Ticket) {
919-
ids, err := service.GetIndexedIDSetWithTTL(ctx)
919+
ids, err := service.GetIndexedIDSetWithTTL(ctx, len(tickets))
920920
require.Nil(t, err)
921921
require.Equal(t, len(tickets), len(ids))
922922

@@ -926,7 +926,7 @@ func TestGetIndexedIDSetWithTTL(t *testing.T) {
926926
}
927927
}
928928

929-
tickets, _ := generateTickets(ctx, t, service, 2)
929+
tickets, _ := generateTickets(ctx, t, service, 20)
930930

931931
// Verify all tickets are created and returned
932932
verifyTickets(service, tickets)
@@ -996,7 +996,7 @@ func TestGetTicketsWithTTLRefresh(t *testing.T) {
996996
}
997997

998998
verifyTickets := func(service Service, tickets []*pb.Ticket) {
999-
ids, err := service.GetIndexedIDSetWithTTL(ctx)
999+
ids, err := service.GetIndexedIDSetWithTTL(ctx, 0)
10001000
require.Nil(t, err)
10011001
require.Equal(t, len(tickets), len(ids))
10021002

@@ -1112,7 +1112,7 @@ func TestReleaseAllTickets(t *testing.T) {
11121112
}
11131113

11141114
// check that we also de-indexed from our ordered set
1115-
ids, err = service.GetIndexedIDSetWithTTL(ctx)
1115+
ids, err = service.GetIndexedIDSetWithTTL(ctx, 0)
11161116
require.Nil(t, err)
11171117
require.Equal(t, len(tickets), len(ids))
11181118
}
@@ -1163,7 +1163,7 @@ func TestAddTicketsToPendingRelease(t *testing.T) {
11631163
require.True(t, ok)
11641164
}
11651165

1166-
ids, err = service.GetIndexedIDSetWithTTL(ctx)
1166+
ids, err = service.GetIndexedIDSetWithTTL(ctx, 0)
11671167
require.Nil(t, err)
11681168
require.Equal(t, len(tickets), len(ids))
11691169
}
@@ -1242,20 +1242,20 @@ func TestGetExpiredTicketIDs(t *testing.T) {
12421242
require.NotNil(t, ticketActual)
12431243

12441244
// no tickets expired yet
1245-
expiredTicketIDs, err := service.GetExpiredTicketIDs(ctx)
1245+
expiredTicketIDs, err := service.GetExpiredTicketIDs(ctx, 0)
12461246
require.NoError(t, err)
12471247
require.Len(t, expiredTicketIDs, 0)
12481248

12491249
// sleep for expiry and let all tickets expire
12501250
time.Sleep(getTicketReleaseTimeout(cfg) + 500*time.Millisecond)
12511251

12521252
// there should be an expired ticket
1253-
expiredTicketIDs, err = service.GetExpiredTicketIDs(ctx)
1253+
expiredTicketIDs, err = service.GetExpiredTicketIDs(ctx, 0)
12541254
require.NoError(t, err)
12551255
require.Len(t, expiredTicketIDs, 1)
12561256

12571257
// the ticket should not exist but not indexed anymore
1258-
indexedTickets, err := service.GetIndexedIDSetWithTTL(ctx)
1258+
indexedTickets, err := service.GetIndexedIDSetWithTTL(ctx, 0)
12591259
require.NoError(t, err)
12601260
require.Empty(t, indexedTickets)
12611261

@@ -1307,7 +1307,7 @@ func TestCleanupTickets(t *testing.T) {
13071307
require.NotNil(t, ticketActual)
13081308

13091309
// no tickets expired yet
1310-
expiredTicketIDs, err := service.GetExpiredTicketIDs(ctx)
1310+
expiredTicketIDs, err := service.GetExpiredTicketIDs(ctx, 0)
13111311
require.NoError(t, err)
13121312
require.Len(t, expiredTicketIDs, 0)
13131313

@@ -1319,7 +1319,7 @@ func TestCleanupTickets(t *testing.T) {
13191319
require.NoError(t, err)
13201320

13211321
// the ticket shouldn't exist anymore after the cleanup
1322-
indexedTickets, err := service.GetIndexedIDSetWithTTL(ctx)
1322+
indexedTickets, err := service.GetIndexedIDSetWithTTL(ctx, 0)
13231323
require.NoError(t, err)
13241324
require.Empty(t, indexedTickets)
13251325

0 commit comments

Comments
 (0)