Skip to content

Commit bcd262c

Browse files
authored
Support multi-field oneshot inputs (#29)
* feat(engine): support multi-field http_input in oneshot pipelines * fix: bind http_input uploads to their referenced pins * fix(ui): multi-upload UI
1 parent b133222 commit bcd262c

File tree

26 files changed

+1404
-402
lines changed

26 files changed

+1404
-402
lines changed

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Agent-assisted contributions are welcome, but should be **supervised** and **rev
1818
- **Primary server binary**: `skit` (crate: `streamkit-server`).
1919
- **Dev task runner**: `just` (see `justfile`).
2020
- **Docs**: Astro + Starlight in `docs/` (sidebar in `docs/astro.config.mjs`).
21+
- **UI tooling**: Bun-first. Use `bun install`, `bunx` (or `bun run` scripts) for UI work—avoid npm/pnpm.
2122

2223
## Workflow expectations
2324

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/skit-cli/src/client.rs

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
1616
use tracing::{debug, error, info};
1717
use url::Url;
1818

19+
/// Represents one multipart input file for oneshot execution.
20+
#[derive(Debug, Clone)]
21+
pub struct InputFile {
22+
pub field: String,
23+
pub path: String,
24+
pub content_type: Option<String>,
25+
}
26+
1927
fn http_base_url(server_url: &str) -> Result<Url, Box<dyn std::error::Error + Send + Sync>> {
2028
let mut url = Url::parse(server_url)?;
2129
match url.scheme() {
@@ -167,12 +175,12 @@ fn parse_batch_operations(
167175
#[allow(clippy::cognitive_complexity)]
168176
pub async fn process_oneshot(
169177
pipeline_path: &str,
170-
input_path: &str,
178+
inputs: &[InputFile],
171179
output_path: &str,
172180
server_url: &str,
173181
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
174182
let client = reqwest::Client::new();
175-
process_oneshot_with_client(&client, pipeline_path, input_path, output_path, server_url).await
183+
process_oneshot_with_client(&client, pipeline_path, inputs, output_path, server_url).await
176184
}
177185

178186
/// Process a pipeline using a remote server in oneshot mode with a caller-provided HTTP client.
@@ -192,13 +200,17 @@ pub async fn process_oneshot(
192200
pub async fn process_oneshot_with_client(
193201
client: &reqwest::Client,
194202
pipeline_path: &str,
195-
input_path: &str,
203+
inputs: &[InputFile],
196204
output_path: &str,
197205
server_url: &str,
198206
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
207+
if inputs.is_empty() {
208+
return Err("At least one input file is required".into());
209+
}
210+
199211
info!(
200212
pipeline = %pipeline_path,
201-
input = %input_path,
213+
inputs = inputs.len(),
202214
output = %output_path,
203215
server = %server_url,
204216
"Starting oneshot pipeline processing"
@@ -208,31 +220,41 @@ pub async fn process_oneshot_with_client(
208220
if !Path::new(pipeline_path).exists() {
209221
return Err(format!("Pipeline file not found: {pipeline_path}").into());
210222
}
211-
if !Path::new(input_path).exists() {
212-
return Err(format!("Input file not found: {input_path}").into());
223+
for input in inputs {
224+
if !Path::new(&input.path).exists() {
225+
return Err(format!("Input file not found: {}", input.path).into());
226+
}
213227
}
214228

215229
// Read pipeline configuration
216230
debug!("Reading pipeline configuration from {pipeline_path}");
217231
let pipeline_content = fs::read_to_string(pipeline_path).await?;
218232

219-
// Read input media file
220-
debug!("Reading input media file from {input_path}");
221-
let media_data = fs::read(input_path).await?;
222-
223-
// Extract filename for the multipart form
224-
let input_filename = Path::new(input_path)
225-
.file_name()
226-
.and_then(|name| name.to_str())
227-
.unwrap_or("input")
228-
.to_string();
229-
230233
// Create multipart form
231-
let media_len = media_data.len();
232-
debug!("Creating multipart form with {media_len} bytes of media data");
233-
let form = multipart::Form::new()
234-
.text("config", pipeline_content)
235-
.part("media", multipart::Part::bytes(media_data).file_name(input_filename));
234+
let mut form = multipart::Form::new().text("config", pipeline_content);
235+
for input in inputs {
236+
debug!("Reading input media file from {}", input.path);
237+
let media_data = fs::read(&input.path).await?;
238+
let media_len = media_data.len();
239+
240+
let input_filename = Path::new(&input.path)
241+
.file_name()
242+
.and_then(|name| name.to_str())
243+
.unwrap_or("input")
244+
.to_string();
245+
246+
debug!(
247+
"Adding multipart field '{}' with {} bytes (file: {})",
248+
input.field, media_len, input_filename
249+
);
250+
251+
let mut part = multipart::Part::bytes(media_data).file_name(input_filename);
252+
if let Some(ct) = &input.content_type {
253+
part = part.mime_str(ct)?;
254+
}
255+
256+
form = form.part(input.field.clone(), part);
257+
}
236258

237259
// Send request to server
238260
let url = http_base_url(server_url)?.join("/api/v1/process")?;

apps/skit-cli/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub use client::{
1717
destroy_session, get_config, get_permissions, get_pipeline, get_sample, list_audio_assets,
1818
list_node_schemas, list_packet_schemas, list_plugins, list_samples_dynamic,
1919
list_samples_oneshot, list_sessions, process_oneshot, save_sample, tune_node,
20-
upload_audio_asset, upload_plugin, watch_events,
20+
upload_audio_asset, upload_plugin, watch_events, InputFile,
2121
};
2222
pub use load_test::run_load_test;
2323

apps/skit-cli/src/load_test/workers.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ pub async fn oneshot_worker(
4444
let result = process_oneshot_with_client(
4545
&client,
4646
pipeline_path,
47-
input_path,
47+
&[crate::client::InputFile {
48+
field: "media".to_string(),
49+
path: input_path.clone(),
50+
content_type: None,
51+
}],
4852
output_path,
4953
&config.server.url,
5054
)

apps/skit-cli/src/main.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
//
33
// SPDX-License-Identifier: MPL-2.0
44

5-
use clap::{Parser, Subcommand};
5+
use clap::{ArgAction, Parser, Subcommand};
6+
use streamkit_client::InputFile;
67
use tracing::{error, info};
78

89
#[derive(Parser, Debug)]
@@ -12,15 +13,34 @@ struct Cli {
1213
command: Commands,
1314
}
1415

16+
#[derive(Debug, Clone)]
17+
struct FieldPath {
18+
field: String,
19+
path: String,
20+
}
21+
22+
fn parse_field_path(s: &str) -> Result<FieldPath, String> {
23+
let mut parts = s.splitn(2, '=');
24+
let field = parts.next().unwrap_or("").trim();
25+
let path = parts.next().unwrap_or("").trim();
26+
if field.is_empty() || path.is_empty() {
27+
return Err("expected form name=path".to_string());
28+
}
29+
Ok(FieldPath { field: field.to_string(), path: path.to_string() })
30+
}
31+
1532
#[derive(Subcommand, Debug)]
1633
enum Commands {
1734
/// Process a pipeline using a remote server (oneshot mode)
1835
#[command(name = "oneshot")]
1936
OneShot {
2037
/// Path to the pipeline YAML file
2138
pipeline: String,
22-
/// Input media file path
39+
/// Primary input media file path (multipart field defaults to 'media')
2340
input: String,
41+
/// Additional input fields in the form name=path (repeatable)
42+
#[arg(long = "input", value_parser = parse_field_path, action = ArgAction::Append)]
43+
extra_input: Vec<FieldPath>,
2444
/// Output file path
2545
output: String,
2646
/// Server URL (default: http://127.0.0.1:4545)
@@ -329,11 +349,17 @@ async fn main() {
329349
let cli = Cli::parse();
330350

331351
match cli.command {
332-
Commands::OneShot { pipeline, input, output, server } => {
352+
Commands::OneShot { pipeline, input, extra_input, output, server } => {
333353
info!("Starting StreamKit client - oneshot processing");
334354

355+
let mut inputs = Vec::new();
356+
inputs.push(InputFile { field: "media".to_string(), path: input, content_type: None });
357+
for extra in extra_input {
358+
inputs.push(InputFile { field: extra.field, path: extra.path, content_type: None });
359+
}
360+
335361
if let Err(e) =
336-
streamkit_client::process_oneshot(&pipeline, &input, &output, &server).await
362+
streamkit_client::process_oneshot(&pipeline, &inputs, &output, &server).await
337363
{
338364
// Error already logged via tracing above
339365
error!(error = %e, "Failed to process oneshot pipeline");

apps/skit-cli/src/shell.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,12 @@ impl Shell {
623623

624624
// Use the existing process_oneshot function from client.rs
625625
// This makes a multipart HTTP POST to /api/v1/process
626-
crate::client::process_oneshot(pipeline_path, input_path, output_path, &http_url).await?;
626+
let inputs = vec![crate::client::InputFile {
627+
field: "media".to_string(),
628+
path: input_path.to_string(),
629+
content_type: None,
630+
}];
631+
crate::client::process_oneshot(pipeline_path, &inputs, output_path, &http_url).await?;
627632

628633
println!("✅ Oneshot processing completed successfully");
629634

apps/skit/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ anyhow = "1.0"
4848
# For HTTP server
4949
axum = { version = "0.8", features = ["multipart", "ws"] }
5050
tokio = { workspace = true, features = ["full"] }
51+
tokio-util = { workspace = true }
5152
tower = "0.5.3"
5253
tower-http = { version = "0.6", features = ["cors", "trace", "fs", "set-header"] }
5354
tokio-stream = "0.1.18"

apps/skit/src/bin/gen-docs-reference.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,39 @@ fn add_synthetic_oneshot_nodes(defs: &mut Vec<NodeDefinition>) {
116116
Receives binary data from the HTTP request body."
117117
.to_string(),
118118
),
119-
param_schema: serde_json::json!({}),
119+
param_schema: serde_json::json!({
120+
"type": "object",
121+
"additionalProperties": false,
122+
"properties": {
123+
"field": {
124+
"type": "string",
125+
"description": "Multipart field name to bind to this input. Defaults to 'media' when only one http_input node exists; otherwise defaults to the node id."
126+
},
127+
"fields": {
128+
"type": "array",
129+
"description": "Optional list of multipart fields for this node. When set, the node exposes one output pin per entry (pin name matches the field name). Entries may be strings or objects with { name, required }.",
130+
"items": {
131+
"oneOf": [
132+
{ "type": "string" },
133+
{
134+
"type": "object",
135+
"additionalProperties": false,
136+
"properties": {
137+
"name": { "type": "string" },
138+
"required": { "type": "boolean", "default": true }
139+
},
140+
"required": ["name"]
141+
}
142+
]
143+
}
144+
},
145+
"required": {
146+
"type": "boolean",
147+
"description": "If true (default), the request must include this field.",
148+
"default": true
149+
}
150+
}
151+
}),
120152
inputs: vec![],
121153
outputs: vec![OutputPin {
122154
name: "out".to_string(),

0 commit comments

Comments
 (0)