Skip to content

Commit b7440ae

Browse files
committed
basic readable stream working
1 parent 0ffcb86 commit b7440ae

File tree

7 files changed

+336
-4
lines changed

7 files changed

+336
-4
lines changed

src/browser/env.zig

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,8 @@ const WebApis = struct {
3636
@import("xhr/form_data.zig").Interfaces,
3737
@import("xhr/File.zig"),
3838
@import("xmlserializer/xmlserializer.zig").Interfaces,
39-
@import("fetch/Headers.zig"),
40-
@import("fetch/Request.zig"),
41-
@import("fetch/Response.zig"),
39+
@import("fetch/fetch.zig").Interfaces,
40+
@import("streams/streams.zig").Interfaces,
4241
});
4342
};
4443

src/browser/fetch/Request.zig

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,6 @@ test "fetch: request" {
254254
.{ "let request2 = new Request('https://google.com', { method: 'POST', body: 'Hello, World' })", "undefined" },
255255
.{ "request2.url", "https://google.com" },
256256
.{ "request2.method", "POST" },
257-
.{ "request2.body", "Hello, World" },
258257
}, .{});
259258
}
260259

src/browser/fetch/fetch.zig

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
2+
//
3+
// Francis Bouvier <[email protected]>
4+
// Pierre Tachoire <[email protected]>
5+
//
6+
// This program is free software: you can redistribute it and/or modify
7+
// it under the terms of the GNU Affero General Public License as
8+
// published by the Free Software Foundation, either version 3 of the
9+
// License, or (at your option) any later version.
10+
//
11+
// This program is distributed in the hope that it will be useful,
12+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
// GNU Affero General Public License for more details.
15+
//
16+
// You should have received a copy of the GNU Affero General Public License
17+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18+
19+
pub const Interfaces = .{
20+
@import("Headers.zig"),
21+
@import("Request.zig"),
22+
@import("Response.zig"),
23+
};
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
2+
//
3+
// Francis Bouvier <[email protected]>
4+
// Pierre Tachoire <[email protected]>
5+
//
6+
// This program is free software: you can redistribute it and/or modify
7+
// it under the terms of the GNU Affero General Public License as
8+
// published by the Free Software Foundation, either version 3 of the
9+
// License, or (at your option) any later version.
10+
//
11+
// This program is distributed in the hope that it will be useful,
12+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
// GNU Affero General Public License for more details.
15+
//
16+
// You should have received a copy of the GNU Affero General Public License
17+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18+
19+
const std = @import("std");
20+
const log = @import("../../log.zig");
21+
22+
const v8 = @import("v8");
23+
const Page = @import("../page.zig").Page;
24+
const Env = @import("../env.zig").Env;
25+
26+
const ReadableStream = @This();
27+
const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig");
28+
const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig");
29+
30+
const State = union(enum) {
31+
readable,
32+
closed: ?[]const u8,
33+
errored: Env.JsObject,
34+
};
35+
36+
// This promise resolves when a stream is canceled.
37+
cancel_resolver: Env.PromiseResolver,
38+
locked: bool = false,
39+
state: State = .readable,
40+
41+
// A queue would be ideal here but I don't want to pay the cost of the priority operation.
42+
queue: std.ArrayListUnmanaged([]const u8) = .empty,
43+
44+
const UnderlyingSource = struct {
45+
start: ?Env.Function = null,
46+
pull: ?Env.Function = null,
47+
cancel: ?Env.Function = null,
48+
type: ?[]const u8 = null,
49+
};
50+
51+
const QueueingStrategy = struct {
52+
size: ?Env.Function = null,
53+
high_water_mark: f64 = 1.0,
54+
};
55+
56+
pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, page: *Page) !*ReadableStream {
57+
_ = strategy;
58+
59+
const cancel_resolver = Env.PromiseResolver{
60+
.js_context = page.main_context,
61+
.resolver = v8.PromiseResolver.init(page.main_context.v8_context),
62+
};
63+
64+
const stream = try page.arena.create(ReadableStream);
65+
stream.* = ReadableStream{ .cancel_resolver = cancel_resolver };
66+
67+
const controller = ReadableStreamDefaultController{ .stream = stream };
68+
69+
// call start
70+
if (underlying) |src| {
71+
if (src.start) |start| {
72+
try start.call(void, .{controller});
73+
}
74+
}
75+
76+
log.info(.browser, "rs aux", .{ .queue_len = stream.queue.items.len });
77+
78+
return stream;
79+
}
80+
81+
pub fn _cancel(self: *const ReadableStream) Env.Promise {
82+
return self.cancel_resolver.promise();
83+
}
84+
85+
pub fn get_locked(self: *const ReadableStream) bool {
86+
return self.locked;
87+
}
88+
89+
const GetReaderOptions = struct {
90+
mode: ?[]const u8 = null,
91+
};
92+
93+
pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Page) ReadableStreamDefaultReader {
94+
const options = _options orelse GetReaderOptions{};
95+
_ = options;
96+
97+
return ReadableStreamDefaultReader.constructor(self, page);
98+
}
99+
100+
const testing = @import("../../testing.zig");
101+
test "streams: ReadableStream" {
102+
var runner = try testing.jsRunner(testing.tracking_allocator, .{ .url = "https://lightpanda.io" });
103+
defer runner.deinit();
104+
105+
try runner.testCases(&.{
106+
.{ "var readResult;", "undefined" },
107+
.{
108+
\\ const stream = new ReadableStream({
109+
\\ start(controller) {
110+
\\ controller.enqueue("hello");
111+
\\ controller.enqueue("world");
112+
\\ controller.close();
113+
\\ }
114+
\\ });
115+
,
116+
undefined,
117+
},
118+
.{
119+
\\ const reader = stream.getReader();
120+
\\ (async function () { readResult = await reader.read() }());
121+
\\ false;
122+
,
123+
"false",
124+
},
125+
.{ "reader", "[object ReadableStreamDefaultReader]" },
126+
.{ "readResult.value", "hello" },
127+
.{ "readResult.done", "false" },
128+
}, .{});
129+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
2+
//
3+
// Francis Bouvier <[email protected]>
4+
// Pierre Tachoire <[email protected]>
5+
//
6+
// This program is free software: you can redistribute it and/or modify
7+
// it under the terms of the GNU Affero General Public License as
8+
// published by the Free Software Foundation, either version 3 of the
9+
// License, or (at your option) any later version.
10+
//
11+
// This program is distributed in the hope that it will be useful,
12+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
// GNU Affero General Public License for more details.
15+
//
16+
// You should have received a copy of the GNU Affero General Public License
17+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18+
19+
const std = @import("std");
20+
const log = @import("../../log.zig");
21+
22+
const Page = @import("../page.zig").Page;
23+
const Env = @import("../env.zig").Env;
24+
const v8 = @import("v8");
25+
26+
const ReadableStream = @import("./ReadableStream.zig");
27+
28+
const ReadableStreamDefaultController = @This();
29+
30+
stream: *ReadableStream,
31+
32+
pub fn get_desiredSize(self: *const ReadableStreamDefaultController) i32 {
33+
// TODO: This may need tuning at some point if it becomes a performance issue.
34+
return @intCast(self.stream.queue.capacity - self.stream.queue.items.len);
35+
}
36+
37+
pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page: *Page) !void {
38+
const reason = if (_reason) |reason| try page.arena.dupe(u8, reason) else null;
39+
self.stream.state = .{ .closed = reason };
40+
41+
// close just sets as closed meaning it wont READ any more but anything in the queue is fine to read.
42+
// to discard, must use cancel.
43+
}
44+
45+
pub fn _enqueue(self: *ReadableStreamDefaultController, chunk: []const u8, page: *Page) !void {
46+
const stream = self.stream;
47+
48+
if (stream.state != .readable) {
49+
return error.TypeError;
50+
}
51+
52+
try self.stream.queue.append(page.arena, chunk);
53+
}
54+
55+
pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject) void {
56+
self.stream.state = .{ .errored = err };
57+
// set to error.
58+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
2+
//
3+
// Francis Bouvier <[email protected]>
4+
// Pierre Tachoire <[email protected]>
5+
//
6+
// This program is free software: you can redistribute it and/or modify
7+
// it under the terms of the GNU Affero General Public License as
8+
// published by the Free Software Foundation, either version 3 of the
9+
// License, or (at your option) any later version.
10+
//
11+
// This program is distributed in the hope that it will be useful,
12+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
// GNU Affero General Public License for more details.
15+
//
16+
// You should have received a copy of the GNU Affero General Public License
17+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18+
19+
const std = @import("std");
20+
21+
const v8 = @import("v8");
22+
23+
const log = @import("../../log.zig");
24+
const Env = @import("../env.zig").Env;
25+
const Page = @import("../page.zig").Page;
26+
const ReadableStream = @import("./ReadableStream.zig");
27+
28+
const ReadableStreamDefaultReader = @This();
29+
30+
stream: *ReadableStream,
31+
// This promise resolves when the stream is closed.
32+
closed_resolver: Env.PromiseResolver,
33+
34+
pub fn constructor(stream: *ReadableStream, page: *Page) ReadableStreamDefaultReader {
35+
const closed_resolver = Env.PromiseResolver{
36+
.js_context = page.main_context,
37+
.resolver = v8.PromiseResolver.init(page.main_context.v8_context),
38+
};
39+
40+
return .{
41+
.stream = stream,
42+
.closed_resolver = closed_resolver,
43+
};
44+
}
45+
46+
pub fn get_closed(self: *const ReadableStreamDefaultReader) Env.Promise {
47+
return self.closed_resolver.promise();
48+
}
49+
50+
pub fn _cancel(self: *ReadableStreamDefaultReader) Env.Promise {
51+
return self.stream._cancel();
52+
}
53+
54+
pub const ReadableStreamReadResult = struct {
55+
value: ?[]const u8,
56+
done: bool,
57+
58+
pub fn get_value(self: *const ReadableStreamReadResult, page: *Page) !?[]const u8 {
59+
return if (self.value) |value| try page.arena.dupe(u8, value) else null;
60+
}
61+
62+
pub fn get_done(self: *const ReadableStreamReadResult) bool {
63+
return self.done;
64+
}
65+
};
66+
67+
pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise {
68+
const stream = self.stream;
69+
70+
const resolver = Env.PromiseResolver{
71+
.js_context = page.main_context,
72+
.resolver = v8.PromiseResolver.init(page.main_context.v8_context),
73+
};
74+
75+
switch (stream.state) {
76+
.readable => {
77+
if (stream.queue.items.len > 0) {
78+
const data = self.stream.queue.orderedRemove(0);
79+
try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false });
80+
} else {
81+
// TODO: need to wait until we have more data
82+
try resolver.reject("TODO!");
83+
return error.Todo;
84+
}
85+
},
86+
.closed => |_| {
87+
if (stream.queue.items.len > 0) {
88+
const data = try page.arena.dupe(u8, self.stream.queue.orderedRemove(0));
89+
try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false });
90+
} else {
91+
try resolver.resolve(ReadableStreamReadResult{ .value = null, .done = true });
92+
}
93+
},
94+
.errored => |err| {
95+
try resolver.reject(err);
96+
},
97+
}
98+
99+
return resolver.promise();
100+
}

src/browser/streams/streams.zig

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
2+
//
3+
// Francis Bouvier <[email protected]>
4+
// Pierre Tachoire <[email protected]>
5+
//
6+
// This program is free software: you can redistribute it and/or modify
7+
// it under the terms of the GNU Affero General Public License as
8+
// published by the Free Software Foundation, either version 3 of the
9+
// License, or (at your option) any later version.
10+
//
11+
// This program is distributed in the hope that it will be useful,
12+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
// GNU Affero General Public License for more details.
15+
//
16+
// You should have received a copy of the GNU Affero General Public License
17+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18+
19+
pub const Interfaces = .{
20+
@import("ReadableStream.zig"),
21+
@import("ReadableStreamDefaultReader.zig"),
22+
@import("ReadableStreamDefaultReader.zig").ReadableStreamReadResult,
23+
@import("ReadableStreamDefaultController.zig"),
24+
};

0 commit comments

Comments
 (0)