@@ -8,16 +8,21 @@ const ChangeStream = require('./cursor/changeStream');
88const EventEmitter = require ( 'events' ) . EventEmitter ;
99const Schema = require ( './schema' ) ;
1010const STATES = require ( './connectionState' ) ;
11+ const MongooseBulkWriteError = require ( './error/bulkWriteError' ) ;
1112const MongooseError = require ( './error/index' ) ;
1213const ServerSelectionError = require ( './error/serverSelection' ) ;
1314const SyncIndexesError = require ( './error/syncIndexes' ) ;
1415const applyPlugins = require ( './helpers/schema/applyPlugins' ) ;
1516const clone = require ( './helpers/clone' ) ;
1617const driver = require ( './driver' ) ;
1718const get = require ( './helpers/get' ) ;
19+ const getDefaultBulkwriteResult = require ( './helpers/getDefaultBulkwriteResult' ) ;
1820const immediate = require ( './helpers/immediate' ) ;
1921const utils = require ( './utils' ) ;
2022const CreateCollectionsError = require ( './error/createCollectionsError' ) ;
23+ const castBulkWrite = require ( './helpers/model/castBulkWrite' ) ;
24+ const { modelSymbol } = require ( './helpers/symbols' ) ;
25+ const isPromise = require ( './helpers/isPromise' ) ;
2126
2227const arrayAtomicsSymbol = require ( './helpers/symbols' ) . arrayAtomicsSymbol ;
2328const sessionNewDocuments = require ( './helpers/symbols' ) . sessionNewDocuments ;
@@ -416,6 +421,178 @@ Connection.prototype.createCollection = async function createCollection(collecti
416421 return this . db . createCollection ( collection , options ) ;
417422} ;
418423
424+ /**
425+ * _Requires MongoDB Server 8.0 or greater_. Executes bulk write operations across multiple models in a single operation.
426+ * You must specify the `model` for each operation: Mongoose will use `model` for casting and validation, as well as
427+ * determining which collection to apply the operation to.
428+ *
429+ * #### Example:
430+ * const Test = mongoose.model('Test', new Schema({ name: String }));
431+ *
432+ * await db.bulkWrite([
433+ * { model: Test, name: 'insertOne', document: { name: 'test1' } }, // Can specify model as a Model class...
434+ * { model: 'Test', name: 'insertOne', document: { name: 'test2' } } // or as a model name
435+ * ], { ordered: false });
436+ *
437+ * @method bulkWrite
438+ * @param {Array } ops
439+ * @param {Object } [options]
440+ * @param {Boolean } [options.ordered] If false, perform unordered operations. If true, perform ordered operations.
441+ * @param {Session } [options.session] The session to use for the operation.
442+ * @return {Promise }
443+ * @see MongoDB https://www.mongodb.com/docs/manual/reference/command/bulkWrite/#mongodb-dbcommand-dbcmd.bulkWrite
444+ * @api public
445+ */
446+
447+
448+ Connection . prototype . bulkWrite = async function bulkWrite ( ops , options ) {
449+ await this . _waitForConnect ( ) ;
450+ options = options || { } ;
451+
452+ const ordered = options . ordered == null ? true : options . ordered ;
453+ const asyncLocalStorage = this . base . transactionAsyncLocalStorage ?. getStore ( ) ;
454+ if ( ( ! options || ! options . hasOwnProperty ( 'session' ) ) && asyncLocalStorage ?. session != null ) {
455+ options = { ...options , session : asyncLocalStorage . session } ;
456+ }
457+
458+ const now = this . base . now ( ) ;
459+
460+ let res = null ;
461+ if ( ordered ) {
462+ const opsToSend = [ ] ;
463+ for ( const op of ops ) {
464+ if ( typeof op . model !== 'string' && ! op . model ?. [ modelSymbol ] ) {
465+ throw new MongooseError ( 'Must specify model in Connection.prototype.bulkWrite() operations' ) ;
466+ }
467+ const Model = op . model [ modelSymbol ] ? op . model : this . model ( op . model ) ;
468+
469+ if ( op . name == null ) {
470+ throw new MongooseError ( 'Must specify operation name in Connection.prototype.bulkWrite()' ) ;
471+ }
472+ if ( ! castBulkWrite . cast . hasOwnProperty ( op . name ) ) {
473+ throw new MongooseError ( `Unrecognized bulkWrite() operation name ${ op . name } ` ) ;
474+ }
475+
476+ await castBulkWrite . cast [ op . name ] ( Model , op , options , now ) ;
477+ opsToSend . push ( { ...op , namespace : Model . namespace ( ) } ) ;
478+ }
479+
480+ res = await this . client . bulkWrite ( opsToSend , options ) ;
481+ } else {
482+ const validOps = [ ] ;
483+ const validOpIndexes = [ ] ;
484+ let validationErrors = [ ] ;
485+ const asyncValidations = [ ] ;
486+ const results = [ ] ;
487+ for ( let i = 0 ; i < ops . length ; ++ i ) {
488+ const op = ops [ i ] ;
489+ if ( typeof op . model !== 'string' && ! op . model ?. [ modelSymbol ] ) {
490+ const error = new MongooseError ( 'Must specify model in Connection.prototype.bulkWrite() operations' ) ;
491+ validationErrors . push ( { index : i , error : error } ) ;
492+ results [ i ] = error ;
493+ continue ;
494+ }
495+ let Model ;
496+ try {
497+ Model = op . model [ modelSymbol ] ? op . model : this . model ( op . model ) ;
498+ } catch ( error ) {
499+ validationErrors . push ( { index : i , error : error } ) ;
500+ continue ;
501+ }
502+ if ( op . name == null ) {
503+ const error = new MongooseError ( 'Must specify operation name in Connection.prototype.bulkWrite()' ) ;
504+ validationErrors . push ( { index : i , error : error } ) ;
505+ results [ i ] = error ;
506+ continue ;
507+ }
508+ if ( ! castBulkWrite . cast . hasOwnProperty ( op . name ) ) {
509+ const error = new MongooseError ( `Unrecognized bulkWrite() operation name ${ op . name } ` ) ;
510+ validationErrors . push ( { index : i , error : error } ) ;
511+ results [ i ] = error ;
512+ continue ;
513+ }
514+
515+ let maybePromise = null ;
516+ try {
517+ maybePromise = castBulkWrite . cast [ op . name ] ( Model , op , options , now ) ;
518+ } catch ( error ) {
519+ validationErrors . push ( { index : i , error : error } ) ;
520+ results [ i ] = error ;
521+ continue ;
522+ }
523+ if ( isPromise ( maybePromise ) ) {
524+ asyncValidations . push (
525+ maybePromise . then (
526+ ( ) => {
527+ validOps . push ( { ...op , namespace : Model . namespace ( ) } ) ;
528+ validOpIndexes . push ( i ) ;
529+ } ,
530+ error => {
531+ validationErrors . push ( { index : i , error : error } ) ;
532+ results [ i ] = error ;
533+ }
534+ )
535+ ) ;
536+ } else {
537+ validOps . push ( { ...op , namespace : Model . namespace ( ) } ) ;
538+ validOpIndexes . push ( i ) ;
539+ }
540+ }
541+
542+ if ( asyncValidations . length > 0 ) {
543+ await Promise . all ( asyncValidations ) ;
544+ }
545+
546+ validationErrors = validationErrors .
547+ sort ( ( v1 , v2 ) => v1 . index - v2 . index ) .
548+ map ( v => v . error ) ;
549+
550+ if ( validOps . length === 0 ) {
551+ if ( options . throwOnValidationError && validationErrors . length ) {
552+ throw new MongooseBulkWriteError (
553+ validationErrors ,
554+ results ,
555+ res ,
556+ 'bulkWrite'
557+ ) ;
558+ }
559+ return getDefaultBulkwriteResult ( ) ;
560+ }
561+
562+ let error ;
563+ [ res , error ] = await this . client . bulkWrite ( validOps , options ) .
564+ then ( res => ( [ res , null ] ) ) .
565+ catch ( err => ( [ null , err ] ) ) ;
566+
567+ if ( error ) {
568+ if ( validationErrors . length > 0 ) {
569+ error . mongoose = error . mongoose || { } ;
570+ error . mongoose . validationErrors = validationErrors ;
571+ }
572+ }
573+
574+ for ( let i = 0 ; i < validOpIndexes . length ; ++ i ) {
575+ results [ validOpIndexes [ i ] ] = null ;
576+ }
577+ if ( validationErrors . length > 0 ) {
578+ if ( options . throwOnValidationError ) {
579+ throw new MongooseBulkWriteError (
580+ validationErrors ,
581+ results ,
582+ res ,
583+ 'bulkWrite'
584+ ) ;
585+ } else {
586+ res . mongoose = res . mongoose || { } ;
587+ res . mongoose . validationErrors = validationErrors ;
588+ res . mongoose . results = results ;
589+ }
590+ }
591+ }
592+
593+ return res ;
594+ } ;
595+
419596/**
420597 * Calls `createCollection()` on a models in a series.
421598 *
0 commit comments