Skip to content

Commit ec88344

Browse files
alisabzevariobecnydyladan
authored
Support gzip compression for node exporter collector (#2337)
Co-authored-by: Bartlomiej Obecny <[email protected]> Co-authored-by: Daniel Dyla <[email protected]>
1 parent d8fbedd commit ec88344

File tree

7 files changed

+227
-48
lines changed

7 files changed

+227
-48
lines changed

packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,28 @@ import {
1919
CollectorExporterNodeBase as CollectorExporterBaseMain,
2020
collectorTypes,
2121
CollectorExporterNodeConfigBase,
22+
CompressionAlgorithm,
2223
} from '@opentelemetry/exporter-collector';
2324
import { ServiceClientType } from './types';
2425

26+
type SendFn = <ExportItem, ServiceRequest>(collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
27+
objects: ExportItem[],
28+
compression: CompressionAlgorithm,
29+
onSuccess: () => void,
30+
onError: (error: collectorTypes.CollectorExporterError) => void) => void;
31+
2532
/**
2633
* Collector Metric Exporter abstract base class
2734
*/
2835
export abstract class CollectorExporterNodeBase<
2936
ExportItem,
3037
ServiceRequest
3138
> extends CollectorExporterBaseMain<ExportItem, ServiceRequest> {
32-
private _send!: Function;
39+
private _send!: SendFn;
40+
41+
constructor(config: CollectorExporterNodeConfigBase = {}) {
42+
super(config)
43+
}
3344

3445
private _sendPromise(
3546
objects: ExportItem[],
@@ -51,7 +62,7 @@ export abstract class CollectorExporterNodeBase<
5162
this._sendingPromises.splice(index, 1);
5263
};
5364

54-
this._send(this, objects, _onSuccess, _onError);
65+
this._send(this, objects, this.compression, _onSuccess, _onError);
5566
});
5667

5768
this._sendingPromises.push(promise);

packages/opentelemetry-exporter-collector-proto/src/util.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
collectorTypes,
1919
sendWithHttp,
2020
CollectorExporterNodeConfigBase,
21+
CompressionAlgorithm,
2122
} from '@opentelemetry/exporter-collector';
2223
import * as path from 'path';
2324

