@@ -4,18 +4,14 @@ import { CompressionAlgorithm, OTLPExporterBase } from '@opentelemetry/otlp-expo
44import { gzipSync } from 'zlib' ;
55import { ExportResult , ExportResultCode } from '@opentelemetry/core' ;
66import { AwsAuthenticator } from './aws-authenticator' ;
7- import { PassthroughSerializer } from './passthrough-serializer' ;
87import { ISerializer } from '@opentelemetry/otlp-transformer' ;
9- import { diag } from '@opentelemetry/api' ;
108
119/**
1210 * Base class for AWS OTLP exporters
1311 */
14- export abstract class OTLPAwsBaseExporter < Payload , Response > {
12+ export abstract class OTLPAwsBaseExporter < Payload , Response > extends OTLPExporterBase < Payload > {
1513 protected parentExporter : OTLPExporterBase < Payload > ;
16- private readonly originalHeaders ?: Record < string , string > ;
1714 private readonly compression ?: CompressionAlgorithm ;
18- private newHeaders : Record < string , string > = { } ;
1915 private endpoint : string ;
2016 private serializer : PassthroughSerializer < Response > ;
2117 private authenticator : AwsAuthenticator ;
@@ -28,6 +24,7 @@ export abstract class OTLPAwsBaseExporter<Payload, Response> {
2824 parentSerializer : ISerializer < Payload , Response > ,
2925 compression ?: CompressionAlgorithm
3026 ) {
27+ super ( parentExporter [ '_delegate' ] ) ;
3128 this . compression = compression ;
3229 this . endpoint = endpoint ;
3330 this . authenticator = new AwsAuthenticator ( this . endpoint , service ) ;
@@ -41,7 +38,6 @@ export abstract class OTLPAwsBaseExporter<Payload, Response> {
4138 // https://github.com/open-telemetry/opentelemetry-js/blob/ec17ce48d0e5a99a122da5add612a20e2dd84ed5/experimental/packages/otlp-exporter-base/src/otlp-export-delegate.ts#L69
4239 this . serializer = new PassthroughSerializer < Response > ( this . parentSerializer . deserializeResponse ) ;
4340 this . parentExporter [ '_delegate' ] . _serializer = this . serializer ;
44- this . originalHeaders = this . getHeaders ( ) ;
4541 }
4642
4743 /**
@@ -51,83 +47,101 @@ export abstract class OTLPAwsBaseExporter<Payload, Response> {
5147 * @param items - Array of signal data to export
5248 * @param resultCallback - Callback function to handle export result
5349 */
54- public async export ( items : Payload , resultCallback : ( result : ExportResult ) => void ) : Promise < void > {
55- if ( this . originalHeaders ) {
56- let serializedData : Uint8Array | undefined = this . parentSerializer . serializeRequest ( items ) ;
50+ override async export ( items : Payload , resultCallback : ( result : ExportResult ) => void ) : Promise < void > {
51+ const headers = this . parentExporter [ '_delegate' ] . _transport ?. _transport ?. _parameters ?. headers ( ) ;
52+
53+ if ( ! headers ) {
54+ resultCallback ( {
55+ code : ExportResultCode . FAILED ,
56+ error : new Error ( `Request headers are unset - unable to export to ${ this . endpoint } ` ) ,
57+ } ) ;
58+ return ;
59+ }
60+
61+ let serializedData : Uint8Array | undefined = this . parentSerializer . serializeRequest ( items ) ;
62+
63+ if ( ! serializedData ) {
64+ resultCallback ( {
65+ code : ExportResultCode . FAILED ,
66+ error : new Error ( 'Nothing to send' ) ,
67+ } ) ;
68+ return ;
69+ }
5770
58- if ( ! serializedData ) {
71+ delete headers [ 'Content-Encoding' ] ;
72+ const shouldCompress = this . compression && this . compression !== CompressionAlgorithm . NONE ;
73+
74+ if ( shouldCompress ) {
75+ try {
76+ serializedData = gzipSync ( serializedData ) ;
77+ headers [ 'Content-Encoding' ] = 'gzip' ;
78+ } catch ( exception ) {
5979 resultCallback ( {
6080 code : ExportResultCode . FAILED ,
61- error : new Error ( 'Nothing to send' ) ,
81+ error : new Error ( `Failed to compress: ${ exception } ` ) ,
6282 } ) ;
6383 return ;
6484 }
85+ }
6586
66- const shouldCompress = this . compression && this . compression !== CompressionAlgorithm . NONE ;
67-
68- if ( shouldCompress ) {
69- try {
70- serializedData = gzipSync ( serializedData ) ;
71- this . addHeader ( 'Content-Encoding' , 'gzip' ) ;
72- } catch ( exception ) {
73- resultCallback ( {
74- code : ExportResultCode . FAILED ,
75- error : new Error ( `Failed to compress: ${ exception } ` ) ,
76- } ) ;
77- return ;
78- }
79- }
87+ this . serializer . setSerializedData ( serializedData ) ;
88+ const signedHeaders = await this . authenticator . authenticate ( headers , serializedData ) ;
8089
81- this . serializer . setSerializedData ( serializedData ) ;
90+ if ( signedHeaders ) {
91+ this . parentExporter [ '_delegate' ] . _transport . _transport . _parameters . headers = ( ) => signedHeaders ;
92+ }
8293
83- const mergedHeaders = { ... this . newHeaders , ... this . originalHeaders } ;
84- const signedHeaders = await this . authenticator . authenticate ( mergedHeaders , serializedData ) ;
94+ this . parentExporter . export ( items , resultCallback ) ;
95+ }
8596
86- if ( signedHeaders ) {
87- this . setTransportHeaders ( signedHeaders ) ;
88- }
97+ override shutdown ( ) : Promise < void > {
98+ return this . parentExporter . shutdown ( ) ;
99+ }
89100
90- this . parentExporter . export ( items , resultCallback ) ;
101+ override forceFlush ( ) : Promise < void > {
102+ return this . parentExporter . forceFlush ( ) ;
103+ }
104+ }
91105
92- this . setTransportHeaders ( this . originalHeaders ) ;
93- this . newHeaders = { } ;
94- return ;
95- }
106+ /**
107+ * A serializer that bypasses request serialization by returning pre-serialized data.
108+ * @template Response The type of the deserialized response
109+ */
110+ class PassthroughSerializer < Response > implements ISerializer < Uint8Array , Response > {
111+ private serializedData : Uint8Array = new Uint8Array ( ) ;
112+ private deserializer : ( data : Uint8Array ) => Response ;
96113
97- resultCallback ( {
98- code : ExportResultCode . FAILED ,
99- error : new Error ( 'No headers found, cannot sign request. Not exporting.' ) ,
100- } ) ;
114+ /**
115+ * Creates a new PassthroughSerializer instance.
116+ * @param deserializer Function to deserialize response data
117+ */
118+ constructor ( deserializer : ( data : Uint8Array ) => Response ) {
119+ this . deserializer = deserializer ;
101120 }
102121
103- // This is a bit ugly but need it in order safely set any new headers
104-
105122 /**
106- * Adds a header to the exporter's transport parameters
123+ * Sets the pre-serialized data to be returned when serializeRequest is called.
124+ * @param data The serialized data to use
107125 */
108- protected addHeader ( key : string , value : string ) : void {
109- this . newHeaders [ key ] = value ;
126+ setSerializedData ( data : Uint8Array ) : void {
127+ this . serializedData = data ;
110128 }
111129
112130 /**
113- * Gets headers in the transport parameters
131+ * Returns the pre-serialized data, ignoring the request parameter.
132+ * @param request Ignored parameter.
133+ * @returns The pre-serialized data
114134 */
115- private getHeaders ( ) : Record < string , string > | undefined {
116- const headersFunc = this . parentExporter [ '_delegate' ] . _transport ?. _transport ?. _parameters ?. headers ;
117- if ( ! headersFunc ) {
118- diag . debug ( 'No existing headers found, using empty headers.' ) ;
119- return undefined ;
120- }
121- return headersFunc ( ) ;
135+ serializeRequest ( request : Uint8Array ) : Uint8Array {
136+ return this . serializedData ;
122137 }
123138
124139 /**
125- * Sets headers in the transport parameters
140+ * Deserializes response data using the provided deserializer function.
141+ * @param data The response data to deserialize
142+ * @returns The deserialized response
126143 */
127- private setTransportHeaders ( headers : Record < string , string > ) : void {
128- const parameters = this . parentExporter [ '_delegate' ] . _transport ?. _transport ?. _parameters ;
129- if ( parameters ) {
130- parameters . headers = ( ) => headers ;
131- }
144+ deserializeResponse ( data : Uint8Array ) : Response {
145+ return this . deserializer ( data ) ;
132146 }
133147}
0 commit comments