Skip to content
This repository was archived by the owner on Jul 4, 2025. It is now read-only.

Commit 18b70ec

Browse files
committed
Begin implementation of storage connector
1 parent 410a435 commit 18b70ec

File tree

16 files changed

+350
-102
lines changed

16 files changed

+350
-102
lines changed

Cargo.toml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ members = [
1111
"dash/controller",
1212
"dash/gateway",
1313
"dash/openapi",
14+
"dash/pipe/connectors/storage",
1415
"dash/pipe/connectors/webcam", # exclude(alpine)
1516
"dash/pipe/functions/identity",
1617
"dash/pipe/functions/python", # exclude(alpine)
@@ -57,6 +58,7 @@ actix-web = { version = "=4.4", default-features = false, features = [
5758
anyhow = { version = "=1.0", features = ["backtrace"] }
5859
argon2 = { version = "=0.5" }
5960
async-recursion = { version = "=1.0" }
61+
async-stream = { version = "=0.3" }
6062
async-trait = { version = "=0.1" }
6163
base64 = { version = "=0.21" }
6264
byteorder = { version = "=1.4" }
@@ -67,7 +69,7 @@ chrono = { version = "=0.4", features = ["serde"] }
6769
clap = { version = "=4.4", features = ["env", "derive"] }
6870
csv = { version = "=1.2" }
6971
ctrlc = { version = "=3.4" }
70-
deltalake = { version = "0.16", default-features = false }
72+
deltalake = { version = "=0.16", default-features = false }
7173
email_address = { version = "=0.2" }
7274
futures = { version = "=0.3" }
7375
gethostname = { version = "=0.4" }
@@ -87,7 +89,7 @@ k8s-openapi = { version = "=0.20", features = ["schemars", "v1_26"] }
8789
kube = { version = "=0.86", default-features = false }
8890
language-tags = { version = "=0.3", features = ["serde"] }
8991
tracing = { version = "=0.1" }
90-
tracing-subscriber = { version = "0.3" }
92+
tracing-subscriber = { version = "=0.3" }
9193
mime = { version = "=0.3" }
9294
# FIXME: push a PR: rustls-tls feature support
9395
minio = { git = "https://github.com/ulagbulag/minio-rs.git", default-features = false, rev = "5be4686e307b058aa4190134a555c925301c59b2", features = [
@@ -99,8 +101,8 @@ num-traits = { version = "=0.2" }
99101
octocrab = { git = "https://github.com/ulagbulag/octocrab.git", default-features = false, features = [
100102
"rustls-tls",
101103
] }
102-
opencv = { version = "0.84", default-features = false }
103-
ordered-float = { version = "4.0", default-features = false, features = [
104+
opencv = { version = "=0.84", default-features = false }
105+
ordered-float = { version = "=4.0", default-features = false, features = [
104106
"bytemuck",
105107
"schemars",
106108
"serde",
@@ -147,6 +149,7 @@ sio = { git = "https://github.com/ulagbulag/sio-rs.git" }
147149
strum = { version = "=0.25", features = ["derive"] }
148150
tera = { version = "=1.19" }
149151
tokio = { version = "=1.32", features = ["macros", "rt"] }
152+
tokio-stream = { version = "=0.1" }
150153
url = { version = "=2.4", features = ["serde"] }
151154
uuid = { version = "=1.4", features = ["js", "serde", "v4"] }
152155
which = { version = "=4.4" }
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
[package]
2+
name = "dash-pipe-connector-storage"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
authors = ["Ho Kim <ho.kim@ulagbulag.io>"]
7+
description = "Kubernetes Is Simple, Stupid which a part of OpenARK"
8+
documentation = "https://docs.rs/kiss-api"
9+
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
10+
readme = "../../README.md"
11+
homepage = "https://github.com/ulagbulag/OpenARK"
12+
repository = "https://github.com/ulagbulag/OpenARK"
13+
14+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
15+
16+
[dependencies]
17+
dash-pipe-provider = { path = "../../provider" }
18+
19+
anyhow = { workspace = true }
20+
async-trait = { workspace = true }
21+
clap = { workspace = true }
22+
futures = { workspace = true }
23+
serde = { workspace = true }
24+
tokio = { workspace = true, features = ["time"] }
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use std::sync::Arc;
2+
3+
use anyhow::{bail, Result};
4+
use async_trait::async_trait;
5+
use clap::{ArgAction, Parser};
6+
use dash_pipe_provider::{
7+
FunctionContext, PipeArgs, PipeMessage, PipeMessages, PipePayload, StorageSet, StorageType,
8+
Stream,
9+
};
10+
use futures::StreamExt;
11+
use serde::{Deserialize, Serialize};
12+
13+
fn main() {
14+
PipeArgs::<Function>::from_env().loop_forever()
15+
}
16+
17+
#[derive(Clone, Debug, Serialize, Deserialize, Parser)]
18+
pub struct FunctionArgs {
19+
#[arg(long, env = "PIPE_PERSISTENCE", action = ArgAction::SetTrue)]
20+
#[serde(default)]
21+
persistence: Option<bool>,
22+
}
23+
24+
pub struct Function {
25+
ctx: FunctionContext,
26+
items: Stream,
27+
}
28+
29+
#[async_trait(?Send)]
30+
impl ::dash_pipe_provider::Function for Function {
31+
type Args = FunctionArgs;
32+
type Input = ();
33+
type Output = usize;
34+
35+
async fn try_new(
36+
args: &<Self as ::dash_pipe_provider::Function>::Args,
37+
ctx: &mut FunctionContext,
38+
storage: &Arc<StorageSet>,
39+
) -> Result<Self> {
40+
let storage_type = match args.persistence {
41+
Some(true) => StorageType::PERSISTENT,
42+
Some(false) | None => StorageType::TEMPORARY,
43+
};
44+
45+
Ok(Self {
46+
ctx: ctx.clone(),
47+
items: storage.get(storage_type).list().await?,
48+
})
49+
}
50+
51+
async fn tick(
52+
&mut self,
53+
_inputs: PipeMessages<<Self as ::dash_pipe_provider::Function>::Input>,
54+
) -> Result<PipeMessages<<Self as ::dash_pipe_provider::Function>::Output>> {
55+
match self.items.next().await {
56+
// TODO: stream이 JSON 메타데이터를 포함한 PipeMessage Object를 배출
57+
Some(Ok((path, value))) => Ok(PipeMessages::Single(PipeMessage {
58+
payloads: vec![PipePayload::new(path.to_string(), value)],
59+
value: Default::default(),
60+
})),
61+
Some(Err(error)) => bail!("failed to load data: {error}"),
62+
None => self.ctx.terminate_ok(),
63+
}
64+
}
65+
}

dash/pipe/connectors/webcam/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,3 @@ image = { workspace = true, features = ["png"] }
2323
opencv = { workspace = true, features = ["imgcodecs", "videoio"] }
2424
serde = { workspace = true }
2525
tokio = { workspace = true, features = ["time"] }
26-
tracing = { workspace = true }

dash/pipe/connectors/webcam/src/main.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
use std::time::Duration;
1+
use std::sync::Arc;
22

33
use anyhow::{anyhow, bail, Result};
44
use async_trait::async_trait;
55
use clap::{Parser, ValueEnum};
6-
use dash_pipe_provider::{PipeArgs, PipeMessage, PipeMessages, PipePayload};
6+
use dash_pipe_provider::{
7+
FunctionContext, PipeArgs, PipeMessage, PipeMessages, PipePayload, StorageSet,
8+
};
79
use image::{codecs, RgbImage};
810
use opencv::{
911
core::{Mat, MatTraitConst, MatTraitConstManual, Vec3b, Vector},
1012
imgcodecs,
1113
videoio::{self, VideoCapture, VideoCaptureTrait, VideoCaptureTraitConst},
1214
};
1315
use serde::{Deserialize, Serialize};
14-
use tokio::time::sleep;
15-
use tracing::error;
1616

1717
fn main() {
1818
PipeArgs::<Function>::from_env().loop_forever()
@@ -69,6 +69,7 @@ impl CameraEncoder {
6969
pub struct Function {
7070
camera_encoder: CameraEncoder,
7171
capture: VideoCapture,
72+
ctx: FunctionContext,
7273
frame: Mat,
7374
frame_counter: FrameCounter,
7475
frame_size: FrameSize,
@@ -81,7 +82,11 @@ impl ::dash_pipe_provider::Function for Function {
8182
type Input = ();
8283
type Output = usize;
8384

84-
async fn try_new(args: &<Self as ::dash_pipe_provider::Function>::Args) -> Result<Self> {
85+
async fn try_new(
86+
args: &<Self as ::dash_pipe_provider::Function>::Args,
87+
ctx: &mut FunctionContext,
88+
_storage: &Arc<StorageSet>,
89+
) -> Result<Self> {
8590
let FunctionArgs {
8691
camera_device,
8792
camera_encoder,
@@ -96,6 +101,7 @@ impl ::dash_pipe_provider::Function for Function {
96101
Ok(Self {
97102
camera_encoder,
98103
capture,
104+
ctx: ctx.clone(),
99105
frame: Default::default(),
100106
frame_counter: Default::default(),
101107
frame_size: Default::default(),
@@ -158,9 +164,9 @@ impl ::dash_pipe_provider::Function for Function {
158164
}
159165
}
160166
Ok(false) => {
161-
error!("video capture is disconnected!");
162-
sleep(Duration::from_millis(u64::MAX)).await;
163-
return Ok(PipeMessages::None);
167+
return self
168+
.ctx
169+
.terminate_err(anyhow!("video capture is disconnected!"))
164170
}
165171
Err(error) => bail!("failed to capture a frame: {error}"),
166172
};

dash/pipe/functions/identity/src/main.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1+
use std::sync::Arc;
2+
13
use anyhow::Result;
24
use async_trait::async_trait;
35
use clap::{ArgAction, Parser};
4-
use dash_pipe_provider::{PipeArgs, PipeMessage, PipeMessages, PipePayload};
6+
use dash_pipe_provider::{
7+
FunctionContext, PipeArgs, PipeMessage, PipeMessages, PipePayload, StorageSet,
8+
};
59
use serde::{Deserialize, Serialize};
610
use serde_json::Value;
711

@@ -26,7 +30,11 @@ impl ::dash_pipe_provider::Function for Function {
2630
type Input = Value;
2731
type Output = Value;
2832

29-
async fn try_new(args: &<Self as ::dash_pipe_provider::Function>::Args) -> Result<Self> {
33+
async fn try_new(
34+
args: &<Self as ::dash_pipe_provider::Function>::Args,
35+
_ctx: &mut FunctionContext,
36+
_storage: &Arc<StorageSet>,
37+
) -> Result<Self> {
3038
Ok(Self { args: args.clone() })
3139
}
3240

dash/pipe/functions/python/src/main.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use std::path::PathBuf;
1+
use std::{path::PathBuf, sync::Arc};
22

33
use anyhow::{anyhow, Error, Result};
44
use async_trait::async_trait;
55
use clap::Parser;
6-
use dash_pipe_provider::{PipeArgs, PipeMessages, PyPipeMessage};
6+
use dash_pipe_provider::{FunctionContext, PipeArgs, PipeMessages, PyPipeMessage, StorageSet};
77
use pyo3::{types::PyModule, PyObject, Python};
88
use serde::{Deserialize, Serialize};
99
use serde_json::Value;
@@ -29,7 +29,11 @@ impl ::dash_pipe_provider::Function for Function {
2929
type Input = Value;
3030
type Output = Value;
3131

32-
async fn try_new(args: &<Self as ::dash_pipe_provider::Function>::Args) -> Result<Self> {
32+
async fn try_new(
33+
args: &<Self as ::dash_pipe_provider::Function>::Args,
34+
_ctx: &mut FunctionContext,
35+
_storage: &Arc<StorageSet>,
36+
) -> Result<Self> {
3337
let FunctionArgs {
3438
python_script: file_path,
3539
} = args;

dash/pipe/provider/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ s3 = ["minio"]
2525
ark-core = { path = "../../../ark/core" }
2626

2727
anyhow = { workspace = true }
28+
async-stream = { workspace = true }
2829
async-trait = { workspace = true }
2930
bytes = { workspace = true }
3031
clap = { workspace = true }
32+
ctrlc = { workspace = true }
3133
deltalake = { workspace = true }
3234
futures = { workspace = true }
3335
minio = { workspace = true, optional = true }
@@ -36,5 +38,6 @@ pyo3 = { workspace = true, optional = true }
3638
serde = { workspace = true, features = ["derive"] }
3739
serde_json = { workspace = true }
3840
tokio = { workspace = true, features = ["full"] }
41+
tokio-stream = { workspace = true }
3942
tracing = { workspace = true }
4043
url = { workspace = true }

dash/pipe/provider/src/function.rs

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,29 @@
1-
use std::fmt;
1+
use std::{
2+
fmt,
3+
sync::{
4+
atomic::{AtomicBool, Ordering},
5+
Arc,
6+
},
7+
};
28

3-
use anyhow::Result;
9+
use anyhow::{anyhow, Error, Result};
410
use async_trait::async_trait;
511
use clap::Args;
612
use serde::{de::DeserializeOwned, Serialize};
713

8-
use crate::PipeMessages;
14+
use crate::{PipeMessages, StorageSet};
915

1016
#[async_trait(?Send)]
1117
pub trait Function {
1218
type Args: Clone + fmt::Debug + Serialize + DeserializeOwned + Args;
1319
type Input: 'static + Send + Sync + DeserializeOwned;
1420
type Output: 'static + Send + Serialize;
1521

16-
async fn try_new(args: &<Self as Function>::Args) -> Result<Self>
22+
async fn try_new(
23+
args: &<Self as Function>::Args,
24+
ctx: &mut FunctionContext,
25+
storage: &Arc<StorageSet>,
26+
) -> Result<Self>
1727
where
1828
Self: Sized;
1929

@@ -22,3 +32,33 @@ pub trait Function {
2232
inputs: PipeMessages<<Self as Function>::Input>,
2333
) -> Result<PipeMessages<<Self as Function>::Output>>;
2434
}
35+
36+
#[derive(Clone, Debug, Default)]
37+
pub struct FunctionContext {
38+
is_terminating: Arc<AtomicBool>,
39+
}
40+
41+
impl FunctionContext {
42+
pub(crate) fn trap_on_sigint(self) -> Result<()> {
43+
::ctrlc::set_handler(move || self.terminate())
44+
.map_err(|error| anyhow!("failed to set SIGINT handler: {error}"))
45+
}
46+
47+
pub(crate) fn terminate(&self) {
48+
self.is_terminating.store(true, Ordering::SeqCst)
49+
}
50+
51+
pub fn terminate_ok<T>(&self) -> Result<PipeMessages<T>> {
52+
self.terminate();
53+
Ok(PipeMessages::None)
54+
}
55+
56+
pub fn terminate_err<T>(&self, error: impl Into<Error>) -> Result<PipeMessages<T>> {
57+
self.terminate();
58+
Err(error.into())
59+
}
60+
61+
pub(crate) fn is_terminating(&self) -> bool {
62+
self.is_terminating.load(Ordering::SeqCst)
63+
}
64+
}

dash/pipe/provider/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ mod message;
33
mod pipe;
44
mod storage;
55

6-
pub use self::function::Function;
6+
pub use self::function::{Function, FunctionContext};
77
#[cfg(feature = "pyo3")]
88
pub use self::message::PyPipeMessage;
99
pub use self::message::{PipeMessage, PipeMessages, PipePayload};
1010
pub use self::pipe::PipeArgs;
11+
pub use self::storage::{Storage, StorageSet, StorageType, Stream};

0 commit comments

Comments
 (0)