Skip to content

Commit 8e98210

Browse files
authored
fix: prevent colscan from housekeeper, comment find and count operations on mongo (#64)
Signed-off-by: Cézar Augusto Nascimento e Silva <cezar.silva@blip.ai>
1 parent 034f2d8 commit 8e98210

File tree

10 files changed

+77
-35
lines changed

10 files changed

+77
-35
lines changed

internal/queue/queue.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,16 @@ func NewQueue(auditor audit.Auditor, storageImpl storage.Storage, queueService Q
5858
}
5959

6060
func (pool *Queue) Count(ctx context.Context, opts *storage.FindOptions) (int64, error) {
61+
comment := "queue.Count"
6162
if opts == nil {
62-
opts = &storage.FindOptions{}
63+
opts = &storage.FindOptions{Comment: comment}
6364
}
6465

6566
result, err := pool.storage.Count(ctx, &storage.FindOptions{
6667
InternalFilter: opts.InternalFilter,
68+
Comment: comment,
69+
}, &storage.CountOptions{
70+
Comment: comment,
6771
})
6872

6973
if err != nil {

internal/queue/queue_housekeeper.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,13 +370,15 @@ func RemoveExceedingMessages(ctx context.Context, pool *Queue) (bool, error) {
370370
}
371371

372372
func (pool *Queue) removeExceedingMessagesFromQueue(ctx context.Context, queueConfiguration *configuration.QueueConfiguration) error {
373-
if queueConfiguration == nil || queueConfiguration.MaxElements <= 0 {
373+
if queueConfiguration == nil || queueConfiguration.MaxElements <= 0 || queueConfiguration.Queue == "" {
374374
return nil
375375
}
376376

377377
queue := queueConfiguration.Queue
378378

379-
total, err := pool.storage.Count(ctx, &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: queue}})
379+
comment := "housekeeper.removeExceedingMessagesFromQueue_1"
380+
total, err := pool.storage.Count(ctx, &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: queue}, Comment: comment},
381+
&storage.CountOptions{Comment: comment})
380382

381383
if err != nil {
382384
logger.S(ctx).Errorf("Error counting queue %s: %v", queue, err)
@@ -395,6 +397,7 @@ func (pool *Queue) removeExceedingMessagesFromQueue(ctx context.Context, queueCo
395397
sort := orderedmap.NewOrderedMap[string, int]()
396398
sort.Set("expiry_date", 1)
397399

400+
comment = "housekeeper.removeExceedingMessagesFromQueue_2"
398401
messages, err := pool.storage.Find(ctx, &storage.FindOptions{
399402
Limit: diff,
400403
InternalFilter: &storage.InternalFilter{
@@ -404,7 +407,8 @@ func (pool *Queue) removeExceedingMessagesFromQueue(ctx context.Context, queueCo
404407
"id": 1,
405408
"_id": 0,
406409
},
407-
Sort: sort,
410+
Sort: sort,
411+
Comment: comment,
408412
})
409413

410414
if err != nil {

internal/queue/queue_housekeeper_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,14 @@ func TestUpdateOldestQueueMap(t *testing.T) {
6363
{LastUsage: &nowMinusTenSeconds},
6464
}, nil)
6565

66+
comment := "queue.Count"
6667
mockStorage.EXPECT().Count(ctx, &storage.FindOptions{
67-
InternalFilter: &storage.InternalFilter{QueuePrefix: "a"},
68-
}).Return(int64(25), nil)
68+
InternalFilter: &storage.InternalFilter{QueuePrefix: "a"}, Comment: comment,
69+
}, &storage.CountOptions{Comment: comment}).Return(int64(25), nil)
6970

7071
mockStorage.EXPECT().Count(ctx, &storage.FindOptions{
71-
InternalFilter: &storage.InternalFilter{QueuePrefix: "b"},
72-
}).Return(int64(11), nil)
72+
InternalFilter: &storage.InternalFilter{QueuePrefix: "b"}, Comment: comment,
73+
}, &storage.CountOptions{Comment: comment}).Return(int64(11), nil)
7374

7475
mockAuditor := mocks.NewMockAuditor(mockCtrl)
7576
q := NewQueue(mockAuditor, mockStorage, nil, mockCache)

