Skip to content

Commit 0f37611

Browse files
author
Guy Bedford
committed
more streams debugging
1 parent 7e6c57c commit 0f37611

File tree

8 files changed

+113
-41
lines changed

8 files changed

+113
-41
lines changed

integration-tests/js-compute/fixtures/app/src/index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ if (env('FASTLY_DEBUG_LOGGING') === '1') {
5555
if (fastly.debugMessages) {
5656
const { debug: consoleDebug } = console;
5757
console.debug = function debug(...args) {
58-
fastly.debugMessages.push(...args);
58+
fastly.debugLog(...args);
5959
consoleDebug(...args);
6060
};
6161
}

integration-tests/js-compute/fixtures/module-mode/src/http-cache.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,13 +564,54 @@ const getTestUrl = (path = `/${Math.random().toString().slice(2)}`) =>
564564
);
565565
});
566566

567+
routes.set('/http-cache/body-stream', async () => {
568+
const url = getTestUrl();
569+
570+
const cacheOverride = new CacheOverride({
571+
afterSend(res) {
572+
// Create a transform that uppercases the response
573+
const transformer = new TransformStream({
574+
start(controller) {
575+
console.debug('transform start');
576+
},
577+
flush(controller) {
578+
console.debug('transform flush');
579+
},
580+
transform(chunk, controller) {
581+
console.debug('transform', chunk.byteLength);
582+
const text = new TextDecoder().decode(chunk);
583+
const upperText = text.toUpperCase();
584+
const upperChunk = new TextEncoder().encode(upperText);
585+
console.debug('enqueue', upperChunk.byteLength);
586+
controller.enqueue(upperChunk);
587+
},
588+
});
589+
590+
return {
591+
bodyTransform: transformer,
592+
cache: true,
593+
};
594+
},
595+
});
596+
597+
const res = await fetch(url, { cacheOverride });
598+
const text = await res.text();
599+
strictEqual(text, text.toUpperCase());
600+
});
601+
567602
routes.set('/http-cache/body-transform', async () => {
568603
const url = getTestUrl();
569604

570605
const cacheOverride = new CacheOverride({
571606
afterSend(res) {
572607
// Create a transform that uppercases the response
573608
const transformer = new TransformStream({
609+
start(controller) {
610+
console.debug('transform start');
611+
},
612+
flush(controller) {
613+
console.debug('transform flush');
614+
},
574615
transform(chunk, controller) {
575616
console.debug('transform', chunk.byteLength);
576617
const text = new TextDecoder().decode(chunk);
@@ -591,6 +632,7 @@ const getTestUrl = (path = `/${Math.random().toString().slice(2)}`) =>
591632
const res = await fetch(url, { cacheOverride });
592633
const text = await res.text();
593634
strictEqual(text, text.toUpperCase());
635+
throw new Error('wow');
594636
});
595637

596638
// Test transform that throws an error

integration-tests/js-compute/fixtures/module-mode/src/index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ if (env('FASTLY_DEBUG_LOGGING') === '1') {
2222
if (fastly.debugMessages) {
2323
const { debug: consoleDebug } = console;
2424
console.debug = function debug(...args) {
25-
fastly.debugMessages.push(...args);
25+
fastly.debugLog(...args);
2626
consoleDebug(...args);
2727
};
2828
}

integration-tests/js-compute/fixtures/module-mode/tests.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,6 @@
214214
"features": ["http-cache"]
215215
},
216216
"GET /http-cache/body-transform": {
217-
"skip": true,
218217
"environments": ["compute"],
219218
"features": ["http-cache"]
220219
},

runtime/fastly/builtins/fastly.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,23 @@ bool Fastly::enableDebugLogging(JSContext *cx, unsigned argc, JS::Value *vp) {
9595
return true;
9696
}
9797

98+
bool debugLog(JSContext *cx, unsigned argc, JS::Value *vp) {
99+
JS::CallArgs args = CallArgsFromVp(argc, vp);
100+
if (!args.requireAtLeast(cx, __func__, 1))
101+
return false;
102+
JS::RootedString msg_str(cx, JS::ToString(cx, args[0]));
103+
if (!msg_str) {
104+
return false;
105+
}
106+
auto msg_host_str = core::encode(cx, msg_str);
107+
if (!msg_host_str) {
108+
return false;
109+
}
110+
debug_messages.push_back(std::string(msg_host_str));
111+
args.rval().setUndefined();
112+
return true;
113+
}
114+
98115
bool Fastly::getGeolocationForIpAddress(JSContext *cx, unsigned argc, JS::Value *vp) {
99116
JS::CallArgs args = CallArgsFromVp(argc, vp);
100117
REQUEST_HANDLER_ONLY("fastly.getGeolocationForIpAddress");
@@ -523,6 +540,7 @@ bool install(api::Engine *engine) {
523540
const JSFunctionSpec methods[] = {
524541
JS_FN("dump", Fastly::dump, 1, 0),
525542
JS_FN("enableDebugLogging", Fastly::enableDebugLogging, 1, JSPROP_ENUMERATE),
543+
JS_FN("debugLog", debugLog, 1, JSPROP_ENUMERATE),
526544
JS_FN("getGeolocationForIpAddress", Fastly::getGeolocationForIpAddress, 1, JSPROP_ENUMERATE),
527545
JS_FN("getLogger", Fastly::getLogger, 1, JSPROP_ENUMERATE),
528546
JS_FN("includeBytes", Fastly::includeBytes, 1, JSPROP_ENUMERATE),

runtime/fastly/builtins/fetch/fetch.cpp

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -461,25 +461,26 @@ template <InternalMethod then_or_catch_handler>
461461
bool apply_transform_stream(JSContext *cx, JS::HandleObject response,
462462
host_api::HttpBody into_body) {
463463
DEBUG_LOG("Apply transform stream")
464-
// Create the stream source from the response body (in turn queues the stream task)
464+
// Create the stream source from the response body
465465
JS::RootedObject body_stream_in(cx, RequestOrResponse::create_body_stream(cx, response));
466466
if (!body_stream_in) {
467467
return false;
468468
}
469469

470-
// Pipe the stream source through the transform stream
471-
JSObject *transform_stream =
472-
&JS::GetReservedSlot(response, static_cast<uint32_t>(Response::Slots::CacheBodyTransform))
473-
.toObject();
474-
JS::RootedValueArray<1> pipeThroughArgs(cx);
475-
pipeThroughArgs[0].setObject(*transform_stream);
476-
JS::RootedValue transform_readable_out(cx);
477-
DEBUG_LOG("calling pipe through")
478-
if (!JS::Call(cx, body_stream_in, "pipeThrough", pipeThroughArgs, &transform_readable_out)) {
479-
DEBUG_LOG("pipe through ERR")
470+
// Create the transform stream, and extract its readable and writable end objects
471+
JS::RootedValue transform_stream(
472+
cx,
473+
JS::GetReservedSlot(response, static_cast<uint32_t>(Response::Slots::CacheBodyTransform)));
474+
JS::RootedObject transform_stream_obj(cx, &transform_stream.toObject());
475+
JS::RootedValue transform_readable(cx);
476+
if (!JS_GetProperty(cx, transform_stream_obj, "readable", &transform_readable)) {
477+
return false;
478+
}
479+
480+
JS::RootedValue transform_writable(cx);
481+
if (!JS_GetProperty(cx, transform_stream_obj, "writable", &transform_writable)) {
480482
return false;
481483
}
482-
DEBUG_LOG("pipe through OK")
483484

484485
// Pipe the readable end of the transform stream into the into_body, with the completion handler
485486
// on error or success
@@ -496,7 +497,7 @@ bool apply_transform_stream(JSContext *cx, JS::HandleObject response,
496497
if (!sink) {
497498
return false;
498499
}
499-
JS::RootedObject transform_readable_obj(cx, &transform_readable_out.toObject());
500+
JS::RootedObject transform_readable_obj(cx, &transform_readable.toObject());
500501
JS::RootedValueArray<1> pipeToArgs(cx);
501502
pipeToArgs[0].setObject(*sink);
502503
JS::RootedValue pipe_promise(cx);
@@ -521,6 +522,18 @@ bool apply_transform_stream(JSContext *cx, JS::HandleObject response,
521522
}
522523
}
523524

525+
// Pipe the incoming body stream into the writable end of the transform stream
526+
JS::RootedValueArray<1> args(cx);
527+
args[0].set(transform_writable);
528+
JS::RootedValue pipe_ret(cx);
529+
if (!JS::Call(cx, body_stream_in, "pipeTo", args, &pipe_ret)) {
530+
return false;
531+
}
532+
533+
// We now add a task for the incoming handle
534+
// ENGINE->queue_async_task(
535+
// new FastlyAsyncTask(RequestOrResponse::body_handle(response).async_handle(), nullptr,
536+
// JS::UndefinedHandleValue, nullptr));
524537
return true;
525538
}
526539

runtime/fastly/builtins/fetch/request-response.cpp

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -141,36 +141,36 @@ bool process_body_read(JSContext *cx, host_api::HttpBody::Handle handle, JS::Han
141141
return true;
142142
}
143143

144+
enum StreamState { Complete, Wait, Error };
145+
144146
struct ReadResult {
145147
JS::UniqueChars buffer;
146148
size_t length;
147-
bool end_of_stream;
149+
StreamState state;
148150
};
149151

150152
// Returns a UniqueChars and the length of that string. The UniqueChars value is not
151153
// null-terminated.
152-
template <bool read_async> ReadResult read_from_handle_all(JSContext *cx, host_api::HttpBody body) {
154+
ReadResult read_from_handle_all(JSContext *cx, host_api::HttpBody body) {
153155
std::vector<host_api::HostString> chunks;
154156
size_t bytes_read = 0;
155157
bool end_of_stream = true;
156158
while (true) {
157159
DEBUG_LOG("read from handle all loop")
158-
if (read_async) {
159-
auto ready_res = body.is_ready();
160-
if (auto *err = ready_res.to_err()) {
161-
HANDLE_ERROR(cx, *err);
162-
return {nullptr, 0, true};
163-
}
164-
if (!ready_res.unwrap()) {
165-
DEBUG_LOG("async break")
166-
end_of_stream = false;
167-
break;
168-
}
160+
auto ready_res = body.is_ready();
161+
if (auto *err = ready_res.to_err()) {
162+
HANDLE_ERROR(cx, *err);
163+
return {nullptr, 0, StreamState::Error};
164+
}
165+
if (!ready_res.unwrap()) {
166+
DEBUG_LOG("async break")
167+
end_of_stream = false;
168+
break;
169169
}
170170
auto res = body.read(HANDLE_READ_CHUNK_SIZE);
171171
if (auto *err = res.to_err()) {
172172
HANDLE_ERROR(cx, *err);
173-
return {nullptr, 0, true};
173+
return {nullptr, 0, StreamState::Error};
174174
}
175175

176176
auto &chunk = res.unwrap();
@@ -184,7 +184,7 @@ template <bool read_async> ReadResult read_from_handle_all(JSContext *cx, host_a
184184

185185
JS::UniqueChars buf;
186186
if (chunks.size() == 0) {
187-
return {nullptr, 0, end_of_stream};
187+
return {nullptr, 0, end_of_stream ? StreamState::Complete : StreamState::Wait};
188188
} else if (chunks.size() == 1) {
189189
// If there was only one chunk read, reuse that allocation.
190190
auto &chunk = chunks.back();
@@ -194,7 +194,7 @@ template <bool read_async> ReadResult read_from_handle_all(JSContext *cx, host_a
194194
buf.reset(static_cast<char *>(JS_string_malloc(cx, bytes_read)));
195195
if (!buf) {
196196
JS_ReportOutOfMemory(cx);
197-
return {nullptr, 0};
197+
return {nullptr, 0, StreamState::Error};
198198
}
199199

200200
char *end = buf.get();
@@ -203,7 +203,7 @@ template <bool read_async> ReadResult read_from_handle_all(JSContext *cx, host_a
203203
}
204204
}
205205

206-
return {std::move(buf), bytes_read, end_of_stream};
206+
return {std::move(buf), bytes_read, end_of_stream ? StreamState::Complete : StreamState::Wait};
207207
}
208208

209209
} // namespace
@@ -1383,9 +1383,9 @@ bool async_process_body_handle_for_bodyAll(JSContext *cx, uint32_t handle, JS::H
13831383
auto body = RequestOrResponse::body_handle(self);
13841384
auto *parse_body = reinterpret_cast<RequestOrResponse::ParseBodyCB *>(body_parser.toPrivate());
13851385
DEBUG_LOG("Consume read from handle all task")
1386-
auto [buf, bytes_read, end_of_stream] = read_from_handle_all<true>(cx, body);
1386+
auto [buf, bytes_read, state] = read_from_handle_all(cx, body);
13871387
DEBUG_LOG("Got handle all task")
1388-
if (!buf && end_of_stream) {
1388+
if (state == StreamState::Error) {
13891389
DEBUG_LOG("stream error")
13901390
JS::RootedObject result_promise(cx);
13911391
result_promise =
@@ -1396,7 +1396,7 @@ bool async_process_body_handle_for_bodyAll(JSContext *cx, uint32_t handle, JS::H
13961396
return RejectPromiseWithPendingError(cx, result_promise);
13971397
}
13981398

1399-
if (end_of_stream) {
1399+
if (state == StreamState::Complete) {
14001400
DEBUG_LOG("stream complete")
14011401
return parse_body(cx, self, std::move(buf), bytes_read);
14021402
}
@@ -1414,9 +1414,9 @@ bool RequestOrResponse::consume_body_handle_for_bodyAll(JSContext *cx, JS::Handl
14141414
auto body = body_handle(self);
14151415
auto *parse_body = reinterpret_cast<ParseBodyCB *>(body_parser.toPrivate());
14161416
DEBUG_LOG("Consume read from handle all")
1417-
auto [buf, bytes_read, end_of_stream] = read_from_handle_all<true>(cx, body);
1417+
auto [buf, bytes_read, state] = read_from_handle_all(cx, body);
14181418
DEBUG_LOG("Got handle all")
1419-
if (!buf && end_of_stream) {
1419+
if (state == StreamState::Error) {
14201420
DEBUG_LOG("stream error")
14211421
JS::RootedObject result_promise(cx);
14221422
result_promise =
@@ -1425,7 +1425,7 @@ bool RequestOrResponse::consume_body_handle_for_bodyAll(JSContext *cx, JS::Handl
14251425
return RejectPromiseWithPendingError(cx, result_promise);
14261426
}
14271427

1428-
if (end_of_stream) {
1428+
if (state == StreamState::Complete) {
14291429
DEBUG_LOG("stream complete")
14301430
return parse_body(cx, self, std::move(buf), bytes_read);
14311431
}

runtime/fastly/host-api/host_api.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -899,8 +899,7 @@ Result<HttpBody> HttpBody::make() {
899899
}
900900

901901
Result<HostString> HttpBody::read(uint32_t chunk_size) const {
902-
auto handle = std::to_string(reinterpret_cast<uint32_t>(this->handle));
903-
TRACE_CALL_ARGS(TSV(handle))
902+
TRACE_CALL_ARGS(TSV(std::to_string(handle)))
904903
Result<HostString> res;
905904

906905
fastly::fastly_world_list_u8 ret;
@@ -912,6 +911,7 @@ Result<HostString> HttpBody::read(uint32_t chunk_size) const {
912911
cabi_free(ret.ptr);
913912
res.emplace_err(err);
914913
} else {
914+
TRACE_CALL_RET(TSV(std::to_string(ret.len)))
915915
res.emplace(JS::UniqueChars(reinterpret_cast<char *>(ret.ptr)), ret.len);
916916
}
917917

0 commit comments

Comments
 (0)