@@ -61,6 +61,9 @@ use databend_common_meta_app::app_error::VirtualColumnIdOutBound;
6161use databend_common_meta_app:: app_error:: VirtualColumnTooMany ;
6262use databend_common_meta_app:: data_mask:: MaskPolicyTableIdListIdent ;
6363use databend_common_meta_app:: id_generator:: IdGenerator ;
64+ use databend_common_meta_app:: row_access_policy:: row_access_policy_table_id_ident:: RowAccessPolicyIdTableId ;
65+ use databend_common_meta_app:: row_access_policy:: RowAccessPolicyTableId ;
66+ use databend_common_meta_app:: row_access_policy:: RowAccessPolicyTableIdIdent ;
6467use databend_common_meta_app:: schema:: catalog_id_ident:: CatalogId ;
6568use databend_common_meta_app:: schema:: catalog_name_ident:: CatalogNameIdentRaw ;
6669use databend_common_meta_app:: schema:: database_name_ident:: DatabaseNameIdent ;
@@ -147,6 +150,9 @@ use databend_common_meta_app::schema::RenameTableReq;
147150use databend_common_meta_app:: schema:: SetTableColumnMaskPolicyAction ;
148151use databend_common_meta_app:: schema:: SetTableColumnMaskPolicyReply ;
149152use databend_common_meta_app:: schema:: SetTableColumnMaskPolicyReq ;
153+ use databend_common_meta_app:: schema:: SetTableRowAccessPolicyAction ;
154+ use databend_common_meta_app:: schema:: SetTableRowAccessPolicyReply ;
155+ use databend_common_meta_app:: schema:: SetTableRowAccessPolicyReq ;
150156use databend_common_meta_app:: schema:: TableCopiedFileNameIdent ;
151157use databend_common_meta_app:: schema:: TableId ;
152158use databend_common_meta_app:: schema:: TableIdHistoryIdent ;
@@ -207,6 +213,7 @@ use ConditionResult::Eq;
207213use crate :: assert_table_exist;
208214use crate :: db_has_to_exist;
209215use crate :: deserialize_struct;
216+ use crate :: errors:: TableError ;
210217use crate :: fetch_id;
211218use crate :: get_u64_value;
212219use crate :: kv_app_error:: KVAppError ;
@@ -2469,6 +2476,95 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
24692476 }
24702477 }
24712478
2479+ #[ logcall:: logcall]
2480+ #[ fastrace:: trace]
2481+ async fn set_table_row_access_policy (
2482+ & self ,
2483+ req : SetTableRowAccessPolicyReq ,
2484+ ) -> Result < Result < SetTableRowAccessPolicyReply , TableError > , MetaTxnError > {
2485+ debug ! ( req : ? =( & req) ; "SchemaApi: {}" , func_name!( ) ) ;
2486+ let tbid = TableId {
2487+ table_id : req. table_id ,
2488+ } ;
2489+
2490+ let mut trials = txn_backoff ( None , func_name ! ( ) ) ;
2491+ loop {
2492+ trials. next ( ) . unwrap ( ) ?. await ;
2493+
2494+ let seq_meta = self . get_pb ( & tbid) . await ?;
2495+
2496+ debug ! ( ident : % =( & tbid) ; "set_table_row_access_policy" ) ;
2497+
2498+ let Some ( seq_meta) = seq_meta else {
2499+ return Ok ( Err ( TableError :: UnknownTableId {
2500+ tenant : req. tenant . tenant_name ( ) . to_string ( ) ,
2501+ table_id : req. table_id ,
2502+ context : "set_table_row_access_policy" . to_string ( ) ,
2503+ } ) ) ;
2504+ } ;
2505+
2506+ // upsert row access policy
2507+ let table_meta = seq_meta. data ;
2508+ let mut new_table_meta = table_meta. clone ( ) ;
2509+ let id = RowAccessPolicyIdTableId {
2510+ policy_id : req. policy_id ,
2511+ table_id : req. table_id ,
2512+ } ;
2513+ let ident = RowAccessPolicyTableIdIdent :: new_generic ( req. tenant . clone ( ) , id) ;
2514+
2515+ let mut txn_req = TxnRequest :: default ( ) ;
2516+
2517+ txn_req
2518+ . condition
2519+ . push ( txn_cond_seq ( & tbid, Eq , seq_meta. seq ) ) ;
2520+ match & req. action {
2521+ SetTableRowAccessPolicyAction :: Set ( new_mask_name) => {
2522+ if table_meta. row_access_policy . is_some ( ) {
2523+ return Ok ( Err ( TableError :: AlterTableError {
2524+ tenant : req. tenant . tenant_name ( ) . to_string ( ) ,
2525+ context : "Table already has a ROW_ACCESS_POLICY. Only one ROW_ACCESS_POLICY is allowed at a time." . to_string ( ) ,
2526+ } ) ) ;
2527+ }
2528+ new_table_meta. row_access_policy = Some ( new_mask_name. to_string ( ) ) ;
2529+ txn_req. if_then = vec ! [
2530+ txn_op_put( & tbid, serialize_struct( & new_table_meta) ?) , /* tb_id -> tb_meta row access policy Some */
2531+ txn_op_put( & ident, serialize_struct( & RowAccessPolicyTableId { } ) ?) , /* add policy_tb_id */
2532+ ] ;
2533+ }
2534+ SetTableRowAccessPolicyAction :: Unset ( old_policy) => {
2535+ // drop row access policy and table does not have row access policy
2536+ if let Some ( policy) = & table_meta. row_access_policy {
2537+ if policy != old_policy {
2538+ return Ok ( Err ( TableError :: AlterTableError {
2539+ tenant : req. tenant . tenant_name ( ) . to_string ( ) ,
2540+ context : format ! ( "Unknown row access policy {} on table" , policy) ,
2541+ } ) ) ;
2542+ }
2543+ } else {
2544+ return Ok ( Ok ( SetTableRowAccessPolicyReply { } ) ) ;
2545+ }
2546+ new_table_meta. row_access_policy = None ;
2547+ txn_req. if_then = vec ! [
2548+ txn_op_put( & tbid, serialize_struct( & new_table_meta) ?) , /* tb_id -> tb_meta row access policy None */
2549+ txn_op_del( & ident) , // table drop row access policy, del policy_tb_id
2550+ ] ;
2551+ }
2552+ }
2553+
2554+ let ( succ, _responses) = send_txn ( self , txn_req) . await ?;
2555+
2556+ debug ! (
2557+ id : ? =( & tbid) ,
2558+ succ = succ;
2559+ "set_table_row_access_policy"
2560+ ) ;
2561+
2562+ if succ {
2563+ return Ok ( Ok ( SetTableRowAccessPolicyReply { } ) ) ;
2564+ }
2565+ }
2566+ }
2567+
24722568 #[ logcall:: logcall]
24732569 #[ fastrace:: trace]
24742570 async fn create_table_index ( & self , req : CreateTableIndexReq ) -> Result < ( ) , KVAppError > {
0 commit comments