Skip to content

Commit 7f3ab83

Browse files
committed
feat(mgmt): add k8s-less mode
Add a k8s-less client that watches the config directory and uses the config client to request the config processor to apply a new config. Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
1 parent ba0aa73 commit 7f3ab83

File tree

6 files changed

+89
-1
lines changed

6 files changed

+89
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dataplane/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ fn main() {
158158
/* start management */
159159
start_mgmt(MgmtParams {
160160
grpc_addr,
161+
config_dir: args.config_dir().cloned(),
161162
hostname: get_gw_name().unwrap_or_else(|| unreachable!()).to_owned(),
162163
processor_params: ConfigProcessorParams {
163164
router_ctl: setup.router.get_ctl_tx(),

mgmt/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ gateway_config = { workspace = true }
2424
id = { workspace = true }
2525
interface-manager = { workspace = true }
2626
k8s-intf = { workspace = true }
27+
k8s-less = { workspace = true }
2728
lpm = { workspace = true }
2829
nat = { workspace = true }
2930
net = { workspace = true }
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright Open Network Fabric Authors
3+
4+
use config::{ExternalConfig, GwConfig};
5+
use futures::TryFutureExt;
6+
use k8s_less::kubeless_watch_gateway_agent_crd;
7+
use std::sync::Arc;
8+
use tracing::{error, info};
9+
10+
use crate::processor::mgmt_client::{ConfigClient, ConfigProcessorError};
11+
12+
#[derive(Debug, thiserror::Error)]
13+
pub enum K8sLessError {
14+
#[error("K8sless exited early")]
15+
EarlyTermination,
16+
#[error("Watching error: {0}")]
17+
WatchError(String),
18+
}
19+
20+
pub struct K8sLess {
21+
pathdir: String,
22+
client: ConfigClient,
23+
}
24+
25+
impl K8sLess {
26+
pub fn new(pathdir: &str, client: ConfigClient) -> Self {
27+
Self {
28+
pathdir: pathdir.to_string(),
29+
client,
30+
}
31+
}
32+
33+
pub async fn start_config_watch(k8sless: Arc<Self>) -> Result<(), K8sLessError> {
34+
info!("Starting config watcher for directory {}", k8sless.pathdir);
35+
36+
kubeless_watch_gateway_agent_crd(&k8sless.pathdir.clone(), async move |ga| {
37+
info!("Attempting to deserialize new gateway CRD ...");
38+
39+
let external_config = ExternalConfig::try_from(ga);
40+
match external_config {
41+
Err(e) => error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}"),
42+
Ok(external_config) => {
43+
let genid = external_config.genid;
44+
let applied_genid = match k8sless.client.get_generation().await {
45+
Ok(genid) => genid,
46+
Err(ConfigProcessorError::NoConfigApplied) => 0,
47+
Err(e) => {
48+
error!("Failed to get current config generation: {e}");
49+
return;
50+
}
51+
};
52+
info!("Current configuration is {applied_genid}");
53+
54+
let gwconfig = GwConfig::new(external_config);
55+
56+
// request the config processor to apply the config and update status on success
57+
match k8sless.client.apply_config(gwconfig).await {
58+
Ok(()) => info!("Config for generation {genid} was successfully applied"),
59+
Err(e) => error!("Failed to apply the config for generation {genid}: {e}"),
60+
}
61+
}
62+
}
63+
})
64+
.map_err(K8sLessError::WatchError)
65+
.await?;
66+
67+
Err(K8sLessError::EarlyTermination)
68+
}
69+
}

mgmt/src/processor/launch.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Copyright Open Network Fabric Authors
33

44
use crate::processor::k8s_client::{K8sClient, K8sClientError};
5+
use crate::processor::k8s_less_client::{K8sLess, K8sLessError};
56
use crate::processor::proc::ConfigProcessor;
67

78
use std::fmt::Display;
@@ -47,6 +48,9 @@ pub enum LaunchError {
4748
ProcessorError(std::io::Error),
4849
#[error("Error starting/waiting for Config Processor task: {0}")]
4950
ProcessorJoinError(tokio::task::JoinError),
51+
52+
#[error("Error in k8s-less mode: {0}")]
53+
K8LessError(#[from] K8sLessError),
5054
}
5155

5256
/// Start the gRPC server on TCP
@@ -181,6 +185,7 @@ impl Display for ServerAddress {
181185

182186
pub struct MgmtParams {
183187
pub grpc_addr: Option<GrpcAddress>,
188+
pub config_dir: Option<String>,
184189
pub hostname: String,
185190
pub processor_params: ConfigProcessorParams,
186191
}
@@ -229,7 +234,17 @@ pub fn start_mgmt(
229234
Err(LaunchError::PrematureGrpcExit)
230235
}
231236
})
232-
} else {
237+
} else if let Some(config_dir) = &params.config_dir {
238+
warn!("Running in k8s-less mode....");
239+
rt.block_on(async {
240+
let (processor, client) = ConfigProcessor::new(params.processor_params);
241+
let k8sless = Arc::new(K8sLess::new(config_dir, client));
242+
tokio::spawn(async { processor.run().await });
243+
K8sLess::start_config_watch(k8sless).await
244+
})?;
245+
Ok(())
246+
}
247+
else {
233248
debug!("Will start watching k8s for configuration changes");
234249
rt.block_on(async {
235250
let (processor, client) = ConfigProcessor::new(params.processor_params);

mgmt/src/processor/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub(crate) mod confbuild;
88
mod display;
99
pub(crate) mod gwconfigdb;
1010
pub(crate) mod k8s_client;
11+
pub(crate) mod k8s_less_client;
1112
pub(crate) mod launch;
1213
pub(crate) mod mgmt_client;
1314
pub(crate) mod proc;

0 commit comments

Comments
 (0)