Skip to content

Commit 2357826

Browse files
authored
fix(api): Make sure ETL tables are there before performing any action on the source etl schema (supabase#286)
1 parent 4f01699 commit 2357826

File tree

9 files changed

+236
-84
lines changed

9 files changed

+236
-84
lines changed

etl-api/src/configs/pipeline.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use utoipa::ToSchema;
66
const DEFAULT_BATCH_MAX_SIZE: usize = 1000000;
77
const DEFAULT_BATCH_MAX_FILL_MS: u64 = 10000;
88
const DEFAULT_TABLE_ERROR_RETRY_DELAY_MS: u64 = 10000;
9+
const DEFAULT_MAX_TABLE_SYNC_WORKERS: u16 = 4;
910

1011
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
1112
pub struct FullApiPipelineConfig {
@@ -104,7 +105,9 @@ impl From<FullApiPipelineConfig> for StoredPipelineConfig {
104105
table_error_retry_delay_ms: value
105106
.table_error_retry_delay_ms
106107
.unwrap_or(DEFAULT_TABLE_ERROR_RETRY_DELAY_MS),
107-
max_table_sync_workers: value.max_table_sync_workers.unwrap_or_default(),
108+
max_table_sync_workers: value
109+
.max_table_sync_workers
110+
.unwrap_or(DEFAULT_MAX_TABLE_SYNC_WORKERS),
108111
}
109112
}
110113
}
@@ -173,7 +176,10 @@ mod tests {
173176
stored.table_error_retry_delay_ms,
174177
DEFAULT_TABLE_ERROR_RETRY_DELAY_MS
175178
);
176-
assert_eq!(stored.max_table_sync_workers, 0);
179+
assert_eq!(
180+
stored.max_table_sync_workers,
181+
DEFAULT_MAX_TABLE_SYNC_WORKERS
182+
);
177183
}
178184

179185
#[test]

etl-api/src/db/pipelines.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use etl_postgres::replication::{schema, slots, state, table_mappings};
1+
use etl_postgres::replication::{health, schema, slots, state, table_mappings};
22
use sqlx::{PgExecutor, PgTransaction};
33
use std::ops::DerefMut;
44
use thiserror::Error;
@@ -210,22 +210,34 @@ pub async fn delete_pipeline_cascading(
210210
db::destinations::delete_destination(txn.deref_mut(), tenant_id, destination.id).await?;
211211
}
212212

213-
// Get all table IDs for this pipeline before deleting state.
214-
let table_ids = state::get_pipeline_table_ids(source_txn.deref_mut(), pipeline.id).await?;
213+
// Get all table IDs for this pipeline before deleting state (only if all ETL tables exist).
214+
let etl_present = health::etl_tables_present(source_txn.deref_mut()).await?;
215+
let table_ids = if etl_present {
216+
Some(state::get_pipeline_table_ids(source_txn.deref_mut(), pipeline.id).await?)
217+
} else {
218+
None
219+
};
215220

216-
// Delete state, schema, and table mappings from the source database
217-
state::delete_pipeline_replication_state(source_txn.deref_mut(), pipeline.id).await?;
218-
schema::delete_pipeline_table_schemas(source_txn.deref_mut(), pipeline.id).await?;
219-
table_mappings::delete_pipeline_table_mappings(source_txn.deref_mut(), pipeline.id).await?;
221+
// Delete state, schema, and table mappings from the source database, only if ETL tables exist.
222+
if etl_present {
223+
let _ =
224+
state::delete_pipeline_replication_state(source_txn.deref_mut(), pipeline.id).await?;
225+
let _ = schema::delete_pipeline_table_schemas(source_txn.deref_mut(), pipeline.id).await?;
226+
let _ = table_mappings::delete_pipeline_table_mappings(source_txn.deref_mut(), pipeline.id)
227+
.await?;
228+
}
220229

221230
// Here we finish `txn` before `source_txn` since we want the guarantee that the pipeline has
222231
// been deleted before committing the state and slots deletions.
223232
txn.commit().await?;
224233
source_txn.commit().await?;
225234

