Skip to content

Commit 2022886

Browse files
authored
feat(k8s): Improve resource tuning of K8S (#345)
1 parent c87f58b commit 2022886

File tree

8 files changed

+237
-104
lines changed

8 files changed

+237
-104
lines changed

etl-api/src/k8s/base.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use async_trait::async_trait;
2+
use k8s_openapi::api::core::v1::ConfigMap;
3+
use std::collections::BTreeMap;
4+
use thiserror::Error;
5+
6+
/// Errors emitted by the Kubernetes integration.
7+
///
8+
/// Variants wrap lower-level libraries where appropriate to preserve context.
9+
#[derive(Debug, Error)]
10+
pub enum K8sError {
11+
/// A serialization or deserialization error while building or parsing
12+
/// Kubernetes resources.
13+
#[error("An error occurred in serde: {0}")]
14+
Serde(#[from] serde_json::error::Error),
15+
/// An error returned by the [`kube`] client when talking to the API
16+
/// server.
17+
#[error("An error occurred with kube: {0}")]
18+
Kube(#[from] kube::Error),
19+
/// The environment-dependent replicator configuration could not be
20+
/// determined.
21+
#[error("An error occurred while configuring the replicator")]
22+
ReplicatorConfiguration,
23+
}
24+
25+
/// A simplified view of a pod phase.
///
/// This mirrors the string phases reported by Kubernetes but only tracks the
/// states needed by the API. Unknown values map to [`PodPhase::Unknown`].
///
/// The type is a plain field-less enum, so it derives the common traits that
/// let callers log it, compare it, and pass it around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PodPhase {
    /// The phase string was `"Pending"`.
    Pending,
    /// The phase string was `"Running"`.
    Running,
    /// The phase string was `"Succeeded"`.
    Succeeded,
    /// The phase string was `"Failed"`.
    Failed,
    /// The phase string was anything else, including the empty string.
    Unknown,
}

impl From<&str> for PodPhase {
    /// Converts a Kubernetes pod phase string into a [`PodPhase`].
    ///
    /// Matching is exact and case-sensitive; unrecognized values result in
    /// [`PodPhase::Unknown`].
    fn from(value: &str) -> Self {
        match value {
            "Pending" => Self::Pending,
            "Running" => Self::Running,
            "Succeeded" => Self::Succeeded,
            "Failed" => Self::Failed,
            _ => Self::Unknown,
        }
    }
}
51+
52+
/// Client interface describing the Kubernetes operations used by the API.
53+
///
54+
/// Implementations are expected to be idempotent where possible by issuing
55+
/// server-side apply patches for create-or-update behaviors.
56+
#[async_trait]
57+
pub trait K8sClient: Send + Sync {
58+
/// Creates or updates the Postgres password secret for a replicator.
59+
///
60+
/// The secret name is derived from `prefix` and is stored in the
61+
/// data-plane namespace.
62+
async fn create_or_update_postgres_secret(
63+
&self,
64+
prefix: &str,
65+
postgres_password: &str,
66+
) -> Result<(), K8sError>;
67+
68+
/// Creates or updates the BigQuery service account secret for a
69+
/// replicator.
70+
///
71+
/// The secret name is derived from `prefix` and is stored in the
72+
/// data-plane namespace.
73+
async fn create_or_update_bq_secret(
74+
&self,
75+
prefix: &str,
76+
bq_service_account_key: &str,
77+
) -> Result<(), K8sError>;
78+
79+
/// Deletes the Postgres password secret for a replicator if it exists.
80+
async fn delete_postgres_secret(&self, prefix: &str) -> Result<(), K8sError>;
81+
82+
/// Deletes the BigQuery service account secret for a replicator if it
83+
/// exists.
84+
async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError>;
85+
86+
/// Retrieves a named [`ConfigMap`].
87+
async fn get_config_map(&self, config_map_name: &str) -> Result<ConfigMap, K8sError>;
88+
89+
/// Creates or updates the replicator configuration [`ConfigMap`].
90+
///
91+
/// The config map stores two YAML documents: a base and a production
92+
/// override.
93+
async fn create_or_update_config_map(
94+
&self,
95+
prefix: &str,
96+
base_config: &str,
97+
prod_config: &str,
98+
) -> Result<(), K8sError>;
99+
100+
/// Deletes the replicator configuration [`ConfigMap`] if it exists.
101+
async fn delete_config_map(&self, prefix: &str) -> Result<(), K8sError>;
102+
103+
/// Creates or updates the replicator [`StatefulSet`].
104+
///
105+
/// The set references previously created secrets and config maps. Optional
106+
/// `template_annotations` may be used to trigger a rolling restart.
107+
async fn create_or_update_stateful_set(
108+
&self,
109+
prefix: &str,
110+
replicator_image: &str,
111+
template_annotations: Option<BTreeMap<String, String>>,
112+
) -> Result<(), K8sError>;
113+
114+
/// Deletes the replicator [`StatefulSet`] if it exists.
115+
async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError>;
116+
117+
/// Returns the phase of the replicator pod.
118+
async fn get_pod_phase(&self, prefix: &str) -> Result<PodPhase, K8sError>;
119+
120+
/// Reports whether the replicator container terminated with a non-zero exit
121+
/// code.
122+
async fn has_replicator_container_error(&self, prefix: &str) -> Result<bool, K8sError>;
123+
124+
/// Deletes the replicator pod if it exists.
125+
async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError>;
126+
}

etl-api/src/k8s_client.rs renamed to etl-api/src/k8s/http.rs

Lines changed: 83 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,124 +1,114 @@
1+
use crate::k8s::{K8sClient, K8sError, PodPhase};
12
use async_trait::async_trait;
23
use base64::{Engine, prelude::BASE64_STANDARD};
4+
use etl_config::Environment;
35
use k8s_openapi::api::{
46
apps::v1::StatefulSet,
57
core::v1::{ConfigMap, Pod, Secret},
68
};
7-
use serde_json::json;
8-
use std::collections::BTreeMap;
9-
use thiserror::Error;
10-
use tracing::info;
11-
129
use kube::{
1310
Client,
1411
api::{Api, DeleteParams, Patch, PatchParams},
1512
};
13+
use serde_json::json;
14+
use std::collections::BTreeMap;
15+
use tracing::info;
1616

17-
#[derive(Debug, Error)]
18-
pub enum K8sError {
19-
#[error("serde_json error: {0}")]
20-
Serde(#[from] serde_json::error::Error),
21-
22-
#[error("kube error: {0}")]
23-
Kube(#[from] kube::Error),
24-
}
25-
26-
pub enum PodPhase {
27-
Pending,
28-
Running,
29-
Succeeded,
30-
Failed,
31-
Unknown,
32-
}
33-
34-
impl From<&str> for PodPhase {
35-
fn from(value: &str) -> Self {
36-
match value {
37-
"Pending" => PodPhase::Pending,
38-
"Running" => PodPhase::Running,
39-
"Succeeded" => PodPhase::Succeeded,
40-
"Failed" => PodPhase::Failed,
41-
_ => PodPhase::Unknown,
42-
}
43-
}
44-
}
45-
46-
#[async_trait]
47-
pub trait K8sClient: Send + Sync {
48-
async fn create_or_update_postgres_secret(
49-
&self,
50-
prefix: &str,
51-
postgres_password: &str,
52-
) -> Result<(), K8sError>;
53-
54-
async fn create_or_update_bq_secret(
55-
&self,
56-
prefix: &str,
57-
bq_service_account_key: &str,
58-
) -> Result<(), K8sError>;
59-
60-
async fn delete_postgres_secret(&self, prefix: &str) -> Result<(), K8sError>;
61-
62-
async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError>;
63-
64-
async fn get_config_map(&self, config_map_name: &str) -> Result<ConfigMap, K8sError>;
65-
66-
async fn create_or_update_config_map(
67-
&self,
68-
prefix: &str,
69-
base_config: &str,
70-
prod_config: &str,
71-
) -> Result<(), K8sError>;
72-
73-
async fn delete_config_map(&self, prefix: &str) -> Result<(), K8sError>;
74-
75-
async fn create_or_update_stateful_set(
76-
&self,
77-
prefix: &str,
78-
replicator_image: &str,
79-
template_annotations: Option<BTreeMap<String, String>>,
80-
) -> Result<(), K8sError>;
81-
82-
async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError>;
83-
84-
async fn get_pod_phase(&self, prefix: &str) -> Result<PodPhase, K8sError>;
85-
86-
async fn has_replicator_container_error(&self, prefix: &str) -> Result<bool, K8sError>;
87-
88-
async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError>;
89-
}
90-
91-
#[derive(Debug)]
92-
pub struct HttpK8sClient {
93-
secrets_api: Api<Secret>,
94-
config_maps_api: Api<ConfigMap>,
95-
stateful_sets_api: Api<StatefulSet>,
96-
pods_api: Api<Pod>,
97-
}
98-
17+
/// Secret name suffix for the BigQuery service account key.
9918
const BQ_SECRET_NAME_SUFFIX: &str = "bq-service-account-key";
19+
/// Secret name suffix for the Postgres password.
10020
const POSTGRES_SECRET_NAME_SUFFIX: &str = "postgres-password";
21+
/// ConfigMap name suffix for the replicator configuration files.
10122
const REPLICATOR_CONFIG_MAP_NAME_SUFFIX: &str = "replicator-config";
23+
/// StatefulSet name suffix for the replicator workload.
10224
const REPLICATOR_STATEFUL_SET_SUFFIX: &str = "replicator-stateful-set";
25+
/// Application label suffix used to group resources.
10326
const REPLICATOR_APP_SUFFIX: &str = "replicator-app";
27+
/// Container name suffix for the replicator container.
10428
const REPLICATOR_CONTAINER_NAME_SUFFIX: &str = "replicator";
29+
/// Container name suffix for the Vector sidecar.
10530
const VECTOR_CONTAINER_NAME_SUFFIX: &str = "vector";
31+
/// Namespace where data-plane resources are created.
10632
const DATA_PLANE_NAMESPACE: &str = "etl-data-plane";
33+
/// Secret storing the Logflare API key.
10734
const LOGFLARE_SECRET_NAME: &str = "replicator-logflare-api-key";
35+
/// Docker image used for the Vector sidecar.
10836
const VECTOR_IMAGE_NAME: &str = "timberio/vector:0.46.1-distroless-libc";
37+
/// ConfigMap name containing the Vector configuration.
10938
const VECTOR_CONFIG_MAP_NAME: &str = "replicator-vector-config";
39+
/// Volume name for the replicator config file.
11040
const REPLICATOR_CONFIG_FILE_VOLUME_NAME: &str = "replicator-config-file";
41+
/// Volume name for the Vector config file.
11142
const VECTOR_CONFIG_FILE_VOLUME_NAME: &str = "vector-config-file";
43+
/// Secret storing the Sentry DSN.
11244
const SENTRY_DSN_SECRET_NAME: &str = "replicator-sentry-dsn";
45+
/// EmptyDir volume name used to share logs.
11346
const LOGS_VOLUME_NAME: &str = "logs";
47+
/// ConfigMap name providing trusted root certificates.
11448
pub const TRUSTED_ROOT_CERT_CONFIG_MAP_NAME: &str = "trusted-root-certs-config";
49+
/// Key inside the trusted root certificates ConfigMap.
11550
pub const TRUSTED_ROOT_CERT_KEY_NAME: &str = "trusted_root_certs";
51+
/// Environment variable for the Postgres password.
11652
const PG_PASSWORD_ENV_VAR_NAME: &str = "APP_PIPELINE__PG_CONNECTION__PASSWORD";
53+
/// Environment variable for the BigQuery service account key.
11754
const BIG_QUERY_SA_KEY_ENV_VAR_NAME: &str = "APP_DESTINATION__BIG_QUERY__SERVICE_ACCOUNT_KEY";
55+
/// Pod template annotation used to trigger rolling restarts.
11856
pub const RESTARTED_AT_ANNOTATION_KEY: &str = "etl.supabase.com/restarted-at";
57+
/// Label used to identify replicator pods.
11958
const REPLICATOR_APP_LABEL: &str = "etl-replicator-app";
12059

60+
/// Replicator memory limit tuned for `c6in.4xlarge` instances.
61+
const REPLICATOR_MAX_MEMORY_PROD: &str = "500Mi";
62+
/// Replicator CPU limit tuned for `c6in.4xlarge` instances.
63+
const REPLICATOR_MAX_CPU_PROD: &str = "100m";
64+
/// Replicator memory limit tuned for `t3.small` instances.
65+
const REPLICATOR_MAX_MEMORY_STAGING: &str = "100Mi";
66+
/// Replicator CPU limit tuned for `t3.small` instances.
67+
const REPLICATOR_MAX_CPU_STAGING: &str = "100m";
68+
69+
/// Runtime limits derived from the current environment.
70+
struct DynamicReplicatorConfig {
71+
max_memory: &'static str,
72+
max_cpu: &'static str,
73+
}
74+
75+
impl DynamicReplicatorConfig {
76+
/// Loads the runtime limits for the current environment.
77+
fn load() -> Result<Self, K8sError> {
78+
let environment = Environment::load().map_err(|_| K8sError::ReplicatorConfiguration)?;
79+
80+
let config = match environment {
81+
Environment::Prod => Self {
82+
max_memory: REPLICATOR_MAX_MEMORY_PROD,
83+
max_cpu: REPLICATOR_MAX_CPU_PROD,
84+
},
85+
_ => Self {
86+
max_memory: REPLICATOR_MAX_MEMORY_STAGING,
87+
max_cpu: REPLICATOR_MAX_CPU_STAGING,
88+
},
89+
};
90+
91+
Ok(config)
92+
}
93+
}
94+
95+
/// HTTP-based implementation of [`K8sClient`].
96+
///
97+
/// The client is namespaced to the data-plane namespace and uses server-side
98+
/// apply to keep resources in sync.
99+
#[derive(Debug)]
100+
pub struct HttpK8sClient {
101+
secrets_api: Api<Secret>,
102+
config_maps_api: Api<ConfigMap>,
103+
stateful_sets_api: Api<StatefulSet>,
104+
pods_api: Api<Pod>,
105+
}
106+
121107
impl HttpK8sClient {
108+
/// Creates a new [`HttpK8sClient`] using the ambient Kubernetes config.
109+
///
110+
/// Prefers in-cluster configuration and falls back to the local kubeconfig
111+
/// when running outside the cluster.
122112
pub async fn new() -> Result<HttpK8sClient, K8sError> {
123113
let client = Client::try_default().await?;
124114

@@ -320,6 +310,8 @@ impl K8sClient for HttpK8sClient {
320310
let bq_secret_name = format!("{prefix}-{BQ_SECRET_NAME_SUFFIX}");
321311
let replicator_config_map_name = format!("{prefix}-{REPLICATOR_CONFIG_MAP_NAME_SUFFIX}");
322312

313+
let config = DynamicReplicatorConfig::load()?;
314+
323315
let mut stateful_set_json = json!({
324316
"apiVersion": "apps/v1",
325317
"kind": "StatefulSet",
@@ -393,11 +385,12 @@ impl K8sClient for HttpK8sClient {
393385
],
394386
"resources": {
395387
"limits": {
396-
"memory": "200Mi",
388+
"memory": config.max_memory,
389+
"cpu": config.max_cpu,
397390
},
398391
"requests": {
399-
"memory": "200Mi",
400-
"cpu": "100m"
392+
"memory": config.max_memory,
393+
"cpu": config.max_cpu,
401394
}
402395
},
403396
"volumeMounts": [

etl-api/src/k8s/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
//! Kubernetes integration for the ETL API.
2+
//!
3+
//! This module contains the abstractions and implementations used by the HTTP
4+
//! API to manage Kubernetes resources required by replicators (secrets, config
5+
//! maps, stateful sets, and pods). Consumers should depend on the trait
6+
//! [`K8sClient`] and avoid relying on a specific transport.
7+
//!
8+
//! The default client, [`http::HttpK8sClient`], is backed by the [`kube`]
9+
//! crate and talks to the cluster using the ambient configuration (in-cluster
10+
//! or local `~/.kube/config`). Keeping the abstraction in `base` lets us
11+
//! swap implementations in tests and non-Kubernetes environments.
12+
//!
13+
//! See [`K8sError`], [`PodPhase`], and [`K8sClient`] for the error, phase, and trait types.
14+
15+
mod base;
16+
pub mod http;
17+
18+
pub use base::*;

etl-api/src/lib.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,8 @@
1-
//! ETL API service for managing data replication pipelines.
2-
//!
3-
//! Provides a REST API for configuring and managing ETL pipelines, including tenants,
4-
//! sources, destinations, and replication monitoring. Includes authentication, encryption,
5-
//! Kubernetes integration, and comprehensive OpenAPI documentation.
6-
71
pub mod authentication;
82
pub mod config;
93
pub mod configs;
104
pub mod db;
11-
pub mod k8s_client;
5+
pub mod k8s;
126
pub mod routes;
137
pub mod span_builder;
148
pub mod startup;

0 commit comments

Comments
 (0)