Skip to content

Commit 7a44344

Browse files
authored
refactor: add share endpoint manager to query share endpoint (#10843)
* fix: add share endpoing from share config; init endpoint only needed * fix: fix test fail * fix: fix test fail, remove stateful share test now * refactor: add share endpoint manager to query share endpoint * fix: make clippy happy * fix: make clippy happy * refactor: refactor by review comment
1 parent 73a08e0 commit 7a44344

File tree

14 files changed

+208
-198
lines changed

14 files changed

+208
-198
lines changed

Cargo.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/binaries/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ common-meta-stoerr = { path = "../meta/stoerr" }
5252
common-meta-store = { path = "../meta/store" }
5353
common-meta-types = { path = "../meta/types" }
5454
common-metrics = { path = "../common/metrics" }
55-
common-sharing = { path = "../query/sharing" }
5655
common-tracing = { path = "../common/tracing" }
5756
databend-meta = { path = "../meta/service" }
5857
databend-query = { path = "../query/service" }

src/binaries/query/main.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use common_exception::ErrorCode;
3030
use common_exception::Result;
3131
use common_meta_client::MIN_METASRV_SEMVER;
3232
use common_metrics::init_default_metrics_recorder;
33-
use common_sharing::init_share_endpoint_config;
3433
use common_tracing::set_panic_hook;
3534
use databend_query::api::HttpService;
3635
use databend_query::api::RpcService;
@@ -227,8 +226,6 @@ async fn main_entrypoint() -> Result<()> {
227226
);
228227
}
229228

230-
init_share_endpoint_config(&conf).await?;
231-
232229
// Print information to users.
233230
println!("Databend Query");
234231
println!();

src/meta/api/src/schema_api_impl.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -168,15 +168,6 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
168168
)));
169169
}
170170

