diff --git a/CHANGELOG.md b/CHANGELOG.md index 4903862b..dba2b80f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,3 +17,8 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t - Support X-Ray Trace Id extraction from Lambda Context object, and respect user-configured OTEL_PROPAGATORS in AWS Lamdba instrumentation ([#259](https://github.com/aws-observability/aws-otel-js-instrumentation/pull/259)) + +### Bugfixes + +- Fix issue where UDP Exporter throws error in async callback, which isn't caught + ([#289](https://github.com/aws-observability/aws-otel-js-instrumentation/pull/259)) diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/otlp-udp-exporter.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/otlp-udp-exporter.ts index 1a866e02..18dbb061 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/otlp-udp-exporter.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/otlp-udp-exporter.ts @@ -24,20 +24,25 @@ export class UdpExporter { this._socket.unref(); } - sendData(data: Uint8Array, signalFormatPrefix: string): void { + sendData(data: Uint8Array, signalFormatPrefix: string): Promise { const base64EncodedString = Buffer.from(data).toString('base64'); const message = `${PROTOCOL_HEADER}${signalFormatPrefix}${base64EncodedString}`; - try { - this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => { - if (err) { - throw err; - } - }); - } catch (err) { - diag.error('Error sending UDP data: %s', err); - throw err; - } + return new Promise((resolve, reject) => { + try { + this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => { + if (err) { + diag.error('Error sending UDP data: %s', err); + reject(err); + } else { + resolve(); + } + }); + } catch (err) { + diag.error('Error sending UDP data: %s', err); + reject(err); + } + }); } shutdown(): void { @@ -70,13 +75,13 @@ export class OTLPUdpSpanExporter implements SpanExporter { if (serializedData == null) { return; } - try { - this._udpExporter.sendData(serializedData, this._signalPrefix); - return resultCallback({ code: ExportResultCode.SUCCESS }); - } catch (err) { - diag.error('Error exporting spans: %s', err); - return resultCallback({ code: ExportResultCode.FAILED }); - } + this._udpExporter + .sendData(serializedData, this._signalPrefix) + .then(() => resultCallback({ code: ExportResultCode.SUCCESS })) + .catch(err => { + diag.error('Error exporting spans: %s', err); + resultCallback({ code: ExportResultCode.FAILED }); + }); } forceFlush(): Promise { diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/aws-cloudwatch-emf-exporter.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/aws-cloudwatch-emf-exporter.test.ts index 23ec223d..ac5ed877 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/aws-cloudwatch-emf-exporter.test.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/aws-cloudwatch-emf-exporter.test.ts @@ -609,7 +609,7 @@ describe('TestAWSCloudWatchEMFExporter', () => { }); }); - it('TestExportCallsSendLogBatchWithExpectedInput', async () => { + it('TestExportCallsSendLogBatchWithExpectedInput', done => { const timeInSeconds = Math.round(Date.now() / 1000); const resourceMetricsData: ResourceMetrics = { @@ -659,24 +659,22 @@ describe('TestAWSCloudWatchEMFExporter', () => { const logClientSendLogBatchStub = sinon.stub(exporter['logClient'], 'sendLogBatch' as any); sinon.stub(exporter['logClient'], 'eventBatchExceedsLimit' as any).returns(true); - await new Promise(resolve => { - exporter.export(resourceMetricsData, result => { - expect(result.code).toEqual(ExportResultCode.FAILED); - - sinon.assert.calledThrice(logClientSendLogBatchStub); - const call1Args = logClientSendLogBatchStub.getCall(0).args[0] as LogEventBatch; - const call2Args = logClientSendLogBatchStub.getCall(1).args[0] as LogEventBatch; - const call3Args = logClientSendLogBatchStub.getCall(2).args[0] as LogEventBatch; - - expect(call1Args.logEvents.length).toEqual(0); - expect(call2Args.logEvents[0].message).toMatch( - /^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey1"\]\]\}\]},"Version":"1","descriptorName":3,"uniqueKey1":"uniqueValue1"\}$/ - ); - expect(call3Args.logEvents[0].message).toMatch( - /^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey2"\]\]\}\]},"Version":"1","descriptorName":9,"uniqueKey2":"uniqueValue2"\}$/ - ); - resolve(); - }); + exporter.export(resourceMetricsData, result => { + expect(result.code).toEqual(ExportResultCode.SUCCESS); + + sinon.assert.calledThrice(logClientSendLogBatchStub); + const call1Args = logClientSendLogBatchStub.getCall(0).args[0] as LogEventBatch; + const call2Args = logClientSendLogBatchStub.getCall(1).args[0] as LogEventBatch; + const call3Args = logClientSendLogBatchStub.getCall(2).args[0] as LogEventBatch; + + expect(call1Args.logEvents.length).toEqual(0); + expect(call2Args.logEvents[0].message).toMatch( + /^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey1"\]\]\}\]},"Version":"1","descriptorName":3,"uniqueKey1":"uniqueValue1"\}$/ + ); + expect(call3Args.logEvents[0].message).toMatch( + /^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey2"\]\]\}\]},"Version":"1","descriptorName":9,"uniqueKey2":"uniqueValue2"\}$/ + ); + done(); }); }); diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/otlp-udp-exporter.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/otlp-udp-exporter.test.ts index 20eb286d..04eee3b7 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/test/otlp-udp-exporter.test.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/otlp-udp-exporter.test.ts @@ -49,18 +49,36 @@ describe('UdpExporterTest', () => { expect(socketSend.getCall(0).args[0]).toEqual(protbufBinary); }); - it('should handle errors when sending UDP data', () => { + it('should handle errors when sending UDP data', async () => { const errorMessage = 'UDP send error'; socketSend.yields(new Error(errorMessage)); // Simulate an error const data = new Uint8Array([1, 2, 3]); - // Expect the sendData method to throw the error - expect(() => udpExporter.sendData(data, 'T1')).toThrow(errorMessage); + // Expect the sendData method to reject with the error + await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage); // Assert that diag.error was called with the correct error message expect(diagErrorSpy.calledOnce).toBe(true); expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true); }); + it('should resolve when sending UDP data successfully', async () => { + socketSend.yields(null); // Simulate successful send + + const data = new Uint8Array([1, 2, 3]); + await expect(udpExporter.sendData(data, 'T1')).resolves.toBeUndefined(); + expect(diagErrorSpy.notCalled).toBe(true); + }); + + it('should handle synchronous errors when sending UDP data', async () => { + const errorMessage = 'Synchronous UDP send error'; + socketSend.throws(new Error(errorMessage)); // Simulate synchronous error + + const data = new Uint8Array([1, 2, 3]); + await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage); + expect(diagErrorSpy.calledOnce).toBe(true); + expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true); + }); + it('should close the socket on shutdown', () => { udpExporter.shutdown(); expect(socketClose.calledOnce).toBe(true); @@ -125,16 +143,21 @@ describe('OTLPUdpSpanExporterTest', () => { sinon.restore(); }); - it('should export spans successfully', () => { + it('should export spans successfully', done => { const callback = sinon.stub(); // Stub ProtobufTraceSerializer.serializeRequest sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData); + udpExporterMock.sendData.resolves(); // Make sendData resolve successfully otlpUdpSpanExporter.export(spans, callback); - expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true); - expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true); - expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged + // Use setTimeout to allow Promise to resolve + setTimeout(() => { + expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true); + expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true); + expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged + done(); + }, 0); }); it('should handle serialization failure', () => { @@ -149,20 +172,33 @@ describe('OTLPUdpSpanExporterTest', () => { expect(diagErrorSpy.notCalled).toBe(true); }); - it('should handle errors during export', () => { + it('should handle errors during export', done => { const error = new Error('Export error'); - udpExporterMock.sendData.throws(error); + udpExporterMock.sendData.rejects(error); // Make sendData reject with error + sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData); const callback = sinon.stub(); otlpUdpSpanExporter.export(spans, callback); - expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', sinon.match.instanceOf(Error))).toBe(true); - expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true); + // Use setTimeout to allow Promise to reject + setTimeout(() => { + expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', error)).toBe(true); + expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true); + done(); + }, 0); }); it('should shutdown the UDP exporter successfully', async () => { await otlpUdpSpanExporter.shutdown(); expect(udpExporterMock.shutdown.calledOnce).toBe(true); }); + + it('should handle shutdown errors', async () => { + const shutdownError = new Error('Shutdown error'); + udpExporterMock.shutdown.reset(); + udpExporterMock.shutdown.throws(shutdownError); + + await expect(otlpUdpSpanExporter.shutdown()).rejects.toThrow('Shutdown error'); + }); }); diff --git a/exporters/aws-distro-opentelemetry-exporter-xray-udp/src/aws-xray-udp-span-exporter.ts b/exporters/aws-distro-opentelemetry-exporter-xray-udp/src/aws-xray-udp-span-exporter.ts index b5c69232..65d46cc1 100644 --- a/exporters/aws-distro-opentelemetry-exporter-xray-udp/src/aws-xray-udp-span-exporter.ts +++ b/exporters/aws-distro-opentelemetry-exporter-xray-udp/src/aws-xray-udp-span-exporter.ts @@ -24,20 +24,25 @@ export class UdpExporter { this._socket.unref(); } - sendData(data: Uint8Array, signalFormatPrefix: string): void { + sendData(data: Uint8Array, signalFormatPrefix: string): Promise { const base64EncodedString = Buffer.from(data).toString('base64'); const message = `${PROTOCOL_HEADER}${signalFormatPrefix}${base64EncodedString}`; - try { - this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => { - if (err) { - throw err; - } - }); - } catch (err) { - diag.error('Error sending UDP data: %s', err); - throw err; - } + return new Promise((resolve, reject) => { + try { + this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => { + if (err) { + diag.error('Error sending UDP data: %s', err); + reject(err); + } else { + resolve(); + } + }); + } catch (err) { + diag.error('Error sending UDP data: %s', err); + reject(err); + } + }); } shutdown(): void { @@ -79,13 +84,13 @@ export class AwsXrayUdpSpanExporter implements SpanExporter { if (serializedData == null) { return; } - try { - this._udpExporter.sendData(serializedData, this._signalPrefix); - return resultCallback({ code: ExportResultCode.SUCCESS }); - } catch (err) { - diag.error('Error exporting spans: %s', err); - return resultCallback({ code: ExportResultCode.FAILED }); - } + this._udpExporter + .sendData(serializedData, this._signalPrefix) + .then(() => resultCallback({ code: ExportResultCode.SUCCESS })) + .catch(err => { + diag.error('Error exporting spans: %s', err); + resultCallback({ code: ExportResultCode.FAILED }); + }); } forceFlush(): Promise { diff --git a/exporters/aws-distro-opentelemetry-exporter-xray-udp/test/aws-xray-udp-span-exporter.test.ts b/exporters/aws-distro-opentelemetry-exporter-xray-udp/test/aws-xray-udp-span-exporter.test.ts index 7abd2bc2..3541d79c 100644 --- a/exporters/aws-distro-opentelemetry-exporter-xray-udp/test/aws-xray-udp-span-exporter.test.ts +++ b/exporters/aws-distro-opentelemetry-exporter-xray-udp/test/aws-xray-udp-span-exporter.test.ts @@ -50,18 +50,36 @@ describe('UdpExporterTest', () => { expect(socketSend.getCall(0).args[0]).toEqual(protbufBinary); }); - it('should handle errors when sending UDP data', () => { + it('should handle errors when sending UDP data', async () => { const errorMessage = 'UDP send error'; socketSend.yields(new Error(errorMessage)); // Simulate an error const data = new Uint8Array([1, 2, 3]); - // Expect the sendData method to throw the error - expect(() => udpExporter.sendData(data, 'T1')).toThrow(errorMessage); + // Expect the sendData method to reject with the error + await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage); // Assert that diag.error was called with the correct error message expect(diagErrorSpy.calledOnce).toBe(true); expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true); }); + it('should resolve when sending UDP data successfully', async () => { + socketSend.yields(null); // Simulate successful send + + const data = new Uint8Array([1, 2, 3]); + await expect(udpExporter.sendData(data, 'T1')).resolves.toBeUndefined(); + expect(diagErrorSpy.notCalled).toBe(true); + }); + + it('should handle synchronous errors when sending UDP data', async () => { + const errorMessage = 'Synchronous UDP send error'; + socketSend.throws(new Error(errorMessage)); // Simulate synchronous error + + const data = new Uint8Array([1, 2, 3]); + await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage); + expect(diagErrorSpy.calledOnce).toBe(true); + expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true); + }); + it('should close the socket on shutdown', () => { udpExporter.shutdown(); expect(socketClose.calledOnce).toBe(true); @@ -132,16 +150,21 @@ describe('AwsXrayUdpSpanExporterTest', () => { sinon.restore(); }); - it('should export spans successfully', () => { + it('should export spans successfully', done => { const callback = sinon.stub(); // Stub ProtobufTraceSerializer.serializeRequest sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData); + udpExporterMock.sendData.resolves(); // Make sendData resolve successfully awsXrayUdpSpanExporter.export(spans, callback); - expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true); - expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true); - expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged + // Use setTimeout to allow Promise to resolve + setTimeout(() => { + expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true); + expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true); + expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged + done(); + }, 0); }); it('should handle serialization failure', () => { @@ -156,16 +179,21 @@ describe('AwsXrayUdpSpanExporterTest', () => { expect(diagErrorSpy.notCalled).toBe(true); }); - it('should handle errors during export', () => { + it('should handle errors during export', done => { const error = new Error('Export error'); - udpExporterMock.sendData.throws(error); + udpExporterMock.sendData.rejects(error); + sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData); const callback = sinon.stub(); awsXrayUdpSpanExporter.export(spans, callback); - expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', sinon.match.instanceOf(Error))).toBe(true); - expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true); + // Use setTimeout to allow Promise to reject + setTimeout(() => { + expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', error)).toBe(true); + expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true); + done(); + }, 0); }); it('should forceFlush without throwing', async () => { @@ -177,6 +205,14 @@ describe('AwsXrayUdpSpanExporterTest', () => { expect(udpExporterMock.shutdown.calledOnce).toBe(true); }); + it('should handle shutdown errors', async () => { + const shutdownError = new Error('Shutdown error'); + udpExporterMock.shutdown.reset(); + udpExporterMock.shutdown.throws(shutdownError); + + await expect(awsXrayUdpSpanExporter.shutdown()).rejects.toThrow('Shutdown error'); + }); + it('should use expected Environment Variables to configure endpoint', () => { process.env.AWS_LAMBDA_FUNCTION_NAME = 'testFunctionName'; process.env.AWS_XRAY_DAEMON_ADDRESS = 'someaddress:1234';