Skip to content

Commit 0727ed7

Browse files
mattolsondyladan
andauthored
Fix gzip compression for OTLP HTTP exporter (#3046)
Co-authored-by: Daniel Dyla <[email protected]>
1 parent d2de661 commit 0727ed7

File tree

4 files changed

+161
-8
lines changed

4 files changed

+161
-8
lines changed

experimental/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ All notable changes to experimental packages in this project will be documented
2424

2525
* fix(otlp-transformer): remove type dependency on Long #3022 @legendecas
2626
* fix(grpc-exporter): use non-normalized URL to determine channel security #3019 @pichlermarc
27+
* fix(otlp-exporter-base): fix gzip output stream in http otlp export #3046 @mattolson
2728

2829
### :books: (Refine Doc)
2930

experimental/packages/otlp-exporter-base/src/platform/node/OTLPExporterNodeBase.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,5 @@ export abstract class OTLPExporterNodeBase<
8888
promise.then(popPromise, popPromise);
8989
}
9090

91-
// TODO: end gzip stream from util.ts if not undefined
92-
// It should perhaps be a class member here instead of a variable in util.ts
9391
onShutdown(): void {}
9492
}

experimental/packages/otlp-exporter-base/src/platform/node/util.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ import { CompressionAlgorithm } from './types';
2525
import { getEnv } from '@opentelemetry/core';
2626
import { OTLPExporterError } from '../../types';
2727

