Skip to content

Commit d8415ee

Browse files
authored
chore: refine error message in transaction retry and add more log (#18528)
* refine error message * add log * add log
1 parent 73bfb16 commit d8415ee

File tree

6 files changed

+21
-8
lines changed

6 files changed

+21
-8
lines changed

src/query/ee/src/stream/handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ impl StreamHandler for RealStreamHandler {
119119
.get(OPT_KEY_DATABASE_ID)
120120
.ok_or_else(|| {
121121
ErrorCode::Internal(format!(
122-
"Invalid fuse table, table option {} not found",
122+
"Invalid fuse table, table option {} not found when creating stream",
123123
OPT_KEY_DATABASE_ID
124124
))
125125
})?;

src/query/service/src/catalogs/default/mutable_catalog.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -713,15 +713,17 @@ impl Catalog for MutableCatalog {
713713
.collect();
714714

715715
info!(
716-
"[CATALOG] Updating multiple table metadata: table_updates=[{}], stream_updates=[{}]",
716+
"[CATALOG] Updating multiple table metadata: table_updates=[{}], stream_updates=[{}], req={:?}",
717717
table_updates.join("; "),
718-
stream_updates.join("; ")
718+
stream_updates.join("; "),
719+
req
719720
);
720721
let begin = Instant::now();
721722
let res = self.ctx.meta.update_multi_table_meta(req).await;
722723
info!(
723-
"[CATALOG] Multiple table metadata update completed: elapsed_time={:?}",
724-
begin.elapsed()
724+
"[CATALOG] Multiple table metadata update completed: elapsed_time={:?}, result={:?}",
725+
begin.elapsed(),
726+
res
725727
);
726728
Ok(res?)
727729
}

src/query/service/src/interpreters/common/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub async fn dml_build_update_stream_req(
7070
.get(OPT_KEY_DATABASE_ID)
7171
.ok_or_else(|| {
7272
ErrorCode::Internal(format!(
73-
"Invalid fuse table, table option {} not found",
73+
"Invalid fuse table, table option {} not found when building update stream req",
7474
OPT_KEY_DATABASE_ID
7575
))
7676
})?;

src/query/storages/common/table_meta/src/meta/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ pub fn parse_storage_prefix(options: &BTreeMap<String, String>, table_id: u64) -
160160

161161
let db_id = options.get(OPT_KEY_DATABASE_ID).ok_or_else(|| {
162162
ErrorCode::Internal(format!(
163-
"Invalid fuse table, table option {} not found",
163+
"Invalid fuse table, table option {} not found when parsing storage prefix",
164164
OPT_KEY_DATABASE_ID
165165
))
166166
})?;

src/query/storages/fuse/src/retry/commit.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use databend_storages_common_cache::Table;
2828
use databend_storages_common_cache::TableSnapshot;
2929
use databend_storages_common_table_meta::meta::Versioned;
3030
use databend_storages_common_table_meta::readers::snapshot_reader::TableSnapshotAccessor;
31+
use log::info;
3132

3233
use super::diff::SegmentsDiff;
3334
use crate::operations::set_backoff;
@@ -68,8 +69,18 @@ async fn try_rebuild_req(
6869
req: &mut UpdateMultiTableMetaReq,
6970
update_failed_tbls: Vec<(u64, u64, TableMeta)>,
7071
) -> Result<()> {
72+
info!(
73+
"try_rebuild_req: update_failed_tbls={:?}",
74+
update_failed_tbls
75+
);
7176
let txn_mgr = ctx.txn_mgr();
7277
for (tid, seq, table_meta) in update_failed_tbls {
78+
if table_meta.engine == "STREAM" {
79+
return Err(ErrorCode::UnresolvableConflict(format!(
80+
"Concurrent transaction commit failed. Stream table {} has unresolvable conflicts.",
81+
tid
82+
)));
83+
}
7384
let latest_table = FuseTable::from_table_meta(tid, seq, table_meta)?;
7485
let default_cluster_key_id = latest_table.cluster_key_id();
7586
let latest_snapshot = latest_table.read_table_snapshot().await?;

src/query/storages/stream/src/stream_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ impl StreamTable {
290290
.get(OPT_KEY_DATABASE_ID)
291291
.ok_or_else(|| {
292292
ErrorCode::Internal(format!(
293-
"Invalid fuse table, table option {} not found",
293+
"Invalid fuse table, table option {} not found when getting source database id",
294294
OPT_KEY_DATABASE_ID
295295
))
296296
})?

0 commit comments

Comments
 (0)