Skip to content

Commit a7e1659

Browse files
committed
refacorted insert and insertMany to leverage bulkWrite to allow for more than 1000 doc inserts
1 parent 9a63469 commit a7e1659

24 files changed

+412
-246
lines changed

lib/aggregation_cursor.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ for(var name in CoreCursor.prototype) {
152152
* @return {AggregationCursor}
153153
*/
154154
AggregationCursor.prototype.batchSize = function(value) {
155-
if(this.s.state == AggregationCursor.CLOSED || this.isDead()) throw new MongoError("Cursor is closed");
156-
if(typeof value != 'number') throw new MongoError("batchSize requires an integer");
155+
if(this.s.state == AggregationCursor.CLOSED || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true });
156+
if(typeof value != 'number') throw MongoError.create({message: "batchSize requires an integer", drvier:true });
157157
if(this.s.cmd.cursor) this.s.cmd.cursor.batchSize = value;
158158
this.setCursorBatchSize(value);
159159
return this;

lib/bulk/common.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ var REMOVE = 3
1818
var writeConcern = function(target, col, options) {
1919
if(options.w != null || options.j != null || options.fsync != null) {
2020
target.writeConcern = options;
21-
} else if(col.writeConcern.w != null || col.writeConcern.j != null || col.writeConcern.fsync != null) {
21+
} else if(col.writeConcern.w != null || col.writeConcern.j != null || col.writeConcern.fsync != null) {
2222
target.writeConcern = col.writeConcern;
2323
}
2424

@@ -43,7 +43,7 @@ var defineReadOnlyProperty = function(self, name, value) {
4343
* correctly after command execution
4444
* @ignore
4545
*/
46-
var Batch = function(batchType, originalZeroIndex) {
46+
var Batch = function(batchType, originalZeroIndex) {
4747
this.originalZeroIndex = originalZeroIndex;
4848
this.currentIndex = 0;
4949
this.originalIndexes = [];
@@ -108,7 +108,7 @@ var BulkWriteResult = function(bulkResult) {
108108
* @return {object}
109109
*/
110110
this.getUpsertedIdAt = function(index) {
111-
return bulkResult.upserted[index];
111+
return bulkResult.upserted[index];
112112
}
113113

114114
/**
@@ -186,7 +186,7 @@ var BulkWriteResult = function(bulkResult) {
186186
for(var i = 0; i < bulkResult.writeConcernErrors.length; i++) {
187187
var err = bulkResult.writeConcernErrors[i];
188188
errmsg = errmsg + err.errmsg;
189-
189+
190190
// TODO: Something better
191191
if(i == 0) errmsg = errmsg + " and ";
192192
}
@@ -289,7 +289,7 @@ var mergeBatchResults = function(ordered, batch, bulkResult, err, result) {
289289
, op: batch.operations[0]
290290
};
291291

292-
bulkResult.writeErrors.push(new WriteError(writeError));
292+
bulkResult.writeErrors.push(new WriteError(writeError));
293293
return;
294294
} else if(result.ok == 0 && bulkResult.ok == 0) {
295295
return;
@@ -337,7 +337,7 @@ var mergeBatchResults = function(ordered, batch, bulkResult, err, result) {
337337
var nModified = result.nModified;
338338
bulkResult.nUpserted = bulkResult.nUpserted + nUpserted;
339339
bulkResult.nMatched = bulkResult.nMatched + (result.n - nUpserted);
340-
340+
341341
if(typeof nModified == 'number') {
342342
bulkResult.nModified = bulkResult.nModified + nModified;
343343
} else {
@@ -390,4 +390,4 @@ exports.MULTIPLE_ERROR = MULTIPLE_ERROR;
390390
exports.UNKNOWN_ERROR = UNKNOWN_ERROR;
391391
exports.INSERT = INSERT;
392392
exports.UPDATE = UPDATE;
393-
exports.REMOVE = REMOVE;
393+
exports.REMOVE = REMOVE;

lib/bulk/ordered.js

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,8 @@ function OrderedBulkOperation(topology, collection, options) {
283283
, collection: collection
284284
// Promise Library
285285
, promiseLibrary: promiseLibrary
286+
// Fundamental error
287+
, err: null
286288
}
287289
}
288290

@@ -391,6 +393,11 @@ var executeCommands = function(self, callback) {
391393
var batch = self.s.batches.shift();
392394

393395
var resultHandler = function(err, result) {
396+
// Error is a driver related error not a bulk op error, terminate
397+
if(err && err.driver || err && err.message) {
398+
return callback(err);
399+
}
400+
394401
// If we have and error
395402
if(err) err.ok = 0;
396403
// Merge the results together
@@ -419,6 +426,11 @@ var executeCommands = function(self, callback) {
419426
resultHandler.operationId = self.operationId;
420427
}
421428

429+
// Serialize functions
430+
if(self.s.options.serializeFunctions) {
431+
finalOptions.serializeFunctions = true
432+
}
433+
422434
try {
423435
if(batch.batchType == common.INSERT) {
424436
self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
@@ -473,7 +485,9 @@ OrderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
473485
}
474486

475487
// Execute using callback
476-
if(typeof callback == 'function') return executeCommands(this, callback);
488+
if(typeof callback == 'function') {
489+
return executeCommands(this, callback);
490+
}
477491

478492
// Return a Promise
479493
return new this.s.promiseLibrary(function(resolve, reject) {

lib/bulk/unordered.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,11 @@ var executeBatch = function(self, batch, callback) {
399399
}
400400

401401
var resultHandler = function(err, result) {
402+
// Error is a driver related error not a bulk op error, terminate
403+
if(err && err.driver || err && err.message) {
404+
return callback(err);
405+
}
406+
402407
// If we have and error
403408
if(err) err.ok = 0;
404409
callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, result));
@@ -409,6 +414,11 @@ var executeBatch = function(self, batch, callback) {
409414
resultHandler.operationId = self.operationId;
410415
}
411416

417+
// Serialize functions
418+
if(self.s.options.serializeFunctions) {
419+
finalOptions.serializeFunctions = true
420+
}
421+
412422
try {
413423
if(batch.batchType == common.INSERT) {
414424
self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
@@ -429,13 +439,20 @@ var executeBatch = function(self, batch, callback) {
429439
// Execute all the commands
430440
var executeBatches = function(self, callback) {
431441
var numberOfCommandsToExecute = self.s.batches.length;
442+
var error = null;
432443
// Execute over all the batches
433444
for(var i = 0; i < self.s.batches.length; i++) {
434445
executeBatch(self, self.s.batches[i], function(err, result) {
446+
// Driver layer error capture it
447+
if(err) error = err;
448+
// Count down the number of commands left to execute
435449
numberOfCommandsToExecute = numberOfCommandsToExecute - 1;
436450

437451
// Execute
438452
if(numberOfCommandsToExecute == 0) {
453+
// Driver level error
454+
if(error) return callback(error);
455+
// Treat write errors
439456
var error = self.s.bulkResult.writeErrors.length > 0 ? self.s.bulkResult.writeErrors[0] : null;
440457
callback(error, new BulkWriteResult(self.s.bulkResult));
441458
}

lib/collection.js

Lines changed: 73 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ Collection.prototype.find = function() {
277277

278278
// Ensure the query is an object
279279
if(selector != null && typeof selector != 'object') {
280-
throw new MongoError("query selector must be an object");
280+
throw MongoError.create({message: "query selector must be an object", driver:true });
281281
}
282282

283283
// Build the find command
@@ -357,7 +357,7 @@ Collection.prototype.insertOne = function(doc, options, callback) {
357357
var self = this;
358358
if(typeof options == 'function') callback = options, options = {};
359359
options = options || {};
360-
if(Array.isArray(doc)) return callback(new MongoError('doc parameter must be an object'));
360+
if(Array.isArray(doc)) return callback(MongoError.create({message: 'doc parameter must be an object', driver:true }));
361361

362362
// Execute using callback
363363
if(typeof callback == 'function') return insertOne(self, doc, options, callback);
@@ -384,6 +384,25 @@ var insertOne = function(self, doc, options, callback) {
384384
});
385385
}
386386

387+
var mapInserManyResults = function(docs, r) {
388+
var ids = r.getInsertedIds();
389+
var keys = Object.keys(ids);
390+
var finalIds = new Array(keys);
391+
392+
for(var i = 0; i < keys.length; i++) {
393+
if(ids[keys[i]]._id) {
394+
finalIds[ids[keys[i]].index] = ids[keys[i]]._id;
395+
}
396+
}
397+
398+
return {
399+
result: {ok: 1, n: r.insertedCount},
400+
ops: docs,
401+
insertedCount: r.insertedCount,
402+
insertedIds: finalIds
403+
}
404+
}
405+
387406
/**
388407
* Inserts an array of documents into MongoDB.
389408
* @method
@@ -400,33 +419,47 @@ var insertOne = function(self, doc, options, callback) {
400419
Collection.prototype.insertMany = function(docs, options, callback) {
401420
var self = this;
402421
if(typeof options == 'function') callback = options, options = {};
403-
options = options || {};
404-
if(!Array.isArray(docs)) return callback(new MongoError('docs parameter must be an array of documents'));
422+
options = options || {ordered:true};
423+
if(!Array.isArray(docs)) return callback(MongoError.create({message: 'docs parameter must be an array of documents', driver:true }));
424+
425+
// Get the write concern options
426+
if(typeof options.checkKeys != 'boolean') {
427+
options.checkKeys = true;
428+
}
429+
430+
// If keep going set unordered
431+
options['serializeFunctions'] = options['serializeFunctions'] || self.s.serializeFunctions;
432+
433+
// Do we have keepGoing legacy o
434+
if(options.forceServerObjectId == null || options.forceServerObjectId == false) {
435+
// Add _id if not specified
436+
for(var i = 0; i < docs.length; i++) {
437+
if(docs[i]._id == null) docs[i]._id = self.s.pkFactory.createPk();
438+
}
439+
}
440+
441+
// Generate the bulk write operations
442+
var operations = [{
443+
insertMany: docs
444+
}];
405445

406446
// Execute using callback
407-
if(typeof callback == 'function') return insertMany(self, docs, options, callback);
447+
if(typeof callback == 'function') return bulkWrite(self, operations, options, function(err, r) {
448+
if(err) return callback(err);
449+
if(r.hasWriteErrors()) return callback(r);
450+
callback(null, mapInserManyResults(docs, r));
451+
});
408452

409453
// Return a Promise
410454
return new this.s.promiseLibrary(function(resolve, reject) {
411-
insertMany(self, docs, options, function(err, r) {
455+
bulkWrite(self, operations, options, function(err, r) {
412456
if(err) return reject(err);
413-
resolve(r);
414-
});
415-
});
416-
}
457+
if(r.hasWriteErrors()) {
458+
return reject(r);
459+
}
417460

418-
var insertMany = function(self, docs, options, callback) {
419-
insertDocuments(self, docs, options, function(err, r) {
420-
if(callback == null) return;
421-
if(err && callback) return callback(err);
422-
if(r == null) return callback(null, {result: {ok:1}});
423-
r.insertedCount = r.result.n;
424-
var ids = [];
425-
for(var i = 0; i < docs.length; i++) {
426-
if(docs[i]._id) ids.push(docs[i]._id);
427-
}
428-
r.insertedIds = ids;
429-
if(callback) callback(null, r);
461+
resolve(mapInserManyResults(docs, r));
462+
});
430463
});
431464
}
432465

@@ -482,7 +515,9 @@ Collection.prototype.bulkWrite = function(operations, options, callback) {
482515
if(typeof options == 'function') callback = options, options = {};
483516
options = options || {ordered:true};
484517

485-
if(!Array.isArray(operations)) throw new MongoError("operations must be an array of documents");
518+
if(!Array.isArray(operations)) {
519+
throw MongoError.create({message: "operations must be an array of documents", driver:true });
520+
}
486521

487522
// Execute using callback
488523
if(typeof callback == 'function') return bulkWrite(self, operations, options, callback);
@@ -497,7 +532,7 @@ Collection.prototype.bulkWrite = function(operations, options, callback) {
497532
}
498533

499534
var bulkWrite = function(self, operations, options, callback) {
500-
var bulk = options.ordered == true || options.ordered == null ? self.initializeOrderedBulkOp() : self.initializeUnorderedBulkOp();
535+
var bulk = options.ordered == true || options.ordered == null ? self.initializeOrderedBulkOp(options) : self.initializeUnorderedBulkOp(options);
501536

502537
// for each op go through and add to the bulk
503538
for(var i = 0; i < operations.length; i++) {
@@ -510,6 +545,7 @@ var bulkWrite = function(self, operations, options, callback) {
510545

511546
// Execute the bulk
512547
bulk.execute(writeCon, function(err, r) {
548+
if(err) return callback(err);
513549
r.insertedCount = r.nInserted;
514550
r.matchedCount = r.nMatched;
515551
r.modifiedCount = r.nModified || 0;
@@ -518,6 +554,9 @@ var bulkWrite = function(self, operations, options, callback) {
518554
r.upsertedIds = {};
519555
r.insertedIds = {};
520556

557+
// Update the n
558+
r.n = r.insertedCount;
559+
521560
// Inserted documents
522561
var inserted = r.getInsertedIds();
523562
// Map inserted ids
@@ -601,18 +640,14 @@ var insertDocuments = function(self, docs, options, callback) {
601640
Collection.prototype.insert = function(docs, options, callback) {
602641
var self = this;
603642
if(typeof options == 'function') callback = options, options = {};
604-
options = options || {};
643+
options = options || {ordered:false};
644+
docs = !Array.isArray(docs) ? [docs] : docs;
605645

606-
// Execute using callback
607-
if(typeof callback == 'function') return insertDocuments(self, docs, options, callback);
646+
if(options.keepGoing == true) {
647+
options.ordered = false;
648+
}
608649

609-
// Return a Promise
610-
return new this.s.promiseLibrary(function(resolve, reject) {
611-
insertDocuments(self, docs, options, function(err, r) {
612-
if(err) return reject(err);
613-
resolve(r);
614-
});
615-
});
650+
return this.insertMany(docs, options, callback);
616651
}
617652

618653
/**
@@ -1172,7 +1207,10 @@ Collection.prototype.options = function(callback) {
11721207
var options = function(self, callback) {
11731208
self.s.db.listCollections({name: self.s.name}).toArray(function(err, collections) {
11741209
if(err) return handleCallback(callback, err);
1175-
if(collections.length == 0) return handleCallback(callback, new MongoError(f("collection %s not found", self.s.namespace)));
1210+
if(collections.length == 0) {
1211+
return handleCallback(callback, MongoError.create({message: f("collection %s not found", self.s.namespace), driver:true }));
1212+
}
1213+
11761214
handleCallback(callback, err, collections[0].options || null);
11771215
});
11781216
}

lib/command_cursor.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,8 @@ for(var i = 0; i < methodsToInherit.length; i++) {
157157
* @return {CommandCursor}
158158
*/
159159
CommandCursor.prototype.batchSize = function(value) {
160-
if(this.s.state == CommandCursor.CLOSED || this.isDead()) throw new MongoError("Cursor is closed");
161-
if(typeof value != 'number') throw new MongoError("batchSize requires an integer");
160+
if(this.s.state == CommandCursor.CLOSED || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
161+
if(typeof value != 'number') throw MongoError.create({message: "batchSize requires an integer", driver:true});
162162
if(this.s.cmd.cursor) this.s.cmd.cursor.batchSize = value;
163163
this.setCursorBatchSize(value);
164164
return this;

0 commit comments

Comments
 (0)