Skip to content

Commit 8f753e3

Browse files
committed
add preprocessing for compression
1 parent c577ac8 commit 8f753e3

File tree

5 files changed

+151
-21
lines changed

5 files changed

+151
-21
lines changed

aws-distro-opentelemetry-node-autoinstrumentation/src/aws-opentelemetry-configurator.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { getPropagator } from '@opentelemetry/auto-configuration-propagators';
77
import { getResourceDetectors as getResourceDetectorsFromEnv } from '@opentelemetry/auto-instrumentations-node';
88
import { ENVIRONMENT, TracesSamplerValues, getEnv, getEnvWithoutDefaults } from '@opentelemetry/core';
99
import { OTLPMetricExporter as OTLPGrpcOTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-grpc';
10+
import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base';
1011
import {
1112
AggregationTemporalityPreference,
1213
OTLPMetricExporter as OTLPHttpOTLPMetricExporter,
@@ -474,7 +475,9 @@ export class AwsLoggerProcessorProvider {
474475
validateLogsHeaders()
475476
) {
476477
diag.debug('Detected CloudWatch Logs OTLP endpoint. Switching exporter to OTLPAwsLogExporter');
477-
exporters.push(new OTLPAwsLogExporter(otlpExporterLogsEndpoint));
478+
exporters.push(
479+
new OTLPAwsLogExporter(otlpExporterLogsEndpoint, { compression: CompressionAlgorithm.GZIP })
480+
);
478481
} else {
479482
exporters.push(new OTLPProtoLogExporter());
480483
}
@@ -491,7 +494,9 @@ export class AwsLoggerProcessorProvider {
491494
validateLogsHeaders()
492495
) {
493496
diag.debug('Detected CloudWatch Logs OTLP endpoint. Switching exporter to OTLPAwsLogExporter');
494-
exporters.push(new OTLPAwsLogExporter(otlpExporterLogsEndpoint));
497+
exporters.push(
498+
new OTLPAwsLogExporter(otlpExporterLogsEndpoint, { compression: CompressionAlgorithm.GZIP })
499+
);
495500
} else {
496501
exporters.push(new OTLPProtoLogExporter());
497502
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
import { ISerializer } from '@opentelemetry/otlp-transformer';
4+
5+
/**
6+
* A serializer that bypasses request serialization by returning pre-serialized data.
7+
* @template Response The type of the deserialized response
8+
*/
9+
export class PassthroughSerializer<Response> implements ISerializer<Uint8Array, Response> {
10+
private serializedData: Uint8Array = new Uint8Array();
11+
private deserializer: (data: Uint8Array) => Response;
12+
13+
/**
14+
* Creates a new PassthroughSerializer instance.
15+
* @param deserializer Function to deserialize response data
16+
*/
17+
constructor(deserializer: (data: Uint8Array) => Response) {
18+
this.deserializer = deserializer;
19+
}
20+
21+
/**
22+
* Sets the pre-serialized data to be returned when serializeRequest is called.
23+
* @param data The serialized data to use
24+
*/
25+
setSerializedData(data: Uint8Array): void {
26+
this.serializedData = data;
27+
}
28+
29+
/**
30+
* Returns the pre-serialized data, ignoring the request parameter.
31+
* @param request Ignored parameter.
32+
* @returns The pre-serialized data
33+
*/
34+
serializeRequest(request: Uint8Array): Uint8Array {
35+
return this.serializedData;
36+
}
37+
38+
/**
39+
* Deserializes response data using the provided deserializer function.
40+
* @param data The response data to deserialize
41+
* @returns The deserialized response
42+
*/
43+
deserializeResponse(data: Uint8Array): Response {
44+
return this.deserializer(data);
45+
}
46+
}

aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/otlp/aws/logs/otlp-aws-log-exporter.ts

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33
import { OTLPLogExporter as OTLPProtoLogExporter } from '@opentelemetry/exporter-logs-otlp-proto';
4-
import { OTLPExporterNodeConfigBase } from '@opentelemetry/otlp-exporter-base';
5-
import { ProtobufLogsSerializer } from '@opentelemetry/otlp-transformer';
4+
import { CompressionAlgorithm, OTLPExporterNodeConfigBase } from '@opentelemetry/otlp-exporter-base';
5+
import { gzipSync } from 'zlib';
6+
import { IExportLogsServiceResponse, ProtobufLogsSerializer } from '@opentelemetry/otlp-transformer';
67
import { ReadableLogRecord } from '@opentelemetry/sdk-logs';
78
import { AwsAuthenticator } from '../common/aws-authenticator';
8-
import { changeUrlConfig } from '../common/utils';
9-
import { ExportResult } from '@opentelemetry/core';
9+
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
10+
import { PassthroughSerializer } from '../common/passthrough-serializer';
1011

1112
/**
1213
* This exporter extends the functionality of the OTLPProtoLogExporter to allow spans to be exported
@@ -16,34 +17,78 @@ import { ExportResult } from '@opentelemetry/core';
1617
*
1718
* This only works with version >=16 Node.js environments.
1819
*/
20+
1921
export class OTLPAwsLogExporter extends OTLPProtoLogExporter {
22+
private compression: CompressionAlgorithm | undefined;
2023
private endpoint: string;
2124
private region: string;
25+
private serializer: PassthroughSerializer<IExportLogsServiceResponse>;
2226
private authenticator: AwsAuthenticator;
2327

2428
constructor(endpoint: string, config?: OTLPExporterNodeConfigBase) {
25-
super(changeUrlConfig(endpoint, config));
29+
const modifiedConfig: OTLPExporterNodeConfigBase = {
30+
...config,
31+
url: endpoint,
32+
compression: CompressionAlgorithm.NONE,
33+
};
34+
35+
super(modifiedConfig);
36+
this.compression = config?.compression;
2637
this.region = endpoint.split('.')[1];
2738
this.endpoint = endpoint;
2839
this.authenticator = new AwsAuthenticator(this.region, 'logs');
40+
41+
// This is used in order to prevent serializing and compressing the data twice. Once for signing Sigv4 and
42+
// once when we pass the data to super.export() which will serialize and compress the data again.
43+
this.serializer = new PassthroughSerializer(ProtobufLogsSerializer.deserializeResponse);
44+
this['_delegate']._serializer = this.serializer;
2945
}
3046

3147
/**
32-
* Overrides the upstream implementation of export. All behaviors are the same except if the
33-
* endpoint is the CloudWatch Logs OTLP endpoint, we will sign the request with SigV4 in headers before
34-
* sending it to the endpoint. Otherwise, we will skip signing.
48+
* Overrides the upstream implementation of export. If the
49+
* endpoint is the CloudWatch Logs OTLP endpoint, we sign the request with SigV4 in headers.
50+
* To prevent performance degradation from serializing and compressing data twice, we handle serialization and compression
51+
* locally in this exporter and pass the pre-processed data to the upstream export functionality.
3552
*/
53+
54+
// Upstream already implements a retry mechanism:
55+
// https://github.com/open-telemetry/opentelemetry-js/blob/main/experimental/packages/otlp-exporter-base/src/retrying-transport.ts
56+
3657
public override async export(
3758
items: ReadableLogRecord[],
3859
resultCallback: (result: ExportResult) => void
3960
): Promise<void> {
40-
const serializedLogs: Uint8Array | undefined = ProtobufLogsSerializer.serializeRequest(items);
61+
let serializedLogs: Uint8Array | undefined = ProtobufLogsSerializer.serializeRequest(items);
62+
63+
if (serializedLogs === undefined) {
64+
resultCallback({
65+
code: ExportResultCode.FAILED,
66+
error: new Error('Nothing to send'),
67+
});
68+
return;
69+
}
70+
71+
const shouldCompress = this.compression && this.compression !== CompressionAlgorithm.NONE;
72+
if (shouldCompress) {
73+
serializedLogs = gzipSync(serializedLogs);
74+
}
75+
76+
// Pass pre-processed data to passthrough serializer. When super.export() is called, the Passthrough Serializer will
77+
// use the pre-processed data instead of serializing and compressing the data again.
78+
this.serializer.setSerializedData(serializedLogs);
79+
80+
// See type: https://github.com/open-telemetry/opentelemetry-js/blob/experimental/v0.57.1/experimental/packages/otlp-exporter-base/src/transport/http-transport-types.ts#L31
4181
const headers = this['_delegate']._transport?._transport?._parameters?.headers();
4282

4383
if (headers) {
84+
if (shouldCompress) {
85+
headers['Content-Encoding'] = 'gzip';
86+
} else {
87+
delete headers['Content-Encoding'];
88+
}
89+
4490
const signedRequest = await this.authenticator.authenticate(this.endpoint, headers, serializedLogs);
4591

46-
// See type: https://github.com/open-telemetry/opentelemetry-js/blob/experimental/v0.57.1/experimental/packages/otlp-exporter-base/src/transport/http-transport-types.ts#L31
4792
const newHeaders: () => Record<string, string> = () => signedRequest;
4893
this['_delegate']._transport._transport._parameters.headers = newHeaders;
4994
}

aws-distro-opentelemetry-node-autoinstrumentation/src/exporter/otlp/aws/traces/otlp-aws-span-exporter.ts

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33
import { OTLPTraceExporter as OTLPProtoTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto';
4-
import { OTLPExporterNodeConfigBase } from '@opentelemetry/otlp-exporter-base';
5-
import { ProtobufTraceSerializer } from '@opentelemetry/otlp-transformer';
4+
import { CompressionAlgorithm, OTLPExporterNodeConfigBase } from '@opentelemetry/otlp-exporter-base';
5+
import { IExportTraceServiceResponse, ProtobufTraceSerializer } from '@opentelemetry/otlp-transformer';
66
import { ReadableSpan } from '@opentelemetry/sdk-trace-base';
7-
import { ExportResult } from '@opentelemetry/core';
7+
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
88
import { AwsAuthenticator } from '../common/aws-authenticator';
9-
import { changeUrlConfig } from '../common/utils';
9+
import { PassthroughSerializer } from '../common/passthrough-serializer';
10+
import { gzipSync } from 'zlib';
1011

1112
/**
1213
* This exporter extends the functionality of the OTLPProtoTraceExporter to allow spans to be exported
@@ -17,27 +18,63 @@ import { changeUrlConfig } from '../common/utils';
1718
* This only works with version >=16 Node.js environments.
1819
*/
1920
export class OTLPAwsSpanExporter extends OTLPProtoTraceExporter {
21+
private compression: CompressionAlgorithm | undefined;
2022
private endpoint: string;
2123
private region: string;
24+
private serializer: PassthroughSerializer<IExportTraceServiceResponse>;
2225
private authenticator: AwsAuthenticator;
2326

2427
constructor(endpoint: string, config?: OTLPExporterNodeConfigBase) {
25-
super(changeUrlConfig(endpoint, config));
28+
const modifiedConfig: OTLPExporterNodeConfigBase = {
29+
...config,
30+
url: endpoint,
31+
compression: CompressionAlgorithm.NONE,
32+
};
33+
34+
super(modifiedConfig);
2635
this.region = endpoint.split('.')[1];
2736
this.endpoint = endpoint;
2837
this.authenticator = new AwsAuthenticator(this.region, 'xray');
38+
this.serializer = new PassthroughSerializer(ProtobufTraceSerializer.deserializeResponse);
39+
this['_delegate']._serializer = this.serializer;
2940
}
3041

3142
/**
3243
* Overrides the upstream implementation of export. All behaviors are the same except if the
3344
* endpoint is an XRay OTLP endpoint, we will sign the request with SigV4 in headers before
3445
* sending it to the endpoint. Otherwise, we will skip signing.
46+
* To prevent performance degradation from serializing and compressing data twice, we handle serialization and compression
47+
* locally in this exporter and pass the pre-processed data to the upstream export functionality.
3548
*/
3649
public override async export(items: ReadableSpan[], resultCallback: (result: ExportResult) => void): Promise<void> {
37-
const serializedSpans: Uint8Array | undefined = ProtobufTraceSerializer.serializeRequest(items);
50+
let serializedSpans: Uint8Array | undefined = ProtobufTraceSerializer.serializeRequest(items);
51+
52+
if (serializedSpans === undefined) {
53+
resultCallback({
54+
code: ExportResultCode.FAILED,
55+
error: new Error('Nothing to send'),
56+
});
57+
return;
58+
}
59+
60+
// Pass pre-processed data to passthrough serializer. When super.export() is called, the Passthrough Serializer will
61+
// use the pre-processed data instead of serializing and compressing the data again.
62+
const shouldCompress = this.compression && this.compression !== CompressionAlgorithm.NONE;
63+
if (shouldCompress) {
64+
serializedSpans = gzipSync(serializedSpans);
65+
}
66+
67+
this.serializer.setSerializedData(serializedSpans);
68+
3869
const headers = this['_delegate']._transport?._transport?._parameters?.headers();
3970

4071
if (headers) {
72+
if (shouldCompress) {
73+
headers['Content-Encoding'] = 'gzip';
74+
} else {
75+
delete headers['Content-Encoding'];
76+
}
77+
4178
const signedRequest = await this.authenticator.authenticate(this.endpoint, headers, serializedSpans);
4279

4380
// See type: https://github.com/open-telemetry/opentelemetry-js/blob/experimental/v0.57.1/experimental/packages/otlp-exporter-base/src/transport/http-transport-types.ts#L31

aws-distro-opentelemetry-node-autoinstrumentation/src/register.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,6 @@ try {
8484
diag.info('Setting TraceProvider for instrumentations at the end of initialization');
8585
for (const instrumentation of instrumentations) {
8686
instrumentation.setTracerProvider(trace.getTracerProvider());
87-
if (instrumentation.setLoggerProvider) {
88-
instrumentation.setLoggerProvider(logs.getLoggerProvider());
89-
}
9087
}
9188

9289
diag.debug(`Environment variable OTEL_PROPAGATORS is set to '${process.env.OTEL_PROPAGATORS}'`);

0 commit comments

Comments
 (0)