diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index 893b57d9dfb77..3e59dabb314f5 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -2744,26 +2744,17 @@ impl Coordinator {
         migrated_storage_collections: &BTreeSet,
     ) {
         let catalog = self.catalog();
-        let source_status_collection_id = catalog
-            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
-        let source_status_collection_id = catalog
-            .get_entry(&source_status_collection_id)
-            .latest_global_id();

         let source_desc = |object_id: GlobalId,
                            data_source: &DataSourceDesc,
                            desc: &RelationDesc,
                            timeline: &Timeline| {
-            let (data_source, status_collection_id) = match data_source.clone() {
+            let data_source = match data_source.clone() {
                 // Re-announce the source description.
                 DataSourceDesc::Ingestion { desc, cluster_id } => {
                     let desc = desc.into_inline_connection(catalog.state());
                     let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
-
-                    (
-                        DataSource::Ingestion(ingestion),
-                        Some(source_status_collection_id),
-                    )
+                    DataSource::Ingestion(ingestion)
                 }
                 DataSourceDesc::OldSyntaxIngestion {
                     desc,
@@ -2787,10 +2778,7 @@ impl Coordinator {
                     };
                     ingestion.source_exports.insert(object_id, legacy_export);
-                    (
-                        DataSource::Ingestion(ingestion),
-                        Some(source_status_collection_id),
-                    )
+                    DataSource::Ingestion(ingestion)
                 }
                 DataSourceDesc::IngestionExport {
                     ingestion_id,
@@ -2801,28 +2789,23 @@ impl Coordinator {
                     // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                     // this will always be a Source or a Table.
                     let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
-                    (
-                        DataSource::IngestionExport {
-                            ingestion_id,
-                            details,
-                            data_config: data_config.into_inline_connection(catalog.state()),
-                        },
-                        Some(source_status_collection_id),
-                    )
-                }
-                DataSourceDesc::Webhook { .. } => {
-                    (DataSource::Webhook, Some(source_status_collection_id))
+
+                    DataSource::IngestionExport {
+                        ingestion_id,
+                        details,
+                        data_config: data_config.into_inline_connection(catalog.state()),
+                    }
                 }
-                DataSourceDesc::Progress => (DataSource::Progress, None),
+                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
+                DataSourceDesc::Progress => DataSource::Progress,
                 DataSourceDesc::Introspection(introspection) => {
-                    (DataSource::Introspection(introspection), None)
+                    DataSource::Introspection(introspection)
                 }
             };
             CollectionDescription {
                 desc: desc.clone(),
                 data_source,
                 since: None,
-                status_collection_id,
                 timeline: Some(timeline.clone()),
                 primary: None,
             }
@@ -2938,7 +2921,6 @@ impl Coordinator {
                 },
             },
             since: None,
-            status_collection_id: None,
             timeline: None,
             primary: None,
         };
diff --git a/src/adapter/src/coord/catalog_implications.rs b/src/adapter/src/coord/catalog_implications.rs
index 6983579995476..7f9d869e07e36 100644
--- a/src/adapter/src/coord/catalog_implications.rs
+++ b/src/adapter/src/coord/catalog_implications.rs
@@ -33,7 +33,6 @@ use std::time::{Duration, Instant};
 use fail::fail_point;
 use itertools::Itertools;
 use mz_adapter_types::compaction::CompactionWindow;
-use mz_catalog::builtin;
 use mz_catalog::memory::objects::{
     CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
     MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
@@ -1021,19 +1020,8 @@ impl Coordinator {
                 details,
                 data_config,
             } => {
-                // TODO: It's a little weird that a table will be present in this
-                // source status collection, we might want to split out into a separate
-                // status collection.
-                let status_collection_id = self
-                    .catalog()
-                    .resolve_builtin_storage_collection(&builtin::MZ_SOURCE_STATUS_HISTORY);
-
                 let global_ingestion_id = self.catalog().get_entry(ingestion_id).latest_global_id();
-                let global_status_collection_id = self
-                    .catalog()
-                    .get_entry(&status_collection_id)
-                    .latest_global_id();

                 let collection_desc = CollectionDescription:: {
                     desc: table.desc.latest(),
@@ -1045,7 +1033,6 @@ impl Coordinator {
                             .into_inline_connection(self.catalog.state()),
                     },
                     since: None,
-                    status_collection_id: Some(global_status_collection_id),
                     timeline: Some(timeline.clone()),
                     primary: None,
                 };
@@ -1091,7 +1078,6 @@ impl Coordinator {
                     desc,
                     data_source: DataSource::Webhook,
                     since: None,
-                    status_collection_id: None, // Webhook tables don't use status collections
                     timeline: Some(timeline.clone()),
                     primary: None,
                 };
@@ -1233,16 +1219,7 @@ impl Coordinator {
         source: Source,
         compaction_windows: BTreeMap>,
     ) -> Result<(), AdapterError> {
-        let source_status_item_id = self
-            .catalog()
-            .resolve_builtin_storage_collection(&builtin::MZ_SOURCE_STATUS_HISTORY);
-        let source_status_collection_id = Some(
-            self.catalog()
-                .get_entry(&source_status_item_id)
-                .latest_global_id(),
-        );
-
-        let (data_source, status_collection_id) = match source.data_source {
+        let data_source = match source.data_source {
             DataSourceDesc::Ingestion { desc, cluster_id } => {
                 let desc = desc.into_inline_connection(self.catalog().state());
                 let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
@@ -1253,10 +1230,7 @@ impl Coordinator {
                     item_global_id,
                 );

-                (
-                    DataSource::Ingestion(ingestion),
-                    source_status_collection_id,
-                )
+                DataSource::Ingestion(ingestion)
             }
             DataSourceDesc::OldSyntaxIngestion {
                 desc,
@@ -1291,10 +1265,7 @@ impl Coordinator {
                     .source_exports
                     .insert(source.global_id, legacy_export);

-                (
-                    DataSource::Ingestion(ingestion),
-                    source_status_collection_id,
-                )
+                DataSource::Ingestion(ingestion)
             }
             DataSourceDesc::IngestionExport {
                 ingestion_id,
@@ -1305,17 +1276,15 @@ impl Coordinator {
                 // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                 // this will always be a Source or a Table.
                 let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
-                (
-                    DataSource::IngestionExport {
-                        ingestion_id,
-                        details,
-                        data_config: data_config.into_inline_connection(self.catalog().state()),
-                    },
-                    source_status_collection_id,
-                )
+
+                DataSource::IngestionExport {
+                    ingestion_id,
+                    details,
+                    data_config: data_config.into_inline_connection(self.catalog().state()),
+                }
             }
-            DataSourceDesc::Progress => (DataSource::Progress, None),
-            DataSourceDesc::Webhook { .. } => (DataSource::Webhook, None),
+            DataSourceDesc::Progress => DataSource::Progress,
+            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
             DataSourceDesc::Introspection(_) => {
                 unreachable!("cannot create sources with introspection data sources")
             }
@@ -1328,7 +1297,6 @@ impl Coordinator {
                 data_source,
                 timeline: Some(source.timeline),
                 since: None,
-                status_collection_id,
                 primary: None,
             },
         );
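A minimal, self-contained sketch of the shape of the adapter-side refactor above, under simplified assumptions: the two enums here are toy stand-ins for the real `DataSourceDesc`/`DataSource` types, not the actual `mz_storage_types` definitions. The point is that each match arm now yields a `DataSource` directly instead of a `(DataSource, Option<GlobalId>)` tuple, since the status-collection ID no longer travels with the description.

```rust
// Toy stand-ins; the real enums carry many more variants and payloads.
#[derive(Debug)]
enum DataSourceDesc {
    Webhook,
    Progress,
}

#[derive(Debug)]
enum DataSource {
    Webhook,
    Progress,
}

fn to_data_source(desc: &DataSourceDesc) -> DataSource {
    // Previously each arm also had to thread through
    // `Some(source_status_collection_id)` or `None`; the mapping is now direct.
    match desc {
        DataSourceDesc::Webhook => DataSource::Webhook,
        DataSourceDesc::Progress => DataSource::Progress,
    }
}

fn main() {
    println!("{:?}", to_data_source(&DataSourceDesc::Webhook));
}
```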
diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs
index 924fab9174898..f1e1dde96622d 100644
--- a/src/adapter/src/coord/ddl.rs
+++ b/src/adapter/src/coord/ddl.rs
@@ -1000,7 +1000,6 @@ impl Coordinator {
                 },
             },
             since: None,
-            status_collection_id: None,
             timeline: None,
             primary: None,
         };
diff --git a/src/compute-client/src/as_of_selection.rs b/src/compute-client/src/as_of_selection.rs
index f3134e61731bc..d4b94cf33fcbb 100644
--- a/src/compute-client/src/as_of_selection.rs
+++ b/src/compute-client/src/as_of_selection.rs
@@ -867,12 +867,10 @@ mod tests {
     use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
     use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
     use mz_storage_types::StorageDiff;
-    use mz_storage_types::connections::inline::InlinedConnection;
     use mz_storage_types::controller::{CollectionMetadata, StorageError};
     use mz_storage_types::errors::CollectionMissing;
     use mz_storage_types::parameters::StorageParameters;
-    use mz_storage_types::sources::{GenericSourceConnection, SourceDesc};
-    use mz_storage_types::sources::{SourceData, SourceExportDataConfig};
+    use mz_storage_types::sources::SourceData;
     use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
     use timely::progress::Timestamp as TimelyTimestamp;

@@ -1041,28 +1039,6 @@ mod tests {
             unimplemented!()
         }

-        async fn alter_ingestion_source_desc(
-            &self,
-            _ingestion_id: GlobalId,
-            _source_desc: SourceDesc,
-        ) -> Result<(), StorageError> {
-            unimplemented!()
-        }
-
-        async fn alter_ingestion_export_data_configs(
-            &self,
-            _source_exports: BTreeMap,
-        ) -> Result<(), StorageError> {
-            unimplemented!()
-        }
-
-        async fn alter_ingestion_connections(
-            &self,
-            _source_connections: BTreeMap>,
-        ) -> Result<(), StorageError> {
-            unimplemented!()
-        }
-
         async fn alter_table_desc(
             &self,
             _existing_collection: GlobalId,
diff --git a/src/persist-client/src/usage.rs b/src/persist-client/src/usage.rs
index 430355aad67c0..2f1d87d0db46f 100644
--- a/src/persist-client/src/usage.rs
+++ b/src/persist-client/src/usage.rs
@@ -250,10 +250,10 @@ impl StorageUsageClient {
     }

     /// Computes [ShardUsageReferenced] for a given set of shards. Suitable for customer billing.
-    pub async fn shards_usage_referenced(&self, shard_ids: I) -> ShardsUsageReferenced
-    where
-        I: IntoIterator,
-    {
+    pub async fn shards_usage_referenced(
+        &self,
+        shard_ids: impl IntoIterator,
+    ) -> ShardsUsageReferenced {
         let semaphore = Arc::new(Semaphore::new(
             USAGE_STATE_FETCH_CONCURRENCY_LIMIT.get(&self.cfg),
        ));
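The `shards_usage_referenced` hunk above trades a named generic parameter plus `where` clause for argument-position `impl Trait`. A toy sketch of the equivalence, under stated assumptions: `ShardId` and the body are stand-ins, and the real method is async and returns `ShardsUsageReferenced`. The two signatures accept the same arguments; the main difference is that callers can no longer name the iterator type with turbofish.

```rust
#[derive(Debug, Clone, Copy)]
struct ShardId(u64);

// Before (roughly):
//     fn shards_usage_referenced<I>(shard_ids: I) -> usize
//     where I: IntoIterator<Item = ShardId>
// After:
fn shards_usage_referenced(shard_ids: impl IntoIterator<Item = ShardId>) -> usize {
    // Counting shards stands in for the real usage computation.
    shard_ids.into_iter().count()
}

fn main() {
    let shards = vec![ShardId(1), ShardId(2)];
    assert_eq!(shards_usage_referenced(shards), 2);
}
```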
diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs
index d7c5cfd9b6560..1ae4f628cfc36 100644
--- a/src/storage-client/src/controller.rs
+++ b/src/storage-client/src/controller.rs
@@ -142,9 +142,6 @@ pub struct CollectionDescription {
     pub data_source: DataSource,
     /// An optional frontier to which the collection's `since` should be advanced.
     pub since: Option>,
-    /// A GlobalId to use for this collection to use for the status collection.
-    /// Used to keep track of source status/error information.
-    pub status_collection_id: Option,
     /// The timeline of the source. Absent for materialized views, continual tasks, etc.
     pub timeline: Option,
     /// The primary of this collections.
@@ -163,7 +160,6 @@ impl CollectionDescription {
             desc,
             data_source: DataSource::Other,
             since,
-            status_collection_id: None,
             timeline: None,
             primary: None,
         }
@@ -175,7 +171,6 @@ impl CollectionDescription {
             desc,
             data_source: DataSource::Table,
             since: None,
-            status_collection_id: None,
             timeline: Some(Timeline::EpochMilliseconds),
             primary: None,
         }
diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs
index 0dfd81ad2ca7b..746811dad670e 100644
--- a/src/storage-client/src/storage_collections.rs
+++ b/src/storage-client/src/storage_collections.rs
@@ -22,7 +22,6 @@ use futures::future::BoxFuture;
 use futures::stream::{BoxStream, FuturesUnordered};
 use futures::{Future, FutureExt, StreamExt};
 use itertools::Itertools;
-
 use mz_ore::collections::CollectionExt;
 use mz_ore::metrics::MetricsRegistry;
 use mz_ore::now::{EpochMillis, NowFn};
@@ -43,17 +42,13 @@ use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulatio
 use mz_storage_types::StorageDiff;
 use mz_storage_types::configuration::StorageConfiguration;
 use mz_storage_types::connections::ConnectionContext;
-use mz_storage_types::connections::inline::InlinedConnection;
 use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
 use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
 use mz_storage_types::errors::CollectionMissing;
 use mz_storage_types::parameters::StorageParameters;
 use mz_storage_types::read_holds::ReadHold;
 use mz_storage_types::read_policy::ReadPolicy;
-use mz_storage_types::sources::{
-    GenericSourceConnection, SourceData, SourceDesc, SourceEnvelope, SourceExport,
-    SourceExportDataConfig, Timeline,
-};
+use mz_storage_types::sources::{GenericSourceConnection, SourceData, SourceEnvelope, Timeline};
 use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
 use mz_txn_wal::metrics::Metrics as TxnMetrics;
 use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
@@ -267,34 +262,6 @@ pub trait StorageCollections: Debug + Sync {
         migrated_storage_collections: &BTreeSet,
     ) -> Result<(), StorageError>;

-    /// Alters the identified ingestion to use the provided [`SourceDesc`].
-    ///
-    /// NOTE: Ideally, [StorageCollections] would not care about these, but we
-    /// have to learn about changes such that when new subsources are created we
-    /// can correctly determine a since based on its depenencies' sinces. This
-    /// is really only relevant because newly created subsources depend on the
-    /// remap shard, and we can't just have them start at since 0.
-    async fn alter_ingestion_source_desc(
-        &self,
-        ingestion_id: GlobalId,
-        source_desc: SourceDesc,
-    ) -> Result<(), StorageError>;
-
-    /// Alters the data config for the specified source exports of the specified ingestions.
-    async fn alter_ingestion_export_data_configs(
-        &self,
-        source_exports: BTreeMap,
-    ) -> Result<(), StorageError>;
-
-    /// Alters each identified collection to use the correlated
-    /// [`GenericSourceConnection`].
-    ///
-    /// See NOTE on [StorageCollections::alter_ingestion_source_desc].
-    async fn alter_ingestion_connections(
-        &self,
-        source_connections: BTreeMap>,
-    ) -> Result<(), StorageError>;
-
     /// Updates the [`RelationDesc`] for the specified table.
     async fn alter_table_desc(
         &self,
@@ -431,8 +398,8 @@ pub struct StorageCollectionsImpl<
     config: Arc>,

     /// The upper of the txn shard as it was when we booted. We forward the
-    /// upper of created/registered tables to make sure that their uppers are at
-    /// least not less than the initially known txn upper.
+    /// upper of created/registered tables to make sure that their uppers are
+    /// not less than the initially known txn upper.
     ///
     /// NOTE: This works around a quirk in how the adapter chooses the as_of of
     /// existing indexes when bootstrapping, where tables that have an upper
@@ -897,7 +864,6 @@ where
     /// Returns the given collection's dependencies.
     fn determine_collection_dependencies(
-        &self,
         self_collections: &BTreeMap>,
         source_id: GlobalId,
         collection_desc: &CollectionDescription,
@@ -924,13 +890,13 @@ where
                 let source = self_collections
                     .get(ingestion_id)
                     .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
-                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
+                let Some(remap_collection_id) = &source.ingestion_remap_collection_id else {
                     panic!("SourceExport must refer to a primary source that already exists");
                 };
                 match data_config.envelope {
                     SourceEnvelope::CdcV2 => (),
-                    _ => dependencies.push(ingestion.remap_collection_id),
+                    _ => dependencies.push(*remap_collection_id),
                 }
             }
             // Ingestions depend on their remap collection.
@@ -1329,7 +1295,7 @@ where
             if !changes.is_empty() {
                 // If the collection has a "primary" collection, let that primary drive compaction.
                 let collection = collections.get(&key).expect("must still exist");
-                let should_emit_persist_compaction = collection.description.primary.is_none();
+                let should_emit_persist_compaction = collection.primary.is_none();

                 if frontier.is_empty() {
                     info!(id = %key, "removing collection state because the since advanced to []!");
@@ -1794,22 +1760,6 @@ where
             }
         }

-        {
-            // Early sanity check: if we knew about a collection already it's
-            // description must match!
-            //
-            // NOTE: There could be concurrent modifications to
-            // `self.collections`, but this sanity check is better than nothing.
-            let self_collections = self.collections.lock().expect("lock poisoned");
-            for (id, description) in collections.iter() {
-                if let Some(existing_collection) = self_collections.get(id) {
-                    if &existing_collection.description != description {
-                        return Err(StorageError::CollectionIdReused(*id));
-                    }
-                }
-            }
-        }
-
         // We first enrich each collection description with some additional
         // metadata...
         let enriched_with_metadata = collections
@@ -1966,7 +1916,7 @@ where
                 // Determine if this collection has any dependencies.
                 let storage_dependencies =
-                    self.determine_collection_dependencies(&*self_collections, id, &description)?;
+                    Self::determine_collection_dependencies(&*self_collections, id, &description)?;

                 // Determine the initial since of the collection.
                 let initial_since = match storage_dependencies
@@ -2029,8 +1979,52 @@ where
                     None => data_shard_since,
                 };

+                // Determine the time dependence of the collection.
+                let time_dependence = {
+                    use DataSource::*;
+                    if let Some(timeline) = &description.timeline
+                        && *timeline != Timeline::EpochMilliseconds
+                    {
+                        // Only the epoch timeline follows wall-clock.
+                        None
+                    } else {
+                        match &description.data_source {
+                            Ingestion(ingestion) => {
+                                use GenericSourceConnection::*;
+                                match ingestion.desc.connection {
+                                    // Kafka, Postgres, MySql, and SQL Server sources all
+                                    // follow wall clock.
+                                    Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
+                                        Some(TimeDependence::default())
+                                    }
+                                    // Load generators not further specified.
+                                    LoadGenerator(_) => None,
+                                }
+                            }
+                            IngestionExport { ingestion_id, .. } => {
+                                let c = self_collections.get(ingestion_id).expect("known to exist");
+                                c.time_dependence.clone()
+                            }
+                            // Introspection, other, progress, table, and webhook sources follow wall clock.
+                            Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
+                                Some(TimeDependence::default())
+                            }
+                            // Materialized views, continual tasks, etc., aren't managed by storage.
+                            Other => None,
+                            Sink { .. } => None,
+                        }
+                    }
+                };
+
+                let ingestion_remap_collection_id = match &description.data_source {
+                    DataSource::Ingestion(desc) => Some(desc.remap_collection_id),
+                    _ => None,
+                };
+
                 let mut collection_state = CollectionState::new(
-                    description,
+                    description.primary,
+                    time_dependence,
+                    ingestion_remap_collection_id,
                     initial_since,
                     write_frontier.clone(),
                     storage_dependencies,
@@ -2038,39 +2032,14 @@ where
                 );

                 // Install the collection state in the appropriate spot.
-                match &collection_state.description.data_source {
+                match &description.data_source {
                     DataSource::Introspection(_) => {
                         self_collections.insert(id, collection_state);
                     }
                     DataSource::Webhook => {
                         self_collections.insert(id, collection_state);
                     }
-                    DataSource::IngestionExport {
-                        ingestion_id,
-                        details,
-                        data_config,
-                    } => {
-                        // Adjust the source to contain this export.
-                        let source_collection = self_collections
-                            .get_mut(ingestion_id)
-                            .expect("known to exist");
-                        match &mut source_collection.description {
-                            CollectionDescription {
-                                data_source: DataSource::Ingestion(ingestion_desc),
-                                ..
-                            } => ingestion_desc.source_exports.insert(
-                                id,
-                                SourceExport {
-                                    storage_metadata: (),
-                                    details: details.clone(),
-                                    data_config: data_config.clone(),
-                                },
-                            ),
-                            _ => unreachable!(
-                                "SourceExport must only refer to primary sources that already exist"
-                            ),
-                        };
-
+                    DataSource::IngestionExport { .. } => {
                         self_collections.insert(id, collection_state);
                     }
                     DataSource::Table => {
@@ -2117,125 +2086,6 @@ where
         Ok(())
     }

-    async fn alter_ingestion_source_desc(
-        &self,
-        ingestion_id: GlobalId,
-        source_desc: SourceDesc,
-    ) -> Result<(), StorageError> {
-        // The StorageController checks the validity of these. And we just
-        // accept them.
-
-        let mut self_collections = self.collections.lock().expect("lock poisoned");
-        let collection = self_collections
-            .get_mut(&ingestion_id)
-            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;
-
-        let curr_ingestion = match &mut collection.description.data_source {
-            DataSource::Ingestion(active_ingestion) => active_ingestion,
-            _ => unreachable!("verified collection refers to ingestion"),
-        };
-
-        curr_ingestion.desc = source_desc;
-        debug!("altered {ingestion_id}'s SourceDesc");
-
-        Ok(())
-    }
-
-    async fn alter_ingestion_export_data_configs(
-        &self,
-        source_exports: BTreeMap,
-    ) -> Result<(), StorageError> {
-        let mut self_collections = self.collections.lock().expect("lock poisoned");
-
-        for (source_export_id, new_data_config) in source_exports {
-            // We need to adjust the data config on the CollectionState for
-            // the source export collection directly
-            let source_export_collection = self_collections
-                .get_mut(&source_export_id)
-                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
-            let ingestion_id = match &mut source_export_collection.description.data_source {
-                DataSource::IngestionExport {
-                    ingestion_id,
-                    details: _,
-                    data_config,
-                } => {
-                    *data_config = new_data_config.clone();
-                    *ingestion_id
-                }
-                o => {
-                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
-                    Err(StorageError::IdentifierInvalid(source_export_id))?
-                }
-            };
-            // We also need to adjust the data config on the CollectionState of the
-            // Ingestion that the export is associated with.
-            let ingestion_collection = self_collections
-                .get_mut(&ingestion_id)
-                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
-
-            match &mut ingestion_collection.description.data_source {
-                DataSource::Ingestion(ingestion_desc) => {
-                    let source_export = ingestion_desc
-                        .source_exports
-                        .get_mut(&source_export_id)
-                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
-
-                    if source_export.data_config != new_data_config {
-                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
-                        source_export.data_config = new_data_config;
-                    } else {
-                        tracing::warn!(
-                            "alter_ingestion_export_data_configs called on \
-                            export {source_export_id} of {ingestion_id} but \
-                            the data config was the same"
-                        );
-                    }
-                }
-                o => {
-                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
-                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    async fn alter_ingestion_connections(
-        &self,
-        source_connections: BTreeMap>,
-    ) -> Result<(), StorageError> {
-        let mut self_collections = self.collections.lock().expect("lock poisoned");
-
-        for (id, conn) in source_connections {
-            let collection = self_collections
-                .get_mut(&id)
-                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
-
-            match &mut collection.description.data_source {
-                DataSource::Ingestion(ingestion) => {
-                    // If the connection hasn't changed, there's no sense in
-                    // re-rendering the dataflow.
-                    if ingestion.desc.connection != conn {
-                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
-                        ingestion.desc.connection = conn;
-                    } else {
-                        warn!(
-                            "update_source_connection called on {id} but the \
-                            connection was the same"
-                        );
-                    }
-                }
-                o => {
-                    warn!("update_source_connection called on {:?}", o);
-                    Err(StorageError::IdentifierInvalid(id))?;
-                }
-            }
-        }
-
-        Ok(())
-    }
-
     async fn alter_table_desc(
         &self,
         existing_collection: GlobalId,
@@ -2249,11 +2099,6 @@ where
                 .get(&existing_collection)
                 .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

-            // TODO(alter_table): Support changes to sources.
-            if existing.description.data_source != DataSource::Table {
-                return Err(StorageError::IdentifierInvalid(existing_collection));
-            }
-
             existing.collection_metadata.data_shard
         };

@@ -2337,11 +2182,10 @@ where
             .expect("existing collection missing");

         // A higher level should already be asserting this, but let's make sure.
-        assert_eq!(existing.description.data_source, DataSource::Table);
-        assert_none!(existing.description.primary);
+        assert_none!(existing.primary);

         // The existing version of the table will depend on the new version.
-        existing.description.primary = Some(new_collection);
+        existing.primary = Some(new_collection);
         existing.storage_dependencies.push(new_collection);

         // Copy over the frontiers from the previous version.
@@ -2360,15 +2204,16 @@ where
         changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));

         // Note: The new collection is now the "primary collection".
-        let collection_desc = CollectionDescription::for_table(new_desc.clone());
         let collection_meta = CollectionMetadata {
             persist_location: self.persist_location.clone(),
-            relation_desc: collection_desc.desc.clone(),
+            relation_desc: new_desc.clone(),
             data_shard,
             txns_shard: Some(self.txns_read.txns_id().clone()),
         };
         let collection_state = CollectionState::new(
-            collection_desc,
+            None,
+            existing.time_dependence.clone(),
+            existing.ingestion_remap_collection_id.clone(),
             implied_capability,
             write_frontier,
             Vec::new(),
@@ -2410,43 +2255,6 @@ where
                 "dropping {id}, but drop was not synchronized with storage \
                 controller via `synchronize_collections`"
             );
-
-            let dropped_data_source = match self_collections.get(id) {
-                Some(col) => col.description.data_source.clone(),
-                None => continue,
-            };
-
-            // If we are dropping source exports, we need to modify the
-            // ingestion that it runs on.
-            if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
-                // Adjust the source to remove this export.
-                let ingestion = match self_collections.get_mut(&ingestion_id) {
-                    Some(ingestion) => ingestion,
-                    // Primary ingestion already dropped.
-                    None => {
-                        tracing::error!(
-                            "primary source {ingestion_id} seemingly dropped before subsource {id}",
-                        );
-                        continue;
-                    }
-                };
-
-                match &mut ingestion.description {
-                    CollectionDescription {
-                        data_source: DataSource::Ingestion(ingestion_desc),
-                        ..
-                    } => {
-                        let removed = ingestion_desc.source_exports.remove(id);
-                        mz_ore::soft_assert_or_log!(
-                            removed.is_some(),
-                            "dropped subsource {id} already removed from source exports"
-                        );
-                    }
-                    _ => unreachable!(
-                        "SourceExport must only refer to primary sources that already exist"
-                    ),
-                };
-            }
         }

         // Policies that advance the since to the empty antichain. We do still
@@ -2562,47 +2370,8 @@ where
     ) -> Result, TimeDependenceError> {
         use TimeDependenceError::CollectionMissing;
         let collections = self.collections.lock().expect("lock poisoned");
-        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);
-
-        let mut result = None;
-
-        while let Some(c) = collection.take() {
-            use DataSource::*;
-            if let Some(timeline) = &c.description.timeline {
-                // Only the epoch timeline follows wall-clock.
-                if *timeline != Timeline::EpochMilliseconds {
-                    break;
-                }
-            }
-            match &c.description.data_source {
-                Ingestion(ingestion) => {
-                    use GenericSourceConnection::*;
-                    match ingestion.desc.connection {
-                        // Kafka, Postgres, MySql, and SQL Server sources all
-                        // follow wall clock.
-                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
-                            result = Some(TimeDependence::default())
-                        }
-                        // Load generators not further specified.
-                        LoadGenerator(_) => {}
-                    }
-                }
-                IngestionExport { ingestion_id, .. } => {
-                    let c = collections
-                        .get(ingestion_id)
-                        .ok_or(CollectionMissing(*ingestion_id))?;
-                    collection = Some(c);
-                }
-                // Introspection, other, progress, table, and webhook sources follow wall clock.
-                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
-                    result = Some(TimeDependence::default())
-                }
-                // Materialized views, continual tasks, etc, aren't managed by storage.
-                Other => {}
-                Sink { .. } => {}
-            };
-        }
-        Ok(result)
+        let state = collections.get(&id).ok_or(CollectionMissing(id))?;
+        Ok(state.time_dependence.clone())
     }

     fn dump(&self) -> Result {
@@ -2778,8 +2547,18 @@ where

 /// State maintained about individual collections.
 #[derive(Debug, Clone)]
 struct CollectionState {
-    /// Description with which the collection was created
-    pub description: CollectionDescription,
+    /// The primary of this collection.
+    ///
+    /// Multiple storage collections can point to the same persist shard,
+    /// possibly with different schemas. In such a configuration, we select one
+    /// of the involved collections as the primary, which "owns" the persist
+    /// shard. All other involved collections have a dependency on the primary.
+    primary: Option,
+
+    /// Description of how this collection's frontier follows time.
+    time_dependence: Option,
+    /// The ID of the source remap/progress collection, if this is an ingestion.
+    ingestion_remap_collection_id: Option,

     /// Accumulation of read capabilities for the collection.
     ///
@@ -2809,7 +2588,9 @@ impl CollectionState {
     /// Creates a new collection state, with an initial read policy valid from
     /// `since`.
     pub fn new(
-        description: CollectionDescription,
+        primary: Option,
+        time_dependence: Option,
+        ingestion_remap_collection_id: Option,
         since: Antichain,
         write_frontier: Antichain,
         storage_dependencies: Vec,
@@ -2818,7 +2599,9 @@ impl CollectionState {
         let mut read_capabilities = MutableAntichain::new();
         read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
         Self {
-            description,
+            primary,
+            time_dependence,
+            ingestion_remap_collection_id,
             read_capabilities,
             implied_capability: since.clone(),
             read_policy: ReadPolicy::NoPolicy {
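A sketch of the caching idea behind the new `CollectionState` fields, with simplified stand-in types (`TimeDependence` here is a placeholder for `mz_storage_types::time_dependence::TimeDependence`): the wall-clock behavior of a collection is computed once, when the collection is created, so `determine_time_dependence` becomes a field read instead of a recursive walk over `description.data_source` and parent ingestions.

```rust
#[derive(Debug, Clone, Default)]
struct TimeDependence;

#[derive(Debug)]
struct CollectionState {
    /// Cached at creation time; `None` means the collection does not follow
    /// wall-clock time (e.g. a load generator or a non-epoch timeline).
    time_dependence: Option<TimeDependence>,
}

impl CollectionState {
    fn new(time_dependence: Option<TimeDependence>) -> Self {
        Self { time_dependence }
    }

    /// A field read, rather than re-deriving the answer from the full
    /// collection description on every call.
    fn determine_time_dependence(&self) -> Option<TimeDependence> {
        self.time_dependence.clone()
    }
}

fn main() {
    let state = CollectionState::new(Some(TimeDependence::default()));
    assert!(state.determine_time_dependence().is_some());
}
```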
diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs
index 0d2b83b36c5d6..34c6a9f1a0020 100644
--- a/src/storage-controller/src/lib.rs
+++ b/src/storage-controller/src/lib.rs
@@ -1252,11 +1252,6 @@ where
         &mut self,
         source_connections: BTreeMap>,
     ) -> Result<(), StorageError> {
-        // Also have to let StorageCollections know!
-        self.storage_collections
-            .alter_ingestion_connections(source_connections.clone())
-            .await?;
-
         let mut ingestions_to_run = BTreeSet::new();

         for (id, conn) in source_connections {
@@ -1297,11 +1292,6 @@ where
         &mut self,
         source_exports: BTreeMap,
     ) -> Result<(), StorageError> {
-        // Also have to let StorageCollections know!
-        self.storage_collections
-            .alter_ingestion_export_data_configs(source_exports.clone())
-            .await?;
-
         let mut ingestions_to_run = BTreeSet::new();

         for (source_export_id, new_data_config) in source_exports {
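A sketch of why the controller no longer has to mirror alter calls into `StorageCollections`: once the state keeps only `ingestion_remap_collection_id` instead of the full, mutable `CollectionDescription`, dependency determination for an ingestion export needs just the parent's remap collection ID. Types are toy stand-ins (`u64` for `GlobalId`, `String` errors for `StorageError`), under the assumption that the remap ID is fixed for the lifetime of an ingestion.

```rust
use std::collections::BTreeMap;

type GlobalId = u64;

#[derive(Debug)]
struct CollectionState {
    /// Set once at creation for ingestions; never updated by alters.
    ingestion_remap_collection_id: Option<GlobalId>,
}

/// An ingestion export depends on its parent's remap collection, read from
/// the cached field rather than pattern-matched out of a description.
fn export_dependencies(
    collections: &BTreeMap<GlobalId, CollectionState>,
    ingestion_id: GlobalId,
) -> Result<Vec<GlobalId>, String> {
    let source = collections
        .get(&ingestion_id)
        .ok_or_else(|| format!("missing collection {ingestion_id}"))?;
    let remap_id = source
        .ingestion_remap_collection_id
        .ok_or_else(|| format!("{ingestion_id} is not an ingestion"))?;
    Ok(vec![remap_id])
}

fn main() {
    let mut collections = BTreeMap::new();
    collections.insert(
        1,
        CollectionState {
            ingestion_remap_collection_id: Some(7),
        },
    );
    assert_eq!(export_dependencies(&collections, 1), Ok(vec![7]));
}
```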