Skip to content

Commit 6796342

Browse files
committed
consolidate logs and span exporters
1 parent 8f753e3 commit 6796342

File tree

14 files changed

+504
-418
lines changed

14 files changed

+504
-418
lines changed

aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ export class AwsSpanProcessorProvider {
608608
case 'http/protobuf':
609609
if (otlpExporterTracesEndpoint && isAwsOtlpEndpoint(otlpExporterTracesEndpoint, 'xray')) {
610610
diag.debug('Detected XRay OTLP Traces endpoint. Switching exporter to OtlpAwsSpanExporter');
611-
return new OTLPAwsSpanExporter(otlpExporterTracesEndpoint);
611+
return new OTLPAwsSpanExporter(otlpExporterTracesEndpoint, { compression: CompressionAlgorithm.GZIP });
612612
}
613613
return new OTLPProtoTraceExporter();
614614
case 'udp':
@@ -618,7 +618,7 @@ export class AwsSpanProcessorProvider {
618618
diag.warn(`Unsupported OTLP traces protocol: ${protocol}. Using http/protobuf.`);
619619
if (otlpExporterTracesEndpoint && isAwsOtlpEndpoint(otlpExporterTracesEndpoint, 'xray')) {
620620
diag.debug('Detected XRay OTLP Traces endpoint. Switching exporter to OtlpAwsSpanExporter');
621-
return new OTLPAwsSpanExporter(otlpExporterTracesEndpoint);
621+
return new OTLPAwsSpanExporter(otlpExporterTracesEndpoint, { compression: CompressionAlgorithm.GZIP });
622622
}
623623
return new OTLPProtoTraceExporter();
624624
}
@@ -853,21 +853,24 @@ function validateLogsHeaders() {
853853
return false;
854854
}
855855

856-
let filteredLogHeadersCount = 0;
856+
let hasLogGroup = false;
857+
let hasLogStream = false;
857858

858859
for (const pair of logsHeaders.split(',')) {
859860
if (pair.includes('=')) {
860861
const [key, value] = pair.split('=', 2);
861-
if ((key === AWS_OTLP_LOGS_GROUP_HEADER || key === AWS_OTLP_LOGS_STREAM_HEADER) && value) {
862-
filteredLogHeadersCount += 1;
862+
if (key === AWS_OTLP_LOGS_GROUP_HEADER && value) {
863+
hasLogGroup = true;
864+
} else if (key === AWS_OTLP_LOGS_STREAM_HEADER && value) {
865+
hasLogStream = true;
863866
}
864867
}
865868
}
866869

867-
if (filteredLogHeadersCount !== 2) {
870+
if (!hasLogGroup || !hasLogStream) {
868871
diag.warn(
869-
'Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS ' +
870-
'to have values for x-aws-log-group and x-aws-log-stream'
872+
`Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS ' +
873+
'to have values for ${AWS_OTLP_LOGS_GROUP_HEADER} and ${AWS_OTLP_LOGS_STREAM_HEADER}`
871874
);
872875
return false;
873876
}

aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/otlp/aws/common/aws-authenticator.ts

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,36 +23,37 @@ if (getNodeVersion() >= 16) {
2323
diag.error('SigV4 signing requires at least Node major version 16');
2424
}
2525

26+
// See: https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html
27+
export const AUTHORIZATION_HEADER = 'authorization';
28+
export const X_AMZ_DATE_HEADER = 'x-amz-date';
29+
export const X_AMZ_SECURITY_TOKEN_HEADER = 'x-amz-security-token';
30+
export const X_AMZ_CONTENT_SHA256_HEADER = 'x-amz-content-sha256';
31+
2632
export class AwsAuthenticator {
33+
private endpoint: URL;
2734
private region: string;
2835
private service: string;
2936

30-
constructor(region: string, service: string) {
31-
this.region = region;
37+
constructor(endpoint: string, service: string) {
38+
this.endpoint = new URL(endpoint);
39+
this.region = endpoint.split('.')[1];
3240
this.service = service;
3341
}
3442

35-
public async authenticate(endpoint: string, headers: Record<string, string>, serializedData: Uint8Array | undefined) {
43+
public async authenticate(headers: Record<string, string>, serializedData: Uint8Array | undefined) {
3644
// Only do SigV4 Signing if the required dependencies are installed.
37-
if (dependenciesLoaded) {
38-
const url = new URL(endpoint);
39-
40-
if (serializedData === undefined) {
41-
diag.error('Given serialized data is undefined. Not authenticating.');
42-
return headers;
43-
}
44-
45+
if (dependenciesLoaded && serializedData) {
4546
const cleanedHeaders = this.removeSigV4Headers(headers);
4647

4748
const request = new HttpRequest({
4849
method: 'POST',
4950
protocol: 'https',
50-
hostname: url.hostname,
51-
path: url.pathname,
51+
hostname: this.endpoint.hostname,
52+
path: this.endpoint.pathname,
5253
body: serializedData,
5354
headers: {
5455
...cleanedHeaders,
55-
host: url.hostname,
56+
host: this.endpoint.hostname,
5657
},
5758
});
5859

@@ -68,22 +69,27 @@ export class AwsAuthenticator {
6869

6970
return signedRequest.headers;
7071
} catch (exception) {
71-
diag.debug(
72-
`Failed to sign/authenticate the given exported Span request to OTLP XRay endpoint with error: ${exception}`
73-
);
72+
diag.debug(`Failed to sign/authenticate the given export request with error: ${exception}`);
73+
return undefined;
7474
}
7575
}
7676

77-
return headers;
77+
diag.debug('No serialized data provided. Not authenticating.');
78+
return undefined;
7879
}
7980

8081
// Cleans up Sigv4 from headers to avoid accidentally copying them to the new headers
8182
private removeSigV4Headers(headers: Record<string, string>) {
8283
const newHeaders: Record<string, string> = {};
83-
const sigV4Headers = ['x-amz-date', 'authorization', 'x-amz-content-sha256', 'x-amz-security-token'];
84+
const sigv4Headers = [
85+
AUTHORIZATION_HEADER,
86+
X_AMZ_CONTENT_SHA256_HEADER,
87+
X_AMZ_DATE_HEADER,
88+
X_AMZ_CONTENT_SHA256_HEADER,
89+
];
8490

8591
for (const key in headers) {
86-
if (!sigV4Headers.includes(key.toLowerCase())) {
92+
if (!sigv4Headers.includes(key.toLowerCase())) {
8793
newHeaders[key] = headers[key];
8894
}
8995
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
import { CompressionAlgorithm, OTLPExporterBase } from '@opentelemetry/otlp-exporter-base';
4+
import { gzipSync } from 'zlib';
5+
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
6+
import { AwsAuthenticator } from './aws-authenticator';
7+
import { PassthroughSerializer } from './passthrough-serializer';
8+
import { ISerializer } from '@opentelemetry/otlp-transformer';
9+
10+
/**
11+
* Base class for AWS OTLP exporters
12+
*/
13+
export abstract class OTLPAwsBaseExporter<Payload, Response> {
14+
protected parentExporter: OTLPExporterBase<Payload>;
15+
private compression?: CompressionAlgorithm;
16+
private endpoint: string;
17+
private serializer: PassthroughSerializer<Response>;
18+
private authenticator: AwsAuthenticator;
19+
private parentSerializer: ISerializer<Payload, Response>;
20+
21+
constructor(
22+
endpoint: string,
23+
service: string,
24+
parentExporter: OTLPExporterBase<Payload>,
25+
parentSerializer: ISerializer<Payload, Response>,
26+
compression?: CompressionAlgorithm
27+
) {
28+
this.compression = compression;
29+
this.endpoint = endpoint;
30+
this.authenticator = new AwsAuthenticator(this.endpoint, service);
31+
this.parentExporter = parentExporter;
32+
this.parentSerializer = parentSerializer;
33+
34+
// To prevent performance degradation from serializing and compressing data twice, we handle serialization and compression
35+
// locally in this exporter and pass the pre-processed data to the upstream export.
36+
// This is used in order to prevent serializing and compressing the data again when calling parentExporter.export().
37+
// To see why this works:
38+
// https://github.com/open-telemetry/opentelemetry-js/blob/ec17ce48d0e5a99a122da5add612a20e2dd84ed5/experimental/packages/otlp-exporter-base/src/otlp-export-delegate.ts#L69
39+
this.serializer = new PassthroughSerializer<Response>(this.parentSerializer.deserializeResponse);
40+
41+
this.parentExporter['_delegate']._serializer = this.serializer;
42+
}
43+
44+
/**
45+
* Overrides the upstream implementation of export.
46+
* All behaviors are the same except if the endpoint is an AWS OTLP endpoint, we will sign the request with SigV4
47+
* in headers before sending it to the endpoint.
48+
* @param items - Array of signal data to export
49+
* @param resultCallback - Callback function to handle export result
50+
*/
51+
public async export(items: Payload, resultCallback: (result: ExportResult) => void): Promise<void> {
52+
let serializedData: Uint8Array | undefined = this.parentSerializer.serializeRequest(items);
53+
54+
if (!serializedData) {
55+
resultCallback({
56+
code: ExportResultCode.FAILED,
57+
error: new Error('Nothing to send'),
58+
});
59+
return;
60+
}
61+
62+
const headers = this.parentExporter['_delegate']._transport?._transport?._parameters?.headers();
63+
64+
// This should never be reached as upstream always sets the header.
65+
if (!headers) {
66+
resultCallback({
67+
code: ExportResultCode.FAILED,
68+
error: new Error(`Request headers are undefined - unable to export to ${this.endpoint}`),
69+
});
70+
71+
return;
72+
}
73+
74+
delete headers['Content-Encoding'];
75+
const shouldCompress = this.compression && this.compression !== CompressionAlgorithm.NONE;
76+
77+
if (shouldCompress) {
78+
try {
79+
serializedData = gzipSync(serializedData);
80+
headers['Content-Encoding'] = 'gzip';
81+
} catch (exception) {
82+
resultCallback({
83+
code: ExportResultCode.FAILED,
84+
error: new Error(`Failed to compress: ${exception}`),
85+
});
86+
return;
87+
}
88+
}
89+
90+
this.serializer.setSerializedData(serializedData);
91+
92+
const signedRequestHeaders = await this.authenticator.authenticate(headers, serializedData);
93+
94+
if (signedRequestHeaders) {
95+
this.parentExporter['_delegate']._transport._transport._parameters.headers = () => signedRequestHeaders;
96+
}
97+
98+
this.parentExporter.export(items, resultCallback);
99+
}
100+
}

aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/otlp/aws/common/utils.ts

Lines changed: 0 additions & 15 deletions
This file was deleted.

aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/otlp/aws/logs/otlp-aws-log-exporter.ts

Lines changed: 13 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -2,97 +2,35 @@
22
// SPDX-License-Identifier: Apache-2.0
33
import { OTLPLogExporter as OTLPProtoLogExporter } from '@opentelemetry/exporter-logs-otlp-proto';
44
import { CompressionAlgorithm, OTLPExporterNodeConfigBase } from '@opentelemetry/otlp-exporter-base';
5-
import { gzipSync } from 'zlib';
65
import { IExportLogsServiceResponse, ProtobufLogsSerializer } from '@opentelemetry/otlp-transformer';
7-
import { ReadableLogRecord } from '@opentelemetry/sdk-logs';
8-
import { AwsAuthenticator } from '../common/aws-authenticator';
9-
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
10-
import { PassthroughSerializer } from '../common/passthrough-serializer';
6+
import { LogRecordExporter, ReadableLogRecord } from '@opentelemetry/sdk-logs';
7+
import { OTLPAwsBaseExporter } from '../common/otlp-aws-base-exporter';
118

129
/**
13-
* This exporter extends the functionality of the OTLPProtoLogExporter to allow spans to be exported
10+
* This exporter extends the functionality of the OTLPProtoLogExporter to allow logs to be exported
1411
* to the CloudWatch Logs OTLP endpoint https://logs.[AWSRegion].amazonaws.com/v1/logs. Utilizes the aws-sdk
1512
* library to sign and directly inject SigV4 Authentication to the exported request's headers. <a
1613
* href="https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html">...</a>
1714
*
1815
* This only works with version >=16 Node.js environments.
16+
* @param endpoint - The AWS CloudWatch Logs OTLP endpoint URL
17+
* @param config - Optional OTLP exporter configuration
1918
*/
20-
21-
export class OTLPAwsLogExporter extends OTLPProtoLogExporter {
22-
private compression: CompressionAlgorithm | undefined;
23-
private endpoint: string;
24-
private region: string;
25-
private serializer: PassthroughSerializer<IExportLogsServiceResponse>;
26-
private authenticator: AwsAuthenticator;
27-
19+
export class OTLPAwsLogExporter
20+
extends OTLPAwsBaseExporter<ReadableLogRecord[], IExportLogsServiceResponse>
21+
implements LogRecordExporter
22+
{
2823
constructor(endpoint: string, config?: OTLPExporterNodeConfigBase) {
2924
const modifiedConfig: OTLPExporterNodeConfigBase = {
3025
...config,
3126
url: endpoint,
3227
compression: CompressionAlgorithm.NONE,
3328
};
3429

35-
super(modifiedConfig);
36-
this.compression = config?.compression;
37-
this.region = endpoint.split('.')[1];
38-
this.endpoint = endpoint;
39-
this.authenticator = new AwsAuthenticator(this.region, 'logs');
40-
41-
// This is used in order to prevent serializing and compressing the data twice. Once for signing Sigv4 and
42-
// once when we pass the data to super.export() which will serialize and compress the data again.
43-
this.serializer = new PassthroughSerializer(ProtobufLogsSerializer.deserializeResponse);
44-
this['_delegate']._serializer = this.serializer;
30+
const parentExporter = new OTLPProtoLogExporter(modifiedConfig);
31+
super(endpoint, 'logs', parentExporter, ProtobufLogsSerializer, config?.compression);
4532
}
46-
47-
/**
48-
* Overrides the upstream implementation of export. If the
49-
* endpoint is the CloudWatch Logs OTLP endpoint, we sign the request with SigV4 in headers.
50-
* To prevent performance degradation from serializing and compressing data twice, we handle serialization and compression
51-
* locally in this exporter and pass the pre-processed data to the upstream export functionality.
52-
*/
53-
54-
// Upstream already implements a retry mechanism:
55-
// https://github.com/open-telemetry/opentelemetry-js/blob/main/experimental/packages/otlp-exporter-base/src/retrying-transport.ts
56-
57-
public override async export(
58-
items: ReadableLogRecord[],
59-
resultCallback: (result: ExportResult) => void
60-
): Promise<void> {
61-
let serializedLogs: Uint8Array | undefined = ProtobufLogsSerializer.serializeRequest(items);
62-
63-
if (serializedLogs === undefined) {
64-
resultCallback({
65-
code: ExportResultCode.FAILED,
66-
error: new Error('Nothing to send'),
67-
});
68-
return;
69-
}
70-
71-
const shouldCompress = this.compression && this.compression !== CompressionAlgorithm.NONE;
72-
if (shouldCompress) {
73-
serializedLogs = gzipSync(serializedLogs);
74-
}
75-
76-
// Pass pre-processed data to passthrough serializer. When super.export() is called, the Passthrough Serializer will
77-
// use the pre-processed data instead of serializing and compressing the data again.
78-
this.serializer.setSerializedData(serializedLogs);
79-
80-
// See type: https://github.com/open-telemetry/opentelemetry-js/blob/experimental/v0.57.1/experimental/packages/otlp-exporter-base/src/transport/http-transport-types.ts#L31
81-
const headers = this['_delegate']._transport?._transport?._parameters?.headers();
82-
83-
if (headers) {
84-
if (shouldCompress) {
85-
headers['Content-Encoding'] = 'gzip';
86-
} else {
87-
delete headers['Content-Encoding'];
88-
}
89-
90-
const signedRequest = await this.authenticator.authenticate(this.endpoint, headers, serializedLogs);
91-
92-
const newHeaders: () => Record<string, string> = () => signedRequest;
93-
this['_delegate']._transport._transport._parameters.headers = newHeaders;
94-
}
95-
96-
super.export(items, resultCallback);
33+
shutdown(): Promise<void> {
34+
return this.parentExporter.shutdown();
9735
}
9836
}

0 commit comments

Comments
 (0)