Skip to content

Commit 4c975b4

Browse files
committed
Added initial code for APM support
1 parent b6781eb commit 4c975b4

File tree

6 files changed

+278
-101
lines changed

6 files changed

+278
-101
lines changed

index.js

Lines changed: 5 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Core module
22
var core = require('mongodb-core'),
3-
EventEmitter = require('events').EventEmitter,
4-
inherits = require('util').inherits;
3+
Instrumentation = require('./lib/apm');
54

65
// Set up the connect function
76
var connect = require('./lib/mongo_client').connect;
@@ -37,84 +36,13 @@ connect.Timestamp = core.BSON.Timestamp;
3736

3837
// Add connect method
3938
connect.connect = connect;
40-
// Operation id
41-
var operationId = 1;
4239

43-
var Instrumentation = function() {
44-
EventEmitter.call(this);
45-
// Names of methods we need to wrap
46-
var methods = ['command'];
47-
// Prototype
48-
var proto = core.Server.prototype;
49-
// Reference
50-
var self = this;
51-
// Core server method we are going to wrap
52-
methods.forEach(function(x) {
53-
var func = proto[x];
54-
55-
// The actual prototype
56-
proto[x] = function() {
57-
var requestId = core.Query.nextRequestId();
58-
var ourOpId = operationId++;
59-
// Get the aruments
60-
var args = Array.prototype.slice.call(arguments, 0);
61-
var ns = args[0];
62-
var commandObj = args[1];
63-
var keys = Object.keys(commandObj);
64-
var commandName = keys[0];
65-
var db = ns.split('.')[0];
66-
67-
// Get a connection reference for this server instance
68-
var connection = this.s.pool.get()
69-
// Emit the start event for the command
70-
var command = {type: 'started', operationId: ourOpId};
71-
command.command = commandObj;
72-
command.databaseName = db;
73-
command.commandName = commandName
74-
command.connectionId = connection;
75-
command.requestId = requestId;
76-
self.emit('command', command)
77-
78-
// Start time
79-
var startTime = new Date().getTime();
80-
81-
// Get the callback
82-
var callback = args.pop();
83-
args.push(function(err, r) {
84-
var endTime = new Date().getTime();
85-
var command = {type: 'succeeded',
86-
duration: (endTime - startTime),
87-
requestId: requestId,
88-
operationId: ourOpId,
89-
connectionId: connection};
90-
91-
// If we have an error
92-
if(err) {
93-
command.type = 'failed'
94-
command.failure = err;
95-
} else {
96-
command.reply = r;
97-
}
98-
99-
// Emit the command
100-
self.emit('command', command)
101-
102-
// Return to caller
103-
callback(err, r);
104-
});
105-
106-
// Apply the call
107-
func.apply(this, args);
108-
}
109-
});
110-
}
111-
112-
inherits(Instrumentation, EventEmitter);
113-
114-
var instrumentation = new Instrumentation();
40+
// Instrumentation instance
41+
var instrumentation = null;
11542

