Skip to content

Commit 32171fd

Browse files
authored
feat(reexport): expose an option to reexport to targets during export (#974)
1 parent 074bf52 commit 32171fd

File tree

6 files changed

+137
-49
lines changed

6 files changed

+137
-49
lines changed

python/cocoindex/cli.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,13 @@ def drop(app_target: str | None, flow_name: tuple[str, ...], force: bool) -> Non
363363
default=False,
364364
help="Continuously watch changes from data sources and apply to the target index.",
365365
)
366+
@click.option(
367+
"--reexport",
368+
is_flag=True,
369+
show_default=True,
370+
default=False,
371+
help="Reexport to targets even if there's no change.",
372+
)
366373
@click.option(
367374
"--setup",
368375
is_flag=True,
@@ -389,6 +396,7 @@ def drop(app_target: str | None, flow_name: tuple[str, ...], force: bool) -> Non
389396
def update(
390397
app_flow_specifier: str,
391398
live: bool,
399+
reexport: bool,
392400
setup: bool, # pylint: disable=redefined-outer-name
393401
force: bool,
394402
quiet: bool,
@@ -408,7 +416,11 @@ def update(
408416
fg="yellow",
409417
)
410418

411-
options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet)
419+
options = flow.FlowLiveUpdaterOptions(
420+
live_mode=live,
421+
reexport_targets=reexport,
422+
print_stats=not quiet,
423+
)
412424
if flow_name is None:
413425
if setup:
414426
_setup_flows(

python/cocoindex/flow.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -563,9 +563,14 @@ def declare(self, spec: op.DeclarationSpec) -> None:
563563
class FlowLiveUpdaterOptions:
564564
"""
565565
Options for live updating a flow.
566+
567+
- live_mode: Whether to perform live update for data sources with change capture mechanisms.
568+
- reexport_targets: Whether to reexport to targets even if there's no change.
569+
- print_stats: Whether to print stats during update.
566570
"""
567571

568572
live_mode: bool = True
573+
reexport_targets: bool = False
569574
print_stats: bool = False
570575

571576

@@ -759,20 +764,25 @@ def full_name(self) -> str:
759764
"""
760765
return self._full_name
761766

762-
def update(self) -> _engine.IndexUpdateInfo:
767+
def update(self, /, *, reexport_targets: bool = False) -> _engine.IndexUpdateInfo:
763768
"""
764769
Update the index defined by the flow.
765770
Once the function returns, the index is fresh up to the moment when the function is called.
766771
"""
767-
return execution_context.run(self.update_async())
772+
return execution_context.run(
773+
self.update_async(reexport_targets=reexport_targets)
774+
)
768775

769-
async def update_async(self) -> _engine.IndexUpdateInfo:
776+
async def update_async(
777+
self, /, *, reexport_targets: bool = False
778+
) -> _engine.IndexUpdateInfo:
770779
"""
771780
Update the index defined by the flow.
772781
Once the function returns, the index is fresh up to the moment when the function is called.
773782
"""
774783
async with FlowLiveUpdater(
775-
self, FlowLiveUpdaterOptions(live_mode=False)
784+
self,
785+
FlowLiveUpdaterOptions(live_mode=False, reexport_targets=reexport_targets),
776786
) as updater:
777787
await updater.wait_async()
778788
return updater.update_stats()

src/execution/live_updater.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ pub struct FlowLiveUpdaterOptions {
4141
/// Otherwise, it will only apply changes from the source up to the current time.
4242
pub live_mode: bool,
4343

44+
/// If true, the updater will reexport the targets even if there's no change.
45+
pub reexport_targets: bool,
46+
4447
/// If true, stats will be printed to the console.
4548
pub print_stats: bool,
4649
}
@@ -198,6 +201,7 @@ impl SourceUpdateTask {
198201
key_aux_info: Some(change.key_aux_info),
199202
data: change.data,
200203
},
204+
super::source_indexer::UpdateMode::Normal,
201205
update_stats.clone(),
202206
concur_permit,
203207
Some(move || async move {
@@ -243,7 +247,18 @@ impl SourceUpdateTask {
243247
async move {
244248
let update_stats = Arc::new(stats::UpdateStats::default());
245249
source_context
246-
.update(&pool, &update_stats, /*expect_little_diff=*/ false)
250+
.update(
251+
&pool,
252+
&update_stats,
253+
super::source_indexer::UpdateOptions {
254+
expect_little_diff: false,
255+
mode: if self.options.reexport_targets {
256+
super::source_indexer::UpdateMode::ReexportTargets
257+
} else {
258+
super::source_indexer::UpdateMode::Normal
259+
},
260+
},
261+
)
247262
.await?;
248263
if update_stats.has_any_change() {
249264
status_tx.send_modify(|update| {
@@ -263,7 +278,14 @@ impl SourceUpdateTask {
263278

264279
let update_stats = Arc::new(stats::UpdateStats::default());
265280
source_context
266-
.update(&pool, &update_stats, /*expect_little_diff=*/ true)
281+
.update(
282+
&pool,
283+
&update_stats,
284+
super::source_indexer::UpdateOptions {
285+
expect_little_diff: true,
286+
mode: super::source_indexer::UpdateMode::Normal,
287+
},
288+
)
267289
.await?;
268290
if update_stats.has_any_change() {
269291
status_tx.send_modify(|update| {

src/execution/row_indexer.rs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ struct PrecommitOutput {
181181
pub struct RowIndexer<'a> {
182182
src_eval_ctx: &'a SourceRowEvaluationContext<'a>,
183183
setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
184+
mode: super::source_indexer::UpdateMode,
184185
update_stats: &'a stats::UpdateStats,
185186
pool: &'a PgPool,
186187

@@ -197,8 +198,9 @@ impl<'a> RowIndexer<'a> {
197198
pub fn new(
198199
src_eval_ctx: &'a SourceRowEvaluationContext<'_>,
199200
setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
200-
pool: &'a PgPool,
201+
mode: super::source_indexer::UpdateMode,
201202
update_stats: &'a stats::UpdateStats,
203+
pool: &'a PgPool,
202204
) -> Result<Self> {
203205
Ok(Self {
204206
source_id: setup_execution_ctx.import_ops[src_eval_ctx.import_op_idx].source_id,
@@ -207,8 +209,9 @@ impl<'a> RowIndexer<'a> {
207209

208210
src_eval_ctx,
209211
setup_execution_ctx,
210-
pool,
212+
mode,
211213
update_stats,
214+
pool,
212215
})
213216
}
214217

@@ -236,7 +239,9 @@ impl<'a> RowIndexer<'a> {
236239
);
237240

238241
// First check ordinal-based skipping
239-
if existing_version.should_skip(source_version, Some(self.update_stats)) {
242+
if self.mode == super::source_indexer::UpdateMode::Normal
243+
&& existing_version.should_skip(source_version, Some(self.update_stats))
244+
{
240245
return Ok(SkippedOr::Skipped(
241246
existing_version,
242247
info.processed_source_fp.clone(),
@@ -260,7 +265,9 @@ impl<'a> RowIndexer<'a> {
260265
(None, interface::SourceValue::NonExistence) => None,
261266
};
262267

263-
if let Some(content_version_fp) = &content_version_fp {
268+
if self.mode == super::source_indexer::UpdateMode::Normal
269+
&& let Some(content_version_fp) = &content_version_fp
270+
{
264271
let baseline = if tracking_setup_state.has_fast_fingerprint_column {
265272
existing_tracking_info
266273
.as_ref()
@@ -373,7 +380,9 @@ impl<'a> RowIndexer<'a> {
373380

374381
if let Some(existing_version) = existing_version {
375382
if output.is_some() {
376-
if existing_version.kind == SourceVersionKind::DifferentLogic {
383+
if existing_version.kind == SourceVersionKind::DifferentLogic
384+
|| self.mode == super::source_indexer::UpdateMode::ReexportTargets
385+
{
377386
self.update_stats.num_reprocesses.inc(1);
378387
} else {
379388
self.update_stats.num_updates.inc(1);
@@ -398,7 +407,9 @@ impl<'a> RowIndexer<'a> {
398407
let tracking_table_setup = &self.setup_execution_ctx.setup_state.tracking_table;
399408

400409
// Check if we can use content hash optimization
401-
if existing_version.kind != SourceVersionKind::CurrentLogic {
410+
if self.mode != super::source_indexer::UpdateMode::Normal
411+
|| existing_version.kind != SourceVersionKind::CurrentLogic
412+
{
402413
return Ok(None);
403414
}
404415

@@ -509,7 +520,9 @@ impl<'a> RowIndexer<'a> {
509520
&mut *txn,
510521
)
511522
.await?;
512-
if let Some(tracking_info) = &tracking_info {
523+
if self.mode == super::source_indexer::UpdateMode::Normal
524+
&& let Some(tracking_info) = &tracking_info
525+
{
513526
let existing_source_version =
514527
SourceVersion::from_stored_precommit_info(&tracking_info, logic_fp);
515528
if existing_source_version.should_skip(source_version, Some(self.update_stats)) {
@@ -620,13 +633,12 @@ impl<'a> RowIndexer<'a> {
620633
} else {
621634
None
622635
};
623-
if existing_target_keys
624-
.as_ref()
625-
.map(|keys| !keys.is_empty() && keys.iter().all(|(_, fp)| fp == &curr_fp))
626-
.unwrap_or(false)
636+
if self.mode == super::source_indexer::UpdateMode::Normal
637+
&& existing_target_keys.as_ref().map_or(false, |keys| {
638+
!keys.is_empty() && keys.iter().all(|(_, fp)| fp == &curr_fp)
639+
})
627640
&& existing_staging_target_keys
628-
.map(|keys| keys.iter().all(|(_, fp)| fp == &curr_fp))
629-
.unwrap_or(true)
641+
.map_or(true, |keys| keys.iter().all(|(_, fp)| fp == &curr_fp))
630642
{
631643
// carry over existing target keys info
632644
let (existing_ordinal, existing_fp) = existing_target_keys

0 commit comments

Comments
 (0)