Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit c427488

Browse files
author
Amir Blum
authored
Merge pull request #10 from aspecto-io/aws-cb-context
feat(plugin-aws-sdk): sqs proper context and semantic conventions
2 parents 090168f + ba6f797 commit c427488

File tree

8 files changed

+300
-131
lines changed

8 files changed

+300
-131
lines changed

packages/plugin-aws-sdk/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "opentelemetry-plugin-aws-sdk",
3-
"version": "0.0.3",
3+
"version": "0.0.4",
44
"description": "open telemetry instrumentation for the `aws-sdk` package",
55
"keywords": [
66
"aws",

packages/plugin-aws-sdk/src/aws-sdk.ts

Lines changed: 124 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,24 @@ import { Span, CanonicalCode } from "@opentelemetry/api";
1313
import * as shimmer from "shimmer";
1414
import AWS from "aws-sdk";
1515
import { AttributeNames } from "./enums";
16-
import {
17-
getRequestServiceAttributes,
18-
getResponseServiceAttributes,
19-
} from "./services";
16+
import { ServicesExtensions } from "./services";
2017
import { AwsSdkPluginConfig } from "./types";
2118

22-
const VERSION = "0.0.3";
19+
const VERSION = "0.0.4";
2320

2421
class AwsPlugin extends BasePlugin<typeof AWS> {
2522
readonly component: string;
2623
protected _config: AwsSdkPluginConfig;
27-
private activeRequests: Set<AWS.Request<any, any>> = new Set();
24+
private REQUEST_SPAN_KEY = Symbol("opentelemetry.plugin.aws-sdk.span");
25+
private servicesExtensions: ServicesExtensions;
2826

2927
constructor(readonly moduleName: string) {
3028
super(`opentelemetry-plugin-aws-sdk`, VERSION);
3129
}
3230

3331
protected patch() {
32+
this.servicesExtensions = new ServicesExtensions(this._tracer);
33+
3434
this._logger.debug(
3535
"applying patch to %s@%s",
3636
this.moduleName,
@@ -40,12 +40,12 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
4040
shimmer.wrap(
4141
this._moduleExports?.Request.prototype,
4242
"send",
43-
this._patchRequestMethod()
43+
this._getRequestSendPatch.bind(this)
4444
);
4545
shimmer.wrap(
4646
this._moduleExports?.Request.prototype,
4747
"promise",
48-
this._patchRequestMethod()
48+
this._getRequestPromisePatch.bind(this)
4949
);
5050

5151
return this._moduleExports;
@@ -56,73 +56,135 @@ class AwsPlugin extends BasePlugin<typeof AWS> {
5656
shimmer.unwrap(this._moduleExports?.Request.prototype, "promise");
5757
}
5858

59-
private _patchRequestMethod = () => {
60-
return this._getPatchedRequestMethod;
61-
};
59+
private _bindPromise(target: Promise<any>, span: Span) {
60+
const thisPlugin = this;
61+
62+
const origThen = target.then;
63+
target.then = function (onFulfilled, onRejected) {
64+
const newOnFulfilled = thisPlugin._tracer.bind(onFulfilled, span);
65+
const newOnRejected = thisPlugin._tracer.bind(onRejected, span);
66+
return origThen.call(this, newOnFulfilled, newOnRejected);
67+
};
68+
69+
return target;
70+
}
71+
72+
private _startAwsSpan(request: AWS.Request<any, any>): Span {
73+
const operation = (request as any).operation;
74+
const service = (request as any).service;
75+
76+
const newSpan = this._tracer.startSpan(this._getSpanName(request), {
77+
attributes: {
78+
[AttributeNames.COMPONENT]: this.moduleName,
79+
[AttributeNames.AWS_OPERATION]: operation,
80+
[AttributeNames.AWS_SIGNATURE_VERSION]:
81+
service?.config?.signatureVersion,
82+
[AttributeNames.AWS_REGION]: service?.config?.region,
83+
[AttributeNames.AWS_SERVICE_API]: service?.api?.className,
84+
[AttributeNames.AWS_SERVICE_IDENTIFIER]: service?.serviceIdentifier,
85+
[AttributeNames.AWS_SERVICE_NAME]: service?.api?.abbreviation,
86+
},
87+
});
88+
89+
request[this.REQUEST_SPAN_KEY] = newSpan;
90+
return newSpan;
91+
}
92+
93+
private _callPreRequestHooks(span: Span, request: AWS.Request<any, any>) {
94+
if (this._config?.preRequestHook) {
95+
this._safeExecute(
96+
span,
97+
() => this._config.preRequestHook(span, request),
98+
false
99+
);
100+
}
101+
}
62102

63-
private _getPatchedRequestMethod = (original: Function) => {
103+
private _registerCompletedEvent(span: Span, request: AWS.Request<any, any>) {
64104
const thisPlugin = this;
65-
return function () {
66-
let span: Span | null = null;
105+
request.on("complete", (response) => {
106+
if (!request[thisPlugin.REQUEST_SPAN_KEY]) {
107+
return;
108+
}
109+
request[thisPlugin.REQUEST_SPAN_KEY] = undefined;
110+
111+
if (response.error) {
112+
span.setAttribute(AttributeNames.AWS_ERROR, response.error);
113+
}
114+
115+
this.servicesExtensions.responseHook(response, span);
116+
117+
span.setAttributes({
118+
[AttributeNames.AWS_REQUEST_ID]: response.requestId,
119+
});
120+
span.end();
121+
});
122+
}
123+
124+
private _getRequestSendPatch(
125+
original: (callback?: (err: any, data: any) => void) => void
126+
) {
127+
const thisPlugin = this;
128+
return function (callback?: (err: any, data: any) => void) {
129+
const awsRequest: AWS.Request<any, any> = this;
67130
/*
68131
if the span was already started, we don't want to start a new one
69132
when Request.promise() is called
70133
*/
71-
72134
if (
73-
this._asm.currentState !== "complete" &&
74-
!thisPlugin.activeRequests.has(this)
135+
this._asm.currentState === "complete" ||
136+
awsRequest[thisPlugin.REQUEST_SPAN_KEY]
75137
) {
76-
thisPlugin.activeRequests.add(this);
77-
78-
span = thisPlugin._tracer.startSpan(thisPlugin._getSpanName(this), {
79-
attributes: {
80-
[AttributeNames.COMPONENT]: thisPlugin.moduleName,
81-
[AttributeNames.AWS_OPERATION]: this.operation,
82-
[AttributeNames.AWS_SIGNATURE_VERSION]: this.service?.config
83-
?.signatureVersion,
84-
[AttributeNames.AWS_REGION]: this.service?.config?.region,
85-
[AttributeNames.AWS_SERVICE_API]: this.service?.api?.className,
86-
[AttributeNames.AWS_SERVICE_IDENTIFIER]: this.service
87-
?.serviceIdentifier,
88-
[AttributeNames.AWS_SERVICE_NAME]: this.service?.api?.abbreviation,
89-
...getRequestServiceAttributes(this),
90-
},
91-
});
92-
93-
if (thisPlugin._config?.preRequestHook) {
94-
thisPlugin._safeExecute(
95-
span,
96-
() => thisPlugin._config.preRequestHook(span, this),
97-
false
98-
);
99-
}
100-
101-
(this as AWS.Request<any, any>).on("complete", (response) => {
102-
if (thisPlugin.activeRequests.has(this)) {
103-
thisPlugin.activeRequests.delete(this);
104-
}
105-
if (!span) return;
106-
107-
if (response.error) {
108-
span.setAttribute(AttributeNames.AWS_ERROR, response.error);
109-
}
110-
111-
span.setAttributes({
112-
[AttributeNames.AWS_REQUEST_ID]: response.requestId,
113-
...getResponseServiceAttributes(response),
114-
});
115-
span.end();
116-
span = null;
117-
});
138+
return original.apply(this, arguments);
118139
}
119140

120-
const awsRequest = this;
141+
const span = thisPlugin._startAwsSpan(awsRequest);
142+
thisPlugin.servicesExtensions.requestHook(awsRequest, span);
143+
thisPlugin._callPreRequestHooks(span, awsRequest);
144+
thisPlugin._registerCompletedEvent(span, awsRequest);
145+
146+
const callbackWithContext = thisPlugin._tracer.bind(callback, span);
121147
return thisPlugin._tracer.withSpan(span, () => {
122-
return original.apply(awsRequest, arguments);
148+
return original.call(awsRequest, callbackWithContext);
123149
});
124150
};
125-
};
151+
}
152+
153+
private _getRequestPromisePatch(original: () => Promise<any>) {
154+
const thisPlugin = this;
155+
return function (): Promise<any> {
156+
const awsRequest: AWS.Request<any, any> = this;
157+
/*
158+
if the span was already started, we don't want to start a new one
159+
when Request.promise() is called
160+
*/
161+
if (
162+
this._asm.currentState === "complete" ||
163+
awsRequest[thisPlugin.REQUEST_SPAN_KEY]
164+
) {
165+
return original.apply(this, arguments);
166+
}
167+
168+
const span = thisPlugin._startAwsSpan(awsRequest);
169+
const requestMetadata = thisPlugin.servicesExtensions.requestHook(
170+
awsRequest,
171+
span
172+
);
173+
thisPlugin._callPreRequestHooks(span, awsRequest);
174+
thisPlugin._registerCompletedEvent(span, awsRequest);
175+
176+
const origPromise: Promise<any> = thisPlugin._tracer.withSpan(
177+
span,
178+
() => {
179+
return original.apply(awsRequest, arguments);
180+
}
181+
);
182+
183+
return requestMetadata.isIncoming
184+
? thisPlugin._bindPromise(origPromise, span)
185+
: origPromise;
186+
};
187+
}
126188

127189
private _getSpanName = (request: any) => {
128190
return `aws.${request.service?.serviceIdentifier ?? "request"}.${
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { Span } from "@opentelemetry/api";
2+
3+
/*
4+
isIncoming - if true, then the operation callback / promise should be bind with the operation's span
5+
*/
6+
export interface RequestMetadata {
7+
isIncoming: boolean;
8+
}
9+
10+
export interface ServiceExtension {
11+
requestHook: (request: AWS.Request<any, any>, span: Span) => RequestMetadata;
12+
responseHook: (response: AWS.Response<any, any>, span: Span) => void;
13+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { Tracer, Span } from "@opentelemetry/api";
2+
import { ServiceExtension, RequestMetadata } from "./ServiceExtension";
3+
import { SqsServiceExtension } from "./sqs";
4+
import * as AWS from "aws-sdk";
5+
6+
export class ServicesExtensions implements ServiceExtension {
7+
services: Map<string, ServiceExtension> = new Map();
8+
9+
constructor(tracer: Tracer) {
10+
this.services.set("sqs", new SqsServiceExtension(tracer));
11+
}
12+
13+
requestHook(request: AWS.Request<any, any>, span: Span): RequestMetadata {
14+
const serviceId = (request as any)?.service?.serviceIdentifier;
15+
const serviceExtension = this.services.get(serviceId);
16+
if (!serviceExtension) return { isIncoming: false };
17+
return serviceExtension.requestHook(request, span);
18+
}
19+
20+
responseHook(response: AWS.Response<any, any>, span: Span) {
21+
const serviceId = (response as any)?.request?.service?.serviceIdentifier;
22+
const serviceExtension = this.services.get(serviceId);
23+
if (!serviceExtension) return;
24+
serviceExtension.responseHook(response, span);
25+
}
26+
}
Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1 @@
1-
export {
2-
getRequestServiceAttributes,
3-
getResponseServiceAttributes,
4-
} from "./service-attributes";
1+
export { ServicesExtensions } from "./ServicesExtensions";

packages/plugin-aws-sdk/src/services/s3.ts

Lines changed: 0 additions & 13 deletions
This file was deleted.

packages/plugin-aws-sdk/src/services/service-attributes.ts

Lines changed: 0 additions & 51 deletions
This file was deleted.

0 commit comments

Comments
 (0)