Skip to content

Commit cf65d69

Browse files
authored
ref(k8s): Improve update behavior of pods in the replicator statefulset (supabase#284)
1 parent b78ca30 commit cf65d69

File tree

4 files changed

+48
-11
lines changed

4 files changed

+48
-11
lines changed

etl-api/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ async-trait = { workspace = true }
2323
aws-lc-rs = { workspace = true, features = ["alloc", "aws-lc-sys"] }
2424
base64 = { workspace = true, features = ["std"] }
2525
constant_time_eq = { workspace = true }
26+
chrono = { workspace = true }
2627
k8s-openapi = { workspace = true, features = ["latest"] }
2728
kube = { workspace = true, features = [
2829
"runtime",

etl-api/src/k8s_client.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use k8s_openapi::api::{
55
core::v1::{ConfigMap, Pod, Secret},
66
};
77
use serde_json::json;
8+
use std::collections::BTreeMap;
89
use thiserror::Error;
910
use tracing::info;
1011

@@ -75,6 +76,7 @@ pub trait K8sClient: Send + Sync {
7576
&self,
7677
prefix: &str,
7778
replicator_image: &str,
79+
template_annotations: Option<BTreeMap<String, String>>,
7880
) -> Result<(), K8sError>;
7981

8082
async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError>;
@@ -113,6 +115,7 @@ pub const TRUSTED_ROOT_CERT_CONFIG_MAP_NAME: &str = "trusted-root-certs-config";
113115
pub const TRUSTED_ROOT_CERT_KEY_NAME: &str = "trusted_root_certs";
114116
const PG_PASSWORD_ENV_VAR_NAME: &str = "APP_PIPELINE__PG_CONNECTION__PASSWORD";
115117
const BIG_QUERY_SA_KEY_ENV_VAR_NAME: &str = "APP_DESTINATION__BIG_QUERY__SERVICE_ACCOUNT_KEY";
118+
pub const RESTARTED_AT_ANNOTATION_KEY: &str = "etl.supabase.com/restarted-at";
116119

117120
impl HttpK8sClient {
118121
pub async fn new() -> Result<HttpK8sClient, K8sError> {
@@ -304,6 +307,7 @@ impl K8sClient for HttpK8sClient {
304307
&self,
305308
prefix: &str,
306309
replicator_image: &str,
310+
template_annotations: Option<BTreeMap<String, String>>,
307311
) -> Result<(), K8sError> {
308312
info!("patching stateful set");
309313

@@ -315,7 +319,7 @@ impl K8sClient for HttpK8sClient {
315319
let bq_secret_name = format!("{prefix}-{BQ_SECRET_NAME_SUFFIX}");
316320
let replicator_config_map_name = format!("{prefix}-{REPLICATOR_CONFIG_MAP_NAME_SUFFIX}");
317321

318-
let stateful_set_json = json!({
322+
let mut stateful_set_json = json!({
319323
"apiVersion": "apps/v1",
320324
"kind": "StatefulSet",
321325
"metadata": {
@@ -447,15 +451,27 @@ impl K8sClient for HttpK8sClient {
447451
}
448452
});
449453

454+
// Attach template annotations (e.g., a restarted-at timestamp) to trigger a rolling restart
455+
if let Some(annotations) = template_annotations
456+
&& let Some(template) = stateful_set_json
457+
.get_mut("spec")
458+
.and_then(|s| s.get_mut("template"))
459+
.and_then(|t| t.get_mut("metadata"))
460+
{
461+
// Insert annotations map
462+
let annotations_value = serde_json::to_value(annotations)?;
463+
if let Some(obj) = template.as_object_mut() {
464+
obj.insert("annotations".to_string(), annotations_value);
465+
}
466+
}
467+
450468
let stateful_set: StatefulSet = serde_json::from_value(stateful_set_json)?;
451469

452470
let pp = PatchParams::apply(&stateful_set_name);
453471
self.stateful_sets_api
454472
.patch(&stateful_set_name, &pp, &Patch::Apply(stateful_set))
455473
.await?;
456474

457-
self.delete_pod(prefix).await?;
458-
459475
info!("patched stateful set");
460476

461477
Ok(())
@@ -477,7 +493,7 @@ impl K8sClient for HttpK8sClient {
477493
e => return Err(e.into()),
478494
},
479495
}
480-
self.delete_pod(prefix).await?;
496+
481497
info!("deleted stateful set");
482498

483499
Ok(())

etl-api/src/routes/pipelines.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ use actix_web::{
44
post,
55
web::{Data, Json, Path},
66
};
7+
use chrono::Utc;
78
use etl_config::shared::{ReplicatorConfig, SupabaseConfig, TlsConfig};
89
use etl_postgres::replication::{TableLookupError, get_table_name_from_oid, health, state};
910
use etl_postgres::schema::TableId;
1011
use secrecy::ExposeSecret;
1112
use serde::{Deserialize, Serialize};
1213
use sqlx::{PgPool, PgTransaction};
14+
use std::collections::BTreeMap;
1315
use std::ops::DerefMut;
1416
use thiserror::Error;
1517
use utoipa::ToSchema;
@@ -24,8 +26,8 @@ use crate::db::images::{Image, ImagesDbError};
2426
use crate::db::pipelines::{Pipeline, PipelinesDbError};
2527
use crate::db::replicators::{Replicator, ReplicatorsDbError};
2628
use crate::db::sources::{Source, SourcesDbError, source_exists};
27-
use crate::k8s_client::TRUSTED_ROOT_CERT_KEY_NAME;
2829
use crate::k8s_client::{K8sClient, K8sError, PodPhase, TRUSTED_ROOT_CERT_CONFIG_MAP_NAME};
30+
use crate::k8s_client::{RESTARTED_AT_ANNOTATION_KEY, TRUSTED_ROOT_CERT_KEY_NAME};
2931
use crate::routes::{
3032
ErrorMessage, TenantIdError, connect_to_source_database_with_defaults, extract_tenant_id,
3133
};
@@ -360,7 +362,6 @@ pub enum PipelineStatus {
360362
Stopped,
361363
Starting,
362364
Started,
363-
Stopping,
364365
Unknown,
365366
Failed,
366367
}
@@ -1088,7 +1089,7 @@ pub async fn update_pipeline_config(
10881089
Ok(Json(response))
10891090
}
10901091

1091-
#[derive(Debug, Serialize, Deserialize)]
1092+
#[derive(Debug, Clone, Serialize, Deserialize)]
10921093
struct Secrets {
10931094
postgres_password: String,
10941095
big_query_service_account_key: Option<String>,
@@ -1107,7 +1108,7 @@ async fn create_or_update_pipeline_in_k8s(
11071108

11081109
// We create the secrets.
11091110
let secrets = build_secrets(&source.config, &destination.config);
1110-
create_or_update_secrets(k8s_client, &prefix, secrets).await?;
1111+
create_or_update_secrets(k8s_client, &prefix, secrets.clone()).await?;
11111112

11121113
// We create the replicator configuration.
11131114
let replicator_config = build_replicator_config(
@@ -1120,14 +1121,31 @@ async fn create_or_update_pipeline_in_k8s(
11201121
},
11211122
)
11221123
.await?;
1123-
create_or_update_config(k8s_client, &prefix, replicator_config).await?;
1124+
create_or_update_config(k8s_client, &prefix, replicator_config.clone()).await?;
1125+
1126+
// To force a restart every time, we want to annotate the stateful set with the current UTC time for every
1127+
// start call. Technically, we could restart only when necessary by calculating a checksum on a deterministic
1128+
// set of inputs like the configs, states in the database, etc... however we deemed that too cumbersome
1129+
// and risky, since forgetting a component will lead to the pipeline not restarting.
1130+
let mut annotations = BTreeMap::new();
1131+
annotations.insert(
1132+
RESTARTED_AT_ANNOTATION_KEY.to_string(),
1133+
get_restarted_at_annotation_value(),
1134+
);
11241135

11251136
// We create the replicator stateful set.
1126-
create_or_update_replicator(k8s_client, &prefix, image.name).await?;
1137+
create_or_update_replicator(k8s_client, &prefix, image.name, Some(annotations)).await?;
11271138

11281139
Ok(())
11291140
}
11301141

1142+
fn get_restarted_at_annotation_value() -> String {
1143+
let now = Utc::now();
1144+
// We use nanoseconds to decrease the likelihood of generating the same annotation in sequence,
1145+
// which would not result in a restart.
1146+
now.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)
1147+
}
1148+
11311149
async fn delete_pipeline_in_k8s(
11321150
k8s_client: &dyn K8sClient,
11331151
tenant_id: &str,
@@ -1281,9 +1299,10 @@ async fn create_or_update_replicator(
12811299
k8s_client: &dyn K8sClient,
12821300
prefix: &str,
12831301
replicator_image: String,
1302+
template_annotations: Option<BTreeMap<String, String>>,
12841303
) -> Result<(), PipelineError> {
12851304
k8s_client
1286-
.create_or_update_stateful_set(prefix, &replicator_image)
1305+
.create_or_update_stateful_set(prefix, &replicator_image, template_annotations)
12871306
.await?;
12881307

12891308
Ok(())

etl-api/tests/common/k8s_client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ impl K8sClient for MockK8sClient {
6767
&self,
6868
_prefix: &str,
6969
_replicator_image: &str,
70+
_template_annotations: Option<BTreeMap<String, String>>,
7071
) -> Result<(), K8sError> {
7172
Ok(())
7273
}

0 commit comments

Comments
 (0)