226-
// If we succeeded to commit both transactions, we are safe to delete the slots. The reason for
227-
// not deleting slots in the transaction is that `pg_drop_replication_slot(...)` is not transactional.
228-
slots::delete_pipeline_replication_slots(&source_pool, pipeline.id as u64, &table_ids).await?;
235+
if let Some(table_ids) = table_ids {
236+
// If we succeeded to commit both transactions, we are safe to delete the slots. The reason for
237+
// not deleting slots in the transaction is that `pg_drop_replication_slot(...)` is not transactional.
238+
slots::delete_pipeline_replication_slots(&source_pool, pipeline.id as u64, &table_ids)
239+
.await?;
240+
}
229241

230242
Ok(())
231243
}

etl-api/src/routes/pipelines.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use actix_web::{
55
web::{Data, Json, Path},
66
};
77
use etl_config::shared::{ReplicatorConfig, SupabaseConfig, TlsConfig};
8-
use etl_postgres::replication::{TableLookupError, get_table_name_from_oid, state};
8+
use etl_postgres::replication::{TableLookupError, get_table_name_from_oid, health, state};
99
use etl_postgres::schema::TableId;
1010
use secrecy::ExposeSecret;
1111
use serde::{Deserialize, Serialize};
@@ -62,6 +62,9 @@ pub enum PipelineError {
6262
#[error("The table state is not rollbackable: {0}")]
6363
NotRollbackable(String),
6464

65+
#[error("The ETL table state has not been initialized first")]
66+
EtlStateNotInitialized,
67+
6568
#[error("invalid destination config")]
6669
InvalidConfig(#[from] serde_json::Error),
6770

@@ -145,9 +148,9 @@ impl ResponseError for PipelineError {
145148
| PipelineError::InvalidTableReplicationState(_)
146149
| PipelineError::MissingTableReplicationState => StatusCode::INTERNAL_SERVER_ERROR,
147150
PipelineError::NotRollbackable(_) => StatusCode::BAD_REQUEST,
148-
PipelineError::PipelineNotFound(_) | PipelineError::ImageNotFoundById(_) => {
149-
StatusCode::NOT_FOUND
150-
}
151+
PipelineError::PipelineNotFound(_)
152+
| PipelineError::ImageNotFoundById(_)
153+
| PipelineError::EtlStateNotInitialized => StatusCode::NOT_FOUND,
151154
PipelineError::TenantId(_)
152155
| PipelineError::SourceNotFound(_)
153156
| PipelineError::DestinationNotFound(_) => StatusCode::BAD_REQUEST,
@@ -811,6 +814,11 @@ pub async fn get_pipeline_replication_status(
811814
let source_pool =
812815
connect_to_source_database_with_defaults(&source.config.into_connection_config()).await?;
813816

817+
// Ensure ETL tables exist in the source DB
818+
if !health::etl_tables_present(&source_pool).await? {
819+
return Err(PipelineError::EtlStateNotInitialized);
820+
}
821+
814822
// Fetch replication state for all tables in this pipeline
815823
let state_rows = state::get_table_replication_state_rows(&source_pool, pipeline_id).await?;
816824

@@ -894,6 +902,11 @@ pub async fn rollback_table_state(
894902
let source_pool =
895903
connect_to_source_database_with_defaults(&source.config.into_connection_config()).await?;
896904

905+
// Ensure ETL tables exist in the source DB
906+
if !health::etl_tables_present(&source_pool).await? {
907+
return Err(PipelineError::EtlStateNotInitialized);
908+
}
909+
897910
// First, check current state to ensure it's rollbackable (manual retry policy)
898911
let state_rows = state::get_table_replication_state_rows(&source_pool, pipeline_id).await?;
899912
let current_row = state_rows

etl-api/tests/common/test_app.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,7 @@ pub async fn spawn_test_app() -> TestApp {
505505
// We use a random database name.
506506
config.database.name = Uuid::new_v4().to_string();
507507

508-
let connection_pool = create_etl_api_database(&config.database).await;
508+
let api_db_pool = create_etl_api_database(&config.database).await;
509509

510510
let key = generate_random_key::<32>().expect("failed to generate random key");
511511
let encryption_key = encryption::EncryptionKey { id: 0, key };
@@ -515,7 +515,7 @@ pub async fn spawn_test_app() -> TestApp {
515515
let server = run(
516516
config.clone(),
517517
listener,
518-
connection_pool,
518+
api_db_pool,
519519
encryption_key,
520520
k8s_client,
521521
)

etl-api/tests/integration/etl_tables_missing_test.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use etl_api::routes::pipelines::{
2+
CreatePipelineRequest, CreatePipelineResponse, RollbackTableStateRequest, RollbackType,
3+
};
4+
use etl_config::shared::PgConnectionConfig;
5+
use etl_postgres::sqlx::test_utils::drop_pg_database;
6+
use etl_telemetry::tracing::init_test_tracing;
7+
use reqwest::StatusCode;
8+
use sqlx::PgPool;
9+
10+
use crate::common::database::create_test_source_database;
11+
use crate::common::test_app::{TestApp, spawn_test_app};
12+
use crate::integration::destination_test::create_destination;
13+
use crate::integration::images_test::create_default_image;
14+
use crate::integration::tenants_test::create_tenant;
15+
16+
async fn create_pipeline_with_unmigrated_source_db(
17+
app: &TestApp,
18+
tenant_id: &str,
19+
) -> (i64, PgPool, PgConnectionConfig) {
20+
let (source_pool, source_id, source_db_config) =
21+
create_test_source_database(app, tenant_id).await;
22+
let destination_id = create_destination(app, tenant_id).await;
23+
create_default_image(app).await;
24+
25+
let req = CreatePipelineRequest {
26+
source_id,
27+
destination_id,
28+
config: crate::integration::pipelines_test::new_pipeline_config(),
29+
};
30+
31+
let response = app.create_pipeline(tenant_id, &req).await;
32+
let response: CreatePipelineResponse = response
33+
.json()
34+
.await
35+
.expect("failed to deserialize response");
36+
37+
(response.id, source_pool, source_db_config)
38+
}
39+
40+
#[tokio::test(flavor = "multi_thread")]
41+
async fn replication_status_fails_when_etl_tables_missing() {
42+
init_test_tracing();
43+
let app = spawn_test_app().await;
44+
let tenant_id = create_tenant(&app).await;
45+
46+
let (pipeline_id, _source_pool, source_db_config) =
47+
create_pipeline_with_unmigrated_source_db(&app, &tenant_id).await;
48+
49+
let response = app
50+
.get_pipeline_replication_status(&tenant_id, pipeline_id)
51+
.await;
52+
assert_eq!(response.status(), StatusCode::NOT_FOUND);
53+
assert!(
54+
response
55+
.text()
56+
.await
57+
.unwrap()
58+
.contains("The ETL table state has not been initialized first")
59+
);
60+
61+
drop_pg_database(&source_db_config).await;
62+
}
63+
64+
#[tokio::test(flavor = "multi_thread")]
65+
async fn rollback_fails_when_etl_tables_missing() {
66+
init_test_tracing();
67+
let app = spawn_test_app().await;
68+
let tenant_id = create_tenant(&app).await;
69+
70+
let (pipeline_id, _source_pool, source_db_config) =
71+
create_pipeline_with_unmigrated_source_db(&app, &tenant_id).await;
72+
73+
let req = RollbackTableStateRequest {
74+
table_id: 1,
75+
rollback_type: RollbackType::Individual,
76+
};
77+
let response = app
78+
.rollback_table_state(&tenant_id, pipeline_id, &req)
79+
.await;
80+
assert_eq!(response.status(), StatusCode::NOT_FOUND);
81+
assert!(
82+
response
83+
.text()
84+
.await
85+
.unwrap()
86+
.contains("The ETL table state has not been initialized first")
87+
);
88+
89+
drop_pg_database(&source_db_config).await;
90+
}
91+
92+
#[tokio::test(flavor = "multi_thread")]
93+
async fn deleting_pipeline_succeeds_when_etl_tables_missing() {
94+
init_test_tracing();
95+
let app = spawn_test_app().await;
96+
let tenant_id = create_tenant(&app).await;
97+
98+
let (pipeline_id, _source_pool, source_db_config) =
99+
create_pipeline_with_unmigrated_source_db(&app, &tenant_id).await;
100+
101+
let response = app.delete_pipeline(&tenant_id, pipeline_id).await;
102+
assert_eq!(response.status(), StatusCode::OK);
103+
104+
let response = app.read_pipeline(&tenant_id, pipeline_id).await;
105+
assert_eq!(response.status(), StatusCode::NOT_FOUND);
106+
107+
drop_pg_database(&source_db_config).await;
108+
}

etl-api/tests/integration/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod destination_test;
22
mod destinations_pipelines_test;
3+
mod etl_tables_missing_test;
34
mod health_check_test;
45
mod images_test;
56
mod pipelines_test;

0 commit comments

Comments
 (0)