Skip to content

Commit 3ad736d

Browse files
author
Guy Bedford
committed
body transform function approach
1 parent 0f37611 commit 3ad736d

File tree

8 files changed

+410
-266
lines changed

8 files changed

+410
-266
lines changed

integration-tests/js-compute/fixtures/module-mode/src/http-cache.js

Lines changed: 74 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ const getTestUrl = (path = `/${Math.random().toString().slice(2)}`) =>
555555
cacheOverride: new CacheOverride({
556556
afterSend() {
557557
return {
558-
bodyTransform: 'not a transform stream',
558+
bodyTransformFn: 'not a transform function',
559559
};
560560
},
561561
}),
@@ -604,35 +604,45 @@ const getTestUrl = (path = `/${Math.random().toString().slice(2)}`) =>
604604

605605
const cacheOverride = new CacheOverride({
606606
afterSend(res) {
607-
// Create a transform that uppercases the response
608-
const transformer = new TransformStream({
609-
start(controller) {
610-
console.debug('transform start');
611-
},
612-
flush(controller) {
613-
console.debug('transform flush');
614-
},
615-
transform(chunk, controller) {
616-
console.debug('transform', chunk.byteLength);
617-
const text = new TextDecoder().decode(chunk);
607+
return {
608+
bodyTransformFn: (buffer) => {
609+
const text = new TextDecoder().decode(buffer);
618610
const upperText = text.toUpperCase();
619-
const upperChunk = new TextEncoder().encode(upperText);
620-
console.debug('enqueue', upperChunk.byteLength);
621-
controller.enqueue(upperChunk);
611+
return new TextEncoder().encode(upperText);
622612
},
623-
});
613+
cache: true,
614+
};
615+
},
616+
});
617+
618+
const res = await fetch(url, { cacheOverride });
619+
const text = await res.text();
620+
strictEqual(text.length > 200, true);
621+
strictEqual(text, text.toUpperCase());
622+
});
624623

624+
routes.set('/http-cache/body-transform-delay', async () => {
625+
const url = getTestUrl();
626+
627+
const cacheOverride = new CacheOverride({
628+
afterSend(res) {
625629
return {
626-
bodyTransform: transformer,
630+
async bodyTransformFn(buffer) {
631+
// wait one second before returning the result
632+
await new Promise((resolve) => setTimeout(resolve, 1000));
633+
const text = new TextDecoder().decode(buffer);
634+
const upperText = text.toUpperCase();
635+
return new TextEncoder().encode(upperText);
636+
},
627637
cache: true,
628638
};
629639
},
630640
});
631641

632642
const res = await fetch(url, { cacheOverride });
633643
const text = await res.text();
644+
strictEqual(text.length > 200, true);
634645
strictEqual(text, text.toUpperCase());
635-
throw new Error('wow');
636646
});
637647

638648
// Test transform that throws an error
@@ -641,13 +651,11 @@ const getTestUrl = (path = `/${Math.random().toString().slice(2)}`) =>
641651

642652
const cacheOverride = new CacheOverride({
643653
afterSend() {
644-
const transformer = new TransformStream({
645-
transform() {
654+
return {
655+
bodyTransformFn() {
646656
throw new Error('Transform failed');
647657
},
648-
});
649-
650-
return { bodyTransform: transformer };
658+
};
651659
},
652660
});
653661

@@ -659,170 +667,91 @@ const getTestUrl = (path = `/${Math.random().toString().slice(2)}`) =>
659667
);
660668
});
661669

