Skip to content

Commit 58ad711

Browse files
authored
Update indexer to keep key states in memory for continuous processing (#231)
1 parent 81317f5 commit 58ad711

File tree

8 files changed

+464
-203
lines changed

8 files changed

+464
-203
lines changed

src/execution/db_tracking.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use crate::prelude::*;
2+
13
use super::{db_tracking_setup::TrackingTableSetupState, memoization::StoredMemoizationInfo};
24
use crate::utils::{db::WriteAction, fingerprint::Fingerprint};
3-
use anyhow::Result;
5+
use futures::Stream;
46
use sqlx::PgPool;
57

68
/// (target_key, process_ordinal, fingerprint)
@@ -41,6 +43,7 @@ pub struct SourceTrackingInfoForPrecommit {
4143
pub staging_target_keys: sqlx::types::Json<TrackedTargetKeyForSource>,
4244

4345
pub processed_source_ordinal: Option<i64>,
46+
pub process_logic_fingerprint: Option<Vec<u8>>,
4447
pub process_ordinal: Option<i64>,
4548
pub target_keys: Option<sqlx::types::Json<TrackedTargetKeyForSource>>,
4649
}
@@ -52,7 +55,7 @@ pub async fn read_source_tracking_info_for_precommit(
5255
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
5356
) -> Result<Option<SourceTrackingInfoForPrecommit>> {
5457
let query_str = format!(
55-
"SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2",
58+
"SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, process_logic_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2",
5659
db_setup.table_name
5760
);
5861
let precommit_tracking_info = sqlx::query_as(&query_str)
@@ -178,22 +181,33 @@ pub async fn delete_source_tracking_info(
178181
}
179182

180183
#[derive(sqlx::FromRow, Debug)]
181-
pub struct SourceTrackingKey {
184+
pub struct TrackedSourceKeyMetadata {
182185
pub source_key: serde_json::Value,
186+
pub processed_source_ordinal: Option<i64>,
187+
pub process_logic_fingerprint: Option<Vec<u8>>,
183188
}
184189

185-
pub async fn list_source_tracking_keys(
186-
source_id: i32,
187-
db_setup: &TrackingTableSetupState,
188-
pool: &PgPool,
189-
) -> Result<Vec<SourceTrackingKey>> {
190-
let query_str = format!(
191-
"SELECT source_key FROM {} WHERE source_id = $1",
190+
pub struct ListTrackedSourceKeyMetadataState {
191+
query_str: String,
192+
}
193+
194+
impl ListTrackedSourceKeyMetadataState {
195+
pub fn new() -> Self {
196+
Self {
197+
query_str: String::new(),
198+
}
199+
}
200+
201+
pub fn list<'a>(
202+
&'a mut self,
203+
source_id: i32,
204+
db_setup: &'a TrackingTableSetupState,
205+
pool: &'a PgPool,
206+
) -> impl Stream<Item = Result<TrackedSourceKeyMetadata, sqlx::Error>> + 'a {
207+
self.query_str = format!(
208+
"SELECT source_key, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1",
192209
db_setup.table_name
193210
);
194-
let keys: Vec<SourceTrackingKey> = sqlx::query_as(&query_str)
195-
.bind(source_id)
196-
.fetch_all(pool)
197-
.await?;
198-
Ok(keys)
211+
sqlx::query_as(&self.query_str).bind(source_id).fetch(pool)
212+
}
199213
}

0 commit comments

Comments
 (0)