Skip to content

Commit 45463fd

Browse files
committed
Use TransformStream
1 parent e698e96 commit 45463fd

File tree

2 files changed

+75
-74
lines changed

2 files changed

+75
-74
lines changed

tests/compress.test.ts

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
import { TransformStream } from 'stream/web';
12
import { assert, expect, test } from 'vitest';
2-
import { Compression, CompressionKey, ZenResponse, compose, createNodeServer, json } from '../src/mod';
3-
import { createPushabledAsyncIterable } from './utils/asyncIterable';
3+
import { Compression, CompressionKey, HttpHeader, ZenResponse, compose, createNodeServer, json } from '../src/mod';
44
import { mountServer } from './utils/mountServer';
55

66
test('gzip', async () => {
@@ -62,17 +62,32 @@ test('compress with asyncIterable body', async () => {
6262
const server = createNodeServer(
6363
compose(Compression(), (ctx) => {
6464
const compression = ctx.get(CompressionKey.Consumer);
65-
const body = createPushabledAsyncIterable<Uint8Array>();
66-
body.push(Uint8Array.from([1, 2, 3]));
67-
body.push(Uint8Array.from([4, 5, 6]));
68-
compression?.flush();
69-
setTimeout(() => {
70-
body.push(Uint8Array.from([7, 8, 9]));
71-
compression?.flush();
72-
body.end();
73-
}, 100);
74-
return ZenResponse.create(body, {
75-
headers: { 'content-type': 'application/octet-stream' },
65+
const body = new TransformStream<Uint8Array | 'flush', Uint8Array>({
66+
transform(chunk, controller) {
67+
if (typeof chunk === 'string') {
68+
compression?.flush();
69+
return;
70+
}
71+
controller.enqueue(chunk);
72+
},
73+
});
74+
async function sendData() {
75+
const writer = body.writable.getWriter();
76+
await writer.write(Uint8Array.from([1, 2, 3]));
77+
await writer.write(Uint8Array.from([4, 5, 6]));
78+
await writer.write('flush');
79+
await new Promise((resolve) => setTimeout(resolve, 100));
80+
await writer.write(Uint8Array.from([7, 8, 9]));
81+
await writer.close();
82+
}
83+
sendData().catch((err) => {
84+
console.log(err);
85+
});
86+
return ZenResponse.create(body.readable, {
87+
headers: {
88+
[HttpHeader.Connection]: 'keep-alive',
89+
'content-type': 'application/octet-stream',
90+
},
7691
});
7792
}),
7893
);
@@ -96,3 +111,50 @@ test('compress with asyncIterable body', async () => {
96111

97112
await close();
98113
});
114+
115+
test('compress with asyncIterable body aborted', async () => {
116+
const server = createNodeServer(
117+
compose(Compression(), (ctx) => {
118+
const compression = ctx.get(CompressionKey.Consumer);
119+
const body = new TransformStream<Uint8Array | 'flush', Uint8Array>({
120+
transform(chunk, controller) {
121+
if (typeof chunk === 'string') {
122+
compression?.flush();
123+
return;
124+
}
125+
controller.enqueue(chunk);
126+
},
127+
});
128+
async function sendData() {
129+
const writer = body.writable.getWriter();
130+
await writer.write(Uint8Array.from([1, 2, 3]));
131+
await writer.write(Uint8Array.from([4, 5, 6]));
132+
await writer.write('flush');
133+
await new Promise((resolve) => setTimeout(resolve, 100));
134+
await writer.write(Uint8Array.from([7, 8, 9]));
135+
}
136+
sendData().catch((err) => {
137+
console.log(err);
138+
});
139+
return ZenResponse.create(body.readable, {
140+
headers: {
141+
[HttpHeader.Connection]: 'keep-alive',
142+
'content-type': 'application/octet-stream',
143+
},
144+
});
145+
}),
146+
);
147+
const { close, url, fetch } = await mountServer(server);
148+
149+
const controller = new AbortController();
150+
const res = await fetch(url, { headers: { 'accept-encoding': 'deflate' }, signal: controller.signal });
151+
assert(res.body);
152+
const reader = res.body.getReader();
153+
const message = await reader.read();
154+
expect(message).toEqual({
155+
done: false,
156+
value: new Uint8Array([1, 2, 3, 4, 5, 6]),
157+
});
158+
controller.abort();
159+
await close();
160+
});

tests/utils/asyncIterable.ts

Lines changed: 0 additions & 61 deletions
This file was deleted.

0 commit comments

Comments
 (0)