Skip to content

Commit ecc40b6

Browse files
authored
refactor: rename target setup related stuffs for ease of understanding (#880)
1 parent fc6bfe5 commit ecc40b6

File tree

14 files changed

+215
-215
lines changed

14 files changed

+215
-215
lines changed

src/builder/plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ pub enum AnalyzedPrimaryKeyDef {
101101
pub struct AnalyzedExportOp {
102102
pub name: String,
103103
pub input: AnalyzedLocalCollectorReference,
104-
pub export_target_factory: Arc<dyn ExportTargetFactory + Send + Sync>,
104+
pub export_target_factory: Arc<dyn TargetFactory + Send + Sync>,
105105
pub export_context: Arc<dyn Any + Send + Sync>,
106106
pub primary_key_def: AnalyzedPrimaryKeyDef,
107107
pub primary_key_type: schema::ValueType,
@@ -113,7 +113,7 @@ pub struct AnalyzedExportOp {
113113
}
114114

115115
pub struct AnalyzedExportTargetOpGroup {
116-
pub target_factory: Arc<dyn ExportTargetFactory + Send + Sync>,
116+
pub target_factory: Arc<dyn TargetFactory + Send + Sync>,
117117
pub op_idx: Vec<usize>,
118118
}
119119

src/execution/db_tracking_setup.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::prelude::*;
22

3-
use crate::setup::{CombinedState, ResourceSetupInfo, ResourceSetupStatus, SetupChangeType};
3+
use crate::setup::{CombinedState, ResourceSetupChange, ResourceSetupInfo, SetupChangeType};
44
use serde::{Deserialize, Serialize};
55
use sqlx::PgPool;
66

@@ -76,7 +76,7 @@ pub struct TrackingTableSetupState {
7676
}
7777

7878
#[derive(Debug)]
79-
pub struct TrackingTableSetupStatus {
79+
pub struct TrackingTableSetupChange {
8080
pub desired_state: Option<TrackingTableSetupState>,
8181

8282
pub min_existing_version_id: Option<i32>,
@@ -88,7 +88,7 @@ pub struct TrackingTableSetupStatus {
8888
pub source_ids_to_delete: Vec<i32>,
8989
}
9090

91-
impl TrackingTableSetupStatus {
91+
impl TrackingTableSetupChange {
9292
pub fn new(
9393
desired: Option<&TrackingTableSetupState>,
9494
existing: &CombinedState<TrackingTableSetupState>,
@@ -127,18 +127,18 @@ impl TrackingTableSetupStatus {
127127

128128
pub fn into_setup_info(
129129
self,
130-
) -> ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupStatus> {
130+
) -> ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupChange> {
131131
ResourceSetupInfo {
132132
key: (),
133133
state: self.desired_state.clone(),
134134
description: "Internal Storage for Tracking".to_string(),
135-
setup_status: Some(self),
135+
setup_change: Some(self),
136136
legacy_key: None,
137137
}
138138
}
139139
}
140140

141-
impl ResourceSetupStatus for TrackingTableSetupStatus {
141+
impl ResourceSetupChange for TrackingTableSetupChange {
142142
fn describe_changes(&self) -> Vec<setup::ChangeDescription> {
143143
let mut changes: Vec<setup::ChangeDescription> = vec![];
144144
if self.desired_state.is_some() && !self.legacy_tracking_table_names.is_empty() {
@@ -234,7 +234,7 @@ impl ResourceSetupStatus for TrackingTableSetupStatus {
234234
}
235235
}
236236

237-
impl TrackingTableSetupStatus {
237+
impl TrackingTableSetupChange {
238238
pub async fn apply_change(&self) -> Result<()> {
239239
let lib_context = get_lib_context()?;
240240
let pool = lib_context.require_builtin_db_pool()?;

src/lib_context.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@ use crate::builder::AnalyzedFlow;
66
use crate::execution::source_indexer::SourceIndexingContext;
77
use crate::service::error::ApiError;
88
use crate::settings;
9-
use crate::setup::ObjectSetupStatus;
9+
use crate::setup::ObjectSetupChange;
1010
use axum::http::StatusCode;
1111
use sqlx::PgPool;
1212
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
1313
use tokio::runtime::Runtime;
1414

1515
pub struct FlowExecutionContext {
1616
pub setup_execution_context: Arc<exec_ctx::FlowSetupExecutionContext>,
17-
pub setup_status: setup::FlowSetupStatus,
17+
pub setup_change: setup::FlowSetupChange,
1818
source_indexing_contexts: Vec<tokio::sync::OnceCell<Arc<SourceIndexingContext>>>,
1919
}
2020

@@ -23,7 +23,7 @@ async fn build_setup_context(
2323
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
2424
) -> Result<(
2525
Arc<exec_ctx::FlowSetupExecutionContext>,
26-
setup::FlowSetupStatus,
26+
setup::FlowSetupChange,
2727
)> {
2828
let setup_execution_context = Arc::new(exec_ctx::build_flow_setup_execution_context(
2929
&analyzed_flow.flow_instance,
@@ -32,22 +32,22 @@ async fn build_setup_context(
3232
existing_flow_ss,
3333
)?);
3434

35-
let setup_status = setup::check_flow_setup_status(
35+
let setup_change = setup::diff_flow_setup_states(
3636
Some(&setup_execution_context.setup_state),
3737
existing_flow_ss,
3838
&analyzed_flow.flow_instance_ctx,
3939
)
4040
.await?;
4141

42-
Ok((setup_execution_context, setup_status))
42+
Ok((setup_execution_context, setup_change))
4343
}
4444

4545
impl FlowExecutionContext {
4646
async fn new(
4747
analyzed_flow: &AnalyzedFlow,
4848
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
4949
) -> Result<Self> {
50-
let (setup_execution_context, setup_status) =
50+
let (setup_execution_context, setup_change) =
5151
build_setup_context(analyzed_flow, existing_flow_ss).await?;
5252

5353
let mut source_indexing_contexts = Vec::new();
@@ -57,7 +57,7 @@ impl FlowExecutionContext {
5757

5858
Ok(Self {
5959
setup_execution_context,
60-
setup_status,
60+
setup_change,
6161
source_indexing_contexts,
6262
})
6363
}
@@ -67,11 +67,11 @@ impl FlowExecutionContext {
6767
analyzed_flow: &AnalyzedFlow,
6868
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
6969
) -> Result<()> {
70-
let (setup_execution_context, setup_status) =
70+
let (setup_execution_context, setup_change) =
7171
build_setup_context(analyzed_flow, existing_flow_ss).await?;
7272

7373
self.setup_execution_context = setup_execution_context;
74-
self.setup_status = setup_status;
74+
self.setup_change = setup_change;
7575
Ok(())
7676
}
7777

@@ -124,7 +124,7 @@ impl FlowContext {
124124
&self,
125125
) -> Result<tokio::sync::RwLockReadGuard<FlowExecutionContext>> {
126126
let execution_ctx = self.execution_ctx.read().await;
127-
if !execution_ctx.setup_status.is_up_to_date() {
127+
if !execution_ctx.setup_change.is_up_to_date() {
128128
api_bail!(
129129
"Setup for flow `{}` is not up-to-date. Please run `cocoindex setup` to update the setup.",
130130
self.flow_name()
@@ -137,7 +137,7 @@ impl FlowContext {
137137
&self,
138138
) -> Result<tokio::sync::OwnedRwLockReadGuard<FlowExecutionContext>> {
139139
let execution_ctx = self.execution_ctx.clone().read_owned().await;
140-
if !execution_ctx.setup_status.is_up_to_date() {
140+
if !execution_ctx.setup_change.is_up_to_date() {
141141
api_bail!(
142142
"Setup for flow `{}` is not up-to-date. Please run `cocoindex setup` to update the setup.",
143143
self.flow_name()
@@ -212,7 +212,7 @@ impl DbPools {
212212

213213
pub struct LibSetupContext {
214214
pub all_setup_states: setup::AllSetupStates<setup::ExistingMode>,
215-
pub global_setup_status: setup::GlobalSetupStatus,
215+
pub global_setup_change: setup::GlobalSetupChange,
216216
}
217217
pub struct PersistenceContext {
218218
pub builtin_db_pool: PgPool,
@@ -286,7 +286,7 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
286286
Some(PersistenceContext {
287287
builtin_db_pool: pool,
288288
setup_ctx: tokio::sync::RwLock::new(LibSetupContext {
289-
global_setup_status: setup::GlobalSetupStatus::from_setup_states(&all_setup_states),
289+
global_setup_change: setup::GlobalSetupChange::from_setup_states(&all_setup_states),
290290
all_setup_states,
291291
}),
292292
})

src/ops/factory_bases.rs

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::prelude::*;
2-
use crate::setup::ResourceSetupStatus;
2+
use crate::setup::ResourceSetupChange;
33
use std::fmt::Debug;
44
use std::hash::Hash;
55

@@ -350,31 +350,31 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
350350
}
351351
}
352352

353-
pub struct TypedExportDataCollectionBuildOutput<F: StorageFactoryBase + ?Sized> {
353+
pub struct TypedExportDataCollectionBuildOutput<F: TargetFactoryBase + ?Sized> {
354354
pub export_context: BoxFuture<'static, Result<Arc<F::ExportContext>>>,
355355
pub setup_key: F::Key,
356356
pub desired_setup_state: F::SetupState,
357357
}
358-
pub struct TypedExportDataCollectionSpec<F: StorageFactoryBase + ?Sized> {
358+
pub struct TypedExportDataCollectionSpec<F: TargetFactoryBase + ?Sized> {
359359
pub name: String,
360360
pub spec: F::Spec,
361361
pub key_fields_schema: Vec<FieldSchema>,
362362
pub value_fields_schema: Vec<FieldSchema>,
363363
pub index_options: IndexOptions,
364364
}
365365

366-
pub struct TypedResourceSetupChangeItem<'a, F: StorageFactoryBase + ?Sized> {
366+
pub struct TypedResourceSetupChangeItem<'a, F: TargetFactoryBase + ?Sized> {
367367
pub key: F::Key,
368-
pub setup_status: &'a F::SetupStatus,
368+
pub setup_change: &'a F::SetupChange,
369369
}
370370

371371
#[async_trait]
372-
pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
372+
pub trait TargetFactoryBase: TargetFactory + Send + Sync + 'static {
373373
type Spec: DeserializeOwned + Send + Sync;
374374
type DeclarationSpec: DeserializeOwned + Send + Sync;
375375
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
376376
type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
377-
type SetupStatus: ResourceSetupStatus;
377+
type SetupChange: ResourceSetupChange;
378378
type ExportContext: Send + Sync + 'static;
379379

380380
fn name(&self) -> &str;
@@ -397,13 +397,13 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
397397

398398
/// Will not be called if it's setup by user.
399399
/// It returns an error if the target only supports setup by user.
400-
async fn check_setup_status(
400+
async fn diff_setup_states(
401401
&self,
402402
key: Self::Key,
403403
desired_state: Option<Self::SetupState>,
404404
existing_states: setup::CombinedState<Self::SetupState>,
405405
flow_instance_ctx: Arc<FlowInstanceContext>,
406-
) -> Result<Self::SetupStatus>;
406+
) -> Result<Self::SetupChange>;
407407

408408
fn check_state_compatibility(
409409
&self,
@@ -439,13 +439,13 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
439439

440440
async fn apply_setup_changes(
441441
&self,
442-
setup_status: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
442+
setup_change: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
443443
context: Arc<FlowInstanceContext>,
444444
) -> Result<()>;
445445
}
446446

447447
#[async_trait]
448-
impl<T: StorageFactoryBase> ExportTargetFactory for T {
448+
impl<T: TargetFactoryBase> TargetFactory for T {
449449
async fn build(
450450
self: Arc<Self>,
451451
data_collections: Vec<interface::ExportDataCollectionSpec>,
@@ -455,7 +455,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
455455
Vec<interface::ExportDataCollectionBuildOutput>,
456456
Vec<(serde_json::Value, serde_json::Value)>,
457457
)> {
458-
let (data_coll_output, decl_output) = StorageFactoryBase::build(
458+
let (data_coll_output, decl_output) = TargetFactoryBase::build(
459459
self,
460460
data_collections
461461
.into_iter()
@@ -497,32 +497,32 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
497497
Ok((data_coll_output, decl_output))
498498
}
499499

500-
async fn check_setup_status(
500+
async fn diff_setup_states(
501501
&self,
502502
key: &serde_json::Value,
503503
desired_state: Option<serde_json::Value>,
504504
existing_states: setup::CombinedState<serde_json::Value>,
505505
flow_instance_ctx: Arc<FlowInstanceContext>,
506-
) -> Result<Box<dyn setup::ResourceSetupStatus>> {
506+
) -> Result<Box<dyn setup::ResourceSetupChange>> {
507507
let key: T::Key = Self::deserialize_setup_key(key.clone())?;
508508
let desired_state: Option<T::SetupState> = desired_state
509509
.map(|v| serde_json::from_value(v.clone()))
510510
.transpose()?;
511511
let existing_states = from_json_combined_state(existing_states)?;
512-
let setup_status = StorageFactoryBase::check_setup_status(
512+
let setup_change = TargetFactoryBase::diff_setup_states(
513513
self,
514514
key,
515515
desired_state,
516516
existing_states,
517517
flow_instance_ctx,
518518
)
519519
.await?;
520-
Ok(Box::new(setup_status))
520+
Ok(Box::new(setup_change))
521521
}
522522

523523
fn describe_resource(&self, key: &serde_json::Value) -> Result<String> {
524524
let key: T::Key = Self::deserialize_setup_key(key.clone())?;
525-
StorageFactoryBase::describe_resource(self, &key)
525+
TargetFactoryBase::describe_resource(self, &key)
526526
}
527527

528528
fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value> {
@@ -535,21 +535,23 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
535535
desired_state: &serde_json::Value,
536536
existing_state: &serde_json::Value,
537537
) -> Result<SetupStateCompatibility> {
538-
let result = StorageFactoryBase::check_state_compatibility(
538+
let result = TargetFactoryBase::check_state_compatibility(
539539
self,
540540
&serde_json::from_value(desired_state.clone())?,
541541
&serde_json::from_value(existing_state.clone())?,
542542
)?;
543543
Ok(result)
544544
}
545545

546+
/// Extract additional keys that are passed through as part of the mutation to `apply_mutation()`.
547+
/// This is useful for targets that need to use additional parts as key for the target (which is not considered as part of the key for cocoindex).
546548
fn extract_additional_key(
547549
&self,
548550
key: &value::KeyValue,
549551
value: &value::FieldValues,
550552
export_context: &(dyn Any + Send + Sync),
551553
) -> Result<serde_json::Value> {
552-
StorageFactoryBase::extract_additional_key(
554+
TargetFactoryBase::extract_additional_key(
553555
self,
554556
key,
555557
value,
@@ -575,23 +577,23 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
575577
})
576578
})
577579
.collect::<Result<_>>()?;
578-
StorageFactoryBase::apply_mutation(self, mutations).await
580+
TargetFactoryBase::apply_mutation(self, mutations).await
579581
}
580582

581583
async fn apply_setup_changes(
582584
&self,
583-
setup_status: Vec<ResourceSetupChangeItem<'async_trait>>,
585+
setup_change: Vec<ResourceSetupChangeItem<'async_trait>>,
584586
context: Arc<FlowInstanceContext>,
585587
) -> Result<()> {
586-
StorageFactoryBase::apply_setup_changes(
588+
TargetFactoryBase::apply_setup_changes(
587589
self,
588-
setup_status
590+
setup_change
589591
.into_iter()
590592
.map(|item| -> anyhow::Result<_> {
591593
Ok(TypedResourceSetupChangeItem {
592594
key: serde_json::from_value(item.key.clone())?,
593-
setup_status: (item.setup_status as &dyn Any)
594-
.downcast_ref::<T::SetupStatus>()
595+
setup_change: (item.setup_change as &dyn Any)
596+
.downcast_ref::<T::SetupChange>()
595597
.ok_or_else(invariance_violation)?,
596598
})
597599
})

0 commit comments

Comments
 (0)