Skip to content

Commit 6269e79

Browse files
committed
feat(row-status): expose an API to get row indexing status
1 parent aa256ec commit 6269e79

File tree

10 files changed

+182
-53
lines changed

10 files changed

+182
-53
lines changed

src/execution/db_tracking.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,3 +211,28 @@ impl ListTrackedSourceKeyMetadataState {
211211
sqlx::query_as(&self.query_str).bind(source_id).fetch(pool)
212212
}
213213
}
214+
215+
#[derive(sqlx::FromRow, Debug)]
216+
pub struct SourceLastProcessedInfo {
217+
pub processed_source_ordinal: Option<i64>,
218+
pub process_logic_fingerprint: Option<Vec<u8>>,
219+
pub process_time_micros: Option<i64>,
220+
}
221+
222+
pub async fn read_source_last_processed_info(
223+
source_id: i32,
224+
source_key_json: &serde_json::Value,
225+
db_setup: &TrackingTableSetupState,
226+
pool: &PgPool,
227+
) -> Result<Option<SourceLastProcessedInfo>> {
228+
let query_str = format!(
229+
"SELECT processed_source_ordinal, process_logic_fingerprint, process_time_micros FROM {} WHERE source_id = $1 AND source_key = $2",
230+
db_setup.table_name
231+
);
232+
let last_processed_info = sqlx::query_as(&query_str)
233+
.bind(source_id)
234+
.bind(source_key_json)
235+
.fetch_optional(pool)
236+
.await?;
237+
Ok(last_processed_info)
238+
}

src/execution/indexing_status.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use crate::prelude::*;
2+
3+
use super::db_tracking;
4+
use super::evaluator;
5+
use futures::try_join;
6+
7+
#[derive(Debug, Serialize)]
8+
pub struct SourceRowLastProcessedInfo {
9+
pub source_ordinal: Option<interface::Ordinal>,
10+
pub processing_time: Option<chrono::DateTime<chrono::Utc>>,
11+
pub is_logic_current: bool,
12+
}
13+
14+
#[derive(Debug, Serialize)]
15+
pub struct SourceRowInfo {
16+
pub ordinal: Option<interface::Ordinal>,
17+
}
18+
19+
#[derive(Debug, Serialize)]
20+
pub struct SourceRowIndexingStatus {
21+
pub last_processed: Option<SourceRowLastProcessedInfo>,
22+
pub current: Option<SourceRowInfo>,
23+
}
24+
25+
pub async fn get_source_row_indexing_status(
26+
src_eval_ctx: &evaluator::SourceRowEvaluationContext<'_>,
27+
pool: &sqlx::PgPool,
28+
) -> Result<SourceRowIndexingStatus> {
29+
let source_key_json = serde_json::to_value(src_eval_ctx.key)?;
30+
let last_processed_fut = db_tracking::read_source_last_processed_info(
31+
src_eval_ctx.import_op.source_id,
32+
&source_key_json,
33+
&src_eval_ctx.plan.tracking_table_setup,
34+
pool,
35+
);
36+
let current_fut = src_eval_ctx.import_op.executor.get_value(
37+
&src_eval_ctx.key,
38+
&interface::SourceExecutorGetOptions {
39+
include_value: false,
40+
include_ordinal: true,
41+
},
42+
);
43+
let (last_processed, current) = try_join!(last_processed_fut, current_fut)?;
44+
45+
let last_processed = last_processed.map(|l| SourceRowLastProcessedInfo {
46+
source_ordinal: l.processed_source_ordinal.map(interface::Ordinal),
47+
processing_time: l
48+
.process_time_micros
49+
.map(chrono::DateTime::<chrono::Utc>::from_timestamp_micros)
50+
.flatten(),
51+
is_logic_current: Some(src_eval_ctx.plan.logic_fingerprint.0.as_slice())
52+
== l.process_logic_fingerprint.as_ref().map(|b| b.as_slice()),
53+
});
54+
let current = current.map(|c| SourceRowInfo { ordinal: c.ordinal });
55+
Ok(SourceRowIndexingStatus {
56+
last_processed,
57+
current,
58+
})
59+
}

src/execution/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub(crate) mod db_tracking_setup;
22
pub(crate) mod dumper;
33
pub(crate) mod evaluator;
4+
pub(crate) mod indexing_status;
45
pub(crate) mod memoization;
56
pub(crate) mod query;
67
pub(crate) mod row_indexer;

src/execution/row_indexer.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,9 +471,10 @@ pub async fn evaluate_source_entry_with_memory(
471471
},
472472
)
473473
.await?
474-
.value
475474
{
476-
Some(d) => d,
475+
Some(d) => d
476+
.value
477+
.ok_or_else(|| anyhow::anyhow!("value not returned"))?,
477478
None => return Ok(None),
478479
};
479480
let output = evaluate_source_entry(src_eval_ctx, source_value, &memory).await?;

