@@ -33,6 +33,7 @@ use databend_common_management::RoleApi;
3333use databend_common_meta_app:: principal:: OwnershipObject ;
3434use databend_common_meta_app:: schema:: CommitTableMetaReq ;
3535use databend_common_meta_app:: schema:: CreateOption ;
36+ use databend_common_meta_app:: schema:: CreateTableReply ;
3637use databend_common_meta_app:: schema:: CreateTableReq ;
3738use databend_common_meta_app:: schema:: TableIdent ;
3839use databend_common_meta_app:: schema:: TableIndexType ;
@@ -41,6 +42,7 @@ use databend_common_meta_app::schema::TableMeta;
4142use databend_common_meta_app:: schema:: TableNameIdent ;
4243use databend_common_meta_app:: schema:: TablePartition ;
4344use databend_common_meta_app:: schema:: TableStatistics ;
45+ use databend_common_meta_app:: tenant:: Tenant ;
4446use databend_common_meta_types:: MatchSeq ;
4547use databend_common_pipeline_core:: always_callback;
4648use databend_common_pipeline_core:: ExecutionInfo ;
@@ -228,22 +230,7 @@ impl CreateTableInterpreter {
228230 let db_id = reply. db_id ;
229231
230232 if !req. table_meta . options . contains_key ( OPT_KEY_TEMP_PREFIX ) {
231- // grant the ownership of the table to the current role.
232- let current_role = self . ctx . get_current_role ( ) ;
233- if let Some ( current_role) = current_role {
234- let role_api = UserApiProvider :: instance ( ) . role_api ( & tenant) ;
235- role_api
236- . grant_ownership (
237- & OwnershipObject :: Table {
238- catalog_name : self . plan . catalog . clone ( ) ,
239- db_id,
240- table_id,
241- } ,
242- & current_role. name ,
243- )
244- . await ?;
245- RoleCacheManager :: instance ( ) . invalidate_cache ( & tenant) ;
246- }
233+ self . process_ownership ( & tenant, reply) . await ?;
247234 }
248235
249236 // If the table creation query contains column definitions, like 'CREATE TABLE t1(a int) AS SELECT * from t2',
@@ -336,6 +323,42 @@ impl CreateTableInterpreter {
336323 Ok ( pipeline)
337324 }
338325
326+ async fn process_ownership ( & self , tenant : & Tenant , reply : CreateTableReply ) -> Result < ( ) > {
327+ // grant the ownership of the table to the current role.
328+ let mut invalid_cache = false ;
329+ let current_role = self . ctx . get_current_role ( ) ;
330+ let role_api = UserApiProvider :: instance ( ) . role_api ( tenant) ;
331+ if let Some ( current_role) = current_role {
332+ role_api
333+ . grant_ownership (
334+ & OwnershipObject :: Table {
335+ catalog_name : self . plan . catalog . clone ( ) ,
336+ db_id : reply. db_id ,
337+ table_id : reply. table_id ,
338+ } ,
339+ & current_role. name ,
340+ )
341+ . await ?;
342+ invalid_cache = true ;
343+ }
344+
345+ // if old_table_id is some means create or replace is success, we should delete the old table id's ownership key
346+ if let Some ( old_table_id) = reply. old_table_id {
347+ role_api
348+ . revoke_ownership ( & OwnershipObject :: Table {
349+ catalog_name : self . plan . catalog . clone ( ) ,
350+ db_id : reply. db_id ,
351+ table_id : old_table_id,
352+ } )
353+ . await ?;
354+ invalid_cache = true ;
355+ }
356+ if invalid_cache {
357+ RoleCacheManager :: instance ( ) . invalidate_cache ( tenant) ;
358+ }
359+ Ok ( ( ) )
360+ }
361+
339362 #[ async_backtrace:: framed]
340363 async fn create_table ( & self ) -> Result < PipelineBuildResult > {
341364 let catalog = self . ctx . get_catalog ( self . plan . catalog . as_str ( ) ) . await ?;
@@ -390,27 +413,10 @@ impl CreateTableInterpreter {
390413 self . register_temp_table ( prefix) . await ?;
391414 }
392415
416+ // iceberg table do not need to generate ownership.
393417 if !req. table_meta . options . contains_key ( OPT_KEY_TEMP_PREFIX ) && !catalog. is_external ( ) {
394- // iceberg table do not need to generate ownership.
395- // grant the ownership of the table to the current role, the above req.table_meta.owner could be removed in the future.
396- if let Some ( current_role) = self . ctx . get_current_role ( ) {
397- let tenant = self . ctx . get_tenant ( ) ;
398- let db = catalog. get_database ( & tenant, & self . plan . database ) . await ?;
399- let db_id = db. get_db_info ( ) . database_id . db_id ;
400-
401- let role_api = UserApiProvider :: instance ( ) . role_api ( & tenant) ;
402- role_api
403- . grant_ownership (
404- & OwnershipObject :: Table {
405- catalog_name : self . plan . catalog . clone ( ) ,
406- db_id,
407- table_id : reply. table_id ,
408- } ,
409- & current_role. name ,
410- )
411- . await ?;
412- RoleCacheManager :: instance ( ) . invalidate_cache ( & tenant) ;
413- }
418+ let tenant = self . ctx . get_tenant ( ) ;
419+ self . process_ownership ( & tenant, reply) . await ?;
414420 }
415421
416422 Ok ( PipelineBuildResult :: create ( ) )
0 commit comments