diff --git a/src/meta/src/barrier/context/context_impl.rs b/src/meta/src/barrier/context/context_impl.rs index 3312e3b3ed771..06a0584ef471d 100644 --- a/src/meta/src/barrier/context/context_impl.rs +++ b/src/meta/src/barrier/context/context_impl.rs @@ -39,6 +39,7 @@ use crate::barrier::{ CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan, Scheduled, }; +use crate::error::MetaErrorInner; use crate::hummock::CommitEpochInfo; use crate::manager::LocalNotification; use crate::model::FragmentDownstreamRelation; @@ -150,13 +151,12 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl { continue; } - // Find the database ID for this table - let database_id = self - .metadata_manager - .catalog_controller - .get_object_database_id(associated_source_id) - .await - .context("Failed to get database id for table")?; + let Some(database_id) = self + .get_source_database_id_for_refresh_stage(table_id, associated_source_id, "list") + .await? + else { + continue; + }; // Create ListFinish command let list_finish_command = Command::ListFinish { @@ -202,13 +202,12 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl { continue; } - // Find the database ID for this table - let database_id = self - .metadata_manager - .catalog_controller - .get_object_database_id(associated_source_id) - .await - .context("Failed to get database id for table")?; + let Some(database_id) = self + .get_source_database_id_for_refresh_stage(table_id, associated_source_id, "load") + .await? 
+ else { + continue; + }; // Create LoadFinish command let load_finish_command = Command::LoadFinish { @@ -246,11 +245,58 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl { } impl GlobalBarrierWorkerContextImpl { + async fn get_source_database_id_for_refresh_stage( + &self, + table_id: TableId, + associated_source_id: SourceId, + stage: &'static str, + ) -> MetaResult<Option<DatabaseId>> { + match self + .metadata_manager + .catalog_controller + .get_object_database_id(associated_source_id) + .await + { + Ok(database_id) => Ok(Some(database_id)), + Err(err) if should_skip_refresh_finish_for_missing_object(&err) => { + tracing::warn!( + %table_id, + %associated_source_id, + stage, + "skip refresh finish command because associated source is already dropped" + ); + Ok(None) + } + Err(err) => Err(err), + } + } + fn set_status(&self, new_status: BarrierManagerStatus) { self.status.store(Arc::new(new_status)); } } +fn should_skip_refresh_finish_for_missing_object(err: &MetaError) -> bool { + matches!(err.inner(), MetaErrorInner::CatalogIdNotFound("object", _)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_skip_refresh_finish_when_associated_source_missing() { + let err = MetaError::catalog_id_not_found("object", 42); + assert!(should_skip_refresh_finish_for_missing_object(&err)); + } + + #[test] + fn test_do_not_skip_refresh_finish_for_other_not_found_types() { + let err = MetaError::catalog_id_not_found("table", 42); + assert!(!should_skip_refresh_finish_for_missing_object(&err)); + } +} + impl PostCollectCommand { /// Do some stuffs after barriers are collected and the new storage version is committed, for /// the given command.