Skip to content

Commit c60d1f6

Browse files
committed
support multi trigger apps
Signed-off-by: Rajat Jindal <[email protected]>
1 parent 175e241 commit c60d1f6

File tree

1 file changed

+86
-65
lines changed

1 file changed

+86
-65
lines changed

containerd-shim-spin/src/engine.rs

Lines changed: 86 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use containerd_shim_wasm::{
1313
container::{Engine, RuntimeContext, Stdio},
1414
sandbox::WasmLayer,
1515
};
16+
use futures::future;
1617
use log::info;
1718
use oci_spec::image::MediaType;
1819
use spin_app::locked::LockedApp;
@@ -189,70 +190,87 @@ impl SpinEngine {
189190
env::set_var("XDG_CACHE_HOME", &cache_dir);
190191
let app_source = self.app_source(ctx, &cache).await?;
191192
let resolved_app_source = self.resolve_app_source(app_source.clone(), &cache).await?;
192-
let trigger_cmd = trigger_command_for_resolved_app_source(&resolved_app_source)
193+
let trigger_cmds = trigger_command_for_resolved_app_source(&resolved_app_source)
193194
.with_context(|| format!("Couldn't find trigger executor for {app_source:?}"))?;
194195
let locked_app = self.load_resolved_app_source(resolved_app_source).await?;
195-
self.run_trigger(ctx, &trigger_cmd, locked_app, app_source)
196-
.await
196+
self.run_trigger(
197+
ctx,
198+
trigger_cmds.iter().map(|s| s.as_ref()).collect(),
199+
locked_app,
200+
app_source,
201+
)
202+
.await
197203
}
198204

199205
async fn run_trigger(
200206
&self,
201207
ctx: &impl RuntimeContext,
202-
trigger_type: &str,
208+
trigger_types: Vec<&str>,
203209
app: LockedApp,
204210
app_source: AppSource,
205211
) -> Result<()> {
206212
let working_dir = PathBuf::from("/");
207-
let f = match trigger_type {
208-
HttpTrigger::TRIGGER_TYPE => {
209-
let http_trigger: HttpTrigger = self
210-
.build_spin_trigger(working_dir, app, app_source)
211-
.await
212-
.context("failed to build spin trigger")?;
213-
214-
info!(" >>> running spin trigger");
215-
http_trigger.run(spin_trigger_http::CliArgs {
216-
address: parse_addr(SPIN_ADDR).unwrap(),
217-
tls_cert: None,
218-
tls_key: None,
219-
})
220-
}
221-
RedisTrigger::TRIGGER_TYPE => {
222-
let redis_trigger: RedisTrigger = self
223-
.build_spin_trigger(working_dir, app, app_source)
224-
.await
225-
.context("failed to build spin trigger")?;
226-
227-
info!(" >>> running spin trigger");
228-
redis_trigger.run(spin_trigger::cli::NoArgs)
229-
}
230-
SqsTrigger::TRIGGER_TYPE => {
231-
let sqs_trigger: SqsTrigger = self
232-
.build_spin_trigger(working_dir, app, app_source)
233-
.await
234-
.context("failed to build spin trigger")?;
235-
236-
info!(" >>> running spin trigger");
237-
sqs_trigger.run(spin_trigger::cli::NoArgs)
238-
}
239-
CommandTrigger::TRIGGER_TYPE => {
240-
let command_trigger: CommandTrigger = self
241-
.build_spin_trigger(working_dir, app, app_source)
242-
.await
243-
.context("failed to build spin trigger")?;
244-
245-
info!(" >>> running spin trigger");
246-
command_trigger.run(trigger_command::CliArgs {
247-
guest_args: ctx.args().to_vec(),
248-
})
249-
}
250-
_ => {
251-
todo!("Only Http, Redis and SQS triggers are currently supported.")
252-
}
253-
};
213+
let mut futures_list = Vec::with_capacity(trigger_types.len());
214+
for trigger_type in trigger_types.iter() {
215+
let f = match trigger_type.to_owned() {
216+
HttpTrigger::TRIGGER_TYPE => {
217+
let http_trigger: HttpTrigger = self
218+
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
219+
.await
220+
.context("failed to build spin trigger")?;
221+
222+
info!(" >>> running spin http trigger");
223+
http_trigger.run(spin_trigger_http::CliArgs {
224+
address: parse_addr(SPIN_ADDR).unwrap(),
225+
tls_cert: None,
226+
tls_key: None,
227+
})
228+
}
229+
RedisTrigger::TRIGGER_TYPE => {
230+
let redis_trigger: RedisTrigger = self
231+
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
232+
.await
233+
.context("failed to build spin trigger")?;
234+
235+
info!(" >>> running spin redis trigger");
236+
redis_trigger.run(spin_trigger::cli::NoArgs)
237+
}
238+
SqsTrigger::TRIGGER_TYPE => {
239+
let sqs_trigger: SqsTrigger = self
240+
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
241+
.await
242+
.context("failed to build spin trigger")?;
243+
244+
info!(" >>> running spin trigger");
245+
sqs_trigger.run(spin_trigger::cli::NoArgs)
246+
}
247+
CommandTrigger::TRIGGER_TYPE => {
248+
let command_trigger: CommandTrigger = self
249+
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
250+
.await
251+
.context("failed to build spin trigger")?;
252+
253+
info!(" >>> running spin trigger");
254+
command_trigger.run(trigger_command::CliArgs {
255+
guest_args: ctx.args().to_vec(),
256+
})
257+
}
258+
_ => {
259+
todo!("Only Http, Redis and SQS triggers are currently supported.")
260+
}
261+
};
262+
263+
futures_list.push(f)
264+
}
265+
254266
info!(" >>> notifying main thread we are about to start");
255-
f.await
267+
268+
// exit as soon as any of the trigger completes/exits
269+
let (result, _, rest) = future::select_all(futures_list).await;
270+
271+
drop(rest);
272+
273+
result
256274
}
257275

