Skip to content

Commit 22f9ba2

Browse files
authored
feat: databend-meta transaction support generic bool-expression and else-if chain (#17064)
Since this commit, application is allowed to specify a complex bool expressions as the transaction predicate. For example, the transaction will execute as if running the following pseudo codes: ``` if (a == 1 || b == 2) && (x == 3 || y == 4) { ops1 } else if (x == 2 || y == 1) { ops2 } else if (y == 3 && z == 4) { ops3 } else { ops4 } ``` ```rust let eq = |key: &str, val: &str| TxnCondition::eq_value(sample(key), b(val)); TxnRequest{ operations: vec![ BoolExpression::new( Some(eq("a", 1).or(eq("b", 2)) .and(eq("x", 3).or(eq("y", 4)))), ops1), BoolExpression::new( Some(eq("x", 2).or(eq("y", 1))), ops2), ], condition: vec![eq("y", 3), eq("z", 4)], if_then: ops3, else_then: ops4, } ``` For backward compatibility, both already existing `condition` and the new `operations` will be evaluated: transaction handler evaluate the `operations` first. If there is a met condition, execute and return. Otherwise, it evaluate `condition` and then execute `if_then` branch or `else_then` branch. TxnReply changes: Add field `execution_path` to indicate the executed branch, which is one of: - `"operation:<index>"`, operation at `index` is executed. - `"then"`: `if_then` is executed. - `"else"`: `else_then` is executed. `TxnReply.success` is set to `false` only when `else` is executed.
1 parent bed61d2 commit 22f9ba2

File tree

21 files changed

+955
-292
lines changed

21 files changed

+955
-292
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/api/src/schema_api_impl.rs

Lines changed: 49 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -439,18 +439,17 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
439439
}
440440
db_meta.drop_on = None;
441441

442-
let txn_req = TxnRequest {
443-
condition: vec![
442+
let txn_req = TxnRequest::new(
443+
vec![
444444
txn_cond_seq(name_key, Eq, 0),
445445
txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq),
446446
txn_cond_seq(&dbid, Eq, db_meta_seq),
447447
],
448-
if_then: vec![
448+
vec![
449449
txn_op_put(name_key, serialize_u64(db_id)?), // (tenant, db_name) -> db_id
450450
txn_op_put(&dbid, serialize_struct(&db_meta)?), // (db_id) -> db_meta
451451
],
452-
else_then: vec![],
453-
};
452+
);
454453

455454
let (succ, _responses) = send_txn(self, txn_req).await?;
456455

@@ -594,11 +593,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
594593
), /* __fd_database_id_to_name/<db_id> -> (tenant,db_name) */
595594
];
596595

597-
let txn_req = TxnRequest {
598-
condition,
599-
if_then,
600-
else_then: vec![],
601-
};
596+
let txn_req = TxnRequest::new(condition, if_then);
602597

603598
let (succ, _responses) = send_txn(self, txn_req).await?;
604599

@@ -1203,19 +1198,19 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
12031198
txn_cond_seq(&save_key_table_id_list, Eq, tb_id_list_seq),
12041199
]);
12051200

