Skip to content

Commit 7188477

Browse files
authored
chore: reinfe log for transaction commit (#18435)
* fm unused code * refine log * refine error message * fix ut
1 parent 005469a commit 7188477

File tree

6 files changed

+43
-35
lines changed

6 files changed

+43
-35
lines changed

src/meta/api/src/schema_api_impl.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use databend_common_meta_app::app_error::DatabaseAlreadyExists;
3939
use databend_common_meta_app::app_error::DropDbWithDropTime;
4040
use databend_common_meta_app::app_error::DropTableWithDropTime;
4141
use databend_common_meta_app::app_error::DuplicatedIndexColumnId;
42+
use databend_common_meta_app::app_error::DuplicatedUpsertFiles;
4243
use databend_common_meta_app::app_error::IndexColumnIdNotFound;
4344
use databend_common_meta_app::app_error::MultiStmtTxnCommitFailed;
4445
use databend_common_meta_app::app_error::StreamAlreadyExists;
@@ -2205,6 +2206,12 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
22052206
// in each function. In this case there is chance that some `TableCopiedFileInfo` may not be
22062207
// removed in `remove_table_copied_files`, but these data can be purged in case of expire time.
22072208

2209+
let insert_if_not_exists_table_ids = copied_files
2210+
.iter()
2211+
.filter(|(_, req)| req.insert_if_not_exists)
2212+
.map(|(table_id, _)| *table_id)
2213+
.collect::<Vec<_>>();
2214+
22082215
for (table_id, req) in copied_files {
22092216
let tbid = TableId { table_id };
22102217

@@ -2300,10 +2307,20 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
23002307
}
23012308

23022309
if mismatched_tbs.is_empty() {
2303-
// if all table version does match, but tx failed, we don't know why, just return error
2304-
Err(KVAppError::AppError(AppError::from(
2305-
MultiStmtTxnCommitFailed::new("update_multi_table_meta"),
2306-
)))
2310+
if !insert_if_not_exists_table_ids.is_empty() {
2311+
// If insert_if_not_exists is true and transaction failed, it's likely due to duplicated files
2312+
Err(KVAppError::AppError(AppError::from(
2313+
DuplicatedUpsertFiles::new(
2314+
insert_if_not_exists_table_ids,
2315+
"update_multi_table_meta",
2316+
),
2317+
)))
2318+
} else {
2319+
// if all table version does match, but tx failed, we don't know why, just return error
2320+
Err(KVAppError::AppError(AppError::from(
2321+
MultiStmtTxnCommitFailed::new("update_multi_table_meta"),
2322+
)))
2323+
}
23072324
} else {
23082325
// up layer will retry
23092326
Ok(Err(mismatched_tbs))

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2792,7 +2792,7 @@ impl SchemaApiTestSuite {
27922792
.await;
27932793
let err = result.unwrap_err();
27942794
let err = ErrorCode::from(err);
2795-
assert_eq!(ErrorCode::UNRESOLVABLE_CONFLICT, err.code());
2795+
assert_eq!(ErrorCode::DUPLICATED_UPSERT_FILES, err.code());
27962796
}
27972797

27982798
info!("--- update table meta: virtual column too many");
@@ -7762,7 +7762,7 @@ impl SchemaApiTestSuite {
77627762
let result = mt.update_multi_table_meta(req).await;
77637763
let err = result.unwrap_err();
77647764
let err = ErrorCode::from(err);
7765-
assert_eq!(ErrorCode::UNRESOLVABLE_CONFLICT, err.code());
7765+
assert_eq!(ErrorCode::DUPLICATED_UPSERT_FILES, err.code());
77667766

77677767
let req = GetTableCopiedFileReq {
77687768
table_id,

src/meta/app/src/app_error.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -361,14 +361,14 @@ impl crate::app_error::UpdateStreamMetasFailed {
361361
}
362362

363363
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
364-
#[error("DuplicatedUpsertFiles: {table_id} , in operation `{context}`")]
364+
#[error("DuplicatedUpsertFiles: {table_id:?} , in operation `{context}`")]
365365
pub struct DuplicatedUpsertFiles {
366-
table_id: u64,
366+
table_id: Vec<u64>,
367367
context: String,
368368
}
369369

370370
impl DuplicatedUpsertFiles {
371-
pub fn new(table_id: u64, context: impl Into<String>) -> Self {
371+
pub fn new(table_id: Vec<u64>, context: impl Into<String>) -> Self {
372372
DuplicatedUpsertFiles {
373373
table_id,
374374
context: context.into(),

src/query/catalog/src/database.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ use databend_common_meta_app::schema::TableInfo;
3434
use databend_common_meta_app::schema::TruncateTableReply;
3535
use databend_common_meta_app::schema::TruncateTableReq;
3636
use databend_common_meta_app::schema::UndropTableReq;
37-
use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
38-
use databend_common_meta_app::schema::UpdateMultiTableMetaResult;
3937
use databend_common_meta_app::schema::UpsertTableOptionReply;
4038
use databend_common_meta_app::schema::UpsertTableOptionReq;
4139
use databend_common_meta_app::tenant::Tenant;
@@ -218,15 +216,4 @@ pub trait Database: DynClone + Sync + Send {
218216
self.name()
219217
)))
220218
}
221-
222-
#[async_backtrace::framed]
223-
async fn retryable_update_multi_table_meta(
224-
&self,
225-
_req: UpdateMultiTableMetaReq,
226-
) -> Result<UpdateMultiTableMetaResult> {
227-
Err(ErrorCode::Unimplemented(format!(
228-
"UnImplement retryable_update_multi_table_meta in {} Database",
229-
self.name()
230-
)))
231-
}
232219
}

src/query/service/src/catalogs/default/mutable_catalog.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -697,9 +697,24 @@ impl Catalog for MutableCatalog {
697697
}
698698
}
699699

700+
let table_updates: Vec<String> = req
701+
.update_table_metas
702+
.iter()
703+
.map(|(update_req, _)| {
704+
format!("table_id={}, seq={}", update_req.table_id, update_req.seq)
705+
})
706+
.collect();
707+
708+
let stream_updates: Vec<String> = req
709+
.update_stream_metas
710+
.iter()
711+
.map(|stream_req| format!("stream_id={}, seq={}", stream_req.stream_id, stream_req.seq))
712+
.collect();
713+
700714
info!(
701-
"[CATALOG] Updating multiple table metadata: table_count={}",
702-
req.update_table_metas.len()
715+
"[CATALOG] Updating multiple table metadata: table_updates=[{}], stream_updates=[{}]",
716+
table_updates.join("; "),
717+
stream_updates.join("; ")
703718
);
704719
let begin = Instant::now();
705720
let res = self.ctx.meta.update_multi_table_meta(req).await;

src/query/service/src/databases/default/default_database.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ use databend_common_meta_app::schema::TableInfo;
4242
use databend_common_meta_app::schema::TruncateTableReply;
4343
use databend_common_meta_app::schema::TruncateTableReq;
4444
use databend_common_meta_app::schema::UndropTableReq;
45-
use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
46-
use databend_common_meta_app::schema::UpdateMultiTableMetaResult;
4745
use databend_common_meta_app::schema::UpsertTableOptionReply;
4846
use databend_common_meta_app::schema::UpsertTableOptionReq;
4947

@@ -291,13 +289,4 @@ impl Database for DefaultDatabase {
291289
let res = self.ctx.meta.truncate_table(req).await?;
292290
Ok(res)
293291
}
294-
295-
#[async_backtrace::framed]
296-
async fn retryable_update_multi_table_meta(
297-
&self,
298-
req: UpdateMultiTableMetaReq,
299-
) -> Result<UpdateMultiTableMetaResult> {
300-
let res = self.ctx.meta.update_multi_table_meta(req).await?;
301-
Ok(res)
302-
}
303292
}

0 commit comments

Comments
 (0)