Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ export class AwsSpanProcessorProvider {
case 'http/protobuf':
if (otlpExporterTracesEndpoint && isAwsOtlpEndpoint(otlpExporterTracesEndpoint, 'xray')) {
diag.debug('Detected XRay OTLP Traces endpoint. Switching exporter to OtlpAwsSpanExporter');
return new OTLPAwsSpanExporter(otlpExporterTracesEndpoint);
return new OTLPAwsSpanExporter(otlpExporterTracesEndpoint, { compression: CompressionAlgorithm.GZIP });
}
return new OTLPProtoTraceExporter();
case 'udp':
Expand All @@ -618,7 +618,7 @@ export class AwsSpanProcessorProvider {
diag.warn(`Unsupported OTLP traces protocol: ${protocol}. Using http/protobuf.`);
if (otlpExporterTracesEndpoint && isAwsOtlpEndpoint(otlpExporterTracesEndpoint, 'xray')) {
diag.debug('Detected XRay OTLP Traces endpoint. Switching exporter to OtlpAwsSpanExporter');
return new OTLPAwsSpanExporter(otlpExporterTracesEndpoint);
return new OTLPAwsSpanExporter(otlpExporterTracesEndpoint, { compression: CompressionAlgorithm.GZIP });
}
return new OTLPProtoTraceExporter();
}
Expand Down Expand Up @@ -853,21 +853,24 @@ function validateLogsHeaders() {
return false;
}

let filteredLogHeadersCount = 0;
let hasLogGroup = false;
let hasLogStream = false;

for (const pair of logsHeaders.split(',')) {
if (pair.includes('=')) {
const [key, value] = pair.split('=', 2);
if ((key === AWS_OTLP_LOGS_GROUP_HEADER || key === AWS_OTLP_LOGS_STREAM_HEADER) && value) {
filteredLogHeadersCount += 1;
if (key === AWS_OTLP_LOGS_GROUP_HEADER && value) {
hasLogGroup = true;
} else if (key === AWS_OTLP_LOGS_STREAM_HEADER && value) {
hasLogStream = true;
}
}
}

if (filteredLogHeadersCount !== 2) {
if (!hasLogGroup || !hasLogStream) {
diag.warn(
'Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS ' +
'to have values for x-aws-log-group and x-aws-log-stream'
`Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS ' +
'to have values for ${AWS_OTLP_LOGS_GROUP_HEADER} and ${AWS_OTLP_LOGS_STREAM_HEADER}`
);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,37 @@ if (getNodeVersion() >= 16) {
diag.error('SigV4 signing requires at least Node major version 16');
}

// See: https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html
export const AUTHORIZATION_HEADER = 'authorization';
export const X_AMZ_DATE_HEADER = 'x-amz-date';
export const X_AMZ_SECURITY_TOKEN_HEADER = 'x-amz-security-token';
export const X_AMZ_CONTENT_SHA256_HEADER = 'x-amz-content-sha256';

export class AwsAuthenticator {
private endpoint: URL;
private region: string;
private service: string;

constructor(region: string, service: string) {
this.region = region;
constructor(endpoint: string, service: string) {
this.endpoint = new URL(endpoint);
this.region = endpoint.split('.')[1];
this.service = service;
}

public async authenticate(endpoint: string, headers: Record<string, string>, serializedData: Uint8Array | undefined) {
public async authenticate(headers: Record<string, string>, serializedData: Uint8Array | undefined) {
// Only do SigV4 Signing if the required dependencies are installed.
if (dependenciesLoaded) {
const url = new URL(endpoint);

if (serializedData === undefined) {
diag.error('Given serialized data is undefined. Not authenticating.');
return headers;
}

if (dependenciesLoaded && serializedData) {
const cleanedHeaders = this.removeSigV4Headers(headers);

const request = new HttpRequest({
method: 'POST',
protocol: 'https',
hostname: url.hostname,
path: url.pathname,
hostname: this.endpoint.hostname,
path: this.endpoint.pathname,
body: serializedData,
headers: {
...cleanedHeaders,
host: url.hostname,
host: this.endpoint.hostname,
},
});

Expand All @@ -68,22 +69,27 @@ export class AwsAuthenticator {

return signedRequest.headers;
} catch (exception) {
diag.debug(
`Failed to sign/authenticate the given exported Span request to OTLP XRay endpoint with error: ${exception}`
);
diag.debug(`Failed to sign/authenticate the given export request with error: ${exception}`);
return undefined;
}
}

return headers;
diag.debug('No serialized data provided. Not authenticating.');
return undefined;
}

// Cleans up Sigv4 from headers to avoid accidentally copying them to the new headers
private removeSigV4Headers(headers: Record<string, string>) {
const newHeaders: Record<string, string> = {};
const sigV4Headers = ['x-amz-date', 'authorization', 'x-amz-content-sha256', 'x-amz-security-token'];
const sigv4Headers = [
AUTHORIZATION_HEADER,
X_AMZ_CONTENT_SHA256_HEADER,
X_AMZ_DATE_HEADER,
X_AMZ_CONTENT_SHA256_HEADER,
];

for (const key in headers) {
if (!sigV4Headers.includes(key.toLowerCase())) {
if (!sigv4Headers.includes(key.toLowerCase())) {
newHeaders[key] = headers[key];
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { CompressionAlgorithm, OTLPExporterBase } from '@opentelemetry/otlp-exporter-base';
import { gzipSync } from 'zlib';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import { AwsAuthenticator } from './aws-authenticator';
import { PassthroughSerializer } from './passthrough-serializer';
import { ISerializer } from '@opentelemetry/otlp-transformer';

/**
* Base class for AWS OTLP exporters
*/
/**
 * Base class for AWS OTLP exporters (traces/logs). Wraps an upstream OTLP
 * exporter and, before delegating to it, (1) serializes the payload once,
 * (2) optionally gzip-compresses it, and (3) SigV4-signs the request headers
 * for the AWS OTLP endpoint via {@link AwsAuthenticator}.
 */
export abstract class OTLPAwsBaseExporter<Payload, Response> {
  protected parentExporter: OTLPExporterBase<Payload>;
  private compression?: CompressionAlgorithm;
  private endpoint: string;
  private serializer: PassthroughSerializer<Response>;
  private authenticator: AwsAuthenticator;
  private parentSerializer: ISerializer<Payload, Response>;

  /**
   * @param endpoint - AWS OTLP endpoint URL the parent exporter targets
   * @param service - AWS service name used for SigV4 signing (e.g. 'logs', 'xray')
   * @param parentExporter - upstream exporter that performs the actual HTTP export
   * @param parentSerializer - upstream serializer for the payload/response pair
   * @param compression - optional compression; anything other than NONE enables gzip
   */
  constructor(
    endpoint: string,
    service: string,
    parentExporter: OTLPExporterBase<Payload>,
    parentSerializer: ISerializer<Payload, Response>,
    compression?: CompressionAlgorithm
  ) {
    this.compression = compression;
    this.endpoint = endpoint;
    this.authenticator = new AwsAuthenticator(this.endpoint, service);
    this.parentExporter = parentExporter;
    this.parentSerializer = parentSerializer;

    // To prevent performance degradation from serializing and compressing data twice, we handle serialization and compression
    // locally in this exporter and pass the pre-processed data to the upstream export.
    // This is used in order to prevent serializing and compressing the data again when calling parentExporter.export().
    // To see why this works:
    // https://github.com/open-telemetry/opentelemetry-js/blob/ec17ce48d0e5a99a122da5add612a20e2dd84ed5/experimental/packages/otlp-exporter-base/src/otlp-export-delegate.ts#L69
    // Bind deserializeResponse to its serializer: it is invoked later as a bare
    // callback, so an unbound reference would lose `this` for any ISerializer
    // implementation that relies on instance state.
    this.serializer = new PassthroughSerializer<Response>(
      this.parentSerializer.deserializeResponse.bind(this.parentSerializer)
    );

    // NOTE(review): reaches into upstream private state ('_delegate'); this is
    // coupled to the upstream internals linked above — re-verify on upgrades.
    this.parentExporter['_delegate']._serializer = this.serializer;
  }

  /**
   * Overrides the upstream implementation of export.
   * All behaviors are the same except if the endpoint is an AWS OTLP endpoint, we will sign the request with SigV4
   * in headers before sending it to the endpoint.
   * @param items - Array of signal data to export
   * @param resultCallback - Callback function to handle export result
   */
  public async export(items: Payload, resultCallback: (result: ExportResult) => void): Promise<void> {
    let serializedData: Uint8Array | undefined = this.parentSerializer.serializeRequest(items);

    if (!serializedData) {
      resultCallback({
        code: ExportResultCode.FAILED,
        error: new Error('Nothing to send'),
      });
      return;
    }

    // See transport parameter type in upstream http-transport-types; headers() is a thunk.
    const headers = this.parentExporter['_delegate']._transport?._transport?._parameters?.headers();

    // This should never be reached as upstream always sets the header.
    if (!headers) {
      resultCallback({
        code: ExportResultCode.FAILED,
        error: new Error(`Request headers are undefined - unable to export to ${this.endpoint}`),
      });

      return;
    }

    // Drop any stale encoding header; it is re-added below only when we compress.
    delete headers['Content-Encoding'];
    const shouldCompress = this.compression && this.compression !== CompressionAlgorithm.NONE;

    if (shouldCompress) {
      try {
        serializedData = gzipSync(serializedData);
        headers['Content-Encoding'] = 'gzip';
      } catch (exception) {
        resultCallback({
          code: ExportResultCode.FAILED,
          error: new Error(`Failed to compress: ${exception}`),
        });
        return;
      }
    }

    // Hand the pre-processed bytes to the passthrough serializer so the
    // upstream export re-uses them instead of serializing/compressing again.
    this.serializer.setSerializedData(serializedData);

    // Sign the (possibly compressed) body; SigV4 hashes the exact bytes sent.
    const signedRequestHeaders = await this.authenticator.authenticate(headers, serializedData);

    // If signing failed, fall through with the original headers — the upstream
    // retry/error path will surface the endpoint's rejection.
    if (signedRequestHeaders) {
      this.parentExporter['_delegate']._transport._transport._parameters.headers = () => signedRequestHeaders;
    }

    this.parentExporter.export(items, resultCallback);
  }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,97 +2,35 @@
// SPDX-License-Identifier: Apache-2.0
import { OTLPLogExporter as OTLPProtoLogExporter } from '@opentelemetry/exporter-logs-otlp-proto';
import { CompressionAlgorithm, OTLPExporterNodeConfigBase } from '@opentelemetry/otlp-exporter-base';
import { gzipSync } from 'zlib';
import { IExportLogsServiceResponse, ProtobufLogsSerializer } from '@opentelemetry/otlp-transformer';
import { ReadableLogRecord } from '@opentelemetry/sdk-logs';
import { AwsAuthenticator } from '../common/aws-authenticator';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import { PassthroughSerializer } from '../common/passthrough-serializer';
import { LogRecordExporter, ReadableLogRecord } from '@opentelemetry/sdk-logs';
import { OTLPAwsBaseExporter } from '../common/otlp-aws-base-exporter';

/**
* This exporter extends the functionality of the OTLPProtoLogExporter to allow spans to be exported
* This exporter extends the functionality of the OTLPProtoLogExporter to allow logs to be exported
* to the CloudWatch Logs OTLP endpoint https://logs.[AWSRegion].amazonaws.com/v1/logs. Utilizes the aws-sdk
* library to sign and directly inject SigV4 Authentication to the exported request's headers. <a
* href="https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html">...</a>
*
* This only works with version >=16 Node.js environments.
* @param endpoint - The AWS CloudWatch Logs OTLP endpoint URL
* @param config - Optional OTLP exporter configuration
*/

export class OTLPAwsLogExporter extends OTLPProtoLogExporter {
private compression: CompressionAlgorithm | undefined;
private endpoint: string;
private region: string;
private serializer: PassthroughSerializer<IExportLogsServiceResponse>;
private authenticator: AwsAuthenticator;

export class OTLPAwsLogExporter
extends OTLPAwsBaseExporter<ReadableLogRecord[], IExportLogsServiceResponse>
implements LogRecordExporter
{
constructor(endpoint: string, config?: OTLPExporterNodeConfigBase) {
const modifiedConfig: OTLPExporterNodeConfigBase = {
...config,
url: endpoint,
compression: CompressionAlgorithm.NONE,
};

super(modifiedConfig);
this.compression = config?.compression;
this.region = endpoint.split('.')[1];
this.endpoint = endpoint;
this.authenticator = new AwsAuthenticator(this.region, 'logs');

// This is used in order to prevent serializing and compressing the data twice. Once for signing Sigv4 and
// once when we pass the data to super.export() which will serialize and compress the data again.
this.serializer = new PassthroughSerializer(ProtobufLogsSerializer.deserializeResponse);
this['_delegate']._serializer = this.serializer;
const parentExporter = new OTLPProtoLogExporter(modifiedConfig);
super(endpoint, 'logs', parentExporter, ProtobufLogsSerializer, config?.compression);
}

/**
* Overrides the upstream implementation of export. If the
* endpoint is the CloudWatch Logs OTLP endpoint, we sign the request with SigV4 in headers.
* To prevent performance degradation from serializing and compressing data twice, we handle serialization and compression
* locally in this exporter and pass the pre-processed data to the upstream export functionality.
*/

// Upstream already implements a retry mechanism:
// https://github.com/open-telemetry/opentelemetry-js/blob/main/experimental/packages/otlp-exporter-base/src/retrying-transport.ts

public override async export(
items: ReadableLogRecord[],
resultCallback: (result: ExportResult) => void
): Promise<void> {
let serializedLogs: Uint8Array | undefined = ProtobufLogsSerializer.serializeRequest(items);

if (serializedLogs === undefined) {
resultCallback({
code: ExportResultCode.FAILED,
error: new Error('Nothing to send'),
});
return;
}

const shouldCompress = this.compression && this.compression !== CompressionAlgorithm.NONE;
if (shouldCompress) {
serializedLogs = gzipSync(serializedLogs);
}

// Pass pre-processed data to passthrough serializer. When super.export() is called, the Passthrough Serializer will
// use the pre-processed data instead of serializing and compressing the data again.
this.serializer.setSerializedData(serializedLogs);

// See type: https://github.com/open-telemetry/opentelemetry-js/blob/experimental/v0.57.1/experimental/packages/otlp-exporter-base/src/transport/http-transport-types.ts#L31
const headers = this['_delegate']._transport?._transport?._parameters?.headers();

if (headers) {
if (shouldCompress) {
headers['Content-Encoding'] = 'gzip';
} else {
delete headers['Content-Encoding'];
}

const signedRequest = await this.authenticator.authenticate(this.endpoint, headers, serializedLogs);

const newHeaders: () => Record<string, string> = () => signedRequest;
this['_delegate']._transport._transport._parameters.headers = newHeaders;
}

super.export(items, resultCallback);
shutdown(): Promise<void> {
return this.parentExporter.shutdown();
}
}
Loading
Loading