internal/queue/queue_test.go

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ import (
2222

2323
var ctx = context.Background()
2424

25+
const queueCountComment = "queue.Count"
26+
const hkRemoveExceedingMessages1Comment = "housekeeper.removeExceedingMessagesFromQueue_1"
27+
const hkRemoveExceedingMessages2Comment = "housekeeper.removeExceedingMessagesFromQueue_2"
28+
2529
func TestPull(t *testing.T) {
2630
t.Parallel()
2731

@@ -829,6 +833,14 @@ func TestAddMessagesError(t *testing.T) {
829833
}
830834

831835
func TestRemoveExceedingMessagesQueueZeroMaxElementsShouldDoNothing(t *testing.T) {
836+
testRemoveExceedingMessagesDoNothing(t, &configuration.QueueConfiguration{MaxElements: 0, Queue: "q1"})
837+
}
838+
839+
func TestRemoveExceedingMessagesQueueNoNameShouldDoNothing(t *testing.T) {
840+
testRemoveExceedingMessagesDoNothing(t, &configuration.QueueConfiguration{MaxElements: 150, Queue: ""})
841+
}
842+
843+
func testRemoveExceedingMessagesDoNothing(t *testing.T, config *configuration.QueueConfiguration) {
832844
t.Parallel()
833845

834846
mockCtrl := gomock.NewController(t)
@@ -839,7 +851,7 @@ func TestRemoveExceedingMessagesQueueZeroMaxElementsShouldDoNothing(t *testing.T
839851

840852
q := NewQueue(&audit.AuditorImpl{}, mockStorage, NewQueueConfigurationService(ctx, mockStorage), mockCache)
841853

842-
require.NoError(t, q.removeExceedingMessagesFromQueue(ctx, &configuration.QueueConfiguration{MaxElements: 0, Queue: "q1"}))
854+
require.NoError(t, q.removeExceedingMessagesFromQueue(ctx, config))
843855
}
844856

845857
func TestRemoveExceedingMessagesEmptyQueueShouldDoNothing(t *testing.T) {
@@ -851,7 +863,9 @@ func TestRemoveExceedingMessagesEmptyQueueShouldDoNothing(t *testing.T) {
851863
mockStorage := mocks.NewMockStorage(mockCtrl)
852864

853865
queueConfiguration := &configuration.QueueConfiguration{MaxElements: 2, Queue: "q1"}
854-
mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}}).Return(int64(0), nil)
866+
mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}, Comment: hkRemoveExceedingMessages1Comment}, &storage.CountOptions{
867+
Comment: hkRemoveExceedingMessages1Comment,
868+
}).Return(int64(0), nil)
855869

856870
mockCache := mocks.NewMockCache(mockCtrl)
857871

@@ -869,7 +883,10 @@ func TestRemoveExceedingMessagesErrorCountingShouldReturnError(t *testing.T) {
869883
queueConfiguration := &configuration.QueueConfiguration{MaxElements: 2, Queue: "q1"}
870884

871885
mockStorage := mocks.NewMockStorage(mockCtrl)
872-
mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}}).Return(int64(0), fmt.Errorf("anyerr"))
886+
mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"},
887+
Comment: hkRemoveExceedingMessages1Comment}, &storage.CountOptions{
888+
Comment: hkRemoveExceedingMessages1Comment,
889+
}).Return(int64(0), fmt.Errorf("anyerr"))
873890

874891
mockCache := mocks.NewMockCache(mockCtrl)
875892

@@ -891,7 +908,10 @@ func TestRemoveExceedingMessagesShouldRemoveExceedingElements(t *testing.T) {
891908

892909
queueConfiguration := &configuration.QueueConfiguration{MaxElements: maxElements, Queue: "q1"}
893910

894-
mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}}).Return(count, nil)
911+
mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"},
912+
Comment: hkRemoveExceedingMessages1Comment}, &storage.CountOptions{
913+
Comment: hkRemoveExceedingMessages1Comment,
914+
}).Return(count, nil)
895915

896916
sort := orderedmap.NewOrderedMap[string, int]()
897917
sort.Set("expiry_date", 1)
@@ -905,7 +925,8 @@ func TestRemoveExceedingMessagesShouldRemoveExceedingElements(t *testing.T) {
905925
"id": 1,
906926
"_id": 0,
907927
},
908-
Sort: sort,
928+
Sort: sort,
929+
Comment: hkRemoveExceedingMessages2Comment,
909930
}).Return([]message.Message{{ID: "1"}, {ID: "2"}}, nil)
910931

911932
mockStorage.EXPECT().Remove(gomock.Any(), "q1", []string{"1", "2"}).Return(int64(2), nil)
@@ -931,7 +952,8 @@ func TestRemoveExceedingMessagesFindErrorShouldRemoveResultError(t *testing.T) {
931952

932953
queueConfiguration := &configuration.QueueConfiguration{MaxElements: maxElements, Queue: "q1"}
933954

934-
mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}}).Return(count, nil)
955+
mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"},
956+
Comment: hkRemoveExceedingMessages1Comment}, &storage.CountOptions{Comment: hkRemoveExceedingMessages1Comment}).Return(count, nil)
935957

