Skip to content

Commit e63194a

Browse files
authored
feat(source-states): add db functions to read/upsert source states (#904)
1 parent b9a3d2c commit e63194a

File tree

1 file changed

+55
-0
lines changed

1 file changed

+55
-0
lines changed

src/execution/db_tracking.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ use serde::ser::SerializeSeq;
88
use sqlx::PgPool;
99
use std::fmt;
1010

11+
////////////////////////////////////////////////////////////
12+
// Access for the row tracking table
13+
////////////////////////////////////////////////////////////
14+
1115
#[derive(Debug, Clone)]
1216
pub struct TrackedTargetKeyInfo {
1317
pub key: serde_json::Value,
@@ -366,3 +370,54 @@ pub async fn update_source_tracking_ordinal(
366370
.await?;
367371
Ok(())
368372
}
373+
374+
////////////////////////////////////////////////////////////
375+
/// Access for the source state table
376+
////////////////////////////////////////////////////////////
377+
378+
pub async fn read_source_state(
379+
source_id: i32,
380+
source_key_json: &serde_json::Value,
381+
db_setup: &TrackingTableSetupState,
382+
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
383+
) -> Result<Option<serde_json::Value>> {
384+
let Some(table_name) = db_setup.source_state_table_name.as_ref() else {
385+
bail!("Source state table not enabled for this flow");
386+
};
387+
388+
let query_str = format!(
389+
"SELECT value FROM {} WHERE source_id = $1 AND key = $2",
390+
table_name
391+
);
392+
let state: Option<serde_json::Value> = sqlx::query_scalar(&query_str)
393+
.bind(source_id)
394+
.bind(source_key_json)
395+
.fetch_optional(db_executor)
396+
.await?;
397+
Ok(state)
398+
}
399+
400+
pub async fn upsert_source_state(
401+
source_id: i32,
402+
source_key_json: &serde_json::Value,
403+
state: serde_json::Value,
404+
db_setup: &TrackingTableSetupState,
405+
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
406+
) -> Result<()> {
407+
let Some(table_name) = db_setup.source_state_table_name.as_ref() else {
408+
bail!("Source state table not enabled for this flow");
409+
};
410+
411+
let query_str = format!(
412+
"INSERT INTO {} (source_id, key, value) VALUES ($1, $2, $3) \
413+
ON CONFLICT (source_id, key) DO UPDATE SET value = EXCLUDED.value",
414+
table_name
415+
);
416+
sqlx::query(&query_str)
417+
.bind(source_id)
418+
.bind(source_key_json)
419+
.bind(sqlx::types::Json(state))
420+
.execute(db_executor)
421+
.await?;
422+
Ok(())
423+
}

0 commit comments

Comments
 (0)