Skip to content

Commit 270dd6c

Browse files
create operations
1 parent 302f8e0 commit 270dd6c

File tree

2 files changed

+78
-63
lines changed

2 files changed

+78
-63
lines changed

src/db.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@ import type { MongoClient, PkFactory } from './mongo_client';
1111
import type { Abortable, TODO_NODE_3286 } from './mongo_types';
1212
import type { AggregateOptions } from './operations/aggregate';
1313
import { CollectionsOperation } from './operations/collections';
14-
import {
15-
CreateCollectionOperation,
16-
type CreateCollectionOptions
17-
} from './operations/create_collection';
14+
import { type CreateCollectionOptions, createCollections } from './operations/create_collection';
1815
import {
1916
type DropCollectionOptions,
2017
dropCollections,
@@ -242,10 +239,15 @@ export class Db {
242239
name: string,
243240
options?: CreateCollectionOptions
244241
): Promise<Collection<TSchema>> {
245-
return await executeOperation(
246-
this.client,
247-
new CreateCollectionOperation(this, name, resolveOptions(this, options)) as TODO_NODE_3286
248-
);
242+
options = resolveOptions(this, { ...options });
243+
if (options.session) {
244+
return await createCollections<TSchema>(this, name, options);
245+
}
246+
247+
return await this.client.withSession({ explicit: false }, async session => {
248+
options.session = session;
249+
return await createCollections<TSchema>(this, name, options);
250+
});
249251
}
250252

251253
/**

src/operations/create_collection.ts

Lines changed: 68 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ import { MongoCompatibilityError } from '../error';
99
import type { PkFactory } from '../mongo_client';
1010
import type { Server } from '../sdam/server';
1111
import type { ClientSession } from '../sessions';
12-
import { type TimeoutContext } from '../timeout';
12+
import { TimeoutContext } from '../timeout';
1313
import { CommandOperation, type CommandOperationOptions } from './command';
14+
import { executeOperation } from './execute_operation';
1415
import { CreateIndexesOperation } from './indexes';
1516
import { Aspect, defineAspects } from './operation';
1617

@@ -102,6 +103,9 @@ export interface CreateCollectionOptions extends CommandOperationOptions {
102103
* change streams that listen on this collection.
103104
*/
104105
changeStreamPreAndPostImages?: { enabled: boolean };
106+
107+
/** @internal */
108+
isEncryptedCollection?: boolean;
105109
}
106110