171-
// if create a database from a share, check if the share exists and grant access, update share_meta.
172-
if let Some(from_share) = &req.meta.from_share {
173-
if from_share.tenant == req.name_ident.tenant {
174-
return Err(KVAppError::AppError(AppError::WrongShare(WrongShare::new(
175-
req.name_ident.to_string_key(),
176-
))));
177-
}
178-
}
179-
180171
let mut retry = 0;
181172
while retry < TXN_MAX_RETRY_TIMES {
182173
retry += 1;

src/query/ast/src/parser/statement.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -984,32 +984,19 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
984984
~ #ident
985985
~ URL ~ "=" ~ #share_endpoint_uri_location
986986
~ TENANT ~ "=" ~ #ident
987-
~ ARGS ~ "=" ~ #options
987+
~ ( ARGS ~ "=" ~ #options)?
988988
~ ( COMMENT ~ "=" ~ #literal_string)?
989989
},
990-
|(
991-
_,
992-
_,
993-
_,
994-
opt_if_not_exists,
995-
endpoint,
996-
_,
997-
_,
998-
url,
999-
_,
1000-
_,
1001-
tenant,
1002-
_,
1003-
_,
1004-
args,
1005-
comment_opt,
1006-
)| {
990+
|(_, _, _, opt_if_not_exists, endpoint, _, _, url, _, _, tenant, args_opt, comment_opt)| {
1007991
Statement::CreateShareEndpoint(CreateShareEndpointStmt {
1008992
if_not_exists: opt_if_not_exists.is_some(),
1009993
endpoint,
1010994
url,
1011995
tenant,
1012-
args,
996+
args: match args_opt {
997+
Some(opt) => opt.2,
998+
None => BTreeMap::new(),
999+
},
10131000
comment: match comment_opt {
10141001
Some(opt) => Some(opt.2),
10151002
None => None,

src/query/service/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ enable-histogram-metrics = [
4444
# Workspace dependencies
4545
common-arrow = { path = "../../common/arrow" }
4646
common-ast = { path = "../ast" }
47-
common-auth = { path = "../../common/auth" }
4847
common-base = { path = "../../common/base" }
4948
common-cache = { path = "../../common/cache" }
5049
common-catalog = { path = "../catalog" }
@@ -73,6 +72,7 @@ common-pipeline-sources = { path = "../pipeline/sources" }
7372
common-pipeline-transforms = { path = "../pipeline/transforms" }
7473
common-profile = { path = "../../common/profile" }
7574
common-settings = { path = "../settings" }
75+
common-sharing = { path = "../sharing" }
7676
common-sql = { path = "../sql" }
7777
common-storage = { path = "../../common/storage" }
7878
common-storages-factory = { path = "../storages/factory" }
@@ -111,7 +111,6 @@ base64 = "0.21.0"
111111
bincode = "1.3.3"
112112
bumpalo = { workspace = true }
113113
byteorder = "1.4.3"
114-
bytes = "1"
115114
chrono = { workspace = true }
116115
chrono-tz = { workspace = true }
117116
ctor = "0.1.26"

src/query/service/src/catalogs/default/mutable_catalog.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ use crate::storages::Table;
7373
#[derive(Clone)]
7474
pub struct MutableCatalog {
7575
ctx: CatalogContext,
76+
tenant: String,
7677
}
7778

7879
impl MutableCatalog {
@@ -123,13 +124,17 @@ impl MutableCatalog {
123124
storage_factory: Arc::new(storage_factory),
124125
database_factory: Arc::new(database_factory),
125126
};
126-
Ok(MutableCatalog { ctx })
127+
Ok(MutableCatalog {
128+
ctx,
129+
tenant: conf.query.tenant_id.clone(),
130+
})
127131
}
128132

129133
fn build_db_instance(&self, db_info: &Arc<DatabaseInfo>) -> Result<Arc<dyn Database>> {
130134
let ctx = DatabaseContext {
131135
meta: self.ctx.meta.clone(),
132136
storage_factory: self.ctx.storage_factory.clone(),
137+
tenant: self.tenant.clone(),
133138
};
134139
self.ctx.database_factory.get_database(ctx, db_info)
135140
}

src/query/service/src/databases/database_context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ use crate::storages::StorageFactory;
2323
pub struct DatabaseContext {
2424
pub meta: MetaStore,
2525
pub storage_factory: Arc<StorageFactory>,
26+
pub tenant: String,
2627
}

src/query/service/src/databases/share/share_database.rs

Lines changed: 9 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,11 @@
1414

1515
use std::str;
1616
use std::sync::Arc;
17-
use std::sync::Mutex;
1817

19-
use bytes::Bytes;
20-
use common_auth::RefreshableToken;
2118
use common_catalog::table::Table;
22-
use common_config::GlobalConfig;
2319
use common_exception::ErrorCode;
2420
use common_exception::Result;
2521
use common_meta_api::SchemaApi;
26-
use common_meta_api::ShareApi;
2722
use common_meta_app::schema::CreateTableReq;
2823
use common_meta_app::schema::DatabaseInfo;
2924
use common_meta_app::schema::DropTableByIdReq;
@@ -43,89 +38,23 @@ use common_meta_app::schema::UpsertTableCopiedFileReply;
4338
use common_meta_app::schema::UpsertTableCopiedFileReq;
4439
use common_meta_app::schema::UpsertTableOptionReply;
4540
use common_meta_app::schema::UpsertTableOptionReq;
46-
use common_meta_app::share::GetShareEndpointReq;
47-
use common_meta_app::share::TableInfoMap;
48-
use common_users::UserApiProvider;
49-
use http::header::AUTHORIZATION;
50-
use http::header::CONTENT_LENGTH;
51-
use http::Method;
52-
use http::Request;
53-
use opendal::raw::AsyncBody;
54-
use opendal::raw::HttpClient;
55-
use tracing::error;
56-
use tracing::info;
41+
use common_sharing::ShareEndpointManager;
5742

5843
use crate::databases::Database;
5944
use crate::databases::DatabaseContext;
6045

61-
const TENANT_HEADER: &str = "X-DATABEND-TENANT";
62-
63-
#[derive(Clone, Debug)]
64-
struct EndpointConfig {
65-
pub url: String,
66-
67-
pub token: RefreshableToken,
68-
}
69-
7046
// Share Database implementation for `Database` trait.
7147
#[derive(Clone)]
7248
pub struct ShareDatabase {
7349
ctx: DatabaseContext,
7450

7551
db_info: DatabaseInfo,
76-
77-
client: HttpClient,
78-
79-
endpoint_config: Arc<Mutex<Option<EndpointConfig>>>,
8052
}
8153

8254
impl ShareDatabase {
8355
pub const NAME: &'static str = "SHARE";
8456
pub fn try_create(ctx: DatabaseContext, db_info: DatabaseInfo) -> Result<Box<dyn Database>> {
85-
Ok(Box::new(Self {
86-
ctx,
87-
db_info,
88-
client: HttpClient::new()?,
89-
endpoint_config: Arc::new(Mutex::new(None)),
90-
}))
91-
}
92-
93-
async fn get_share_endpoint(&self) -> Result<EndpointConfig> {
94-
let endpoint_config = {
95-
let endpoint_config = self.endpoint_config.lock().unwrap();
96-
endpoint_config.clone()
97-
};
98-
99-
match endpoint_config {
100-
Some(ref endpoint_config) => Ok(endpoint_config.clone()),
101-
None => {
102-
let share_name = self.db_info.meta.from_share.clone().unwrap();
103-
let req = GetShareEndpointReq {
104-
tenant: self.db_info.name_ident.tenant.clone(),
105-
endpoint: None,
106-
to_tenant: Some(share_name.tenant.clone()),
107-
};
108-
let meta_api = UserApiProvider::instance().get_meta_store_client();
109-
let resp = meta_api.get_share_endpoint(req).await?;
110-
if let Some((_, endpoint_meta)) = resp.share_endpoint_meta_vec.into_iter().next() {
111-
let endpoint = format!(
112-
"{}tenant/{}/{}/meta",
113-
endpoint_meta.url, share_name.tenant, share_name.share_name,
114-
);
115-
let config = EndpointConfig {
116-
url: endpoint,
117-
token: RefreshableToken::Direct(self.db_info.name_ident.tenant.clone()),
118-
};
119-
let mut endpoint_config = self.endpoint_config.lock().unwrap();
120-
*endpoint_config = Some(config.clone());
121-
return Ok(config);
122-
}
123-
124-
Err(ErrorCode::EmptyShareEndpointConfig(
125-
"EmptyShareEndpointConfig, cannot query share databases".to_string(),
126-
))
127-
}
128-
}
57+
Ok(Box::new(Self { ctx, db_info }))
12958
}
13059

13160
fn load_tables(&self, table_infos: Vec<Arc<TableInfo>>) -> Result<Vec<Arc<dyn Table>>> {
@@ -136,53 +65,11 @@ impl ShareDatabase {
13665
})
13766
}
13867

139-
// Read table info map from operator
140-
async fn get_table_info_map(&self, req: Vec<String>) -> Result<TableInfoMap> {
141-
// only when endpoint_config is Some can try again
142-
let mut try_again = {
143-
let endpoint_config = self.endpoint_config.lock().unwrap();
144-
!endpoint_config.is_some()
145-
};
146-
147-
loop {
148-
let endpoint_config = self.get_share_endpoint().await?;
149-
let bs = Bytes::from(serde_json::to_vec(&req)?);
150-
let auth = endpoint_config.token.to_header().await?;
151-
let requester = GlobalConfig::instance().as_ref().query.tenant_id.clone();
152-
let req = Request::builder()
153-
.method(Method::POST)
154-
.uri(&endpoint_config.url)
155-
.header(AUTHORIZATION, auth)
156-
.header(CONTENT_LENGTH, bs.len())
157-
.header(TENANT_HEADER, requester)
158-
.body(AsyncBody::Bytes(bs))?;
159-
let resp = self.client.send_async(req).await;
160-
match resp {
161-
Ok(resp) => {
162-
let bs = resp.into_body().bytes().await?;
163-
let table_info_map: TableInfoMap = serde_json::from_slice(&bs)?;
164-
165-
return Ok(table_info_map);
166-
}
167-
Err(err) => {
168-
if try_again {
169-
error!("get_table_info_map error: {:?}", err);
170-
return Err(err.into());
171-
} else {
172-
// endpoint may be changed, so cleanup endpoint and try again
173-
try_again = true;
174-
let mut endpoint_config = self.endpoint_config.lock().unwrap();
175-
*endpoint_config = None;
176-
info!("get_table_info_map error: {:?}, try again", err);
177-
}
178-
}
179-
}
180-
}
181-
}
182-
18368
async fn get_table_info(&self, table_name: &str) -> Result<Arc<TableInfo>> {
184-
let table_info_map = self
185-
.get_table_info_map(vec![table_name.to_string()])
69+
let table_info_map = ShareEndpointManager::instance()
70+
.get_table_info_map(&self.ctx.tenant, &self.db_info, vec![
71+
table_name.to_string(),
72+
])
18673
.await?;
18774
match table_info_map.get(table_name) {
18875
None => Err(ErrorCode::UnknownTable(format!(
@@ -194,7 +81,9 @@ impl ShareDatabase {
19481
}
19582

19683
async fn list_tables(&self) -> Result<Vec<Arc<TableInfo>>> {
197-
let table_info_map = self.get_table_info_map(vec![]).await?;
84+
let table_info_map = ShareEndpointManager::instance()
85+
.get_table_info_map(&self.ctx.tenant, &self.db_info, vec![])
86+
.await?;
19887
let table_infos: Vec<Arc<TableInfo>> = table_info_map
19988
.values()
20089
.map(|table_info| Arc::new(table_info.to_owned()))

src/query/service/src/global_services.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use common_catalog::catalog::CatalogManager;
1919
use common_config::GlobalConfig;
2020
use common_config::InnerConfig;
2121
use common_exception::Result;
22+
use common_sharing::ShareEndpointManager;
2223
use common_storage::DataOperator;
2324
use common_storage::ShareTableConfig;
2425
use common_tracing::QueryLogger;
@@ -76,6 +77,7 @@ impl GlobalServices {
7677
)
7778
.await?;
7879
RoleCacheManager::init()?;
80+
ShareEndpointManager::init()?;
7981

8082
Ok(())
8183
}

0 commit comments

Comments
 (0)