1
+ import { type Connection } from '..' ;
1
2
import type { Document } from '../bson' ;
2
3
import { CursorResponse , ExplainedCursorResponse } from '../cmap/wire_protocol/responses' ;
3
4
import { type CursorTimeoutMode } from '../cursor/abstract_cursor' ;
4
5
import { MongoInvalidArgumentError } from '../error' ;
5
6
import { type ExplainOptions } from '../explain' ;
6
- import type { Server } from '../sdam/server' ;
7
- import type { ClientSession } from '../sessions' ;
8
- import { type TimeoutContext } from '../timeout' ;
9
7
import { maxWireVersion , type MongoDBNamespace } from '../utils' ;
10
8
import { WriteConcern } from '../write_concern' ;
11
- import { type CollationOptions , CommandOperation , type CommandOperationOptions } from './command' ;
9
+ import {
10
+ type CollationOptions ,
11
+ type CommandOperationOptions ,
12
+ ModernizedCommandOperation
13
+ } from './command' ;
12
14
import { Aspect , defineAspects , type Hint } from './operation' ;
13
15
14
16
/** @internal */
@@ -51,7 +53,8 @@ export interface AggregateOptions extends Omit<CommandOperationOptions, 'explain
51
53
}
52
54
53
55
/** @internal */
54
- export class AggregateOperation extends CommandOperation < CursorResponse > {
56
+ export class AggregateOperation extends ModernizedCommandOperation < CursorResponse > {
57
+ override SERVER_COMMAND_RESPONSE_TYPE = CursorResponse ;
55
58
override options : AggregateOptions ;
56
59
target : string | typeof DB_AGGREGATE_COLLECTION ;
57
60
pipeline : Document [ ] ;
@@ -79,9 +82,7 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
79
82
}
80
83
}
81
84
82
- if ( this . hasWriteStage ) {
83
- this . trySecondaryWrite = true ;
84
- } else {
85
+ if ( ! this . hasWriteStage ) {
85
86
delete this . options . writeConcern ;
86
87
}
87
88
@@ -94,6 +95,8 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
94
95
if ( options ?. cursor != null && typeof options . cursor !== 'object' ) {
95
96
throw new MongoInvalidArgumentError ( 'Cursor options must be an object' ) ;
96
97
}
98
+
99
+ this . SERVER_COMMAND_RESPONSE_TYPE = this . explain ? ExplainedCursorResponse : CursorResponse ;
97
100
}
98
101
99
102
override get commandName ( ) {
@@ -108,13 +111,9 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
108
111
this . pipeline . push ( stage ) ;
109
112
}
110
113
111
- override async execute (
112
- server : Server ,
113
- session : ClientSession | undefined ,
114
- timeoutContext : TimeoutContext
115
- ) : Promise < CursorResponse > {
116
- const options : AggregateOptions = this . options ;
117
- const serverWireVersion = maxWireVersion ( server ) ;
114
+ override buildCommandDocument ( connection : Connection ) : Document {
115
+ const options = this . options ;
116
+ const serverWireVersion = maxWireVersion ( connection ) ;
118
117
const command : Document = { aggregate : this . target , pipeline : this . pipeline } ;
119
118
120
119
if ( this . hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT ) {
@@ -152,13 +151,13 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
152
151
command . cursor . batchSize = options . batchSize ;
153
152
}
154
153
155
- return await super . executeCommand (
156
- server ,
157
- session ,
158
- command ,
159
- timeoutContext ,
160
- this . explain ? ExplainedCursorResponse : CursorResponse
161
- ) ;
154
+ return command ;
155
+ }
156
+
157
+ override handleOk (
158
+ response : InstanceType < typeof this . SERVER_COMMAND_RESPONSE_TYPE >
159
+ ) : CursorResponse {
160
+ return response ;
162
161
}
163
162
}
164
163
0 commit comments