Skip to content

Commit 8ea21c1

Browse files
committed
operationId now applied correctly to query and bulk operations
1 parent e4b2c40 commit 8ea21c1

File tree

6 files changed

+389
-30
lines changed

6 files changed

+389
-30
lines changed

lib/apm.js

Lines changed: 118 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ var Instrumentation = function(core, options) {
3434
// The actual prototype
3535
proto[x] = function() {
3636
var requestId = core.Query.nextRequestId();
37-
var ourOpId = operationIdGenerator.next();
3837
// Get the aruments
3938
var args = Array.prototype.slice.call(arguments, 0);
4039
var ns = args[0];
@@ -43,6 +42,12 @@ var Instrumentation = function(core, options) {
4342
var commandName = keys[0];
4443
var db = ns.split('.')[0];
4544

45+
// Get the callback
46+
var callback = args.pop();
47+
// Set current callback operation id from the current context or create
48+
// a new one
49+
var ourOpId = callback.operationId || operationIdGenerator.next();
50+
4651
// Get a connection reference for this server instance
4752
var connection = this.s.pool.get()
4853
// Emit the start event for the command
@@ -70,8 +75,7 @@ var Instrumentation = function(core, options) {
7075
// Start time
7176
var startTime = new Date().getTime();
7277

73-
// Get the callback
74-
var callback = args.pop();
78+
// Push our handler callback
7579
args.push(function(err, r) {
7680
var endTime = new Date().getTime();
7781
var command = {
@@ -100,6 +104,45 @@ var Instrumentation = function(core, options) {
100104
}
101105
});
102106

107+
// ---------------------------------------------------------
108+
//
109+
// Bulk Operations
110+
//
111+
// ---------------------------------------------------------
112+
113+
// Inject ourselves into the Bulk methods
114+
var methods = ['execute'];
115+
var prototypes = [
116+
require('./bulk/ordered').Bulk.prototype,
117+
require('./bulk/unordered').Bulk.prototype
118+
]
119+
120+
prototypes.forEach(function(proto) {
121+
// Core server method we are going to wrap
122+
methods.forEach(function(x) {
123+
var func = proto[x];
124+
125+
// The actual prototype
126+
proto[x] = function() {
127+
var bulk = this;
128+
// Get the aruments
129+
var args = Array.prototype.slice.call(arguments, 0);
130+
// Set an operation Id on the bulk object
131+
this.operationId = operationIdGenerator.next();
132+
133+
// Get the callback
134+
var callback = args.pop();
135+
args.push(function(err, r) {
136+
// Return to caller
137+
callback(err, r);
138+
});
139+
140+
// Apply the call
141+
func.apply(this, args);
142+
}
143+
});
144+
});
145+
103146
// ---------------------------------------------------------
104147
//
105148
// Cursor
@@ -109,7 +152,8 @@ var Instrumentation = function(core, options) {
109152
// Inject ourselves into the Cursor methods
110153
var methods = ['_find', '_getmore', '_killcursor'];
111154
var prototypes = [
112-
require('./cursor').prototype, require('./command_cursor').prototype,
155+
require('./cursor').prototype,
156+
require('./command_cursor').prototype,
113157
require('./aggregation_cursor').prototype
114158
]
115159

@@ -128,18 +172,80 @@ var Instrumentation = function(core, options) {
128172
var cursor = this;
129173
var requestId = core.Query.nextRequestId();
130174
var ourOpId = operationIdGenerator.next();
131-
var db = this.ns.split('.')[0];
175+
var parts = this.ns.split('.');
176+
var db = parts[0];
177+
// Get the collection
178+
parts.shift();
179+
var collection = parts.join('.');
132180

133181
// If we have a find method, set the operationId on the cursor
134182
if(x == '_find') {
135183
this.operationId = ourOpId;
136184
this.startTime = new Date();
137185
}
138186

187+
// Set the command
188+
var command = this.query;
189+
var cmd = this.s.cmd;
190+
191+
// Do we have a find command rewrite it
192+
if(x == '_find') {
193+
command = {
194+
find: collection, filter: cmd.query
195+
}
196+
197+
if(cmd.sort) command.sort = cmd.sort;
198+
if(cmd.fields) command.projection = cmd.fields;
199+
if(cmd.limit && cmd.limit < 0) {
200+
command.limit = Math.abs(cmd.limit);
201+
command.singleBatch = true;
202+
} else if(cmd.limit) {
203+
command.limit = Math.abs(cmd.limit);
204+
}
205+
206+
// Options
207+
if(cmd.skip) command.skip = cmd.skip;
208+
if(cmd.hint) command.hint = cmd.hint;
209+
if(cmd.batchSize) command.batchSize = cmd.batchSize;
210+
if(cmd.returnKey) command.returnKey = cmd.returnKey;
211+
if(cmd.comment) command.comment = cmd.comment;
212+
if(cmd.min) command.min = cmd.min;
213+
if(cmd.max) command.max = cmd.max;
214+
if(cmd.maxScan) command.maxScan = cmd.maxScan;
215+
if(cmd.readPreference) command['$readPreference'] = cmd.readPreference;
216+
if(cmd.maxTimeMS) command.maxTimeMS = cmd.maxTimeMS;
217+
218+
// Flags
219+
if(cmd.awaitData) command.awaitData = cmd.awaitData;
220+
if(cmd.snapshot) command.snapshot = cmd.snapshot;
221+
if(cmd.tailable) command.tailable = cmd.tailable;
222+
if(cmd.oplogReplay) command.oplogReplay = cmd.oplogReplay;
223+
if(cmd.noCursorTimeout) command.noCursorTimeout = cmd.noCursorTimeout;
224+
if(cmd.partial) command.partial = cmd.partial;
225+
if(cmd.showDiskLoc) command.showRecordId = cmd.showDiskLoc;
226+
227+
// Override method
228+
if(cmd.explain) command.explain = cmd.explain;
229+
if(cmd.exhaust) command.exhaust = cmd.exhaust;
230+
} else if(x == '_getmore') {
231+
command = {
232+
getMore: this.cursorState.cursorId,
233+
collection: collection,
234+
batchSize: cmd.batchSize
235+
}
236+
237+
if(cmd.maxTimeMS) command.maxTimeMS = cmd.maxTimeMS;
238+
} else if(x == '_killcursors') {
239+
command = {
240+
killCursors: collection,
241+
cursors: [this.cursorState.cursorId]
242+
}
243+
}
244+
139245
// Emit the start event for the command
140246
var command = {
141247
// Returns the command.
142-
command: this.query,
248+
command: command,
143249
// Returns the database name.
144250
databaseName: db,
145251
// Returns the command name.
@@ -172,16 +278,20 @@ var Instrumentation = function(core, options) {
172278
operationId: ourOpId,
173279
connectionId: cursor.server.getConnection(),
174280
failure: err };
281+
175282
// Emit the command
176283
self.emit('failed', command)
177-
} else { //if(connect.Long.ZERO.equals(cursor.cursorState.cursorId)) {
284+
} else {
178285
// cursor id is zero, we can issue success command
179286
var command = {
180287
duration: (new Date().getTime() - cursor.startTime.getTime()),
181288
commandName: commandTranslation[x],
182289
requestId: requestId,
183290
operationId: cursor.operationId,
184-
connectionId: cursor.server.getConnection() };
291+
connectionId: cursor.server.getConnection(),
292+
reply: cursor.cursorState.documents
293+
};
294+
185295
// Emit the command
186296
self.emit('succeeded', command)
187297
}

lib/bulk/ordered.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,11 @@ var executeCommands = function(self, callback) {
414414
finalOptions.writeConcern = self.s.writeConcern;
415415
}
416416

417+
// Set an operationIf if provided
418+
if(self.operationId) {
419+
resultHandler.operationId = self.operationId;
420+
}
421+
417422
try {
418423
if(batch.batchType == common.INSERT) {
419424
self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
@@ -425,7 +430,7 @@ var executeCommands = function(self, callback) {
425430
} catch(err) {
426431
// Force top level error
427432
err.ok = 0;
428-
// Merge top level error and return
433+
// Merge top level error and return
429434
callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
430435
}
431436
}
@@ -474,7 +479,7 @@ OrderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
474479
return new this.s.promiseLibrary(function(resolve, reject) {
475480
executeCommands(self, function(err, r) {
476481
if(err) return reject(err);
477-
resolve(r);
482+
resolve(r);
478483
});
479484
});
480485
}
@@ -488,3 +493,4 @@ var initializeOrderedBulkOp = function(topology, collection, options) {
488493
}
489494

490495
module.exports = initializeOrderedBulkOp;
496+
module.exports.Bulk = OrderedBulkOperation;

lib/bulk/unordered.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,11 @@ var executeBatch = function(self, batch, callback) {
404404
callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, result));
405405
}
406406

407+
// Set an operationIf if provided
408+
if(self.operationId) {
409+
resultHandler.operationId = self.operationId;
410+
}
411+
407412
try {
408413
if(batch.batchType == common.INSERT) {
409414
self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
@@ -415,7 +420,7 @@ var executeBatch = function(self, batch, callback) {
415420
} catch(err) {
416421
// Force top level error
417422
err.ok = 0;
418-
// Merge top level error and return
423+
// Merge top level error and return
419424
callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
420425
}
421426
}
@@ -484,7 +489,7 @@ UnorderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
484489
return new this.s.promiseLibrary(function(resolve, reject) {
485490
executeBatches(self, function(err, r) {
486491
if(err) return reject(err);
487-
resolve(r);
492+
resolve(r);
488493
});
489494
});
490495
}
@@ -498,3 +503,4 @@ var initializeUnorderedBulkOp = function(topology, collection, options) {
498503
}
499504

500505
module.exports = initializeUnorderedBulkOp;
506+
module.exports.Bulk = UnorderedBulkOperation;

lib/cursor.js

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,90 @@ Cursor.prototype.filter = function(filter) {
251251
return this;
252252
}
253253

254+
/**
255+
* Set the cursor maxScan
256+
* @method
257+
* @param {object} maxScan Constrains the query to only scan the specified number of documents when fulfilling the query
258+
* @return {Cursor}
259+
*/
260+
Cursor.prototype.maxScan = function(maxScan) {
261+
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
262+
this.s.cmd.maxScan = maxScan;
263+
return this;
264+
}
265+
266+
/**
267+
* Set the cursor hint
268+
* @method
269+
* @param {object} hint If specified, then the query system will only consider plans using the hinted index.
270+
* @return {Cursor}
271+
*/
272+
Cursor.prototype.hint = function(hint) {
273+
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
274+
this.s.cmd.hint = hint;
275+
return this;
276+
}
277+
278+
/**
279+
* Set the cursor min
280+
* @method
281+
* @param {object} min Specify a $min value to specify the inclusive lower bound for a specific index in order to constrain the results of find(). The $min specifies the lower bound for all keys of a specific index in order.
282+
* @return {Cursor}
283+
*/
284+
Cursor.prototype.min = function(min) {
285+
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
286+
this.s.cmd.min = min;
287+
return this;
288+
}
289+
290+
/**
291+
* Set the cursor max
292+
* @method
293+
* @param {object} max Specify a $max value to specify the exclusive upper bound for a specific index in order to constrain the results of find(). The $max specifies the upper bound for all keys of a specific index in order.
294+
* @return {Cursor}
295+
*/
296+
Cursor.prototype.max = function(max) {
297+
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
298+
this.s.cmd.max = max;
299+
return this;
300+
}
301+
302+
/**
303+
* Set the cursor returnKey
304+
* @method
305+
* @param {object} returnKey Only return the index field or fields for the results of the query. If $returnKey is set to true and the query does not use an index to perform the read operation, the returned documents will not contain any fields. Use one of the following forms:
306+
* @return {Cursor}
307+
*/
308+
Cursor.prototype.returnKey = function(value) {
309+
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
310+
this.s.cmd.returnKey = value;
311+
return this;
312+
}
313+
314+
/**
315+
* Set the cursor showRecordId
316+
* @method
317+
* @param {object} showRecordId The $showDiskLoc option has now been deprecated and replaced with the showRecordId field. $showDiskLoc will still be accepted for OP_QUERY stye find.
318+
* @return {Cursor}
319+
*/
320+
Cursor.prototype.showRecordId = function(value) {
321+
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
322+
this.s.cmd.showDiskLoc = value;
323+
return this;
324+
}
325+
326+
/**
327+
* Set the cursor snapshot
328+
* @method
329+
* @param {object} snapshot The $snapshot operator prevents the cursor from returning a document more than once because an intervening write operation results in a move of the document.
330+
* @return {Cursor}
331+
*/
332+
Cursor.prototype.snapshot = function(value) {
333+
if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
334+
this.s.cmd.snapshot = value;
335+
return this;
336+
}
337+
254338
/**
255339
* Set a node.js specific cursor option
256340
* @method

0 commit comments

Comments
 (0)