Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 60 additions & 14 deletions src/meta/src/barrier/context/context_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::barrier::{
CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
Scheduled,
};
use crate::error::MetaErrorInner;
use crate::hummock::CommitEpochInfo;
use crate::manager::LocalNotification;
use crate::model::FragmentDownstreamRelation;
Expand Down Expand Up @@ -150,13 +151,12 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
continue;
}

// Find the database ID for this table
let database_id = self
.metadata_manager
.catalog_controller
.get_object_database_id(associated_source_id)
.await
.context("Failed to get database id for table")?;
let Some(database_id) = self
.get_source_database_id_for_refresh_stage(table_id, associated_source_id, "list")
.await?
else {
continue;
};

// Create ListFinish command
let list_finish_command = Command::ListFinish {
Expand Down Expand Up @@ -202,13 +202,12 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
continue;
}

// Find the database ID for this table
let database_id = self
.metadata_manager
.catalog_controller
.get_object_database_id(associated_source_id)
.await
.context("Failed to get database id for table")?;
let Some(database_id) = self
.get_source_database_id_for_refresh_stage(table_id, associated_source_id, "load")
.await?
else {
continue;
};

// Create LoadFinish command
let load_finish_command = Command::LoadFinish {
Expand Down Expand Up @@ -246,11 +245,58 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
}

impl GlobalBarrierWorkerContextImpl {
async fn get_source_database_id_for_refresh_stage(
&self,
table_id: TableId,
associated_source_id: SourceId,
stage: &'static str,
) -> MetaResult<Option<DatabaseId>> {
match self
.metadata_manager
.catalog_controller
.get_object_database_id(associated_source_id)
.await
{
Ok(database_id) => Ok(Some(database_id)),
Err(err) if should_skip_refresh_finish_for_missing_object(&err) => {
tracing::warn!(
%table_id,
%associated_source_id,
stage,
"skip refresh finish command because associated source is already dropped"
);
Ok(None)
}
Err(err) => Err(err),
}
}

fn set_status(&self, new_status: BarrierManagerStatus) {
self.status.store(Arc::new(new_status));
}
}

fn should_skip_refresh_finish_for_missing_object(err: &MetaError) -> bool {
matches!(err.inner(), MetaErrorInner::CatalogIdNotFound("object", _))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_skip_refresh_finish_when_associated_source_missing() {
let err = MetaError::catalog_id_not_found("object", 42);
assert!(should_skip_refresh_finish_for_missing_object(&err));
}

#[test]
fn test_do_not_skip_refresh_finish_for_other_not_found_types() {
let err = MetaError::catalog_id_not_found("table", 42);
assert!(!should_skip_refresh_finish_for_missing_object(&err));
}
}

impl PostCollectCommand {
/// Do some stuffs after barriers are collected and the new storage version is committed, for
/// the given command.
Expand Down