diff --git a/src/execution/db_tracking.rs b/src/execution/db_tracking.rs index e137ecdeb..55623e5f6 100644 --- a/src/execution/db_tracking.rs +++ b/src/execution/db_tracking.rs @@ -8,6 +8,10 @@ use serde::ser::SerializeSeq; use sqlx::PgPool; use std::fmt; +//////////////////////////////////////////////////////////// +// Access for the row tracking table +//////////////////////////////////////////////////////////// + #[derive(Debug, Clone)] pub struct TrackedTargetKeyInfo { pub key: serde_json::Value, @@ -366,3 +370,54 @@ pub async fn update_source_tracking_ordinal( .await?; Ok(()) } + +//////////////////////////////////////////////////////////// +/// Access for the source state table +//////////////////////////////////////////////////////////// + +pub async fn read_source_state( + source_id: i32, + source_key_json: &serde_json::Value, + db_setup: &TrackingTableSetupState, + db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>, +) -> Result> { + let Some(table_name) = db_setup.source_state_table_name.as_ref() else { + bail!("Source state table not enabled for this flow"); + }; + + let query_str = format!( + "SELECT value FROM {} WHERE source_id = $1 AND key = $2", + table_name + ); + let state: Option = sqlx::query_scalar(&query_str) + .bind(source_id) + .bind(source_key_json) + .fetch_optional(db_executor) + .await?; + Ok(state) +} + +pub async fn upsert_source_state( + source_id: i32, + source_key_json: &serde_json::Value, + state: serde_json::Value, + db_setup: &TrackingTableSetupState, + db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>, +) -> Result<()> { + let Some(table_name) = db_setup.source_state_table_name.as_ref() else { + bail!("Source state table not enabled for this flow"); + }; + + let query_str = format!( + "INSERT INTO {} (source_id, key, value) VALUES ($1, $2, $3) \ + ON CONFLICT (source_id, key) DO UPDATE SET value = EXCLUDED.value", + table_name + ); + sqlx::query(&query_str) + .bind(source_id) + .bind(source_key_json) + .bind(sqlx::types::Json(state)) + .execute(db_executor) + .await?; + Ok(()) +}