Skip to content

Commit cd0b7b2

Browse files
authored
Fix issue where UDP Exporter throws error in async callback, which isn't caught (#289)
*Issue #, if available:* In UDP Exporter, when sending via the socket, our callback method will rethrow the error if there an error from the `send`. However, this is done asynchronously, so it isn't actually caught in the surrounding `try/catch`. This `err` can happen when the UDP Packet size limit is reached. When this error is thrown, it can cause the following error in Lambda: ``` [ERROR] [1761253646458] LAMBDA_RUNTIME Failed to post handler success response. Http response code: 403. { "errorMessage": "State transition from RuntimeResponseSentState to InvocationErrorResponse failed for runtime. Error: State transition is not allowed", "errorType": "InvalidStateTransition" } ``` *Description of changes:* - reject error, as opposed to throwing error because it cannot be caught in the existing surrounding `try/catch`. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent 42751df commit cd0b7b2

File tree

6 files changed

+162
-77
lines changed

6 files changed

+162
-77
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,8 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t
1717

1818
- Support X-Ray Trace Id extraction from Lambda Context object, and respect user-configured OTEL_PROPAGATORS in AWS Lamdba instrumentation
1919
([#259](https://github.com/aws-observability/aws-otel-js-instrumentation/pull/259))
20+
21+
### Bugfixes
22+
23+
- Fix issue where UDP Exporter throws error in async callback, which isn't caught
24+
([#289](https://github.com/aws-observability/aws-otel-js-instrumentation/pull/259))

aws-distro-opentelemetry-node-autoinstrumentation/src/otlp-udp-exporter.ts

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,25 @@ export class UdpExporter {
2424
this._socket.unref();
2525
}
2626

27-
sendData(data: Uint8Array, signalFormatPrefix: string): void {
27+
sendData(data: Uint8Array, signalFormatPrefix: string): Promise<void> {
2828
const base64EncodedString = Buffer.from(data).toString('base64');
2929
const message = `${PROTOCOL_HEADER}${signalFormatPrefix}${base64EncodedString}`;
3030

31-
try {
32-
this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => {
33-
if (err) {
34-
throw err;
35-
}
36-
});
37-
} catch (err) {
38-
diag.error('Error sending UDP data: %s', err);
39-
throw err;
40-
}
31+
return new Promise((resolve, reject) => {
32+
try {
33+
this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => {
34+
if (err) {
35+
diag.error('Error sending UDP data: %s', err);
36+
reject(err);
37+
} else {
38+
resolve();
39+
}
40+
});
41+
} catch (err) {
42+
diag.error('Error sending UDP data: %s', err);
43+
reject(err);
44+
}
45+
});
4146
}
4247

4348
shutdown(): void {
@@ -70,13 +75,13 @@ export class OTLPUdpSpanExporter implements SpanExporter {
7075
if (serializedData == null) {
7176
return;
7277
}
73-
try {
74-
this._udpExporter.sendData(serializedData, this._signalPrefix);
75-
return resultCallback({ code: ExportResultCode.SUCCESS });
76-
} catch (err) {
77-
diag.error('Error exporting spans: %s', err);
78-
return resultCallback({ code: ExportResultCode.FAILED });
79-
}
78+
this._udpExporter
79+
.sendData(serializedData, this._signalPrefix)
80+
.then(() => resultCallback({ code: ExportResultCode.SUCCESS }))
81+
.catch(err => {
82+
diag.error('Error exporting spans: %s', err);
83+
resultCallback({ code: ExportResultCode.FAILED });
84+
});
8085
}
8186

8287
forceFlush(): Promise<void> {

aws-distro-opentelemetry-node-autoinstrumentation/test/exporter/aws/metrics/aws-cloudwatch-emf-exporter.test.ts

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,7 @@ describe('TestAWSCloudWatchEMFExporter', () => {
609609
});
610610
});
611611

612-
it('TestExportCallsSendLogBatchWithExpectedInput', async () => {
612+
it('TestExportCallsSendLogBatchWithExpectedInput', done => {
613613
const timeInSeconds = Math.round(Date.now() / 1000);
614614

615615
const resourceMetricsData: ResourceMetrics = {
@@ -659,24 +659,22 @@ describe('TestAWSCloudWatchEMFExporter', () => {
659659
const logClientSendLogBatchStub = sinon.stub(exporter['logClient'], 'sendLogBatch' as any);
660660
sinon.stub(exporter['logClient'], 'eventBatchExceedsLimit' as any).returns(true);
661661

662-
await new Promise<void>(resolve => {
663-
exporter.export(resourceMetricsData, result => {
664-
expect(result.code).toEqual(ExportResultCode.FAILED);
665-
666-
sinon.assert.calledThrice(logClientSendLogBatchStub);
667-
const call1Args = logClientSendLogBatchStub.getCall(0).args[0] as LogEventBatch;
668-
const call2Args = logClientSendLogBatchStub.getCall(1).args[0] as LogEventBatch;
669-
const call3Args = logClientSendLogBatchStub.getCall(2).args[0] as LogEventBatch;
670-
671-
expect(call1Args.logEvents.length).toEqual(0);
672-
expect(call2Args.logEvents[0].message).toMatch(
673-
/^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey1"\]\]\}\]},"Version":"1","descriptorName":3,"uniqueKey1":"uniqueValue1"\}$/
674-
);
675-
expect(call3Args.logEvents[0].message).toMatch(
676-
/^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey2"\]\]\}\]},"Version":"1","descriptorName":9,"uniqueKey2":"uniqueValue2"\}$/
677-
);
678-
resolve();
679-
});
662+
exporter.export(resourceMetricsData, result => {
663+
expect(result.code).toEqual(ExportResultCode.SUCCESS);
664+
665+
sinon.assert.calledThrice(logClientSendLogBatchStub);
666+
const call1Args = logClientSendLogBatchStub.getCall(0).args[0] as LogEventBatch;
667+
const call2Args = logClientSendLogBatchStub.getCall(1).args[0] as LogEventBatch;
668+
const call3Args = logClientSendLogBatchStub.getCall(2).args[0] as LogEventBatch;
669+
670+
expect(call1Args.logEvents.length).toEqual(0);
671+
expect(call2Args.logEvents[0].message).toMatch(
672+
/^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey1"\]\]\}\]},"Version":"1","descriptorName":3,"uniqueKey1":"uniqueValue1"\}$/
673+
);
674+
expect(call3Args.logEvents[0].message).toMatch(
675+
/^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey2"\]\]\}\]},"Version":"1","descriptorName":9,"uniqueKey2":"uniqueValue2"\}$/
676+
);
677+
done();
680678
});
681679
});
682680

aws-distro-opentelemetry-node-autoinstrumentation/test/otlp-udp-exporter.test.ts

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,36 @@ describe('UdpExporterTest', () => {
4949
expect(socketSend.getCall(0).args[0]).toEqual(protbufBinary);
5050
});
5151

52-
it('should handle errors when sending UDP data', () => {
52+
it('should handle errors when sending UDP data', async () => {
5353
const errorMessage = 'UDP send error';
5454
socketSend.yields(new Error(errorMessage)); // Simulate an error
5555

5656
const data = new Uint8Array([1, 2, 3]);
57-
// Expect the sendData method to throw the error
58-
expect(() => udpExporter.sendData(data, 'T1')).toThrow(errorMessage);
57+
// Expect the sendData method to reject with the error
58+
await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage);
5959
// Assert that diag.error was called with the correct error message
6060
expect(diagErrorSpy.calledOnce).toBe(true);
6161
expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true);
6262
});
6363

64+
it('should resolve when sending UDP data successfully', async () => {
65+
socketSend.yields(null); // Simulate successful send
66+
67+
const data = new Uint8Array([1, 2, 3]);
68+
await expect(udpExporter.sendData(data, 'T1')).resolves.toBeUndefined();
69+
expect(diagErrorSpy.notCalled).toBe(true);
70+
});
71+
72+
it('should handle synchronous errors when sending UDP data', async () => {
73+
const errorMessage = 'Synchronous UDP send error';
74+
socketSend.throws(new Error(errorMessage)); // Simulate synchronous error
75+
76+
const data = new Uint8Array([1, 2, 3]);
77+
await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage);
78+
expect(diagErrorSpy.calledOnce).toBe(true);
79+
expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true);
80+
});
81+
6482
it('should close the socket on shutdown', () => {
6583
udpExporter.shutdown();
6684
expect(socketClose.calledOnce).toBe(true);
@@ -125,16 +143,21 @@ describe('OTLPUdpSpanExporterTest', () => {
125143
sinon.restore();
126144
});
127145

128-
it('should export spans successfully', () => {
146+
it('should export spans successfully', done => {
129147
const callback = sinon.stub();
130148
// Stub ProtobufTraceSerializer.serializeRequest
131149
sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData);
150+
udpExporterMock.sendData.resolves(); // Make sendData resolve successfully
132151

133152
otlpUdpSpanExporter.export(spans, callback);
134153

135-
expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true);
136-
expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true);
137-
expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged
154+
// Use setTimeout to allow Promise to resolve
155+
setTimeout(() => {
156+
expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true);
157+
expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true);
158+
expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged
159+
done();
160+
}, 0);
138161
});
139162

140163
it('should handle serialization failure', () => {
@@ -149,20 +172,33 @@ describe('OTLPUdpSpanExporterTest', () => {
149172
expect(diagErrorSpy.notCalled).toBe(true);
150173
});
151174

152-
it('should handle errors during export', () => {
175+
it('should handle errors during export', done => {
153176
const error = new Error('Export error');
154-
udpExporterMock.sendData.throws(error);
177+
udpExporterMock.sendData.rejects(error); // Make sendData reject with error
178+
sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData);
155179

156180
const callback = sinon.stub();
157181

158182
otlpUdpSpanExporter.export(spans, callback);
159183

160-
expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', sinon.match.instanceOf(Error))).toBe(true);
161-
expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true);
184+
// Use setTimeout to allow Promise to reject
185+
setTimeout(() => {
186+
expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', error)).toBe(true);
187+
expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true);
188+
done();
189+
}, 0);
162190
});
163191

164192
it('should shutdown the UDP exporter successfully', async () => {
165193
await otlpUdpSpanExporter.shutdown();
166194
expect(udpExporterMock.shutdown.calledOnce).toBe(true);
167195
});
196+
197+
it('should handle shutdown errors', async () => {
198+
const shutdownError = new Error('Shutdown error');
199+
udpExporterMock.shutdown.reset();
200+
udpExporterMock.shutdown.throws(shutdownError);
201+
202+
await expect(otlpUdpSpanExporter.shutdown()).rejects.toThrow('Shutdown error');
203+
});
168204
});

exporters/aws-distro-opentelemetry-exporter-xray-udp/src/aws-xray-udp-span-exporter.ts

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,25 @@ export class UdpExporter {
2424
this._socket.unref();
2525
}
2626

27-
sendData(data: Uint8Array, signalFormatPrefix: string): void {
27+
sendData(data: Uint8Array, signalFormatPrefix: string): Promise<void> {
2828
const base64EncodedString = Buffer.from(data).toString('base64');
2929
const message = `${PROTOCOL_HEADER}${signalFormatPrefix}${base64EncodedString}`;
3030

31-
try {
32-
this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => {
33-
if (err) {
34-
throw err;
35-
}
36-
});
37-
} catch (err) {
38-
diag.error('Error sending UDP data: %s', err);
39-
throw err;
40-
}
31+
return new Promise((resolve, reject) => {
32+
try {
33+
this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => {
34+
if (err) {
35+
diag.error('Error sending UDP data: %s', err);
36+
reject(err);
37+
} else {
38+
resolve();
39+
}
40+
});
41+
} catch (err) {
42+
diag.error('Error sending UDP data: %s', err);
43+
reject(err);
44+
}
45+
});
4146
}
4247

4348
shutdown(): void {
@@ -79,13 +84,13 @@ export class AwsXrayUdpSpanExporter implements SpanExporter {
7984
if (serializedData == null) {
8085
return;
8186
}
82-
try {
83-
this._udpExporter.sendData(serializedData, this._signalPrefix);
84-
return resultCallback({ code: ExportResultCode.SUCCESS });
85-
} catch (err) {
86-
diag.error('Error exporting spans: %s', err);
87-
return resultCallback({ code: ExportResultCode.FAILED });
88-
}
87+
this._udpExporter
88+
.sendData(serializedData, this._signalPrefix)
89+
.then(() => resultCallback({ code: ExportResultCode.SUCCESS }))
90+
.catch(err => {
91+
diag.error('Error exporting spans: %s', err);
92+
resultCallback({ code: ExportResultCode.FAILED });
93+
});
8994
}
9095

9196
forceFlush(): Promise<void> {

exporters/aws-distro-opentelemetry-exporter-xray-udp/test/aws-xray-udp-span-exporter.test.ts

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,36 @@ describe('UdpExporterTest', () => {
5050
expect(socketSend.getCall(0).args[0]).toEqual(protbufBinary);
5151
});
5252

53-
it('should handle errors when sending UDP data', () => {
53+
it('should handle errors when sending UDP data', async () => {
5454
const errorMessage = 'UDP send error';
5555
socketSend.yields(new Error(errorMessage)); // Simulate an error
5656

5757
const data = new Uint8Array([1, 2, 3]);
58-
// Expect the sendData method to throw the error
59-
expect(() => udpExporter.sendData(data, 'T1')).toThrow(errorMessage);
58+
// Expect the sendData method to reject with the error
59+
await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage);
6060
// Assert that diag.error was called with the correct error message
6161
expect(diagErrorSpy.calledOnce).toBe(true);
6262
expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true);
6363
});
6464

65+
it('should resolve when sending UDP data successfully', async () => {
66+
socketSend.yields(null); // Simulate successful send
67+
68+
const data = new Uint8Array([1, 2, 3]);
69+
await expect(udpExporter.sendData(data, 'T1')).resolves.toBeUndefined();
70+
expect(diagErrorSpy.notCalled).toBe(true);
71+
});
72+
73+
it('should handle synchronous errors when sending UDP data', async () => {
74+
const errorMessage = 'Synchronous UDP send error';
75+
socketSend.throws(new Error(errorMessage)); // Simulate synchronous error
76+
77+
const data = new Uint8Array([1, 2, 3]);
78+
await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage);
79+
expect(diagErrorSpy.calledOnce).toBe(true);
80+
expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true);
81+
});
82+
6583
it('should close the socket on shutdown', () => {
6684
udpExporter.shutdown();
6785
expect(socketClose.calledOnce).toBe(true);
@@ -132,16 +150,21 @@ describe('AwsXrayUdpSpanExporterTest', () => {
132150
sinon.restore();
133151
});
134152

135-
it('should export spans successfully', () => {
153+
it('should export spans successfully', done => {
136154
const callback = sinon.stub();
137155
// Stub ProtobufTraceSerializer.serializeRequest
138156
sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData);
157+
udpExporterMock.sendData.resolves(); // Make sendData resolve successfully
139158

140159
awsXrayUdpSpanExporter.export(spans, callback);
141160

142-
expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true);
143-
expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true);
144-
expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged
161+
// Use setTimeout to allow Promise to resolve
162+
setTimeout(() => {
163+
expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true);
164+
expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true);
165+
expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged
166+
done();
167+
}, 0);
145168
});
146169

147170
it('should handle serialization failure', () => {
@@ -156,16 +179,21 @@ describe('AwsXrayUdpSpanExporterTest', () => {
156179
expect(diagErrorSpy.notCalled).toBe(true);
157180
});
158181

159-
it('should handle errors during export', () => {
182+
it('should handle errors during export', done => {
160183
const error = new Error('Export error');
161-
udpExporterMock.sendData.throws(error);
184+
udpExporterMock.sendData.rejects(error);
185+
sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData);
162186

163187
const callback = sinon.stub();
164188

165189
awsXrayUdpSpanExporter.export(spans, callback);
166190

167-
expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', sinon.match.instanceOf(Error))).toBe(true);
168-
expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true);
191+
// Use setTimeout to allow Promise to reject
192+
setTimeout(() => {
193+
expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', error)).toBe(true);
194+
expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true);
195+
done();
196+
}, 0);
169197
});
170198

171199
it('should forceFlush without throwing', async () => {
@@ -177,6 +205,14 @@ describe('AwsXrayUdpSpanExporterTest', () => {
177205
expect(udpExporterMock.shutdown.calledOnce).toBe(true);
178206
});
179207

208+
it('should handle shutdown errors', async () => {
209+
const shutdownError = new Error('Shutdown error');
210+
udpExporterMock.shutdown.reset();
211+
udpExporterMock.shutdown.throws(shutdownError);
212+
213+
await expect(awsXrayUdpSpanExporter.shutdown()).rejects.toThrow('Shutdown error');
214+
});
215+
180216
it('should use expected Environment Variables to configure endpoint', () => {
181217
process.env.AWS_LAMBDA_FUNCTION_NAME = 'testFunctionName';
182218
process.env.AWS_XRAY_DAEMON_ADDRESS = 'someaddress:1234';

0 commit comments

Comments
 (0)