Skip to content

Commit a8338ad

Browse files
refactor(NODE-7089): make AggregateOperation subclass ModernizedOperation (#4608)
1 parent ff9a785 commit a8338ad

File tree

12 files changed

+100
-119
lines changed

12 files changed

+100
-119
lines changed

src/cmap/connection.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ export interface CommandOptions extends BSONSerializeOptions {
9292
session?: ClientSession;
9393
documentsReturnedIn?: string;
9494
noResponse?: boolean;
95-
omitReadPreference?: boolean;
9695
omitMaxTimeMS?: boolean;
9796

9897
// TODO(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint

src/cursor/aggregation_cursor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import type { Document } from '../bson';
22
import { MongoAPIError } from '../error';
33
import {
44
Explain,
5-
ExplainableCursor,
65
type ExplainCommandOptions,
76
type ExplainVerbosityLike,
87
validateExplainTimeoutOptions
@@ -19,6 +18,7 @@ import {
1918
CursorTimeoutMode,
2019
type InitialCursorResponse
2120
} from './abstract_cursor';
21+
import { ExplainableCursor } from './explainable_cursor';
2222

2323
/** @public */
2424
export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {}

src/cursor/explainable_cursor.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { type Document } from '../bson';
2+
import { type ExplainCommandOptions, type ExplainVerbosityLike } from '../explain';
3+
import { AbstractCursor } from './abstract_cursor';
4+
5+
/**
6+
* @public
7+
*
8+
* A base class for any cursors that have `explain()` methods.
9+
*/
10+
export abstract class ExplainableCursor<TSchema> extends AbstractCursor<TSchema> {
11+
/** Execute the explain for the cursor */
12+
abstract explain(): Promise<Document>;
13+
abstract explain(verbosity: ExplainVerbosityLike | ExplainCommandOptions): Promise<Document>;
14+
abstract explain(options: { timeoutMS?: number }): Promise<Document>;
15+
abstract explain(
16+
verbosity: ExplainVerbosityLike | ExplainCommandOptions,
17+
options: { timeoutMS?: number }
18+
): Promise<Document>;
19+
abstract explain(
20+
verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number },
21+
options?: { timeoutMS?: number }
22+
): Promise<Document>;
23+
24+
protected resolveExplainTimeoutOptions(
25+
verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number },
26+
options?: { timeoutMS?: number }
27+
): { timeout?: { timeoutMS?: number }; explain?: ExplainVerbosityLike | ExplainCommandOptions } {
28+
let explain: ExplainVerbosityLike | ExplainCommandOptions | undefined;
29+
let timeout: { timeoutMS?: number } | undefined;
30+
31+
if (verbosity == null && options == null) {
32+
explain = undefined;
33+
timeout = undefined;
34+
} else if (verbosity != null && options == null) {
35+
explain =
36+
typeof verbosity !== 'object'
37+
? verbosity
38+
: 'verbosity' in verbosity
39+
? verbosity
40+
: undefined;
41+
42+
timeout = typeof verbosity === 'object' && 'timeoutMS' in verbosity ? verbosity : undefined;
43+
} else {
44+
// @ts-expect-error TS isn't smart enough to determine that if both options are provided, the first is explain options
45+
explain = verbosity;
46+
timeout = options;
47+
}
48+
49+
return { timeout, explain };
50+
}
51+
}

src/cursor/find_cursor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { CursorResponse } from '../cmap/wire_protocol/responses';
33
import { MongoAPIError, MongoInvalidArgumentError, MongoTailableCursorError } from '../error';
44
import {
55
Explain,
6-
ExplainableCursor,
76
type ExplainCommandOptions,
87
type ExplainVerbosityLike,
98
validateExplainTimeoutOptions
@@ -19,6 +18,7 @@ import type { ClientSession } from '../sessions';
1918
import { formatSort, type Sort, type SortDirection } from '../sort';
2019
import { emitWarningOnce, mergeOptions, type MongoDBNamespace, squashError } from '../utils';
2120
import { type InitialCursorResponse } from './abstract_cursor';
21+
import { ExplainableCursor } from './explainable_cursor';
2222

2323
/** @public Flags allowed for cursor */
2424
export const FLAGS = [

src/explain.ts

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { type Document } from './bson';
2-
import { AbstractCursor } from './cursor/abstract_cursor';
32
import { MongoAPIError } from './error';
43

54
/** @public */
@@ -123,51 +122,3 @@ export function decorateWithExplain(
123122

124123
return baseCommand;
125124
}
126-
127-
/**
128-
* @public
129-
*
130-
* A base class for any cursors that have `explain()` methods.
131-
*/
132-
export abstract class ExplainableCursor<TSchema> extends AbstractCursor<TSchema> {
133-
/** Execute the explain for the cursor */
134-
abstract explain(): Promise<Document>;
135-
abstract explain(verbosity: ExplainVerbosityLike | ExplainCommandOptions): Promise<Document>;
136-
abstract explain(options: { timeoutMS?: number }): Promise<Document>;
137-
abstract explain(
138-
verbosity: ExplainVerbosityLike | ExplainCommandOptions,
139-
options: { timeoutMS?: number }
140-
): Promise<Document>;
141-
abstract explain(
142-
verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number },
143-
options?: { timeoutMS?: number }
144-
): Promise<Document>;
145-
146-
protected resolveExplainTimeoutOptions(
147-
verbosity?: ExplainVerbosityLike | ExplainCommandOptions | { timeoutMS?: number },
148-
options?: { timeoutMS?: number }
149-
): { timeout?: { timeoutMS?: number }; explain?: ExplainVerbosityLike | ExplainCommandOptions } {
150-
let explain: ExplainVerbosityLike | ExplainCommandOptions | undefined;
151-
let timeout: { timeoutMS?: number } | undefined;
152-
153-
if (verbosity == null && options == null) {
154-
explain = undefined;
155-
timeout = undefined;
156-
} else if (verbosity != null && options == null) {
157-
explain =
158-
typeof verbosity !== 'object'
159-
? verbosity
160-
: 'verbosity' in verbosity
161-
? verbosity
162-
: undefined;
163-
164-
timeout = typeof verbosity === 'object' && 'timeoutMS' in verbosity ? verbosity : undefined;
165-
} else {
166-
// @ts-expect-error TS isn't smart enough to determine that if both options are provided, the first is explain options
167-
explain = verbosity;
168-
timeout = options;
169-
}
170-
171-
return { timeout, explain };
172-
}
173-
}

src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import { ListCollectionsCursor } from './cursor/list_collections_cursor';
1010
import { ListIndexesCursor } from './cursor/list_indexes_cursor';
1111
import type { RunCommandCursor } from './cursor/run_command_cursor';
1212
import { Db } from './db';
13-
import { ExplainableCursor } from './explain';
1413
import { GridFSBucket } from './gridfs';
1514
import { GridFSBucketReadStream } from './gridfs/download';
1615
import { GridFSBucketWriteStream } from './gridfs/upload';
@@ -44,6 +43,7 @@ export {
4443
} from './bulk/common';
4544
export { ClientEncryption } from './client-side-encryption/client_encryption';
4645
export { ChangeStreamCursor } from './cursor/change_stream_cursor';
46+
export { ExplainableCursor } from './cursor/explainable_cursor';
4747
export {
4848
MongoAPIError,
4949
MongoAWSError,
@@ -98,7 +98,6 @@ export {
9898
ClientSession,
9999
Collection,
100100
Db,
101-
ExplainableCursor,
102101
FindCursor,
103102
GridFSBucket,
104103
GridFSBucketReadStream,
@@ -511,6 +510,7 @@ export type {
511510
CollationOptions,
512511
CommandOperation,
513512
CommandOperationOptions,
513+
ModernizedCommandOperation,
514514
OperationParent
515515
} from './operations/command';
516516
export type { CountOptions } from './operations/count';

src/operations/aggregate.ts

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1+
import { type Connection } from '..';
12
import type { Document } from '../bson';
23
import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses';
34
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
45
import { MongoInvalidArgumentError } from '../error';
56
import { type ExplainOptions } from '../explain';
6-
import type { Server } from '../sdam/server';
7-
import type { ClientSession } from '../sessions';
8-
import { type TimeoutContext } from '../timeout';
97
import { maxWireVersion, type MongoDBNamespace } from '../utils';
108
import { WriteConcern } from '../write_concern';
11-
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
9+
import {
10+
type CollationOptions,
11+
type CommandOperationOptions,
12+
ModernizedCommandOperation
13+
} from './command';
1214
import { Aspect, defineAspects, type Hint } from './operation';
1315

1416
/** @internal */
@@ -51,7 +53,8 @@ export interface AggregateOptions extends Omit<CommandOperationOptions, 'explain
5153
}
5254

5355
/** @internal */
54-
export class AggregateOperation extends CommandOperation<CursorResponse> {
56+
export class AggregateOperation extends ModernizedCommandOperation<CursorResponse> {
57+
override SERVER_COMMAND_RESPONSE_TYPE = CursorResponse;
5558
override options: AggregateOptions;
5659
target: string | typeof DB_AGGREGATE_COLLECTION;
5760
pipeline: Document[];
@@ -79,9 +82,7 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
7982
}
8083
}
8184

82-
if (this.hasWriteStage) {
83-
this.trySecondaryWrite = true;
84-
} else {
85+
if (!this.hasWriteStage) {
8586
delete this.options.writeConcern;
8687
}
8788

@@ -94,6 +95,8 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
9495
if (options?.cursor != null && typeof options.cursor !== 'object') {
9596
throw new MongoInvalidArgumentError('Cursor options must be an object');
9697
}
98+
99+
this.SERVER_COMMAND_RESPONSE_TYPE = this.explain ? ExplainedCursorResponse : CursorResponse;
97100
}
98101

99102
override get commandName() {
@@ -108,13 +111,9 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
108111
this.pipeline.push(stage);
109112
}
110113

111-
override async execute(
112-
server: Server,
113-
session: ClientSession | undefined,
114-
timeoutContext: TimeoutContext
115-
): Promise<CursorResponse> {
116-
const options: AggregateOptions = this.options;
117-
const serverWireVersion = maxWireVersion(server);
114+
override buildCommandDocument(connection: Connection): Document {
115+
const options = this.options;
116+
const serverWireVersion = maxWireVersion(connection);
118117
const command: Document = { aggregate: this.target, pipeline: this.pipeline };
119118

120119
if (this.hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) {
@@ -152,13 +151,13 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
152151
command.cursor.batchSize = options.batchSize;
153152
}
154153

155-
return await super.executeCommand(
156-
server,
157-
session,
158-
command,
159-
timeoutContext,
160-
this.explain ? ExplainedCursorResponse : CursorResponse
161-
);
154+
return command;
155+
}
156+
157+
override handleOk(
158+
response: InstanceType<typeof this.SERVER_COMMAND_RESPONSE_TYPE>
159+
): CursorResponse {
160+
return response;
162161
}
163162
}
164163

src/operations/command.ts

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,9 @@ import {
1111
import { ReadConcern } from '../read_concern';
1212
import type { ReadPreference } from '../read_preference';
1313
import type { Server, ServerCommandOptions } from '../sdam/server';
14-
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';
1514
import type { ClientSession } from '../sessions';
1615
import { type TimeoutContext } from '../timeout';
17-
import { commandSupportsReadConcern, maxWireVersion, MongoDBNamespace } from '../utils';
16+
import { commandSupportsReadConcern, MongoDBNamespace } from '../utils';
1817
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
1918
import type { ReadConcernLike } from './../read_concern';
2019
import { AbstractOperation, Aspect, ModernizedOperation, type OperationOptions } from './operation';
@@ -150,17 +149,12 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
150149
session
151150
};
152151

153-
const serverWireVersion = maxWireVersion(server);
154152
const inTransaction = this.session && this.session.inTransaction();
155153

156154
if (this.readConcern && commandSupportsReadConcern(cmd) && !inTransaction) {
157155
Object.assign(cmd, { readConcern: this.readConcern });
158156
}
159157

160-
if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) {
161-
options.omitReadPreference = true;
162-
}
163-
164158
if (this.writeConcern && this.hasAspect(Aspect.WRITE_OPERATION) && !inTransaction) {
165159
WriteConcern.apply(cmd, this.writeConcern);
166160
}
@@ -241,17 +235,12 @@ export abstract class ModernizedCommandOperation<T> extends ModernizedOperation<
241235
override buildCommand(connection: Connection, session?: ClientSession): Document {
242236
const command = this.buildCommandDocument(connection, session);
243237

244-
const serverWireVersion = maxWireVersion(connection);
245238
const inTransaction = this.session && this.session.inTransaction();
246239

247240
if (this.readConcern && commandSupportsReadConcern(command) && !inTransaction) {
248241
Object.assign(command, { readConcern: this.readConcern });
249242
}
250243

251-
if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) {
252-
command.omitReadPreference = true;
253-
}
254-
255244
if (this.writeConcern && this.hasAspect(Aspect.WRITE_OPERATION) && !inTransaction) {
256245
WriteConcern.apply(command, this.writeConcern);
257246
}

src/operations/execute_operation.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import type { Topology } from '../sdam/topology';
2626
import type { ClientSession } from '../sessions';
2727
import { TimeoutContext } from '../timeout';
2828
import { abortable, supportsRetryableWrites } from '../utils';
29+
import { AggregateOperation } from './aggregate';
2930
import { AbstractOperation, Aspect, ModernizedOperation } from './operation';
3031

3132
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
@@ -192,7 +193,7 @@ async function tryOperation<
192193
// server selection to potentially force monitor checks if the server is
193194
// in an unknown state.
194195
selector = sameServerSelector(operation.server?.description);
195-
} else if (operation.trySecondaryWrite) {
196+
} else if (operation instanceof AggregateOperation && operation.hasWriteStage) {
196197
// If operation should try to write to secondary use the custom server selector
197198
// otherwise provide the read preference.
198199
selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);

src/operations/operation.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ export interface OperationOptions extends BSONSerializeOptions {
3333

3434
/** @internal Hints to `executeOperation` that this operation should not unpin on an ended transaction */
3535
bypassPinningCheck?: boolean;
36-
omitReadPreference?: boolean;
3736

3837
/** @internal Hint to `executeOperation` to omit maxTimeMS */
3938
omitMaxTimeMS?: boolean;
@@ -57,7 +56,6 @@ export abstract class AbstractOperation<TResult = any> {
5756
readPreference: ReadPreference;
5857
server!: Server;
5958
bypassPinningCheck: boolean;
60-
trySecondaryWrite: boolean;
6159

6260
// BSON serialization options
6361
bsonOptions?: BSONSerializeOptions;
@@ -83,7 +81,6 @@ export abstract class AbstractOperation<TResult = any> {
8381

8482
this.options = options;
8583
this.bypassPinningCheck = !!options.bypassPinningCheck;
86-
this.trySecondaryWrite = false;
8784
}
8885

8986
/** Must match the first key of the command object sent to the server.

0 commit comments

Comments
 (0)