@@ -25,6 +25,7 @@ use common_meta_app::app_error::CreateTableWithDropTime;
2525use common_meta_app:: app_error:: DatabaseAlreadyExists ;
2626use common_meta_app:: app_error:: DropDbWithDropTime ;
2727use common_meta_app:: app_error:: DropTableWithDropTime ;
28+ use common_meta_app:: app_error:: DuplicatedUpsertFiles ;
2829use common_meta_app:: app_error:: ShareHasNoGrantedDatabase ;
2930use common_meta_app:: app_error:: ShareHasNoGrantedPrivilege ;
3031use common_meta_app:: app_error:: TableAlreadyExists ;
@@ -100,7 +101,10 @@ use common_meta_app::share::ShareId;
100101use common_meta_app:: share:: ShareNameIdent ;
101102use common_meta_kvapi:: kvapi;
102103use common_meta_kvapi:: kvapi:: Key ;
104+ use common_meta_types:: txn_op:: Request ;
105+ use common_meta_types:: txn_op_response:: Response ;
103106use common_meta_types:: ConditionResult ;
107+ use common_meta_types:: ConditionResult :: Ge ;
104108use common_meta_types:: GCDroppedDataReply ;
105109use common_meta_types:: GCDroppedDataReq ;
106110use common_meta_types:: InvalidReply ;
@@ -109,6 +113,7 @@ use common_meta_types::MetaError;
109113use common_meta_types:: MetaId ;
110114use common_meta_types:: MetaNetworkError ;
111115use common_meta_types:: TxnCondition ;
116+ use common_meta_types:: TxnGetRequest ;
112117use common_meta_types:: TxnOp ;
113118use common_meta_types:: TxnRequest ;
114119use common_tracing:: func_name;
@@ -1926,47 +1931,11 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
19261931 "upsert_table_copied_file_info"
19271932 ) ;
19281933
1929- let mut condition = vec ! [ txn_cond_seq( & tbid, Eq , tb_meta_seq) ] ;
1930- let mut if_then = vec ! [ ] ;
1931- // `remove_table_copied_files` and `upsert_table_copied_file_info`
1932- // all modify `TableCopiedFileInfo`,
1933- // so there used to has `TableCopiedFileLockKey` in these two functions
1934- // to protect TableCopiedFileInfo modification.
1935- // In issue: https://github.com/datafuselabs/databend/issues/8897,
1936- // there is chance that if copy files concurrently, `upsert_table_copied_file_info`
1937- // may return `TxnRetryMaxTimes`.
1938- // So now, in case that `TableCopiedFileInfo` has expire time, remove `TableCopiedFileLockKey`
1939- // in each function. In this case there is chance that some `TableCopiedFileInfo` may not be
1940- // removed in `remove_table_copied_files`, but these data can be purged in case of expire time.
1941-
1942- let mut file_name_infos = req. file_info . clone ( ) . into_iter ( ) ;
1943-
1944- for c in keys. chunks ( DEFAULT_MGET_SIZE ) {
1945- let seq_infos: Vec < ( u64 , Option < TableCopiedFileInfo > ) > =
1946- mget_pb_values ( self , c) . await ?;
1947-
1948- for ( file_seq, _file_info_opt) in seq_infos {
1949- let ( f_name, file_info) = file_name_infos. next ( ) . unwrap ( ) ;
1950-
1951- let key = TableCopiedFileNameIdent {
1952- table_id,
1953- file : f_name. to_owned ( ) ,
1954- } ;
1955- condition. push ( txn_cond_seq ( & key, Eq , file_seq) ) ;
1956- match & req. expire_at {
1957- Some ( expire_at) => {
1958- if_then. push ( txn_op_put_with_expire (
1959- & key,
1960- serialize_struct ( & file_info) ?,
1961- * expire_at,
1962- ) ) ;
1963- }
1964- None => {
1965- if_then. push ( txn_op_put ( & key, serialize_struct ( & file_info) ?) ) ;
1966- }
1967- }
1968- }
1969- }
1934+ let ( condition, if_then) = build_upsert_table_copied_file_info_conditions (
1935+ & req,
1936+ tb_meta_seq,
1937+ req. fail_if_duplicated ,
1938+ ) ?;
19701939
19711940 let txn_req = TxnRequest {
19721941 condition,
@@ -1984,6 +1953,11 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
19841953
19851954 if succ {
19861955 return Ok ( UpsertTableCopiedFileReply { } ) ;
1956+ } else if req. fail_if_duplicated {
1957+ // fail fast if txn failed, which caused by file duplication
1958+ return Err ( KVAppError :: AppError ( AppError :: DuplicatedUpsertFiles (
1959+ DuplicatedUpsertFiles :: new ( req. table_id , "upsert_table_copied_file_info" ) ,
1960+ ) ) ) ;
19871961 }
19881962 }
19891963
@@ -2130,12 +2104,17 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
21302104 req : UpdateTableMetaReq ,
21312105 ) -> Result < UpdateTableMetaReply , KVAppError > {
21322106 debug ! ( req = debug( & req) , "SchemaApi: {}" , func_name!( ) ) ;
2133-
21342107 let tbid = TableId {
21352108 table_id : req. table_id ,
21362109 } ;
21372110 let req_seq = req. seq ;
21382111
2112+ let fail_if_duplicated = req
2113+ . copied_files
2114+ . as_ref ( )
2115+ . map ( |v| v. fail_if_duplicated )
2116+ . unwrap_or ( false ) ;
2117+
21392118 loop {
21402119 let ( tb_meta_seq, table_meta) : ( _ , Option < TableMeta > ) =
21412120 get_pb_value ( self , & tbid) . await ?;
@@ -2158,23 +2137,83 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
21582137 ) ) ) ;
21592138 }
21602139
2161- let txn_req = TxnRequest {
2140+ let get_table_meta = TxnOp {
2141+ request : Some ( Request :: Get ( TxnGetRequest {
2142+ key : tbid. to_string_key ( ) ,
2143+ } ) ) ,
2144+ } ;
2145+
2146+ let mut txn_req = TxnRequest {
21622147 condition : vec ! [
21632148 // table is not changed
21642149 txn_cond_seq( & tbid, Eq , tb_meta_seq) ,
21652150 ] ,
21662151 if_then : vec ! [
21672152 txn_op_put( & tbid, serialize_struct( & req. new_table_meta) ?) , // tb_id -> tb_meta
21682153 ] ,
2169- else_then : vec ! [ ] ,
2154+ else_then : vec ! [ get_table_meta ] ,
21702155 } ;
21712156
2172- let ( succ, _responses) = send_txn ( self , txn_req) . await ?;
2157+ if let Some ( req) = & req. copied_files {
2158+ let ( conditions, match_operations) =
2159+ build_upsert_table_copied_file_info_conditions (
2160+ req,
2161+ tb_meta_seq,
2162+ req. fail_if_duplicated ,
2163+ ) ?;
2164+ txn_req. condition . extend ( conditions) ;
2165+ txn_req. if_then . extend ( match_operations)
2166+ }
2167+
2168+ let ( succ, responses) = send_txn ( self , txn_req) . await ?;
21732169
21742170 debug ! ( id = debug( & tbid) , succ = display( succ) , "update_table_meta" ) ;
21752171
21762172 if succ {
21772173 return Ok ( UpdateTableMetaReply { } ) ;
2174+ } else {
2175+ let resp = responses
2176+ . get ( 0 )
2177+ // fail fast if response is None (which should not happen)
2178+ . expect ( "internal error: expect one response if update_table_meta txn failed." ) ;
2179+
2180+ if let Some ( Response :: Get ( get_resp) ) = & resp. response {
2181+ // deserialize table version info
2182+ let ( tb_meta_seq, _) : ( _ , Option < TableMeta > ) =
2183+ if let Some ( seq_v) = & get_resp. value {
2184+ ( seq_v. seq , Some ( deserialize_struct ( & seq_v. data ) ?) )
2185+ } else {
2186+ ( 0 , None )
2187+ } ;
2188+
2189+ // check table version
2190+ if req_seq. match_seq ( tb_meta_seq) . is_ok ( ) {
2191+ // if table version does match, but tx failed,
2192+ if fail_if_duplicated {
2193+ // report file duplication error
2194+ return Err ( KVAppError :: AppError ( AppError :: from (
2195+ DuplicatedUpsertFiles :: new ( req. table_id , "update_table_meta" ) ,
2196+ ) ) ) ;
2197+ } else {
2198+ // continue and try update the "table copied files"
2199+ continue ;
2200+ } ;
2201+ } else {
2202+ return Err ( KVAppError :: AppError ( AppError :: from (
2203+ TableVersionMismatched :: new (
2204+ req. table_id ,
2205+ req. seq ,
2206+ tb_meta_seq,
2207+ "update_table_meta" ,
2208+ ) ,
2209+ ) ) ) ;
2210+ }
2211+ } else {
2212+ unreachable ! (
2213+ "internal error: expect some TxnGetResponseGet, but got {:?}" ,
2214+ resp. response
2215+ ) ;
2216+ }
21782217 }
21792218 }
21802219 }
@@ -2855,3 +2894,64 @@ async fn list_tables_from_share_db(
28552894 )
28562895 . await
28572896}
2897+
2898+ fn build_upsert_table_copied_file_info_conditions (
2899+ req : & UpsertTableCopiedFileReq ,
2900+ tb_meta_seq : u64 ,
2901+ fail_if_duplicated : bool ,
2902+ ) -> Result < ( Vec < TxnCondition > , Vec < TxnOp > ) , KVAppError > {
2903+ let table_id = req. table_id ;
2904+ let tbid = TableId { table_id } ;
2905+
2906+ let mut condition = vec ! [ txn_cond_seq( & tbid, Eq , tb_meta_seq) ] ;
2907+ let mut if_then = vec ! [ ] ;
2908+
2909+ // `remove_table_copied_files` and `upsert_table_copied_file_info`
2910+ // all modify `TableCopiedFileInfo`,
2911+ // so there used to has `TableCopiedFileLockKey` in these two functions
2912+ // to protect TableCopiedFileInfo modification.
2913+ // In issue: https://github.com/datafuselabs/databend/issues/8897,
2914+ // there is chance that if copy files concurrently, `upsert_table_copied_file_info`
2915+ // may return `TxnRetryMaxTimes`.
2916+ // So now, in case that `TableCopiedFileInfo` has expire time, remove `TableCopiedFileLockKey`
2917+ // in each function. In this case there is chance that some `TableCopiedFileInfo` may not be
2918+ // removed in `remove_table_copied_files`, but these data can be purged in case of expire time.
2919+
2920+ let file_name_infos = req. file_info . clone ( ) . into_iter ( ) ;
2921+
2922+ for ( file_name, file_info) in file_name_infos {
2923+ let key = TableCopiedFileNameIdent {
2924+ table_id,
2925+ file : file_name. to_owned ( ) ,
2926+ } ;
2927+ if fail_if_duplicated {
2928+ // "fail_if_duplicated" mode, assumes files are absent
2929+ condition. push ( txn_cond_seq ( & key, Eq , 0 ) ) ;
2930+ } else {
2931+ condition. push ( txn_cond_seq ( & key, Ge , 0 ) ) ;
2932+ }
2933+ set_update_expire_operation ( & key, & file_info, & req. expire_at , & mut if_then) ?;
2934+ }
2935+ Ok ( ( condition, if_then) )
2936+ }
2937+
2938+ fn set_update_expire_operation (
2939+ key : & TableCopiedFileNameIdent ,
2940+ file_info : & TableCopiedFileInfo ,
2941+ expire_at_opt : & Option < u64 > ,
2942+ then_branch : & mut Vec < TxnOp > ,
2943+ ) -> Result < ( ) , KVAppError > {
2944+ match expire_at_opt {
2945+ Some ( expire_at) => {
2946+ then_branch. push ( txn_op_put_with_expire (
2947+ key,
2948+ serialize_struct ( file_info) ?,
2949+ * expire_at,
2950+ ) ) ;
2951+ }
2952+ None => {
2953+ then_branch. push ( txn_op_put ( key, serialize_struct ( file_info) ?) ) ;
2954+ }
2955+ }
2956+ Ok ( ( ) )
2957+ }
0 commit comments