Skip to content

Commit 764abe8

Browse files
authored
refactor(storage-interface): plumbing state key to apply_setup_change (#555)
1 parent f2fbd12 commit 764abe8

File tree

6 files changed

+119
-87
lines changed

6 files changed

+119
-87
lines changed

src/ops/factory_bases.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,11 @@ pub struct TypedExportDataCollectionSpec<F: StorageFactoryBase + ?Sized> {
283283
pub index_options: IndexOptions,
284284
}
285285

286+
pub struct TypedResourceSetupChangeItem<'a, F: StorageFactoryBase + ?Sized> {
287+
pub key: F::Key,
288+
pub setup_status: &'a F::SetupStatus,
289+
}
290+
286291
#[async_trait]
287292
pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
288293
type Spec: DeserializeOwned + Send + Sync;
@@ -339,7 +344,8 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
339344

340345
async fn apply_setup_changes(
341346
&self,
342-
setup_status: Vec<&'async_trait Self::SetupStatus>,
347+
setup_status: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
348+
auth_registry: &Arc<AuthRegistry>,
343349
) -> Result<()>;
344350
}
345351

@@ -466,18 +472,25 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
466472

467473
async fn apply_setup_changes(
468474
&self,
469-
setup_status: Vec<&'async_trait dyn ResourceSetupStatus>,
475+
setup_status: Vec<ResourceSetupChangeItem<'async_trait>>,
476+
auth_registry: &Arc<AuthRegistry>,
470477
) -> Result<()> {
471478
StorageFactoryBase::apply_setup_changes(
472479
self,
473480
setup_status
474481
.into_iter()
475-
.map(|s| -> anyhow::Result<_> {
476-
Ok(s.as_any()
477-
.downcast_ref::<T::SetupStatus>()
478-
.ok_or_else(|| anyhow!("Unexpected setup status type"))?)
482+
.map(|item| -> anyhow::Result<_> {
483+
Ok(TypedResourceSetupChangeItem {
484+
key: serde_json::from_value(item.key.clone())?,
485+
setup_status: item
486+
.setup_status
487+
.as_any()
488+
.downcast_ref::<T::SetupStatus>()
489+
.ok_or_else(|| anyhow!("Unexpected setup status type"))?,
490+
})
479491
})
480492
.collect::<Result<Vec<_>>>()?,
493+
auth_registry,
481494
)
482495
.await
483496
}

src/ops/interface.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,11 @@ pub struct ExportTargetMutationWithContext<'ctx, T: ?Sized + Send + Sync> {
216216
pub export_context: &'ctx T,
217217
}
218218

219+
pub struct ResourceSetupChangeItem<'a> {
220+
pub key: &'a serde_json::Value,
221+
pub setup_status: &'a dyn setup::ResourceSetupStatus,
222+
}
223+
219224
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
220225
pub enum SetupStateCompatibility {
221226
/// The resource is fully compatible with the desired state.
@@ -288,7 +293,8 @@ pub trait ExportTargetFactory: Send + Sync {
288293

289294
async fn apply_setup_changes(
290295
&self,
291-
setup_status: Vec<&'async_trait dyn setup::ResourceSetupStatus>,
296+
setup_status: Vec<ResourceSetupChangeItem<'async_trait>>,
297+
auth_registry: &Arc<AuthRegistry>,
292298
) -> Result<()>;
293299
}
294300

src/ops/storages/neo4j.rs

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ pub struct GraphPool {
6767
}
6868

6969
impl GraphPool {
70-
pub async fn get_graph(&self, spec: &ConnectionSpec) -> Result<Arc<Graph>> {
70+
async fn get_graph(&self, spec: &ConnectionSpec) -> Result<Arc<Graph>> {
7171
let graph_key = GraphKey::from_spec(spec);
7272
let cell = {
7373
let mut graphs = self.graphs.lock().unwrap();
@@ -87,6 +87,15 @@ impl GraphPool {
8787
.await?;
8888
Ok(graph.clone())
8989
}
90+
91+
async fn get_graph_for_key(
92+
&self,
93+
key: &Neo4jGraphElement,
94+
auth_registry: &AuthRegistry,
95+
) -> Result<Arc<Graph>> {
96+
let spec = auth_registry.get::<ConnectionSpec>(&key.connection)?;
97+
self.get_graph(&spec).await
98+
}
9099
}
91100

92101
pub struct ExportContext {
@@ -798,19 +807,12 @@ fn build_composite_field_names(qualifier: &str, field_names: &[String]) -> Strin
798807
}
799808
#[derive(Debug)]
800809
pub struct GraphElementDataSetupStatus {
801-
key: Neo4jGraphElement,
802-
conn_spec: ConnectionSpec,
803810
data_clear: Option<DataClearAction>,
804811
change_type: SetupChangeType,
805812
}
806813

807814
impl GraphElementDataSetupStatus {
808-
fn new(
809-
key: Neo4jGraphElement,
810-
conn_spec: ConnectionSpec,
811-
desired_state: Option<&SetupState>,
812-
existing: &CombinedState<SetupState>,
813-
) -> Self {
815+
fn new(desired_state: Option<&SetupState>, existing: &CombinedState<SetupState>) -> Self {
814816
let mut data_clear: Option<DataClearAction> = None;
815817
for v in existing.possible_versions() {
816818
if desired_state.as_ref().is_none_or(|desired| {
@@ -837,8 +839,6 @@ impl GraphElementDataSetupStatus {
837839
};
838840

839841
Self {
840-
key,
841-
conn_spec,
842842
data_clear,
843843
change_type,
844844
}
@@ -1024,8 +1024,7 @@ impl StorageFactoryBase for Factory {
10241024
auth_registry: &Arc<AuthRegistry>,
10251025
) -> Result<Self::SetupStatus> {
10261026
let conn_spec = auth_registry.get::<ConnectionSpec>(&key.connection)?;
1027-
let data_status =
1028-
GraphElementDataSetupStatus::new(key, conn_spec.clone(), desired.as_ref(), &existing);
1027+
let data_status = GraphElementDataSetupStatus::new(desired.as_ref(), &existing);
10291028
let components = components::SetupStatus::create(
10301029
SetupComponentOperator {
10311030
graph_pool: self.graph_pool.clone(),
@@ -1094,51 +1093,58 @@ impl StorageFactoryBase for Factory {
10941093

10951094
async fn apply_setup_changes(
10961095
&self,
1097-
changes: Vec<&'async_trait Self::SetupStatus>,
1096+
changes: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
1097+
auth_registry: &Arc<AuthRegistry>,
10981098
) -> Result<()> {
1099-
let (data_statuses, components): (Vec<_>, Vec<_>) =
1100-
changes.into_iter().map(|c| (&c.0, &c.1)).unzip();
1101-
11021099
// Relationships first, then nodes, as relationships need to be deleted before nodes they referenced.
1103-
let mut relationship_types = IndexMap::<&Neo4jGraphElement, &ConnectionSpec>::new();
1104-
let mut node_labels = IndexMap::<&Neo4jGraphElement, &ConnectionSpec>::new();
1105-
let mut dependent_node_labels = IndexMap::<Neo4jGraphElement, &ConnectionSpec>::new();
1106-
for data_status in data_statuses.iter() {
1107-
if let Some(data_clear) = &data_status.data_clear {
1108-
match &data_status.key.typ {
1100+
let mut relationship_types = IndexSet::<&Neo4jGraphElement>::new();
1101+
let mut node_labels = IndexSet::<&Neo4jGraphElement>::new();
1102+
let mut dependent_node_labels = IndexSet::<Neo4jGraphElement>::new();
1103+
1104+
let mut components = vec![];
1105+
for change in changes.iter() {
1106+
if let Some(data_clear) = &change.setup_status.0.data_clear {
1107+
match &change.key.typ {
11091108
ElementType::Relationship(_) => {
1110-
relationship_types.insert(&data_status.key, &data_status.conn_spec);
1109+
relationship_types.insert(&change.key);
11111110
for label in &data_clear.dependent_node_labels {
1112-
dependent_node_labels.insert(
1113-
Neo4jGraphElement {
1114-
connection: data_status.key.connection.clone(),
1115-
typ: ElementType::Node(label.clone()),
1116-
},
1117-
&data_status.conn_spec,
1118-
);
1111+
dependent_node_labels.insert(Neo4jGraphElement {
1112+
connection: change.key.connection.clone(),
1113+
typ: ElementType::Node(label.clone()),
1114+
});
11191115
}
11201116
}
11211117
ElementType::Node(_) => {
1122-
node_labels.insert(&data_status.key, &data_status.conn_spec);
1118+
node_labels.insert(&change.key);
11231119
}
11241120
}
11251121
}
1122+
components.push(&change.setup_status.1);
11261123
}
11271124

11281125
// Relationships have no dependency, so can be cleared first.
1129-
for (rel_type, conn_spec) in relationship_types.iter() {
1130-
let graph = self.graph_pool.get_graph(conn_spec).await?;
1126+
for rel_type in relationship_types.into_iter() {
1127+
let graph = self
1128+
.graph_pool
1129+
.get_graph_for_key(rel_type, auth_registry)
1130+
.await?;
11311131
clear_graph_element_data(&graph, rel_type, true).await?;
11321132
}
11331133
// Clear standalone nodes, which is simpler than dependent nodes.
1134-
for (node_label, conn_spec) in node_labels.iter() {
1135-
let graph = self.graph_pool.get_graph(conn_spec).await?;
1134+
for node_label in node_labels.iter() {
1135+
let graph = self
1136+
.graph_pool
1137+
.get_graph_for_key(node_label, auth_registry)
1138+
.await?;
11361139
clear_graph_element_data(&graph, node_label, true).await?;
11371140
}
11381141
// Clear dependent nodes if they're not covered by standalone nodes.
1139-
for (node_label, conn_spec) in dependent_node_labels.iter() {
1140-
if !node_labels.contains_key(node_label) {
1141-
let graph = self.graph_pool.get_graph(conn_spec).await?;
1142+
for node_label in dependent_node_labels.iter() {
1143+
if !node_labels.contains(node_label) {
1144+
let graph = self
1145+
.graph_pool
1146+
.get_graph_for_key(node_label, auth_registry)
1147+
.await?;
11421148
clear_graph_element_data(&graph, node_label, false).await?;
11431149
}
11441150
}

src/ops/storages/postgres.rs

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -601,20 +601,12 @@ pub struct TableSetupAction {
601601

602602
#[derive(Debug)]
603603
pub struct SetupStatus {
604-
db_pool: PgPool,
605-
table_name: String,
606-
607604
create_pgvector_extension: bool,
608605
actions: TableSetupAction,
609606
}
610607

611608
impl SetupStatus {
612-
fn new(
613-
db_pool: PgPool,
614-
table_name: String,
615-
desired_state: Option<SetupState>,
616-
existing: setup::CombinedState<SetupState>,
617-
) -> Self {
609+
fn new(desired_state: Option<SetupState>, existing: setup::CombinedState<SetupState>) -> Self {
618610
let table_action = TableMainSetupAction::from_states(desired_state.as_ref(), &existing);
619611
let (indexes_to_delete, indexes_to_create) = desired_state
620612
.as_ref()
@@ -647,8 +639,6 @@ impl SetupStatus {
647639
&& !existing.current.map(|s| s.uses_pgvector()).unwrap_or(false);
648640

649641
Self {
650-
db_pool,
651-
table_name,
652642
create_pgvector_extension,
653643
actions: TableSetupAction {
654644
table_action,
@@ -726,21 +716,20 @@ impl setup::ResourceSetupStatus for SetupStatus {
726716
}
727717

728718
impl SetupStatus {
729-
async fn apply_change(&self) -> Result<()> {
730-
let table_name = &self.table_name;
719+
async fn apply_change(&self, db_pool: &PgPool, table_name: &str) -> Result<()> {
731720
if self.actions.table_action.drop_existing {
732721
sqlx::query(&format!("DROP TABLE IF EXISTS {table_name}"))
733-
.execute(&self.db_pool)
722+
.execute(db_pool)
734723
.await?;
735724
}
736725
if self.create_pgvector_extension {
737726
sqlx::query("CREATE EXTENSION IF NOT EXISTS vector;")
738-
.execute(&self.db_pool)
727+
.execute(db_pool)
739728
.await?;
740729
}
741730
for index_name in self.actions.indexes_to_delete.iter() {
742731
let sql = format!("DROP INDEX IF EXISTS {}", index_name);
743-
sqlx::query(&sql).execute(&self.db_pool).await?;
732+
sqlx::query(&sql).execute(db_pool).await?;
744733
}
745734
if let Some(table_upsertion) = &self.actions.table_action.table_upsertion {
746735
match table_upsertion {
@@ -752,7 +741,7 @@ impl SetupStatus {
752741
fields.join(", "),
753742
keys.keys().join(", ")
754743
);
755-
sqlx::query(&sql).execute(&self.db_pool).await?;
744+
sqlx::query(&sql).execute(db_pool).await?;
756745
}
757746
TableUpsertionAction::Update {
758747
columns_to_delete,
@@ -762,13 +751,13 @@ impl SetupStatus {
762751
let sql = format!(
763752
"ALTER TABLE {table_name} DROP COLUMN IF EXISTS {column_name}",
764753
);
765-
sqlx::query(&sql).execute(&self.db_pool).await?;
754+
sqlx::query(&sql).execute(db_pool).await?;
766755
}
767756
for (column_name, column_type) in columns_to_upsert.iter() {
768757
let sql = format!(
769758
"ALTER TABLE {table_name} DROP COLUMN IF EXISTS {column_name}, ADD COLUMN {column_name} {column_type}"
770759
);
771-
sqlx::query(&sql).execute(&self.db_pool).await?;
760+
sqlx::query(&sql).execute(db_pool).await?;
772761
}
773762
}
774763
}
@@ -778,7 +767,7 @@ impl SetupStatus {
778767
"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name} {}",
779768
to_index_spec_sql(index_spec)
780769
);
781-
sqlx::query(&sql).execute(&self.db_pool).await?;
770+
sqlx::query(&sql).execute(db_pool).await?;
782771
}
783772
Ok(())
784773
}
@@ -873,17 +862,12 @@ impl StorageFactoryBase for Factory {
873862

874863
async fn check_setup_status(
875864
&self,
876-
key: TableId,
865+
_key: TableId,
877866
desired: Option<SetupState>,
878867
existing: setup::CombinedState<SetupState>,
879-
auth_registry: &Arc<AuthRegistry>,
868+
_auth_registry: &Arc<AuthRegistry>,
880869
) -> Result<SetupStatus> {
881-
Ok(SetupStatus::new(
882-
get_db_pool(key.database.as_ref(), auth_registry).await?,
883-
key.table_name,
884-
desired,
885-
existing,
886-
))
870+
Ok(SetupStatus::new(desired, existing))
887871
}
888872

889873
fn check_state_compatibility(
@@ -938,10 +922,15 @@ impl StorageFactoryBase for Factory {
938922

939923
async fn apply_setup_changes(
940924
&self,
941-
setup_status: Vec<&'async_trait Self::SetupStatus>,
925+
changes: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
926+
auth_registry: &Arc<AuthRegistry>,
942927
) -> Result<()> {
943-
for setup_status in setup_status.iter() {
944-
setup_status.apply_change().await?;
928+
for change in changes.iter() {
929+
let db_pool = get_db_pool(change.key.database.as_ref(), &auth_registry).await?;
930+
change
931+
.setup_status
932+
.apply_change(&db_pool, &change.key.table_name)
933+
.await?;
945934
}
946935
Ok(())
947936
}

src/ops/storages/qdrant.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,8 @@ impl StorageFactoryBase for Arc<Factory> {
429429

430430
async fn apply_setup_changes(
431431
&self,
432-
_setup_status: Vec<&'async_trait Self::SetupStatus>,
432+
_setup_status: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
433+
_auth_registry: &Arc<AuthRegistry>,
433434
) -> Result<()> {
434435
Err(anyhow!("Qdrant does not support setup changes"))
435436
}

0 commit comments

Comments
 (0)