11643
// Set up the instrumentation method
117-
connect.instrument = function() {
44+
connect.instrument = function(options) {
45+
if(!instrumentation) instrumentation = new Instrumentation(core, options)
11846
return instrumentation;
11947
}
12048

lib/apm.js

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
var EventEmitter = require('events').EventEmitter,
2+
inherits = require('util').inherits;
3+
4+
var basicOperationIdGenerator = {
5+
operationId: 1,
6+
7+
next: function() {
8+
return this.operationId++;
9+
}
10+
}
11+
12+
var Instrumentation = function(core, options) {
13+
options = options || {};
14+
var operationIdGenerator = options.operationIdGenerator || basicOperationIdGenerator;
15+
// Extend with event emitter functionality
16+
EventEmitter.call(this);
17+
18+
// ---------------------------------------------------------
19+
//
20+
// Server
21+
//
22+
// ---------------------------------------------------------
23+
24+
// Reference
25+
var self = this;
26+
// Names of methods we need to wrap
27+
var methods = ['command'];
28+
// Prototype
29+
var proto = core.Server.prototype;
30+
// Core server method we are going to wrap
31+
methods.forEach(function(x) {
32+
var func = proto[x];
33+
34+
// The actual prototype
35+
proto[x] = function() {
36+
var requestId = core.Query.nextRequestId();
37+
var ourOpId = operationIdGenerator.next();
38+
// Get the aruments
39+
var args = Array.prototype.slice.call(arguments, 0);
40+
var ns = args[0];
41+
var commandObj = args[1];
42+
var keys = Object.keys(commandObj);
43+
var commandName = keys[0];
44+
var db = ns.split('.')[0];
45+
46+
// Get a connection reference for this server instance
47+
var connection = this.s.pool.get()
48+
// Emit the start event for the command
49+
var command = {
50+
// Returns the command.
51+
command: commandObj,
52+
// Returns the database name.
53+
databaseName: db,
54+
// Returns the command name.
55+
commandName: commandName,
56+
// Returns the driver generated request id.
57+
requestId: requestId,
58+
// Returns the driver generated operation id.
59+
// This is used to link events together such as bulk write operations. OPTIONAL.
60+
operationId: ourOpId,
61+
// Returns the connection id for the command. For languages that do not have this,
62+
// this MUST return the driver equivalent which MUST include the server address and port.
63+
// The name of this field is flexible to match the object that is returned from the driver.
64+
connectionId: connection
65+
};
66+
67+
// Emit the started event
68+
self.emit('started', command)
69+
70+
// Start time
71+
var startTime = new Date().getTime();
72+
73+
// Get the callback
74+
var callback = args.pop();
75+
args.push(function(err, r) {
76+
var endTime = new Date().getTime();
77+
var command = {
78+
duration: (endTime - startTime),
79+
commandName: commandName,
80+
requestId: requestId,
81+
operationId: ourOpId,
82+
connectionId: connection
83+
};
84+
85+
// If we have an error
86+
if(err) {
87+
command.failure = err;
88+
self.emit('failed', command);
89+
} else {
90+
command.reply = r;
91+
self.emit('succeeded', command);
92+
}
93+
94+
// Return to caller
95+
callback(err, r);
96+
});
97+
98+
// Apply the call
99+
func.apply(this, args);
100+
}
101+
});
102+
103+
// ---------------------------------------------------------
104+
//
105+
// Cursor
106+
//
107+
// ---------------------------------------------------------
108+
109+
// Inject ourselves into the Cursor methods
110+
var methods = ['_find', '_getmore', '_killcursor'];
111+
var prototypes = [
112+
require('./cursor').prototype, require('./command_cursor').prototype,
113+
require('./aggregation_cursor').prototype
114+
]
115+
116+
// Command name translation
117+
var commandTranslation = {
118+
'_find': 'find', '_getmore': 'getMore', '_killcursor': 'killCursors'
119+
}
120+
121+
prototypes.forEach(function(proto) {
122+
// Core server method we are going to wrap
123+
methods.forEach(function(x) {
124+
var func = proto[x];
125+
126+
// The actual prototype
127+
proto[x] = function() {
128+
var cursor = this;
129+
var requestId = core.Query.nextRequestId();
130+
var ourOpId = operationIdGenerator.next();
131+
var db = this.ns.split('.')[0];
132+
133+
// If we have a find method, set the operationId on the cursor
134+
if(x == '_find') {
135+
this.operationId = ourOpId;
136+
this.startTime = new Date();
137+
}
138+
139+
// Emit the start event for the command
140+
var command = {
141+
// Returns the command.
142+
command: this.query,
143+
// Returns the database name.
144+
databaseName: db,
145+
// Returns the command name.
146+
commandName: commandTranslation[x],
147+
// Returns the driver generated request id.
148+
requestId: requestId,
149+
// Returns the driver generated operation id.
150+
// This is used to link events together such as bulk write operations. OPTIONAL.
151+
operationId: this.operationId,
152+
// Returns the connection id for the command. For languages that do not have this,
153+
// this MUST return the driver equivalent which MUST include the server address and port.
154+
// The name of this field is flexible to match the object that is returned from the driver.
155+
connectionId: this.server.getConnection()
156+
};
157+
158+
// Emit the started event
159+
self.emit('started', command)
160+
161+
// Get the aruments
162+
var args = Array.prototype.slice.call(arguments, 0);
163+
// Get the callback
164+
var callback = args.pop();
165+
args.push(function(err, r) {
166+
if(err) {
167+
// Command
168+
var command = {
169+
duration: (new Date().getTime() - cursor.startTime.getTime()),
170+
commandName: commandTranslation[x],
171+
requestId: requestId,
172+
operationId: ourOpId,
173+
connectionId: cursor.server.getConnection(),
174+
failure: err };
175+
// Emit the command
176+
self.emit('failed', command)
177+
} else { //if(connect.Long.ZERO.equals(cursor.cursorState.cursorId)) {
178+
// cursor id is zero, we can issue success command
179+
var command = {
180+
duration: (new Date().getTime() - cursor.startTime.getTime()),
181+
commandName: commandTranslation[x],
182+
requestId: requestId,
183+
operationId: cursor.operationId,
184+
connectionId: cursor.server.getConnection() };
185+
// Emit the command
186+
self.emit('succeeded', command)
187+
}
188+
189+
// Return to caller
190+
callback(err, r);
191+
});
192+
193+
// Apply the call
194+
func.apply(this, args);
195+
}
196+
});
197+
});
198+
}
199+
200+
inherits(Instrumentation, EventEmitter);
201+
202+
module.exports = Instrumentation;

lib/command_cursor.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ inherits(CommandCursor, Readable);
141141

142142
// Set the methods to inherit from prototype
143143
var methodsToInherit = ['_next', 'next', 'each', 'forEach', 'toArray'
144-
, 'rewind', 'bufferedCount', 'readBufferedDocuments', 'close', 'isClosed', 'kill'];
144+
, 'rewind', 'bufferedCount', 'readBufferedDocuments', 'close', 'isClosed', 'kill'
145+
, '_find', '_getmore', '_killcursor'];
145146

