Skip to content

Commit 429dc4d

Browse files
committed
implement remaining ReadableStream functionality
1 parent 47aa966 commit 429dc4d

File tree

4 files changed

+271
-28
lines changed

4 files changed

+271
-28
lines changed

src/browser/streams/ReadableStream.zig

Lines changed: 194 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const ReadableStreamDefaultController = @import("ReadableStreamDefaultController
3030
const State = union(enum) {
3131
readable,
3232
closed: ?[]const u8,
33+
cancelled: ?[]const u8,
3334
errored: Env.JsObject,
3435
};
3536

@@ -38,8 +39,29 @@ cancel_resolver: v8.Persistent(v8.PromiseResolver),
3839
locked: bool = false,
3940
state: State = .readable,
4041

42+
cancel_fn: ?Env.Function = null,
43+
pull_fn: ?Env.Function = null,
44+
45+
strategy: QueueingStrategy,
46+
reader_resolver: ?v8.Persistent(v8.PromiseResolver) = null,
4147
queue: std.ArrayListUnmanaged([]const u8) = .empty,
4248

49+
pub const ReadableStreamReadResult = struct {
50+
const ValueUnion =
51+
union(enum) { data: []const u8, empty: void };
52+
53+
value: ValueUnion,
54+
done: bool,
55+
56+
pub fn get_value(self: *const ReadableStreamReadResult) ValueUnion {
57+
return self.value;
58+
}
59+
60+
pub fn get_done(self: *const ReadableStreamReadResult) bool {
61+
return self.done;
62+
}
63+
};
64+
4365
const UnderlyingSource = struct {
4466
start: ?Env.Function = null,
4567
pull: ?Env.Function = null,
@@ -49,19 +71,19 @@ const UnderlyingSource = struct {
4971

5072
const QueueingStrategy = struct {
5173
size: ?Env.Function = null,
52-
high_water_mark: f64 = 1.0,
74+
high_water_mark: u32 = 1,
5375
};
5476

55-
pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, page: *Page) !*ReadableStream {
56-
_ = strategy;
77+
pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy, page: *Page) !*ReadableStream {
78+
const strategy: QueueingStrategy = _strategy orelse .{};
5779

5880
const cancel_resolver = v8.Persistent(v8.PromiseResolver).init(
5981
page.main_context.isolate,
6082
v8.PromiseResolver.init(page.main_context.v8_context),
6183
);
6284

6385
const stream = try page.arena.create(ReadableStream);
64-
stream.* = ReadableStream{ .cancel_resolver = cancel_resolver };
86+
stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .strategy = strategy };
6587

6688
const controller = ReadableStreamDefaultController{ .stream = stream };
6789

@@ -70,6 +92,15 @@ pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, p
7092
if (src.start) |start| {
7193
try start.call(void, .{controller});
7294
}
95+
96+
if (src.cancel) |cancel| {
97+
stream.cancel_fn = cancel;
98+
}
99+
100+
if (src.pull) |pull| {
101+
stream.pull_fn = pull;
102+
try stream.pullIf();
103+
}
73104
}
74105

75106
return stream;
@@ -79,7 +110,7 @@ pub fn get_locked(self: *const ReadableStream) bool {
79110
return self.locked;
80111
}
81112

82-
pub fn _cancel(self: *const ReadableStream, page: *Page) !Env.Promise {
113+
pub fn _cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !Env.Promise {
83114
if (self.locked) {
84115
return error.TypeError;
85116
}
@@ -89,9 +120,31 @@ pub fn _cancel(self: *const ReadableStream, page: *Page) !Env.Promise {
89120
.resolver = self.cancel_resolver.castToPromiseResolver(),
90121
};
91122

123+
self.state = .{ .cancelled = if (reason) |r| try page.arena.dupe(u8, r) else null };
124+
125+
// Call cancel callback.
126+
if (self.cancel_fn) |cancel| {
127+
if (reason) |r| {
128+
try cancel.call(void, .{r});
129+
} else {
130+
try cancel.call(void, .{});
131+
}
132+
}
133+
134+
try resolver.resolve({});
92135
return resolver.promise();
93136
}
94137

138+
pub fn pullIf(self: *ReadableStream) !void {
139+
if (self.pull_fn) |pull_fn| {
140+
// Must be under the high water mark AND readable.
141+
if ((self.queue.items.len < self.strategy.high_water_mark) and self.state == .readable) {
142+
const controller = ReadableStreamDefaultController{ .stream = self };
143+
try pull_fn.call(void, .{controller});
144+
}
145+
}
146+
}
147+
95148
const GetReaderOptions = struct {
96149
// Mode must equal 'byob' or be undefined. RangeError otherwise.
97150
mode: ?[]const u8 = null,
@@ -102,6 +155,7 @@ pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Pag
102155
return error.TypeError;
103156
}
104157

158+
// TODO: Determine if we need the ReadableStreamBYOBReader
105159
const options = _options orelse GetReaderOptions{};
106160
_ = options;
107161

@@ -144,3 +198,138 @@ test "streams: ReadableStream" {
144198
.{ "readResult.done", "false" },
145199
}, .{});
146200
}
201+
202+
test "streams: ReadableStream cancel and close" {
203+
var runner = try testing.jsRunner(testing.tracking_allocator, .{ .url = "https://lightpanda.io" });
204+
defer runner.deinit();
205+
try runner.testCases(&.{
206+
.{ "var readResult; var cancelResult; var closeResult;", "undefined" },
207+
208+
// Test 1: Stream with controller.close()
209+
.{
210+
\\ const stream1 = new ReadableStream({
211+
\\ start(controller) {
212+
\\ controller.enqueue("first");
213+
\\ controller.enqueue("second");
214+
\\ controller.close();
215+
\\ }
216+
\\ });
217+
,
218+
undefined,
219+
},
220+
.{ "const reader1 = stream1.getReader();", undefined },
221+
.{
222+
\\ (async function () {
223+
\\ readResult = await reader1.read();
224+
\\ }());
225+
\\ false;
226+
,
227+
"false",
228+
},
229+
.{ "readResult.value", "first" },
230+
.{ "readResult.done", "false" },
231+
232+
// Read second chunk
233+
.{
234+
\\ (async function () {
235+
\\ readResult = await reader1.read();
236+
\\ }());
237+
\\ false;
238+
,
239+
"false",
240+
},
241+
.{ "readResult.value", "second" },
242+
.{ "readResult.done", "false" },
243+
244+
// Read after close - should get done: true
245+
.{
246+
\\ (async function () {
247+
\\ readResult = await reader1.read();
248+
\\ }());
249+
\\ false;
250+
,
251+
"false",
252+
},
253+
.{ "readResult.value", "undefined" },
254+
.{ "readResult.done", "true" },
255+
256+
// Test 2: Stream with reader.cancel()
257+
.{
258+
\\ const stream2 = new ReadableStream({
259+
\\ start(controller) {
260+
\\ controller.enqueue("data1");
261+
\\ controller.enqueue("data2");
262+
\\ controller.enqueue("data3");
263+
\\ },
264+
\\ cancel(reason) {
265+
\\ closeResult = `Stream cancelled: ${reason}`;
266+
\\ }
267+
\\ });
268+
,
269+
undefined,
270+
},
271+
.{ "const reader2 = stream2.getReader();", undefined },
272+
273+
// Read one chunk before canceling
274+
.{
275+
\\ (async function () {
276+
\\ readResult = await reader2.read();
277+
\\ }());
278+
\\ false;
279+
,
280+
"false",
281+
},
282+
.{ "readResult.value", "data1" },
283+
.{ "readResult.done", "false" },
284+
285+
// Cancel the stream
286+
.{
287+
\\ (async function () {
288+
\\ cancelResult = await reader2.cancel("user requested");
289+
\\ }());
290+
\\ false;
291+
,
292+
"false",
293+
},
294+
.{ "cancelResult", "undefined" },
295+
.{ "closeResult", "Stream cancelled: user requested" },
296+
297+
// Try to read after cancel - should throw or return done
298+
.{
299+
\\ try {
300+
\\ (async function () {
301+
\\ readResult = await reader2.read();
302+
\\ }());
303+
\\ } catch(e) {
304+
\\ readResult = { error: e.name };
305+
\\ }
306+
\\ false;
307+
,
308+
"false",
309+
},
310+
311+
// Test 3: Cancel without reason
312+
.{
313+
\\ const stream3 = new ReadableStream({
314+
\\ start(controller) {
315+
\\ controller.enqueue("test");
316+
\\ },
317+
\\ cancel(reason) {
318+
\\ closeResult = reason === undefined ? "no reason" : reason;
319+
\\ }
320+
\\ });
321+
,
322+
undefined,
323+
},
324+
.{ "const reader3 = stream3.getReader();", undefined },
325+
.{
326+
\\ (async function () {
327+
\\ await reader3.cancel();
328+
\\ }());
329+
\\ false;
330+
,
331+
"false",
332+
},
333+
.{ "closeResult", "no reason" },
334+
}, .{});
335+
}

