Skip to content

Commit 390332c

Browse files
authored
Implement ResourceSetupStatusCheck for Infallible. (#273)
1 parent 9027cef commit 390332c

File tree

8 files changed

+97
-73
lines changed

8 files changed

+97
-73
lines changed

src/execution/db_tracking_setup.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,19 +82,16 @@ impl TrackingTableSetupStatusCheck {
8282
}
8383

8484
#[async_trait]
85-
impl ResourceSetupStatusCheck for TrackingTableSetupStatusCheck {
86-
type Key = ();
87-
type State = TrackingTableSetupState;
88-
85+
impl ResourceSetupStatusCheck<(), TrackingTableSetupState> for TrackingTableSetupStatusCheck {
8986
fn describe_resource(&self) -> String {
9087
"Tracking Table".to_string()
9188
}
9289

93-
fn key(&self) -> &Self::Key {
90+
fn key(&self) -> &() {
9491
&()
9592
}
9693

97-
fn desired_state(&self) -> Option<&Self::State> {
94+
fn desired_state(&self) -> Option<&TrackingTableSetupState> {
9895
self.desired_state.as_ref()
9996
}
10097

src/ops/factory_bases.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
295295
key: Self::Key,
296296
desired_state: Option<Self::SetupState>,
297297
existing_states: setup::CombinedState<Self::SetupState>,
298-
) -> Result<
299-
impl setup::ResourceSetupStatusCheck<Key = Self::Key, State = Self::SetupState> + 'static,
300-
>;
298+
) -> Result<impl setup::ResourceSetupStatusCheck<Self::Key, Self::SetupState> + 'static>;
301299

302300
fn check_state_compatibility(
303301
&self,
@@ -317,17 +315,14 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
317315
}
318316

319317
struct ResourceSetupStatusCheckWrapper<T: StorageFactoryBase> {
320-
inner:
321-
Box<dyn setup::ResourceSetupStatusCheck<Key = T::Key, State = T::SetupState> + Send + Sync>,
318+
inner: Box<dyn setup::ResourceSetupStatusCheck<T::Key, T::SetupState> + Send + Sync>,
322319
key_json: serde_json::Value,
323320
state_json: Option<serde_json::Value>,
324321
}
325322

326323
impl<T: StorageFactoryBase> ResourceSetupStatusCheckWrapper<T> {
327324
fn new(
328-
inner: Box<
329-
dyn setup::ResourceSetupStatusCheck<Key = T::Key, State = T::SetupState> + Send + Sync,
330-
>,
325+
inner: Box<dyn setup::ResourceSetupStatusCheck<T::Key, T::SetupState> + Send + Sync>,
331326
) -> Result<Self> {
332327
Ok(Self {
333328
key_json: serde_json::to_value(inner.key())?,
@@ -347,19 +342,18 @@ impl<T: StorageFactoryBase> Debug for ResourceSetupStatusCheckWrapper<T> {
347342
}
348343

349344
#[async_trait]
350-
impl<T: StorageFactoryBase> setup::ResourceSetupStatusCheck for ResourceSetupStatusCheckWrapper<T> {
351-
type Key = serde_json::Value;
352-
type State = serde_json::Value;
353-
345+
impl<T: StorageFactoryBase> setup::ResourceSetupStatusCheck<serde_json::Value, serde_json::Value>
346+
for ResourceSetupStatusCheckWrapper<T>
347+
{
354348
fn describe_resource(&self) -> String {
355349
self.inner.describe_resource()
356350
}
357351

358-
fn key(&self) -> &Self::Key {
352+
fn key(&self) -> &serde_json::Value {
359353
&self.key_json
360354
}
361355

362-
fn desired_state(&self) -> Option<&Self::State> {
356+
fn desired_state(&self) -> Option<&serde_json::Value> {
363357
self.state_json.as_ref()
364358
}
365359

@@ -410,9 +404,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
410404
existing_states: setup::CombinedState<serde_json::Value>,
411405
) -> Result<
412406
Box<
413-
dyn setup::ResourceSetupStatusCheck<Key = serde_json::Value, State = serde_json::Value>
414-
+ Send
415-
+ Sync,
407+
dyn setup::ResourceSetupStatusCheck<serde_json::Value, serde_json::Value> + Send + Sync,
416408
>,
417409
> {
418410
let key: T::Key = serde_json::from_value(key.clone())?;

src/ops/interface.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,7 @@ pub trait ExportTargetFactory {
182182
existing_states: setup::CombinedState<serde_json::Value>,
183183
) -> Result<
184184
Box<
185-
dyn setup::ResourceSetupStatusCheck<Key = serde_json::Value, State = serde_json::Value>
186-
+ Send
187-
+ Sync,
185+
dyn setup::ResourceSetupStatusCheck<serde_json::Value, serde_json::Value> + Send + Sync,
188186
>,
189187
>;
190188

src/ops/storages/postgres.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -736,19 +736,16 @@ fn describe_index_spec(index_name: &str, index_spec: &VectorIndexDef) -> String
736736
}
737737

738738
#[async_trait]
739-
impl setup::ResourceSetupStatusCheck for SetupStatusCheck {
740-
type Key = TableId;
741-
type State = SetupState;
742-
739+
impl setup::ResourceSetupStatusCheck<TableId, SetupState> for SetupStatusCheck {
743740
fn describe_resource(&self) -> String {
744741
format!("Postgres table {}", self.table_id)
745742
}
746743

747-
fn key(&self) -> &Self::Key {
744+
fn key(&self) -> &TableId {
748745
&self.table_id
749746
}
750747

751-
fn desired_state(&self) -> Option<&Self::State> {
748+
fn desired_state(&self) -> Option<&SetupState> {
752749
self.desired_state.as_ref()
753750
}
754751

@@ -960,8 +957,7 @@ impl StorageFactoryBase for Arc<Factory> {
960957
key: TableId,
961958
desired: Option<SetupState>,
962959
existing: setup::CombinedState<SetupState>,
963-
) -> Result<impl setup::ResourceSetupStatusCheck<Key = TableId, State = SetupState> + 'static>
964-
{
960+
) -> Result<impl setup::ResourceSetupStatusCheck<TableId, SetupState> + 'static> {
965961
Ok(SetupStatusCheck::new(self.clone(), key, desired, existing))
966962
}
967963

src/prelude.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub(crate) use itertools::Itertools;
1010
pub(crate) use serde::{de::DeserializeOwned, Deserialize, Serialize};
1111
pub(crate) use std::borrow::Cow;
1212
pub(crate) use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
13+
pub(crate) use std::hash::Hash;
1314
pub(crate) use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock, Weak};
1415

1516
pub(crate) use crate::base::{schema, spec, value};

src/setup/db_metadata.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -295,15 +295,12 @@ pub struct MetadataTableSetup {
295295
}
296296

297297
#[async_trait]
298-
impl ResourceSetupStatusCheck for MetadataTableSetup {
299-
type Key = ();
300-
type State = ();
301-
302-
fn key(&self) -> &Self::Key {
298+
impl ResourceSetupStatusCheck<(), ()> for MetadataTableSetup {
299+
fn key(&self) -> &() {
303300
&()
304301
}
305302

306-
fn desired_state(&self) -> Option<&Self::State> {
303+
fn desired_state(&self) -> Option<&()> {
307304
Some(&())
308305
}
309306

src/setup/driver.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -259,13 +259,9 @@ pub fn check_flow_setup_status(
259259
)
260260
})?;
261261
target_setup_state_updates.push((resource_id.clone(), v.desired.clone()));
262-
let (desired_state, desired_common) = match v.desired {
263-
Some(desired) => (
264-
(!desired.common.setup_by_user).then_some(desired.state),
265-
Some(desired.common),
266-
),
267-
None => (None, None),
268-
};
262+
let desired_state = v
263+
.desired
264+
.and_then(|state| (!state.common.setup_by_user).then_some(state.state));
269265
let existing_without_setup_by_user = CombinedState {
270266
current: v
271267
.existing
@@ -295,11 +291,7 @@ pub fn check_flow_setup_status(
295291
)?,
296292
_ => bail!("Unexpected factory type for {}", resource_id.target_kind),
297293
};
298-
target_resources.push(TargetResourceSetupStatusCheck {
299-
target_kind: resource_id.target_kind.clone(),
300-
common: desired_common,
301-
status_check,
302-
});
294+
target_resources.push(TargetResourceSetupStatusCheck { status_check });
303295
}
304296
}
305297
Ok(FlowSetupStatusCheck {
@@ -356,9 +348,12 @@ pub fn drop_setup(
356348
})
357349
}
358350

359-
async fn maybe_update_resource_setup(
351+
async fn maybe_update_resource_setup<
352+
K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash,
353+
S: Debug + Clone + Serialize + DeserializeOwned,
354+
>(
360355
write: &mut impl std::io::Write,
361-
resource: &(impl ResourceSetupStatusCheck + ?Sized),
356+
resource: &(impl ResourceSetupStatusCheck<K, S> + ?Sized),
362357
) -> Result<()> {
363358
if resource.change_type() != SetupChangeType::NoChange {
364359
writeln!(write, "{}:", resource.describe_resource(),)?;

src/setup/states.rs

Lines changed: 67 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use indenter::indented;
1717
use std::fmt::Debug;
1818
use std::fmt::{Display, Write};
1919
use std::hash::Hash;
20+
use std::marker::PhantomData;
2021

2122
use super::db_metadata;
2223
use crate::execution::db_tracking_setup;
@@ -214,15 +215,16 @@ pub enum SetupChangeType {
214215
}
215216

216217
#[async_trait]
217-
pub trait ResourceSetupStatusCheck: Debug + Send + Sync {
218-
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash;
219-
type State: Debug + Clone + Serialize + DeserializeOwned;
220-
218+
pub trait ResourceSetupStatusCheck<K, S>: Debug + Send + Sync
219+
where
220+
K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash,
221+
S: Debug + Clone + Serialize + DeserializeOwned,
222+
{
221223
fn describe_resource(&self) -> String;
222224

223-
fn key(&self) -> &Self::Key;
225+
fn key(&self) -> &K;
224226

225-
fn desired_state(&self) -> Option<&Self::State>;
227+
fn desired_state(&self) -> Option<&S>;
226228

227229
fn describe_changes(&self) -> Vec<String>;
228230

@@ -233,6 +235,37 @@ pub trait ResourceSetupStatusCheck: Debug + Send + Sync {
233235
}
234236
}
235237

238+
#[async_trait]
239+
impl<K, S> ResourceSetupStatusCheck<K, S> for std::convert::Infallible
240+
where
241+
K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash,
242+
S: Debug + Clone + Serialize + DeserializeOwned,
243+
{
244+
fn describe_resource(&self) -> String {
245+
unreachable!()
246+
}
247+
248+
fn key(&self) -> &K {
249+
unreachable!()
250+
}
251+
252+
fn desired_state(&self) -> Option<&S> {
253+
unreachable!()
254+
}
255+
256+
fn describe_changes(&self) -> Vec<String> {
257+
unreachable!()
258+
}
259+
260+
fn change_type(&self) -> SetupChangeType {
261+
unreachable!()
262+
}
263+
264+
async fn apply_change(&self) -> Result<()> {
265+
unreachable!()
266+
}
267+
}
268+
236269
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
237270
pub enum ObjectStatus {
238271
Invalid,
@@ -248,18 +281,17 @@ pub trait ObjectSetupStatusCheck {
248281

249282
#[derive(Debug)]
250283
pub struct TargetResourceSetupStatusCheck {
251-
pub target_kind: String,
252-
pub common: Option<TargetSetupStateCommon>,
253-
pub status_check: Box<
254-
dyn ResourceSetupStatusCheck<Key = serde_json::Value, State = serde_json::Value>
255-
+ Send
256-
+ Sync,
257-
>,
284+
pub status_check:
285+
Box<dyn ResourceSetupStatusCheck<serde_json::Value, serde_json::Value> + Send + Sync>,
258286
}
259287

260288
impl std::fmt::Display for TargetResourceSetupStatusCheck {
261289
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262-
write!(f, "{}", FormattedResourceSetup(self.status_check.as_ref()))
290+
write!(
291+
f,
292+
"{}",
293+
FormattedResourceSetup(self.status_check.as_ref(), PhantomData::default())
294+
)
263295
}
264296
}
265297

@@ -326,10 +358,18 @@ impl<StatusCheck: ObjectSetupStatusCheck> std::fmt::Display
326358
}
327359
}
328360

329-
pub struct FormattedResourceSetup<'a, Check: ResourceSetupStatusCheck + ?Sized>(&'a Check);
361+
pub struct FormattedResourceSetup<
362+
'a,
363+
K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash,
364+
S: Debug + Clone + Serialize + DeserializeOwned,
365+
Check: ResourceSetupStatusCheck<K, S> + ?Sized,
366+
>(&'a Check, PhantomData<(K, S)>);
330367

331-
impl<Change: ResourceSetupStatusCheck + ?Sized> std::fmt::Display
332-
for FormattedResourceSetup<'_, Change>
368+
impl<K, S, Check> std::fmt::Display for FormattedResourceSetup<'_, K, S, Check>
369+
where
370+
K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash,
371+
S: Debug + Clone + Serialize + DeserializeOwned,
372+
Check: ResourceSetupStatusCheck<K, S> + ?Sized,
333373
{
334374
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
335375
let status_code = match self.0.change_type() {
@@ -377,7 +417,11 @@ impl std::fmt::Display for FormattedFlowSetupStatusCheck<'_> {
377417
)?;
378418

379419
let mut f = indented(f).with_str(INDENT);
380-
write!(f, "{}", FormattedResourceSetup(&flow_ssc.tracking_table))?;
420+
write!(
421+
f,
422+
"{}",
423+
FormattedResourceSetup(&flow_ssc.tracking_table, PhantomData::default())
424+
)?;
381425

382426
for target_resource in &flow_ssc.target_resources {
383427
writeln!(f, "{}", target_resource)?;
@@ -389,7 +433,11 @@ impl std::fmt::Display for FormattedFlowSetupStatusCheck<'_> {
389433

390434
impl std::fmt::Display for AllSetupStatusCheck {
391435
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
392-
write!(f, "{}", FormattedResourceSetup(&self.metadata_table))?;
436+
write!(
437+
f,
438+
"{}",
439+
FormattedResourceSetup(&self.metadata_table, PhantomData::default())
440+
)?;
393441
for (flow_name, flow_status) in &self.flows {
394442
write!(
395443
f,

0 commit comments

Comments
 (0)