@@ -63,6 +64,7 @@ export function onInit<ExportItem, ServiceRequest>(
6364
export function send<ExportItem, ServiceRequest>(
6465
collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
6566
objects: ExportItem[],
67+
compression: CompressionAlgorithm,
6668
onSuccess: () => void,
6769
onError: (error: collectorTypes.CollectorExporterError) => void
6870
): void {

packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts

Lines changed: 87 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@ import { ExportResultCode } from '@opentelemetry/core';
1919
import {
2020
CollectorExporterNodeConfigBase,
2121
collectorTypes,
22+
CompressionAlgorithm,
2223
} from '@opentelemetry/exporter-collector';
2324
import { ReadableSpan } from '@opentelemetry/tracing';
2425
import * as assert from 'assert';
2526
import * as http from 'http';
2627
import * as sinon from 'sinon';
28+
import { Stream } from 'stream';
29+
import * as zlib from 'zlib';
2730
import { CollectorTraceExporter } from '../src';
2831
import { getExportRequestProto } from '../src/util';
2932
import {
@@ -34,9 +37,9 @@ import {
3437
} from './helper';
3538

3639
const fakeRequest = {
37-
end: function () {},
38-
on: function () {},
39-
write: function () {},
40+
end: function () { },
41+
on: function () { },
42+
write: function () { },
4043
};
4144

4245
describe('CollectorTraceExporter - node with proto over http', () => {
@@ -104,7 +107,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
104107
});
105108

106109
it('should open the connection', done => {
107-
collectorExporter.export(spans, () => {});
110+
collectorExporter.export(spans, () => { });
108111

109112
sinon.stub(http, 'request').callsFake((options: any) => {
110113
assert.strictEqual(options.hostname, 'foo.bar.com');
@@ -116,7 +119,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
116119
});
117120

118121
it('should set custom headers', done => {
119-
collectorExporter.export(spans, () => {});
122+
collectorExporter.export(spans, () => { });
120123

121124
sinon.stub(http, 'request').callsFake((options: any) => {
122125
assert.strictEqual(options.headers['foo'], 'bar');
@@ -126,7 +129,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
126129
});
127130

128131
it('should have keep alive and keepAliveMsecs option set', done => {
129-
collectorExporter.export(spans, () => {});
132+
collectorExporter.export(spans, () => { });
130133

131134
sinon.stub(http, 'request').callsFake((options: any) => {
132135
assert.strictEqual(options.agent.keepAlive, true);
@@ -137,27 +140,31 @@ describe('CollectorTraceExporter - node with proto over http', () => {
137140
});
138141

139142
it('should successfully send the spans', done => {
140-
collectorExporter.export(spans, () => {});
141-
142-
sinon.stub(http, 'request').returns({
143-
end: () => {},
144-
on: () => {},
145-
write: (...args: any[]) => {
146-
const ExportTraceServiceRequestProto = getExportRequestProto();
147-
const data = ExportTraceServiceRequestProto?.decode(args[0]);
148-
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
149-
const span1 =
150-
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
151-
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
152-
if (span1) {
153-
ensureProtoSpanIsCorrect(span1);
154-
}
155-
156-
ensureExportTraceServiceRequestIsSet(json);
157-
158-
done();
159-
},
160-
} as any);
143+
const fakeRequest = new Stream.PassThrough();
144+
sinon.stub(http, 'request').returns(fakeRequest as any);
145+
146+
let buff = Buffer.from('');
147+
fakeRequest.on('end', () => {
148+
const ExportTraceServiceRequestProto = getExportRequestProto();
149+
const data = ExportTraceServiceRequestProto?.decode(buff);
150+
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
151+
const span1 =
152+
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
153+
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
154+
if (span1) {
155+
ensureProtoSpanIsCorrect(span1);
156+
}
157+
158+
ensureExportTraceServiceRequestIsSet(json);
159+
160+
done();
161+
});
162+
163+
fakeRequest.on('data', chunk => {
164+
buff = Buffer.concat([buff, chunk]);
165+
});
166+
167+
collectorExporter.export(spans, () => { });
161168
});
162169

163170
it('should log the successful message', done => {
@@ -195,4 +202,57 @@ describe('CollectorTraceExporter - node with proto over http', () => {
195202
});
196203
});
197204
});
205+
describe('export - with compression', () => {
206+
beforeEach(() => {
207+
collectorExporterConfig = {
208+
headers: {
209+
foo: 'bar',
210+
},
211+
hostname: 'foo',
212+
attributes: {},
213+
url: 'http://foo.bar.com',
214+
keepAlive: true,
215+
compression: CompressionAlgorithm.GZIP,
216+
httpAgentOptions: { keepAliveMsecs: 2000 },
217+
};
218+
collectorExporter = new CollectorTraceExporter(collectorExporterConfig);
219+
spans = [];
220+
spans.push(Object.assign({}, mockedReadableSpan));
221+
});
222+
afterEach(() => {
223+
sinon.restore();
224+
});
225+
226+
it('should successfully send the spans', done => {
227+
const fakeRequest = new Stream.PassThrough();
228+
sinon.stub(http, 'request').returns(fakeRequest as any);
229+
const spySetHeader = sinon.spy();
230+
(fakeRequest as any).setHeader = spySetHeader;
231+
232+
let buff = Buffer.from('');
233+
fakeRequest.on('end', () => {
234+
const unzippedBuff = zlib.gunzipSync(buff);
235+
const ExportTraceServiceRequestProto = getExportRequestProto();
236+
const data = ExportTraceServiceRequestProto?.decode(unzippedBuff);
237+
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
238+
const span1 =
239+
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
240+
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
241+
if (span1) {
242+
ensureProtoSpanIsCorrect(span1);
243+
}
244+
245+
ensureExportTraceServiceRequestIsSet(json);
246+
assert.ok(spySetHeader.calledWith('Content-Encoding', 'gzip'));
247+
248+
done();
249+
});
250+
251+
fakeRequest.on('data', chunk => {
252+
buff = Buffer.concat([buff, chunk]);
253+
});
254+
255+
collectorExporter.export(spans, () => { });
256+
});
257+
});
198258
});

packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import type * as http from 'http';
1818
import type * as https from 'https';
1919

2020
import { CollectorExporterBase } from '../../CollectorExporterBase';
21-
import { CollectorExporterNodeConfigBase } from './types';
21+
import { CollectorExporterNodeConfigBase, CompressionAlgorithm } from './types';
2222
import * as collectorTypes from '../../types';
2323
import { parseHeaders } from '../../util';
2424
import { createHttpAgent, sendWithHttp } from './util';
@@ -39,6 +39,7 @@ export abstract class CollectorExporterNodeBase<
3939
DEFAULT_HEADERS: Record<string, string> = {};
4040
headers: Record<string, string>;
4141
agent: http.Agent | https.Agent | undefined;
42+
compression: CompressionAlgorithm;
4243

4344
constructor(config: CollectorExporterNodeConfigBase = {}) {
4445
super(config);
@@ -51,6 +52,7 @@ export abstract class CollectorExporterNodeBase<
5152
baggageUtils.parseKeyPairsIntoRecord(getEnv().OTEL_EXPORTER_OTLP_HEADERS)
5253
);
5354
this.agent = createHttpAgent(config);
55+
this.compression = config.compression || CompressionAlgorithm.NONE;
5456
}
5557

5658
onInit(_config: CollectorExporterNodeConfigBase): void {

packages/opentelemetry-exporter-collector/src/platform/node/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,11 @@ import { CollectorExporterConfigBase } from '../../types';
2424
export interface CollectorExporterNodeConfigBase
2525
extends CollectorExporterConfigBase {
2626
keepAlive?: boolean;
27+
compression?: CompressionAlgorithm;
2728
httpAgentOptions?: http.AgentOptions | https.AgentOptions;
2829
}
30+
31+
export enum CompressionAlgorithm {
32+
NONE = 'none',
33+
GZIP = 'gzip'
34+
}

packages/opentelemetry-exporter-collector/src/platform/node/util.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@
1616
import * as url from 'url';
1717
import * as http from 'http';
1818
import * as https from 'https';
19+
import * as zlib from 'zlib';
20+
import { Readable } from 'stream';
1921
import * as collectorTypes from '../../types';
2022
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
2123
import { CollectorExporterNodeConfigBase } from '.';
2224
import { diag } from '@opentelemetry/api';
25+
import { CompressionAlgorithm } from './types';
26+
27+
const gzip = zlib.createGzip();
2328

2429
/**
2530
* Sends data using http
@@ -71,11 +76,35 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
7176
});
7277
});
7378

79+
7480
req.on('error', (error: Error) => {
7581
onError(error);
7682
});
77-
req.write(data);
78-
req.end();
83+
84+
switch (collector.compression) {
85+
case CompressionAlgorithm.GZIP: {
86+
req.setHeader('Content-Encoding', 'gzip');
87+
const dataStream = readableFromBuffer(data);
88+
dataStream.on('error', onError)
89+
.pipe(gzip).on('error', onError)
90+
.pipe(req);
91+
92+
break;
93+
}
94+
default:
95+
req.write(data);
96+
req.end();
97+
98+
break;
99+
}
100+
}
101+
102+
function readableFromBuffer(buff: string | Buffer): Readable {
103+
const readable = new Readable();
104+
readable.push(buff);
105+
readable.push(null);
106+
107+
return readable;
79108
}
80109

81110
export function createHttpAgent(

0 commit comments

Comments
 (0)