107111
/* @internal */
@@ -131,15 +135,7 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
131135
session: ClientSession | undefined,
132136
timeoutContext: TimeoutContext
133137
): Promise<Collection> {
134-
const db = this.db;
135-
const name = this.name;
136-
const options = this.options;
137-
138-
const encryptedFields: Document | undefined =
139-
options.encryptedFields ??
140-
db.client.s.options.autoEncryption?.encryptedFieldsMap?.[`${db.databaseName}.${name}`];
141-
142-
if (encryptedFields) {
138+
if (this.options.isEncryptedCollection) {
143139
// Creating a QE collection required min server of 7.0.0
144140
// TODO(NODE-5353): Get wire version information from connection.
145141
if (
@@ -150,64 +146,81 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
150146
`${INVALID_QE_VERSION} The minimum server version required is ${MIN_SUPPORTED_QE_SERVER_VERSION}`
151147
);
152148
}
153-
// Create auxilliary collections for queryable encryption support.
154-
const escCollection = encryptedFields.escCollection ?? `enxcol_.${name}.esc`;
155-
const ecocCollection = encryptedFields.ecocCollection ?? `enxcol_.${name}.ecoc`;
156-
157-
for (const collectionName of [escCollection, ecocCollection]) {
158-
const createOp = new CreateCollectionOperation(db, collectionName, {
159-
clusteredIndex: {
160-
key: { _id: 1 },
161-
unique: true
162-
}
163-
});
164-
await createOp.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
165-
}
166-
167-
if (!options.encryptedFields) {
168-
this.options = { ...this.options, encryptedFields };
169-
}
170-
}
171-
172-
const coll = await this.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
173-
174-
if (encryptedFields) {
175-
// Create the required index for queryable encryption support.
176-
const createIndexOp = CreateIndexesOperation.fromIndexSpecification(
177-
db,
178-
name,
179-
{ __safeContent__: 1 },
180-
{}
181-
);
182-
await createIndexOp.execute(server, session, timeoutContext);
183149
}
184150

185-
return coll;
186-
}
187-
188-
private async executeWithoutEncryptedFieldsCheck(
189-
server: Server,
190-
session: ClientSession | undefined,
191-
timeoutContext: TimeoutContext
192-
): Promise<Collection> {
193151
const db = this.db;
194152
const name = this.name;
195153
const options = this.options;
196154

197155
const cmd: Document = { create: name };
198-
for (const n in options) {
199-
if (
200-
(options as any)[n] != null &&
201-
typeof (options as any)[n] !== 'function' &&
202-
!ILLEGAL_COMMAND_FIELDS.has(n)
203-
) {
204-
cmd[n] = (options as any)[n];
156+
for (const [option, value] of Object.entries(options)) {
157+
if (value != null && typeof value !== 'function' && !ILLEGAL_COMMAND_FIELDS.has(option)) {
158+
cmd[option] = value;
205159
}
206160
}
161+
207162
// otherwise just execute the command
208163
await super.executeCommand(server, session, cmd, timeoutContext);
209164
return new Collection(db, name, options);
210165
}
211166
}
212167

168+
export async function createCollections<TSchema extends Document>(
169+
db: Db,
170+
name: string,
171+
options: CreateCollectionOptions
172+
): Promise<Collection<TSchema>> {
173+
const timeoutContext = TimeoutContext.create({
174+
session: options.session,
175+
serverSelectionTimeoutMS: db.client.s.options.serverSelectionTimeoutMS,
176+
waitQueueTimeoutMS: db.client.s.options.waitQueueTimeoutMS,
177+
timeoutMS: options.timeoutMS
178+
});
179+
180+
const encryptedFields: Document | undefined =
181+
options.encryptedFields ??
182+
db.client.s.options.autoEncryption?.encryptedFieldsMap?.[`${db.databaseName}.${name}`];
183+
184+
if (encryptedFields) {
185+
// Create auxilliary collections for queryable encryption support.
186+
const escCollection = encryptedFields.escCollection ?? `enxcol_.${name}.esc`;
187+
const ecocCollection = encryptedFields.ecocCollection ?? `enxcol_.${name}.ecoc`;
188+
189+
for (const collectionName of [escCollection, ecocCollection]) {
190+
const createOp = new CreateCollectionOperation(db, collectionName, {
191+
clusteredIndex: {
192+
key: { _id: 1 },
193+
unique: true
194+
},
195+
isEncryptedCollection: true,
196+
session: options.session
197+
});
198+
await executeOperation(db.client, createOp, timeoutContext);
199+
}
200+
201+
if (!options.encryptedFields) {
202+
options = { ...options, encryptedFields };
203+
}
204+
}
205+
206+
const coll = await executeOperation(
207+
db.client,
208+
new CreateCollectionOperation(db, name, options),
209+
timeoutContext
210+
);
211+
212+
if (encryptedFields) {
213+
// Create the required index for queryable encryption support.
214+
const createIndexOp = CreateIndexesOperation.fromIndexSpecification(
215+
db,
216+
name,
217+
{ __safeContent__: 1 },
218+
{ session: options.session }
219+
);
220+
await executeOperation(db.client, createIndexOp, timeoutContext);
221+
}
222+
223+
return coll as unknown as Collection<TSchema>;
224+
}
225+
213226
defineAspects(CreateCollectionOperation, [Aspect.WRITE_OPERATION]);

0 commit comments

Comments
 (0)