936958
sort := orderedmap.NewOrderedMap[string, int]()
937959
sort.Set("expiry_date", 1)
@@ -958,7 +980,7 @@ func TestRemoveExceedingMessagesRemoveErrorShouldResultError(t *testing.T) {
958980

959981
queueConfiguration := &configuration.QueueConfiguration{MaxElements: maxElements, Queue: "q1"}
960982

961-
mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}}).Return(count, nil)
983+
mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}, Comment: hkRemoveExceedingMessages1Comment}, &storage.CountOptions{Comment: hkRemoveExceedingMessages1Comment}).Return(count, nil)
962984

963985
sort := orderedmap.NewOrderedMap[string, int]()
964986
sort.Set("expiry_date", 1)
@@ -980,10 +1002,10 @@ func TestCountShouldCallStorage(t *testing.T) {
9801002
mockCtrl := gomock.NewController(t)
9811003
defer mockCtrl.Finish()
9821004

983-
opts := &storage.FindOptions{}
1005+
opts := &storage.FindOptions{Comment: queueCountComment}
9841006

9851007
mockStorage := mocks.NewMockStorage(mockCtrl)
986-
mockStorage.EXPECT().Count(ctx, opts).Return(int64(12), nil)
1008+
mockStorage.EXPECT().Count(ctx, opts, &storage.CountOptions{Comment: queueCountComment}).Return(int64(12), nil)
9871009
mockCache := mocks.NewMockCache(mockCtrl)
9881010

9891011
q := NewQueue(nil, mockStorage, nil, mockCache)
@@ -1000,10 +1022,10 @@ func TestNilOptsShouldCreateEmptyOpts(t *testing.T) {
10001022
mockCtrl := gomock.NewController(t)
10011023
defer mockCtrl.Finish()
10021024

1003-
opts := &storage.FindOptions{}
1025+
opts := &storage.FindOptions{Comment: queueCountComment}
10041026

10051027
mockStorage := mocks.NewMockStorage(mockCtrl)
1006-
mockStorage.EXPECT().Count(ctx, opts).Return(int64(12), nil)
1028+
mockStorage.EXPECT().Count(ctx, opts, &storage.CountOptions{Comment: queueCountComment}).Return(int64(12), nil)
10071029
mockCache := mocks.NewMockCache(mockCtrl)
10081030

10091031
q := NewQueue(nil, mockStorage, nil, mockCache)
@@ -1020,10 +1042,10 @@ func TestCountStorageErrorShouldResultError(t *testing.T) {
10201042
mockCtrl := gomock.NewController(t)
10211043
defer mockCtrl.Finish()
10221044

1023-
opts := &storage.FindOptions{}
1045+
opts := &storage.FindOptions{Comment: queueCountComment}
10241046

10251047
mockStorage := mocks.NewMockStorage(mockCtrl)
1026-
mockStorage.EXPECT().Count(ctx, opts).Return(int64(0), fmt.Errorf("error"))
1048+
mockStorage.EXPECT().Count(ctx, opts, &storage.CountOptions{Comment: queueCountComment}).Return(int64(0), fmt.Errorf("error"))
10271049
mockCache := mocks.NewMockCache(mockCtrl)
10281050

10291051
q := NewQueue(nil, mockStorage, nil, mockCache)

internal/queue/storage/memory_storage.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,12 @@ func getKey(message *message.Message) string {
138138
return message.Queue + ":" + message.ID
139139
}
140140

141-
func (storage *MemoryStorage) Count(_ context.Context, opts *FindOptions) (int64, error) {
141+
func (storage *MemoryStorage) Count(_ context.Context, findOpts *FindOptions, _ *CountOptions) (int64, error) {
142142
count := int64(0)
143143

144144
storage.lock.RLock()
145145
for _, value := range storage.docs {
146-
matches, err := messageMatchesFilter(value, opts)
146+
matches, err := messageMatchesFilter(value, findOpts)
147147

148148
if err != nil {
149149
return 0, err

internal/queue/storage/mongo_storage.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,21 +283,23 @@ func (storage *MongoStorage) Flush(ctx context.Context) (int64, error) {
283283
return result.DeletedCount + deletedMessages, nil
284284
}
285285

286-
func (storage *MongoStorage) Count(ctx context.Context, opt *FindOptions) (int64, error) {
286+
func (storage *MongoStorage) Count(ctx context.Context, findOpt *FindOptions, countOpt *CountOptions) (int64, error) {
287287
now := dtime.Now()
288288
defer func() {
289289
metrics.StorageLatency.Record(ctx, dtime.ElapsedTime(now), metric.WithAttributes(attribute.String("op", "count")))
290290
}()
291291

292-
mongoFilter, err := getMongoMessage(opt)
292+
mongoFilter, err := getMongoMessage(findOpt)
293293

294294
if err != nil {
295295
return 0, err
296296
}
297297

298298
logger.S(ctx).Debugw("Storage operation: count operation.", "filter", mongoFilter)
299299

300-
result, err := storage.messagesCollection.CountDocuments(context.Background(), mongoFilter)
300+
result, err := storage.messagesCollection.CountDocuments(context.Background(), mongoFilter, &options.CountOptions{
301+
Comment: &countOpt.Comment,
302+
})
301303

302304
if err != nil {
303305
return 0, fmt.Errorf("error counting elements in storage: %w", err)
@@ -363,7 +365,7 @@ func (storage *MongoStorage) Find(ctx context.Context, opt *FindOptions) ([]mess
363365
batchSize := int32(opt.Limit)
364366
if batchSize <= 1 {
365367
batchSize = 1_000
366-
} else if batchSize > 10_000{
368+
} else if batchSize > 10_000 {
367369
batchSize = 10_000
368370
}
369371

@@ -372,6 +374,7 @@ func (storage *MongoStorage) Find(ctx context.Context, opt *FindOptions) ([]mess
372374
Sort: mongoSort,
373375
Limit: &opt.Limit,
374376
BatchSize: &batchSize,
377+
Comment: &opt.Comment,
375378
}
376379

377380
logger.S(ctx).Debugw("Storage operation: find operation.",

internal/queue/storage/mongo_storage_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,8 +370,10 @@ func testFindBatch(t *testing.T, limit *int64, expectedBatch *int32) {
370370
storage := &MongoStorage{
371371
messagesCollection: colMock,
372372
}
373+
expectedComment := "testFindBatch"
373374
messages, err := storage.Find(context.Background(), &FindOptions{
374-
Limit: *limit,
375+
Limit: *limit,
376+
Comment: expectedComment,
375377
})
376378

377379
details := colMock.mockDetails[FIND]
@@ -390,5 +392,6 @@ func testFindBatch(t *testing.T, limit *int64, expectedBatch *int32) {
390392
Sort: &bson.D{},
391393
Limit: limit,
392394
BatchSize: expectedBatch,
395+
Comment: &expectedComment,
393396
}, findOpt[0], fmt.Sprintf("should call with batch size = %d, lim = %d and empty projection and sort", *expectedBatch, *limit))
394397
}

internal/queue/storage/storage.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type Storage interface {
3232

3333
ListQueueNames(ctx context.Context) (queues []string, err error)
3434
ListQueuePrefixes(ctx context.Context) (queues []string, err error)
35-
Count(ctx context.Context, opt *FindOptions) (int64, error)
35+
Count(ctx context.Context, findOpt *FindOptions, countOpt *CountOptions) (int64, error)
3636

3737
GetStringInternalId(ctx context.Context, message *message.Message) string
3838

@@ -65,7 +65,12 @@ type FindOptions struct {
6565
*InternalFilter
6666

6767
// Boolean to deal with storage retries when storage reports missing elements
68-
Retry bool
68+
Retry bool
69+
Comment string
70+
}
71+
72+
type CountOptions struct {
73+
Comment string
6974
}
7075

7176
func CreateStorage(ctx context.Context, storageType Type) (Storage, error) {

internal/queue/storage/storage_suite_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ func (suite *StorageTestSuite) TestClearOk() {
532532

533533
suite.insertDataNoError(toInsert...)
534534

535-
count, err := suite.storage.Count(ctx, &FindOptions{})
535+
count, err := suite.storage.Count(ctx, &FindOptions{}, &CountOptions{})
536536

537537
require.NoError(suite.T(), err)
538538
require.Equal(suite.T(), int64(100), count)
@@ -542,7 +542,7 @@ func (suite *StorageTestSuite) TestClearOk() {
542542
require.NoError(suite.T(), err)
543543
require.Equal(suite.T(), int64(100), deleted)
544544

545-
posCount, err := suite.storage.Count(ctx, &FindOptions{})
545+
posCount, err := suite.storage.Count(ctx, &FindOptions{}, &CountOptions{})
546546

547547
require.NoError(suite.T(), err)
548548
require.Equal(suite.T(), int64(0), posCount)

internal/service/deckard_service_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ func TestMessageSizeLimitDeckardGRPCServeIntegration(t *testing.T) {
7070
_, err = client.Add(ctx, &deckard.AddRequest{
7171
Messages: []*deckard.AddMessage{
7272
{
73-
Id: "1",
74-
Queue: "queue",
75-
Timeless: true,
73+
Id: "1",
74+
Queue: "queue",
75+
Timeless: true,
7676
StringPayload: "This is a message test",
7777
},
7878
},

0 commit comments

Comments
 (0)