Skip to content

Commit aab1f4b

Browse files
committed
Expose flush in compression
1 parent 281673b commit aab1f4b

File tree

4 files changed

+182
-29
lines changed

4 files changed

+182
-29
lines changed

src/compression/Compression.ts

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ import { HttpHeader } from '../core/mod';
44
import { ContentEncoding } from './ContentEnconding';
55
import { compress } from './compress';
66

7+
export type TFlush = () => void;
8+
79
export interface ICompression {
810
readonly acceptedEncoding: readonly ContentEncoding[];
9-
readonly usedEncoding: null | readonly ContentEncoding[];
11+
readonly usedEncoding: null | ContentEncoding;
12+
readonly flush: TFlush;
1013
}
1114

1215
export const CompressionKey = Key.create<ICompression>('Compress');
@@ -23,27 +26,63 @@ export function Compression(): Middleware {
2326
? (acceptedEncodingHeader.split(/, ?/) as any)
2427
: [ContentEncoding.Identity];
2528

26-
const usedEncoding = selectEncodings(acceptedEncoding);
27-
const compressCtx: ICompression = { acceptedEncoding, usedEncoding };
29+
const usedEncoding = selectEncoding(acceptedEncoding);
30+
const flushDeferred = createDeferredFlush();
31+
const compressCtx: ICompression = {
32+
acceptedEncoding,
33+
usedEncoding,
34+
flush: flushDeferred.flush,
35+
};
2836

2937
const response = await next(ctx.with(CompressionKey.Provider(compressCtx)));
3038
if (response === null) {
3139
// no response = do nothing
3240
return response;
3341
}
34-
return compress(response, usedEncoding);
42+
return compress(response, usedEncoding, flushDeferred.setFlush);
3543
};
3644
}
3745

