Skip to content

Commit 2c59ba7

Browse files
authored
refactor: wrap optional args for process_source_key() (#872)
1 parent cdc3db5 commit 2c59ba7

File tree

2 files changed

+27
-12
lines changed

2 files changed

+27
-12
lines changed

src/execution/live_updater.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::{execution::stats::UpdateStats, prelude::*};
1+
use crate::{
2+
execution::{source_indexer::ProcessSourceKeyOptions, stats::UpdateStats},
3+
prelude::*,
4+
};
25

36
use super::stats;
47
use futures::future::try_join_all;
@@ -191,14 +194,17 @@ impl SourceUpdateTask {
191194
.await?;
192195
tokio::spawn(source_context.clone().process_source_key(
193196
change.key,
194-
Some(change.key_aux_info),
195-
change.data,
196197
update_stats.clone(),
197198
concur_permit,
198199
Some(move || async move {
199200
SharedAckFn::ack(&shared_ack_fn).await
200201
}),
201202
pool.clone(),
203+
ProcessSourceKeyOptions {
204+
key_aux_info: Some(change.key_aux_info),
205+
source_data: change.data,
206+
..Default::default()
207+
},
202208
));
203209
}
204210
}

src/execution/source_indexer.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ pub struct SourceIndexingContext {
4848

4949
pub const NO_ACK: Option<fn() -> Ready<Result<()>>> = None;
5050

51+
#[derive(Default)]
52+
pub struct ProcessSourceKeyOptions {
53+
/// `key_aux_info` is not available for deletions. It must not be provided if `source_data` is `None`.
54+
pub key_aux_info: Option<serde_json::Value>,
55+
pub source_data: Option<interface::SourceData>,
56+
}
57+
5158
impl SourceIndexingContext {
5259
pub async fn load(
5360
flow: Arc<builder::AnalyzedFlow>,
@@ -100,31 +107,29 @@ impl SourceIndexingContext {
100107
})
101108
}
102109

103-
/// `key_aux_info` is not available for deletions. It must not be provided if `source_data` is `None`.
104110
pub async fn process_source_key<
105111
AckFut: Future<Output = Result<()>> + Send + 'static,
106112
AckFn: FnOnce() -> AckFut,
107113
>(
108114
self: Arc<Self>,
109115
key: value::KeyValue,
110-
key_aux_info: Option<serde_json::Value>,
111-
source_data: Option<interface::SourceData>,
112116
update_stats: Arc<stats::UpdateStats>,
113117
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
114118
ack_fn: Option<AckFn>,
115119
pool: PgPool,
120+
options: ProcessSourceKeyOptions,
116121
) {
117122
let process = async {
118123
let plan = self.flow.get_execution_plan().await?;
119124
let import_op = &plan.import_ops[self.source_idx];
120125
let schema = &self.flow.data_schema;
121-
let source_data = match source_data {
126+
let source_data = match options.source_data {
122127
Some(source_data) => source_data,
123128
None => import_op
124129
.executor
125130
.get_value(
126131
&key,
127-
key_aux_info.as_ref().ok_or_else(|| {
132+
options.key_aux_info.as_ref().ok_or_else(|| {
128133
anyhow::anyhow!(
129134
"`key_aux_info` must be provided when there's no `source_data`"
130135
)
@@ -337,12 +342,14 @@ impl SourceIndexingContext {
337342
.await?;
338343
join_set.spawn(self.clone().process_source_key(
339344
row.key,
340-
Some(row.key_aux_info),
341-
None,
342345
update_stats.clone(),
343346
concur_permit,
344347
NO_ACK,
345348
pool.clone(),
349+
ProcessSourceKeyOptions {
350+
key_aux_info: Some(row.key_aux_info),
351+
..Default::default()
352+
},
346353
));
347354
}
348355
}
@@ -372,12 +379,14 @@ impl SourceIndexingContext {
372379
let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?;
373380
join_set.spawn(self.clone().process_source_key(
374381
key,
375-
None,
376-
Some(source_data),
377382
update_stats.clone(),
378383
concur_permit,
379384
NO_ACK,
380385
pool.clone(),
386+
ProcessSourceKeyOptions {
387+
source_data: Some(source_data),
388+
..Default::default()
389+
},
381390
));
382391
}
383392
while let Some(result) = join_set.join_next().await {

0 commit comments

Comments
 (0)