Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub enum AnalyzedPrimaryKeyDef {
pub struct AnalyzedExportOp {
pub name: String,
pub input: AnalyzedLocalCollectorReference,
pub export_target_factory: Arc<dyn ExportTargetFactory + Send + Sync>,
pub export_target_factory: Arc<dyn TargetFactory + Send + Sync>,
pub export_context: Arc<dyn Any + Send + Sync>,
pub primary_key_def: AnalyzedPrimaryKeyDef,
pub primary_key_type: schema::ValueType,
Expand All @@ -113,7 +113,7 @@ pub struct AnalyzedExportOp {
}

pub struct AnalyzedExportTargetOpGroup {
pub target_factory: Arc<dyn ExportTargetFactory + Send + Sync>,
pub target_factory: Arc<dyn TargetFactory + Send + Sync>,
pub op_idx: Vec<usize>,
}

Expand Down
14 changes: 7 additions & 7 deletions src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::prelude::*;

use crate::setup::{CombinedState, ResourceSetupInfo, ResourceSetupStatus, SetupChangeType};
use crate::setup::{CombinedState, ResourceSetupChange, ResourceSetupInfo, SetupChangeType};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

Expand Down Expand Up @@ -76,7 +76,7 @@ pub struct TrackingTableSetupState {
}

#[derive(Debug)]
pub struct TrackingTableSetupStatus {
pub struct TrackingTableSetupChange {
pub desired_state: Option<TrackingTableSetupState>,

pub min_existing_version_id: Option<i32>,
Expand All @@ -88,7 +88,7 @@ pub struct TrackingTableSetupStatus {
pub source_ids_to_delete: Vec<i32>,
}

impl TrackingTableSetupStatus {
impl TrackingTableSetupChange {
pub fn new(
desired: Option<&TrackingTableSetupState>,
existing: &CombinedState<TrackingTableSetupState>,
Expand Down Expand Up @@ -127,18 +127,18 @@ impl TrackingTableSetupStatus {

pub fn into_setup_info(
self,
) -> ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupStatus> {
) -> ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupChange> {
ResourceSetupInfo {
key: (),
state: self.desired_state.clone(),
description: "Internal Storage for Tracking".to_string(),
setup_status: Some(self),
setup_change: Some(self),
legacy_key: None,
}
}
}

impl ResourceSetupStatus for TrackingTableSetupStatus {
impl ResourceSetupChange for TrackingTableSetupChange {
fn describe_changes(&self) -> Vec<setup::ChangeDescription> {
let mut changes: Vec<setup::ChangeDescription> = vec![];
if self.desired_state.is_some() && !self.legacy_tracking_table_names.is_empty() {
Expand Down Expand Up @@ -234,7 +234,7 @@ impl ResourceSetupStatus for TrackingTableSetupStatus {
}
}

impl TrackingTableSetupStatus {
impl TrackingTableSetupChange {
pub async fn apply_change(&self) -> Result<()> {
let lib_context = get_lib_context()?;
let pool = lib_context.require_builtin_db_pool()?;
Expand Down
26 changes: 13 additions & 13 deletions src/lib_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use crate::builder::AnalyzedFlow;
use crate::execution::source_indexer::SourceIndexingContext;
use crate::service::error::ApiError;
use crate::settings;
use crate::setup::ObjectSetupStatus;
use crate::setup::ObjectSetupChange;
use axum::http::StatusCode;
use sqlx::PgPool;
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
use tokio::runtime::Runtime;

pub struct FlowExecutionContext {
pub setup_execution_context: Arc<exec_ctx::FlowSetupExecutionContext>,
pub setup_status: setup::FlowSetupStatus,
pub setup_change: setup::FlowSetupChange,
source_indexing_contexts: Vec<tokio::sync::OnceCell<Arc<SourceIndexingContext>>>,
}

Expand All @@ -23,7 +23,7 @@ async fn build_setup_context(
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
) -> Result<(
Arc<exec_ctx::FlowSetupExecutionContext>,
setup::FlowSetupStatus,
setup::FlowSetupChange,
)> {
let setup_execution_context = Arc::new(exec_ctx::build_flow_setup_execution_context(
&analyzed_flow.flow_instance,
Expand All @@ -32,22 +32,22 @@ async fn build_setup_context(
existing_flow_ss,
)?);

let setup_status = setup::check_flow_setup_status(
let setup_change = setup::diff_flow_setup_states(
Some(&setup_execution_context.setup_state),
existing_flow_ss,
&analyzed_flow.flow_instance_ctx,
)
.await?;

Ok((setup_execution_context, setup_status))
Ok((setup_execution_context, setup_change))
}

impl FlowExecutionContext {
async fn new(
analyzed_flow: &AnalyzedFlow,
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
) -> Result<Self> {
let (setup_execution_context, setup_status) =
let (setup_execution_context, setup_change) =
build_setup_context(analyzed_flow, existing_flow_ss).await?;

let mut source_indexing_contexts = Vec::new();
Expand All @@ -57,7 +57,7 @@ impl FlowExecutionContext {

Ok(Self {
setup_execution_context,
setup_status,
setup_change,
source_indexing_contexts,
})
}
Expand All @@ -67,11 +67,11 @@ impl FlowExecutionContext {
analyzed_flow: &AnalyzedFlow,
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
) -> Result<()> {
let (setup_execution_context, setup_status) =
let (setup_execution_context, setup_change) =
build_setup_context(analyzed_flow, existing_flow_ss).await?;

self.setup_execution_context = setup_execution_context;
self.setup_status = setup_status;
self.setup_change = setup_change;
Ok(())
}

Expand Down Expand Up @@ -124,7 +124,7 @@ impl FlowContext {
&self,
) -> Result<tokio::sync::RwLockReadGuard<FlowExecutionContext>> {
let execution_ctx = self.execution_ctx.read().await;
if !execution_ctx.setup_status.is_up_to_date() {
if !execution_ctx.setup_change.is_up_to_date() {
api_bail!(
"Setup for flow `{}` is not up-to-date. Please run `cocoindex setup` to update the setup.",
self.flow_name()
Expand All @@ -137,7 +137,7 @@ impl FlowContext {
&self,
) -> Result<tokio::sync::OwnedRwLockReadGuard<FlowExecutionContext>> {
let execution_ctx = self.execution_ctx.clone().read_owned().await;
if !execution_ctx.setup_status.is_up_to_date() {
if !execution_ctx.setup_change.is_up_to_date() {
api_bail!(
"Setup for flow `{}` is not up-to-date. Please run `cocoindex setup` to update the setup.",
self.flow_name()
Expand Down Expand Up @@ -212,7 +212,7 @@ impl DbPools {

pub struct LibSetupContext {
pub all_setup_states: setup::AllSetupStates<setup::ExistingMode>,
pub global_setup_status: setup::GlobalSetupStatus,
pub global_setup_change: setup::GlobalSetupChange,
}
pub struct PersistenceContext {
pub builtin_db_pool: PgPool,
Expand Down Expand Up @@ -286,7 +286,7 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
Some(PersistenceContext {
builtin_db_pool: pool,
setup_ctx: tokio::sync::RwLock::new(LibSetupContext {
global_setup_status: setup::GlobalSetupStatus::from_setup_states(&all_setup_states),
global_setup_change: setup::GlobalSetupChange::from_setup_states(&all_setup_states),
all_setup_states,
}),
})
Expand Down
52 changes: 27 additions & 25 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::prelude::*;
use crate::setup::ResourceSetupStatus;
use crate::setup::ResourceSetupChange;
use std::fmt::Debug;
use std::hash::Hash;

Expand Down Expand Up @@ -350,31 +350,31 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
}
}

pub struct TypedExportDataCollectionBuildOutput<F: StorageFactoryBase + ?Sized> {
pub struct TypedExportDataCollectionBuildOutput<F: TargetFactoryBase + ?Sized> {
pub export_context: BoxFuture<'static, Result<Arc<F::ExportContext>>>,
pub setup_key: F::Key,
pub desired_setup_state: F::SetupState,
}
pub struct TypedExportDataCollectionSpec<F: StorageFactoryBase + ?Sized> {
pub struct TypedExportDataCollectionSpec<F: TargetFactoryBase + ?Sized> {
pub name: String,
pub spec: F::Spec,
pub key_fields_schema: Vec<FieldSchema>,
pub value_fields_schema: Vec<FieldSchema>,
pub index_options: IndexOptions,
}

pub struct TypedResourceSetupChangeItem<'a, F: StorageFactoryBase + ?Sized> {
pub struct TypedResourceSetupChangeItem<'a, F: TargetFactoryBase + ?Sized> {
pub key: F::Key,
pub setup_status: &'a F::SetupStatus,
pub setup_change: &'a F::SetupChange,
}

#[async_trait]
pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
pub trait TargetFactoryBase: TargetFactory + Send + Sync + 'static {
type Spec: DeserializeOwned + Send + Sync;
type DeclarationSpec: DeserializeOwned + Send + Sync;
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
type SetupStatus: ResourceSetupStatus;
type SetupChange: ResourceSetupChange;
type ExportContext: Send + Sync + 'static;

fn name(&self) -> &str;
Expand All @@ -397,13 +397,13 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {

/// Will not be called if it's setup by user.
/// It returns an error if the target only supports setup by user.
async fn check_setup_status(
async fn diff_setup_states(
&self,
key: Self::Key,
desired_state: Option<Self::SetupState>,
existing_states: setup::CombinedState<Self::SetupState>,
flow_instance_ctx: Arc<FlowInstanceContext>,
) -> Result<Self::SetupStatus>;
) -> Result<Self::SetupChange>;

fn check_state_compatibility(
&self,
Expand Down Expand Up @@ -439,13 +439,13 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {

async fn apply_setup_changes(
&self,
setup_status: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
setup_change: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
context: Arc<FlowInstanceContext>,
) -> Result<()>;
}

#[async_trait]
impl<T: StorageFactoryBase> ExportTargetFactory for T {
impl<T: TargetFactoryBase> TargetFactory for T {
async fn build(
self: Arc<Self>,
data_collections: Vec<interface::ExportDataCollectionSpec>,
Expand All @@ -455,7 +455,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
Vec<interface::ExportDataCollectionBuildOutput>,
Vec<(serde_json::Value, serde_json::Value)>,
)> {
let (data_coll_output, decl_output) = StorageFactoryBase::build(
let (data_coll_output, decl_output) = TargetFactoryBase::build(
self,
data_collections
.into_iter()
Expand Down Expand Up @@ -497,32 +497,32 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
Ok((data_coll_output, decl_output))
}

async fn check_setup_status(
async fn diff_setup_states(
&self,
key: &serde_json::Value,
desired_state: Option<serde_json::Value>,
existing_states: setup::CombinedState<serde_json::Value>,
flow_instance_ctx: Arc<FlowInstanceContext>,
) -> Result<Box<dyn setup::ResourceSetupStatus>> {
) -> Result<Box<dyn setup::ResourceSetupChange>> {
let key: T::Key = Self::deserialize_setup_key(key.clone())?;
let desired_state: Option<T::SetupState> = desired_state
.map(|v| serde_json::from_value(v.clone()))
.transpose()?;
let existing_states = from_json_combined_state(existing_states)?;
let setup_status = StorageFactoryBase::check_setup_status(
let setup_change = TargetFactoryBase::diff_setup_states(
self,
key,
desired_state,
existing_states,
flow_instance_ctx,
)
.await?;
Ok(Box::new(setup_status))
Ok(Box::new(setup_change))
}

fn describe_resource(&self, key: &serde_json::Value) -> Result<String> {
let key: T::Key = Self::deserialize_setup_key(key.clone())?;
StorageFactoryBase::describe_resource(self, &key)
TargetFactoryBase::describe_resource(self, &key)
}

fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value> {
Expand All @@ -535,21 +535,23 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
desired_state: &serde_json::Value,
existing_state: &serde_json::Value,
) -> Result<SetupStateCompatibility> {
let result = StorageFactoryBase::check_state_compatibility(
let result = TargetFactoryBase::check_state_compatibility(
self,
&serde_json::from_value(desired_state.clone())?,
&serde_json::from_value(existing_state.clone())?,
)?;
Ok(result)
}

/// Extract additional keys that are passed through as part of the mutation to `apply_mutation()`.
/// This is useful for targets that need to use additional parts as key for the target (which is not considered as part of the key for cocoindex).
fn extract_additional_key(
&self,
key: &value::KeyValue,
value: &value::FieldValues,
export_context: &(dyn Any + Send + Sync),
) -> Result<serde_json::Value> {
StorageFactoryBase::extract_additional_key(
TargetFactoryBase::extract_additional_key(
self,
key,
value,
Expand All @@ -575,23 +577,23 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
})
})
.collect::<Result<_>>()?;
StorageFactoryBase::apply_mutation(self, mutations).await
TargetFactoryBase::apply_mutation(self, mutations).await
}

async fn apply_setup_changes(
&self,
setup_status: Vec<ResourceSetupChangeItem<'async_trait>>,
setup_change: Vec<ResourceSetupChangeItem<'async_trait>>,
context: Arc<FlowInstanceContext>,
) -> Result<()> {
StorageFactoryBase::apply_setup_changes(
TargetFactoryBase::apply_setup_changes(
self,
setup_status
setup_change
.into_iter()
.map(|item| -> anyhow::Result<_> {
Ok(TypedResourceSetupChangeItem {
key: serde_json::from_value(item.key.clone())?,
setup_status: (item.setup_status as &dyn Any)
.downcast_ref::<T::SetupStatus>()
setup_change: (item.setup_change as &dyn Any)
.downcast_ref::<T::SetupChange>()
.ok_or_else(invariance_violation)?,
})
})
Expand Down
Loading
Loading