Skip to content

Commit 5671d74

Browse files
authored
Engine supports dispatching change stream coming from source. (#249)
1 parent d3a20a8 commit 5671d74

File tree

4 files changed

+162
-47
lines changed

4 files changed

+162
-47
lines changed

src/execution/live_updater.rs

Lines changed: 89 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ use std::time::Instant;
33
use crate::prelude::*;
44

55
use super::stats;
6+
use futures::future::try_join_all;
67
use sqlx::PgPool;
7-
use tokio::task::JoinSet;
8+
use tokio::{task::JoinSet, time::MissedTickBehavior};
89

910
pub struct FlowLiveUpdater {
1011
flow_ctx: Arc<FlowContext>,
@@ -22,6 +23,14 @@ pub struct FlowLiveUpdaterOptions {
2223
pub print_stats: bool,
2324
}
2425

26+
struct StatsReportState {
27+
last_report_time: Option<Instant>,
28+
last_stats: stats::UpdateStats,
29+
}
30+
31+
const MIN_REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
32+
const REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
33+
2534
async fn update_source(
2635
flow_ctx: Arc<FlowContext>,
2736
plan: Arc<plan::ExecutionPlan>,
@@ -35,44 +44,102 @@ async fn update_source(
3544
.await?;
3645

3746
let import_op = &plan.import_ops[source_idx];
38-
let maybe_print_stats = |stats: &stats::UpdateStats| {
47+
48+
let stats_report_state = Mutex::new(StatsReportState {
49+
last_report_time: None,
50+
last_stats: source_update_stats.as_ref().clone(),
51+
});
52+
let report_stats = || {
53+
let new_stats = source_update_stats.as_ref().clone();
54+
let now = Instant::now();
55+
let delta = {
56+
let mut state = stats_report_state.lock().unwrap();
57+
if let Some(last_report_time) = state.last_report_time {
58+
if now.duration_since(last_report_time) < MIN_REPORT_INTERVAL {
59+
return;
60+
}
61+
}
62+
let delta = new_stats.delta(&state.last_stats);
63+
if delta.is_zero() {
64+
return;
65+
}
66+
state.last_stats = new_stats;
67+
state.last_report_time = Some(now);
68+
delta
69+
};
3970
if options.print_stats {
4071
println!(
4172
"{}.{}: {}",
42-
flow_ctx.flow.flow_instance.name, import_op.name, stats
73+
flow_ctx.flow.flow_instance.name, import_op.name, delta
4374
);
4475
} else {
4576
trace!(
4677
"{}.{}: {}",
4778
flow_ctx.flow.flow_instance.name,
4879
import_op.name,
49-
stats
80+
delta
5081
);
5182
}
5283
};
5384

54-
let mut update_start = Instant::now();
55-
source_context.update(&pool, &source_update_stats).await?;
56-
maybe_print_stats(&source_update_stats);
57-
58-
if let (true, Some(refresh_interval)) = (
59-
options.live_mode,
60-
import_op.refresh_options.refresh_interval,
61-
) {
62-
let mut last_stats = source_update_stats.as_ref().clone();
63-
loop {
64-
let elapsed = update_start.elapsed();
65-
if elapsed < refresh_interval {
66-
tokio::time::sleep(refresh_interval - elapsed).await;
85+
let mut futs: Vec<BoxFuture<'_, Result<()>>> = Vec::new();
86+
87+
// Deal with change streams.
88+
if let (true, Some(change_stream)) = (options.live_mode, import_op.executor.change_stream()) {
89+
let pool = pool.clone();
90+
let source_update_stats = source_update_stats.clone();
91+
futs.push(
92+
async move {
93+
let mut change_stream = change_stream;
94+
while let Some(change) = change_stream.next().await {
95+
source_context
96+
.process_change(change, &pool, &source_update_stats)
97+
.map(tokio::spawn);
98+
}
99+
Ok(())
67100
}
68-
update_start = Instant::now();
101+
.boxed(),
102+
);
103+
futs.push(
104+
async move {
105+
let mut interval = tokio::time::interval(REPORT_INTERVAL);
106+
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
107+
interval.tick().await;
108+
loop {
109+
interval.tick().await;
110+
report_stats();
111+
}
112+
}
113+
.boxed(),
114+
);
115+
}
116+
117+
// The main update loop.
118+
let source_update_stats = source_update_stats.clone();
119+
futs.push(
120+
async move {
69121
source_context.update(&pool, &source_update_stats).await?;
122+
report_stats();
70123

71-
let this_stats = source_update_stats.as_ref().clone();
72-
maybe_print_stats(&this_stats.delta(&last_stats));
73-
last_stats = this_stats;
124+
if let (true, Some(refresh_interval)) = (
125+
options.live_mode,
126+
import_op.refresh_options.refresh_interval,
127+
) {
128+
let mut interval = tokio::time::interval(refresh_interval);
129+
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
130+
interval.tick().await;
131+
loop {
132+
interval.tick().await;
133+
source_context.update(&pool, &source_update_stats).await?;
134+
report_stats();
135+
}
136+
}
137+
Ok(())
74138
}
75-
}
139+
.boxed(),
140+
);
141+
142+
try_join_all(futs).await?;
76143
Ok(())
77144
}
78145

src/execution/source_indexer.rs

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
use crate::prelude::*;
1+
use crate::{ops::interface::SourceValueChange, prelude::*};
22

33
use sqlx::PgPool;
44
use std::collections::{hash_map, HashMap};
55
use tokio::{sync::Semaphore, task::JoinSet};
66

77
use super::{
88
db_tracking,
9-
row_indexer::{self, SkippedOr, SourceVersion},
9+
row_indexer::{self, SkippedOr, SourceVersion, SourceVersionKind},
1010
stats,
1111
};
1212
struct SourceRowIndexingState {
@@ -78,21 +78,23 @@ impl SourceIndexingContext {
7878
})
7979
}
8080

81-
fn process_source_key(
81+
async fn process_source_key(
8282
self: Arc<Self>,
8383
key: value::KeyValue,
8484
source_version: SourceVersion,
85+
value: Option<value::FieldValues>,
8586
update_stats: Arc<stats::UpdateStats>,
8687
processing_sem: Arc<Semaphore>,
8788
pool: PgPool,
88-
join_set: &mut JoinSet<Result<()>>,
8989
) {
90-
let fut = async move {
90+
let process = async move {
9191
let permit = processing_sem.acquire().await?;
9292
let plan = self.flow.get_execution_plan().await?;
9393
let import_op = &plan.import_ops[self.source_idx];
9494
let source_value = if source_version.kind == row_indexer::SourceVersionKind::Deleted {
9595
None
96+
} else if let Some(value) = value {
97+
Some(value)
9698
} else {
9799
// Even if the source version kind is not Deleted, the source value might be gone on polling.
98100
// In this case, we still use the current source version even if it's already stale - actually this version skew
@@ -154,17 +156,19 @@ impl SourceIndexingContext {
154156
drop(permit);
155157
anyhow::Ok(())
156158
};
157-
join_set.spawn(fut);
159+
if let Err(e) = process.await {
160+
error!("{:?}", e.context("Error in processing a source row"));
161+
}
158162
}
159163

160164
fn process_source_key_if_newer(
161165
self: &Arc<Self>,
162166
key: value::KeyValue,
163167
source_version: SourceVersion,
168+
value: Option<value::FieldValues>,
164169
update_stats: &Arc<stats::UpdateStats>,
165170
pool: &PgPool,
166-
join_set: &mut JoinSet<Result<()>>,
167-
) {
171+
) -> Option<impl Future<Output = ()> + Send + 'static> {
168172
let processing_sem = {
169173
let mut state = self.state.lock().unwrap();
170174
let scan_generation = state.scan_generation;
@@ -174,19 +178,19 @@ impl SourceIndexingContext {
174178
.source_version
175179
.should_skip(&source_version, Some(&update_stats))
176180
{
177-
return;
181+
return None;
178182
}
179183
row_state.source_version = source_version.clone();
180184
row_state.processing_sem.clone()
181185
};
182-
self.clone().process_source_key(
186+
Some(self.clone().process_source_key(
183187
key,
184188
source_version,
189+
value,
185190
update_stats.clone(),
186191
processing_sem,
187192
pool.clone(),
188-
join_set,
189-
);
193+
))
190194
}
191195

192196
pub async fn update(
@@ -212,15 +216,18 @@ impl SourceIndexingContext {
212216
self.process_source_key_if_newer(
213217
row.key,
214218
SourceVersion::from_current(row.ordinal),
219+
None,
215220
update_stats,
216221
pool,
217-
&mut join_set,
218-
);
222+
)
223+
.map(|fut| join_set.spawn(fut));
219224
}
220225
}
221226
while let Some(result) = join_set.join_next().await {
222-
if let Err(e) = (|| anyhow::Ok(result??))() {
223-
error!("{:?}", e.context("Error in indexing a source row"));
227+
if let Err(e) = result {
228+
if !e.is_cancelled() {
229+
error!("{:?}", e);
230+
}
224231
}
225232
}
226233

@@ -239,21 +246,45 @@ impl SourceIndexingContext {
239246
deleted_key_versions
240247
};
241248
for (key, source_version, processing_sem) in deleted_key_versions {
242-
self.clone().process_source_key(
249+
join_set.spawn(self.clone().process_source_key(
243250
key,
244251
source_version,
252+
None,
245253
update_stats.clone(),
246254
processing_sem,
247255
pool.clone(),
248-
&mut join_set,
249-
);
256+
));
250257
}
251258
while let Some(result) = join_set.join_next().await {
252-
if let Err(e) = (|| anyhow::Ok(result??))() {
253-
error!("{:?}", e.context("Error in deleting a source row"));
259+
if let Err(e) = result {
260+
if !e.is_cancelled() {
261+
error!("{:?}", e);
262+
}
254263
}
255264
}
256265

257266
Ok(())
258267
}
268+
269+
pub fn process_change(
270+
self: &Arc<Self>,
271+
change: interface::SourceChange,
272+
pool: &PgPool,
273+
update_stats: &Arc<stats::UpdateStats>,
274+
) -> Option<impl Future<Output = ()> + Send + 'static> {
275+
let (source_version_kind, value) = match change.value {
276+
SourceValueChange::Upsert(value) => (SourceVersionKind::CurrentLogic, value),
277+
SourceValueChange::Delete => (SourceVersionKind::Deleted, None),
278+
};
279+
self.process_source_key_if_newer(
280+
change.key,
281+
SourceVersion {
282+
ordinal: change.ordinal,
283+
kind: source_version_kind,
284+
},
285+
value,
286+
update_stats,
287+
pool,
288+
)
289+
}
259290
}

src/execution/stats.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,14 @@ impl UpdateStats {
6969
num_errors: self.num_errors.delta(&base.num_errors),
7070
}
7171
}
72+
73+
pub fn is_zero(&self) -> bool {
74+
self.num_skipped.get() == 0
75+
&& self.num_insertions.get() == 0
76+
&& self.num_deletions.get() == 0
77+
&& self.num_repreocesses.get() == 0
78+
&& self.num_errors.get() == 0
79+
}
7280
}
7381

7482
impl std::fmt::Display for UpdateStats {
@@ -97,6 +105,10 @@ impl std::fmt::Display for UpdateStats {
97105
": {num_insertions} added, {num_deletions} removed, {num_reprocesses} repocessed",
98106
)?;
99107
}
108+
109+
if num_skipped == 0 && num_source_rows == 0 {
110+
write!(f, "no changes")?;
111+
}
100112
Ok(())
101113
}
102114
}

src/ops/interface.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,17 @@ pub struct SourceRowMetadata {
4646
pub ordinal: Option<Ordinal>,
4747
}
4848

49-
pub struct SourceChange<'a> {
49+
pub enum SourceValueChange {
50+
/// `None` means the value is not available in this change and needs a separate poll via the `get_value()` API.
51+
Upsert(Option<FieldValues>),
52+
Delete,
53+
}
54+
55+
pub struct SourceChange {
5056
/// Last update/deletion ordinal. None means unavailable.
5157
pub ordinal: Option<Ordinal>,
5258
pub key: KeyValue,
53-
/// None means a deletion. None within the `BoxFuture` means the item is gone when polling.
54-
pub value: Option<BoxFuture<'a, Result<Option<FieldValues>>>>,
59+
pub value: SourceValueChange,
5560
}
5661

5762
#[derive(Debug, Default)]
@@ -70,7 +75,7 @@ pub trait SourceExecutor: Send + Sync {
7075
// Get the value for the given key.
7176
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>>;
7277

73-
fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange<'a>>> {
78+
fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange>> {
7479
None
7580
}
7681
}

0 commit comments

Comments
 (0)