Skip to content

Commit 90bb696

Browse files
gterzianTG199
authored andcommitted
Streams: add an underlying sink type (servo#36385)
Introduces the concept of different types of underlying sinks for the writable controller, and a minor fix to the abort algorithm. The dead code is already used in the wip at servo#36181, and will also be used in a another wip in parallel to implement transform stream, so the concept is introduced here with dead code to facilitate the work in parallel and prevent too much merge conflicts down the road. Signed-off-by: gterzian <[email protected]>
1 parent 2b1865c commit 90bb696

File tree

2 files changed

+105
-50
lines changed

2 files changed

+105
-50
lines changed

components/script/dom/writablestream.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_alg
2828
use crate::dom::globalscope::GlobalScope;
2929
use crate::dom::promise::Promise;
3030
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31-
use crate::dom::writablestreamdefaultcontroller::WritableStreamDefaultController;
31+
use crate::dom::writablestreamdefaultcontroller::{
32+
UnderlyingSinkType, WritableStreamDefaultController,
33+
};
3234
use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
3335
use crate::realms::{InRealm, enter_realm};
3436
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
@@ -155,7 +157,7 @@ pub struct WritableStream {
155157

156158
impl WritableStream {
157159
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
158-
/// <https://streams.spec.whatwg.org/#initialize-readable-stream>
160+
/// <https://streams.spec.whatwg.org/#initialize-writable-stream>
159161
fn new_inherited() -> WritableStream {
160162
WritableStream {
161163
reflector_: Reflector::new(),
@@ -879,6 +881,7 @@ impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
879881
// Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink
880882
let controller = WritableStreamDefaultController::new(
881883
global,
884+
UnderlyingSinkType::Js,
882885
&underlying_sink_dict,
883886
high_water_mark,
884887
size_algorithm,

components/script/dom/writablestreamdefaultcontroller.rs

Lines changed: 100 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,26 @@ impl Callback for WriteAlgorithmRejectionHandler {
214214
}
215215
}
216216

217+
/// The type of sink algorithms we are using.
218+
#[allow(dead_code)]
219+
#[derive(JSTraceable, MallocSizeOf, PartialEq)]
220+
pub enum UnderlyingSinkType {
221+
/// Algorithms are provided by Js callbacks.
222+
Js,
223+
/// Algorithms supporting streams transfer are implemented in Rust.
224+
/// TODO: implement transfer.
225+
Transfer,
226+
}
227+
217228
/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
218229
#[dom_struct]
219230
pub struct WritableStreamDefaultController {
220231
reflector_: Reflector,
221232

233+
/// The type of underlying sink used. Besides the default JS one,
234+
/// there will be others for stream transfer, and for transform stream.
235+
underlying_sink_type: UnderlyingSinkType,
236+
222237
/// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
223238
#[ignore_malloc_size_of = "Rc is hard"]
224239
abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
@@ -256,12 +271,14 @@ impl WritableStreamDefaultController {
256271
/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
257272
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
258273
fn new_inherited(
274+
underlying_sink_type: UnderlyingSinkType,
259275
underlying_sink: &UnderlyingSink,
260276
strategy_hwm: f64,
261277
strategy_size: Rc<QueuingStrategySize>,
262278
) -> WritableStreamDefaultController {
263279
WritableStreamDefaultController {
264280
reflector_: Reflector::new(),
281+
underlying_sink_type,
265282
queue: Default::default(),
266283
stream: Default::default(),
267284
abort: RefCell::new(underlying_sink.abort.clone()),
@@ -276,13 +293,15 @@ impl WritableStreamDefaultController {
276293

277294
pub(crate) fn new(
278295
global: &GlobalScope,
296+
underlying_sink_type: UnderlyingSinkType,
279297
underlying_sink: &UnderlyingSink,
280298
strategy_hwm: f64,
281299
strategy_size: Rc<QueuingStrategySize>,
282300
can_gc: CanGc,
283301
) -> DomRoot<WritableStreamDefaultController> {
284302
reflect_dom_object(
285303
Box::new(WritableStreamDefaultController::new_inherited(
304+
underlying_sink_type,
286305
underlying_sink,
287306
strategy_hwm,
288307
strategy_size,
@@ -390,6 +409,7 @@ impl WritableStreamDefaultController {
390409
Promise::new_resolved(global, cx, result.get(), can_gc)
391410
}
392411
} else {
412+
// Let startAlgorithm be an algorithm that returns undefined.
393413
Promise::new_resolved(global, cx, (), can_gc)
394414
};
395415

@@ -439,71 +459,103 @@ impl WritableStreamDefaultController {
439459
reason: SafeHandleValue,
440460
can_gc: CanGc,
441461
) -> Rc<Promise> {
442-
rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
443-
let algo = self.abort.borrow().clone();
444-
let result = if let Some(algo) = algo {
445-
algo.Call_(
446-
&this_object.handle(),
447-
Some(reason),
448-
ExceptionHandling::Rethrow,
449-
can_gc,
450-
)
451-
} else {
452-
Ok(Promise::new_resolved(global, cx, (), can_gc))
462+
let result = match self.underlying_sink_type {
463+
UnderlyingSinkType::Js => {
464+
rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
465+
let algo = self.abort.borrow().clone();
466+
// Let result be the result of performing this.[[abortAlgorithm]], passing reason.
467+
let result = if let Some(algo) = algo {
468+
algo.Call_(
469+
&this_object.handle(),
470+
Some(reason),
471+
ExceptionHandling::Rethrow,
472+
can_gc,
473+
)
474+
} else {
475+
Ok(Promise::new_resolved(global, cx, (), can_gc))
476+
};
477+
result.unwrap_or_else(|e| {
478+
let promise = Promise::new(global, can_gc);
479+
promise.reject_error(e, can_gc);
480+
promise
481+
})
482+
},
483+
UnderlyingSinkType::Transfer => {
484+
// TODO: implement transfer.
485+
Promise::new_resolved(global, cx, (), can_gc)
486+
},
453487
};
454-
result.unwrap_or_else(|e| {
455-
let promise = Promise::new(global, can_gc);
456-
promise.reject_error(e, can_gc);
457-
promise
458-
})
488+
489+
// Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
490+
self.clear_algorithms();
491+
492+
result
459493
}
460494

461-
pub(crate) fn call_write_algorithm(
495+
/// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
496+
fn call_write_algorithm(
462497
&self,
463498
cx: SafeJSContext,
464499
chunk: SafeHandleValue,
465500
global: &GlobalScope,
466501
can_gc: CanGc,
467502
) -> Rc<Promise> {
468-
rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
469-
let algo = self.write.borrow().clone();
470-
let result = if let Some(algo) = algo {
471-
algo.Call_(
472-
&this_object.handle(),
473-
chunk,
474-
self,
475-
ExceptionHandling::Rethrow,
476-
can_gc,
477-
)
478-
} else {
479-
Ok(Promise::new_resolved(global, cx, (), can_gc))
480-
};
481-
result.unwrap_or_else(|e| {
482-
let promise = Promise::new(global, can_gc);
483-
promise.reject_error(e, can_gc);
484-
promise
485-
})
503+
match self.underlying_sink_type {
504+
UnderlyingSinkType::Js => {
505+
rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
506+
let algo = self.write.borrow().clone();
507+
let result = if let Some(algo) = algo {
508+
algo.Call_(
509+
&this_object.handle(),
510+
chunk,
511+
self,
512+
ExceptionHandling::Rethrow,
513+
can_gc,
514+
)
515+
} else {
516+
Ok(Promise::new_resolved(global, cx, (), can_gc))
517+
};
518+
result.unwrap_or_else(|e| {
519+
let promise = Promise::new(global, can_gc);
520+
promise.reject_error(e, can_gc);
521+
promise
522+
})
523+
},
524+
UnderlyingSinkType::Transfer => {
525+
// TODO: implement transfer.
526+
Promise::new_resolved(global, cx, (), can_gc)
527+
},
528+
}
486529
}
487530

531+
/// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
488532
fn call_close_algorithm(
489533
&self,
490534
cx: SafeJSContext,
491535
global: &GlobalScope,
492536
can_gc: CanGc,
493537
) -> Rc<Promise> {
494-
rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
495-
this_object.set(self.underlying_sink_obj.get());
496-
let algo = self.close.borrow().clone();
497-
let result = if let Some(algo) = algo {
498-
algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
499-
} else {
500-
Ok(Promise::new_resolved(global, cx, (), can_gc))
501-
};
502-
result.unwrap_or_else(|e| {
503-
let promise = Promise::new(global, can_gc);
504-
promise.reject_error(e, can_gc);
505-
promise
506-
})
538+
match self.underlying_sink_type {
539+
UnderlyingSinkType::Js => {
540+
rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
541+
this_object.set(self.underlying_sink_obj.get());
542+
let algo = self.close.borrow().clone();
543+
let result = if let Some(algo) = algo {
544+
algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
545+
} else {
546+
Ok(Promise::new_resolved(global, cx, (), can_gc))
547+
};
548+
result.unwrap_or_else(|e| {
549+
let promise = Promise::new(global, can_gc);
550+
promise.reject_error(e, can_gc);
551+
promise
552+
})
553+
},
554+
UnderlyingSinkType::Transfer => {
555+
// TODO: implement transfer.
556+
Promise::new_resolved(global, cx, (), can_gc)
557+
},
558+
}
507559
}
508560

509561
/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>

0 commit comments

Comments
 (0)