Skip to content

Commit 3012218

Browse files
wconti27rochdev
authored andcommitted
remove async storage from couchbase (#5948)
1 parent 13637a8 commit 3012218

File tree

2 files changed

+139
-82
lines changed

2 files changed

+139
-82
lines changed

packages/datadog-instrumentations/src/couchbase.js

Lines changed: 102 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
const { errorMonitor } = require('events')
44
const {
55
channel,
6-
addHook,
7-
AsyncResource
6+
addHook
87
} = require('./helpers/instrument')
98
const shimmer = require('../../datadog-shimmer')
109

@@ -24,31 +23,50 @@ function wrapAllNames (names, action) {
2423
names.forEach(name => action(name))
2524
}
2625

27-
// semver >=2 <3
28-
function wrapMaybeInvoke (_maybeInvoke) {
29-
const wrapped = function (fn, args) {
30-
if (!Array.isArray(args)) return _maybeInvoke.apply(this, arguments)
31-
32-
const callbackIndex = args.length - 1
33-
const callback = args[callbackIndex]
26+
function wrapCallback (callback, ctx, channelPrefix) {
27+
const callbackStartCh = channel(`${channelPrefix}:callback:start`)
28+
const callbackFinishCh = channel(`${channelPrefix}:callback:finish`)
3429

35-
if (typeof callback === 'function') {
36-
args[callbackIndex] = AsyncResource.bind(callback)
30+
const wrapped = callbackStartCh.runStores(ctx, () => {
31+
return function (...args) {
32+
return callbackFinishCh.runStores(ctx, () => {
33+
return callback.apply(this, args)
34+
})
3735
}
38-
39-
return _maybeInvoke.apply(this, arguments)
40-
}
36+
})
37+
Object.defineProperty(wrapped, '_dd_wrapped', { value: true })
4138
return wrapped
4239
}
4340

4441
function wrapQuery (query) {
45-
const wrapped = function (q, params, callback) {
46-
if (typeof arguments[arguments.length - 1] === 'function') {
47-
arguments[arguments.length - 1] = AsyncResource.bind(arguments[arguments.length - 1])
42+
return function (q, params, callback) {
43+
const cb = arguments[arguments.length - 1]
44+
if (typeof cb === 'function') {
45+
const ctx = {}
46+
arguments[arguments.length - 1] = wrapCallback(cb, ctx, 'apm:couchbase:query')
4847
}
4948

5049
return query.apply(this, arguments)
5150
}
51+
}
52+
53+
function wrapCallbackFinish (callback, thisArg, _args, errorCh, finishCh, ctx, channelPrefix) {
54+
const callbackStartCh = channel(`${channelPrefix}:callback:start`)
55+
const callbackFinishCh = channel(`${channelPrefix}:callback:finish`)
56+
57+
const wrapped = callbackStartCh.runStores(ctx, () => {
58+
return function finish (error, result) {
59+
return callbackFinishCh.runStores(ctx, () => {
60+
if (error) {
61+
ctx.error = error
62+
errorCh.publish(ctx)
63+
}
64+
finishCh.publish(ctx)
65+
return callback.apply(thisArg, [error, result])
66+
})
67+
}
68+
})
69+
Object.defineProperty(wrapped, '_dd_wrapped', { value: true })
5270
return wrapped
5371
}
5472

@@ -62,31 +80,24 @@ function wrap (prefix, fn) {
6280
return fn.apply(this, arguments)
6381
}
6482

65-
const callbackIndex = findCallbackIndex(arguments)
83+
const callbackIndex = findCallbackIndex(arguments, 1)
6684

6785
if (callbackIndex < 0) return fn.apply(this, arguments)
6886

69-
const callbackResource = new AsyncResource('bound-anonymous-fn')
70-
const asyncResource = new AsyncResource('bound-anonymous-fn')
71-
72-
return asyncResource.runInAsyncScope(() => {
73-
const cb = callbackResource.bind(arguments[callbackIndex])
87+
const ctx = { bucket: { name: this.name || this._name }, seedNodes: this._dd_hosts }
88+
return startCh.runStores(ctx, () => {
89+
const cb = arguments[callbackIndex]
7490

75-
startCh.publish({ bucket: { name: this.name || this._name }, seedNodes: this._dd_hosts })
76-
77-
arguments[callbackIndex] = shimmer.wrapFunction(cb, cb => asyncResource.bind(function (error, result) {
78-
if (error) {
79-
errorCh.publish(error)
80-
}
81-
finishCh.publish(result)
82-
return cb.apply(this, arguments)
83-
}))
91+
arguments[callbackIndex] = shimmer.wrapFunction(cb, (cb) => {
92+
return wrapCallbackFinish(cb, this, arguments, errorCh, finishCh, ctx, prefix)
93+
})
8494

8595
try {
8696
return fn.apply(this, arguments)
8797
} catch (error) {
98+
ctx.error = error
8899
error.stack // trigger getting the stack at the original throwing point
89-
errorCh.publish(error)
100+
errorCh.publish(ctx)
90101

91102
throw error
92103
}
@@ -95,6 +106,26 @@ function wrap (prefix, fn) {
95106
return wrapped
96107
}
97108

109+
// semver >=2 <3
110+
function wrapMaybeInvoke (_maybeInvoke, channelPrefix) {
111+
return function (fn, args) {
112+
if (!Array.isArray(args)) return _maybeInvoke.apply(this, arguments)
113+
114+
const callbackIndex = findCallbackIndex(args, 0)
115+
116+
if (callbackIndex === -1) return _maybeInvoke.apply(this, arguments)
117+
118+
const callback = args[callbackIndex]
119+
120+
if (typeof callback === 'function' && !callback._dd_wrapped) {
121+
const ctx = {}
122+
args[callbackIndex] = wrapCallback(callback, ctx, channelPrefix)
123+
}
124+
125+
return _maybeInvoke.apply(this, arguments)
126+
}
127+
}
128+
98129
// semver >=3
99130

100131
function wrapCBandPromise (fn, name, startData, thisArg, args) {
@@ -104,36 +135,36 @@ function wrapCBandPromise (fn, name, startData, thisArg, args) {
104135

105136
if (!startCh.hasSubscribers) return fn.apply(thisArg, args)
106137

107-
const asyncResource = new AsyncResource('bound-anonymous-fn')
108-
const callbackResource = new AsyncResource('bound-anonymous-fn')
109-
110-
return asyncResource.runInAsyncScope(() => {
111-
startCh.publish(startData)
112-
138+
const ctx = startData
139+
return startCh.runStores(ctx, () => {
113140
try {
114141
const cbIndex = findCallbackIndex(args, 1)
115142
if (cbIndex >= 0) {
116143
// v3 offers callback or promises event handling
117144
// NOTE: this does not work with v3.2.0-3.2.1 cluster.query, as there is a bug in the couchbase source code
118-
const cb = callbackResource.bind(args[cbIndex])
119-
args[cbIndex] = shimmer.wrapFunction(cb, cb => asyncResource.bind(function (error, result) {
120-
if (error) {
121-
errorCh.publish(error)
122-
}
123-
finishCh.publish({ result })
124-
return cb.apply(thisArg, arguments)
125-
}))
145+
args[cbIndex] = shimmer.wrapFunction(args[cbIndex], (cb) => {
146+
return wrapCallbackFinish(cb, thisArg, args, errorCh, finishCh, ctx, `apm:couchbase:${name}`)
147+
})
126148
}
127149
const res = fn.apply(thisArg, args)
128150

129151
// semver >=3 will always return promise by default
130152
res.then(
131-
asyncResource.bind((result) => finishCh.publish({ result })),
132-
asyncResource.bind((err) => errorCh.publish(err)))
153+
(result) => {
154+
ctx.result = result
155+
finishCh.publish(ctx)
156+
},
157+
(err) => {
158+
ctx.error = err
159+
errorCh.publish(ctx)
160+
finishCh.publish(ctx)
161+
}
162+
)
133163
return res
134164
} catch (e) {
135165
e.stack
136-
errorCh.publish(e)
166+
ctx.error = e
167+
errorCh.publish(ctx)
137168
throw e
138169
}
139170
})
@@ -160,11 +191,14 @@ function wrapV3Query (query) {
160191

161192
// semver >=2 <3
162193
addHook({ name: 'couchbase', file: 'lib/bucket.js', versions: ['^2.6.12'] }, Bucket => {
194+
shimmer.wrap(Bucket.prototype, '_maybeInvoke', maybeInvoke => {
195+
return wrapMaybeInvoke(maybeInvoke, 'apm:couchbase:bucket:maybeInvoke')
196+
})
197+
163198
const startCh = channel('apm:couchbase:query:start')
164199
const finishCh = channel('apm:couchbase:query:finish')
165200
const errorCh = channel('apm:couchbase:query:error')
166201

167-
shimmer.wrap(Bucket.prototype, '_maybeInvoke', maybeInvoke => wrapMaybeInvoke(maybeInvoke))
168202
shimmer.wrap(Bucket.prototype, 'query', query => wrapQuery(query))
169203

170204
shimmer.wrap(Bucket.prototype, '_n1qlReq', _n1qlReq => function (host, q, adhoc, emitter) {
@@ -176,24 +210,25 @@ addHook({ name: 'couchbase', file: 'lib/bucket.js', versions: ['^2.6.12'] }, Buc
176210

177211
const n1qlQuery = getQueryResource(q)
178212

179-
const asyncResource = new AsyncResource('bound-anonymous-fn')
180-
return asyncResource.runInAsyncScope(() => {
181-
startCh.publish({ resource: n1qlQuery, bucket: { name: this.name || this._name }, seedNodes: this._dd_hosts })
182-
183-
emitter.once('rows', asyncResource.bind(() => {
184-
finishCh.publish()
185-
}))
213+
const ctx = { resource: n1qlQuery, bucket: { name: this.name || this._name }, seedNodes: this._dd_hosts }
214+
return startCh.runStores(ctx, () => {
215+
emitter.once('rows', () => {
216+
finishCh.publish(ctx)
217+
})
186218

187-
emitter.once(errorMonitor, asyncResource.bind((error) => {
188-
errorCh.publish(error)
189-
finishCh.publish()
190-
}))
219+
emitter.once(errorMonitor, (error) => {
220+
if (!error) return
221+
ctx.error = error
222+
errorCh.publish(ctx)
223+
finishCh.publish(ctx)
224+
})
191225

192226
try {
193227
return _n1qlReq.apply(this, arguments)
194228
} catch (err) {
195229
err.stack // trigger getting the stack at the original throwing point
196-
errorCh.publish(err)
230+
ctx.error = err
231+
errorCh.publish(ctx)
197232

198233
throw err
199234
}
@@ -208,9 +243,11 @@ addHook({ name: 'couchbase', file: 'lib/bucket.js', versions: ['^2.6.12'] }, Buc
208243
})
209244

210245
addHook({ name: 'couchbase', file: 'lib/cluster.js', versions: ['^2.6.12'] }, Cluster => {
211-
shimmer.wrap(Cluster.prototype, '_maybeInvoke', maybeInvoke => wrapMaybeInvoke(maybeInvoke))
212-
shimmer.wrap(Cluster.prototype, 'query', query => wrapQuery(query))
246+
shimmer.wrap(Cluster.prototype, '_maybeInvoke', maybeInvoke => {
247+
return wrapMaybeInvoke(maybeInvoke, 'apm:couchbase:cluster:maybeInvoke')
248+
})
213249

250+
shimmer.wrap(Cluster.prototype, 'query', query => wrapQuery(query))
214251
shimmer.wrap(Cluster.prototype, 'openBucket', openBucket => {
215252
return function () {
216253
const bucket = openBucket.apply(this, arguments)

packages/datadog-plugin-couchbase/src/index.js

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ class CouchBasePlugin extends StoragePlugin {
77
static id = 'couchbase'
88
static peerServicePrecursors = ['db.couchbase.seed.nodes']
99

10-
addSubs (func, start) {
11-
this.addSub(`apm:couchbase:${func}:start`, start)
12-
this.addSub(`apm:couchbase:${func}:error`, error => this.addError(error))
13-
this.addSub(`apm:couchbase:${func}:finish`, message => this.finish(message))
10+
addBinds (func, start) {
11+
this.addBind(`apm:couchbase:${func}:start`, start)
12+
this.addSub(`apm:couchbase:${func}:error`, ({ error }) => this.addError(error))
13+
this.addSub(`apm:couchbase:${func}:finish`, ctx => this.finish(ctx))
14+
this.addBind(`apm:couchbase:${func}:callback:start`, callbackStart)
15+
this.addBind(`apm:couchbase:${func}:callback:finish`, callbackFinish)
1416
}
1517

16-
startSpan (operation, customTags, store, { bucket, collection, seedNodes }) {
18+
startSpan (operation, customTags, { bucket, collection, seedNodes }, ctx) {
1719
const tags = {
1820
'db.type': 'couchbase',
1921
component: 'couchbase',
@@ -34,26 +36,34 @@ class CouchBasePlugin extends StoragePlugin {
3436
{
3537
service: this.serviceName({ pluginConfig: this.config }),
3638
meta: tags
37-
}
39+
},
40+
ctx
3841
)
3942
}
4043

4144
constructor (...args) {
4245
super(...args)
4346

44-
this.addSubs('query', ({ resource, bucket, seedNodes }) => {
45-
const store = storage('legacy').getStore()
46-
const span = this.startSpan(
47-
'query', {
47+
this.addBinds('query', (ctx) => {
48+
const { resource, bucket, seedNodes } = ctx
49+
50+
this.startSpan(
51+
'query',
52+
{
4853
'span.type': 'sql',
4954
'resource.name': resource,
5055
'span.kind': this.constructor.kind
5156
},
52-
store,
53-
{ bucket, seedNodes }
57+
{ bucket, seedNodes },
58+
ctx
5459
)
55-
this.enter(span, store)
60+
61+
return ctx.currentStore
5662
})
63+
this.addBind('apm:couchbase:bucket:maybeInvoke:callback:start', callbackStart)
64+
this.addBind('apm:couchbase:bucket:maybeInvoke:callback:finish', callbackFinish)
65+
this.addBind('apm:couchbase:cluster:maybeInvoke:callback:start', callbackStart)
66+
this.addBind('apm:couchbase:cluster:maybeInvoke:callback:finish', callbackFinish)
5767

5868
this._addCommandSubs('upsert')
5969
this._addCommandSubs('insert')
@@ -63,12 +73,22 @@ class CouchBasePlugin extends StoragePlugin {
6373
}
6474

6575
_addCommandSubs (name) {
66-
this.addSubs(name, ({ bucket, collection, seedNodes }) => {
67-
const store = storage('legacy').getStore()
68-
const span = this.startSpan(name, {}, store, { bucket, collection, seedNodes })
69-
this.enter(span, store)
76+
this.addBinds(name, (ctx) => {
77+
const { bucket, collection, seedNodes } = ctx
78+
79+
this.startSpan(name, {}, { bucket, collection, seedNodes }, ctx)
80+
return ctx.currentStore
7081
})
7182
}
7283
}
7384

85+
function callbackStart (ctx) {
86+
ctx.parentStore = storage('legacy').getStore()
87+
return ctx.parentStore
88+
}
89+
90+
function callbackFinish (ctx) {
91+
return ctx.parentStore
92+
}
93+
7494
module.exports = CouchBasePlugin

0 commit comments

Comments
 (0)