Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ export const AWS_ATTRIBUTE_KEYS: { [key: string]: string } = {
// Used for JavaScript workaround - attribute for pre-calculated value of isLocalRoot
AWS_IS_LOCAL_ROOT: 'aws.is.local.root',

// Naming divergence from Java/Python
// AWS_#_NAME attributes are not supported in JavaScript as they are not part of the Semantic Conventions.
// TODO:Move to Semantic Conventions when these attributes are added.
AWS_S3_BUCKET: 'aws.s3.bucket',
AWS_SQS_QUEUE_URL: 'aws.sqs.queue.url',
AWS_SQS_QUEUE_NAME: 'aws.sqs.queue.name',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { AttributeValue } from '@opentelemetry/api';
import { Instrumentation } from '@opentelemetry/instrumentation';
import { AwsSdkInstrumentationConfig, NormalizedRequest } from '@opentelemetry/instrumentation-aws-sdk';
import { SEMATTRS_MESSAGING_URL } from '@opentelemetry/semantic-conventions';
import { AWS_ATTRIBUTE_KEYS } from '../aws-attribute-keys';
import { SqsUrlParser } from '../sqs-url-parser';
import { RequestMetadata } from '../third-party/otel/aws/services/ServiceExtension';
import { KinesisServiceExtension } from './aws/services/kinesis';
import { S3ServiceExtension } from './aws/services/s3';
Expand All @@ -30,31 +27,47 @@ export function applyInstrumentationPatches(instrumentations: Instrumentation[])
if (services) {
services.set('S3', new S3ServiceExtension());
services.set('Kinesis', new KinesisServiceExtension());
const sqsServiceExtension: any = services.get('SQS');
// It is not expected that `sqsServiceExtension` is undefined
if (sqsServiceExtension) {
const requestPreSpanHook = sqsServiceExtension.requestPreSpanHook;
// Save original `requestPreSpanHook` under a similar name, to be invoked by the patched hook
sqsServiceExtension._requestPreSpanHook = requestPreSpanHook;
// The patched hook will populate the 'aws.sqs.queue.url' and 'aws.sqs.queue.name' attributes according to spec
// from the 'messaging.url' attribute
const patchedRequestPreSpanHook = (
request: NormalizedRequest,
_config: AwsSdkInstrumentationConfig
): RequestMetadata => {
const requestMetadata: RequestMetadata = sqsServiceExtension._requestPreSpanHook(request, _config);
// It is not expected that `requestMetadata.spanAttributes` can possibly be undefined, but still be careful anyways
if (requestMetadata.spanAttributes) {
const queueUrl: AttributeValue | undefined = requestMetadata.spanAttributes[SEMATTRS_MESSAGING_URL];
requestMetadata.spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL] = queueUrl;
requestMetadata.spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME] =
SqsUrlParser.getQueueName(queueUrl);
}
return requestMetadata;
};
sqsServiceExtension.requestPreSpanHook = patchedRequestPreSpanHook;
}
patchSqsServiceExtension(services.get('SQS'));
}
}
});
}

