Skip to content

Commit 267ae91

Browse files
remove ReadableCursorStream
1 parent 3216d33 commit 267ae91

File tree

3 files changed

+24
-141
lines changed

3 files changed

+24
-141
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 5 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Readable, Transform } from 'stream';
1+
import { Readable } from 'stream';
22

33
import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson';
44
import { type OnDemandDocumentDeserializeOptions } from '../cmap/wire_protocol/on_demand/document';
@@ -496,33 +496,10 @@ export abstract class AbstractCursor<
496496
}
497497

498498
stream(options?: CursorStreamOptions): Readable & AsyncIterable<TSchema> {
499-
if (options?.transform) {
500-
const transform = options.transform;
501-
const readable = new ReadableCursorStream(this);
502-
503-
const transformedStream = readable.pipe(
504-
new Transform({
505-
objectMode: true,
506-
highWaterMark: 1,
507-
transform(chunk, _, callback) {
508-
try {
509-
const transformed = transform(chunk);
510-
callback(undefined, transformed);
511-
} catch (err) {
512-
callback(err);
513-
}
514-
}
515-
})
516-
);
517-
518-
// Bubble errors to transformed stream, because otherwise no way
519-
// to handle this error.
520-
readable.on('error', err => transformedStream.emit('error', err));
521-
522-
return transformedStream;
523-
}
524-
525-
return new ReadableCursorStream(this);
499+
const transform = options?.transform ?? (doc => doc);
500+
return Readable.from(this, { autoDestroy: false, highWaterMark: 1, objectMode: true }).map(
501+
transform
502+
);
526503
}
527504

528505
async hasNext(): Promise<boolean> {
@@ -1062,87 +1039,6 @@ export abstract class AbstractCursor<
10621039
}
10631040
}
10641041

1065-
class ReadableCursorStream extends Readable {
1066-
private _cursor: AbstractCursor;
1067-
private _readInProgress = false;
1068-
1069-
constructor(cursor: AbstractCursor) {
1070-
super({
1071-
objectMode: true,
1072-
autoDestroy: false,
1073-
highWaterMark: 1
1074-
});
1075-
this._cursor = cursor;
1076-
}
1077-
1078-
// eslint-disable-next-line @typescript-eslint/no-unused-vars
1079-
override _read(size: number): void {
1080-
if (!this._readInProgress) {
1081-
this._readInProgress = true;
1082-
this._readNext();
1083-
}
1084-
}
1085-
1086-
override _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
1087-
this._cursor.close().then(
1088-
() => callback(error),
1089-
closeError => callback(closeError)
1090-
);
1091-
}
1092-
1093-
private _readNext() {
1094-
if (this._cursor.id === Long.ZERO) {
1095-
this.push(null);
1096-
return;
1097-
}
1098-
1099-
this._cursor.next().then(
1100-
result => {
1101-
if (result == null) {
1102-
this.push(null);
1103-
} else if (this.destroyed) {
1104-
this._cursor.close().then(undefined, squashError);
1105-
} else {
1106-
if (this.push(result)) {
1107-
return this._readNext();
1108-
}
1109-
1110-
this._readInProgress = false;
1111-
}
1112-
},
1113-
err => {
1114-
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
1115-
// desired behavior is that a stream ends cleanly when a user explicitly closes
1116-
// a client during iteration. Alternatively, we could do the "right" thing and
1117-
// propagate the error message by removing this special case.
1118-
if (err.message.match(/server is closed/)) {
1119-
this._cursor.close().then(undefined, squashError);
1120-
return this.push(null);
1121-
}
1122-
1123-
// NOTE: This is also perhaps questionable. The rationale here is that these errors tend
1124-
// to be "operation was interrupted", where a cursor has been closed but there is an
1125-
// active getMore in-flight. This used to check if the cursor was killed but once
1126-
// that changed to happen in cleanup legitimate errors would not destroy the
1127-
// stream. There are change streams test specifically test these cases.
1128-
if (err.message.match(/operation was interrupted/)) {
1129-
return this.push(null);
1130-
}
1131-
1132-
// NOTE: The two above checks on the message of the error will cause a null to be pushed
1133-
// to the stream, thus closing the stream before the destroy call happens. This means
1134-
// that either of those error messages on a change stream will not get a proper
1135-
// 'error' event to be emitted (the error passed to destroy). Change stream resumability
1136-
// relies on that error event to be emitted to create its new cursor and thus was not
1137-
// working on 4.4 servers because the error emitted on failover was "interrupted at
1138-
// shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
1139-
// See NODE-4475.
1140-
return this.destroy(err);
1141-
}
1142-
);
1143-
}
1144-
}
1145-
11461042
configureResourceManagement(AbstractCursor.prototype);
11471043

