Skip to content
This repository was archived by the owner on Jul 4, 2025. It is now read-only.

Commit 9403ad2

Browse files
committed
Rename pipe crd into function
1 parent 978b85f commit 9403ad2

File tree

32 files changed

+409
-198
lines changed

32 files changed

+409
-198
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ members = [
1515
"dash/pipe/connectors/storage",
1616
"dash/pipe/connectors/webcam", # exclude(alpine)
1717
"dash/pipe/functions/ai", # exclude(alpine)
18+
"dash/pipe/functions/ai/plugin",
1819
"dash/pipe/functions/identity",
1920
"dash/pipe/functions/performance-test",
2021
"dash/pipe/functions/python", # exclude(alpine)

ark/core/k8s/src/data.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ impl FromStr for Name {
2323
type Err = Error;
2424

2525
fn from_str(name: &str) -> Result<Self, <Self as FromStr>::Err> {
26-
let re = Regex::new(r"^[a-z][a-z0-9_-]*[a-z0-9]?$")?;
26+
let field = Self::RE_FIELD;
27+
let fields = format!(r"^{field}(\.{field})*$");
28+
let re = Regex::new(&fields)?;
2729
if re.is_match(name) {
2830
Ok(Self(name.into()))
2931
} else {
@@ -38,13 +40,6 @@ impl From<Name> for String {
3840
}
3941
}
4042

41-
// #[cfg(feature = "nats")]
42-
// impl From<Name> for ::async_nats::Subject {
43-
// fn from(value: Name) -> Self {
44-
// value.0.into()
45-
// }
46-
// }
47-
4843
impl Borrow<str> for Name {
4944
fn borrow(&self) -> &str {
5045
&self.0
@@ -143,6 +138,14 @@ impl<'de> Deserialize<'de> for Name {
143138
}
144139
}
145140

141+
impl Name {
142+
const RE_FIELD: &str = r"[a-z]([a-z0-9_-]*[a-z0-9])?";
143+
144+
pub fn storage(&self) -> &str {
145+
self.0.split('.').next().unwrap()
146+
}
147+
}
148+
146149
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
147150
#[serde(transparent)]
148151
pub struct Url(pub ::url::Url);
Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
33
use kube::CustomResource;
44
use schemars::JsonSchema;
55
use serde::{Deserialize, Serialize};
6-
use straw_api::pipe::StrawPipe;
6+
use straw_api::function::{StrawFunction, StrawFunctionType};
77
use strum::{Display, EnumString};
88

99
use crate::model::ModelFieldsNativeSpec;
@@ -12,15 +12,15 @@ use crate::model::ModelFieldsNativeSpec;
1212
#[kube(
1313
group = "dash.ulagbulag.io",
1414
version = "v1alpha1",
15-
kind = "Pipe",
16-
struct = "PipeCrd",
17-
status = "PipeStatus",
18-
shortname = "pi",
15+
kind = "Function",
16+
struct = "FunctionCrd",
17+
status = "FunctionStatus",
18+
shortname = "f",
1919
namespaced,
2020
printcolumn = r#"{
2121
"name": "state",
2222
"type": "string",
23-
"description": "state of the pipe",
23+
"description": "state of the function",
2424
"jsonPath": ".status.state"
2525
}"#,
2626
printcolumn = r#"{
@@ -37,32 +37,67 @@ use crate::model::ModelFieldsNativeSpec;
3737
}"#
3838
)]
3939
#[serde(rename_all = "camelCase")]
40-
pub struct PipeSpec<Spec = Name, Exec = PipeExec> {
40+
pub struct FunctionSpec<Spec = Name, Exec = FunctionExec> {
4141
pub input: Spec,
4242
pub output: Spec,
4343
#[serde(default, flatten)]
4444
pub exec: Exec,
45+
#[serde(rename = "type")]
46+
pub type_: StrawFunctionType,
47+
pub volatility: FunctionVolatility,
4548
}
4649

4750
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
4851
#[serde(rename_all = "camelCase")]
49-
pub enum PipeExec {
52+
pub enum FunctionExec {
5053
Placeholder {},
51-
Straw(StrawPipe),
54+
Straw(StrawFunction),
5255
}
5356

54-
impl Default for PipeExec {
57+
impl Default for FunctionExec {
5558
fn default() -> Self {
5659
Self::Placeholder {}
5760
}
5861
}
5962

63+
#[derive(
64+
Copy,
65+
Clone,
66+
Debug,
67+
Display,
68+
EnumString,
69+
PartialEq,
70+
Eq,
71+
PartialOrd,
72+
Ord,
73+
Hash,
74+
Serialize,
75+
Deserialize,
76+
JsonSchema,
77+
)]
78+
pub enum FunctionVolatility {
79+
/// Immutable function.
80+
/// If the same input value is received during execution of the function,
81+
/// the same output value is always returned.
82+
Immutable,
83+
/// Stable function.
84+
/// If the same input value is received during execution of the function,
85+
/// the output value may change.
86+
/// However, if the same input value is input with the same timestamp,
87+
/// the same output value is always returned.
88+
Stable,
89+
/// Volatile function.
90+
/// This can produce different results every time
91+
/// even if the same input value is input.
92+
Volatile,
93+
}
94+
6095
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
6196
#[serde(rename_all = "camelCase")]
62-
pub struct PipeStatus {
97+
pub struct FunctionStatus {
6398
#[serde(default)]
64-
pub state: PipeState,
65-
pub spec: Option<PipeSpec<ModelFieldsNativeSpec>>,
99+
pub state: FunctionState,
100+
pub spec: Option<FunctionSpec<ModelFieldsNativeSpec>>,
66101
pub last_updated: DateTime<Utc>,
67102
}
68103

@@ -82,7 +117,7 @@ pub struct PipeStatus {
82117
Deserialize,
83118
JsonSchema,
84119
)]
85-
pub enum PipeState {
120+
pub enum FunctionState {
86121
#[default]
87122
Pending,
88123
Ready,

dash/api/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
pub mod function;
12
pub mod job;
23
pub mod model;
34
pub mod model_storage_binding;
4-
pub mod pipe;
55
pub mod storage;
66
pub mod task;
77

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use ark_core_k8s::manager::Manager;
55
use async_trait::async_trait;
66
use chrono::Utc;
77
use dash_api::{
8+
function::{FunctionCrd, FunctionSpec, FunctionState, FunctionStatus},
89
model::ModelFieldsNativeSpec,
9-
pipe::{PipeCrd, PipeSpec, PipeState, PipeStatus},
1010
};
1111
use kube::{
1212
api::{Patch, PatchParams},
@@ -16,14 +16,14 @@ use kube::{
1616
use serde_json::json;
1717
use tracing::{info, warn};
1818

19-
use crate::validator::pipe::PipeValidator;
19+
use crate::validator::function::FunctionValidator;
2020

2121
#[derive(Default)]
2222
pub struct Ctx {}
2323

2424
#[async_trait]
2525
impl ::ark_core_k8s::manager::Ctx for Ctx {
26-
type Data = PipeCrd;
26+
type Data = FunctionCrd;
2727

2828
const NAME: &'static str = crate::consts::NAME;
2929
const NAMESPACE: &'static str = ::dash_api::consts::NAMESPACE;
@@ -45,24 +45,24 @@ impl ::ark_core_k8s::manager::Ctx for Ctx {
4545
.map(|status| status.state)
4646
.unwrap_or_default()
4747
{
48-
PipeState::Pending => {
49-
let validator = PipeValidator {
48+
FunctionState::Pending => {
49+
let validator = FunctionValidator {
5050
namespace: &namespace,
5151
kube: &manager.kube,
5252
};
53-
match validator.validate_pipe(data.spec.clone()).await {
53+
match validator.validate_function(data.spec.clone()).await {
5454
Ok(spec) => {
5555
Self::update_spec_or_requeue(&namespace, &manager.kube, &name, spec).await
5656
}
5757
Err(e) => {
58-
warn!("failed to validate pipe: {name:?}: {e}");
58+
warn!("failed to validate function: {name:?}: {e}");
5959
Ok(Action::requeue(
6060
<Self as ::ark_core_k8s::manager::Ctx>::FALLBACK,
6161
))
6262
}
6363
}
6464
}
65-
PipeState::Ready => {
65+
FunctionState::Ready => {
6666
// TODO: implement to finding changes
6767
Ok(Action::await_change())
6868
}
@@ -75,17 +75,17 @@ impl Ctx {
7575
namespace: &str,
7676
kube: &Client,
7777
name: &str,
78-
spec: PipeSpec<ModelFieldsNativeSpec>,
78+
spec: FunctionSpec<ModelFieldsNativeSpec>,
7979
) -> Result<Action, Error> {
8080
match Self::update_spec(namespace, kube, name, spec).await {
8181
Ok(()) => {
82-
info!("pipe is ready: {namespace}/{name}");
82+
info!("function is ready: {namespace}/{name}");
8383
Ok(Action::requeue(
8484
<Self as ::ark_core_k8s::manager::Ctx>::FALLBACK,
8585
))
8686
}
8787
Err(e) => {
88-
warn!("failed to update pipe state ({namespace}/{name}): {e}");
88+
warn!("failed to update function state ({namespace}/{name}): {e}");
8989
Ok(Action::requeue(
9090
<Self as ::ark_core_k8s::manager::Ctx>::FALLBACK,
9191
))
@@ -97,7 +97,7 @@ impl Ctx {
9797
namespace: &str,
9898
kube: &Client,
9999
name: &str,
100-
spec: PipeSpec<ModelFieldsNativeSpec>,
100+
spec: FunctionSpec<ModelFieldsNativeSpec>,
101101
) -> Result<()> {
102102
let api = Api::<<Self as ::ark_core_k8s::manager::Ctx>::Data>::namespaced(
103103
kube.clone(),
@@ -108,8 +108,8 @@ impl Ctx {
108108
let patch = Patch::Merge(json!({
109109
"apiVersion": crd.api_version,
110110
"kind": crd.kind,
111-
"status": PipeStatus {
112-
state: PipeState::Ready,
111+
"status": FunctionStatus {
112+
state: FunctionState::Ready,
113113
spec: Some(spec),
114114
last_updated: Utc::now(),
115115
},

dash/controller/src/ctx/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
pub mod function;
12
pub mod job;
23
pub mod model;
34
pub mod model_storage_binding;
4-
pub mod pipe;
55
pub mod storage;
66
pub mod task;

dash/controller/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ pub(crate) mod consts {
1111
#[tokio::main]
1212
async fn main() {
1313
join!(
14-
self::ctx::task::Ctx::spawn_crd(),
14+
self::ctx::function::Ctx::spawn_crd(),
1515
self::ctx::job::Ctx::spawn_crd(),
1616
self::ctx::model::Ctx::spawn_crd(),
1717
self::ctx::model_storage_binding::Ctx::spawn_crd(),
18-
self::ctx::pipe::Ctx::spawn_crd(),
1918
self::ctx::storage::Ctx::spawn_crd(),
19+
self::ctx::task::Ctx::spawn_crd(),
2020
);
2121
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use anyhow::Result;
2+
use ark_core_k8s::data::Name;
3+
use dash_api::{
4+
function::{FunctionExec, FunctionSpec},
5+
model::{
6+
ModelFieldKindExtendedSpec, ModelFieldKindSpec, ModelFieldSpec, ModelFieldsNativeSpec,
7+
},
8+
};
9+
use dash_provider::storage::KubernetesStorageClient;
10+
use kube::Client;
11+
use straw_api::{
12+
function::{StrawFunction, StrawFunctionType},
13+
plugin::PluginContext,
14+
};
15+
use straw_provider::StrawSession;
16+
17+
use super::model::ModelValidator;
18+
19+
pub struct FunctionValidator<'namespace, 'kube> {
20+
pub namespace: &'namespace str,
21+
pub kube: &'kube Client,
22+
}
23+
24+
impl<'namespace, 'kube> FunctionValidator<'namespace, 'kube> {
25+
pub async fn validate_function(
26+
&self,
27+
spec: FunctionSpec,
28+
) -> Result<FunctionSpec<ModelFieldsNativeSpec>> {
29+
let FunctionSpec {
30+
input,
31+
output,
32+
exec,
33+
type_,
34+
volatility,
35+
} = spec;
36+
37+
let models = Models { input, output };
38+
39+
let model_validator = ModelValidator {
40+
kubernetes_storage: KubernetesStorageClient {
41+
namespace: self.namespace,
42+
kube: self.kube,
43+
},
44+
};
45+
let validate_model = |name| async {
46+
model_validator
47+
.validate_fields(vec![ModelFieldSpec {
48+
name: "/".into(),
49+
kind: ModelFieldKindSpec::Extended(ModelFieldKindExtendedSpec::Model { name }),
50+
attribute: Default::default(),
51+
}])
52+
.await
53+
};
54+
55+
Ok(FunctionSpec {
56+
input: validate_model(models.input.clone().into()).await?,
57+
output: validate_model(models.output.clone().into()).await?,
58+
exec: self.validate_exec(type_, exec, models).await?,
59+
type_,
60+
volatility,
61+
})
62+
}
63+
64+
async fn validate_exec(
65+
&self,
66+
type_: StrawFunctionType,
67+
exec: FunctionExec,
68+
models: Models,
69+
) -> Result<FunctionExec> {
70+
match exec {
71+
FunctionExec::Placeholder {} => Ok(exec),
72+
FunctionExec::Straw(exec) => self
73+
.validate_exec_straw(type_, exec, models)
74+
.await
75+
.map(FunctionExec::Straw),
76+
}
77+
}
78+
79+
async fn validate_exec_straw(
80+
&self,
81+
type_: StrawFunctionType,
82+
function: StrawFunction,
83+
models: Models,
84+
) -> Result<StrawFunction> {
85+
if type_ == StrawFunctionType::Pipe {
86+
self.validate_model_storage_binding(&models).await?;
87+
}
88+
89+
let ctx = PluginContext::new(type_, Some(models.input), Some(models.output));
90+
let session = StrawSession::new(self.kube.clone(), Some(self.namespace.into()));
91+
session.create(&ctx, &function).await.map(|()| function)
92+
}
93+
94+
async fn validate_model_storage_binding(&self, models: &Models) -> Result<()> {
95+
let client = KubernetesStorageClient {
96+
namespace: self.namespace,
97+
kube: self.kube,
98+
};
99+
client.ensure_model_storage_binding(&models.input).await?;
100+
client.ensure_model_storage_binding(&models.output).await?;
101+
Ok(())
102+
}
103+
}
104+
105+
struct Models {
106+
input: Name,
107+
output: Name,
108+
}

0 commit comments

Comments
 (0)