Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t

- Support X-Ray Trace Id extraction from Lambda Context object, and respect user-configured OTEL_PROPAGATORS in AWS Lamdba instrumentation
([#259](https://github.com/aws-observability/aws-otel-js-instrumentation/pull/259))

### Bugfixes

- Fix issue where UDP Exporter throws error in async callback, which isn't caught
([#289](https://github.com/aws-observability/aws-otel-js-instrumentation/pull/259))
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,25 @@ export class UdpExporter {
this._socket.unref();
}

sendData(data: Uint8Array, signalFormatPrefix: string): void {
sendData(data: Uint8Array, signalFormatPrefix: string): Promise<void> {
const base64EncodedString = Buffer.from(data).toString('base64');
const message = `${PROTOCOL_HEADER}${signalFormatPrefix}${base64EncodedString}`;

try {
this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => {
if (err) {
throw err;
}
});
} catch (err) {
diag.error('Error sending UDP data: %s', err);
throw err;
}
return new Promise((resolve, reject) => {
try {
this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => {
if (err) {
diag.error('Error sending UDP data: %s', err);
reject(err);
} else {
resolve();
}
});
} catch (err) {
diag.error('Error sending UDP data: %s', err);
reject(err);
}
});
}

shutdown(): void {
Expand Down Expand Up @@ -70,13 +75,13 @@ export class OTLPUdpSpanExporter implements SpanExporter {
if (serializedData == null) {
return;
}
try {
this._udpExporter.sendData(serializedData, this._signalPrefix);
return resultCallback({ code: ExportResultCode.SUCCESS });
} catch (err) {
diag.error('Error exporting spans: %s', err);
return resultCallback({ code: ExportResultCode.FAILED });
}
this._udpExporter
.sendData(serializedData, this._signalPrefix)
.then(() => resultCallback({ code: ExportResultCode.SUCCESS }))
.catch(err => {
diag.error('Error exporting spans: %s', err);
resultCallback({ code: ExportResultCode.FAILED });
});
}

forceFlush(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ describe('TestAWSCloudWatchEMFExporter', () => {
});
});

it('TestExportCallsSendLogBatchWithExpectedInput', async () => {
it('TestExportCallsSendLogBatchWithExpectedInput', done => {
const timeInSeconds = Math.round(Date.now() / 1000);

const resourceMetricsData: ResourceMetrics = {
Expand Down Expand Up @@ -659,24 +659,22 @@ describe('TestAWSCloudWatchEMFExporter', () => {
const logClientSendLogBatchStub = sinon.stub(exporter['logClient'], 'sendLogBatch' as any);
sinon.stub(exporter['logClient'], 'eventBatchExceedsLimit' as any).returns(true);

await new Promise<void>(resolve => {
exporter.export(resourceMetricsData, result => {
expect(result.code).toEqual(ExportResultCode.FAILED);

sinon.assert.calledThrice(logClientSendLogBatchStub);
const call1Args = logClientSendLogBatchStub.getCall(0).args[0] as LogEventBatch;
const call2Args = logClientSendLogBatchStub.getCall(1).args[0] as LogEventBatch;
const call3Args = logClientSendLogBatchStub.getCall(2).args[0] as LogEventBatch;

expect(call1Args.logEvents.length).toEqual(0);
expect(call2Args.logEvents[0].message).toMatch(
/^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey1"\]\]\}\]},"Version":"1","descriptorName":3,"uniqueKey1":"uniqueValue1"\}$/
);
expect(call3Args.logEvents[0].message).toMatch(
/^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey2"\]\]\}\]},"Version":"1","descriptorName":9,"uniqueKey2":"uniqueValue2"\}$/
);
resolve();
});
exporter.export(resourceMetricsData, result => {
expect(result.code).toEqual(ExportResultCode.SUCCESS);

sinon.assert.calledThrice(logClientSendLogBatchStub);
const call1Args = logClientSendLogBatchStub.getCall(0).args[0] as LogEventBatch;
const call2Args = logClientSendLogBatchStub.getCall(1).args[0] as LogEventBatch;
const call3Args = logClientSendLogBatchStub.getCall(2).args[0] as LogEventBatch;

expect(call1Args.logEvents.length).toEqual(0);
expect(call2Args.logEvents[0].message).toMatch(
/^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey1"\]\]\}\]},"Version":"1","descriptorName":3,"uniqueKey1":"uniqueValue1"\}$/
);
expect(call3Args.logEvents[0].message).toMatch(
/^\{"_aws":\{"Timestamp":\d+,"CloudWatchMetrics":\[\{"Namespace":"TestNamespace","Metrics":\[\{"Name":"descriptorName","Unit":"Milliseconds"\}\],"Dimensions":\[\["uniqueKey2"\]\]\}\]},"Version":"1","descriptorName":9,"uniqueKey2":"uniqueValue2"\}$/
);
done();
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,36 @@ describe('UdpExporterTest', () => {
expect(socketSend.getCall(0).args[0]).toEqual(protbufBinary);
});

it('should handle errors when sending UDP data', () => {
it('should handle errors when sending UDP data', async () => {
const errorMessage = 'UDP send error';
socketSend.yields(new Error(errorMessage)); // Simulate an error

const data = new Uint8Array([1, 2, 3]);
// Expect the sendData method to throw the error
expect(() => udpExporter.sendData(data, 'T1')).toThrow(errorMessage);
// Expect the sendData method to reject with the error
await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage);
// Assert that diag.error was called with the correct error message
expect(diagErrorSpy.calledOnce).toBe(true);
expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true);
});

it('should resolve when sending UDP data successfully', async () => {
socketSend.yields(null); // Simulate successful send

const data = new Uint8Array([1, 2, 3]);
await expect(udpExporter.sendData(data, 'T1')).resolves.toBeUndefined();
expect(diagErrorSpy.notCalled).toBe(true);
});

it('should handle synchronous errors when sending UDP data', async () => {
const errorMessage = 'Synchronous UDP send error';
socketSend.throws(new Error(errorMessage)); // Simulate synchronous error

const data = new Uint8Array([1, 2, 3]);
await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage);
expect(diagErrorSpy.calledOnce).toBe(true);
expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true);
});

it('should close the socket on shutdown', () => {
udpExporter.shutdown();
expect(socketClose.calledOnce).toBe(true);
Expand Down Expand Up @@ -125,16 +143,21 @@ describe('OTLPUdpSpanExporterTest', () => {
sinon.restore();
});

it('should export spans successfully', () => {
it('should export spans successfully', done => {
const callback = sinon.stub();
// Stub ProtobufTraceSerializer.serializeRequest
sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData);
udpExporterMock.sendData.resolves(); // Make sendData resolve successfully

otlpUdpSpanExporter.export(spans, callback);

expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true);
expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true);
expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged
// Use setTimeout to allow Promise to resolve
setTimeout(() => {
expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true);
expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true);
expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged
done();
}, 0);
});

it('should handle serialization failure', () => {
Expand All @@ -149,20 +172,33 @@ describe('OTLPUdpSpanExporterTest', () => {
expect(diagErrorSpy.notCalled).toBe(true);
});

it('should handle errors during export', () => {
it('should handle errors during export', done => {
const error = new Error('Export error');
udpExporterMock.sendData.throws(error);
udpExporterMock.sendData.rejects(error); // Make sendData reject with error
sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData);

const callback = sinon.stub();

otlpUdpSpanExporter.export(spans, callback);

expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', sinon.match.instanceOf(Error))).toBe(true);
expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true);
// Use setTimeout to allow Promise to reject
setTimeout(() => {
expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', error)).toBe(true);
expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true);
done();
}, 0);
});

it('should shutdown the UDP exporter successfully', async () => {
await otlpUdpSpanExporter.shutdown();
expect(udpExporterMock.shutdown.calledOnce).toBe(true);
});

it('should handle shutdown errors', async () => {
const shutdownError = new Error('Shutdown error');
udpExporterMock.shutdown.reset();
udpExporterMock.shutdown.throws(shutdownError);

await expect(otlpUdpSpanExporter.shutdown()).rejects.toThrow('Shutdown error');
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,25 @@ export class UdpExporter {
this._socket.unref();
}

sendData(data: Uint8Array, signalFormatPrefix: string): void {
sendData(data: Uint8Array, signalFormatPrefix: string): Promise<void> {
const base64EncodedString = Buffer.from(data).toString('base64');
const message = `${PROTOCOL_HEADER}${signalFormatPrefix}${base64EncodedString}`;

try {
this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => {
if (err) {
throw err;
}
});
} catch (err) {
diag.error('Error sending UDP data: %s', err);
throw err;
}
return new Promise((resolve, reject) => {
try {
this._socket.send(Buffer.from(message, 'utf-8'), this._port, this._host, err => {
if (err) {
diag.error('Error sending UDP data: %s', err);
reject(err);
} else {
resolve();
}
});
} catch (err) {
diag.error('Error sending UDP data: %s', err);
reject(err);
}
});
}

shutdown(): void {
Expand Down Expand Up @@ -79,13 +84,13 @@ export class AwsXrayUdpSpanExporter implements SpanExporter {
if (serializedData == null) {
return;
}
try {
this._udpExporter.sendData(serializedData, this._signalPrefix);
return resultCallback({ code: ExportResultCode.SUCCESS });
} catch (err) {
diag.error('Error exporting spans: %s', err);
return resultCallback({ code: ExportResultCode.FAILED });
}
this._udpExporter
.sendData(serializedData, this._signalPrefix)
.then(() => resultCallback({ code: ExportResultCode.SUCCESS }))
.catch(err => {
diag.error('Error exporting spans: %s', err);
resultCallback({ code: ExportResultCode.FAILED });
});
}

forceFlush(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,36 @@ describe('UdpExporterTest', () => {
expect(socketSend.getCall(0).args[0]).toEqual(protbufBinary);
});

it('should handle errors when sending UDP data', () => {
it('should handle errors when sending UDP data', async () => {
const errorMessage = 'UDP send error';
socketSend.yields(new Error(errorMessage)); // Simulate an error

const data = new Uint8Array([1, 2, 3]);
// Expect the sendData method to throw the error
expect(() => udpExporter.sendData(data, 'T1')).toThrow(errorMessage);
// Expect the sendData method to reject with the error
await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage);
// Assert that diag.error was called with the correct error message
expect(diagErrorSpy.calledOnce).toBe(true);
expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true);
});

it('should resolve when sending UDP data successfully', async () => {
socketSend.yields(null); // Simulate successful send

const data = new Uint8Array([1, 2, 3]);
await expect(udpExporter.sendData(data, 'T1')).resolves.toBeUndefined();
expect(diagErrorSpy.notCalled).toBe(true);
});

it('should handle synchronous errors when sending UDP data', async () => {
const errorMessage = 'Synchronous UDP send error';
socketSend.throws(new Error(errorMessage)); // Simulate synchronous error

const data = new Uint8Array([1, 2, 3]);
await expect(udpExporter.sendData(data, 'T1')).rejects.toThrow(errorMessage);
expect(diagErrorSpy.calledOnce).toBe(true);
expect(diagErrorSpy.calledWith('Error sending UDP data: %s', sinon.match.instanceOf(Error))).toBe(true);
});

it('should close the socket on shutdown', () => {
udpExporter.shutdown();
expect(socketClose.calledOnce).toBe(true);
Expand Down Expand Up @@ -132,16 +150,21 @@ describe('AwsXrayUdpSpanExporterTest', () => {
sinon.restore();
});

it('should export spans successfully', () => {
it('should export spans successfully', done => {
const callback = sinon.stub();
// Stub ProtobufTraceSerializer.serializeRequest
sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData);
udpExporterMock.sendData.resolves(); // Make sendData resolve successfully

awsXrayUdpSpanExporter.export(spans, callback);

expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true);
expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true);
expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged
// Use setTimeout to allow Promise to resolve
setTimeout(() => {
expect(udpExporterMock.sendData.calledOnceWith(serializedData, 'T1')).toBe(true);
expect(callback.calledOnceWith({ code: ExportResultCode.SUCCESS })).toBe(true);
expect(diagErrorSpy.notCalled).toBe(true); // Ensure no error was logged
done();
}, 0);
});

it('should handle serialization failure', () => {
Expand All @@ -156,16 +179,21 @@ describe('AwsXrayUdpSpanExporterTest', () => {
expect(diagErrorSpy.notCalled).toBe(true);
});

it('should handle errors during export', () => {
it('should handle errors during export', done => {
const error = new Error('Export error');
udpExporterMock.sendData.throws(error);
udpExporterMock.sendData.rejects(error);
sinon.stub(ProtobufTraceSerializer, 'serializeRequest').returns(serializedData);

const callback = sinon.stub();

awsXrayUdpSpanExporter.export(spans, callback);

expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', sinon.match.instanceOf(Error))).toBe(true);
expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true);
// Use setTimeout to allow Promise to reject
setTimeout(() => {
expect(diagErrorSpy.calledOnceWith('Error exporting spans: %s', error)).toBe(true);
expect(callback.calledOnceWith({ code: ExportResultCode.FAILED })).toBe(true);
done();
}, 0);
});

it('should forceFlush without throwing', async () => {
Expand All @@ -177,6 +205,14 @@ describe('AwsXrayUdpSpanExporterTest', () => {
expect(udpExporterMock.shutdown.calledOnce).toBe(true);
});

it('should handle shutdown errors', async () => {
const shutdownError = new Error('Shutdown error');
udpExporterMock.shutdown.reset();
udpExporterMock.shutdown.throws(shutdownError);

await expect(awsXrayUdpSpanExporter.shutdown()).rejects.toThrow('Shutdown error');
});

it('should use expected Environment Variables to configure endpoint', () => {
process.env.AWS_LAMBDA_FUNCTION_NAME = 'testFunctionName';
process.env.AWS_XRAY_DAEMON_ADDRESS = 'someaddress:1234';
Expand Down