Skip to content

Commit d8c9eae

Browse files
authored
feat(query): support create/drop/desc row policy (#18490)
* feat(query):support create/drop/desc row policy * fix conversation * Introduce Experimental Feature Gate enable_experimental_row_access_policy default disable * RowAccessPolicyTableIdList -> RowAccessPolicyTableIdIdent __fd_row_access_policy_apply_table_id/tenant/<policy_id>/<table_id> * fix conversation * optimize Error
1 parent 401d89d commit d8c9eae

File tree

62 files changed

+2209
-37
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+2209
-37
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ databend-enterprise-hilbert-clustering = { path = "src/query/ee_features/hilbert
200200
databend-enterprise-meta = { path = "src/meta/ee" }
201201
databend-enterprise-query = { path = "src/query/ee" }
202202
databend-enterprise-resources-management = { path = "src/query/ee_features/resources_management" }
203+
databend-enterprise-row-access-policy-feature = { path = "src/query/ee_features/row_access_policy" }
203204
databend-enterprise-storage-encryption = { path = "src/query/ee_features/storage_encryption" }
204205
databend-enterprise-storage-quota = { path = "src/query/ee_features/storage_quota" }
205206
databend-enterprise-stream-handler = { path = "src/query/ee_features/stream_handler" }

src/common/exception/src/exception_code.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,8 @@ build_exceptions! {
283283
UnmatchMaskPolicyReturnType(1121),
284284
/// Empty share endpoint config
285285
EmptyShareEndpointConfig(1130),
286+
/// Unknown row policy
287+
UnknownRowAccessPolicy(1131),
286288
}
287289

288290
// Sequence Errors [1124-1126, 3101]
@@ -519,6 +521,8 @@ build_exceptions! {
519521
CommitTableMetaError(2322),
520522
/// Create as drop table without drop time
521523
CreateAsDropTableWithoutDropTime(2323),
524+
/// Row Policy already exists
525+
RowAccessPolicyAlreadyExists(2324),
522526
}
523527

524528
// Stage and Connection Errors [2501-2505, 2510-2512]

src/common/license/src/license.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ pub enum Feature {
8989
MaxNodeQuota(usize),
9090
#[serde(alias = "max_cpu_quota", alias = "MAX_CPU_QUOTA")]
9191
MaxCpuQuota(usize),
92+
#[serde(alias = "row_access_policy", alias = "ROW_ACCESS_POLICY")]
93+
RowAccessPolicy,
9294
#[serde(other)]
9395
Unknown,
9496
}
@@ -141,6 +143,7 @@ impl fmt::Display for Feature {
141143
Feature::SystemHistory => write!(f, "system_history"),
142144
Feature::VectorIndex => write!(f, "vector_index"),
143145
Feature::PrivateTask => write!(f, "private_task"),
146+
Feature::RowAccessPolicy => write!(f, "row_access_policy"),
144147
Feature::Unknown => write!(f, "unknown"),
145148
Feature::MaxCpuQuota(v) => write!(f, "max_cpu_quota({})", v),
146149
Feature::MaxNodeQuota(v) => write!(f, "max_node_quota({})", v),
@@ -191,6 +194,7 @@ impl Feature {
191194
| (Feature::LicenseInfo, Feature::LicenseInfo)
192195
| (Feature::Stream, Feature::Stream)
193196
| (Feature::DataMask, Feature::DataMask)
197+
| (Feature::RowAccessPolicy, Feature::RowAccessPolicy)
194198
| (Feature::InvertedIndex, Feature::InvertedIndex)
195199
| (Feature::VirtualColumn, Feature::VirtualColumn)
196200
| (Feature::AttacheTable, Feature::AttacheTable)
@@ -396,6 +400,11 @@ mod tests {
396400
serde_json::from_str::<Feature>("{\"MaxNodeQuota\": 1}").unwrap()
397401
);
398402

403+
assert_eq!(
404+
Feature::RowAccessPolicy,
405+
serde_json::from_str::<Feature>("\"RowAccessPolicy\"").unwrap()
406+
);
407+
399408
assert_eq!(
400409
Feature::Unknown,
401410
serde_json::from_str::<Feature>("\"ssss\"").unwrap()
@@ -433,11 +442,12 @@ mod tests {
433442
Feature::WorkloadGroup,
434443
Feature::SystemHistory,
435444
Feature::PrivateTask,
445+
Feature::RowAccessPolicy,
436446
]),
437447
};
438448

439449
assert_eq!(
440-
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,hilbert_clustering,inverted_index,license_info,ngram_index,private_task,storage_encryption,storage_quota(storage_usage: 1),stream,system_history,vacuum,virtual_column,workload_group] }",
450+
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,hilbert_clustering,inverted_index,license_info,ngram_index,private_task,row_access_policy,storage_encryption,storage_quota(storage_usage: 1),stream,system_history,vacuum,virtual_column,workload_group] }",
441451
license_info.to_string()
442452
);
443453
}