662-
// Test transform with invalid chunk type
663-
routes.set('/http-cache/body-transform-invalid-chunk', async () => {
670+
// Test transform that throws an error
671+
routes.set('/http-cache/body-transform-error-delay', async () => {
664672
const url = getTestUrl();
665673

666674
const cacheOverride = new CacheOverride({
667675
afterSend() {
668-
const transformer = new TransformStream({
669-
transform(chunk, controller) {
670-
// Try to enqueue invalid chunk type
671-
controller.enqueue('string instead of Uint8Array');
676+
return {
677+
async bodyTransformFn() {
678+
await new Promise((resolve) => setTimeout(resolve, 1000));
679+
throw new Error('Transform failed');
672680
},
673-
});
674-
675-
return { bodyTransform: transformer };
681+
};
676682
},
677683
});
678684

679-
// Should reject due to invalid chunk type
685+
// Should reject due to transform error
680686
await assertRejects(
681687
() => fetch(url, { cacheOverride }).then((res) => res.text()),
682-
TypeError,
688+
Error,
689+
'Transform failed',
683690
);
684691
});
685692

686-
// Test transform that tries to write after stream is closed
687-
routes.set('/http-cache/body-transform-write-after-close', async () => {
688-
const url = getTestUrl();
689-
let streamController;
690-
691-
const cacheOverride = new CacheOverride({
692-
afterSend() {
693-
const transformer = new TransformStream({
694-
transform(chunk, controller) {
695-
streamController = controller;
696-
controller.enqueue(chunk);
697-
},
698-
flush() {
699-
// Try to write after stream is closed
700-
setTimeout(() => {
701-
try {
702-
streamController.enqueue(new Uint8Array([1, 2, 3]));
703-
} catch (e) {
704-
// Should throw as stream is closed
705-
strictEqual(e instanceof TypeError, true);
706-
}
707-
}, 0);
708-
},
709-
});
710-
711-
return { bodyTransform: transformer };
712-
},
713-
});
714-
715-
const res = await fetch(url, { cacheOverride });
716-
await res.text(); // Should complete successfully
717-
});
718-
719-
// Test cancellation during transform
720-
routes.set('/http-cache/body-transform-cancel', async () => {
721-
const url = getTestUrl();
722-
let transformCalled = false;
723-
let cancelCalled = false;
724-
725-
const cacheOverride = new CacheOverride({
726-
afterSend() {
727-
const transformer = new TransformStream({
728-
transform(chunk, controller) {
729-
transformCalled = true;
730-
// Simulate slow transform
731-
return new Promise((resolve) =>
732-
setTimeout(() => {
733-
controller.enqueue(chunk);
734-
resolve();
735-
}, 1000),
736-
);
737-
},
738-
cancel() {
739-
cancelCalled = true;
740-
},
741-
});
742-
743-
return { bodyTransform: transformer };
744-
},
745-
});
746-
747-
const res = await fetch(url, { cacheOverride });
748-
749-
// Start reading the body then abort
750-
const reader = res.body.getReader();
751-
await reader.read(); // This will trigger transform
752-
await reader.cancel(); // Cancel mid-transform
753-
754-
strictEqual(transformCalled, true);
755-
strictEqual(cancelCalled, true);
756-
});
757-
758-
// Test transform with backpressure
759-
routes.set('/http-cache/body-transform-backpressure', async () => {
693+
// Test transform with invalid chunk type
694+
routes.set('/http-cache/body-transform-invalid-chunk', async () => {
760695
const url = getTestUrl();
761-
let chunks = 0;
762696

763697
const cacheOverride = new CacheOverride({
764698
afterSend() {
765-
const transformer = new TransformStream({
766-
async transform(chunk, controller) {
767-
chunks++;
768-
// Simulate slow processing of each chunk
769-
await new Promise((resolve) => setTimeout(resolve, 100));
770-
controller.enqueue(chunk);
699+
return {
700+
bodyTransformFn() {
701+
return 'string instead of uint8array';
771702
},
772-
});
773-
774-
return { bodyTransform: transformer };
703+
};
775704
},
776705
});
777706

778-
const res = await fetch(url, { cacheOverride });
779-
await res.arrayBuffer(); // Read entire response
780-
781-
// Verify transform was called for each chunk
782-
strictEqual(chunks > 0, true);
707+
// Should reject due to invalid chunk type
708+
await assertRejects(
709+
() => fetch(url, { cacheOverride }).then((res) => res.text()),
710+
Error,
711+
);
783712
});
784713

785-
// Test race conditions with body transforms
714+
// Concurrent body transforms
786715
routes.set('/http-cache/concurrent-transforms', async () => {
787-
const url = getTestUrl();
788-
789-
// Create two different transforms
790-
const transform1 = new TransformStream({
791-
transform(chunk, controller) {
792-
const text = new TextDecoder().decode(chunk);
793-
controller.enqueue(new TextEncoder().encode(text.toUpperCase()));
794-
},
795-
});
796-
797-
const transform2 = new TransformStream({
798-
transform(chunk, controller) {
799-
const text = new TextDecoder().decode(chunk);
800-
controller.enqueue(new TextEncoder().encode(text.toLowerCase()));
801-
},
802-
});
716+
const url1 = getTestUrl();
717+
const url2 = getTestUrl();
803718

804719
const cacheOverride1 = new CacheOverride({
805720
afterSend() {
806-
return { bodyTransform: transform1, cache: true };
721+
return {
722+
bodyTransformFn(buffer) {
723+
const text = new TextDecoder().decode(buffer);
724+
return new TextEncoder().encode(text.toUpperCase());
725+
},
726+
cache: true,
727+
};
807728
},
808729
});
809730

810731
const cacheOverride2 = new CacheOverride({
811732
afterSend() {
812-
return { bodyTransform: transform2, cache: true };
733+
return {
734+
bodyTransformFn(buffer) {
735+
const text = new TextDecoder().decode(buffer);
736+
return new TextEncoder().encode(text.toLowerCase());
737+
},
738+
cache: true,
739+
};
813740
},
814741
});
815742

816743
// Make concurrent requests with different transforms
817744
const [res1, res2] = await Promise.all([
818-
fetch(url, { cacheOverride: cacheOverride1 }),
819-
fetch(url, { cacheOverride: cacheOverride2 }),
745+
fetch(url1, { cacheOverride: cacheOverride1 }),
746+
fetch(url2, { cacheOverride: cacheOverride2 }),
820747
]);
821748

822749
// Check that transforms were applied correctly
823750
const text1 = await res1.text();
824751
const text2 = await res2.text();
752+
strictEqual(text1.length > 200, true);
825753
strictEqual(text1, text1.toUpperCase());
754+
strictEqual(text2.length > 200, true);
826755
strictEqual(text2, text2.toLowerCase());
827756
});
828757
}
@@ -1018,7 +947,7 @@ const getTestUrl = (path = `/${Math.random().toString().slice(2)}`) =>
1018947

1019948
// TODO (skipped, due to unknown blocking from host)
1020949
routes.set(
1021-
'/http-cache/parrallel-request-collapsing-uncacheable',
950+
'/http-cache/parallel-request-collapsing-uncacheable',
1022951
async () => {
1023952
const url = getTestUrl();
1024953
let backendCalls = 0;

integration-tests/js-compute/fixtures/module-mode/tests.json

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -217,33 +217,23 @@
217217
"environments": ["compute"],
218218
"features": ["http-cache"]
219219
},
220-
"GET /http-cache/body-transform-error": {
221-
"skip": true,
222-
"environments": ["compute"],
223-
"features": ["http-cache"]
224-
},
225-
"GET /http-cache/body-transform-invalid-chunk": {
226-
"skip": true,
220+
"GET /http-cache/body-transform-delay": {
227221
"environments": ["compute"],
228222
"features": ["http-cache"]
229223
},
230-
"GET /http-cache/body-transform-write-after-close": {
231-
"skip": true,
224+
"GET /http-cache/body-transform-error": {
232225
"environments": ["compute"],
233226
"features": ["http-cache"]
234227
},
235-
"GET /http-cache/body-transform-cancel": {
236-
"skip": true,
228+
"GET /http-cache/body-transform-error-delay": {
237229
"environments": ["compute"],
238230
"features": ["http-cache"]
239231
},
240-
"GET /http-cache/body-transform-backpressure": {
241-
"skip": true,
232+
"GET /http-cache/body-transform-invalid-chunk": {
242233
"environments": ["compute"],
243234
"features": ["http-cache"]
244235
},
245236
"GET /http-cache/concurrent-transforms": {
246-
"skip": true,
247237
"environments": ["compute"],
248238
"features": ["http-cache"]
249239
},

runtime/fastly/builtins/body.cpp

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,19 @@ bool FastlyBody::close(JSContext *cx, unsigned argc, JS::Value *vp) {
247247
return true;
248248
}
249249

250+
bool FastlyBody::abandon(JSContext *cx, unsigned argc, JS::Value *vp) {
251+
METHOD_HEADER(0);
252+
auto body = host_body(self);
253+
auto result = body.abandon();
254+
255+
if (auto *err = result.to_err()) {
256+
HANDLE_ERROR(cx, *err);
257+
return false;
258+
}
259+
args.rval().setUndefined();
260+
return true;
261+
}
262+
250263
const JSFunctionSpec FastlyBody::static_methods[] = {
251264
JS_FS_END,
252265
};
@@ -256,9 +269,13 @@ const JSPropertySpec FastlyBody::static_properties[] = {
256269
};
257270

258271
const JSFunctionSpec FastlyBody::methods[] = {
259-
JS_FN("concat", concat, 1, JSPROP_ENUMERATE), JS_FN("read", read, 1, JSPROP_ENUMERATE),
260-
JS_FN("append", append, 1, JSPROP_ENUMERATE), JS_FN("prepend", prepend, 1, JSPROP_ENUMERATE),
261-
JS_FN("close", close, 0, JSPROP_ENUMERATE), JS_FS_END,
272+
JS_FN("concat", concat, 1, JSPROP_ENUMERATE),
273+
JS_FN("read", read, 1, JSPROP_ENUMERATE),
274+
JS_FN("append", append, 1, JSPROP_ENUMERATE),
275+
JS_FN("prepend", prepend, 1, JSPROP_ENUMERATE),
276+
JS_FN("close", close, 0, JSPROP_ENUMERATE),
277+
JS_FN("abandon", abandon, 0, JSPROP_ENUMERATE),
278+
JS_FS_END,
262279
};
263280

264281
const JSPropertySpec FastlyBody::properties[] = {

0 commit comments

Comments
 (0)