11481044
/**

test/integration/crud/misc_cursors.test.js

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,40 +1495,34 @@ describe('Cursor', function () {
14951495
}
14961496
});
14971497

1498-
it('does not auto destroy streams', function (done) {
1498+
it('does not auto destroy streams', async function () {
14991499
const docs = [];
15001500

15011501
for (var i = 0; i < 10; i++) {
15021502
docs.push({ a: i + 1 });
15031503
}
15041504

15051505
const configuration = this.configuration;
1506-
client.connect((err, client) => {
1507-
expect(err).to.not.exist;
1506+
await client.connect();
15081507

1509-
const db = client.db(configuration.db);
1510-
db.createCollection('does_not_autodestroy_streams', (err, collection) => {
1511-
expect(err).to.not.exist;
1508+
const db = client.db(configuration.db);
1509+
const collection = await db.createCollection('does_not_autodestroy_streams');
15121510

1513-
collection.insertMany(docs, configuration.writeConcernMax(), err => {
1514-
expect(err).to.not.exist;
1511+
await collection.insertMany(docs, configuration.writeConcernMax());
15151512

1516-
const cursor = collection.find();
1517-
const stream = cursor.stream();
1518-
stream.on('close', () => {
1519-
expect.fail('extra close event must not be called');
1520-
});
1521-
stream.on('end', () => {
1522-
client.close();
1523-
done();
1524-
});
1525-
stream.on('data', doc => {
1526-
expect(doc).to.exist;
1527-
});
1528-
stream.resume();
1529-
});
1530-
});
1513+
const cursor = collection.find();
1514+
const stream = cursor.stream();
1515+
1516+
const end$ = once(stream, 'end');
1517+
const close$ = once(stream, 'close').then(() => {
1518+
expect.fail('extra close event must not be called');
15311519
});
1520+
1521+
stream.resume();
1522+
1523+
await Promise.race([end$, close$]);
1524+
1525+
await client.close();
15321526
});
15331527

15341528
it('should be able to stream documents', {
@@ -2321,7 +2315,7 @@ describe('Cursor', function () {
23212315
.find()
23222316
.withReadPreference('notsecondary');
23232317
test.ok(false);
2324-
} catch (err) {} // eslint-disable-line
2318+
} catch (err) { } // eslint-disable-line
23252319

23262320
db.collection('shouldFailToSetReadPreferenceOnCursor')
23272321
.find()

test/integration/node-specific/abstract_cursor.test.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { expect } from 'chai';
22
import { once } from 'events';
33
import * as sinon from 'sinon';
4-
import { Transform } from 'stream';
54
import { inspect } from 'util';
65

76
import {
@@ -299,14 +298,8 @@ describe('class AbstractCursor', function () {
299298
});
300299

301300
it('propagates errors to transform stream', async function () {
302-
const transform = new Transform({
303-
transform(data, encoding, callback) {
304-
callback(null, data);
305-
}
306-
});
307-
308301
// MongoServerError: unknown operator: $bar
309-
const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform });
302+
const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform: doc => doc });
310303

311304
const error: Error | null = await new Promise(resolve => {
312305
stream.on('error', error => resolve(error));

0 commit comments

Comments
 (0)