@@ -13,6 +13,7 @@ use containerd_shim_wasm::{
1313 container:: { Engine , RuntimeContext , Stdio } ,
1414 sandbox:: WasmLayer ,
1515} ;
16+ use futures:: future;
1617use log:: info;
1718use oci_spec:: image:: MediaType ;
1819use spin_app:: locked:: LockedApp ;
@@ -189,70 +190,91 @@ impl SpinEngine {
189190 env:: set_var ( "XDG_CACHE_HOME" , & cache_dir) ;
190191 let app_source = self . app_source ( ctx, & cache) . await ?;
191192 let resolved_app_source = self . resolve_app_source ( app_source. clone ( ) , & cache) . await ?;
192- let trigger_cmd = trigger_command_for_resolved_app_source ( & resolved_app_source)
193+ let trigger_cmds = trigger_command_for_resolved_app_source ( & resolved_app_source)
193194 . with_context ( || format ! ( "Couldn't find trigger executor for {app_source:?}" ) ) ?;
194195 let locked_app = self . load_resolved_app_source ( resolved_app_source) . await ?;
195- self . run_trigger ( ctx, & trigger_cmd, locked_app, app_source)
196- . await
196+ self . run_trigger (
197+ ctx,
198+ trigger_cmds. iter ( ) . map ( |s| s. as_ref ( ) ) . collect ( ) ,
199+ locked_app,
200+ app_source,
201+ )
202+ . await
197203 }
198204
199205 async fn run_trigger (
200206 & self ,
201207 ctx : & impl RuntimeContext ,
202- trigger_type : & str ,
208+ trigger_types : Vec < & str > ,
203209 app : LockedApp ,
204210 app_source : AppSource ,
205211 ) -> Result < ( ) > {
206212 let working_dir = PathBuf :: from ( "/" ) ;
207- let f = match trigger_type {
208- HttpTrigger :: TRIGGER_TYPE => {
209- let http_trigger: HttpTrigger = self
210- . build_spin_trigger ( working_dir, app, app_source)
211- . await
212- . context ( "failed to build spin trigger" ) ?;
213-
214- info ! ( " >>> running spin trigger" ) ;
215- http_trigger. run ( spin_trigger_http:: CliArgs {
216- address : parse_addr ( SPIN_ADDR ) . unwrap ( ) ,
217- tls_cert : None ,
218- tls_key : None ,
219- } )
220- }
221- RedisTrigger :: TRIGGER_TYPE => {
222- let redis_trigger: RedisTrigger = self
223- . build_spin_trigger ( working_dir, app, app_source)
224- . await
225- . context ( "failed to build spin trigger" ) ?;
226-
227- info ! ( " >>> running spin trigger" ) ;
228- redis_trigger. run ( spin_trigger:: cli:: NoArgs )
229- }
230- SqsTrigger :: TRIGGER_TYPE => {
231- let sqs_trigger: SqsTrigger = self
232- . build_spin_trigger ( working_dir, app, app_source)
233- . await
234- . context ( "failed to build spin trigger" ) ?;
235-
236- info ! ( " >>> running spin trigger" ) ;
237- sqs_trigger. run ( spin_trigger:: cli:: NoArgs )
238- }
239- CommandTrigger :: TRIGGER_TYPE => {
240- let command_trigger: CommandTrigger = self
241- . build_spin_trigger ( working_dir, app, app_source)
242- . await
243- . context ( "failed to build spin trigger" ) ?;
244-
245- info ! ( " >>> running spin trigger" ) ;
246- command_trigger. run ( trigger_command:: CliArgs {
247- guest_args : ctx. args ( ) . to_vec ( ) ,
248- } )
249- }
250- _ => {
251- todo ! ( "Only Http, Redis and SQS triggers are currently supported." )
252- }
253- } ;
213+ let mut futures_list = Vec :: with_capacity ( trigger_types. len ( ) ) ;
214+ for trigger_type in trigger_types. iter ( ) {
215+ let f = match trigger_type. to_owned ( ) {
216+ HttpTrigger :: TRIGGER_TYPE => {
217+ let http_trigger: HttpTrigger = self
218+ . build_spin_trigger ( working_dir. clone ( ) , app. clone ( ) , app_source. clone ( ) )
219+ . await
220+ . context ( "failed to build spin trigger" ) ?;
221+
222+ info ! ( " >>> running spin http trigger" ) ;
223+ http_trigger. run ( spin_trigger_http:: CliArgs {
224+ address : parse_addr ( SPIN_ADDR ) . unwrap ( ) ,
225+ tls_cert : None ,
226+ tls_key : None ,
227+ } )
228+ }
229+ RedisTrigger :: TRIGGER_TYPE => {
230+ let redis_trigger: RedisTrigger = self
231+ . build_spin_trigger ( working_dir. clone ( ) , app. clone ( ) , app_source. clone ( ) )
232+ . await
233+ . context ( "failed to build spin trigger" ) ?;
234+
235+ info ! ( " >>> running spin redis trigger" ) ;
236+ redis_trigger. run ( spin_trigger:: cli:: NoArgs )
237+ }
238+ SqsTrigger :: TRIGGER_TYPE => {
239+ let sqs_trigger: SqsTrigger = self
240+ . build_spin_trigger ( working_dir. clone ( ) , app. clone ( ) , app_source. clone ( ) )
241+ . await
242+ . context ( "failed to build spin trigger" ) ?;
243+
244+ info ! ( " >>> running spin trigger" ) ;
245+ sqs_trigger. run ( spin_trigger:: cli:: NoArgs )
246+ }
247+ CommandTrigger :: TRIGGER_TYPE => {
248+ let command_trigger: CommandTrigger = self
249+ . build_spin_trigger ( working_dir. clone ( ) , app. clone ( ) , app_source. clone ( ) )
250+ . await
251+ . context ( "failed to build spin trigger" ) ?;
252+
253+ info ! ( " >>> running spin trigger" ) ;
254+ command_trigger. run ( trigger_command:: CliArgs {
255+ guest_args : ctx. args ( ) . to_vec ( ) ,
256+ } )
257+ }
258+ _ => {
259+ todo ! ( "Only Http, Redis and SQS triggers are currently supported." )
260+ }
261+ } ;
262+
263+ futures_list. push ( f)
264+ }
265+
254266 info ! ( " >>> notifying main thread we are about to start" ) ;
255- f. await
267+
268+ // exit as soon as any of the trigger completes/exits
269+ let ( result, index, rest) = future:: select_all ( futures_list) . await ;
270+ info ! (
271+ " >>> trigger type '{trigger_type}' exited" ,
272+ trigger_type = trigger_types[ index]
273+ ) ;
274+
275+ drop ( rest) ;
276+
277+ result
256278 }
257279
258280 async fn load_resolved_app_source (
@@ -435,7 +457,7 @@ pub enum ResolvedAppSource {
435457}
436458
437459impl ResolvedAppSource {
438- pub fn trigger_type ( & self ) -> anyhow:: Result < & str > {
460+ pub fn trigger_types ( & self ) -> anyhow:: Result < Vec < & str > > {
439461 let types = match self {
440462 ResolvedAppSource :: File { manifest, .. } => {
441463 manifest. triggers . keys ( ) . collect :: < HashSet < _ > > ( )
@@ -448,23 +470,26 @@ impl ResolvedAppSource {
448470 } ;
449471
450472 ensure ! ( !types. is_empty( ) , "no triggers in app" ) ;
451- ensure ! ( types. len( ) == 1 , "multiple trigger types not yet supported" ) ;
452- Ok ( types. into_iter ( ) . next ( ) . unwrap ( ) )
473+ Ok ( types. into_iter ( ) . map ( |t| t. as_str ( ) ) . collect ( ) )
453474 }
454475}
455476
456- fn trigger_command_for_resolved_app_source ( resolved : & ResolvedAppSource ) -> Result < String > {
457- let trigger_type = resolved. trigger_type ( ) ?;
458-
459- match trigger_type {
460- RedisTrigger :: TRIGGER_TYPE
461- | HttpTrigger :: TRIGGER_TYPE
462- | SqsTrigger :: TRIGGER_TYPE
463- | CommandTrigger :: TRIGGER_TYPE => Ok ( trigger_type. to_owned ( ) ) ,
464- _ => {
465- todo ! ( "Only Http, Redis, SQS, and command triggers are currently supported." )
477+ fn trigger_command_for_resolved_app_source ( resolved : & ResolvedAppSource ) -> Result < Vec < String > > {
478+ let trigger_types = resolved. trigger_types ( ) ?;
479+ let mut types = Vec :: with_capacity ( trigger_types. len ( ) ) ;
480+ for trigger_type in trigger_types. iter ( ) {
481+ match trigger_type. to_owned ( ) {
482+ RedisTrigger :: TRIGGER_TYPE
483+ | HttpTrigger :: TRIGGER_TYPE
484+ | SqsTrigger :: TRIGGER_TYPE
485+ | CommandTrigger :: TRIGGER_TYPE => types. push ( trigger_type) ,
486+ _ => {
487+ todo ! ( "Only Http, Redis and SQS triggers are currently supported." )
488+ }
466489 }
467490 }
491+
492+ Ok ( trigger_types. iter ( ) . map ( |x| x. to_string ( ) ) . collect ( ) )
468493}
469494
470495#[ cfg( test) ]
0 commit comments