1206-
txn.if_then.extend( vec![
1207-
// Changing a table in a db has to update the seq of db_meta,
1208-
// to block the batch-delete-tables when deleting a db.
1209-
txn_op_put(&key_dbid, serialize_struct(&db_meta.data)?), /* (db_id) -> db_meta */
1210-
txn_op_put(
1211-
key_table_id,
1212-
serialize_struct(&req.table_meta)?,
1213-
), /* (tenant, db_id, tb_id) -> tb_meta */
1214-
txn_op_put(&save_key_table_id_list, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */
1215-
// This record does not need to assert `table_id_to_name_key == 0`,
1216-
// Because this is a reverse index for db_id/table_name -> table_id, and it is unique.
1217-
txn_op_put(&key_table_id_to_name, serialize_struct(&key_dbid_tbname)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
1218-
]);
1201+
txn.if_then.extend(vec![
1202+
// Changing a table in a db has to update the seq of db_meta,
1203+
// to block the batch-delete-tables when deleting a db.
1204+
txn_op_put(&key_dbid, serialize_struct(&db_meta.data)?), /* (db_id) -> db_meta */
1205+
txn_op_put(
1206+
key_table_id,
1207+
serialize_struct(&req.table_meta)?,
1208+
), /* (tenant, db_id, tb_id) -> tb_meta */
1209+
txn_op_put(&save_key_table_id_list, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */
1210+
// This record does not need to assert `table_id_to_name_key == 0`,
1211+
// Because this is a reverse index for db_id/table_name -> table_id, and it is unique.
1212+
txn_op_put(&key_table_id_to_name, serialize_struct(&key_dbid_tbname)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
1213+
]);
12191214

12201215
if req.as_dropped {
12211216
// To create the table in a "dropped" state,
@@ -1401,8 +1396,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
14011396
tb_id_list.pop();
14021397
new_tb_id_list.append(table_id);
14031398

1404-
let mut txn = TxnRequest {
1405-
condition: vec![
1399+
let mut txn = TxnRequest::new(
1400+
vec![
14061401
// db has not to change, i.e., no new table is created.
14071402
// Renaming db is OK and does not affect the seq of db_meta.
14081403
txn_cond_seq(&seq_db_id.data, Eq, db_meta.seq),
@@ -1416,7 +1411,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
14161411
txn_cond_seq(&new_dbid_tbname_idlist, Eq, new_tb_id_list_seq),
14171412
txn_cond_seq(&table_id_to_name_key, Eq, table_id_to_name_seq),
14181413
],
1419-
if_then: vec![
1414+
vec![
14201415
txn_op_del(&dbid_tbname), // (db_id, tb_name) -> tb_id
14211416
txn_op_put(&newdbid_newtbname, serialize_u64(table_id)?), /* (db_id, new_tb_name) -> tb_id */
14221417
// Changing a table in a db has to update the seq of db_meta,
@@ -1426,8 +1421,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
14261421
txn_op_put(&new_dbid_tbname_idlist, serialize_struct(&new_tb_id_list)?), /* _fd_table_id_list/db_id/new_table_name -> tb_id_list */
14271422
txn_op_put(&table_id_to_name_key, serialize_struct(&db_id_table_name)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
14281423
],
1429-
else_then: vec![],
1430-
};
1424+
);
14311425

14321426
if *seq_db_id.data != *new_seq_db_id.data {
14331427
txn.if_then.push(
@@ -1909,8 +1903,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
19091903
}
19101904
tb_meta.drop_on = None;
19111905

1912-
let txn_req = TxnRequest {
1913-
condition: vec![
1906+
let txn_req = TxnRequest::new(
1907+
vec![
19141908
// db has not to change, i.e., no new table is created.
19151909
// Renaming db is OK and does not affect the seq of db_meta.
19161910
txn_cond_seq(&DatabaseId { db_id }, Eq, db_meta_seq),
@@ -1921,7 +1915,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
19211915
txn_cond_seq(&orphan_dbid_tbname_idlist, Eq, orphan_tb_id_list.seq),
19221916
txn_cond_seq(&dbid_tbname_idlist, Eq, tb_id_list.seq),
19231917
],
1924-
if_then: vec![
1918+
vec![
19251919
// Changing a table in a db has to update the seq of db_meta,
19261920
// to block the batch-delete-tables when deleting a db.
19271921
txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */
@@ -1931,8 +1925,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
19311925
txn_op_del(&orphan_dbid_tbname_idlist), // del orphan table idlist
19321926
txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list.data)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */
19331927
],
1934-
else_then: vec![],
1935-
};
1928+
);
19361929

19371930
let (succ, _responses) = send_txn(self, txn_req).await?;
19381931

@@ -2061,16 +2054,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
20612054
// non-changed ones.
20622055

20632056
for chunk in copied_files.chunks(chunk_size as usize) {
2064-
let txn = TxnRequest {
2065-
condition: vec![],
2066-
if_then: chunk
2057+
let txn = TxnRequest::new(
2058+
vec![],
2059+
chunk
20672060
.iter()
20682061
.map(|(name, seq_file)| {
20692062
TxnOp::delete_exact(name.to_string_key(), Some(seq_file.seq()))
20702063
})
20712064
.collect(),
2072-
else_then: vec![],
2073-
};
2065+
);
20742066

20752067
let (_succ, _responses) = send_txn(self, txn).await?;
20762068
}
@@ -2377,16 +2369,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
23772369
}
23782370
}
23792371

2380-
let mut txn_req = TxnRequest {
2381-
condition: vec![
2372+
let mut txn_req = TxnRequest::new(
2373+
vec![
23822374
// table is not changed
23832375
txn_cond_seq(&tbid, Eq, seq_meta.seq),
23842376
],
2385-
if_then: vec![
2377+
vec![
23862378
txn_op_put(&tbid, serialize_struct(&new_table_meta)?), // tb_id -> tb_meta
23872379
],
2388-
else_then: vec![],
2389-
};
2380+
);
23902381

23912382
let _ = update_mask_policy(self, &req.action, &mut txn_req, &req.tenant, req.table_id)
23922383
.await;
@@ -2485,13 +2476,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
24852476
};
24862477
indexes.insert(req.name.clone(), index);
24872478

2488-
let txn_req = TxnRequest {
2489-
condition: vec![txn_cond_eq_seq(&tbid, tb_meta_seq)],
2490-
if_then: vec![
2479+
let txn_req = TxnRequest::new(
2480+
//
2481+
vec![txn_cond_eq_seq(&tbid, tb_meta_seq)],
2482+
vec![
24912483
txn_op_put_pb(&tbid, &table_meta, None)?, // tb_id -> tb_meta
24922484
],
2493-
else_then: vec![],
2494-
};
2485+
);
24952486

24962487
let (succ, _responses) = send_txn(self, txn_req).await?;
24972488

@@ -2540,16 +2531,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
25402531
}
25412532
indexes.remove(&req.name);
25422533

2543-
let txn_req = TxnRequest {
2544-
condition: vec![
2534+
let txn_req = TxnRequest::new(
2535+
vec![
25452536
// table is not changed
25462537
txn_cond_seq(&tbid, Eq, seq_meta.seq),
25472538
],
2548-
if_then: vec![
2539+
vec![
25492540
txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta
25502541
],
2551-
else_then: vec![],
2552-
};
2542+
);
25532543

25542544
let (succ, _responses) = send_txn(self, txn_req).await?;
25552545
debug!(id :? =(&tbid),succ = succ;"drop_table_index");
@@ -2742,11 +2732,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
27422732
txn_op_put(&id_generator, b"".to_vec()),
27432733
txn_op_put_pb(&key, &lock_meta, Some(req.ttl))?,
27442734
];
2745-
let txn_req = TxnRequest {
2746-
condition,
2747-
if_then,
2748-
else_then: vec![],
2749-
};
2735+
let txn_req = TxnRequest::new(condition, if_then);
27502736
let (succ, _responses) = send_txn(self, txn_req).await?;
27512737

27522738
if succ {
@@ -3053,11 +3039,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
30533039
txn_op_put_pb(&new_name_ident, &dict_id.data, None)?, // put new dict name
30543040
];
30553041

3056-
let txn_req = TxnRequest {
3057-
condition,
3058-
if_then,
3059-
else_then: vec![],
3060-
};
3042+
let txn_req = TxnRequest::new(condition, if_then);
30613043

30623044
let (succ, _responses) = send_txn(self, txn_req).await?;
30633045

@@ -4042,8 +4024,8 @@ async fn handle_undrop_table(
40424024
// reset drop on time
40434025
seq_table_meta.drop_on = None;
40444026

4045-
let txn = TxnRequest {
4046-
condition: vec![
4027+
let txn = TxnRequest::new(
4028+
vec![
40474029
// db has not to change, i.e., no new table is created.
40484030
// Renaming db is OK and does not affect the seq of db_meta.
40494031
txn_cond_eq_seq(&DatabaseId { db_id }, seq_db_meta.seq),
@@ -4052,15 +4034,14 @@ async fn handle_undrop_table(
40524034
// table is not changed
40534035
txn_cond_eq_seq(&tbid, seq_table_meta.seq),
40544036
],
4055-
if_then: vec![
4037+
vec![
40564038
// Changing a table in a db has to update the seq of db_meta,
40574039
// to block the batch-delete-tables when deleting a db.
40584040
txn_op_put_pb(&DatabaseId { db_id }, &seq_db_meta.data, None)?, /* (db_id) -> db_meta */
40594041
txn_op_put(&dbid_tbname, serialize_u64(table_id)?), /* (tenant, db_id, tb_name) -> tb_id */
40604042
txn_op_put_pb(&tbid, &seq_table_meta.data, None)?, /* (tenant, db_id, tb_id) -> tb_meta */
40614043
],
4062-
else_then: vec![],
4063-
};
4044+
);
40644045

40654046
let (succ, _responses) = send_txn(kv_api, txn).await?;
40664047

src/meta/api/src/sequence_api_impl.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SequenceApi for KV {
142142
txn_op_put_pb(&ident, &sequence_meta, None)?, // name -> meta
143143
];
144144

145-
let txn_req = TxnRequest {
146-
condition,
147-
if_then,
148-
else_then: vec![],
149-
};
145+
let txn_req = TxnRequest::new(condition, if_then);
150146

151147
let (succ, _responses) = send_txn(self, txn_req).await?;
152148

src/meta/binaries/metabench/main.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -298,11 +298,7 @@ async fn benchmark_table_copy_file(
298298
serde_json::from_str(param).unwrap()
299299
};
300300

301-
let mut txn = TxnRequest {
302-
condition: vec![],
303-
if_then: vec![],
304-
else_then: vec![],
305-
};
301+
let mut txn = TxnRequest::default();
306302

307303
for file_index in 0..param.file_cnt {
308304
let copied_file_ident = TableCopiedFileNameIdent {

src/meta/client/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
117117
/// 🖥 server: add `txn_condition::Target::KeysWithPrefix`,
118118
/// to support matching the key count by a prefix.
119119
///
120+
/// - 2024-12-1*: since 1.2.*
121+
/// 🖥 server: add `TxnRequest::condition_tree`,
122+
/// to specify a complex bool expression.
123+
///
120124
///
121125
/// Server feature set:
122126
/// ```yaml

src/meta/kvapi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ test = true
1515
[dependencies]
1616
anyhow = { workspace = true }
1717
async-trait = { workspace = true }
18+
databend-common-base = { workspace = true }
1819
databend-common-meta-types = { workspace = true }
1920
fastrace = { workspace = true }
2021
futures-util = { workspace = true }

0 commit comments

Comments
 (0)