258276
async fn load_resolved_app_source(
@@ -435,7 +453,7 @@ pub enum ResolvedAppSource {
435453
}
436454

437455
impl ResolvedAppSource {
438-
pub fn trigger_type(&self) -> anyhow::Result<&str> {
456+
pub fn trigger_types(&self) -> anyhow::Result<Vec<&str>> {
439457
let types = match self {
440458
ResolvedAppSource::File { manifest, .. } => {
441459
manifest.triggers.keys().collect::<HashSet<_>>()
@@ -448,23 +466,26 @@ impl ResolvedAppSource {
448466
};
449467

450468
ensure!(!types.is_empty(), "no triggers in app");
451-
ensure!(types.len() == 1, "multiple trigger types not yet supported");
452-
Ok(types.into_iter().next().unwrap())
469+
Ok(types.into_iter().map(|t| t.as_str()).collect())
453470
}
454471
}
455472

456-
fn trigger_command_for_resolved_app_source(resolved: &ResolvedAppSource) -> Result<String> {
457-
let trigger_type = resolved.trigger_type()?;
458-
459-
match trigger_type {
460-
RedisTrigger::TRIGGER_TYPE
461-
| HttpTrigger::TRIGGER_TYPE
462-
| SqsTrigger::TRIGGER_TYPE
463-
| CommandTrigger::TRIGGER_TYPE => Ok(trigger_type.to_owned()),
464-
_ => {
465-
todo!("Only Http, Redis, SQS, and command triggers are currently supported.")
473+
fn trigger_command_for_resolved_app_source(resolved: &ResolvedAppSource) -> Result<Vec<String>> {
474+
let trigger_types = resolved.trigger_types()?;
475+
let mut types = Vec::with_capacity(trigger_types.len());
476+
for trigger_type in trigger_types.iter() {
477+
match trigger_type.to_owned() {
478+
RedisTrigger::TRIGGER_TYPE
479+
| HttpTrigger::TRIGGER_TYPE
480+
| SqsTrigger::TRIGGER_TYPE
481+
| CommandTrigger::TRIGGER_TYPE => types.push(trigger_type),
482+
_ => {
483+
todo!("Only Http, Redis and SQS triggers are currently supported.")
484+
}
466485
}
467486
}
487+
488+
Ok(trigger_types.iter().map(|x| x.to_string()).collect())
468489
}
469490

470491
#[cfg(test)]

0 commit comments

Comments
 (0)