src/browser/streams/ReadableStreamDefaultController.zig

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const Env = @import("../env.zig").Env;
2424
const v8 = @import("v8");
2525

2626
const ReadableStream = @import("./ReadableStream.zig");
27+
const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamReadResult;
2728

2829
const ReadableStreamDefaultController = @This();
2930

@@ -38,6 +39,16 @@ pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page
3839
const reason = if (_reason) |reason| try page.arena.dupe(u8, reason) else null;
3940
self.stream.state = .{ .closed = reason };
4041

42+
if (self.stream.reader_resolver) |rr| {
43+
const resolver = Env.PromiseResolver{
44+
.js_context = page.main_context,
45+
.resolver = rr.castToPromiseResolver(),
46+
};
47+
48+
try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true });
49+
self.stream.reader_resolver = null;
50+
}
51+
4152
// close just sets as closed meaning it wont READ any more but anything in the queue is fine to read.
4253
// to discard, must use cancel.
4354
}
@@ -49,9 +60,37 @@ pub fn _enqueue(self: *ReadableStreamDefaultController, chunk: []const u8, page:
4960
return error.TypeError;
5061
}
5162

63+
if (self.stream.reader_resolver) |rr| {
64+
const resolver = Env.PromiseResolver{
65+
.js_context = page.main_context,
66+
.resolver = rr.castToPromiseResolver(),
67+
};
68+
69+
try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = chunk }, .done = false });
70+
self.stream.reader_resolver = null;
71+
72+
// rr.setWeakFinalizer(@ptrCast(self.stream), struct {
73+
// fn callback(info: ?*v8.c.WeakCallbackInfo) void {
74+
// const inner_stream: *ReadableStream = @ptrCast(@alignCast(v8.c.v8__WeakCallbackInfo__GetParameter(info).?));
75+
// inner_stream.reader_resolver = null;
76+
// }
77+
// }.callback, .kParameter);
78+
}
79+
5280
try self.stream.queue.append(page.arena, chunk);
81+
try self.stream.pullIf();
5382
}
5483

55-
pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject) void {
84+
pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject, page: *Page) !void {
5685
self.stream.state = .{ .errored = err };
86+
87+
if (self.stream.reader_resolver) |rr| {
88+
const resolver = Env.PromiseResolver{
89+
.js_context = page.main_context,
90+
.resolver = rr.castToPromiseResolver(),
91+
};
92+
93+
try resolver.reject(err);
94+
self.stream.reader_resolver = null;
95+
}
5796
}

0 commit comments

Comments
 (0)