@@ -6,13 +6,16 @@ import { ExportResult, ExportResultCode } from '@opentelemetry/core';
66import { AwsAuthenticator } from './aws-authenticator' ;
77import { PassthroughSerializer } from './passthrough-serializer' ;
88import { ISerializer } from '@opentelemetry/otlp-transformer' ;
9+ import { diag } from '@opentelemetry/api' ;
910
1011/**
1112 * Base class for AWS OTLP exporters
1213 */
1314export abstract class OTLPAwsBaseExporter < Payload , Response > {
1415 protected parentExporter : OTLPExporterBase < Payload > ;
15- private compression ?: CompressionAlgorithm ;
16+ private readonly originalHeaders ?: Record < string , string > ;
17+ private readonly compression ?: CompressionAlgorithm ;
18+ private newHeaders : Record < string , string > = { } ;
1619 private endpoint : string ;
1720 private serializer : PassthroughSerializer < Response > ;
1821 private authenticator : AwsAuthenticator ;
@@ -37,8 +40,8 @@ export abstract class OTLPAwsBaseExporter<Payload, Response> {
3740 // To see why this works:
3841 // https://github.com/open-telemetry/opentelemetry-js/blob/ec17ce48d0e5a99a122da5add612a20e2dd84ed5/experimental/packages/otlp-exporter-base/src/otlp-export-delegate.ts#L69
3942 this . serializer = new PassthroughSerializer < Response > ( this . parentSerializer . deserializeResponse ) ;
40-
4143 this . parentExporter [ '_delegate' ] . _serializer = this . serializer ;
44+ this . originalHeaders = this . getHeaders ( ) ;
4245 }
4346
4447 /**
@@ -49,52 +52,82 @@ export abstract class OTLPAwsBaseExporter<Payload, Response> {
4952 * @param resultCallback - Callback function to handle export result
5053 */
5154 public async export ( items : Payload , resultCallback : ( result : ExportResult ) => void ) : Promise < void > {
52- let serializedData : Uint8Array | undefined = this . parentSerializer . serializeRequest ( items ) ;
55+ if ( this . originalHeaders ) {
56+ let serializedData : Uint8Array | undefined = this . parentSerializer . serializeRequest ( items ) ;
5357
54- if ( ! serializedData ) {
55- resultCallback ( {
56- code : ExportResultCode . FAILED ,
57- error : new Error ( 'Nothing to send' ) ,
58- } ) ;
59- return ;
60- }
58+ if ( ! serializedData ) {
59+ resultCallback ( {
60+ code : ExportResultCode . FAILED ,
61+ error : new Error ( 'Nothing to send' ) ,
62+ } ) ;
63+ return ;
64+ }
6165
62- const headers = this . parentExporter [ '_delegate' ] . _transport ?. _transport ?. _parameters ?. headers ( ) ;
66+ const shouldCompress = this . compression && this . compression !== CompressionAlgorithm . NONE ;
6367
64- // This should never be reached as upstream always sets the header.
65- if ( ! headers ) {
66- resultCallback ( {
67- code : ExportResultCode . FAILED ,
68- error : new Error ( `Request headers are undefined - unable to export to ${ this . endpoint } ` ) ,
69- } ) ;
68+ if ( shouldCompress ) {
69+ try {
70+ serializedData = gzipSync ( serializedData ) ;
71+ this . addHeader ( 'Content-Encoding' , 'gzip' ) ;
72+ } catch ( exception ) {
73+ resultCallback ( {
74+ code : ExportResultCode . FAILED ,
75+ error : new Error ( `Failed to compress: ${ exception } ` ) ,
76+ } ) ;
77+ return ;
78+ }
79+ }
7080
71- return ;
72- }
81+ this . serializer . setSerializedData ( serializedData ) ;
7382
74- delete headers [ 'Content-Encoding' ] ;
75- const shouldCompress = this . compression && this . compression !== CompressionAlgorithm . NONE ;
83+ const mergedHeaders = { ... this . newHeaders , ... this . originalHeaders } ;
84+ const signedHeaders = await this . authenticator . authenticate ( mergedHeaders , serializedData ) ;
7685
77- if ( shouldCompress ) {
78- try {
79- serializedData = gzipSync ( serializedData ) ;
80- headers [ 'Content-Encoding' ] = 'gzip' ;
81- } catch ( exception ) {
82- resultCallback ( {
83- code : ExportResultCode . FAILED ,
84- error : new Error ( `Failed to compress: ${ exception } ` ) ,
85- } ) ;
86- return ;
86+ if ( signedHeaders ) {
87+ this . setTransportHeaders ( signedHeaders ) ;
8788 }
89+
90+ this . parentExporter . export ( items , resultCallback ) ;
91+
92+ this . setTransportHeaders ( this . originalHeaders ) ;
93+ this . newHeaders = { } ;
94+ return ;
8895 }
8996
90- this . serializer . setSerializedData ( serializedData ) ;
97+ resultCallback ( {
98+ code : ExportResultCode . FAILED ,
99+ error : new Error ( 'No headers found, cannot sign request. Not exporting.' ) ,
100+ } ) ;
101+ }
91102
92- const signedRequestHeaders = await this . authenticator . authenticate ( headers , serializedData ) ;
103+ // This is a bit ugly but need it in order safely set any new headers
104+
105+ /**
106+ * Adds a header to the exporter's transport parameters
107+ */
108+ protected addHeader ( key : string , value : string ) : void {
109+ this . newHeaders [ key ] = value ;
110+ }
93111
94- if ( signedRequestHeaders ) {
95- this . parentExporter [ '_delegate' ] . _transport . _transport . _parameters . headers = ( ) => signedRequestHeaders ;
112+ /**
113+ * Gets headers in the transport parameters
114+ */
115+ private getHeaders ( ) : Record < string , string > | undefined {
116+ const headersFunc = this . parentExporter [ '_delegate' ] . _transport ?. _transport ?. _parameters ?. headers ;
117+ if ( ! headersFunc ) {
118+ diag . debug ( 'No existing headers found, using empty headers.' ) ;
119+ return undefined ;
96120 }
121+ return headersFunc ( ) ;
122+ }
97123
98- this . parentExporter . export ( items , resultCallback ) ;
124+ /**
125+ * Sets headers in the transport parameters
126+ */
127+ private setTransportHeaders ( headers : Record < string , string > ) : void {
128+ const parameters = this . parentExporter [ '_delegate' ] . _transport ?. _transport ?. _parameters ;
129+ if ( parameters ) {
130+ parameters . headers = ( ) => headers ;
131+ }
99132 }
100133}
0 commit comments