Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 9 additions & 109 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Readable, Transform } from 'stream';
import { Readable } from 'stream';

import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson';
import { type OnDemandDocumentDeserializeOptions } from '../cmap/wire_protocol/on_demand/document';
Expand Down Expand Up @@ -496,33 +496,14 @@ export abstract class AbstractCursor<
}

stream(options?: CursorStreamOptions): Readable & AsyncIterable<TSchema> {
if (options?.transform) {
const transform = options.transform;
const readable = new ReadableCursorStream(this);

const transformedStream = readable.pipe(
new Transform({
objectMode: true,
highWaterMark: 1,
transform(chunk, _, callback) {
try {
const transformed = transform(chunk);
callback(undefined, transformed);
} catch (err) {
callback(err);
}
}
})
);

// Bubble errors to transformed stream, because otherwise no way
// to handle this error.
readable.on('error', err => transformedStream.emit('error', err));

return transformedStream;
}

return new ReadableCursorStream(this);
const transform = options?.transform ?? (doc => doc);
const stream = Readable.from(this, {
autoDestroy: false,
highWaterMark: 1,
objectMode: true
}).map(transform);
stream.on('close', () => this.close() as any);
return stream;
}

async hasNext(): Promise<boolean> {
Expand Down Expand Up @@ -1062,87 +1043,6 @@ export abstract class AbstractCursor<
}
}

class ReadableCursorStream extends Readable {
private _cursor: AbstractCursor;
private _readInProgress = false;

constructor(cursor: AbstractCursor) {
super({
objectMode: true,
autoDestroy: false,
highWaterMark: 1
});
this._cursor = cursor;
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
override _read(size: number): void {
if (!this._readInProgress) {
this._readInProgress = true;
this._readNext();
}
}

override _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
this._cursor.close().then(
() => callback(error),
closeError => callback(closeError)
);
}

private _readNext() {
if (this._cursor.id === Long.ZERO) {
this.push(null);
return;
}

this._cursor.next().then(
result => {
if (result == null) {
this.push(null);
} else if (this.destroyed) {
this._cursor.close().then(undefined, squashError);
} else {
if (this.push(result)) {
return this._readNext();
}

this._readInProgress = false;
}
},
err => {
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
// desired behavior is that a stream ends cleanly when a user explicitly closes
// a client during iteration. Alternatively, we could do the "right" thing and
// propagate the error message by removing this special case.
if (err.message.match(/server is closed/)) {
this._cursor.close().then(undefined, squashError);
return this.push(null);
}

// NOTE: This is also perhaps questionable. The rationale here is that these errors tend
// to be "operation was interrupted", where a cursor has been closed but there is an
// active getMore in-flight. This used to check if the cursor was killed but once
// that changed to happen in cleanup legitimate errors would not destroy the
// stream. There are change streams test specifically test these cases.
if (err.message.match(/operation was interrupted/)) {
return this.push(null);
}

// NOTE: The two above checks on the message of the error will cause a null to be pushed
// to the stream, thus closing the stream before the destroy call happens. This means
// that either of those error messages on a change stream will not get a proper
// 'error' event to be emitted (the error passed to destroy). Change stream resumability
// relies on that error event to be emitted to create its new cursor and thus was not
// working on 4.4 servers because the error emitted on failover was "interrupted at
// shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
// See NODE-4475.
return this.destroy(err);
}
);
}
}

configureResourceManagement(AbstractCursor.prototype);

