@@ -18,6 +18,7 @@ use std::collections::HashSet;
1818use common_meta_app:: app_error:: AppError ;
1919use common_meta_app:: app_error:: ShareAccountsAlreadyExists ;
2020use common_meta_app:: app_error:: ShareAlreadyExists ;
21+ use common_meta_app:: app_error:: ShareEndpointAlreadyExists ;
2122use common_meta_app:: app_error:: TxnRetryMaxTimes ;
2223use common_meta_app:: app_error:: UnknownShare ;
2324use common_meta_app:: app_error:: UnknownShareAccounts ;
@@ -64,6 +65,8 @@ use crate::table_has_to_exist;
6465use crate :: txn_cond_seq;
6566use crate :: txn_op_del;
6667use crate :: txn_op_put;
68+ use crate :: util:: get_share_endpoint_id_to_name_or_err;
69+ use crate :: util:: get_share_endpoint_or_err;
6770use crate :: util:: list_tables_from_unshare_db;
6871use crate :: ShareApi ;
6972use crate :: TXN_MAX_RETRY_TIMES ;
@@ -150,7 +153,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> ShareApi for KV {
150153 name = debug( & name_key) ,
151154 id = debug( & id_key) ,
152155 succ = display( succ) ,
153- "create_database "
156+ "create_share "
154157 ) ;
155158
156159 if succ {
@@ -956,6 +959,201 @@ impl<KV: kvapi::KVApi<Error = MetaError>> ShareApi for KV {
956959 }
957960 Ok ( GetObjectGrantPrivilegesReply { privileges } )
958961 }
962+
963+ async fn create_share_endpoint (
964+ & self ,
965+ req : CreateShareEndpointReq ,
966+ ) -> Result < CreateShareEndpointReply , KVAppError > {
967+ debug ! ( req = debug( & req) , "ShareApi: {}" , func_name!( ) ) ;
968+
969+ let name_key = & req. endpoint ;
970+ let mut retry = 0 ;
971+ while retry < TXN_MAX_RETRY_TIMES {
972+ retry += 1 ;
973+
974+ // Get share endpoint by name to ensure absence
975+ let ( share_endpoint_id_seq, share_endpoint_id) = get_u64_value ( self , name_key) . await ?;
976+ debug ! (
977+ share_endpoint_id_seq,
978+ share_endpoint_id,
979+ ?name_key,
980+ "get_share_endpoint"
981+ ) ;
982+
983+ if share_endpoint_id_seq > 0 {
984+ return if req. if_not_exists {
985+ Ok ( CreateShareEndpointReply { share_endpoint_id } )
986+ } else {
987+ Err ( KVAppError :: AppError ( AppError :: ShareEndpointAlreadyExists (
988+ ShareEndpointAlreadyExists :: new (
989+ & name_key. endpoint ,
990+ format ! ( "create share endpoint: tenant: {}" , name_key. tenant) ,
991+ ) ,
992+ ) ) )
993+ } ;
994+ }
995+
996+ // Create share endpoint by inserting these record:
997+ // (tenant, endpoint) -> share_endpoint_id
998+ // (share_endpoint_id) -> share_endpoint_meta
999+ // (share) -> (tenant,share_name)
1000+
1001+ let share_endpoint_id = fetch_id ( self , IdGenerator :: share_endpoint_id ( ) ) . await ?;
1002+ let id_key = ShareEndpointId { share_endpoint_id } ;
1003+ let id_to_name_key = ShareEndpointIdToName { share_endpoint_id } ;
1004+
1005+ debug ! (
1006+ share_endpoint_id,
1007+ name_key = debug( & name_key) ,
1008+ "new share endpoint id"
1009+ ) ;
1010+
1011+ // Create share endpoint by transaction.
1012+ {
1013+ let share_endpoint_meta = ShareEndpointMeta :: new ( & req) ;
1014+ let txn_req = TxnRequest {
1015+ condition : vec ! [
1016+ txn_cond_seq( name_key, Eq , 0 ) ,
1017+ txn_cond_seq( & id_to_name_key, Eq , 0 ) ,
1018+ ] ,
1019+ if_then : vec ! [
1020+ txn_op_put( name_key, serialize_u64( share_endpoint_id) ?) , /* (tenant, share_endpoint_name) -> share_endpoint_id */
1021+ txn_op_put( & id_key, serialize_struct( & share_endpoint_meta) ?) , /* (share_endpoint_id) -> share_endpoint_meta */
1022+ txn_op_put( & id_to_name_key, serialize_struct( name_key) ?) , /* __fd_share_endpoint_id_to_name/<share_endpoint_id> -> (tenant,share_endpoint_name) */
1023+ ] ,
1024+ else_then : vec ! [ ] ,
1025+ } ;
1026+
1027+ let ( succ, _responses) = send_txn ( self , txn_req) . await ?;
1028+
1029+ debug ! (
1030+ name = debug( & name_key) ,
1031+ id = debug( & id_key) ,
1032+ succ = display( succ) ,
1033+ "create_share_endpoint"
1034+ ) ;
1035+
1036+ if succ {
1037+ return Ok ( CreateShareEndpointReply { share_endpoint_id } ) ;
1038+ }
1039+ }
1040+ }
1041+
1042+ Err ( KVAppError :: AppError ( AppError :: TxnRetryMaxTimes (
1043+ TxnRetryMaxTimes :: new ( "create_share_endpoint" , TXN_MAX_RETRY_TIMES ) ,
1044+ ) ) )
1045+ }
1046+
1047+ async fn get_share_endpoint (
1048+ & self ,
1049+ req : GetShareEndpointReq ,
1050+ ) -> Result < GetShareEndpointReply , KVAppError > {
1051+ let mut share_endpoint_meta_vec = vec ! [ ] ;
1052+
1053+ let tenant_share_endpoint_name_key = ShareEndpointIdent {
1054+ tenant : req. tenant . clone ( ) ,
1055+ endpoint : match req. endpoint {
1056+ Some ( ref endpoint) => endpoint. clone ( ) ,
1057+ None => "" . to_string ( ) ,
1058+ } ,
1059+ } ;
1060+ let share_endpoints = list_keys ( self , & tenant_share_endpoint_name_key) . await ?;
1061+
1062+ for share_endpoint in share_endpoints {
1063+ let ( _seq, share_endpoint_id) = get_u64_value ( self , & share_endpoint) . await ?;
1064+ let id_key = ShareEndpointId { share_endpoint_id } ;
1065+ let ( _seq, share_endpoint_meta) : ( u64 , Option < ShareEndpointMeta > ) =
1066+ get_pb_value ( self , & id_key) . await ?;
1067+ if let Some ( share_endpoint_meta) = share_endpoint_meta {
1068+ share_endpoint_meta_vec. push ( ( share_endpoint, share_endpoint_meta) ) ;
1069+ }
1070+ }
1071+
1072+ Ok ( GetShareEndpointReply {
1073+ share_endpoint_meta_vec,
1074+ } )
1075+ }
1076+
1077+ async fn drop_share_endpoint (
1078+ & self ,
1079+ req : DropShareEndpointReq ,
1080+ ) -> Result < DropShareEndpointReply , KVAppError > {
1081+ debug ! ( req = debug( & req) , "ShareApi: {}" , func_name!( ) ) ;
1082+
1083+ let name_key = & req. endpoint ;
1084+ let mut retry = 0 ;
1085+ while retry < TXN_MAX_RETRY_TIMES {
1086+ retry += 1 ;
1087+
1088+ let (
1089+ share_endpoint_id_seq,
1090+ share_endpoint_id,
1091+ share_endpoint_meta_seq,
1092+ _share_endpoint_meta,
1093+ ) = get_share_endpoint_or_err (
1094+ self ,
1095+ name_key,
1096+ format ! ( "drop_share_endpoint: {}" , & name_key) ,
1097+ )
1098+ . await ?;
1099+
1100+ let ( share_endpoint_name_seq, _share_endpoint) = get_share_endpoint_id_to_name_or_err (
1101+ self ,
1102+ share_endpoint_id,
1103+ format ! ( "drop_share_endpoint: {}" , & name_key) ,
1104+ )
1105+ . await ?;
1106+
1107+ // Delete share endpoint by these operations:
1108+ // del (tenant, share_endpoint)
1109+ // del share_endpoint_id
1110+ // del (share_endpoint_id) -> (tenant, share_endpoint)
1111+
1112+ let mut condition = vec ! [ ] ;
1113+ let mut if_then = vec ! [ ] ;
1114+
1115+ let share_id_key = ShareEndpointId { share_endpoint_id } ;
1116+ let id_name_key = ShareEndpointIdToName { share_endpoint_id } ;
1117+
1118+ debug ! (
1119+ share_endpoint_id,
1120+ name_key = debug( & name_key) ,
1121+ "drop_share_endpoint"
1122+ ) ;
1123+
1124+ {
1125+ condition. push ( txn_cond_seq ( name_key, Eq , share_endpoint_id_seq) ) ;
1126+ condition. push ( txn_cond_seq ( & share_id_key, Eq , share_endpoint_meta_seq) ) ;
1127+ condition. push ( txn_cond_seq ( & id_name_key, Eq , share_endpoint_name_seq) ) ;
1128+ if_then. push ( txn_op_del ( name_key) ) ; // del (tenant, share_endpoint)
1129+ if_then. push ( txn_op_del ( & share_id_key) ) ; // del share_endpoint_id
1130+ if_then. push ( txn_op_del ( & id_name_key) ) ; // del (share_endpoint_id) -> (tenant, share_endpoint)
1131+
1132+ let txn_req = TxnRequest {
1133+ condition,
1134+ if_then,
1135+ else_then : vec ! [ ] ,
1136+ } ;
1137+
1138+ let ( succ, _responses) = send_txn ( self , txn_req) . await ?;
1139+
1140+ debug ! (
1141+ name = debug( & name_key) ,
1142+ id = debug( & share_id_key) ,
1143+ succ = display( succ) ,
1144+ "drop_share_endpoint"
1145+ ) ;
1146+
1147+ if succ {
1148+ return Ok ( DropShareEndpointReply { } ) ;
1149+ }
1150+ }
1151+ }
1152+
1153+ Err ( KVAppError :: AppError ( AppError :: TxnRetryMaxTimes (
1154+ TxnRetryMaxTimes :: new ( "drop_share_endpoint" , TXN_MAX_RETRY_TIMES ) ,
1155+ ) ) )
1156+ }
9591157}
9601158
9611159async fn get_share_database_name (
0 commit comments