src/execution/source_indexer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ impl SourceIndexingContext {
112112
},
113113
)
114114
.await?
115-
.value
115+
.map(|v| v.value)
116+
.flatten()
116117
};
117118
let schema = &self.flow.data_schema;
118119
let result = row_indexer::update_source_row(

src/ops/interface.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub struct FlowInstanceContext {
1616
pub py_exec_ctx: Option<Arc<crate::py::PythonExecutionContext>>,
1717
}
1818

19-
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
19+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
2020
pub struct Ordinal(pub i64);
2121

2222
impl From<Ordinal> for i64 {
@@ -72,9 +72,9 @@ pub struct SourceExecutorGetOptions {
7272
pub include_value: bool,
7373
}
7474

75-
#[derive(Debug, Default)]
75+
#[derive(Debug)]
7676
pub struct SourceValue {
77-
// None if not exists, or not included in the option.
77+
// None if not included in the option.
7878
pub value: Option<FieldValues>,
7979
// None if unavailable, or not included in the option.
8080
pub ordinal: Option<Ordinal>,
@@ -93,7 +93,7 @@ pub trait SourceExecutor: Send + Sync {
9393
&self,
9494
key: &KeyValue,
9595
options: &SourceExecutorGetOptions,
96-
) -> Result<SourceValue>;
96+
) -> Result<Option<SourceValue>>;
9797

9898
async fn change_stream(&self) -> Result<Option<BoxStream<'async_trait, SourceChange>>> {
9999
Ok(None)

src/ops/sources/google_drive.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ impl SourceExecutor for Executor {
341341
&self,
342342
key: &KeyValue,
343343
options: &SourceExecutorGetOptions,
344-
) -> Result<SourceValue> {
344+
) -> Result<Option<SourceValue>> {
345345
let file_id = key.str_value()?;
346346
let fields = format!(
347347
"id,name,mimeType,trashed{}",
@@ -359,7 +359,7 @@ impl SourceExecutor for Executor {
359359
let file = match resp {
360360
Some((_, file)) if file.trashed != Some(true) => file,
361361
_ => {
362-
return Ok(SourceValue::default());
362+
return Ok(None);
363363
}
364364
};
365365
let ordinal = if options.include_ordinal {
@@ -415,7 +415,7 @@ impl SourceExecutor for Executor {
415415
}
416416
None => None,
417417
};
418-
Ok(SourceValue { value, ordinal })
418+
Ok(Some(SourceValue { value, ordinal }))
419419
}
420420

421421
async fn change_stream(&self) -> Result<Option<BoxStream<'async_trait, SourceChange>>> {

src/ops/sources/local_file.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,9 @@ impl SourceExecutor for Executor {
8888
&self,
8989
key: &KeyValue,
9090
options: &SourceExecutorGetOptions,
91-
) -> Result<SourceValue> {
91+
) -> Result<Option<SourceValue>> {
9292
if !self.is_file_included(key.str_value()?.as_ref()) {
93-
return Ok(SourceValue {
94-
value: None,
95-
ordinal: None,
96-
});
93+
return Ok(None);
9794
}
9895
let path = self.root_path.join(key.str_value()?.as_ref());
9996
let ordinal = if options.include_ordinal {
@@ -117,7 +114,7 @@ impl SourceExecutor for Executor {
117114
} else {
118115
None
119116
};
120-
Ok(SourceValue { value, ordinal })
117+
Ok(Some(SourceValue { value, ordinal }))
121118
}
122119
}
123120

src/server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ pub async fn init_server(
6161
"/flows/:flowInstName/data",
6262
routing::get(service::flows::evaluate_data),
6363
)
64+
.route(
65+
"/flows/:flowInstName/rowStatus",
66+
routing::get(service::flows::get_row_index_status),
67+
)
6468
.route(
6569
"/flows/:flowInstName/update",
6670
routing::post(service::flows::update),

src/service/flows.rs

Lines changed: 78 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::prelude::*;
22

3-
use crate::execution::{evaluator, memoization, row_indexer, stats};
3+
use crate::execution::{evaluator, indexing_status, memoization, row_indexer, stats};
44
use crate::lib_context::LibContext;
55
use crate::{base::schema::FlowSchema, ops::interface::SourceExecutorListOptions};
66
use axum::{
@@ -100,7 +100,7 @@ pub async fn get_keys(
100100
}
101101

102102
#[derive(Deserialize)]
103-
pub struct EvaluateDataParams {
103+
pub struct SourceRowKeyParams {
104104
field: String,
105105
key: Vec<String>,
106106
}
@@ -111,56 +111,82 @@ pub struct EvaluateDataResponse {
111111
data: value::ScopeValue,
112112
}
113113

114+
struct SourceRowKeyContextHolder<'a> {
115+
plan: Arc<plan::ExecutionPlan>,
116+
import_op_idx: usize,
117+
schema: &'a FlowSchema,
118+
key: value::KeyValue,
119+
}
120+
121+
impl<'a> SourceRowKeyContextHolder<'a> {
122+
async fn create(flow_ctx: &'a FlowContext, source_row_key: SourceRowKeyParams) -> Result<Self> {
123+
let schema = &flow_ctx.flow.data_schema;
124+
let import_op_idx = flow_ctx
125+
.flow
126+
.flow_instance
127+
.import_ops
128+
.iter()
129+
.position(|op| op.name == source_row_key.field)
130+
.ok_or_else(|| {
131+
ApiError::new(
132+
&format!("source field not found: {}", source_row_key.field),
133+
StatusCode::BAD_REQUEST,
134+
)
135+
})?;
136+
let plan = flow_ctx.flow.get_execution_plan().await?;
137+
let import_op = &plan.import_ops[import_op_idx];
138+
let field_schema = &schema.fields[import_op.output.field_idx as usize];
139+
let table_schema = match &field_schema.value_type.typ {
140+
schema::ValueType::Table(table) => table,
141+
_ => api_bail!("field is not a table: {}", source_row_key.field),
142+
};
143+
let key_field = table_schema
144+
.key_field()
145+
.ok_or_else(|| api_error!("field {} does not have a key", source_row_key.field))?;
146+
let key = value::KeyValue::from_strs(source_row_key.key, &key_field.value_type.typ)?;
147+
Ok(Self {
148+
plan,
149+
import_op_idx,
150+
schema,
151+
key,
152+
})
153+
}
154+
155+
fn as_context<'b>(&'b self) -> evaluator::SourceRowEvaluationContext<'b> {
156+
evaluator::SourceRowEvaluationContext {
157+
plan: &self.plan,
158+
import_op: &self.plan.import_ops[self.import_op_idx],
159+
schema: self.schema,
160+
key: &self.key,
161+
}
162+
}
163+
}
164+
114165
pub async fn evaluate_data(
115166
Path(flow_name): Path<String>,
116-
Query(query): Query<EvaluateDataParams>,
167+
Query(query): Query<SourceRowKeyParams>,
117168
State(lib_context): State<Arc<LibContext>>,
118169
) -> Result<Json<EvaluateDataResponse>, ApiError> {
119170
let flow_ctx = lib_context.get_flow_context(&flow_name)?;
120-
let schema = &flow_ctx.flow.data_schema;
121-
122-
let import_op_idx = flow_ctx
123-
.flow
124-
.flow_instance
125-
.import_ops
126-
.iter()
127-
.position(|op| op.name == query.field)
128-
.ok_or_else(|| {
129-
ApiError::new(
130-
&format!("source field not found: {}", query.field),
131-
StatusCode::BAD_REQUEST,
132-
)
133-
})?;
134-
let plan = flow_ctx.flow.get_execution_plan().await?;
135-
let import_op = &plan.import_ops[import_op_idx];
136-
let field_schema = &schema.fields[import_op.output.field_idx as usize];
137-
let table_schema = match &field_schema.value_type.typ {
138-
schema::ValueType::Table(table) => table,
139-
_ => api_bail!("field is not a table: {}", query.field),
140-
};
141-
let key_field = table_schema
142-
.key_field()
143-
.ok_or_else(|| api_error!("field {} does not have a key", query.field))?;
144-
let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?;
145-
171+
let source_row_key_ctx = SourceRowKeyContextHolder::create(&flow_ctx, query).await?;
146172
let evaluate_output = row_indexer::evaluate_source_entry_with_memory(
147-
&evaluator::SourceRowEvaluationContext {
148-
plan: &plan,
149-
import_op,
150-
schema,
151-
key: &key,
152-
},
173+
&source_row_key_ctx.as_context(),
153174
memoization::EvaluationMemoryOptions {
154175
enable_cache: true,
155176
evaluation_only: true,
156177
},
157178
&lib_context.builtin_db_pool,
158179
)
159180
.await?
160-
.ok_or_else(|| api_error!("value not found for source at the specified key: {key:?}"))?;
181+
.ok_or_else(|| {
182+
api_error!(
183+
"value not found for source at the specified key: {key:?}",
184+
key = source_row_key_ctx.key
185+
)
186+
})?;
161187

162188
Ok(Json(EvaluateDataResponse {
163-
schema: schema.clone(),
189+
schema: flow_ctx.flow.data_schema.clone(),
164190
data: evaluate_output.data_scope.into(),
165191
}))
166192
}
@@ -182,3 +208,18 @@ pub async fn update(
182208
live_updater.wait().await?;
183209
Ok(Json(live_updater.index_update_info()))
184210
}
211+
212+
pub async fn get_row_index_status(
213+
Path(flow_name): Path<String>,
214+
Query(query): Query<SourceRowKeyParams>,
215+
State(lib_context): State<Arc<LibContext>>,
216+
) -> Result<Json<indexing_status::SourceRowIndexingStatus>, ApiError> {
217+
let flow_ctx = lib_context.get_flow_context(&flow_name)?;
218+
let source_row_key_ctx = SourceRowKeyContextHolder::create(&flow_ctx, query).await?;
219+
let index_status = indexing_status::get_source_row_indexing_status(
220+
&source_row_key_ctx.as_context(),
221+
&lib_context.builtin_db_pool,
222+
)
223+
.await?;
224+
Ok(Json(index_status))
225+
}

0 commit comments

Comments
 (0)