28-
let gzip: zlib.Gzip | undefined;
29-
3028
/**
3129
* Sends data using http
3230
* @param collector
@@ -116,13 +114,10 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
116114

117115
switch (collector.compression) {
118116
case CompressionAlgorithm.GZIP: {
119-
if (!gzip) {
120-
gzip = zlib.createGzip();
121-
}
122117
req.setHeader('Content-Encoding', 'gzip');
123118
const dataStream = readableFromBuffer(data);
124119
dataStream.on('error', onError)
125-
.pipe(gzip).on('error', onError)
120+
.pipe(zlib.createGzip()).on('error', onError)
126121
.pipe(req);
127122

128123
break;

experimental/packages/otlp-exporter-base/test/node/util.test.ts

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,50 @@
1616

1717
import * as assert from 'assert';
1818
import { configureExporterTimeout, invalidTimeout } from '../../src/util';
19+
import { sendWithHttp } from '../../src/platform/node/util';
1920
import { CompressionAlgorithm} from '../../src/platform/node/types';
2021
import { configureCompression} from '../../src/platform/node/util';
2122
import { diag } from '@opentelemetry/api';
2223
import * as sinon from 'sinon';
2324

25+
import { OTLPExporterNodeBase } from '../../src/platform/node/OTLPExporterNodeBase';
26+
import { OTLPExporterNodeConfigBase } from '../../src/platform/node/types';
27+
import { OTLPExporterError } from '../../src/types';
28+
import { PassThrough } from 'stream';
29+
import * as http from 'http';
30+
import * as zlib from 'zlib';
31+
32+
// Meant to simulate http.IncomingMessage, at least the parts that sendWithHttp cares about
33+
// but make it a PassThrough so we can inspect it for the test
34+
class HttpResponse extends PassThrough {
35+
statusCode: number;
36+
statusMessage: string;
37+
38+
constructor(statusCode = 200, statusMessage = 'OK') {
39+
super();
40+
this.statusCode = statusCode;
41+
this.statusMessage = statusMessage;
42+
}
43+
}
44+
45+
// Meant to simulate http.ClientRequest, at least the parts that sendWithHttp cares about
46+
// but make it a PassThrough so we can inspect it for the test
47+
class HttpRequest extends PassThrough {
48+
setHeader(name: string, value: string) {}
49+
}
50+
51+
// Barebones exporter for use by sendWithHttp
52+
type ExporterConfig = OTLPExporterNodeConfigBase;
53+
class Exporter extends OTLPExporterNodeBase<object,object> {
54+
getDefaultUrl(config: ExporterConfig): string {
55+
return config.url || '';
56+
}
57+
58+
convert(spans: object[]): object {
59+
return {};
60+
}
61+
}
62+
2463
describe('configureExporterTimeout', () => {
2564
const envSource = process.env;
2665
it('should use timeoutMillis parameter as export timeout value', () => {
@@ -124,3 +163,123 @@ describe('configureCompression', () => {
124163
assert.strictEqual(configureCompression(undefined),CompressionAlgorithm.NONE);
125164
});
126165
});
166+
167+
describe('sendWithHttp', () => {
168+
let exporter: Exporter;
169+
let httpRequestStub: sinon.SinonStub;
170+
let mockRequest: HttpRequest;
171+
let setHeaderSpy: sinon.SinonSpy;
172+
173+
const spanData: object = {
174+
'foo': 'bar',
175+
'bar': 'baz',
176+
};
177+
178+
beforeEach(() => {
179+
180+
// Create stub of http.request (used by sendWithHttp)
181+
httpRequestStub = sinon.stub(http, 'request');
182+
183+
// Mock out a request
184+
mockRequest = new HttpRequest();
185+
setHeaderSpy = sinon.spy(mockRequest, 'setHeader');
186+
187+
// Mock out response
188+
const response = new HttpResponse();
189+
response.end('OK');
190+
191+
// Stub out http.request so it calls our callback with the mocked response
192+
// and also so it returns our mocked request stream so we can observe. We don't
193+
// really care about the response for the purpose of this test, but we do want
194+
// to observe the request compression behavior.
195+
httpRequestStub.returns(mockRequest).callsArgWith(1, response);
196+
});
197+
198+
afterEach(function() {
199+
httpRequestStub.restore();
200+
setHeaderSpy.restore();
201+
});
202+
203+
it('should send with no compression if configured to do so', () => {
204+
exporter = new Exporter({
205+
url: 'http://foobar.com',
206+
compression: CompressionAlgorithm.NONE,
207+
});
208+
const data = JSON.stringify(spanData);
209+
210+
// Show that data is written to the request stream
211+
let requestData = '';
212+
mockRequest.on('data', chunk => requestData += chunk);
213+
mockRequest.on('end', () => {
214+
assert.strictEqual(requestData, data);
215+
});
216+
217+
sendWithHttp(exporter, data, 'application/json', () => {
218+
// Show that we aren't setting the gzip encoding header
219+
assert(setHeaderSpy.withArgs('Content-Encoding', 'gzip').notCalled);
220+
}, (err: OTLPExporterError) => {
221+
assert.fail(err);
222+
});
223+
});
224+
225+
it('should send with gzip compression if configured to do so', () => {
226+
exporter = new Exporter({
227+
url: 'http://foobar.com',
228+
compression: CompressionAlgorithm.GZIP,
229+
});
230+
231+
const data = JSON.stringify(spanData);
232+
const compressedData = zlib.gzipSync(Buffer.from(data));
233+
234+
// Show that compressed data is written to the request stream
235+
const buffers: Buffer[] = [];
236+
mockRequest.on('data', chunk => buffers.push(Buffer.from(chunk)));
237+
mockRequest.on('end', () => {
238+
assert(Buffer.concat(buffers).equals(compressedData));
239+
});
240+
241+
sendWithHttp(exporter, data, 'application/json', () => {
242+
// Show that we are setting the gzip encoding header
243+
assert(setHeaderSpy.withArgs('Content-Encoding', 'gzip').calledOnce);
244+
}, (err: OTLPExporterError) => {
245+
assert.fail(err);
246+
});
247+
});
248+
249+
it('should work with gzip compression enabled even after multiple requests', () => {
250+
exporter = new Exporter({
251+
url: 'http://foobar.com',
252+
compression: CompressionAlgorithm.GZIP,
253+
});
254+
255+
const data = JSON.stringify(spanData);
256+
const compressedData = zlib.gzipSync(Buffer.from(data));
257+
258+
for (let i = 0; i < 5; i++) {
259+
mockRequest = new HttpRequest();
260+
setHeaderSpy.restore();
261+
setHeaderSpy = sinon.spy(mockRequest, 'setHeader');
262+
263+
const response = new HttpResponse();
264+
response.end('OK');
265+
266+
httpRequestStub.restore();
267+
httpRequestStub = sinon.stub(http, 'request');
268+
httpRequestStub.returns(mockRequest).callsArgWith(1, response);
269+
270+
// Show that compressed data is written to the request stream
271+
const buffers: Buffer[] = [];
272+
mockRequest.on('data', chunk => buffers.push(Buffer.from(chunk)));
273+
mockRequest.on('end', () => {
274+
assert(Buffer.concat(buffers).equals(compressedData));
275+
});
276+
277+
sendWithHttp(exporter, data, 'application/json', () => {
278+
// Show that we are setting the gzip encoding header
279+
assert(setHeaderSpy.withArgs('Content-Encoding', 'gzip').calledOnce);
280+
}, (err: OTLPExporterError) => {
281+
assert.fail(err);
282+
});
283+
}
284+
});
285+
});

0 commit comments

Comments
 (0)