Skip to content

Commit f0708bc

Browse files
committed
fix(otel): revert metrics to be tracked in the sendCommand method
1 parent 72d2eac commit f0708bc

File tree

2 files changed

+35
-25
lines changed

2 files changed

+35
-25
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, Pu
66
import { AbortError, ErrorReply, CommandTimeoutDuringMaintenanceError, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
88
import { dbgMaintenance } from './enterprise-maintenance-manager';
9-
import { OTelClientAttributes, OTelMetrics } from '../opentelemetry';
109

1110
export interface CommandOptions<T = TypeMapping> {
1211
chainId?: symbol;
@@ -200,35 +199,23 @@ export default class RedisCommandsQueue {
200199

201200
addCommand<T>(
202201
args: ReadonlyArray<RedisArgument>,
203-
options?: CommandOptions,
204-
otelAttributes?: OTelClientAttributes
202+
options?: CommandOptions
205203
): Promise<T> {
206204
if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) {
207205
return Promise.reject(new Error('The queue is full'));
208206
} else if (options?.abortSignal?.aborted) {
209207
return Promise.reject(new AbortError());
210208
}
211209

212-
const recordOperation = OTelMetrics.instance.commandMetrics.createRecordOperationDuration(args, otelAttributes);
213-
OTelMetrics.instance.connectionAdvancedMetrics.recordPendingRequests(1, otelAttributes);
214-
215210
return new Promise((resolve, reject) => {
216211
let node: DoublyLinkedNode<CommandToWrite>;
217212
const value: CommandToWrite = {
218213
args,
219214
chainId: options?.chainId,
220215
abort: undefined,
221216
timeout: undefined,
222-
resolve: reply => {
223-
recordOperation()
224-
OTelMetrics.instance.connectionAdvancedMetrics.recordPendingRequests(-1, otelAttributes);
225-
resolve(reply as T);
226-
},
227-
reject: (err) => {
228-
recordOperation(err as Error);
229-
OTelMetrics.instance.connectionAdvancedMetrics.recordPendingRequests(-1, otelAttributes);
230-
reject(err);
231-
},
217+
resolve,
218+
reject,
232219
channelsCounter: undefined,
233220
typeMapping: options?.typeMapping
234221
};

packages/client/lib/client/index.ts

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { BasicCommandParser, CommandParser } from './parser';
2121
import SingleEntryCache from '../single-entry-cache';
2222
import { version } from '../../package.json'
2323
import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType } from './enterprise-maintenance-manager';
24-
import { OTelClientAttributes } from '../opentelemetry';
24+
import { OTelClientAttributes, OTelMetrics } from '../opentelemetry';
2525

2626
export interface RedisClientOptions<
2727
M extends RedisModules = RedisModules,
@@ -652,7 +652,7 @@ export default class RedisClient<
652652
.addCommand(cmd, {
653653
chainId,
654654
asap
655-
}, this._getClientOTelAttributes())
655+
})
656656
.catch(errorHandler)
657657
);
658658
}
@@ -1076,12 +1076,17 @@ export default class RedisClient<
10761076
args: ReadonlyArray<RedisArgument>,
10771077
options?: CommandOptions
10781078
): Promise<T> {
1079+
const clientAttributes = this._self._getClientOTelAttributes();
1080+
const recordOperation = OTelMetrics.instance.commandMetrics.createRecordOperationDuration(args, clientAttributes);
1081+
10791082
if (!this._self.#socket.isOpen) {
1083+
recordOperation(new ClientClosedError());
10801084
return Promise.reject(new ClientClosedError());
10811085
} else if (
10821086
!this._self.#socket.isReady &&
10831087
this._self.#options.disableOfflineQueue
10841088
) {
1089+
recordOperation(new ClientOfflineError());
10851090
return Promise.reject(new ClientOfflineError());
10861091
}
10871092

@@ -1091,11 +1096,29 @@ export default class RedisClient<
10911096
...options,
10921097
};
10931098

1094-
const promise = this._self.#queue.addCommand<T>(args, opts, this._self._getClientOTelAttributes());
1099+
const promise = this._self.#queue.addCommand<T>(args, opts);
10951100

1101+
if (OTelMetrics.isInitialized()) {
1102+
OTelMetrics.instance.connectionAdvancedMetrics.recordPendingRequests(1, clientAttributes);
10961103

1097-
this._self.#scheduleWrite();
1098-
return promise;
1104+
const trackedPromise = promise
1105+
.then((reply) => {
1106+
recordOperation();
1107+
OTelMetrics.instance.connectionAdvancedMetrics.recordPendingRequests(-1, clientAttributes);
1108+
return reply;
1109+
})
1110+
.catch((err) => {
1111+
recordOperation(err);
1112+
OTelMetrics.instance.connectionAdvancedMetrics.recordPendingRequests(-1, clientAttributes);
1113+
throw err;
1114+
});
1115+
1116+
this._self.#scheduleWrite();
1117+
return trackedPromise;
1118+
} else {
1119+
this._self.#scheduleWrite();
1120+
return promise;
1121+
}
10991122
}
11001123

11011124
async SELECT(db: number): Promise<void> {
@@ -1331,20 +1354,20 @@ export default class RedisClient<
13311354
const typeMapping = this._commandOptions?.typeMapping;
13321355
const chainId = Symbol('MULTI Chain');
13331356
const promises = [
1334-
this._self.#queue.addCommand(['MULTI'], { chainId }, this._self._getClientOTelAttributes()),
1357+
this._self.#queue.addCommand(['MULTI'], { chainId }),
13351358
];
13361359

13371360
for (const { args } of commands) {
13381361
promises.push(
13391362
this._self.#queue.addCommand(args, {
13401363
chainId,
13411364
typeMapping
1342-
}, this._self._getClientOTelAttributes())
1365+
})
13431366
);
13441367
}
13451368

13461369
promises.push(
1347-
this._self.#queue.addCommand(['EXEC'], { chainId }, this._self._getClientOTelAttributes())
1370+
this._self.#queue.addCommand(['EXEC'], { chainId })
13481371
);
13491372

13501373
this._self.#scheduleWrite();
@@ -1520,7 +1543,7 @@ export default class RedisClient<
15201543
this._self.#credentialsSubscription = null;
15211544
return this._self.#socket.quit(async () => {
15221545
clearTimeout(this._self.#pingTimer);
1523-
const quitPromise = this._self.#queue.addCommand<string>(['QUIT'], undefined, this._self._getClientOTelAttributes());
1546+
const quitPromise = this._self.#queue.addCommand<string>(['QUIT']);
15241547
this._self.#scheduleWrite();
15251548
return quitPromise;
15261549
});

0 commit comments

Comments
 (0)