Skip to content

Commit 8548f05

Browse files
authored
fix(shell-api): handle errors in cursor.map/.forEach MONGOSH-703 (#816)
Handle exceptions from callbacks passed to `cursor.map()` and `cursor.forEach()`. Since `cursor.forEach()` returns a Promise, use that to propagate the error to the user. For `cursor.map()`, this is a bit trickier, since the method itself does not return any results to the user. Therefore, we add checks before and after accessing cursor methods that read data which forward the exception if one has occurred. Both of these feel like situations that the driver should take care of explicitly, so NODE tickets have been opened for both cases. (One alternative would have been to add async context/Node.js domain propagation to these methods. However, I decided against that, because that solution would be a) Node.js-specific, and b) because it would still lead to undesirable results, like the Promise returned by `.forEach()` never resolving.)
1 parent 4de4d39 commit 8548f05

File tree

8 files changed

+116
-18
lines changed

8 files changed

+116
-18
lines changed
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
null
2-
[ { "_id": 1, "name": "Vasya" }, { "_id": 2, "name": "Petya" }, { "_id": 3, "name": "Lyusya" } ]
1+
[ { "_id": 1, "name": "Vasya" }, { "_id": 2, "name": "Petya" }, { "_id": 3, "name": "Lyusya" } ]

packages/shell-api/src/abstract-cursor.ts

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,37 @@ export abstract class AbstractCursor extends ShellApiClass {
2424
abstract _cursor: ServiceProviderAggregationCursor | ServiceProviderCursor;
2525
_currentIterationResult: CursorIterationResult | null = null;
2626
_batchSize: number | null = null;
27+
_mapError: Error | null = null;
2728

2829
constructor(mongo: Mongo) {
2930
super();
3031
this._mongo = mongo;
3132
}
3233

34+
// Wrap a function with checks before and after that verify whether a .map()
35+
// callback has resulted in an exception. Such an error would otherwise result
36+
// in an uncaught exception, bringing the whole process down.
37+
// The downside to this is that errors will not actually be visible until
38+
// the caller tries to interact with this cursor in a way that triggers
39+
// these checks. Since that is also the behavior for errors coming from the
40+
// database server, it makes sense to match that.
41+
// Ideally, this kind of code could be lifted into the driver (NODE-3231 and
42+
// NODE-3232 are the tickets for that).
43+
async _withCheckMapError<Ret>(fn: () => Ret): Promise<Ret> {
44+
if (this._mapError) {
45+
// If an error has already occurred, we don't want to call the function
46+
// at all.
47+
throw this._mapError;
48+
}
49+
const ret = await fn();
50+
if (this._mapError) {
51+
// If an error occurred during the function, we don't want to forward its
52+
// results.
53+
throw this._mapError;
54+
}
55+
return ret;
56+
}
57+
3358
/**
3459
* Internal method to determine what is printed for this class.
3560
*/
@@ -39,7 +64,7 @@ export abstract class AbstractCursor extends ShellApiClass {
3964

4065
async _it(): Promise<CursorIterationResult> {
4166
const results = this._currentIterationResult = new CursorIterationResult();
42-
await iterate(results, this._cursor, this._batchSize ?? await this._mongo._batchSize());
67+
await iterate(results, this, this._batchSize ?? await this._mongo._batchSize());
4368
results.cursorHasMore = !this.isExhausted();
4469
return results;
4570
}
@@ -57,17 +82,31 @@ export abstract class AbstractCursor extends ShellApiClass {
5782

5883
@returnsPromise
5984
async forEach(f: (doc: Document) => void): Promise<void> {
60-
return this._cursor.forEach(f);
85+
// Work around https://jira.mongodb.org/browse/NODE-3231
86+
let exception;
87+
const wrapped = (doc: Document): boolean | undefined => {
88+
try {
89+
f(doc);
90+
return undefined;
91+
} catch (err) {
92+
exception = err;
93+
return false; // Stop iteration.
94+
}
95+
};
96+
await this._cursor.forEach(wrapped);
97+
if (exception) {
98+
throw exception;
99+
}
61100
}
62101

63102
@returnsPromise
64103
async hasNext(): Promise<boolean> {
65-
return this._cursor.hasNext();
104+
return this._withCheckMapError(() => this._cursor.hasNext());
66105
}
67106

68107
@returnsPromise
69108
async tryNext(): Promise<Document | null> {
70-
return this._cursor.tryNext();
109+
return this._withCheckMapError(() => this._cursor.tryNext());
71110
}
72111

73112
async* [Symbol.asyncIterator]() {
@@ -96,7 +135,7 @@ export abstract class AbstractCursor extends ShellApiClass {
96135

97136
@returnsPromise
98137
async toArray(): Promise<Document[]> {
99-
return this._cursor.toArray();
138+
return this._withCheckMapError(() => this._cursor.toArray());
100139
}
101140

102141
@returnType('this')
@@ -106,7 +145,20 @@ export abstract class AbstractCursor extends ShellApiClass {
106145

107146
@returnType('this')
108147
map(f: (doc: Document) => Document): this {
109-
this._cursor.map(f);
148+
// Work around https://jira.mongodb.org/browse/NODE-3232
149+
const wrapped = (doc: Document): Document => {
150+
if (this._mapError) {
151+
// These errors should never become visible to the user.
152+
return { __errored: true };
153+
}
154+
try {
155+
return f(doc);
156+
} catch (err) {
157+
this._mapError = err;
158+
return { __errored: true };
159+
}
160+
};
161+
this._cursor.map(wrapped);
110162
return this;
111163
}
112164

@@ -118,7 +170,7 @@ export abstract class AbstractCursor extends ShellApiClass {
118170

119171
@returnsPromise
120172
async next(): Promise<Document | null> {
121-
return this._cursor.next();
173+
return this._withCheckMapError(() => this._cursor.next());
122174
}
123175

124176
@returnType('this')

packages/shell-api/src/aggregation-cursor.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ describe('AggregationCursor', () => {
6767
it('calls wrappee.map with arguments', () => {
6868
const arg = {};
6969
cursor.map(arg);
70-
expect(wrappee.map.calledWith(arg)).to.equal(true);
70+
expect(wrappee.map).to.have.callCount(1);
7171
});
7272
});
7373

packages/shell-api/src/change-stream-cursor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export default class ChangeStreamCursor extends ShellApiClass {
4343
throw new MongoshRuntimeError('ChangeStreamCursor is closed');
4444
}
4545
const result = this._currentIterationResult = new CursorIterationResult();
46-
return iterate(result, this._cursor, this._batchSize ?? await this._mongo._batchSize());
46+
return iterate(result, this, this._batchSize ?? await this._mongo._batchSize());
4747
}
4848

4949
/**

packages/shell-api/src/cursor.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ describe('Cursor', () => {
7171
it('calls wrappee.map with arguments', () => {
7272
const arg = {};
7373
cursor.map(arg);
74-
expect(wrappee.map.calledWith(arg)).to.equal(true);
74+
expect(wrappee.map).to.have.callCount(1);
7575
});
7676

7777
it('has the correct metadata', () => {

packages/shell-api/src/explainable-cursor.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ describe('ExplainableCursor', () => {
5656
it('calls wrappee.map with arguments', () => {
5757
const arg = () => {};
5858
eCursor.map(arg);
59-
expect(wrappee.map.calledWith(arg)).to.equal(true);
59+
expect(wrappee.map).to.have.callCount(1);
6060
});
6161

6262
it('has the correct metadata', () => {

packages/shell-api/src/helpers.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,9 @@ import type {
33
DbOptions,
44
Document,
55
ExplainVerbosityLike,
6-
FindCursor,
7-
AggregationCursor as SPAggregationCursor,
86
FindAndModifyOptions,
97
DeleteOptions,
108
MapReduceOptions,
11-
ChangeStream,
129
KMSProviders,
1310
ExplainOptions
1411
} from '@mongosh/service-provider-core';
@@ -22,6 +19,8 @@ import { BinaryType, ReplPlatform } from '@mongosh/service-provider-core';
2219
import { ClientSideFieldLevelEncryptionOptions } from './field-level-encryption';
2320
import { AutoEncryptionOptions } from 'mongodb';
2421
import { shellApiType } from './enums';
22+
import type { AbstractCursor } from './abstract-cursor';
23+
import type ChangeStreamCursor from './change-stream-cursor';
2524

2625
/**
2726
* Helper method to adapt aggregation pipeline options.
@@ -525,9 +524,9 @@ export function addHiddenDataProperty<T = any>(target: T, key: string|symbol, va
525524

526525
export async function iterate(
527526
results: CursorIterationResult,
528-
cursor: FindCursor | SPAggregationCursor | ChangeStream,
527+
cursor: AbstractCursor | ChangeStreamCursor,
529528
batchSize: number): Promise<CursorIterationResult> {
530-
if (cursor.closed) {
529+
if (cursor.isClosed()) {
531530
return results;
532531
}
533532

packages/shell-api/src/integration.spec.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1959,4 +1959,52 @@ describe('Shell API (integration)', function() {
19591959
expect((await collection.find().batchSize(50)._it()).documents).to.have.lengthOf(50);
19601960
});
19611961
});
1962+
1963+
describe('cursor map/forEach', () => {
1964+
beforeEach(async() => {
1965+
await collection.insertMany([...Array(10).keys()].map(i => ({ i })));
1966+
});
1967+
1968+
it('forEach() iterates over input but does not return anything', async() => {
1969+
let value = 0;
1970+
const result = await collection.find().forEach(({ i }) => { value += i; });
1971+
expect(result).to.equal(undefined);
1972+
expect(value).to.equal(45);
1973+
});
1974+
1975+
it('map() iterates over input and changes documents in-place', async() => {
1976+
const cursor = collection.find();
1977+
cursor.map(({ i }) => ({ j: i }));
1978+
expect((await cursor._it()).documents[0]).to.deep.equal({ j: 0 });
1979+
});
1980+
1981+
it('forEach() errors lead to a rejected promise', async() => {
1982+
const error = new Error();
1983+
let calls = 0;
1984+
try {
1985+
await collection.find().forEach(() => { calls++; throw error; });
1986+
expect.fail('missed exception');
1987+
} catch (err) {
1988+
expect(err).to.equal(error);
1989+
}
1990+
expect(calls).to.equal(1);
1991+
});
1992+
1993+
it('map() errors show up when reading the cursor', async() => {
1994+
const error = new Error();
1995+
const cursor = collection.find();
1996+
let calls = 0;
1997+
cursor.map(() => { calls++; throw error; });
1998+
for (let i = 0; i < 2; i++) {
1999+
// Try reading twice to make sure .map() is not called again for the second attempt.
2000+
try {
2001+
await cursor.tryNext();
2002+
expect.fail('missed exception');
2003+
} catch (err) {
2004+
expect(err).to.equal(error);
2005+
}
2006+
}
2007+
expect(calls).to.equal(1);
2008+
});
2009+
});
19622010
});

0 commit comments

Comments
 (0)