/*
* This patch extends the existing upstream extension for SQS. Extensions allow for custom logic for adding
* service-specific information to spans, such as attributes. Specifically, we are adding logic to add
* `aws.sqs.queue.url` and `aws.sqs.queue.name` attributes, to be used to generate RemoteTarget and achieve parity
* with the Java/Python instrumentation.
*
* Callout that today, the upstream logic adds `messaging.url` and `messaging.destination` but we feel that
* `aws.sqs` is more in line with existing AWS Semantic Convention attributes like `AWS_S3_BUCKET`, etc.
*
* @param sqsServiceExtension SQS Service Extension obtained the service extension list from the AWS SDK OTel Instrumentation
*/
function patchSqsServiceExtension(sqsServiceExtension: any): void {
// It is not expected that `sqsServiceExtension` is undefined
if (sqsServiceExtension) {
const requestPreSpanHook = sqsServiceExtension.requestPreSpanHook;
// Save original `requestPreSpanHook` under a similar name, to be invoked by the patched hook
sqsServiceExtension._requestPreSpanHook = requestPreSpanHook;
// The patched hook will populate the 'aws.sqs.queue.url' and 'aws.sqs.queue.name' attributes according to spec
// from the 'messaging.url' attribute
const patchedRequestPreSpanHook = (
request: NormalizedRequest,
_config: AwsSdkInstrumentationConfig
): RequestMetadata => {
const requestMetadata: RequestMetadata = sqsServiceExtension._requestPreSpanHook(request, _config);
// It is not expected that `requestMetadata.spanAttributes` can possibly be undefined, but still be careful anyways
if (requestMetadata.spanAttributes) {
if (request.commandInput?.QueueUrl) {
requestMetadata.spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL] = request.commandInput.QueueUrl;
}
if (request.commandInput?.QueueName) {
requestMetadata.spanAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME] = request.commandInput.QueueName;
}
}
return requestMetadata;
};
sqsServiceExtension.requestPreSpanHook = patchedRequestPreSpanHook;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ describe('Kinesis', () => {
secretAccessKey: 'abcde',
},
});

nock(`https://kinesis.${region}.amazonaws.com`).post('/').reply(200, {});
});

describe('DescribeStream', () => {
it('adds Stream Name', async () => {
const dummyStreamName: string = 'dummy-stream-name';

nock(`https://kinesis.${region}.amazonaws.com`).post('/').reply(200, {});

await kinesis
.describeStream({
StreamName: dummyStreamName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ describe('S3', () => {
secretAccessKey: 'abcde',
},
});

nock(`https://s3.${region}.amazonaws.com`).post('/').reply(200, {});
});

describe('ListObjects', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ describe('InstrumentationPatchTest', () => {
const services: Map<string, any> = extractServicesFromAwsSdkInstrumentation(unpatchedAwsSdkInstrumentation);
expect(() => doExtractSqsAttributes(services)).not.toThrow();

const sqsAttributes: Attributes = doExtractSqsAttributes(services);
let sqsAttributes: Attributes = doExtractSqsAttributes(services, false);
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL]).toBeUndefined();
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME]).toBeUndefined();

sqsAttributes = doExtractSqsAttributes(services, true);
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL]).toBeUndefined();
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME]).toBeUndefined();
});
Expand All @@ -97,9 +101,15 @@ describe('InstrumentationPatchTest', () => {
it('SQS with patching', () => {
const patchedAwsSdkInstrumentation: AwsInstrumentation = extractAwsSdkInstrumentation(PATCHED_INSTRUMENTATIONS);
const services: Map<string, any> = extractServicesFromAwsSdkInstrumentation(patchedAwsSdkInstrumentation);
const sqsAttributes: Attributes = doExtractSqsAttributes(services);
const sqsAttributes: Attributes = doExtractSqsAttributes(services, false);
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL]).toEqual(_QUEUE_URL);
});

it('SQS with patching if Queue Name was available (but is not)', () => {
const patchedAwsSdkInstrumentation: AwsInstrumentation = extractAwsSdkInstrumentation(PATCHED_INSTRUMENTATIONS);
const services: Map<string, any> = extractServicesFromAwsSdkInstrumentation(patchedAwsSdkInstrumentation);
const sqsAttributes: Attributes = doExtractSqsAttributes(services, true);
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_URL]).toEqual(_QUEUE_URL);
expect(sqsAttributes[AWS_ATTRIBUTE_KEYS.AWS_SQS_QUEUE_NAME]).toEqual(_QUEUE_NAME);
});

