Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions packages/instrumentation-redis/src/v4-v5/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,15 @@ export class RedisInstrumentationV4_V5 extends InstrumentationBase<RedisInstrume
this._wrap(
redisClientMultiCommandPrototype,
'exec',
this._getPatchMultiCommandsExec()
this._getPatchMultiCommandsExec(false)
);
if (isWrapped(redisClientMultiCommandPrototype?.execAsPipeline)) {
this._unwrap(redisClientMultiCommandPrototype, 'execAsPipeline');
}
this._wrap(
redisClientMultiCommandPrototype,
'execAsPipeline',
this._getPatchMultiCommandsExec(true)
);

if (isWrapped(redisClientMultiCommandPrototype?.addCommand)) {
Expand Down Expand Up @@ -277,36 +285,36 @@ export class RedisInstrumentationV4_V5 extends InstrumentationBase<RedisInstrume
};
}

private _getPatchMultiCommandsExec() {
private _getPatchMultiCommandsExec(isPipeline: boolean) {
const plugin = this;
return function execPatchWrapper(original: Function) {
return function execPatch(this: any) {
const execRes = original.apply(this, arguments);
if (typeof execRes?.then !== 'function') {
plugin._diag.error(
'got non promise result when patching RedisClientMultiCommand.exec'
'non-promise result when patching exec/execAsPipeline'
);
return execRes;
}

return execRes
.then((redisRes: unknown[]) => {
const openSpans = this[OTEL_OPEN_SPANS];
plugin._endSpansWithRedisReplies(openSpans, redisRes);
plugin._endSpansWithRedisReplies(openSpans, redisRes, isPipeline);
return redisRes;
})
.catch((err: Error) => {
const openSpans = this[OTEL_OPEN_SPANS];
if (!openSpans) {
plugin._diag.error(
'cannot find open spans to end for redis multi command'
'cannot find open spans to end for multi/pipeline'
);
} else {
const replies =
err.constructor.name === 'MultiErrorReply'
? (err as MultiErrorReply).replies
: new Array(openSpans.length).fill(err);
plugin._endSpansWithRedisReplies(openSpans, replies);
plugin._endSpansWithRedisReplies(openSpans, replies, isPipeline);
}
return Promise.reject(err);
});
Expand Down Expand Up @@ -472,27 +480,40 @@ export class RedisInstrumentationV4_V5 extends InstrumentationBase<RedisInstrume

private _endSpansWithRedisReplies(
openSpans: Array<MutliCommandInfo>,
replies: unknown[]
replies: unknown[],
isPipeline = false
) {
if (!openSpans) {
return this._diag.error(
'cannot find open spans to end for redis multi command'
'cannot find open spans to end for redis multi/pipeline'
);
}
if (replies.length !== openSpans.length) {
return this._diag.error(
'number of multi command spans does not match response from redis'
);
}
// Determine a single operation name for the batch of commands.
// If all commands are identical, include the command name (e.g., "MULTI SET").
// Otherwise, use a generic "MULTI" or "PIPELINE" label for the span.
const allCommands = openSpans.map(s => s.commandName);
const allSameCommand = allCommands.every(cmd => cmd === allCommands[0]);
const operationName = allSameCommand
? (isPipeline ? 'PIPELINE ' : 'MULTI ') + allCommands[0]
: isPipeline
? 'PIPELINE'
: 'MULTI';

for (let i = 0; i < openSpans.length; i++) {
const { span, commandName, commandArgs } = openSpans[i];
const { span, commandArgs } = openSpans[i];
const currCommandRes = replies[i];
const [res, err] =
currCommandRes instanceof Error
? [null, currCommandRes]
: [currCommandRes, undefined];
this._endSpanWithResponse(span, commandName, commandArgs, res, err);
span.setAttribute(ATTR_DB_OPERATION_NAME, operationName);

this._endSpanWithResponse(span, allCommands[i], commandArgs, res, err);
}
}

Expand Down
78 changes: 75 additions & 3 deletions packages/instrumentation-redis/test/v4-v5/redis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ describe('redis v4-v5', () => {
);
assert.strictEqual(
multiSetSpan?.attributes[ATTR_DB_OPERATION_NAME],
'SET'
'MULTI'
);

assert.ok(multiGetSpan);
Expand Down Expand Up @@ -487,7 +487,7 @@ describe('redis v4-v5', () => {
);
assert.strictEqual(
multiGetSpan?.attributes[ATTR_DB_OPERATION_NAME],
'GET'
'MULTI'
);
});

Expand Down Expand Up @@ -530,7 +530,7 @@ describe('redis v4-v5', () => {
);
assert.strictEqual(
multiSetSpan?.attributes[ATTR_DB_OPERATION_NAME],
'SET'
'MULTI SET'
);
});

Expand Down Expand Up @@ -824,4 +824,76 @@ describe('redis v4-v5', () => {
);
});
});
describe('pipeline commands', () => {
it('should trace all commands in a pipeline with a mixed set of commands', async () => {
await client.set('another-key', 'another-value');

const [setKeyReply, otherKeyValue] = await client
.multi()
.set('key', 'value')
.get('another-key')
.execAsPipeline();

assert.strictEqual(setKeyReply, 'OK');
assert.strictEqual(otherKeyValue, 'another-value');

const [setSpan, pipelineSetSpan, pipelineGetSpan] = getTestSpans();

assert.ok(setSpan);

assert.ok(pipelineSetSpan);
assert.strictEqual(pipelineSetSpan.name, 'redis-SET');
assert.strictEqual(
pipelineSetSpan.attributes[ATTR_DB_STATEMENT],
'SET key [1 other arguments]'
);
assert.strictEqual(
pipelineSetSpan.attributes[ATTR_DB_QUERY_TEXT],
'SET key [1 other arguments]'
);
assert.strictEqual(
pipelineSetSpan.attributes[ATTR_DB_OPERATION_NAME],
'PIPELINE'
);

assert.ok(pipelineGetSpan);
assert.strictEqual(pipelineGetSpan.name, 'redis-GET');
assert.strictEqual(
pipelineGetSpan.attributes[ATTR_DB_STATEMENT],
'GET another-key'
);
assert.strictEqual(
pipelineGetSpan.attributes[ATTR_DB_QUERY_TEXT],
'GET another-key'
);
assert.strictEqual(
pipelineGetSpan.attributes[ATTR_DB_OPERATION_NAME],
'PIPELINE'
);
});

it('should trace all commands in a pipeline with a same set of commands', async () => {
const [setReply] = await client
.multi()
.addCommand(['SET', 'key', 'value'])
.execAsPipeline();

assert.strictEqual(setReply, 'OK');

const [pipelineSetSpan] = getTestSpans();
assert.ok(pipelineSetSpan);
assert.strictEqual(
pipelineSetSpan.attributes[ATTR_DB_STATEMENT],
'SET key [1 other arguments]'
);
assert.strictEqual(
pipelineSetSpan.attributes[ATTR_DB_QUERY_TEXT],
'SET key [1 other arguments]'
);
assert.strictEqual(
pipelineSetSpan.attributes[ATTR_DB_OPERATION_NAME],
'PIPELINE SET'
);
});
});
});