/**
Expand Down
116 changes: 43 additions & 73 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1495,40 +1495,34 @@ describe('Cursor', function () {
}
});

it('does not auto destroy streams', function (done) {
it('does not auto destroy streams', async function () {
const docs = [];

for (var i = 0; i < 10; i++) {
docs.push({ a: i + 1 });
}

const configuration = this.configuration;
client.connect((err, client) => {
expect(err).to.not.exist;
await client.connect();

const db = client.db(configuration.db);
db.createCollection('does_not_autodestroy_streams', (err, collection) => {
expect(err).to.not.exist;
const db = client.db(configuration.db);
const collection = await db.createCollection('does_not_autodestroy_streams');

collection.insertMany(docs, configuration.writeConcernMax(), err => {
expect(err).to.not.exist;
await collection.insertMany(docs, configuration.writeConcernMax());

const cursor = collection.find();
const stream = cursor.stream();
stream.on('close', () => {
expect.fail('extra close event must not be called');
});
stream.on('end', () => {
client.close();
done();
});
stream.on('data', doc => {
expect(doc).to.exist;
});
stream.resume();
});
});
const cursor = collection.find();
const stream = cursor.stream();

const end$ = once(stream, 'end');
const close$ = once(stream, 'close').then(() => {
expect.fail('extra close event must not be called');
});

stream.resume();

await Promise.race([end$, close$]);

await client.close();
});

it('should be able to stream documents', {
Expand Down Expand Up @@ -1614,62 +1608,38 @@ describe('Cursor', function () {
}
});

it('immediately destroying a stream prevents the query from executing', {
// Add a tag that our runner can trigger on
// in this case we are setting that node needs to be higher than 0.10.X to run
metadata: {
requires: { topology: ['single', 'replicaset', 'sharded'] }
},
it('immediately destroying a stream prevents the query from executing', async function () {
var i = 0,
docs = [{ b: 2 }, { b: 3 }];

test: function (done) {
var i = 0,
docs = [{ b: 2 }, { b: 3 }],
doneCalled = 0;

const configuration = this.configuration;
client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());

const db = client.db(configuration.db);
db.createCollection(
'immediately_destroying_a_stream_prevents_the_query_from_executing',
(err, collection) => {
expect(err).to.not.exist;

// insert all docs
collection.insertMany(docs, configuration.writeConcernMax(), err => {
expect(err).to.not.exist;
const configuration = this.configuration;
await client.connect();

const cursor = collection.find();
const stream = cursor.stream();
const db = client.db(configuration.db);
const collection = db.collection(
'immediately_destroying_a_stream_prevents_the_query_from_executing'
);
// insert all docs
await collection.insertMany(docs, configuration.writeConcernMax());

stream.on('data', function () {
i++;
});
const cursor = collection.find();
const stream = cursor.stream();

cursor.once('close', testDone('close'));
stream.once('error', testDone('error'));
stream.on('data', function () {
i++;
});

stream.destroy();
const close$ = once(stream, 'close').then(async () => {
expect(i).to.equal(0);
expect(cursor.closed).to.be.true;
});
const error$ = once(stream, 'error').catch(e => {
throw e;
});

function testDone() {
return err => {
++doneCalled;
stream.destroy();

if (doneCalled === 1) {
expect(err).to.not.exist;
test.strictEqual(0, i);
test.strictEqual(true, cursor.closed);
done();
}
};
}
});
}
);
});
}
await Promise.race([close$, error$]);
});

it('removes session when cloning an find cursor', async function () {
Expand Down Expand Up @@ -2321,7 +2291,7 @@ describe('Cursor', function () {
.find()
.withReadPreference('notsecondary');
test.ok(false);
} catch (err) {} // eslint-disable-line
} catch (err) { } // eslint-disable-line

db.collection('shouldFailToSetReadPreferenceOnCursor')
.find()
Expand Down
16 changes: 6 additions & 10 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { expect } from 'chai';
import { once } from 'events';
import * as sinon from 'sinon';
import { Transform } from 'stream';
import { inspect } from 'util';

import {
Expand Down Expand Up @@ -286,8 +285,7 @@ describe('class AbstractCursor', function () {
const docs = [{ count: 0 }];

beforeEach(async function () {
client = this.configuration.newClient();

client = this.configuration.newClient({}, { monitorCommands: true });
collection = client.db('abstract_cursor_integration').collection('test');

await collection.insertMany(docs);
Expand All @@ -299,14 +297,12 @@ describe('class AbstractCursor', function () {
});

it('propagates errors to transform stream', async function () {
const transform = new Transform({
transform(data, encoding, callback) {
callback(null, data);
}
});

// MongoServerError: unknown operator: $bar
const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform });
const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform: doc => doc });

stream.on('data', () => {
// do nothing
});

const error: Error | null = await new Promise(resolve => {
stream.on('error', error => resolve(error));
Expand Down
Loading