146147
// Only inherit the types we need
147148
for(var i = 0; i < methodsToInherit.length; i++) {

test/functional/apm_tests.js

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"use strict";
22

3-
exports['Correctly receive the APM events'] = {
3+
exports['Correctly receive the APM events for an insert'] = {
44
metadata: { requires: { topology: ['single'] } },
55

66
// The actual test we wish to run
@@ -21,3 +21,49 @@ exports['Correctly receive the APM events'] = {
2121
});
2222
}
2323
}
24+
25+
exports['Correctly receive the APM events for a find with getmore and killcursor'] = {
26+
metadata: { requires: { topology: ['single'] } },
27+
28+
// The actual test we wish to run
29+
test: function(configuration, test) {
30+
var listener = require('../..').instrument();
31+
listener.on('started', function(event) {
32+
console.log("===================================== started :: " + event.commandName)
33+
// console.log(JSON.stringify(event, null, 2))
34+
});
35+
36+
listener.on('succeeded', function(event) {
37+
console.log("===================================== succeeded :: " + event.commandName)
38+
// console.log(JSON.stringify(event, null, 2))
39+
});
40+
41+
listener.on('failed', function(event) {
42+
console.log("===================================== failed :: " + event.commandName)
43+
// console.log(JSON.stringify(event, null, 2))
44+
});
45+
46+
var db = configuration.newDbInstance({w:1}, {poolSize:1, auto_reconnect:false});
47+
db.open(function(err, db) {
48+
test.equal(null, err);
49+
50+
// Drop the collection
51+
db.collection('apm_test_1').drop(function(err, r) {
52+
53+
// Insert test documents
54+
db.collection('apm_test_1').insertMany([{a:1}, {a:1}, {a:1}, {a:1}, {a:1}, {a:1}]).then(function(r) {
55+
test.equal(6, r.insertedCount);
56+
57+
db.collection('apm_test_1').find().batchSize(2).toArray().then(function(docs) {
58+
test.equal(6, docs.length);
59+
60+
db.close();
61+
test.done();
62+
});
63+
}).catch(function(e) {
64+
console.dir(e)
65+
});
66+
});
67+
});
68+
}
69+
}

0 commit comments

Comments
 (0)