diff --git a/src/operations/aggregate.ts b/src/operations/aggregate.ts index a5a267ac3e4..5f05a53beb4 100644 --- a/src/operations/aggregate.ts +++ b/src/operations/aggregate.ts @@ -43,6 +43,7 @@ export class AggregateOperation extends CommandOperation { target: string | typeof DB_AGGREGATE_COLLECTION; pipeline: Document[]; hasWriteStage: boolean; + hasChangeStreamStage: boolean; constructor(ns: MongoDBNamespace, pipeline: Document[], options?: AggregateOptions) { super(undefined, { ...options, dbName: ns.db }); @@ -72,6 +73,14 @@ export class AggregateOperation extends CommandOperation { delete this.options.writeConcern; } + this.hasChangeStreamStage = false; + if (pipeline.length > 0) { + const firstStage = pipeline[0]; + if (firstStage.$changeStream) { + this.hasChangeStreamStage = true; + } + } + if (this.explain && this.writeConcern) { throw new MongoInvalidArgumentError( 'Option "explain" cannot be used on an aggregate call with writeConcern' @@ -88,11 +97,7 @@ export class AggregateOperation extends CommandOperation { } override get canRetryRead(): boolean { - return !this.hasWriteStage; - } - - addToPipeline(stage: Document): void { - this.pipeline.push(stage); + return !this.hasWriteStage && !this.hasChangeStreamStage; } override async execute( diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 9e171f0ee63..1f9649abb00 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -891,6 +891,47 @@ describe('Change Streams', function () { 'This test only worked because of timing, changeStream.close does not remove the change listener'; }); + describe('when the aggregate fails due to a retryable error', function () { + let internalClient: MongoClient; + let client: MongoClient; + let changeStream: ChangeStream; + let aggregates: CommandStartedEvent[]; + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['aggregate'], + errorCode: 7 // Host Not Found + } + }; + + beforeEach(async function () { + aggregates = []; + internalClient = this.configuration.newClient(); + await internalClient.db().admin().command(failpoint); + client = this.configuration.newClient(undefined, { monitorCommands: true }); + client.on('commandStarted', ev => { + if (ev.commandName === 'aggregate') aggregates.push(ev); + }); + }); + + afterEach(async function () { + await internalClient.db().admin().command({ configureFailPoint: 'failCommand', mode: 'off' }); + await internalClient.close(); + + await changeStream.close(); + await client.close(); + }); + + it('should not retry the aggregate command', async function () { + changeStream = client.db('test').collection('test').watch(); + const maybeError = await changeStream.next().catch(e => e); + expect(maybeError).to.be.instanceof(MongoServerError); + + expect(aggregates).to.have.lengthOf(1); + }); + }); + describe('iterator api', function () { describe('#tryNext()', function () { it('should return null on single iteration of empty cursor', {