function extractAwsSdkInstrumentation(instrumentations: Instrumentation[]): AwsInstrumentation {
Expand Down Expand Up @@ -144,7 +154,10 @@ describe('InstrumentationPatchTest', () => {
return doExtractAttributes(services, serviceName, params);
}

function doExtractSqsAttributes(services: Map<string, ServiceExtension>): Attributes {
function doExtractSqsAttributes(
services: Map<string, ServiceExtension>,
includeQueueName: boolean = false
): Attributes {
const serviceName: string = 'SQS';
const params: NormalizedRequest = {
serviceName: serviceName,
Expand All @@ -153,6 +166,9 @@ describe('InstrumentationPatchTest', () => {
QueueUrl: _QUEUE_URL,
},
};
if (includeQueueName) {
params.commandInput.QueueName = _QUEUE_NAME;
}
return doExtractAttributes(services, serviceName, params);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import { spawnSync, SpawnSyncReturns } from 'child_process';

// The OpenTelemetry Authors code
describe('RegisterPatch', function () {
it('Correctly applies AWS SDK Patches and generates expected attributes for S3 Client call', () => {
it('Correctly applies AWS SDK Patches and generates expected attributes for S3, Kinesis, and SQS Client calls', () => {
const proc: SpawnSyncReturns<Buffer> = spawnSync(
process.execPath,
['--require', '../build/src/register.js', './test-app/app-aws-sdk-s3-call.js'],
['--require', '../build/src/register.js', './test-app/app-aws-sdk-client-calls.js'],
{
cwd: __dirname,
timeout: 10000,
Expand Down Expand Up @@ -57,45 +57,6 @@ describe('RegisterPatch', function () {
proc.stdout.includes("'aws.remote.resource.identifier': 'test-bucket-not-exists'"),
'console span output in stdout - validate aws.remote.resource.identifier'
);
});

it('Correctly applies AWS SDK Patches and generates expected attributes for Kinesis Client call', () => {
const proc: SpawnSyncReturns<Buffer> = spawnSync(
process.execPath,
['--require', '../build/src/register.js', './test-app/app-aws-sdk-kinesis-call.js'],
{
cwd: __dirname,
timeout: 10000,
killSignal: 'SIGKILL', // SIGTERM is not sufficient to terminate some hangs
env: Object.assign({}, process.env, {
OTEL_NODE_RESOURCE_DETECTORS: 'none',
OTEL_TRACES_EXPORTER: 'console',
// nx (used by lerna run) defaults `FORCE_COLOR=true`, which in
// node v18.17.0, v20.3.0 and later results in ANSI color escapes
// in the ConsoleSpanExporter output that is checked below.
FORCE_COLOR: '0',

OTEL_LOG_LEVEL: 'ALL',
OTEL_TRACES_SAMPLER: 'always_on',
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: 'http://localhost:4316/v1/traces',
OTEL_RESOURCE_ATTRIBUTES: 'service.name=test-adot-sdk-ec2-service-name',
OTEL_AWS_APPLICATION_SIGNALS_ENABLED: 'true',
OTEL_NODE_DISABLED_INSTRUMENTATIONS: 'fs',
}),
}
);
assert.ifError(proc.error);
assert.equal(proc.status, 0, `proc.status (${proc.status})`);
assert.equal(proc.signal, null, `proc.signal (${proc.signal})`);

assert.ok(proc.stdout.includes('AWS Distro of OpenTelemetry automatic instrumentation started successfully'));
assert.ok(proc.stdout.includes("Environment variable OTEL_EXPORTER_OTLP_PROTOCOL is set to 'http/protobuf'"));
assert.ok(proc.stdout.includes("Environment variable OTEL_PROPAGATORS is set to 'xray,tracecontext,b3,b3multi'"));

assert.ok(
proc.stdout.includes("'service.name': 'test-adot-sdk-ec2-service-name'"),
'console span output in stdout - validate service.name'
);

assert.ok(
proc.stdout.includes("'aws.kinesis.stream.name': 'my-kinesis-stream'"),
Expand All @@ -109,59 +70,11 @@ describe('RegisterPatch', function () {
proc.stdout.includes("'aws.remote.resource.identifier': 'my-kinesis-stream'"),
'console span output in stdout - validate aws.remote.resource.identifier'
);
});

it('Correctly applies AWS SDK Patches and generates expected attributes for SQS Client call', () => {
const proc: SpawnSyncReturns<Buffer> = spawnSync(
process.execPath,
['--require', '../build/src/register.js', './test-app/app-aws-sdk-sqs-call.js'],
{
cwd: __dirname,
timeout: 10000,
killSignal: 'SIGKILL', // SIGTERM is not sufficient to terminate some hangs
env: Object.assign({}, process.env, {
OTEL_NODE_RESOURCE_DETECTORS: 'none',
OTEL_TRACES_EXPORTER: 'console',
// nx (used by lerna run) defaults `FORCE_COLOR=true`, which in
// node v18.17.0, v20.3.0 and later results in ANSI color escapes
// in the ConsoleSpanExporter output that is checked below.
FORCE_COLOR: '0',

OTEL_LOG_LEVEL: 'ALL',
OTEL_TRACES_SAMPLER: 'always_on',
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: 'http://localhost:4316/v1/traces',
OTEL_RESOURCE_ATTRIBUTES: 'service.name=test-adot-sdk-ec2-service-name',
OTEL_AWS_APPLICATION_SIGNALS_ENABLED: 'true',
OTEL_NODE_DISABLED_INSTRUMENTATIONS: 'fs',
}),
}
);
assert.ifError(proc.error);
assert.equal(proc.status, 0, `proc.status (${proc.status})`);
assert.equal(proc.signal, null, `proc.signal (${proc.signal})`);

assert.ok(proc.stdout.includes('AWS Distro of OpenTelemetry automatic instrumentation started successfully'));
assert.ok(proc.stdout.includes("Environment variable OTEL_EXPORTER_OTLP_PROTOCOL is set to 'http/protobuf'"));
assert.ok(proc.stdout.includes("Environment variable OTEL_PROPAGATORS is set to 'xray,tracecontext,b3,b3multi'"));

assert.ok(
proc.stdout.includes("'service.name': 'test-adot-sdk-ec2-service-name'"),
'console span output in stdout - validate service.name'
);

assert.ok(
proc.stdout.includes("'aws.sqs.queue.name': 'sqs-queue-name'"),
'console span output in stdout - validate aws.sqs.queue.name'
);
assert.ok(
proc.stdout.includes("'aws.sqs.queue.url': 'https://sqs.us-east-1.amazonaws.com/012345678910/sqs-queue-name'"),
'console span output in stdout - validate aws.sqs.queue.url'
);

assert.ok(
proc.stdout.includes("'messaging.url': 'https://sqs.us-east-1.amazonaws.com/012345678910/sqs-queue-name'"),
'console span output in stdout - validate messaging.url'
);
assert.ok(
proc.stdout.includes("'aws.remote.resource.type': 'AWS::SQS::Queue'"),
'console span output in stdout - validate aws.remote.resource.type'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// Used in register.patch.test.ts to mimic a JS app using SQS client of AWS SDK for JS (v3).
const { S3Client, ListObjectsCommand } = require("@aws-sdk/client-s3");
const { KinesisClient, ListStreamsCommand } = require('@aws-sdk/client-kinesis');
const { SQSClient, GetQueueAttributesCommand } = require("@aws-sdk/client-sqs");

const s3Client = new S3Client({});
const bucketName = "test-bucket-not-exists";

const kinesisClient = new KinesisClient({});
const streamName = "my-kinesis-stream";

const sqsClient = new SQSClient({});
const queueUrl = "https://sqs.us-east-1.amazonaws.com/012345678910/sqs-queue-name";

const awsSdkClientSendPromises = [
s3Client.send(
new ListObjectsCommand({
Bucket: bucketName
})
),
kinesisClient.send(
new ListStreamsCommand({
StreamName: streamName,
})
),
sqsClient.send(
new GetQueueAttributesCommand({
QueueUrl: queueUrl
})
),
]

Promise.all(awsSdkClientSendPromises).catch(e => {
console.error("Exception thrown", e.message);
});

This file was deleted.

This file was deleted.

This file was deleted.