Skip to content

Commit 277da59

Browse files
committed
NODE-541 Initial Support "read committed" isolation level where "committed" means confimed by the voting majority of a replica set
1 parent e0e0156 commit 277da59

File tree

6 files changed

+99
-21
lines changed

6 files changed

+99
-21
lines changed

lib/apm.js

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -288,17 +288,17 @@ var Instrumentation = function(core, options, callback) {
288288
parts.shift();
289289
var collection = parts.join('.');
290290

291+
// Set the command
292+
var command = this.query;
293+
var cmd = this.s.cmd;
294+
291295
// If we have a find method, set the operationId on the cursor
292296
if(x == '_find') {
293297
cursor.operationId = ourOpId;
294298
}
295299

296-
// Set the command
297-
var command = this.query;
298-
var cmd = this.s.cmd;
299-
300300
// Do we have a find command rewrite it
301-
if(x == '_find') {
301+
if(cmd.find) {
302302
command = {
303303
find: collection, filter: cmd.query
304304
}
@@ -333,6 +333,9 @@ var Instrumentation = function(core, options, callback) {
333333
if(cmd.partial) command.partial = cmd.partial;
334334
if(cmd.showDiskLoc) command.showRecordId = cmd.showDiskLoc;
335335

336+
// Read Concern
337+
if(cmd.readConcern) command.readConcern = cmd.readConcern;
338+
336339
// Override method
337340
if(cmd.explain) command.explain = cmd.explain;
338341
if(cmd.exhaust) command.exhaust = cmd.exhaust;
@@ -344,6 +347,10 @@ var Instrumentation = function(core, options, callback) {
344347
explain: command,
345348
verbosity: 'allPlansExecution'
346349
}
350+
351+
// Set readConcern on the command if available
352+
if(cmd.readConcern) command.readConcern = cmd.readConcern
353+
347354
// Set up the _explain name for the command
348355
x = '_explain';
349356
}
@@ -360,6 +367,8 @@ var Instrumentation = function(core, options, callback) {
360367
killCursors: collection,
361368
cursors: [this.cursorState.cursorId]
362369
}
370+
} else {
371+
command = cmd;
363372
}
364373

365374
// Set up the connection
@@ -369,14 +378,17 @@ var Instrumentation = function(core, options, callback) {
369378
if(this.connection) connectionId = this.connection;
370379
if(!connectionId && this.server && this.server.getConnection) connectionId = this.server.getConnection();
371380

381+
// Get the command Name
382+
var commandName = x == '_find' ? Object.keys(command)[0] : commandTranslation[x];
383+
372384
// Emit the start event for the command
373385
var command = {
374386
// Returns the command.
375387
command: command,
376388
// Returns the database name.
377389
databaseName: db,
378390
// Returns the command name.
379-
commandName: commandTranslation[x],
391+
commandName: commandName,
380392
// Returns the driver generated request id.
381393
requestId: requestId,
382394
// Returns the driver generated operation id.
@@ -407,7 +419,7 @@ var Instrumentation = function(core, options, callback) {
407419
// Emit the succeeded command
408420
var command = {
409421
duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
410-
commandName: commandTranslation[x],
422+
commandName: commandName,
411423
requestId: requestId,
412424
operationId: cursor.operationId,
413425
connectionId: cursor.server.getConnection(),
@@ -425,7 +437,7 @@ var Instrumentation = function(core, options, callback) {
425437
// Command
426438
var command = {
427439
duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
428-
commandName: commandTranslation[x],
440+
commandName: commandName,
429441
requestId: requestId,
430442
operationId: ourOpId,
431443
connectionId: cursor.server.getConnection(),
@@ -437,7 +449,7 @@ var Instrumentation = function(core, options, callback) {
437449
// cursor id is zero, we can issue success command
438450
var command = {
439451
duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
440-
commandName: commandTranslation[x],
452+
commandName: commandName,
441453
requestId: requestId,
442454
operationId: cursor.operationId,
443455
connectionId: cursor.server.getConnection(),
@@ -472,7 +484,7 @@ var Instrumentation = function(core, options, callback) {
472484
// cursor id is zero, we can issue success command
473485
var command = {
474486
duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
475-
commandName: commandTranslation[x],
487+
commandName: commandName,
476488
requestId: requestId,
477489
operationId: cursor.operationId,
478490
connectionId: cursor.server.getConnection(),
@@ -485,7 +497,7 @@ var Instrumentation = function(core, options, callback) {
485497
// Command
486498
var command = {
487499
duration: timestampGenerator.duration(startTime, timestampGenerator.current()),
488-
commandName: commandTranslation[x],
500+
commandName: commandName,
489501
requestId: requestId,
490502
operationId: ourOpId,
491503
connectionId: cursor.server.getConnection(),

lib/collection.js

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ var checkCollectionName = require('./utils').checkCollectionName
5151
* @property {string} collectionName Get the collection name.
5252
* @property {string} namespace Get the full collection namespace.
5353
* @property {object} writeConcern The current write concern values.
54+
* @property {object} readConcern The current read concern values.
5455
* @property {object} hint Get current index hint for collection.
5556
* @return {Collection} a Collection instance.
5657
*/
@@ -118,6 +119,8 @@ var Collection = function(db, topology, dbName, name, pkFactory, options) {
118119
, name: name
119120
// Promise library
120121
, promiseLibrary: promiseLibrary
122+
// Read Concern
123+
, readConcern: options.readConcern
121124
}
122125
}
123126

@@ -131,6 +134,10 @@ Object.defineProperty(Collection.prototype, 'namespace', {
131134
enumerable: true, get: function() { return this.s.namespace; }
132135
});
133136

137+
Object.defineProperty(Collection.prototype, 'readConcern', {
138+
enumerable: true, get: function() { return this.s.readConcern || {level: 'local'}; }
139+
});
140+
134141
Object.defineProperty(Collection.prototype, 'writeConcern', {
135142
enumerable:true,
136143
get: function() {
@@ -338,6 +345,11 @@ Collection.prototype.find = function() {
338345
if(findCommand.sort)
339346
findCommand.sort = formattedOrderClause(findCommand.sort);
340347

348+
// Set the readConcern
349+
if(this.s.readConcern) {
350+
findCommand.readConcern = this.s.readConcern;
351+
}
352+
341353
// Create the cursor
342354
if(typeof callback == 'function') return handleCallback(callback, null, this.s.topology.cursor(this.s.namespace, findCommand, newOptions));
343355
return this.s.topology.cursor(this.s.namespace, findCommand, newOptions);
@@ -1805,6 +1817,11 @@ var count = function(self, query, options, callback) {
18051817
// Ensure we have the right read preference inheritance
18061818
options = getReadPreference(self, options, self.s.db, self);
18071819

1820+
// Do we have a readConcern specified
1821+
if(self.s.readConcern) {
1822+
cmd.readConcern = self.s.readConcern;
1823+
}
1824+
18081825
// Execute command
18091826
self.s.db.command(cmd, options, function(err, result) {
18101827
if(err) return handleCallback(callback, err);
@@ -1860,6 +1877,11 @@ var distinct = function(self, key, query, options, callback) {
18601877
// Ensure we have the right read preference inheritance
18611878
options = getReadPreference(self, options, self.s.db, self);
18621879

1880+
// Do we have a readConcern specified
1881+
if(self.s.readConcern) {
1882+
cmd.readConcern = self.s.readConcern;
1883+
}
1884+
18631885
// Execute the command
18641886
self.s.db.command(cmd, options, function(err, result) {
18651887
if(err) return handleCallback(callback, err);
@@ -2315,6 +2337,11 @@ Collection.prototype.aggregate = function(pipeline, options, callback) {
23152337
command.bypassDocumentValidation = options.bypassDocumentValidation;
23162338
}
23172339

2340+
// Do we have a readConcern specified
2341+
if(this.s.readConcern) {
2342+
command.readConcern = this.s.readConcern;
2343+
}
2344+
23182345
// If we have allowDiskUse defined
23192346
if(options.allowDiskUse) command.allowDiskUse = options.allowDiskUse;
23202347
if(typeof options.maxTimeMS == 'number') command.maxTimeMS = options.maxTimeMS;
@@ -2424,6 +2451,11 @@ var parallelCollectionScan = function(self, options, callback) {
24242451
, numCursors: options.numCursors
24252452
}
24262453

2454+
// Do we have a readConcern specified
2455+
if(self.s.readConcern) {
2456+
commandObject.readConcern = self.s.readConcern;
2457+
}
2458+
24272459
// Execute the command
24282460
self.s.db.command(commandObject, options, function(err, result) {
24292461
if(err) return handleCallback(callback, err, null);
@@ -2515,6 +2547,11 @@ var geoNear = function(self, x, y, point, options, callback) {
25152547
// Filter out any excluded objects
25162548
commandObject = decorateCommand(commandObject, options, exclude);
25172549

2550+
// Do we have a readConcern specified
2551+
if(self.s.readConcern) {
2552+
commandObject.readConcern = self.s.readConcern;
2553+
}
2554+
25182555
// Execute the command
25192556
self.s.db.command(commandObject, options, function (err, res) {
25202557
if(err) return handleCallback(callback, err);
@@ -2574,6 +2611,11 @@ var geoHaystackSearch = function(self, x, y, options, callback) {
25742611
// Ensure we have the right read preference inheritance
25752612
options = getReadPreference(self, options, self.s.db, self);
25762613

2614+
// Do we have a readConcern specified
2615+
if(self.s.readConcern) {
2616+
commandObject.readConcern = self.s.readConcern;
2617+
}
2618+
25772619
// Execute the command
25782620
self.s.db.command(commandObject, options, function (err, res) {
25792621
if(err) return handleCallback(callback, err);
@@ -2711,6 +2753,12 @@ var group = function(self, keys, condition, initial, reduce, finalize, command,
27112753

27122754
// Ensure we have the right read preference inheritance
27132755
options = getReadPreference(self, options, self.s.db, self);
2756+
2757+
// Do we have a readConcern specified
2758+
if(self.s.readConcern) {
2759+
selector.readConcern = self.s.readConcern;
2760+
}
2761+
27142762
// Execute command
27152763
self.s.db.command(selector, options, function(err, result) {
27162764
if(err) return handleCallback(callback, err, null);
@@ -2851,6 +2899,11 @@ var mapReduce = function(self, map, reduce, options, callback) {
28512899
mapCommandHash.bypassDocumentValidation = options.bypassDocumentValidation;
28522900
}
28532901

2902+
// Do we have a readConcern specified
2903+
if(self.s.readConcern) {
2904+
mapCommandHash.readConcern = self.s.readConcern;
2905+
}
2906+
28542907
// Execute command
28552908
self.s.db.command(mapCommandHash, {readPreference:options.readPreference}, function (err, result) {
28562909
if(err) return handleCallback(callback, err);

lib/db.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ var debugFields = ['authSource', 'w', 'wtimeout', 'j', 'native_parser', 'forceSe
6262
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
6363
* @param {object} [options.pkFactory=null] A primary key factory object for generation of custom _id keys.
6464
* @param {object} [options.promiseLibrary=null] A Promise library class the application wishes to use such as Bluebird, must be ES6 compatible
65+
* @param {object} [options.readConcern=null] Specify a read concern for the collection. (only MongoDB 3.2 or higher supported)
66+
* @param {object} [options.readConcern.level='local'] Specify a read concern level for the collection operations, one of [local|majority]. (only MongoDB 3.2 or higher supported)
6567
* @property {(Server|ReplSet|Mongos)} serverConfig Get the current db topology.
6668
* @property {number} bufferMaxEntries Current bufferMaxEntries value for the database
6769
* @property {string} databaseName The name of the database this instance represents.
@@ -128,6 +130,8 @@ var Db = function(databaseName, topology, options) {
128130
, promiseLibrary: promiseLibrary
129131
// No listener
130132
, noListener: typeof options.noListener == 'boolean' ? options.noListener : false
133+
// ReadConcern
134+
, readConcern: options.readConcern
131135
}
132136

133137
// Ensure we have a valid db name
@@ -392,6 +396,8 @@ define.classMethod('admin', {callback: false, promise:false, returns: [Admin]});
392396
* @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
393397
* @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
394398
* @param {boolean} [options.strict=false] Returns an error if the collection does not exist
399+
* @param {object} [options.readConcern=null] Specify a read concern for the collection. (only MongoDB 3.2 or higher supported)
400+
* @param {object} [options.readConcern.level='local'] Specify a read concern level for the collection operations, one of [local|majority]. (only MongoDB 3.2 or higher supported)
395401
* @param {Db~collectionResultCallback} callback The collection result callback
396402
* @return {Collection} return the new Collection instance if not in strict mode
397403
*/
@@ -403,6 +409,9 @@ Db.prototype.collection = function(name, options, callback) {
403409
// Set the promise library
404410
options.promiseLibrary = this.s.promiseLibrary;
405411

412+
// If we have not set a collection level readConcern set the db level one
413+
options.readConcern = options.readConcern || this.s.readConcern;
414+
406415
// Execute
407416
if(options == null || !options.strict) {
408417
try {

lib/url_parser.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ module.exports = function(url, options) {
180180
case 'native_parser':
181181
dbOptions.native_parser = (value == 'true');
182182
break;
183+
case 'readConcernLevel':
184+
dbOptions.readConcern = {level: value};
185+
break;
183186
case 'connectTimeoutMS':
184187
serverOptions.socketOptions.connectTimeoutMS = parseInt(value, 10);
185188
replSetServersOptions.socketOptions.connectTimeoutMS = parseInt(value, 10);

test/functional/apm_tests.js

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ var f = require('util').format,
44
fs = require('fs');
55

66
exports['Correctly receive the APM events for an insert'] = {
7-
metadata: { requires: { topology: ['single'] } },
7+
metadata: { requires: { topology: ['single', 'replicaset'] } },
88

99
// The actual test we wish to run
1010
test: function(configuration, test) {
@@ -46,7 +46,7 @@ exports['Correctly receive the APM events for an insert'] = {
4646
}
4747

4848
exports['Correctly receive the APM events for an insert using custom operationId and time generator'] = {
49-
metadata: { requires: { topology: ['single'] } },
49+
metadata: { requires: { topology: ['single', 'replicaset'] } },
5050

5151
// The actual test we wish to run
5252
test: function(configuration, test) {
@@ -123,7 +123,6 @@ var validateExpecations = function(test, expectation, results) {
123123

124124
// Get the result
125125
var result = results.successes.shift();
126-
127126
// Validate the test
128127
test.equal(commandName, result.commandName);
129128
// test.deepEqual(reply[0], result.reply.result);
@@ -299,7 +298,7 @@ var executeSuite = function(assert, client, listener, scenarios, callback) {
299298
}
300299

301300
exports['Correctly run all JSON APM Tests'] = {
302-
metadata: { requires: { topology: ['single'] } },
301+
metadata: { requires: { topology: ['single', 'replicaset'] } },
303302

304303
// The actual test we wish to run
305304
test: function(configuration, test) {
@@ -331,7 +330,7 @@ exports['Correctly run all JSON APM Tests'] = {
331330
}
332331

333332
exports['Correctly receive the APM events for a find with getmore and killcursor'] = {
334-
metadata: { requires: { topology: ['single'] } },
333+
metadata: { requires: { topology: ['single', 'replicaset'] } },
335334

336335
// The actual test we wish to run
337336
test: function(configuration, test) {
@@ -411,7 +410,7 @@ exports['Correctly receive the APM events for a find with getmore and killcursor
411410
}
412411

413412
exports['Correctly receive the APM failure event for find'] = {
414-
metadata: { requires: { topology: ['single'], mongodb: ">=2.6.0" } },
413+
metadata: { requires: { topology: ['single', 'replicaset'], mongodb: ">=2.6.0" } },
415414

416415
// The actual test we wish to run
417416
test: function(configuration, test) {
@@ -481,7 +480,7 @@ var cleanup = function(overrides) {
481480
}
482481

483482
exports['Correctly receive the APM events for a bulk operation'] = {
484-
metadata: { requires: { topology: ['single'] } },
483+
metadata: { requires: { topology: ['single', 'replicaset'] } },
485484

486485
// The actual test we wish to run
487486
test: function(configuration, test) {
@@ -518,14 +517,15 @@ exports['Correctly receive the APM events for a bulk operation'] = {
518517
db.close();
519518
test.done();
520519
}).catch(function(err) {
521-
console.dir(err)
520+
console.log(err.stack)
521+
test.done();
522522
});
523523
});
524524
}
525525
}
526526

527527
exports['Correctly receive the APM explain command'] = {
528-
metadata: { requires: { topology: ['single'] } },
528+
metadata: { requires: { topology: ['single', 'replicaset'] } },
529529

530530
// The actual test we wish to run
531531
test: function(configuration, test) {
@@ -590,7 +590,7 @@ exports['Correctly receive the APM explain command'] = {
590590
}
591591

592592
exports['Correctly filter out sensitive commands'] = {
593-
metadata: { requires: { topology: ['single'] } },
593+
metadata: { requires: { topology: ['single', 'replicaset'] } },
594594

595595
// The actual test we wish to run
596596
test: function(configuration, test) {

test/runner.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ var testFiles =[
272272
, '/test/functional/replset_read_preference_tests.js'
273273
, '/test/functional/replset_failover_tests.js'
274274
, '/test/functional/replset_connection_tests.js'
275+
, '/test/functional/readconcern_tests.js'
275276

276277
// Sharding tests
277278
, '/test/functional/sharding_failover_tests.js'

0 commit comments

Comments
 (0)