Skip to content

Commit e156b20

Browse files
authored
refactor: optimize list some OwnershipObjects (#18455)
1 parent 5f068bc commit e156b20

35 files changed

+257
-33
lines changed

src/meta/app/src/principal/tenant_ownership_object_ident.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ mod kvapi_impl {
105105

106106
#[cfg(test)]
107107
mod tests {
108+
use databend_common_meta_kvapi::kvapi;
108109
use databend_common_meta_kvapi::kvapi::Key;
109110

110111
use crate::principal::OwnershipObject;
@@ -272,6 +273,21 @@ mod tests {
272273
}
273274
}
274275

276+
#[test]
277+
fn test_ownership_seq_list_key() {
278+
use databend_common_meta_kvapi::kvapi::Key;
279+
let obj = OwnershipObject::Sequence {
280+
name: "seq1".to_string(),
281+
};
282+
283+
let ident = TenantOwnershipObjectIdent::new(Tenant::new_literal("tenant1"), obj);
284+
let dir_name = kvapi::DirName::new(ident);
285+
assert_eq!(
286+
dir_name.to_string_key(),
287+
"__fd_object_owners/tenant1/sequence-by-name"
288+
);
289+
}
290+
275291
#[test]
276292
fn test_tenant_ownership_object_with_key_space() {
277293
// TODO(xp): implement this test

src/query/catalog/src/table_context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use databend_common_storage::StageFileInfo;
5757
use databend_common_storage::StageFilesInfo;
5858
use databend_common_storage::StorageMetrics;
5959
use databend_common_users::GrantObjectVisibilityChecker;
60+
use databend_common_users::Object;
6061
use databend_storages_common_session::SessionState;
6162
use databend_storages_common_session::TxnManagerRef;
6263
use databend_storages_common_table_meta::meta::Location;
@@ -234,6 +235,7 @@ pub trait TableContext: Send + Sync {
234235
async fn get_visibility_checker(
235236
&self,
236237
ignore_ownership: bool,
238+
object: Object,
237239
) -> Result<GrantObjectVisibilityChecker>;
238240
fn get_fuse_version(&self) -> String;
239241
fn get_format_settings(&self) -> Result<FormatSettings>;

src/query/management/src/role/role_api.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ pub trait RoleApi: Sync + Send {
3434

3535
async fn list_ownerships(&self) -> Result<Vec<SeqV<OwnershipInfo>>>;
3636

37+
async fn list_udf_ownerships(&self) -> Result<Vec<OwnershipInfo>>;
38+
async fn list_stage_ownerships(&self) -> Result<Vec<OwnershipInfo>>;
39+
async fn list_seq_ownerships(&self) -> Result<Vec<OwnershipInfo>>;
40+
async fn list_connection_ownerships(&self) -> Result<Vec<OwnershipInfo>>;
41+
async fn list_warehouse_ownerships(&self) -> Result<Vec<OwnershipInfo>>;
42+
3743
/// General role update.
3844
///
3945
/// It fetches the role that matches the specified seq number, update it in place, then write it back with the seq it sees.

src/query/management/src/role/role_mgr.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::sync::Arc;
1616

1717
use databend_common_base::base::tokio::sync::Mutex;
1818
use databend_common_exception::ErrorCode;
19+
use databend_common_meta_api::kv_pb_api::KVPbApi;
1920
use databend_common_meta_api::reply::unpack_txn_reply;
2021
use databend_common_meta_api::txn_backoff::txn_backoff;
2122
use databend_common_meta_api::txn_cond_seq;
@@ -35,6 +36,7 @@ use databend_common_meta_app::KeyWithTenant;
3536
use databend_common_meta_cache::Cache;
3637
use databend_common_meta_client::ClientHandle;
3738
use databend_common_meta_kvapi::kvapi;
39+
use databend_common_meta_kvapi::kvapi::DirName;
3840
use databend_common_meta_kvapi::kvapi::Key;
3941
use databend_common_meta_kvapi::kvapi::ListKVReply;
4042
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
@@ -48,6 +50,7 @@ use databend_common_meta_types::TxnRequest;
4850
use databend_common_meta_types::UpsertKV;
4951
use enumflags2::make_bitflags;
5052
use fastrace::func_name;
53+
use futures::TryStreamExt;
5154
use log::debug;
5255
use log::error;
5356
use log::info;
@@ -298,6 +301,80 @@ impl RoleApi for RoleMgr {
298301
Ok(r)
299302
}
300303

304+
#[async_backtrace::framed]
305+
#[fastrace::trace]
306+
async fn list_udf_ownerships(&self) -> databend_common_exception::Result<Vec<OwnershipInfo>> {
307+
let obj = OwnershipObject::UDF {
308+
name: "foo".to_string(),
309+
};
310+
311+
let ident = TenantOwnershipObjectIdent::new(self.tenant.clone(), obj);
312+
let dir_name = DirName::new(ident);
313+
let values = self.kv_api.list_pb_values(&dir_name).await?;
314+
let udfs = values.try_collect().await?;
315+
Ok(udfs)
316+
}
317+
318+
#[async_backtrace::framed]
319+
#[fastrace::trace]
320+
async fn list_stage_ownerships(&self) -> databend_common_exception::Result<Vec<OwnershipInfo>> {
321+
let obj = OwnershipObject::Stage {
322+
name: "s1".to_string(),
323+
};
324+
325+
let ident = TenantOwnershipObjectIdent::new(self.tenant.clone(), obj);
326+
let dir_name = DirName::new(ident);
327+
let values = self.kv_api.list_pb_values(&dir_name).await?;
328+
let stages = values.try_collect().await?;
329+
Ok(stages)
330+
}
331+
332+
#[async_backtrace::framed]
333+
#[fastrace::trace]
334+
async fn list_seq_ownerships(&self) -> databend_common_exception::Result<Vec<OwnershipInfo>> {
335+
let obj = OwnershipObject::Sequence {
336+
name: "seq1".to_string(),
337+
};
338+
339+
let ident = TenantOwnershipObjectIdent::new(self.tenant.clone(), obj);
340+
let dir_name = DirName::new(ident);
341+
let values = self.kv_api.list_pb_values(&dir_name).await?;
342+
let seqs = values.try_collect().await?;
343+
Ok(seqs)
344+
}
345+
346+
#[async_backtrace::framed]
347+
#[fastrace::trace]
348+
async fn list_connection_ownerships(
349+
&self,
350+
) -> databend_common_exception::Result<Vec<OwnershipInfo>> {
351+
let obj = OwnershipObject::Connection {
352+
name: "con".to_string(),
353+
};
354+
355+
let ident = TenantOwnershipObjectIdent::new(self.tenant.clone(), obj);
356+
let dir_name = DirName::new(ident);
357+
let values = self.kv_api.list_pb_values(&dir_name).await?;
358+
let conns = values.try_collect().await?;
359+
Ok(conns)
360+
}
361+
362+
#[async_backtrace::framed]
363+
#[fastrace::trace]
364+
async fn list_warehouse_ownerships(
365+
&self,
366+
) -> databend_common_exception::Result<Vec<OwnershipInfo>> {
367+
let obj = OwnershipObject::Warehouse {
368+
id: "w".to_string(),
369+
};
370+
371+
let ident = TenantOwnershipObjectIdent::new(self.tenant.clone(), obj);
372+
let dir_name = DirName::new(ident);
373+
let values = self.kv_api.list_pb_values(&dir_name).await?;
374+
let ws = values.try_collect().await?;
375+
Ok(ws)
376+
}
377+
301378
/// General role update.
302379
///
303380
/// It fetch the role that matches the specified seq number, update it in place, then write it back with the seq it sees.

src/query/service/src/interpreters/interpreter_connection_show.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use databend_common_exception::Result;
1919
use databend_common_expression::types::StringType;
2020
use databend_common_expression::DataBlock;
2121
use databend_common_expression::FromData;
22+
use databend_common_users::Object;
2223
use databend_common_users::UserApiProvider;
2324
use log::debug;
2425

@@ -64,7 +65,10 @@ impl Interpreter for ShowConnectionsInterpreter {
6465
.get_settings()
6566
.get_enable_experimental_connection_privilege_check()?
6667
{
67-
let visibility_checker = self.ctx.get_visibility_checker(false).await?;
68+
let visibility_checker = self
69+
.ctx
70+
.get_visibility_checker(false, Object::Connection)
71+
.await?;
6872
formats.retain(|c| visibility_checker.check_connection_visibility(&c.name));
6973
}
7074

src/query/service/src/interpreters/interpreter_show_warehouses.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use databend_common_expression::Scalar;
2424
use databend_common_license::license::Feature;
2525
use databend_common_license::license_manager::LicenseManagerSwitch;
2626
use databend_common_management::WarehouseInfo;
27+
use databend_common_users::Object;
2728
use databend_enterprise_resources_management::ResourcesManagement;
2829

2930
use crate::interpreters::Interpreter;
@@ -63,7 +64,10 @@ impl Interpreter for ShowWarehousesInterpreter {
6364
let mut warehouses_status =
6465
ColumnBuilder::with_capacity(&DataType::String, warehouses.len());
6566

66-
let visibility_checker = self.ctx.get_visibility_checker(false).await?;
67+
let visibility_checker = self
68+
.ctx
69+
.get_visibility_checker(false, Object::Warehouse)
70+
.await?;
6771
for warehouse in warehouses {
6872
match warehouse {
6973
WarehouseInfo::SelfManaged(name) => {

src/query/service/src/pipelines/processors/transforms/transform_async_function.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use databend_common_meta_app::schema::GetSequenceNextValueReq;
2626
use databend_common_meta_app::schema::SequenceIdent;
2727
use databend_common_pipeline_transforms::processors::AsyncTransform;
2828
use databend_common_storages_fuse::TableContext;
29+
use databend_common_users::Object;
2930

3031
use crate::pipelines::processors::transforms::transform_dictionary::DictionaryOperator;
3132
use crate::sessions::QueryContext;
@@ -171,7 +172,7 @@ impl TransformAsyncFunction {
171172
.get_settings()
172173
.get_enable_experimental_sequence_privilege_check()?
173174
{
174-
Some(ctx.get_visibility_checker(false).await?)
175+
Some(ctx.get_visibility_checker(false, Object::Sequence).await?)
175176
} else {
176177
None
177178
};

src/query/service/src/servers/http/v1/catalog/get_database_table.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use databend_common_ast::parser::Dialect;
1818
use databend_common_catalog::catalog::CatalogManager;
1919
use databend_common_exception::ErrorCode;
2020
use databend_common_exception::Result;
21+
use databend_common_users::Object;
2122
use poem::error::InternalServerError;
2223
use poem::error::NotFound;
2324
use poem::error::Result as PoemResult;
@@ -58,7 +59,10 @@ async fn handle(
5859
table: String,
5960
) -> Result<GetDatabaseTableResponse> {
6061
let tenant = ctx.session.get_current_tenant();
61-
let visibility_checker = ctx.session.get_visibility_checker(false).await?;
62+
let visibility_checker = ctx
63+
.session
64+
.get_visibility_checker(false, Object::All)
65+
.await?;
6266

6367
let catalog = CatalogManager::instance().get_default_catalog(Default::default())?;
6468

src/query/service/src/servers/http/v1/catalog/list_database_table_fields.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use databend_common_catalog::catalog::CatalogManager;
1616
use databend_common_exception::ErrorCode;
1717
use databend_common_exception::Result;
18+
use databend_common_users::Object;
1819
use poem::error::InternalServerError;
1920
use poem::error::NotFound;
2021
use poem::error::Result as PoemResult;
@@ -47,7 +48,10 @@ async fn handle(
4748
table: String,
4849
) -> Result<ListDatabaseTableFieldsResponse> {
4950
let tenant = ctx.session.get_current_tenant();
50-
let visibility_checker = ctx.session.get_visibility_checker(false).await?;
51+
let visibility_checker = ctx
52+
.session
53+
.get_visibility_checker(false, Object::All)
54+
.await?;
5155

5256
let catalog = CatalogManager::instance().get_default_catalog(Default::default())?;
5357
let db = catalog.get_database(&tenant, &database).await?;

src/query/service/src/servers/http/v1/catalog/list_database_tables.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use chrono::Utc;
1717
use databend_common_catalog::catalog::CatalogManager;
1818
use databend_common_exception::ErrorCode;
1919
use databend_common_exception::Result;
20+
use databend_common_users::Object;
2021
use poem::error::InternalServerError;
2122
use poem::error::NotFound;
2223
use poem::error::Result as PoemResult;
@@ -50,7 +51,10 @@ pub struct TableInfo {
5051
#[async_backtrace::framed]
5152
async fn handle(ctx: &HttpQueryContext, database: String) -> Result<ListDatabaseTablesResponse> {
5253
let tenant = ctx.session.get_current_tenant();
53-
let visibility_checker = ctx.session.get_visibility_checker(false).await?;
54+
let visibility_checker = ctx
55+
.session
56+
.get_visibility_checker(false, Object::All)
57+
.await?;
5458

5559
let catalog = CatalogManager::instance().get_default_catalog(Default::default())?;
5660
let db = catalog.get_database(&tenant, &database).await?;

0 commit comments

Comments
 (0)