Skip to content

Commit 3070981

Browse files
committed
add configurable batch size
1 parent fae884d commit 3070981

File tree

8 files changed

+139
-29
lines changed

8 files changed

+139
-29
lines changed

docs/usage/core-concepts.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ const queryLeaf = new QueryLeaf(mongoClient, 'mydatabase');
5555
const results = await queryLeaf.execute('SELECT * FROM users');
5656

5757
// Or use a cursor for more control and memory efficiency
58-
const cursor = await queryLeaf.executeCursor('SELECT * FROM users');
58+
// You can optionally specify the batch size to control memory usage
59+
const cursor = await queryLeaf.executeCursor('SELECT * FROM users', { batchSize: 50 });
5960
await cursor.forEach(user => {
6061
console.log(`Processing user: ${user.name}`);
6162
});

docs/usage/examples.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,12 @@ const usersInNY = await queryLeaf.execute(`
7777

7878
```typescript
7979
// Use cursor for memory-efficient processing of large result sets
80+
// Optionally specify a batch size to control how many documents are fetched at once
8081
const cursor = await queryLeaf.executeCursor(`
8182
SELECT _id, customer, total, items
8283
FROM orders
8384
WHERE status = 'completed'
84-
`);
85+
`, { batchSize: 100 }); // Set batch size to 100 documents per batch
8586

8687
try {
8788
// Process one document at a time without loading everything in memory

packages/lib/README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ const results = await queryLeaf.execute('SELECT * FROM users WHERE age > 21');
4949
console.log(results);
5050

5151
// Get a MongoDB cursor for more control over result processing and memory efficiency
52-
const cursor = await queryLeaf.executeCursor('SELECT * FROM users WHERE age > 30');
52+
// You can optionally specify a batch size to control how many documents are fetched at once
53+
const cursor = await queryLeaf.executeCursor('SELECT * FROM users WHERE age > 30', { batchSize: 50 });
5354
await cursor.forEach((doc) => {
5455
console.log(`User: ${doc.name}`);
5556
});
@@ -106,7 +107,8 @@ When working with large result sets, using MongoDB cursors directly can be more
106107

107108
```typescript
108109
// Get a cursor for a SELECT query
109-
const cursor = await queryLeaf.executeCursor('SELECT * FROM products WHERE price > 100');
110+
// You can specify a batch size to control memory usage and network behavior
111+
const cursor = await queryLeaf.executeCursor('SELECT * FROM products WHERE price > 100', { batchSize: 100 });
110112

111113
// Option 1: Convert to array (loads all results into memory)
112114
const results = await cursor.toArray();
@@ -131,7 +133,7 @@ await cursor.close();
131133
Features:
132134
- Returns MongoDB `FindCursor` for normal queries and `AggregationCursor` for aggregations
133135
- Supports all cursor methods like `forEach()`, `toArray()`, `next()`, `hasNext()`
134-
- Efficiently handles large result sets with MongoDB's batching system
136+
- Efficiently handles large result sets with MongoDB's batching system (configurable batch size)
135137
- Works with all advanced QueryLeaf features (filtering, sorting, aggregations, etc.)
136138
- Only available for read operations (SELECT queries)
137139
```

packages/lib/src/examples/basic-usage.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,10 @@ async function main() {
106106
let cursor: CursorResult<Document> = null;
107107
try {
108108
// Using the executeCursor method to get a MongoDB cursor
109-
cursor = await queryLeaf.executeCursor('SELECT * FROM users WHERE active = true');
109+
// You can optionally specify a batch size to control memory usage
110+
cursor = await queryLeaf.executeCursor('SELECT * FROM users WHERE active = true', {
111+
batchSize: 20,
112+
});
110113

111114
// Check if we got a cursor back
112115
if (cursor) {

packages/lib/src/executor/index.ts

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
import { CommandExecutor, Command, ExecutionResult, CursorResult } from '../interfaces';
1+
import {
2+
CommandExecutor,
3+
Command,
4+
ExecutionResult,
5+
CursorResult,
6+
CursorOptions,
7+
} from '../interfaces';
28
import { Document, MongoClient, ObjectId } from 'mongodb';
39

410
/**
@@ -47,28 +53,32 @@ export class MongoExecutor implements CommandExecutor {
4753
for (const command of commands) {
4854
switch (command.type) {
4955
case 'FIND':
50-
const findCursor = database
51-
.collection(command.collection)
52-
.find(this.convertObjectIds(command.filter || {}));
56+
// Prepare find options
57+
const findOptions: any = {};
5358

5459
// Apply projection if specified
5560
if (command.projection) {
56-
findCursor.project(command.projection);
61+
findOptions.projection = command.projection;
5762
}
5863

5964
// Apply sorting if specified
6065
if (command.sort) {
61-
findCursor.sort(command.sort);
66+
findOptions.sort = command.sort;
6267
}
6368

6469
// Apply pagination if specified
6570
if (command.skip) {
66-
findCursor.skip(command.skip);
71+
findOptions.skip = command.skip;
6772
}
6873
if (command.limit && command.limit > 0) {
69-
findCursor.limit(command.limit);
74+
findOptions.limit = command.limit;
7075
}
7176

77+
// Create the cursor with all options at once
78+
const findCursor = database
79+
.collection(command.collection)
80+
.find(this.convertObjectIds(command.filter || {}), findOptions);
81+
7282
// Always return array for the regular execute
7383
result = await findCursor.toArray();
7484
break;
@@ -97,7 +107,14 @@ export class MongoExecutor implements CommandExecutor {
97107
case 'AGGREGATE':
98108
// Handle aggregation commands
99109
const pipeline = command.pipeline.map((stage) => this.convertObjectIds(stage));
100-
const aggregateCursor = database.collection(command.collection).aggregate(pipeline);
110+
111+
// Prepare aggregation options
112+
const aggregateOptions: any = {};
113+
114+
// Create the cursor with options
115+
const aggregateCursor = database
116+
.collection(command.collection)
117+
.aggregate(pipeline, aggregateOptions);
101118

102119
// Always return array for the regular execute
103120
result = await aggregateCursor.toArray();
@@ -114,10 +131,14 @@ export class MongoExecutor implements CommandExecutor {
114131
/**
115132
* Execute a series of MongoDB commands and return cursors
116133
* @param commands Array of commands to execute
134+
* @param options Options for cursor execution
117135
* @returns Cursor for FIND and AGGREGATE commands, null for other commands
118136
* @typeParam T - The type of documents that will be returned (defaults to Document)
119137
*/
120-
async executeCursor<T = Document>(commands: Command[]): Promise<CursorResult<T>> {
138+
async executeCursor<T = Document>(
139+
commands: Command[],
140+
options?: CursorOptions
141+
): Promise<CursorResult<T>> {
121142
// We assume the client is already connected
122143
const database = this.client.db(this.dbName);
123144

@@ -128,36 +149,55 @@ export class MongoExecutor implements CommandExecutor {
128149
for (const command of commands) {
129150
switch (command.type) {
130151
case 'FIND':
131-
const findCursor = database
132-
.collection(command.collection)
133-
.find(this.convertObjectIds(command.filter || {}));
152+
// Prepare find options
153+
const findOptions: any = {};
134154

135155
// Apply projection if specified
136156
if (command.projection) {
137-
findCursor.project(command.projection);
157+
findOptions.projection = command.projection;
138158
}
139159

140160
// Apply sorting if specified
141161
if (command.sort) {
142-
findCursor.sort(command.sort);
162+
findOptions.sort = command.sort;
143163
}
144164

145165
// Apply pagination if specified
146166
if (command.skip) {
147-
findCursor.skip(command.skip);
167+
findOptions.skip = command.skip;
148168
}
149169
if (command.limit && command.limit > 0) {
150-
findCursor.limit(command.limit);
170+
findOptions.limit = command.limit;
151171
}
152172

173+
// Apply batch size from options
174+
if (options?.batchSize) {
175+
findOptions.batchSize = options.batchSize;
176+
}
177+
178+
// Create the cursor with all options at once
179+
const findCursor = database
180+
.collection(command.collection)
181+
.find(this.convertObjectIds(command.filter || {}), findOptions);
182+
153183
// Return the cursor directly
154184
result = findCursor;
155185
break;
156186

157187
case 'AGGREGATE':
158188
// Handle aggregation commands
159189
const pipeline = command.pipeline.map((stage) => this.convertObjectIds(stage));
160-
result = database.collection(command.collection).aggregate(pipeline);
190+
191+
// Prepare aggregation options
192+
const aggregateOptions: any = {};
193+
194+
// Apply batch size from options
195+
if (options?.batchSize) {
196+
aggregateOptions.batchSize = options.batchSize;
197+
}
198+
199+
// Create the cursor with options
200+
result = database.collection(command.collection).aggregate(pipeline, aggregateOptions);
161201
break;
162202

163203
case 'INSERT':

packages/lib/src/index.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
CommandExecutor,
77
ExecutionResult,
88
CursorResult,
9+
CursorOptions,
910
} from './interfaces';
1011
import { Document, MongoClient } from 'mongodb';
1112
import { SqlParserImpl } from './parser';
@@ -47,13 +48,17 @@ export class QueryLeaf {
4748
/**
4849
* Execute a SQL query on MongoDB and return a cursor
4950
* @param sql SQL query string
51+
* @param options Options for cursor execution
5052
* @returns Cursor for SELECT queries, null for other queries
5153
* @typeParam T - The type of documents that will be returned (defaults to Document)
5254
*/
53-
async executeCursor<T = Document>(sql: string): Promise<CursorResult<T>> {
55+
async executeCursor<T = Document>(
56+
sql: string,
57+
options?: CursorOptions
58+
): Promise<CursorResult<T>> {
5459
const statement = this.parse(sql);
5560
const commands = this.compile(statement);
56-
return await this.executor.executeCursor(commands);
61+
return await this.executor.executeCursor(commands, options);
5762
}
5863

5964
/**

packages/lib/src/interfaces.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,17 @@ export type CursorResult<T = Document> =
122122
| AggregationCursor<T> // Cursor from AGGREGATE command
123123
| null; // No result
124124

125+
/**
126+
* Options for cursor execution
127+
*/
128+
export interface CursorOptions {
129+
/**
130+
* Number of documents to fetch per batch
131+
* This will be set directly on the query command, not on the cursor after creation
132+
*/
133+
batchSize?: number;
134+
}
135+
125136
/**
126137
* MongoDB command executor interface
127138
*/
@@ -138,9 +149,13 @@ export interface CommandExecutor {
138149
/**
139150
* Execute MongoDB commands and return cursors for FIND and AGGREGATE commands
140151
* @param commands Array of commands to execute
152+
* @param options Options for cursor execution
141153
* @returns Cursor for FIND and AGGREGATE commands, null for other commands
142154
*/
143-
executeCursor<T = Document>(commands: Command[]): Promise<CursorResult<T>>;
155+
executeCursor<T = Document>(
156+
commands: Command[],
157+
options?: CursorOptions
158+
): Promise<CursorResult<T>>;
144159
}
145160

146161
/**
@@ -157,9 +172,10 @@ export interface QueryLeaf {
157172
/**
158173
* Execute a SQL query and return a cursor for SELECT queries
159174
* @param sql SQL query string
175+
* @param options Options for cursor execution
160176
* @returns Cursor for SELECT queries, null for other queries
161177
*/
162-
executeCursor<T = Document>(sql: string): Promise<CursorResult<T>>;
178+
executeCursor<T = Document>(sql: string, options?: CursorOptions): Promise<CursorResult<T>>;
163179

164180
parse(sql: string): SqlStatement;
165181
compile(statement: SqlStatement): Command[];
@@ -169,7 +185,7 @@ export interface QueryLeaf {
169185

170186
export interface Squongo extends QueryLeaf {
171187
execute<T = Document>(sql: string): Promise<ExecutionResult<T>>;
172-
executeCursor<T = Document>(sql: string): Promise<CursorResult<T>>;
188+
executeCursor<T = Document>(sql: string, options?: CursorOptions): Promise<CursorResult<T>>;
173189
parse(sql: string): SqlStatement;
174190
compile(statement: SqlStatement): Command[];
175191
getExecutor(): CommandExecutor;

packages/lib/tests/integration/cursor.integration.test.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,4 +335,46 @@ describe('MongoDB Cursor Functionality Tests', () => {
335335
// Clean up
336336
await cursor?.close();
337337
});
338+
339+
test('should support custom batchSize option', async () => {
340+
// This test verifies the batchSize option is used correctly
341+
// Arrange
342+
const queryLeaf = testSetup.getQueryLeaf();
343+
const db = testSetup.getDb();
344+
345+
// Generate more data to make batching more apparent
346+
const moreDocuments = [];
347+
for (let i = 0; i < 50; i++) {
348+
moreDocuments.push({ name: `Extra Product ${i}`, value: i });
349+
}
350+
await db.collection('batch_test').deleteMany({});
351+
await db.collection('batch_test').insertMany(moreDocuments);
352+
353+
// Define a small batch size
354+
const batchSize = 10;
355+
356+
// Act - Create a cursor with a specific batch size
357+
const cursor = await queryLeaf.executeCursor(
358+
'SELECT * FROM batch_test ORDER BY value ASC',
359+
{ batchSize }
360+
);
361+
362+
// Assert
363+
expect(cursor).not.toBeNull();
364+
365+
// Consume the cursor in multiple steps to verify batching
366+
const result1 = await cursor!.toArray();
367+
368+
// We should have all documents
369+
expect(result1.length).toBe(50);
370+
371+
// Verify results are in correct order
372+
for (let i = 1; i < result1.length; i++) {
373+
expect(result1[i].value).toBeGreaterThanOrEqual(result1[i-1].value);
374+
}
375+
376+
// Clean up
377+
await cursor!.close();
378+
await db.collection('batch_test').drop();
379+
});
338380
});

0 commit comments

Comments
 (0)