Skip to content

Commit 4e2ecde

Browse files
fix: set max batch size to 10_000 during mongo find (#63)
``` "find": "queue", "batchSize": 1047448, "filter": {}, "limit": "1047448", "projection": { "_id": 0, "id": 1 }, "sort": { "expiry_date": 1 }, ``` Signed-off-by: Cézar Augusto Nascimento e Silva <cezar.silva@blip.ai> Co-authored-by: Lucas Soares <lucas.soares@blip.ai>
1 parent eab1b40 commit 4e2ecde

File tree

2 files changed

+87
-22
lines changed

2 files changed

+87
-22
lines changed

internal/queue/storage/mongo_storage.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,9 @@ func (storage *MongoStorage) Find(ctx context.Context, opt *FindOptions) ([]mess
362362

363363
batchSize := int32(opt.Limit)
364364
if batchSize <= 1 {
365-
batchSize = 1000
365+
batchSize = 1_000
366+
} else if batchSize > 10_000{
367+
batchSize = 10_000
366368
}
367369

368370
findOptions := &options.FindOptions{

internal/queue/storage/mongo_storage_test.go

Lines changed: 84 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,48 +17,66 @@ import (
1717
"go.mongodb.org/mongo-driver/mongo/options"
1818
)
1919

20+
const DEL_MANY = "DeleteMany"
21+
const FIND = "Find"
22+
23+
type MockDetails struct {
24+
args []interface{}
25+
calls int
26+
err error
27+
}
28+
2029
type MockCollection struct {
21-
deleteManyArgs []interface{}
22-
deleteManyCalls int
23-
errorDeleteMany error
30+
mockDetails map[string]*MockDetails
2431
}
2532

2633
func newMockCollection() *MockCollection {
2734
return newMockCollectionErr(nil)
2835
}
2936
func newMockCollectionErr(err error) *MockCollection {
3037
return &MockCollection{
31-
deleteManyArgs: []interface{}{},
32-
deleteManyCalls: 0,
33-
errorDeleteMany: err,
38+
mockDetails: map[string]*MockDetails{
39+
DEL_MANY: {args: []interface{}{},
40+
calls: 0,
41+
err: err,
42+
},
43+
FIND: {args: []interface{}{},
44+
calls: 0,
45+
err: err,
46+
},
47+
},
3448
}
3549
}
3650

37-
func (this *MockCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
51+
func (col *MockCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
3852
return nil, nil
3953
}
40-
func (this *MockCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel,
54+
func (col *MockCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel,
4155
opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) {
4256
return nil, nil
4357
}
44-
func (this *MockCollection) Distinct(ctx context.Context, fieldName string, filter interface{},
58+
func (col *MockCollection) Distinct(ctx context.Context, fieldName string, filter interface{},
4559
opts ...*options.DistinctOptions) ([]interface{}, error) {
4660
return nil, nil
4761
}
48-
func (this *MockCollection) DeleteMany(ctx context.Context, filter interface{},
62+
func (col *MockCollection) DeleteMany(ctx context.Context, filter interface{},
4963
opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) {
50-
this.deleteManyArgs = append(this.deleteManyArgs, filter)
51-
this.deleteManyCalls++
64+
details := col.mockDetails[DEL_MANY]
65+
details.args = append(details.args, filter)
66+
details.calls++
5267
return &mongo.DeleteResult{
5368
DeletedCount: 1,
54-
}, this.errorDeleteMany
69+
}, details.err
5570
}
56-
func (this *MockCollection) CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error) {
71+
func (col *MockCollection) CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error) {
5772
return 0, nil
5873
}
59-
func (this *MockCollection) Find(ctx context.Context, filter interface{},
74+
func (col *MockCollection) Find(ctx context.Context, filter interface{},
6075
opts ...*options.FindOptions) (cur *mongo.Cursor, err error) {
61-
return nil, nil
76+
details := col.mockDetails[FIND]
77+
details.args = append(details.args, filter, opts)
78+
details.calls++
79+
return mongo.NewCursorFromDocuments(make([]interface{}, 0), nil, nil)
6280
}
6381

6482
func TestMongoStorageIntegration(t *testing.T) {
@@ -292,11 +310,12 @@ func TestRemove(t *testing.T) {
292310
messagesCollection: colMock,
293311
}
294312

313+
details := colMock.mockDetails[DEL_MANY]
295314
queue := "test_queue"
296315
count, err := storage.Remove(context.Background(), queue, "1", "2")
297316
require.NoError(t, err)
298317
require.Equal(t, int64(2), count)
299-
require.Equal(t, 2, colMock.deleteManyCalls)
318+
require.Equal(t, 2, details.calls, "should call delete many twice")
300319
require.Equal(t, []interface{}{bson.M{
301320
"queue": queue,
302321
"id": bson.M{
@@ -308,24 +327,68 @@ func TestRemove(t *testing.T) {
308327
"id": bson.M{
309328
"$in": []string{"2"},
310329
},
311-
}}, colMock.deleteManyArgs)
330+
}}, details.args)
312331
}
313332

314-
func TestRemoveErrors(t *testing.T) {
333+
func TestRemoveWithErrors(t *testing.T) {
315334
colMock := newMockCollectionErr(fmt.Errorf("Mocked error"))
316335
storage := &MongoStorage{
317336
messagesCollection: colMock,
318337
}
319338

339+
details := colMock.mockDetails[DEL_MANY]
320340
queue := "test_queue"
321341
count, err := storage.Remove(context.Background(), queue, "1", "2")
322342
require.ErrorContains(t, err, "Mocked error")
323343
require.Equal(t, int64(0), count)
324-
require.Equal(t, 1, colMock.deleteManyCalls)
344+
require.Equal(t, 1, details.calls, "should call delete many once")
325345
require.Equal(t, []interface{}{bson.M{
326346
"queue": queue,
327347
"id": bson.M{
328348
"$in": []string{"1", "2"},
329349
},
330-
}}, colMock.deleteManyArgs)
350+
}}, details.args)
351+
}
352+
353+
func TestFindWithLimitAndBatchVariations(t *testing.T) {
354+
lim := int64(1)
355+
batch := int32(1000)
356+
testFindBatch(t, &lim, &batch)
357+
lim = int64(2)
358+
batch = int32(2)
359+
testFindBatch(t, &lim, &batch)
360+
lim = int64(10_000)
361+
batch = int32(10_000)
362+
testFindBatch(t, &lim, &batch)
363+
lim = int64(10_001)
364+
batch = int32(10_000)
365+
testFindBatch(t, &lim, &batch)
366+
}
367+
368+
func testFindBatch(t *testing.T, limit *int64, expectedBatch *int32) {
369+
colMock := newMockCollection()
370+
storage := &MongoStorage{
371+
messagesCollection: colMock,
372+
}
373+
messages, err := storage.Find(context.Background(), &FindOptions{
374+
Limit: *limit,
375+
})
376+
377+
details := colMock.mockDetails[FIND]
378+
findOpt, ok := details.args[1].([]*options.FindOptions) // Type assertion
379+
if !ok {
380+
fmt.Println("Type assertion failed")
381+
return
382+
}
383+
384+
require.NoError(t, err)
385+
require.Equal(t, 0, len(messages), "should return no messages")
386+
require.Equal(t, 1, details.calls, "should call find once")
387+
require.Equal(t, 1, len(findOpt), "only one find option was used")
388+
require.Equal(t, &options.FindOptions{
389+
Projection: &bson.M{},
390+
Sort: &bson.D{},
391+
Limit: limit,
392+
BatchSize: expectedBatch,
393+
}, findOpt[0], fmt.Sprintf("should call with batch size = %d, lim = %d and empty projection and sort", *expectedBatch, *limit))
331394
}

0 commit comments

Comments
 (0)