Skip to content
This repository was archived by the owner on Jul 4, 2025. It is now read-only.

Commit a682eb5

Browse files
committed
Add storage backup support for identity function
1 parent 9aaf939 commit a682eb5

File tree

8 files changed

+289
-135
lines changed

8 files changed

+289
-135
lines changed

dash/pipe/functions/identity/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,6 @@ repository = "https://github.com/ulagbulag/OpenARK"
1717
dash-pipe-provider = { path = "../../provider" }
1818

1919
anyhow = { workspace = true }
20+
clap = { workspace = true }
21+
serde = { workspace = true }
2022
serde_json = { workspace = true }
Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,41 @@
1+
use std::sync::Arc;
2+
13
use anyhow::Result;
2-
use dash_pipe_provider::{PipeEngine, PipeMessages};
4+
use clap::{ArgAction, Parser};
5+
use dash_pipe_provider::{PipeEngine, PipeMessage, PipeMessages, PipePayload};
6+
use serde::{Deserialize, Serialize};
37
use serde_json::Value;
48

59
fn main() {
610
PipeEngine::from_env().loop_forever(tick)
711
}
812

9-
async fn tick(input: PipeMessages<Value>) -> Result<PipeMessages<Value>> {
10-
Ok(input)
13+
#[derive(Clone, Debug, Parser, Serialize, Deserialize)]
14+
pub struct FunctionArgs {
15+
#[arg(long, env = "PIPE_IDENTITY_WRITE_TO_PERSISTENT_STORAGE", action = ArgAction::SetTrue)]
16+
#[serde(default)]
17+
write_to_persistent_storage: Option<bool>,
18+
}
19+
20+
async fn tick(
21+
args: Arc<FunctionArgs>,
22+
input: PipeMessages<Value>,
23+
) -> Result<Option<PipeMessages<Value>>> {
24+
match args.write_to_persistent_storage {
25+
Some(true) => Ok(Some(match input {
26+
PipeMessages::Single(message) => PipeMessages::Single(pack_payload(message)?),
27+
PipeMessages::Batch(messages) => PipeMessages::Batch(
28+
messages
29+
.into_iter()
30+
.map(pack_payload)
31+
.collect::<Result<_>>()?,
32+
),
33+
})),
34+
Some(false) | None => Ok(Some(input)),
35+
}
36+
}
37+
38+
fn pack_payload(mut message: PipeMessage<Value>) -> Result<PipeMessage<Value>> {
39+
message.payloads = vec![PipePayload::new("test".into(), message.to_json_bytes()?)];
40+
Ok(message)
1141
}

dash/pipe/functions/loader-webcam/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@ repository = "https://github.com/ulagbulag/OpenARK"
1717
dash-pipe-provider = { path = "../../provider" }
1818

1919
anyhow = { workspace = true }
20+
clap = { workspace = true }
21+
serde = { workspace = true }
Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,22 @@
1+
use std::sync::Arc;
2+
13
use anyhow::Result;
4+
use clap::Parser;
25
use dash_pipe_provider::{PipeEngine, PipeMessages};
6+
use serde::{Deserialize, Serialize};
37

48
fn main() {
59
PipeEngine::from_env().loop_forever(tick)
610
}
711

8-
async fn tick(input: PipeMessages<String>) -> Result<PipeMessages<String>> {
12+
#[derive(Clone, Debug, Parser, Serialize, Deserialize)]
13+
pub struct FunctionArgs {}
14+
15+
async fn tick(
16+
_args: Arc<FunctionArgs>,
17+
input: PipeMessages<String>,
18+
) -> Result<Option<PipeMessages<String>>> {
919
// TODO: to be implemented
1020
dbg!(&input);
11-
Ok(input)
21+
Ok(Some(input))
1222
}

0 commit comments

Comments
 (0)