Skip to content

Commit c5520be

Browse files
committed
properly handle closed for ReadableStream
1 parent 516b332 commit c5520be

File tree

3 files changed

+37
-17
lines changed

3 files changed

+37
-17
lines changed

src/browser/streams/ReadableStream.zig

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,16 @@ const State = union(enum) {
3636

3737
// This promise resolves when a stream is canceled.
3838
cancel_resolver: v8.Persistent(v8.PromiseResolver),
39+
closed_resolver: v8.Persistent(v8.PromiseResolver),
40+
reader_resolver: ?v8.Persistent(v8.PromiseResolver) = null,
41+
3942
locked: bool = false,
4043
state: State = .readable,
4144

4245
cancel_fn: ?Env.Function = null,
4346
pull_fn: ?Env.Function = null,
4447

4548
strategy: QueueingStrategy,
46-
reader_resolver: ?v8.Persistent(v8.PromiseResolver) = null,
4749
queue: std.ArrayListUnmanaged([]const u8) = .empty,
4850

4951
pub const ReadableStreamReadResult = struct {
@@ -82,8 +84,13 @@ pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy,
8284
v8.PromiseResolver.init(page.main_context.v8_context),
8385
);
8486

87+
const closed_resolver = v8.Persistent(v8.PromiseResolver).init(
88+
page.main_context.isolate,
89+
v8.PromiseResolver.init(page.main_context.v8_context),
90+
);
91+
8592
const stream = try page.arena.create(ReadableStream);
86-
stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .strategy = strategy };
93+
stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .closed_resolver = closed_resolver, .strategy = strategy };
8794

8895
const controller = ReadableStreamDefaultController{ .stream = stream };
8996

@@ -106,6 +113,15 @@ pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy,
106113
return stream;
107114
}
108115

116+
pub fn destructor(self: *ReadableStream) void {
117+
self.cancel_resolver.deinit();
118+
self.closed_resolver.deinit();
119+
120+
if (self.reader_resolver) |*rr| {
121+
rr.deinit();
122+
}
123+
}
124+
109125
pub fn get_locked(self: *const ReadableStream) bool {
110126
return self.locked;
111127
}
@@ -150,7 +166,7 @@ const GetReaderOptions = struct {
150166
mode: ?[]const u8 = null,
151167
};
152168

153-
pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Page) !ReadableStreamDefaultReader {
169+
pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions) !ReadableStreamDefaultReader {
154170
if (self.locked) {
155171
return error.TypeError;
156172
}
@@ -159,7 +175,7 @@ pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Pag
159175
const options = _options orelse GetReaderOptions{};
160176
_ = options;
161177

162-
return ReadableStreamDefaultReader.constructor(self, page);
178+
return ReadableStreamDefaultReader.constructor(self);
163179
}
164180

165181
// TODO: pipeThrough (requires TransformStream)

src/browser/streams/ReadableStreamDefaultController.zig

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page
3939
const reason = if (_reason) |reason| try page.arena.dupe(u8, reason) else null;
4040
self.stream.state = .{ .closed = reason };
4141

42+
// Resolve the Reader Promise
4243
if (self.stream.reader_resolver) |rr| {
4344
const resolver = Env.PromiseResolver{
4445
.js_context = page.main_context,
@@ -49,6 +50,14 @@ pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page
4950
self.stream.reader_resolver = null;
5051
}
5152

53+
// Resolve the Closed promise.
54+
const closed_resolver = Env.PromiseResolver{
55+
.js_context = page.main_context,
56+
.resolver = self.stream.closed_resolver.castToPromiseResolver(),
57+
};
58+
59+
try closed_resolver.resolve({});
60+
5261
// close just sets as closed meaning it wont READ any more but anything in the queue is fine to read.
5362
// to discard, must use cancel.
5463
}

src/browser/streams/ReadableStreamDefaultReader.zig

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,18 @@ const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamR
2929
const ReadableStreamDefaultReader = @This();
3030

3131
stream: *ReadableStream,
32-
// This promise resolves when the stream is closed.
33-
closed_resolver: Env.PromiseResolver,
3432

35-
pub fn constructor(stream: *ReadableStream, page: *Page) ReadableStreamDefaultReader {
36-
const closed_resolver = Env.PromiseResolver{
37-
.js_context = page.main_context,
38-
.resolver = v8.PromiseResolver.init(page.main_context.v8_context),
39-
};
33+
pub fn constructor(stream: *ReadableStream) ReadableStreamDefaultReader {
34+
return .{ .stream = stream };
35+
}
4036

41-
return .{
42-
.stream = stream,
43-
.closed_resolver = closed_resolver,
37+
pub fn get_closed(self: *const ReadableStreamDefaultReader, page: *Page) Env.Promise {
38+
const resolver = Env.PromiseResolver{
39+
.js_context = page.main_context,
40+
.resolver = self.stream.closed_resolver.castToPromiseResolver(),
4441
};
45-
}
4642

47-
pub fn get_closed(self: *const ReadableStreamDefaultReader) Env.Promise {
48-
return self.closed_resolver.promise();
43+
return resolver.promise();
4944
}
5045

5146
pub fn _cancel(self: *ReadableStreamDefaultReader, reason: ?[]const u8, page: *Page) !Env.Promise {

0 commit comments

Comments
 (0)