38-
function selectEncodings(acceptedEncoding: Array<ContentEncoding>): Array<ContentEncoding> {
46+
function selectEncoding(acceptedEncoding: Array<ContentEncoding>): ContentEncoding {
3947
if (acceptedEncoding.indexOf(ContentEncoding.Brotli) >= 0) {
40-
return [ContentEncoding.Brotli];
48+
return ContentEncoding.Brotli;
4149
}
4250
if (acceptedEncoding.indexOf(ContentEncoding.Gzip) >= 0) {
43-
return [ContentEncoding.Gzip];
51+
return ContentEncoding.Gzip;
4452
}
4553
if (acceptedEncoding.indexOf(ContentEncoding.Deflate) >= 0) {
46-
return [ContentEncoding.Deflate];
54+
return ContentEncoding.Deflate;
55+
}
56+
return ContentEncoding.Identity;
57+
}
58+
59+
interface IDeferredFlush {
60+
flush: TFlush;
61+
setFlush: (f: TFlush) => void;
62+
}
63+
64+
function createDeferredFlush(): IDeferredFlush {
65+
let flushFn: TFlush | null = null;
66+
// flush requested before flush was set
67+
let requested = false;
68+
69+
return {
70+
flush,
71+
setFlush,
72+
};
73+
74+
function setFlush(f: TFlush) {
75+
flushFn = f;
76+
if (requested) {
77+
flushFn();
78+
}
79+
}
80+
81+
function flush() {
82+
if (flushFn === null) {
83+
requested = true;
84+
return;
85+
}
86+
flushFn();
4787
}
48-
return [ContentEncoding.Identity];
4988
}

src/compression/compress.ts

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,27 @@ import zlib from 'node:zlib';
33
import type { ReadableStream } from 'stream/web';
44
import { Headers, Response } from 'undici';
55
import { HttpHeader, HttpStatus, ZenResponse } from '../core/mod';
6+
import type { TFlush } from './Compression';
67
import { ContentEncoding } from './ContentEnconding';
78

89
/**
910
* Compresses the response body with the given encodings.
1011
*/
11-
export function compress(originalResponse: ZenResponse, encodings: Array<ContentEncoding>): ZenResponse {
12+
export function compress(
13+
originalResponse: ZenResponse,
14+
encoding: ContentEncoding,
15+
setFlush: (f: TFlush) => void,
16+
): ZenResponse {
1217
if (originalResponse.body === null || HttpStatus.isEmpty(originalResponse.status ?? 200)) {
1318
return originalResponse;
1419
}
1520
const bodyStream = new Response(originalResponse.body).body;
16-
const body = encodeBodyWithEncodings(bodyStream, encodings);
21+
const body = encodeBodyWithEncoding(bodyStream, encoding, setFlush);
1722

1823
return originalResponse
1924
.withHeaders((prev) => {
2025
const nextHeaders = new Headers(prev);
21-
nextHeaders.set(HttpHeader.ContentEncoding, encodings.join(', '));
26+
nextHeaders.set(HttpHeader.ContentEncoding, encoding);
2227
// remove content length because we no longer know the size of the body
2328
// (unless we compress first, then send it but that would be quite bad)
2429
nextHeaders.delete(HttpHeader.ContentLength);
@@ -27,31 +32,28 @@ export function compress(originalResponse: ZenResponse, encodings: Array<Content
2732
.with(ZenResponse.BodyKey.Provider(body));
2833
}
2934

30-
function encodeBodyWithEncodings(
35+
function encodeBodyWithEncoding(
3136
body: ReadableStream | null,
32-
encodings: Array<ContentEncoding>,
37+
encoding: ContentEncoding,
38+
setFlush: (f: TFlush) => void,
3339
): ReadableStream | null {
3440
if (body === null) {
3541
return null;
3642
}
37-
let bodyStream: ReadableStream = body;
38-
39-
encodings.forEach((encoding) => {
40-
bodyStream = encodeBodyWithEncoding(bodyStream, encoding);
41-
});
42-
43-
return bodyStream;
44-
}
45-
46-
function encodeBodyWithEncoding(body: ReadableStream, encoding: ContentEncoding): ReadableStream {
4743
if (encoding === ContentEncoding.Brotli) {
48-
return Readable.toWeb(Readable.fromWeb(body).pipe(zlib.createBrotliCompress()));
44+
const enc = zlib.createBrotliCompress();
45+
setFlush(enc.flush.bind(enc));
46+
return Readable.toWeb(Readable.fromWeb(body).pipe(enc));
4947
}
5048
if (encoding === ContentEncoding.Gzip) {
51-
return Readable.toWeb(Readable.fromWeb(body).pipe(zlib.createGzip()));
49+
const enc = zlib.createGzip();
50+
setFlush(enc.flush.bind(enc));
51+
return Readable.toWeb(Readable.fromWeb(body).pipe(enc));
5252
}
5353
if (encoding === ContentEncoding.Deflate) {
54-
return Readable.toWeb(Readable.fromWeb(body).pipe(zlib.createDeflate()));
54+
const enc = zlib.createDeflate();
55+
setFlush(enc.flush.bind(enc));
56+
return Readable.toWeb(Readable.fromWeb(body).pipe(enc));
5557
}
5658
return body;
5759
}

tests/compress.test.ts

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
import { expect, test } from 'vitest';
2-
import { Compression, compose, createNodeServer, json } from '../src/mod';
1+
import { assert, expect, test } from 'vitest';
2+
import { Compression, CompressionKey, ZenResponse, compose, createNodeServer, json } from '../src/mod';
3+
import { createPushabledAsyncIterable } from './utils/asyncIterable';
34
import { mountServer } from './utils/mountServer';
45

56
test('gzip', async () => {
@@ -56,3 +57,53 @@ test('deflate', async () => {
5657
expect(await res.text()).toBe('{"hello":"world"}');
5758
await close();
5859
});
60+
61+
test('compress with asyncIterable body', async () => {
62+
const server = createNodeServer(
63+
compose(Compression(), (ctx) => {
64+
const compression = ctx.get(CompressionKey.Consumer);
65+
const body = createPushabledAsyncIterable<Uint8Array>();
66+
body.push(Uint8Array.from([1, 2, 3]));
67+
compression?.flush();
68+
setTimeout(() => {
69+
body.push(Uint8Array.from([4, 5, 6]));
70+
compression?.flush();
71+
body.end();
72+
}, 100);
73+
return ZenResponse.create(body, {
74+
headers: { 'content-type': 'application/octet-stream' },
75+
});
76+
}),
77+
);
78+
const { close, url, fetch } = await mountServer(server);
79+
80+
const res = await fetch(url, {
81+
headers: { 'accept-encoding': 'deflate' },
82+
});
83+
assert(res.body);
84+
const reader = res.body.getReader();
85+
const message = await reader.read();
86+
expect(message).toMatchInlineSnapshot(`
87+
{
88+
"done": false,
89+
"value": Uint8Array [
90+
1,
91+
2,
92+
3,
93+
],
94+
}
95+
`);
96+
const message2 = await reader.read();
97+
expect(message2).toMatchInlineSnapshot(`
98+
{
99+
"done": false,
100+
"value": Uint8Array [
101+
4,
102+
5,
103+
6,
104+
],
105+
}
106+
`);
107+
108+
await close();
109+
});

tests/utils/asyncIterable.ts

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
export interface IPushabledAsyncIterable<T> extends AsyncIterable<T> {
2+
push: (value: T) => void;
3+
end: () => void;
4+
}
5+
6+
export function createPushabledAsyncIterable<T>(): IPushabledAsyncIterable<T> {
7+
// A queue of resolve functions waiting for an incoming event which has not yet arrived.
8+
let pullQueue: ((value: IteratorResult<T>) => void)[] = [];
9+
// A queue of values waiting for next() calls to be made
10+
let pushQueue: T[] = [];
11+
let running = true;
12+
13+
return {
14+
[Symbol.asyncIterator]: iterator,
15+
push,
16+
end,
17+
};
18+
19+
function iterator(): AsyncIterator<T> {
20+
return {
21+
next,
22+
};
23+
}
24+
25+
async function next(): Promise<IteratorResult<T>> {
26+
if (running === false) {
27+
return { done: true, value: undefined };
28+
}
29+
return new Promise<IteratorResult<T, undefined>>((resolve) => {
30+
if (pushQueue.length !== 0) {
31+
// Get value from the pushQueue
32+
resolve({ done: false, value: pushQueue.shift() as T });
33+
return;
34+
}
35+
pullQueue.push(resolve);
36+
});
37+
}
38+
39+
function end() {
40+
if (running) {
41+
running = false;
42+
pullQueue.forEach((resolve) => resolve({ value: undefined, done: true }));
43+
pullQueue = [];
44+
pushQueue = [];
45+
}
46+
}
47+
48+
function push(value: T) {
49+
if (running === false) {
50+
// do nothing, pullQueue has been or will be emptied by stop()
51+
return;
52+
}
53+
if (pullQueue.length !== 0) {
54+
// call next resolve from the pullQueue
55+
const resolvedNext = pullQueue.shift()!;
56+
resolvedNext({ value, done: false });
57+
return;
58+
}
59+
pushQueue.push(value);
60+
}
61+
}

0 commit comments

Comments
 (0)