42 changes: 12 additions & 30 deletions src/adapter/src/coord.rs
@@ -2744,26 +2744,17 @@ impl Coordinator {
         migrated_storage_collections: &BTreeSet<CatalogItemId>,
     ) {
         let catalog = self.catalog();
-        let source_status_collection_id = catalog
-            .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
-        let source_status_collection_id = catalog
-            .get_entry(&source_status_collection_id)
-            .latest_global_id();
-
         let source_desc = |object_id: GlobalId,
                            data_source: &DataSourceDesc,
                            desc: &RelationDesc,
                            timeline: &Timeline| {
-            let (data_source, status_collection_id) = match data_source.clone() {
+            let data_source = match data_source.clone() {
                 // Re-announce the source description.
                 DataSourceDesc::Ingestion { desc, cluster_id } => {
                     let desc = desc.into_inline_connection(catalog.state());
                     let ingestion = IngestionDescription::new(desc, cluster_id, object_id);

-                    (
-                        DataSource::Ingestion(ingestion),
-                        Some(source_status_collection_id),
-                    )
+                    DataSource::Ingestion(ingestion)
                 }
                 DataSourceDesc::OldSyntaxIngestion {
                     desc,
@@ -2787,10 +2778,7 @@
                     };
                     ingestion.source_exports.insert(object_id, legacy_export);

-                    (
-                        DataSource::Ingestion(ingestion),
-                        Some(source_status_collection_id),
-                    )
+                    DataSource::Ingestion(ingestion)
                 }
                 DataSourceDesc::IngestionExport {
                     ingestion_id,
@@ -2801,28 +2789,23 @@
                     // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                     // this will always be a Source or a Table.
                     let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
-                    (
-                        DataSource::IngestionExport {
-                            ingestion_id,
-                            details,
-                            data_config: data_config.into_inline_connection(catalog.state()),
-                        },
-                        Some(source_status_collection_id),
-                    )
-                }
-                DataSourceDesc::Webhook { .. } => {
-                    (DataSource::Webhook, Some(source_status_collection_id))
+
+                    DataSource::IngestionExport {
+                        ingestion_id,
+                        details,
+                        data_config: data_config.into_inline_connection(catalog.state()),
+                    }
                 }
-                DataSourceDesc::Progress => (DataSource::Progress, None),
+                DataSourceDesc::Webhook { .. } => DataSource::Webhook,
+                DataSourceDesc::Progress => DataSource::Progress,
                 DataSourceDesc::Introspection(introspection) => {
-                    (DataSource::Introspection(introspection), None)
+                    DataSource::Introspection(introspection)
                 }
             };
             CollectionDescription {
                 desc: desc.clone(),
                 data_source,
                 since: None,
-                status_collection_id,
                 timeline: Some(timeline.clone()),
                 primary: None,
             }
@@ -2938,7 +2921,6 @@ impl Coordinator {
                 },
             },
             since: None,
-            status_collection_id: None,
             timeline: None,
             primary: None,
         };
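The recurring change in this file (and again in catalog_implications.rs below) is that each match arm now yields a DataSource directly instead of a (DataSource, Option<GlobalId>) pair, because the status-collection id is no longer threaded through. A minimal, self-contained sketch of that pattern with toy types, not the real Materialize definitions:

#[derive(Debug)]
enum DataSource {
    Ingestion(u64),
    Progress,
}

fn main() {
    let raw = 42u64;

    // Before: every arm returned a pair, with the status collection id
    // duplicated across most arms.
    let (data_source, status_collection_id) = match raw {
        0 => (DataSource::Progress, None),
        n => (DataSource::Ingestion(n), Some(n)),
    };
    println!("{data_source:?} / {status_collection_id:?}");

    // After: the second tuple element is gone, so each arm returns the
    // variant itself and the binding collapses to a single name.
    let data_source = match raw {
        0 => DataSource::Progress,
        n => DataSource::Ingestion(n),
    };
    println!("{data_source:?}");
}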
54 changes: 11 additions & 43 deletions src/adapter/src/coord/catalog_implications.rs
@@ -33,7 +33,6 @@ use std::time::{Duration, Instant};
 use fail::fail_point;
 use itertools::Itertools;
 use mz_adapter_types::compaction::CompactionWindow;
-use mz_catalog::builtin;
 use mz_catalog::memory::objects::{
     CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
     MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
@@ -1021,19 +1020,8 @@ impl Coordinator {
                 details,
                 data_config,
             } => {
-                // TODO: It's a little weird that a table will be present in this
-                // source status collection, we might want to split out into a separate
-                // status collection.
-                let status_collection_id = self
-                    .catalog()
-                    .resolve_builtin_storage_collection(&builtin::MZ_SOURCE_STATUS_HISTORY);
-
                 let global_ingestion_id =
                     self.catalog().get_entry(ingestion_id).latest_global_id();
-                let global_status_collection_id = self
-                    .catalog()
-                    .get_entry(&status_collection_id)
-                    .latest_global_id();

                 let collection_desc = CollectionDescription::<Timestamp> {
                     desc: table.desc.latest(),
@@ -1045,7 +1033,6 @@
                             .into_inline_connection(self.catalog.state()),
                     },
                     since: None,
-                    status_collection_id: Some(global_status_collection_id),
                     timeline: Some(timeline.clone()),
                     primary: None,
                 };
@@ -1091,7 +1078,6 @@
                     desc,
                     data_source: DataSource::Webhook,
                     since: None,
-                    status_collection_id: None, // Webhook tables don't use status collections
                     timeline: Some(timeline.clone()),
                     primary: None,
                 };
@@ -1233,16 +1219,7 @@
         source: Source,
         compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
     ) -> Result<(), AdapterError> {
-        let source_status_item_id = self
-            .catalog()
-            .resolve_builtin_storage_collection(&builtin::MZ_SOURCE_STATUS_HISTORY);
-        let source_status_collection_id = Some(
-            self.catalog()
-                .get_entry(&source_status_item_id)
-                .latest_global_id(),
-        );
-
-        let (data_source, status_collection_id) = match source.data_source {
+        let data_source = match source.data_source {
             DataSourceDesc::Ingestion { desc, cluster_id } => {
                 let desc = desc.into_inline_connection(self.catalog().state());
                 let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
@@ -1253,10 +1230,7 @@
                     item_global_id,
                 );

-                (
-                    DataSource::Ingestion(ingestion),
-                    source_status_collection_id,
-                )
+                DataSource::Ingestion(ingestion)
             }
             DataSourceDesc::OldSyntaxIngestion {
                 desc,
@@ -1291,10 +1265,7 @@
                     .source_exports
                     .insert(source.global_id, legacy_export);

-                (
-                    DataSource::Ingestion(ingestion),
-                    source_status_collection_id,
-                )
+                DataSource::Ingestion(ingestion)
             }
             DataSourceDesc::IngestionExport {
                 ingestion_id,
@@ -1305,17 +1276,15 @@
                 // TODO(parkmycar): We should probably check the type here, but I'm not sure if
                 // this will always be a Source or a Table.
                 let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
-                (
-                    DataSource::IngestionExport {
-                        ingestion_id,
-                        details,
-                        data_config: data_config.into_inline_connection(self.catalog().state()),
-                    },
-                    source_status_collection_id,
-                )
+
+                DataSource::IngestionExport {
+                    ingestion_id,
+                    details,
+                    data_config: data_config.into_inline_connection(self.catalog().state()),
+                }
             }
-            DataSourceDesc::Progress => (DataSource::Progress, None),
-            DataSourceDesc::Webhook { .. } => (DataSource::Webhook, None),
+            DataSourceDesc::Progress => DataSource::Progress,
+            DataSourceDesc::Webhook { .. } => DataSource::Webhook,
             DataSourceDesc::Introspection(_) => {
                 unreachable!("cannot create sources with introspection data sources")
             }
@@ -1328,7 +1297,6 @@
                 data_source,
                 timeline: Some(source.timeline),
                 since: None,
-                status_collection_id,
                 primary: None,
             },
         );
1 change: 0 additions & 1 deletion src/adapter/src/coord/ddl.rs
@@ -1000,7 +1000,6 @@ impl Coordinator {
                 },
             },
             since: None,
-            status_collection_id: None,
             timeline: None,
             primary: None,
         };
26 changes: 1 addition & 25 deletions src/compute-client/src/as_of_selection.rs
@@ -867,12 +867,10 @@ mod tests {
     use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
     use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
     use mz_storage_types::StorageDiff;
-    use mz_storage_types::connections::inline::InlinedConnection;
     use mz_storage_types::controller::{CollectionMetadata, StorageError};
     use mz_storage_types::errors::CollectionMissing;
     use mz_storage_types::parameters::StorageParameters;
-    use mz_storage_types::sources::{GenericSourceConnection, SourceDesc};
-    use mz_storage_types::sources::{SourceData, SourceExportDataConfig};
+    use mz_storage_types::sources::SourceData;
     use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
     use timely::progress::Timestamp as TimelyTimestamp;
@@ -1041,28 +1039,6 @@
             unimplemented!()
         }

-        async fn alter_ingestion_source_desc(
-            &self,
-            _ingestion_id: GlobalId,
-            _source_desc: SourceDesc,
-        ) -> Result<(), StorageError<Self::Timestamp>> {
-            unimplemented!()
-        }
-
-        async fn alter_ingestion_export_data_configs(
-            &self,
-            _source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
-        ) -> Result<(), StorageError<Self::Timestamp>> {
-            unimplemented!()
-        }
-
-        async fn alter_ingestion_connections(
-            &self,
-            _source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
-        ) -> Result<(), StorageError<Self::Timestamp>> {
-            unimplemented!()
-        }
-
         async fn alter_table_desc(
             &self,
             _existing_collection: GlobalId,
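Why these test stubs could simply be deleted: the mock only implemented the alter_ingestion_* methods to satisfy the storage-collections trait, so once the trait itself drops them (a change not shown in this excerpt), the compiler stops requiring the stubs. A toy illustration under that assumption; the trait and names here are stand-ins, not the real mz_storage_client definitions:

// Hypothetical, pared-down trait: the commented-out method plays the role
// of alter_ingestion_source_desc and friends after their removal.
trait StorageCollections {
    fn alter_table_desc(&self) -> Result<(), String>;
    // fn alter_ingestion_source_desc(&self) -> Result<(), String>;
}

struct StorageCollectionsMock;

impl StorageCollections for StorageCollectionsMock {
    fn alter_table_desc(&self) -> Result<(), String> {
        // Test mocks stub out whatever the test never exercises.
        unimplemented!()
    }
    // With the method gone from the trait, an unimplemented!() stub for it
    // here would now be a compile error (not a member of the trait), which
    // is exactly why this diff removes the stubs.
}

fn main() {
    let _mock = StorageCollectionsMock;
}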
8 changes: 4 additions & 4 deletions src/persist-client/src/usage.rs
@@ -250,10 +250,10 @@ impl StorageUsageClient {
     }

     /// Computes [ShardUsageReferenced] for a given set of shards. Suitable for customer billing.
-    pub async fn shards_usage_referenced<I>(&self, shard_ids: I) -> ShardsUsageReferenced
-    where
-        I: IntoIterator<Item = ShardId>,
-    {
+    pub async fn shards_usage_referenced(
+        &self,
+        shard_ids: impl IntoIterator<Item = ShardId>,
+    ) -> ShardsUsageReferenced {
         let semaphore = Arc::new(Semaphore::new(
             USAGE_STATE_FETCH_CONCURRENCY_LIMIT.get(&self.cfg),
         ));
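The usage.rs change is a pure signature refactor: an explicit type parameter with a where clause becomes impl Trait in argument position. Both forms compile to the same generic function; the main observable difference is that callers of the impl Trait form can no longer pin the parameter with turbofish. A standalone sketch (toy function, not the persist-client API):

// Before-style: named type parameter plus a `where` clause.
fn total_explicit<I>(shard_sizes: I) -> u64
where
    I: IntoIterator<Item = u64>,
{
    shard_sizes.into_iter().sum()
}

// After-style: the same bound written inline, as in shards_usage_referenced.
fn total_inline(shard_sizes: impl IntoIterator<Item = u64>) -> u64 {
    shard_sizes.into_iter().sum()
}

fn main() {
    assert_eq!(total_explicit(vec![1, 2, 3]), 6);
    assert_eq!(total_inline(vec![1, 2, 3]), 6);
    // total_inline::<Vec<u64>>(vec![]) would not compile: impl-Trait
    // arguments cannot be named with turbofish.
}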
5 changes: 0 additions & 5 deletions src/storage-client/src/controller.rs
@@ -142,9 +142,6 @@ pub struct CollectionDescription<T> {
     pub data_source: DataSource<T>,
     /// An optional frontier to which the collection's `since` should be advanced.
     pub since: Option<Antichain<T>>,
-    /// A GlobalId to use for this collection to use for the status collection.
-    /// Used to keep track of source status/error information.
-    pub status_collection_id: Option<GlobalId>,
     /// The timeline of the source. Absent for materialized views, continual tasks, etc.
     pub timeline: Option<Timeline>,
     /// The primary of this collections.
@@ -163,7 +160,6 @@
             desc,
             data_source: DataSource::Other,
             since,
-            status_collection_id: None,
             timeline: None,
             primary: None,
         }
@@ -175,7 +171,6 @@
             desc,
             data_source: DataSource::Table,
             since: None,
-            status_collection_id: None,
             timeline: Some(Timeline::EpochMilliseconds),
             primary: None,
         }
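A note on why this small struct change fans out across the adapter and storage-client files above: Rust struct literals must account for every field, so deleting a pub field breaks each construction site at compile time until its initializer is removed. Sketched with a simplified stand-in for CollectionDescription:

struct CollectionDescription {
    since: Option<u64>,
    timeline: Option<String>,
    primary: Option<u64>,
}

fn for_table() -> CollectionDescription {
    CollectionDescription {
        since: None,
        // status_collection_id: None, // after this PR: error[E0560], no such field
        timeline: Some("EpochMilliseconds".into()),
        primary: None,
    }
}

fn main() {
    let desc = for_table();
    assert!(desc.since.is_none());
}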