Skip to content

Commit 5c50d35

Browse files
crysmagsrochdev
authored andcommitted
Migrating Postgres from AsyncResource to Diagnostics Channel (#6189)
* migrating pg away from async resource: * PR to PR: Fixing pg appsec tests (#6225)
1 parent 1df1206 commit 5c50d35

File tree

7 files changed

+77
-74
lines changed

7 files changed

+77
-74
lines changed

packages/datadog-instrumentations/src/pg.js

Lines changed: 38 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22

33
const {
44
channel,
5-
addHook,
6-
AsyncResource
5+
addHook
76
} = require('./helpers/instrument')
87
const shimmer = require('../../datadog-shimmer')
98

@@ -33,8 +32,6 @@ function wrapQuery (query) {
3332
return query.apply(this, arguments)
3433
}
3534

36-
const callbackResource = new AsyncResource('bound-anonymous-fn')
37-
const asyncResource = new AsyncResource('bound-anonymous-fn')
3835
const processId = this.processID
3936

4037
const pgQuery = arguments[0] !== null && typeof arguments[0] === 'object'
@@ -46,34 +43,34 @@ function wrapQuery (query) {
4643
const stream = typeof textPropObj.read === 'function'
4744

4845
// Only alter `text` property if safe to do so. Initially, it's a property, not a getter.
46+
let originalText
4947
if (!textProp || textProp.configurable) {
50-
const originalText = textPropObj.text
48+
originalText = textPropObj.text
5149

5250
Object.defineProperty(textPropObj, 'text', {
5351
get () {
5452
return this?.__ddInjectableQuery || originalText
5553
}
5654
})
5755
}
58-
59-
return asyncResource.runInAsyncScope(() => {
60-
const abortController = new AbortController()
61-
62-
startCh.publish({
63-
params: this.connectionParameters,
64-
query: textPropObj,
65-
processId,
66-
abortController,
67-
stream
68-
})
69-
70-
const finish = asyncResource.bind(function (error, res) {
71-
if (error) {
72-
errorCh.publish(error)
73-
}
74-
finishCh.publish({ result: res?.rows })
75-
})
76-
56+
const abortController = new AbortController()
57+
const ctx = {
58+
params: this.connectionParameters,
59+
query: textPropObj,
60+
originalText,
61+
processId,
62+
abortController,
63+
stream
64+
}
65+
const finish = (error, res) => {
66+
if (error) {
67+
ctx.error = error
68+
errorCh.publish(ctx)
69+
}
70+
ctx.result = res?.rows
71+
return finishCh.publish(ctx)
72+
}
73+
return startCh.runStores(ctx, () => {
7774
if (abortController.signal.aborted) {
7875
const error = abortController.signal.reason || new Error('Aborted')
7976

@@ -121,10 +118,10 @@ function wrapQuery (query) {
121118
}
122119

123120
if (newQuery.callback) {
124-
const originalCallback = callbackResource.bind(newQuery.callback)
125-
newQuery.callback = function (err, res) {
126-
finish(err, res)
127-
return originalCallback.apply(this, arguments)
121+
const originalCallback = newQuery.callback
122+
newQuery.callback = function (err, ...args) {
123+
finish(err, ...args)
124+
return finishCh.runStores(ctx, originalCallback, this, err, ...args)
128125
}
129126
} else if (newQuery.once) {
130127
newQuery
@@ -139,40 +136,33 @@ function wrapQuery (query) {
139136

140137
try {
141138
return retval
142-
} catch (err) {
143-
errorCh.publish(err)
139+
} catch (error) {
140+
ctx.error = error
141+
errorCh.publish(ctx)
144142
}
145143
})
146144
}
147145
}
148-
146+
const finish = (ctx) => {
147+
finishPoolQueryCh.publish(ctx)
148+
}
149149
function wrapPoolQuery (query) {
150150
return function () {
151151
if (!startPoolQueryCh.hasSubscribers) {
152152
return query.apply(this, arguments)
153153
}
154154

155-
const asyncResource = new AsyncResource('bound-anonymous-fn')
156-
157155
const pgQuery = arguments[0] !== null && typeof arguments[0] === 'object' ? arguments[0] : { text: arguments[0] }
156+
const abortController = new AbortController()
158157

159-
return asyncResource.runInAsyncScope(() => {
160-
const abortController = new AbortController()
161-
162-
startPoolQueryCh.publish({
163-
query: pgQuery,
164-
abortController
165-
})
166-
167-
const finish = asyncResource.bind(function () {
168-
finishPoolQueryCh.publish()
169-
})
158+
const ctx = { query: pgQuery, abortController }
170159

160+
return startPoolQueryCh.runStores(ctx, () => {
171161
const cb = arguments[arguments.length - 1]
172162

173163
if (abortController.signal.aborted) {
174164
const error = abortController.signal.reason || new Error('Aborted')
175-
finish()
165+
finish(ctx)
176166

177167
if (typeof cb === 'function') {
178168
cb(error)
@@ -184,7 +174,7 @@ function wrapPoolQuery (query) {
184174

185175
if (typeof cb === 'function') {
186176
arguments[arguments.length - 1] = shimmer.wrapFunction(cb, cb => function () {
187-
finish()
177+
finish(ctx)
188178
return cb.apply(this, arguments)
189179
})
190180
}
@@ -193,9 +183,9 @@ function wrapPoolQuery (query) {
193183

194184
if (retval?.then) {
195185
retval.then(() => {
196-
finish()
186+
finish(ctx)
197187
}).catch(() => {
198-
finish()
188+
finish(ctx)
199189
})
200190
}
201191

packages/datadog-plugin-pg/src/index.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ class PGPlugin extends DatabasePlugin {
88
static operation = 'query'
99
static system = 'postgres'
1010

11-
start ({ params = {}, query, processId, stream }) {
11+
bindStart (ctx) {
12+
const { params = {}, query, processId, stream } = ctx
1213
const service = this.serviceName({ pluginConfig: this.config, params })
1314
const originalStatement = this.maybeTruncate(query.text)
1415

@@ -25,13 +26,15 @@ class PGPlugin extends DatabasePlugin {
2526
'out.host': params.host,
2627
[CLIENT_PORT_KEY]: params.port
2728
}
28-
})
29+
}, ctx)
2930

3031
if (stream) {
3132
span.setTag('db.stream', 1)
3233
}
3334

3435
query.__ddInjectableQuery = this.injectDbmQuery(span, query.text, service, !!query.name)
36+
37+
return ctx.currentStore
3538
}
3639
}
3740

packages/dd-trace/src/appsec/iast/analyzers/sql-injection-analyzer.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@ class SqlInjectionAnalyzer extends StoredInjectionAnalyzer {
1616
onConfigure () {
1717
this.addSub('apm:mysql:query:start', ({ sql }) => this.analyze(sql, undefined, 'MYSQL'))
1818
this.addSub('datadog:mysql2:outerquery:start', ({ sql }) => this.analyze(sql, undefined, 'MYSQL'))
19-
this.addSub('apm:pg:query:start', ({ query }) => this.analyze(query.text, undefined, 'POSTGRES'))
19+
this.addSub(
20+
'apm:pg:query:start',
21+
({ originalText, query }) => this.analyze(originalText || query.text, undefined, 'POSTGRES')
22+
)
2023

2124
this.addBind(
2225
'datadog:sequelize:query:start',
2326
({ sql, dialect }) => this.getStoreAndAnalyze(sql, dialect.toUpperCase())
2427
)
2528
this.addSub('datadog:sequelize:query:finish', () => this.returnToParentStore())
2629

27-
this.addSub('datadog:pg:pool:query:start', ({ query }) => this.setStoreAndAnalyze(query.text, 'POSTGRES'))
30+
this.addBind('datadog:pg:pool:query:start', ({ query }) => this.getStoreAndAnalyze(query.text, 'POSTGRES'))
2831
this.addSub('datadog:pg:pool:query:finish', () => this.returnToParentStore())
2932

3033
this.addSub('datadog:mysql:pool:query:start', ({ sql }) => this.setStoreAndAnalyze(sql, 'MYSQL'))

packages/dd-trace/src/appsec/iast/taint-tracking/plugin.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,12 @@ class TaintTrackingPlugin extends SourceIastPlugin {
139139
addDatabaseSubscriptions () {
140140
this.addSub(
141141
{ channelName: 'datadog:sequelize:query:finish', tag: SQL_ROW_VALUE },
142-
({ result }) => this._taintDatabaseResult(result, 'sequelize')
142+
({ result }) => this._taintDatabaseResult(result, 'sequelize', getIastContext(storage('legacy').getStore()))
143143
)
144144

145145
this.addSub(
146146
{ channelName: 'apm:pg:query:finish', tag: SQL_ROW_VALUE },
147-
({ result }) => this._taintDatabaseResult(result, 'pg')
147+
({ result, currentStore }) => this._taintDatabaseResult(result, 'pg', getIastContext(currentStore))
148148
)
149149
}
150150

@@ -263,7 +263,7 @@ class TaintTrackingPlugin extends SourceIastPlugin {
263263
this.taintUrl(req, iastContext)
264264
}
265265

266-
_taintDatabaseResult (result, dbOrigin, iastContext = getIastContext(storage('legacy').getStore()), name) {
266+
_taintDatabaseResult (result, dbOrigin, iastContext, name) {
267267
if (!iastContext) return result
268268

269269
if (this._rowsToTaint === 0) return result

packages/dd-trace/test/appsec/iast/analyzers/sql-injection-analyzer.spec.js

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,20 @@ describe('sql-injection-analyzer', () => {
6464
sqlInjectionAnalyzer.configure(true)
6565

6666
it('should subscribe to mysql, mysql2 and pg start query channel', () => {
67-
expect(sqlInjectionAnalyzer._subscriptions).to.have.lengthOf(10)
67+
expect(sqlInjectionAnalyzer._subscriptions).to.have.lengthOf(9)
6868
expect(sqlInjectionAnalyzer._subscriptions[0]._channel.name).to.equals('apm:mysql:query:start')
6969
expect(sqlInjectionAnalyzer._subscriptions[1]._channel.name).to.equals('datadog:mysql2:outerquery:start')
7070
expect(sqlInjectionAnalyzer._subscriptions[2]._channel.name).to.equals('apm:pg:query:start')
7171
expect(sqlInjectionAnalyzer._subscriptions[3]._channel.name).to.equals('datadog:sequelize:query:finish')
72-
expect(sqlInjectionAnalyzer._subscriptions[4]._channel.name).to.equals('datadog:pg:pool:query:start')
73-
expect(sqlInjectionAnalyzer._subscriptions[5]._channel.name).to.equals('datadog:pg:pool:query:finish')
74-
expect(sqlInjectionAnalyzer._subscriptions[6]._channel.name).to.equals('datadog:mysql:pool:query:start')
75-
expect(sqlInjectionAnalyzer._subscriptions[7]._channel.name).to.equals('datadog:mysql:pool:query:finish')
76-
expect(sqlInjectionAnalyzer._subscriptions[8]._channel.name).to.equals('datadog:knex:raw:start')
77-
expect(sqlInjectionAnalyzer._subscriptions[9]._channel.name).to.equals('datadog:knex:raw:finish')
78-
79-
expect(sqlInjectionAnalyzer._bindings).to.have.lengthOf(1)
72+
expect(sqlInjectionAnalyzer._subscriptions[4]._channel.name).to.equals('datadog:pg:pool:query:finish')
73+
expect(sqlInjectionAnalyzer._subscriptions[5]._channel.name).to.equals('datadog:mysql:pool:query:start')
74+
expect(sqlInjectionAnalyzer._subscriptions[6]._channel.name).to.equals('datadog:mysql:pool:query:finish')
75+
expect(sqlInjectionAnalyzer._subscriptions[7]._channel.name).to.equals('datadog:knex:raw:start')
76+
expect(sqlInjectionAnalyzer._subscriptions[8]._channel.name).to.equals('datadog:knex:raw:finish')
77+
78+
expect(sqlInjectionAnalyzer._bindings).to.have.lengthOf(2)
8079
expect(sqlInjectionAnalyzer._bindings[0]._channel.name).to.equals('datadog:sequelize:query:start')
80+
expect(sqlInjectionAnalyzer._bindings[1]._channel.name).to.equals('datadog:pg:pool:query:start')
8181
})
8282

8383
it('should not detect vulnerability when no query', () => {
@@ -206,23 +206,23 @@ describe('sql-injection-analyzer', () => {
206206
it('should call analyze on apm:mysql:query:start', () => {
207207
const onMysqlQueryStart = sqlInjectionAnalyzer._subscriptions[0]._handler
208208

209-
onMysqlQueryStart({ sql: 'SELECT 1', name: 'apm:mysql:query:start' })
209+
onMysqlQueryStart({ sql: 'SELECT 1' })
210210

211211
expect(analyze).to.be.calledOnceWith('SELECT 1')
212212
})
213213

214214
it('should call analyze on apm:mysql2:query:start', () => {
215-
const onMysql2QueryStart = sqlInjectionAnalyzer._subscriptions[0]._handler
215+
const onMysql2QueryStart = sqlInjectionAnalyzer._subscriptions[1]._handler
216216

217-
onMysql2QueryStart({ sql: 'SELECT 1', name: 'apm:mysql2:query:start' })
217+
onMysql2QueryStart({ sql: 'SELECT 1' })
218218

219219
expect(analyze).to.be.calledOnceWith('SELECT 1')
220220
})
221221

222222
it('should call analyze on apm:pg:query:start', () => {
223-
const onPgQueryStart = sqlInjectionAnalyzer._subscriptions[0]._handler
223+
const onPgQueryStart = sqlInjectionAnalyzer._subscriptions[2]._handler
224224

225-
onPgQueryStart({ sql: 'SELECT 1', name: 'apm:pg:query:start' })
225+
onPgQueryStart({ originalText: 'SELECT 1', query: { text: 'modified-query SELECT 1' } })
226226

227227
expect(analyze).to.be.calledOnceWith('SELECT 1')
228228
})

packages/dd-trace/test/appsec/iast/taint-tracking/sources/sql_row.pg.plugin.spec.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,6 @@ describe('db sources with pg', () => {
108108
res.end('OK')
109109
}, 'COMMAND_INJECTION', null, 'Should not detect COMMAND_INJECTION with database source')
110110
})
111-
})
111+
}, undefined, ['pg'])
112112
})
113113
})

packages/dd-trace/test/appsec/iast/utils.js

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ function checkVulnerabilityInRequest (
239239
}
240240
}
241241

242-
function prepareTestServerForIast (description, tests, iastConfig) {
242+
function prepareTestServerForIast (description, tests, iastConfig, pluginsToConfigure = []) {
243243
describe(description, () => {
244244
const config = {}
245245
let http
@@ -254,7 +254,7 @@ function prepareTestServerForIast (description, tests, iastConfig) {
254254
})
255255

256256
before(() => {
257-
return agent.load('http', undefined, { flushInterval: 1 })
257+
return agent.load(['http', ...pluginsToConfigure], { client: false }, { flushInterval: 1 })
258258
.then(() => {
259259
http = require('http')
260260
})
@@ -308,7 +308,14 @@ function prepareTestServerForIast (description, tests, iastConfig) {
308308
})
309309
}
310310

311-
function prepareTestServerForIastInExpress (description, expressVersion, loadMiddlewares, tests, iastConfig) {
311+
function prepareTestServerForIastInExpress (
312+
description,
313+
expressVersion,
314+
loadMiddlewares,
315+
tests,
316+
iastConfig,
317+
pluginsToConfigure = []
318+
) {
312319
if (arguments.length === 3) {
313320
tests = loadMiddlewares
314321
loadMiddlewares = undefined
@@ -319,7 +326,7 @@ function prepareTestServerForIastInExpress (description, expressVersion, loadMid
319326
let listener, app, server
320327

321328
before(() => {
322-
return agent.load(['express', 'http'], { client: false }, { flushInterval: 1 })
329+
return agent.load(['express', 'http', ...pluginsToConfigure], { client: false }, { flushInterval: 1 })
323330
})
324331

325332
before(() => {

0 commit comments

Comments
 (0)