Skip to content

Commit 4ed8f53

Browse files
authored
Merge pull request #2213 from carlokok/feat/multiple-triggertypes
feat(up): Spawn multiple trigger commands
2 parents c4fcb8f + b8490df commit 4ed8f53

File tree

9 files changed

+154
-60
lines changed

9 files changed

+154
-60
lines changed

crates/app/src/lib.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#![deny(missing_docs)]
88

99
mod host_component;
10+
use serde_json::Value;
1011
pub use spin_locked_app::locked;
1112
pub use spin_locked_app::values;
1213
pub use spin_locked_app::{Error, MetadataKey, Result};
@@ -207,6 +208,36 @@ impl<'a, L> App<'a, L> {
207208
.map(|locked| AppTrigger { app: self, locked })
208209
}
209210

211+
/// Returns the trigger metadata for a specific trigger type.
212+
pub fn get_trigger_metadata<'this, T: Deserialize<'this> + Default>(
213+
&'this self,
214+
trigger_type: &'a str,
215+
) -> Result<Option<T>> {
216+
let Some(value) = self.get_trigger_metadata_value(trigger_type) else {
217+
return Ok(None);
218+
};
219+
let metadata = T::deserialize(value).map_err(|err| {
220+
Error::MetadataError(format!(
221+
"invalid metadata value for {trigger_type:?}: {err:?}"
222+
))
223+
})?;
224+
Ok(Some(metadata))
225+
}
226+
227+
fn get_trigger_metadata_value(&self, trigger_type: &str) -> Option<Value> {
228+
if let Some(trigger_configs) = self.locked.metadata.get("triggers") {
229+
// New-style: `{"triggers": {"<type>": {...}}}`
230+
trigger_configs.get(trigger_type).cloned()
231+
} else if self.locked.metadata["trigger"]["type"] == trigger_type {
232+
// Old-style: `{"trigger": {"type": "<type>", ...}}`
233+
let mut meta = self.locked.metadata["trigger"].clone();
234+
meta.as_object_mut().unwrap().remove("type");
235+
Some(meta)
236+
} else {
237+
None
238+
}
239+
}
240+
210241
/// Returns an iterator of [`AppTrigger`]s defined for this app with
211242
/// the given `trigger_type`.
212243
pub fn triggers_with_type(

crates/http/src/trigger.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ pub const METADATA_KEY: MetadataKey<Metadata> = MetadataKey::new("trigger");
77
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
88
#[serde(deny_unknown_fields)]
99
pub struct Metadata {
10-
// The type of trigger which should always been "http" in this case
11-
pub r#type: String,
1210
// The based url
1311
#[serde(default = "default_base")]
1412
pub base: String,

crates/redis/src/lib.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,11 @@ use anyhow::{anyhow, Context, Result};
88
use futures::{future::join_all, StreamExt};
99
use redis::{Client, ConnectionLike};
1010
use serde::{de::IgnoredAny, Deserialize, Serialize};
11-
use spin_app::MetadataKey;
1211
use spin_core::async_trait;
1312
use spin_trigger::{cli::NoArgs, TriggerAppEngine, TriggerExecutor};
1413

1514
use crate::spin::SpinRedisExecutor;
1615

17-
const TRIGGER_METADATA_KEY: MetadataKey<TriggerMetadata> = MetadataKey::new("trigger");
18-
1916
pub(crate) type RuntimeData = ();
2017
pub(crate) type Store = spin_core::Store<RuntimeData>;
2118

@@ -44,7 +41,6 @@ pub struct RedisTriggerConfig {
4441
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
4542
#[serde(deny_unknown_fields)]
4643
struct TriggerMetadata {
47-
r#type: String,
4844
address: String,
4945
}
5046

@@ -56,7 +52,10 @@ impl TriggerExecutor for RedisTrigger {
5652
type RunConfig = NoArgs;
5753

5854
async fn new(engine: TriggerAppEngine<Self>) -> Result<Self> {
59-
let address = engine.app().require_metadata(TRIGGER_METADATA_KEY)?.address;
55+
let address = engine
56+
.trigger_metadata::<TriggerMetadata>()?
57+
.unwrap_or_default()
58+
.address;
6059

6160
let mut channel_components: HashMap<String, Vec<String>> = HashMap::new();
6261

crates/trigger-http/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,10 @@ impl TriggerExecutor for HttpTrigger {
9595

9696
async fn new(engine: TriggerAppEngine<Self>) -> Result<Self> {
9797
let mut base = engine
98-
.app()
99-
.require_metadata(spin_http::trigger::METADATA_KEY)?
98+
.trigger_metadata::<spin_http::trigger::Metadata>()?
99+
.unwrap_or_default()
100100
.base;
101+
101102
if !base.starts_with('/') {
102103
base = format!("/{base}");
103104
}

crates/trigger/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,10 @@ impl<Executor: TriggerExecutor> TriggerAppEngine<Executor> {
287287
self.app.borrowed()
288288
}
289289

290+
pub fn trigger_metadata<T: DeserializeOwned + Default>(&self) -> spin_app::Result<Option<T>> {
291+
self.app().get_trigger_metadata(Executor::TRIGGER_TYPE)
292+
}
293+
290294
/// Returns AppTriggers and typed TriggerConfigs for this executor type.
291295
pub fn trigger_configs(&self) -> impl Iterator<Item = (AppTrigger, &Executor::TriggerConfig)> {
292296
self.app()

examples/spin-timer/src/lib.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,16 @@ pub struct TimerTrigger {
2929
component_timings: HashMap<String, u64>,
3030
}
3131

32-
// Application settings (raw serialization format)
32+
// Picks out the timer entry from the application-level trigger settings
33+
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
34+
struct TriggerMetadataParent {
35+
timer: Option<TriggerMetadata>,
36+
}
37+
38+
// Application-level settings (raw serialization format)
3339
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
3440
#[serde(deny_unknown_fields)]
3541
struct TriggerMetadata {
36-
r#type: String,
3742
speedup: Option<u64>,
3843
}
3944

@@ -45,7 +50,7 @@ pub struct TimerTriggerConfig {
4550
interval_secs: u64,
4651
}
4752

48-
const TRIGGER_METADATA_KEY: MetadataKey<TriggerMetadata> = MetadataKey::new("trigger");
53+
const TRIGGER_METADATA_KEY: MetadataKey<TriggerMetadataParent> = MetadataKey::new("triggers");
4954

5055
#[async_trait]
5156
impl TriggerExecutor for TimerTrigger {
@@ -61,6 +66,8 @@ impl TriggerExecutor for TimerTrigger {
6166
let speedup = engine
6267
.app()
6368
.require_metadata(TRIGGER_METADATA_KEY)?
69+
.timer
70+
.unwrap_or_default()
6471
.speedup
6572
.unwrap_or(1);
6673

examples/spin-timer/trigger-timer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"description": "Run Spin components at timed intervals",
44
"homepage": "https://github.com/fermyon/spin/tree/main/examples/spin-timer",
55
"version": "0.1.0",
6-
"spinCompatibility": ">=2.0",
6+
"spinCompatibility": ">=2.2",
77
"license": "Apache-2.0",
88
"packages": [
99
{

src/commands/up.rs

Lines changed: 99 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88

99
use anyhow::{anyhow, bail, Context, Result};
1010
use clap::{CommandFactory, Parser};
11+
use itertools::Itertools;
1112
use reqwest::Url;
1213
use spin_app::locked::LockedApp;
1314
use spin_common::ui::quoted_path;
@@ -16,6 +17,8 @@ use spin_oci::OciLoader;
1617
use spin_trigger::cli::{SPIN_LOCAL_APP_DIR, SPIN_LOCKED_URL, SPIN_WORKING_DIR};
1718
use tempfile::TempDir;
1819

20+
use futures::StreamExt;
21+
1922
use crate::opts::*;
2023

2124
use self::app_source::{AppSource, ResolvedAppSource};
@@ -128,9 +131,11 @@ impl UpCommand {
128131

129132
if app_source == AppSource::None {
130133
if self.help {
131-
return self
132-
.run_trigger(trigger_command(HELP_ARGS_ONLY_TRIGGER_TYPE), None)
133-
.await;
134+
let mut child = self
135+
.start_trigger(trigger_command(HELP_ARGS_ONLY_TRIGGER_TYPE), None)
136+
.await?;
137+
let _ = child.wait().await?;
138+
return Ok(());
134139
} else {
135140
bail!("Default file '{DEFAULT_MANIFEST_FILE}' not found. Run `spin up --from <APPLICATION>`, or `spin up --help` for usage.");
136141
}
@@ -150,28 +155,51 @@ impl UpCommand {
150155

151156
let resolved_app_source = self.resolve_app_source(&app_source, &working_dir).await?;
152157

153-
let trigger_cmd = trigger_command_for_resolved_app_source(&resolved_app_source)
158+
let trigger_cmds = trigger_command_for_resolved_app_source(&resolved_app_source)
154159
.with_context(|| format!("Couldn't find trigger executor for {app_source}"))?;
155160

156161
if self.help {
157-
return self.run_trigger(trigger_cmd, None).await;
162+
for cmd in trigger_cmds {
163+
let mut help_process = self.start_trigger(cmd.clone(), None).await?;
164+
_ = help_process.wait().await;
165+
}
166+
return Ok(());
158167
}
159168

160169
let mut locked_app = self
161170
.load_resolved_app_source(resolved_app_source, &working_dir)
162171
.await?;
163172

164173
self.update_locked_app(&mut locked_app);
174+
let locked_url = self.write_locked_app(&locked_app, &working_dir).await?;
165175

166176
let local_app_dir = app_source.local_app_dir().map(Into::into);
167177

168178
let run_opts = RunTriggerOpts {
169-
locked_app,
179+
locked_url,
170180
working_dir,
171181
local_app_dir,
172182
};
173183

174-
self.run_trigger(trigger_cmd, Some(run_opts)).await
184+
let mut trigger_processes = self.start_trigger_processes(trigger_cmds, run_opts).await?;
185+
186+
set_kill_on_ctrl_c(&trigger_processes)?;
187+
188+
let mut trigger_tasks = trigger_processes
189+
.iter_mut()
190+
.map(|ch| ch.wait())
191+
.collect::<futures::stream::FuturesUnordered<_>>();
192+
193+
let first_to_finish = trigger_tasks.next().await;
194+
195+
if let Some(process_result) = first_to_finish {
196+
let status = process_result?;
197+
if !status.success() {
198+
return Err(crate::subprocess::ExitStatusError::new(status).into());
199+
}
200+
}
201+
202+
Ok(())
175203
}
176204

177205
fn get_canonical_working_dir(&self) -> Result<WorkingDirectory, anyhow::Error> {
@@ -190,57 +218,57 @@ impl UpCommand {
190218
Ok(working_dir_holder)
191219
}
192220

193-
async fn run_trigger(
221+
async fn start_trigger_processes(
194222
self,
223+
trigger_cmds: Vec<Vec<String>>,
224+
run_opts: RunTriggerOpts,
225+
) -> anyhow::Result<Vec<tokio::process::Child>> {
226+
let mut trigger_processes = Vec::with_capacity(trigger_cmds.len());
227+
228+
for cmd in trigger_cmds {
229+
let child = self
230+
.start_trigger(cmd.clone(), Some(run_opts.clone()))
231+
.await
232+
.context("Failed to start trigger process")?;
233+
trigger_processes.push(child);
234+
}
235+
236+
Ok(trigger_processes)
237+
}
238+
239+
async fn start_trigger(
240+
&self,
195241
trigger_cmd: Vec<String>,
196242
opts: Option<RunTriggerOpts>,
197-
) -> Result<(), anyhow::Error> {
243+
) -> Result<tokio::process::Child, anyhow::Error> {
198244
// The docs for `current_exe` warn that this may be insecure because it could be executed
199245
// via hard-link. I think it should be fine as long as we aren't `setuid`ing this binary.
200-
let mut cmd = std::process::Command::new(std::env::current_exe().unwrap());
246+
let mut cmd = tokio::process::Command::new(std::env::current_exe().unwrap());
201247
cmd.args(&trigger_cmd);
202248

203249
if let Some(RunTriggerOpts {
204-
locked_app,
250+
locked_url,
205251
working_dir,
206252
local_app_dir,
207253
}) = opts
208254
{
209-
let locked_url = self.write_locked_app(&locked_app, &working_dir).await?;
210-
211255
cmd.env(SPIN_LOCKED_URL, locked_url)
212256
.env(SPIN_WORKING_DIR, &working_dir)
213257
.args(&self.trigger_args);
214258

215259
if let Some(local_app_dir) = local_app_dir {
216260
cmd.env(SPIN_LOCAL_APP_DIR, local_app_dir);
217261
}
262+
263+
cmd.kill_on_drop(true);
218264
} else {
219265
cmd.arg("--help-args-only");
220266
}
221267

222268
tracing::trace!("Running trigger executor: {:?}", cmd);
223269

224-
let mut child = cmd.spawn().context("Failed to execute trigger")?;
225-
226-
// Terminate trigger executor if `spin up` itself receives a termination signal
227-
#[cfg(not(windows))]
228-
{
229-
// https://github.com/nix-rust/nix/issues/656
230-
let pid = nix::unistd::Pid::from_raw(child.id() as i32);
231-
ctrlc::set_handler(move || {
232-
if let Err(err) = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM) {
233-
tracing::warn!("Failed to kill trigger handler process: {:?}", err)
234-
}
235-
})?;
236-
}
237-
238-
let status = child.wait()?;
239-
if status.success() {
240-
Ok(())
241-
} else {
242-
Err(crate::subprocess::ExitStatusError::new(status).into())
243-
}
270+
let child = cmd.spawn().context("Failed to execute trigger")?;
271+
Ok(child)
244272
}
245273

246274
fn app_source(&self) -> AppSource {
@@ -358,8 +386,31 @@ impl UpCommand {
358386
}
359387
}
360388

389+
#[cfg(windows)]
390+
fn set_kill_on_ctrl_c(trigger_processes: &Vec<tokio::process::Child>) -> Result<(), anyhow::Error> {
391+
Ok(())
392+
}
393+
394+
#[cfg(not(windows))]
395+
fn set_kill_on_ctrl_c(trigger_processes: &[tokio::process::Child]) -> Result<(), anyhow::Error> {
396+
// https://github.com/nix-rust/nix/issues/656
397+
let pids = trigger_processes
398+
.iter()
399+
.flat_map(|child| child.id().map(|id| nix::unistd::Pid::from_raw(id as i32)))
400+
.collect_vec();
401+
ctrlc::set_handler(move || {
402+
for pid in &pids {
403+
if let Err(err) = nix::sys::signal::kill(*pid, nix::sys::signal::SIGTERM) {
404+
tracing::warn!("Failed to kill trigger handler process: {:?}", err)
405+
}
406+
}
407+
})?;
408+
Ok(())
409+
}
410+
411+
#[derive(Clone)]
361412
struct RunTriggerOpts {
362-
locked_app: LockedApp,
413+
locked_url: String,
363414
working_dir: PathBuf,
364415
local_app_dir: Option<PathBuf>,
365416
}
@@ -424,16 +475,20 @@ fn trigger_command(trigger_type: &str) -> Vec<String> {
424475
vec!["trigger".to_owned(), trigger_type.to_owned()]
425476
}
426477

427-
fn trigger_command_for_resolved_app_source(resolved: &ResolvedAppSource) -> Result<Vec<String>> {
428-
let trigger_type = resolved.trigger_type()?;
429-
430-
match trigger_type {
431-
"http" | "redis" => Ok(trigger_command(trigger_type)),
432-
_ => {
433-
let cmd = resolve_trigger_plugin(trigger_type)?;
434-
Ok(vec![cmd])
435-
}
436-
}
478+
fn trigger_command_for_resolved_app_source(
479+
resolved: &ResolvedAppSource,
480+
) -> Result<Vec<Vec<String>>> {
481+
let trigger_type = resolved.trigger_types()?;
482+
trigger_type
483+
.iter()
484+
.map(|&t| match t {
485+
"http" | "redis" => Ok(trigger_command(t)),
486+
_ => {
487+
let cmd = resolve_trigger_plugin(t)?;
488+
Ok(vec![cmd])
489+
}
490+
})
491+
.collect()
437492
}
438493

439494
#[cfg(test)]

0 commit comments

Comments
 (0)