1+ use std:: collections:: BTreeMap ;
2+
13use anyhow:: anyhow;
2- use deno_core:: v8;
4+ use deno_core:: {
5+ serde_v8,
6+ v8,
7+ ToJsBuffer ,
8+ } ;
39use errors:: {
410 ErrorMetadata ,
511 ErrorMetadataAnyhowExt ,
612} ;
13+ use serde:: Serialize ;
714use serde_json:: Value as JsonValue ;
15+ use uuid:: Uuid ;
816
917use super :: {
10- client:: PendingAsyncSyscall ,
18+ client:: {
19+ PendingAsyncSyscall ,
20+ PendingDynamicImport ,
21+ } ,
1122 context_state:: ContextState ,
1223} ;
1324use crate :: {
14- environment:: UncatchableDeveloperError ,
25+ environment:: {
26+ helpers:: resolve_promise,
27+ UncatchableDeveloperError ,
28+ } ,
1529 helpers:: {
1630 self ,
1731 to_rust_string,
1832 } ,
19- ops:: run_op,
33+ ops:: {
34+ run_op,
35+ start_async_op,
36+ } ,
37+ request_scope:: StreamListener ,
2038} ;
2139
2240pub struct CallbackContext < ' callback , ' scope : ' callback > {
2341 pub scope : & ' callback mut v8:: HandleScope < ' scope > ,
2442 context : v8:: Local < ' scope , v8:: Context > ,
2543}
2644
27- impl < ' callback , ' scope > CallbackContext < ' callback , ' scope > {
45+ impl < ' callback , ' scope : ' callback > CallbackContext < ' callback , ' scope > {
2846 fn new ( scope : & ' callback mut v8:: HandleScope < ' scope > ) -> Self {
2947 let context = scope. get_current_context ( ) ;
3048 Self { scope, context }
@@ -120,10 +138,7 @@ impl<'callback, 'scope> CallbackContext<'callback, 'scope> {
120138 let resolver = v8:: Global :: new ( self . scope , promise_resolver) ;
121139 {
122140 let context_state = self . context_state ( ) ?;
123-
124- let promise_id = context_state. next_promise_id ;
125- context_state. next_promise_id += 1 ;
126-
141+ let promise_id = context_state. register_promise ( resolver) ;
127142 let pending_async_syscall = PendingAsyncSyscall {
128143 promise_id,
129144 name,
@@ -132,8 +147,6 @@ impl<'callback, 'scope> CallbackContext<'callback, 'scope> {
132147 context_state
133148 . pending_async_syscalls
134149 . push ( pending_async_syscall) ;
135-
136- context_state. promise_resolvers . insert ( promise_id, resolver) ;
137150 } ;
138151 Ok ( promise)
139152 }
@@ -149,6 +162,17 @@ impl<'callback, 'scope> CallbackContext<'callback, 'scope> {
149162 }
150163 }
151164
165+ pub fn start_async_op (
166+ scope : & mut v8:: HandleScope ,
167+ args : v8:: FunctionCallbackArguments ,
168+ rv : v8:: ReturnValue ,
169+ ) {
170+ let mut ctx = CallbackContext :: new ( scope) ;
171+ if let Err ( e) = start_async_op ( & mut ctx, args, rv) {
172+ ctx. handle_syscall_or_op_error ( e) ;
173+ }
174+ }
175+
152176 pub extern "C" fn promise_reject_callback ( message : v8:: PromiseRejectMessage ) {
153177 let mut scope = unsafe { v8:: CallbackScope :: new ( & message) } ;
154178
@@ -285,9 +309,13 @@ impl<'callback, 'scope> CallbackContext<'callback, 'scope> {
285309 let resolved_specifier = deno_core:: resolve_import ( & specifier_str, & referrer_name)
286310 . map_err ( |e| ErrorMetadata :: bad_request ( "InvalidImport" , e. to_string ( ) ) ) ?;
287311
288- self . context_state ( ) ?
289- . pending_dynamic_imports
290- . push ( ( resolved_specifier, resolver) ) ;
312+ let state = self . context_state ( ) ?;
313+ let promise_id = state. register_promise ( resolver) ;
314+ let pending = PendingDynamicImport {
315+ promise_id,
316+ specifier : resolved_specifier,
317+ } ;
318+ state. pending_dynamic_imports . push ( pending) ;
291319
292320 Ok ( promise)
293321 }
@@ -313,6 +341,91 @@ impl<'callback, 'scope> CallbackContext<'callback, 'scope> {
313341 // TODO: Handle system errors.
314342 todo ! ( ) ;
315343 }
344+
345+ fn update_stream_listeners ( & mut self ) -> anyhow:: Result < ( ) > {
346+ #[ derive( Serialize , Debug ) ]
347+ #[ serde( rename_all = "camelCase" ) ]
348+ struct JsStreamChunk {
349+ done : bool ,
350+ value : Option < ToJsBuffer > ,
351+ }
352+ loop {
353+ let mut ready = BTreeMap :: new ( ) ;
354+
355+ let state = self . context_state ( ) ?;
356+ for stream_id in state. stream_listeners . keys ( ) {
357+ let chunk = state. streams . mutate (
358+ stream_id,
359+ |stream| -> anyhow:: Result < Result < ( Option < Uuid > , bool ) , ( ) > > {
360+ let stream = stream
361+ . ok_or_else ( || anyhow:: anyhow!( "listening on nonexistent stream" ) ) ?;
362+ let result = match stream {
363+ Ok ( stream) => Ok ( ( stream. parts . pop_front ( ) , stream. done ) ) ,
364+ Err ( _) => Err ( ( ) ) ,
365+ } ;
366+ Ok ( result)
367+ } ,
368+ ) ?;
369+ match chunk {
370+ Err ( _) => {
371+ ready. insert (
372+ * stream_id,
373+ Err ( state. streams . remove ( stream_id) . unwrap ( ) . unwrap_err ( ) ) ,
374+ ) ;
375+ } ,
376+ Ok ( ( chunk, stream_done) ) => {
377+ if let Some ( chunk) = chunk {
378+ let ready_chunk = state
379+ . blob_parts
380+ . remove ( & chunk)
381+ . ok_or_else ( || anyhow:: anyhow!( "stream chunk missing" ) ) ?;
382+ ready. insert ( * stream_id, Ok ( Some ( ready_chunk) ) ) ;
383+ } else if stream_done {
384+ ready. insert ( * stream_id, Ok ( None ) ) ;
385+ }
386+ } ,
387+ }
388+ }
389+ if ready. is_empty ( ) {
390+ // Nothing to notify -- all caught up.
391+ return Ok ( ( ) ) ;
392+ }
393+ for ( stream_id, update) in ready {
394+ if let Some ( listener) = self . context_state ( ) ?. stream_listeners . remove ( & stream_id) {
395+ match listener {
396+ StreamListener :: JsPromise ( resolver) => {
397+ let result = match update {
398+ Ok ( update) => Ok ( serde_v8:: to_v8 (
399+ self . scope ,
400+ JsStreamChunk {
401+ done : update. is_none ( ) ,
402+ value : update. map ( |chunk| chunk. to_vec ( ) . into ( ) ) ,
403+ } ,
404+ ) ?) ,
405+ Err ( e) => Err ( e) ,
406+ } ;
407+ // TODO: Is this okay? We're throwing a JsError here from within
408+ // the callback context, which then needs to propagate it.
409+ resolve_promise ( self . scope , resolver, result) ?;
410+ } ,
411+ StreamListener :: RustStream ( stream) => match update {
412+ Ok ( None ) => stream. close_channel ( ) ,
413+ Ok ( Some ( bytes) ) => {
414+ let _ = stream. unbounded_send ( Ok ( bytes) ) ;
415+ self . context_state ( ) ?
416+ . stream_listeners
417+ . insert ( stream_id, StreamListener :: RustStream ( stream) ) ;
418+ } ,
419+ Err ( e) => {
420+ let _ = stream. unbounded_send ( Err ( e) ) ;
421+ stream. close_channel ( ) ;
422+ } ,
423+ } ,
424+ }
425+ }
426+ }
427+ }
428+ }
316429}
317430
318431mod op_provider {
@@ -344,6 +457,7 @@ mod op_provider {
344457 use super :: CallbackContext ;
345458 use crate :: {
346459 environment:: AsyncOpRequest ,
460+ isolate2:: client:: PendingAsyncOp ,
347461 ops:: OpProvider ,
348462 request_scope:: StreamListener ,
349463 } ;
@@ -376,52 +490,68 @@ mod op_provider {
376490 fn console_timers (
377491 & mut self ,
378492 ) -> anyhow:: Result < & mut WithHeapSize < BTreeMap < String , UnixTimestamp > > > {
379- todo ! ( )
493+ Ok ( & mut self . context_state ( ) ? . console_timers )
380494 }
381495
382496 fn unix_timestamp ( & mut self ) -> anyhow:: Result < UnixTimestamp > {
383497 self . context_state ( ) ?. environment . unix_timestamp ( )
384498 }
385499
386500 fn unix_timestamp_non_deterministic ( & mut self ) -> anyhow:: Result < UnixTimestamp > {
387- todo ! ( )
501+ self . context_state ( ) ?
502+ . environment
503+ . unix_timestamp_non_deterministic ( )
388504 }
389505
390506 fn start_async_op (
391507 & mut self ,
392- _request : AsyncOpRequest ,
393- _resolver : v8:: Global < v8:: PromiseResolver > ,
508+ request : AsyncOpRequest ,
509+ resolver : v8:: Global < v8:: PromiseResolver > ,
394510 ) -> anyhow:: Result < ( ) > {
395- todo ! ( ) ;
511+ let state = self . context_state ( ) ?;
512+ let promise_id = state. register_promise ( resolver) ;
513+ let pending_async_op = PendingAsyncOp {
514+ promise_id,
515+ request,
516+ } ;
517+ state. pending_async_ops . push ( pending_async_op) ;
518+ Ok ( ( ) )
396519 }
397520
398- fn create_blob_part ( & mut self , _bytes : Bytes ) -> anyhow:: Result < Uuid > {
399- todo ! ( )
521+ fn create_blob_part ( & mut self , bytes : Bytes ) -> anyhow:: Result < Uuid > {
522+ let state = self . context_state ( ) ?;
523+ state. create_blob_part ( bytes)
400524 }
401525
402- fn get_blob_part ( & mut self , _uuid : & Uuid ) -> anyhow:: Result < Option < Bytes > > {
403- todo ! ( )
526+ fn get_blob_part ( & mut self , uuid : & Uuid ) -> anyhow:: Result < Option < Bytes > > {
527+ let state = self . context_state ( ) ?;
528+ Ok ( state. get_blob_part ( uuid) )
404529 }
405530
406531 fn create_stream ( & mut self ) -> anyhow:: Result < Uuid > {
407- todo ! ( )
532+ let state = self . context_state ( ) ?;
533+ state. create_stream ( )
408534 }
409535
410536 fn extend_stream (
411537 & mut self ,
412- _id : Uuid ,
413- _bytes : Option < Bytes > ,
414- _new_done : bool ,
538+ id : Uuid ,
539+ bytes : Option < Bytes > ,
540+ new_done : bool ,
415541 ) -> anyhow:: Result < ( ) > {
416- todo ! ( )
542+ let state = self . context_state ( ) ?;
543+ state. extend_stream ( id, bytes, new_done) ?;
544+ self . update_stream_listeners ( )
417545 }
418546
419547 fn new_stream_listener (
420548 & mut self ,
421- _stream_id : Uuid ,
422- _listener : StreamListener ,
549+ stream_id : Uuid ,
550+ listener : StreamListener ,
423551 ) -> anyhow:: Result < ( ) > {
424- todo ! ( ) ;
552+ let state = self . context_state ( ) ?;
553+ state. new_stream_listener ( stream_id, listener) ?;
554+ self . update_stream_listeners ( )
425555 }
426556
427557 fn get_environment_variable (
@@ -434,11 +564,13 @@ mod op_provider {
434564 fn get_all_table_mappings (
435565 & mut self ,
436566 ) -> anyhow:: Result < ( TableMapping , VirtualTableMapping ) > {
437- todo ! ( )
567+ self . context_state ( ) ? . environment . get_all_table_mappings ( )
438568 }
439569
440570 fn get_table_mapping_without_system_tables ( & mut self ) -> anyhow:: Result < TableMappingValue > {
441- todo ! ( )
571+ self . context_state ( ) ?
572+ . environment
573+ . get_table_mapping_without_system_tables ( )
442574 }
443575 }
444576}
0 commit comments