src/meta/api/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,13 @@ pub mod txn_backoff;
3636
pub mod util;
3737

3838
pub mod crud;
39+
mod row_access_policy_api;
40+
mod row_access_policy_api_impl;
3941
mod sequence_api_impl;
4042
pub(crate) mod sequence_nextval_impl;
4143

4244
pub use data_mask_api::DatamaskApi;
45+
pub use row_access_policy_api::RowAccessPolicyApi;
4346
pub use schema_api::SchemaApi;
4447
pub use schema_api_test_suite::SchemaApiTestSuite;
4548
pub use sequence_api::SequenceApi;
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_meta_app::row_access_policy::row_access_policy_name_ident;
16+
use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReply;
17+
use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReq;
18+
use databend_common_meta_app::row_access_policy::RowAccessPolicyId;
19+
use databend_common_meta_app::row_access_policy::RowAccessPolicyMeta;
20+
use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent;
21+
use databend_common_meta_app::tenant_key::errors::ExistError;
22+
use databend_common_meta_types::MetaError;
23+
use databend_common_meta_types::SeqV;
24+
25+
use crate::meta_txn_error::MetaTxnError;
26+
27+
#[async_trait::async_trait]
28+
pub trait RowAccessPolicyApi: Send + Sync {
29+
async fn create_row_access(
30+
&self,
31+
req: CreateRowAccessPolicyReq,
32+
) -> Result<
33+
Result<CreateRowAccessPolicyReply, ExistError<row_access_policy_name_ident::Resource>>,
34+
MetaTxnError,
35+
>;
36+
37+
/// On success, returns the dropped id and row policy.
38+
/// Returning None, means nothing is removed.
39+
async fn drop_row_access(
40+
&self,
41+
name_ident: &RowAccessPolicyNameIdent,
42+
) -> Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, MetaTxnError>;
43+
44+
async fn get_row_access(
45+
&self,
46+
name_ident: &RowAccessPolicyNameIdent,
47+
) -> Result<Option<SeqV<RowAccessPolicyMeta>>, MetaError>;
48+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_meta_app::id_generator::IdGenerator;
16+
use databend_common_meta_app::row_access_policy::row_access_policy_name_ident;
17+
use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReply;
18+
use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReq;
19+
use databend_common_meta_app::row_access_policy::RowAccessPolicyId;
20+
use databend_common_meta_app::row_access_policy::RowAccessPolicyIdIdent;
21+
use databend_common_meta_app::row_access_policy::RowAccessPolicyMeta;
22+
use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent;
23+
use databend_common_meta_app::tenant_key::errors::ExistError;
24+
use databend_common_meta_app::KeyWithTenant;
25+
use databend_common_meta_kvapi::kvapi;
26+
use databend_common_meta_types::MetaError;
27+
use databend_common_meta_types::SeqV;
28+
use databend_common_meta_types::TxnRequest;
29+
use fastrace::func_name;
30+
use log::debug;
31+
32+
use crate::fetch_id;
33+
use crate::kv_pb_api::KVPbApi;
34+
use crate::meta_txn_error::MetaTxnError;
35+
use crate::row_access_policy_api::RowAccessPolicyApi;
36+
use crate::send_txn;
37+
use crate::txn_backoff::txn_backoff;
38+
use crate::txn_cond_eq_seq;
39+
use crate::util::txn_delete_exact;
40+
use crate::util::txn_op_put_pb;
41+
42+
/// RowAccessPolicyApi is implemented upon kvapi::KVApi.
43+
/// Thus every type that impl kvapi::KVApi impls RowAccessPolicyApi.
44+
#[tonic::async_trait]
45+
impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {
46+
async fn create_row_access(
47+
&self,
48+
req: CreateRowAccessPolicyReq,
49+
) -> Result<
50+
Result<CreateRowAccessPolicyReply, ExistError<row_access_policy_name_ident::Resource>>,
51+
MetaTxnError,
52+
> {
53+
debug!(req :? =(&req); "RowAccessPolicyApi: {}", func_name!());
54+
55+
let name_ident = &req.name;
56+
57+
let id = fetch_id(self, IdGenerator::row_access_id()).await?;
58+
let mut trials = txn_backoff(None, func_name!());
59+
let id = loop {
60+
trials.next().unwrap()?.await;
61+
62+
let mut txn = TxnRequest::default();
63+
64+
let res = self.get_id_and_value(name_ident).await?;
65+
debug!(res :? = res, name_key :? =(name_ident); "create_row_access");
66+
67+
let mut curr_seq = 0;
68+
69+
if let Some((seq_id, seq_meta)) = res {
70+
if req.can_replace {
71+
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
72+
73+
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
74+
75+
// TODO(eason): need to remove row policy from table meta
76+
77+
curr_seq = seq_id.seq;
78+
} else {
79+
return Ok(Err(name_ident.exist_error(func_name!())));
80+
}
81+
}
82+
83+
// Create row policy by inserting these record:
84+
// name -> id
85+
// id -> policy
86+
87+
let id = RowAccessPolicyId::new(id);
88+
let id_ident = RowAccessPolicyIdIdent::new_generic(name_ident.tenant(), id);
89+
90+
debug!(
91+
id :? =(&id_ident),
92+
name_key :? =(name_ident);
93+
"new RowAccessPolicy id"
94+
);
95+
96+
{
97+
let meta: RowAccessPolicyMeta = req.row_access_policy_meta.clone();
98+
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
99+
txn.if_then.extend(vec![
100+
txn_op_put_pb(name_ident, &id, None)?, // name -> policy_id
101+
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
102+
]);
103+
104+
let (succ, _responses) = send_txn(self, txn).await?;
105+
106+
debug!(
107+
name :? =(name_ident),
108+
id :? =(&id_ident),
109+
succ = succ;
110+
"create_row_access"
111+
);
112+
113+
if succ {
114+
break id;
115+
}
116+
}
117+
};
118+
119+
Ok(Ok(CreateRowAccessPolicyReply { id: *id }))
120+
}
121+
122+
async fn drop_row_access(
123+
&self,
124+
name_ident: &RowAccessPolicyNameIdent,
125+
) -> Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, MetaTxnError> {
126+
debug!(name_ident :? =(name_ident); "RowAccessPolicyApi: {}", func_name!());
127+
128+
let mut trials = txn_backoff(None, func_name!());
129+
loop {
130+
trials.next().unwrap()?.await;
131+
132+
let mut txn = TxnRequest::default();
133+
134+
let res = self.get_id_and_value(name_ident).await?;
135+
debug!(res :? = res, name_key :? =(name_ident); "{}", func_name!());
136+
137+
let Some((seq_id, seq_meta)) = res else {
138+
return Ok(None);
139+
};
140+
141+
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
142+
143+
txn_delete_exact(&mut txn, name_ident, seq_id.seq);
144+
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
145+
146+
// TODO(eason): need to remove row policy from table meta
147+
148+
let (succ, _responses) = send_txn(self, txn).await?;
149+
debug!(succ = succ;"{}", func_name!());
150+
151+
if succ {
152+
return Ok(Some((seq_id, seq_meta)));
153+
}
154+
}
155+
}
156+
157+
async fn get_row_access(
158+
&self,
159+
name_ident: &RowAccessPolicyNameIdent,
160+
) -> Result<Option<SeqV<RowAccessPolicyMeta>>, MetaError> {
161+
debug!(req :? =(&name_ident); "RowAccessPolicyApi: {}", func_name!());
162+
163+
let res = self.get_id_and_value(name_ident).await?;
164+
165+
Ok(res.map(|(_, seq_meta)| seq_meta))
166+
}
167+
}

src/meta/app/src/app_error.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use databend_common_meta_types::MatchSeq;
2020
use crate::data_mask::data_mask_name_ident;
2121
use crate::principal::procedure_name_ident;
2222
use crate::principal::ProcedureIdentity;
23+
use crate::row_access_policy::row_access_policy_name_ident;
2324
use crate::schema::catalog_name_ident;
2425
use crate::schema::dictionary_name_ident;
2526
use crate::schema::index_name_ident;
@@ -1088,6 +1089,9 @@ pub enum AppError {
10881089
#[error(transparent)]
10891090
UnknownDataMask(#[from] UnknownError<data_mask_name_ident::Resource>),
10901091

1092+
#[error(transparent)]
1093+
UnknownRowAccessPolicy(#[from] UnknownError<row_access_policy_name_ident::Resource>),
1094+
10911095
#[error(transparent)]
10921096
UnmatchColumnDataType(#[from] UnmatchColumnDataType),
10931097

@@ -1467,12 +1471,6 @@ impl AppErrorMessage for IndexColumnIdNotFound {
14671471
}
14681472
}
14691473

1470-
impl AppErrorMessage for UnknownDatamask {
1471-
fn message(&self) -> String {
1472-
format!("Datamask '{}' does not exists", self.name)
1473-
}
1474-
}
1475-
14761474
impl AppErrorMessage for UnmatchColumnDataType {
14771475
fn message(&self) -> String {
14781476
format!(
@@ -1627,6 +1625,9 @@ impl From<AppError> for ErrorCode {
16271625

16281626
AppError::DatamaskAlreadyExists(err) => ErrorCode::DatamaskAlreadyExists(err.message()),
16291627
AppError::UnknownDataMask(err) => ErrorCode::UnknownDatamask(err.message()),
1628+
AppError::UnknownRowAccessPolicy(err) => {
1629+
ErrorCode::UnknownRowAccessPolicy(err.message())
1630+
}
16301631

16311632
AppError::UnmatchColumnDataType(err) => ErrorCode::UnmatchColumnDataType(err.message()),
16321633
AppError::UnmatchMaskPolicyReturnType(err) => {

0 commit comments

Comments
 (0)