|
1 | | -import { MongoQuery } from '@sofie-automation/corelib/dist/mongo' |
2 | | -import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString' |
| 1 | +import { MongoModifier, MongoQuery } from '@sofie-automation/corelib/dist/mongo' |
| 2 | +import { ProtectedString, protectString, unprotectString } from '@sofie-automation/corelib/dist/protectedString' |
3 | 3 | import { Meteor } from 'meteor/meteor' |
| 4 | +import { Mongo } from 'meteor/mongo' |
4 | 5 | import { |
| 6 | + UpdateOptions, |
| 7 | + UpsertOptions, |
| 8 | + IndexSpecifier, |
| 9 | + MongoCursor, |
5 | 10 | FindOptions, |
6 | 11 | ObserveChangesCallbacks, |
7 | 12 | ObserveCallbacks, |
8 | 13 | } from '@sofie-automation/meteor-lib/dist/collections/lib' |
| 14 | +import type { AnyBulkWriteOperation, Collection as RawCollection, Db as RawDb } from 'mongodb' |
| 15 | +import { stringifyError } from '@sofie-automation/shared-lib/dist/lib/stringifyError' |
| 16 | +import { NpmModuleMongodb } from 'meteor/npm-mongo' |
| 17 | +import { profiler } from '../../api/profiler' |
9 | 18 | import { PromisifyCallbacks } from '@sofie-automation/shared-lib/dist/lib/types' |
10 | | -import type { AnyBulkWriteOperation } from 'mongodb' |
11 | | -import { MinimalMongoCursor, WrappedMongoCollectionBase } from './base' |
12 | 19 | import { AsyncOnlyMongoCollection } from '../collection' |
13 | | -import { profiler } from '../../api/profiler' |
| 20 | + |
| 21 | +export type MinimalMongoCursor<T extends { _id: ProtectedString<any> }> = Pick< |
| 22 | + MongoCursor<T>, |
| 23 | + 'fetchAsync' | 'observeChangesAsync' | 'observeAsync' | 'countAsync' |
| 24 | + // | 'forEach' | 'map' | |
| 25 | +> |
| 26 | + |
| 27 | +export type MinimalMeteorMongoCollection<T extends { _id: ProtectedString<any> }> = Pick< |
| 28 | + Mongo.Collection<T>, |
| 29 | + // | 'find' |
| 30 | + 'insertAsync' | 'removeAsync' | 'updateAsync' | 'upsertAsync' | 'rawCollection' | 'rawDatabase' | 'createIndex' |
| 31 | +> & { |
| 32 | + find: (...args: Parameters<Mongo.Collection<T>['find']>) => MinimalMongoCursor<T> |
| 33 | +} |
14 | 34 |
|
15 | 35 | export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedString<any> }> |
16 | | - extends WrappedMongoCollectionBase<DBInterface> |
17 | 36 | implements AsyncOnlyMongoCollection<DBInterface> |
18 | 37 | { |
| 38 | + protected readonly _collection: MinimalMeteorMongoCollection<DBInterface> |
| 39 | + |
| 40 | + public readonly name: string | null |
| 41 | + |
| 42 | + constructor(collection: Mongo.Collection<DBInterface>, name: string | null) { |
| 43 | + this._collection = collection as any |
| 44 | + this.name = name |
| 45 | + } |
| 46 | + |
| 47 | + protected get _isMock(): boolean { |
| 48 | + // @ts-expect-error re-export private property |
| 49 | + return this._collection._isMock |
| 50 | + } |
| 51 | + |
| 52 | + public get mockCollection(): MinimalMeteorMongoCollection<DBInterface> { |
| 53 | + return this._collection |
| 54 | + } |
| 55 | + |
19 | 56 | get mutableCollection(): AsyncOnlyMongoCollection<DBInterface> { |
20 | 57 | return this |
21 | 58 | } |
22 | 59 |
|
| 60 | + protected wrapMongoError(e: unknown): never { |
| 61 | + const str = stringifyError(e) || 'Unknown MongoDB Error' |
| 62 | + throw new Meteor.Error(e instanceof Meteor.Error ? e.error : 500, `Collection "${this.name}": ${str}`) |
| 63 | + } |
| 64 | + |
| 65 | + rawCollection(): RawCollection<DBInterface> { |
| 66 | + return this._collection.rawCollection() as any |
| 67 | + } |
| 68 | + protected rawDatabase(): RawDb { |
| 69 | + return this._collection.rawDatabase() as any |
| 70 | + } |
| 71 | + |
23 | 72 | async findFetchAsync( |
24 | 73 | selector: MongoQuery<DBInterface> | DBInterface['_id'], |
25 | 74 | options?: FindOptions<DBInterface> |
@@ -131,10 +180,115 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr |
131 | 180 | } |
132 | 181 | } |
133 | 182 |
|
| 183 | + public async countDocuments( |
| 184 | + selector?: MongoQuery<DBInterface>, |
| 185 | + options?: FindOptions<DBInterface> |
| 186 | + ): Promise<number> { |
| 187 | + const span = profiler.startSpan(`MongoCollection.${this.name}.countDocuments`) |
| 188 | + if (span) { |
| 189 | + span.addLabels({ |
| 190 | + collection: this.name, |
| 191 | + query: JSON.stringify(selector), |
| 192 | + }) |
| 193 | + } |
| 194 | + try { |
| 195 | + const res = await this._collection.find((selector ?? {}) as any, options as any).countAsync() |
| 196 | + if (span) span.end() |
| 197 | + return res |
| 198 | + } catch (e) { |
| 199 | + if (span) span.end() |
| 200 | + this.wrapMongoError(e) |
| 201 | + } |
| 202 | + } |
| 203 | + |
| 204 | + public async insertAsync(doc: DBInterface): Promise<DBInterface['_id']> { |
| 205 | + const span = profiler.startSpan(`MongoCollection.${this.name}.insert`) |
| 206 | + if (span) { |
| 207 | + span.addLabels({ |
| 208 | + collection: this.name, |
| 209 | + id: unprotectString(doc._id), |
| 210 | + }) |
| 211 | + } |
| 212 | + try { |
| 213 | + const resultId = await this._collection.insertAsync(doc as unknown as Mongo.OptionalId<DBInterface>) |
| 214 | + if (span) span.end() |
| 215 | + return protectString(resultId) |
| 216 | + } catch (e) { |
| 217 | + if (span) span.end() |
| 218 | + this.wrapMongoError(e) |
| 219 | + } |
| 220 | + } |
| 221 | + |
134 | 222 | async insertManyAsync(docs: DBInterface[]): Promise<Array<DBInterface['_id']>> { |
135 | 223 | return Promise.all(docs.map(async (doc) => this.insertAsync(doc))) |
136 | 224 | } |
137 | 225 |
|
| 226 | + public async removeAsync(selector: MongoQuery<DBInterface> | DBInterface['_id']): Promise<number> { |
| 227 | + const span = profiler.startSpan(`MongoCollection.${this.name}.remove`) |
| 228 | + if (span) { |
| 229 | + span.addLabels({ |
| 230 | + collection: this.name, |
| 231 | + query: JSON.stringify(selector), |
| 232 | + }) |
| 233 | + } |
| 234 | + try { |
| 235 | + const res = await this._collection.removeAsync(selector as any) |
| 236 | + if (span) span.end() |
| 237 | + return res |
| 238 | + } catch (e) { |
| 239 | + if (span) span.end() |
| 240 | + this.wrapMongoError(e) |
| 241 | + } |
| 242 | + } |
| 243 | + public async updateAsync( |
| 244 | + selector: MongoQuery<DBInterface> | DBInterface['_id'] | { _id: DBInterface['_id'] }, |
| 245 | + modifier: MongoModifier<DBInterface>, |
| 246 | + options?: UpdateOptions |
| 247 | + ): Promise<number> { |
| 248 | + const span = profiler.startSpan(`MongoCollection.${this.name}.update`) |
| 249 | + if (span) { |
| 250 | + span.addLabels({ |
| 251 | + collection: this.name, |
| 252 | + query: JSON.stringify(selector), |
| 253 | + }) |
| 254 | + } |
| 255 | + try { |
| 256 | + const res = await this._collection.updateAsync(selector as any, modifier as any, options) |
| 257 | + if (span) span.end() |
| 258 | + return res |
| 259 | + } catch (e) { |
| 260 | + if (span) span.end() |
| 261 | + this.wrapMongoError(e) |
| 262 | + } |
| 263 | + } |
| 264 | + public async upsertAsync( |
| 265 | + selector: MongoQuery<DBInterface> | DBInterface['_id'] | { _id: DBInterface['_id'] }, |
| 266 | + modifier: MongoModifier<DBInterface>, |
| 267 | + options?: UpsertOptions |
| 268 | + ): Promise<{ |
| 269 | + numberAffected?: number |
| 270 | + insertedId?: DBInterface['_id'] |
| 271 | + }> { |
| 272 | + const span = profiler.startSpan(`MongoCollection.${this.name}.upsert`) |
| 273 | + if (span) { |
| 274 | + span.addLabels({ |
| 275 | + collection: this.name, |
| 276 | + query: JSON.stringify(selector), |
| 277 | + }) |
| 278 | + } |
| 279 | + try { |
| 280 | + const result = await this._collection.upsertAsync(selector as any, modifier as any, options) |
| 281 | + if (span) span.end() |
| 282 | + return { |
| 283 | + numberAffected: result.numberAffected, |
| 284 | + insertedId: protectString(result.insertedId), |
| 285 | + } |
| 286 | + } catch (e) { |
| 287 | + if (span) span.end() |
| 288 | + this.wrapMongoError(e) |
| 289 | + } |
| 290 | + } |
| 291 | + |
138 | 292 | async upsertManyAsync(docs: DBInterface[]): Promise<{ numberAffected: number; insertedIds: DBInterface['_id'][] }> { |
139 | 293 | const result: { |
140 | 294 | numberAffected: number |
@@ -178,16 +332,16 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr |
178 | 332 | if (span) span.end() |
179 | 333 | } |
180 | 334 |
|
181 | | - async countDocuments(selector?: MongoQuery<DBInterface>, options?: FindOptions<DBInterface>): Promise<number> { |
182 | | - const span = profiler.startSpan(`MongoCollection.${this.name}.countDocuments`) |
| 335 | + createIndex(keys: IndexSpecifier<DBInterface> | string, options?: NpmModuleMongodb.CreateIndexesOptions): void { |
| 336 | + const span = profiler.startSpan(`MongoCollection.${this.name}.createIndex`) |
183 | 337 | if (span) { |
184 | 338 | span.addLabels({ |
185 | 339 | collection: this.name, |
186 | | - query: JSON.stringify(selector), |
| 340 | + keys: JSON.stringify(keys), |
187 | 341 | }) |
188 | 342 | } |
189 | 343 | try { |
190 | | - const res = await this._collection.find((selector ?? {}) as any, options as any).countAsync() |
| 344 | + const res = this._collection.createIndex(keys as any, options) |
191 | 345 | if (span) span.end() |
192 | 346 | return res |
193 | 347 | } catch (e) { |
|
0 commit comments