Skip to content

Commit 0a7421b

Browse files
authored
Merge pull request #34304 from ggevay/frontend-peek-rtr-RtrDropFailure
Solve some frontend peek sequencing Nightly fails: RTR `catalog out of sync` and COPY TO `cannot fail to drop collections`
2 parents 35faa84 + c423544 commit 0a7421b

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

src/adapter/src/coord/peek.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1339,6 +1339,10 @@ impl crate::coord::Coordinator {
13391339
.await
13401340
.map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
13411341
{
1342+
// Clean up the active compute sink that was added above, since the dataflow was never
1343+
// created. If we don't do this, the sink_id remains in drop_sinks but no collection
1344+
// exists in the compute controller, causing a panic when the connection terminates.
1345+
self.remove_active_compute_sink(sink_id).await;
13421346
let _ = tx.send(Err(e));
13431347
return;
13441348
}

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2512,14 +2512,20 @@ impl Coordinator {
25122512
real_time_recency_timeout: Duration,
25132513
) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
25142514
{
2515-
let item_ids = source_ids.map(|gid| self.catalog.resolve_item_id(&gid));
2515+
let item_ids = source_ids
2516+
.map(|gid| {
2517+
self.catalog
2518+
.try_resolve_item_id(&gid)
2519+
.ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2520+
})
2521+
.collect::<Result<Vec<_>, _>>()?;
25162522

25172523
// Find all dependencies transitively because we need to ensure that
25182524
// RTR queries determine the timestamp from the sources' (i.e.
25192525
// storage objects that ingest data from external systems) remap
25202526
// data. We "cheat" a little bit and filter out any IDs that aren't
25212527
// user objects because we know they are not a RTR source.
2522-
let mut to_visit = VecDeque::from_iter(item_ids.filter(CatalogItemId::is_user));
2528+
let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
25232529
// If none of the sources are user objects, we don't need to provide
25242530
// a RTR timestamp.
25252531
if to_visit.is_empty() {

0 commit comments

Comments
 (0)