Skip to content

Commit c7e4fd0

Browse files
committed
adapter: Make determine_real_time_recent_timestamp handle it gracefully if a collection went away
1 parent 4cb4f85 commit c7e4fd0

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2512,14 +2512,20 @@ impl Coordinator {
25122512
real_time_recency_timeout: Duration,
25132513
) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
25142514
{
2515-
let item_ids = source_ids.map(|gid| self.catalog.resolve_item_id(&gid));
2515+
let item_ids = source_ids
2516+
.map(|gid| {
2517+
self.catalog
2518+
.try_resolve_item_id(&gid)
2519+
.ok_or_else(|| AdapterError::RtrDropFailure(gid.to_string()))
2520+
})
2521+
.collect::<Result<Vec<_>, _>>()?;
25162522

25172523
// Find all dependencies transitively because we need to ensure that
25182524
// RTR queries determine the timestamp from the sources' (i.e.
25192525
// storage objects that ingest data from external systems) remap
25202526
// data. We "cheat" a little bit and filter out any IDs that aren't
25212527
// user objects because we know they are not a RTR source.
2522-
let mut to_visit = VecDeque::from_iter(item_ids.filter(CatalogItemId::is_user));
2528+
let mut to_visit = VecDeque::from_iter(item_ids.into_iter().filter(CatalogItemId::is_user));
25232529
// If none of the sources are user objects, we don't need to provide
25242530
// a RTR timestamp.
25252531
if to_visit.is_empty() {

0 commit comments

Comments
 (0)