Skip to content

Commit 367a17d

Browse files
authored
refactor(query): add warehouse_id config (#18407)
* refactor(query): add warehouse_id config * refactor(query): add warehouse_id config * refactor(query): add warehouse_id config
1 parent 91b8bce commit 367a17d

File tree

23 files changed

+159
-54
lines changed

23 files changed

+159
-54
lines changed

src/binaries/query/entry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
149149
.await
150150
.with_context(make_error)?;
151151
info!(
152-
"Databend query has been registered:{:?} to metasrv:{:?}.",
153-
conf.query.cluster_id, conf.meta.endpoints
152+
"Databend query has been registered:{:?}/{:?} to metasrv:{:?}.",
153+
conf.query.warehouse_id, conf.query.cluster_id, conf.meta.endpoints
154154
);
155155
}
156156

src/query/catalog/src/cluster_info.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
use std::sync::Arc;
1616

17+
use databend_common_exception::ErrorCode;
18+
use databend_common_exception::Result;
1719
use databend_common_meta_types::NodeInfo;
1820

1921
#[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -43,4 +45,24 @@ impl Cluster {
4345
pub fn is_empty(&self) -> bool {
4446
self.nodes.len() <= 1
4547
}
48+
49+
pub fn get_cluster_id(&self) -> Result<String> {
50+
for node in &self.nodes {
51+
if node.id == self.local_id {
52+
return Ok(node.cluster_id.clone());
53+
}
54+
}
55+
56+
Err(ErrorCode::Internal("Cannot found local node in cluster"))
57+
}
58+
59+
pub fn get_warehouse_id(&self) -> Result<String> {
60+
for node in &self.nodes {
61+
if node.id == self.local_id {
62+
return Ok(node.warehouse_id.clone());
63+
}
64+
}
65+
66+
Err(ErrorCode::Internal("Cannot found local node in cluster"))
67+
}
4668
}

src/query/config/src/config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1581,6 +1581,10 @@ pub struct QueryConfig {
15811581
#[clap(long, value_name = "VALUE", default_value_t)]
15821582
pub cluster_id: String,
15831583

1584+
/// ID for construct the warehouse.
1585+
#[clap(long, value_name = "VALUE", default_value_t)]
1586+
pub warehouse_id: String,
1587+
15841588
#[clap(long, value_name = "VALUE", default_value_t)]
15851589
pub num_cpus: u64,
15861590

@@ -1930,7 +1934,14 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
19301934
type Error = ErrorCode;
19311935

19321936
fn try_into(self) -> Result<InnerQueryConfig> {
1937+
let mut warehouse_id = self.warehouse_id;
1938+
1939+
if warehouse_id.is_empty() {
1940+
warehouse_id = self.cluster_id.clone();
1941+
}
1942+
19331943
Ok(InnerQueryConfig {
1944+
warehouse_id,
19341945
tenant_id: Tenant::new_or_err(self.tenant_id, "")
19351946
.map_err(|_e| ErrorCode::InvalidConfig("tenant-id can not be empty"))?,
19361947
cluster_id: self.cluster_id,
@@ -2030,6 +2041,7 @@ impl From<InnerQueryConfig> for QueryConfig {
20302041
Self {
20312042
tenant_id: inner.tenant_id.tenant_name().to_string(),
20322043
cluster_id: inner.cluster_id,
2044+
warehouse_id: inner.warehouse_id,
20332045
num_cpus: inner.num_cpus,
20342046
mysql_handler_host: inner.mysql_handler_host,
20352047
mysql_handler_port: inner.mysql_handler_port,

src/query/config/src/inner.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ pub struct QueryConfig {
149149
pub tenant_id: Tenant,
150150
/// ID for construct the cluster.
151151
pub cluster_id: String,
152+
/// ID for construct the warehouse.
153+
pub warehouse_id: String,
152154
// ID for the query node.
153155
// This only initialized when InnerConfig::load().
154156
pub node_id: String,
@@ -259,6 +261,7 @@ impl Default for QueryConfig {
259261
Self {
260262
tenant_id: Tenant::new_or_err("admin", "default()").unwrap(),
261263
cluster_id: "".to_string(),
264+
warehouse_id: "".to_string(),
262265
node_id: "".to_string(),
263266
node_secret: "".to_string(),
264267
num_cpus: 0,

src/query/ee/src/resource_management/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,12 @@ pub async fn init_resources_management(cfg: &InnerConfig) -> Result<()> {
3636
true => Err(ErrorCode::InvalidConfig(
3737
"cluster_id is empty without resources management.",
3838
)),
39-
false => SelfManagedResourcesManagement::create(cfg),
39+
false => match cfg.query.warehouse_id.is_empty() {
40+
true => Err(ErrorCode::InvalidConfig(
41+
"warehouse_id is empty without resources management.",
42+
)),
43+
false => SelfManagedResourcesManagement::create(cfg),
44+
},
4045
},
4146
Some(resources_management) => {
4247
match resources_management.typ.to_ascii_lowercase().as_str() {

src/query/ee/src/resource_management/resources_management_self_managed.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ pub struct SelfManagedResourcesManagement {}
2929

3030
impl SelfManagedResourcesManagement {
3131
pub fn create(cfg: &InnerConfig) -> Result<Arc<dyn ResourcesManagement>> {
32-
if cfg.query.cluster_id.is_empty() {
32+
if cfg.query.cluster_id.is_empty() || cfg.query.warehouse_id.is_empty() {
3333
return Err(ErrorCode::InvalidConfig(
34-
"cluster_id is empty with self-managed resources management",
34+
"cluster_id or warehouse_id is empty with self-managed resources management",
3535
));
3636
}
3737

@@ -48,7 +48,7 @@ impl ResourcesManagement for SelfManagedResourcesManagement {
4848
async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
4949
let config = GlobalConfig::instance();
5050
node.cluster_id = config.query.cluster_id.clone();
51-
node.warehouse_id = config.query.cluster_id.clone();
51+
node.warehouse_id = config.query.warehouse_id.clone();
5252
node.node_type = NodeType::SelfManaged;
5353
Ok(())
5454
}

src/query/ee/src/resource_management/resources_management_system.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ impl ResourcesManagement for SystemResourcesManagement {
4545
async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
4646
let config = GlobalConfig::instance();
4747
assert!(config.query.cluster_id.is_empty());
48+
assert!(config.query.warehouse_id.is_empty());
4849
assert!(config.query.resources_management.is_some());
4950

5051
if let Some(resources_management) = &config.query.resources_management {

src/query/ee/src/test_kits/setup.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ impl TestFixture {
4545
.register_to_metastore(config)
4646
.await?;
4747
info!(
48-
"Databend query has been registered:{:?} to metasrv:{:?}.",
49-
config.query.cluster_id, config.meta.endpoints
48+
"Databend query has been registered:{:?}/{:?} to metasrv:{:?}.",
49+
config.query.warehouse_id, config.query.cluster_id, config.meta.endpoints
5050
);
5151
}
5252

src/query/ee_features/resources_management/src/resources_management.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ impl ResourcesManagement for DummyResourcesManagement {
9292
async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
9393
let config = GlobalConfig::instance();
9494
node.cluster_id = config.query.cluster_id.clone();
95-
node.warehouse_id = config.query.cluster_id.clone();
95+
node.warehouse_id = config.query.warehouse_id.clone();
9696
node.node_type = NodeType::SelfManaged;
9797
Ok(())
9898
}

src/query/service/src/clusters/cluster.rs

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ pub struct ClusterDiscovery {
8282
#[async_trait::async_trait]
8383
pub trait ClusterHelper {
8484
fn create(nodes: Vec<Arc<NodeInfo>>, local_id: String) -> Arc<Cluster>;
85-
fn empty() -> Arc<Cluster>;
85+
fn empty(node: NodeInfo) -> Arc<Cluster>;
8686
fn is_empty(&self) -> bool;
8787
fn is_local(&self, node: &NodeInfo) -> bool;
8888
fn local_id(&self) -> String;
@@ -110,12 +110,10 @@ impl ClusterHelper for Cluster {
110110
})
111111
}
112112

113-
fn empty() -> Arc<Cluster> {
114-
Arc::new(Cluster {
115-
unassign: false,
116-
local_id: String::from(""),
117-
nodes: Vec::new(),
118-
})
113+
fn empty(node: NodeInfo) -> Arc<Cluster> {
114+
let node_info = Arc::new(node);
115+
let local_id = node_info.id.clone();
116+
Cluster::create(vec![node_info], local_id)
119117
}
120118

121119
fn is_empty(&self) -> bool {
@@ -321,10 +319,23 @@ impl ClusterDiscovery {
321319

322320
// compatibility, for self-managed nodes, we allow queries to continue executing even when the heartbeat fails.
323321
if cluster_nodes.is_empty() && !config.query.cluster_id.is_empty() {
324-
let mut cluster = Cluster::empty();
325-
let mut_cluster = Arc::get_mut(&mut cluster).unwrap();
326-
mut_cluster.local_id = self.local_id.clone();
327-
return Ok(cluster);
322+
let mut node_info = NodeInfo::create(
323+
config.query.node_id.clone(),
324+
config.query.node_secret.clone(),
325+
config.query.num_cpus,
326+
format!(
327+
"{}:{}",
328+
config.query.http_handler_host, config.query.http_handler_port
329+
),
330+
config.query.flight_api_address.clone(),
331+
config.query.discovery_address.clone(),
332+
String::new(),
333+
String::new(),
334+
);
335+
336+
node_info.cluster_id = config.query.cluster_id.clone();
337+
node_info.warehouse_id = config.query.warehouse_id.clone();
338+
return Ok(Cluster::empty(node_info));
328339
}
329340

330341
Ok(Cluster::create(res, self.local_id.clone()))
@@ -355,14 +366,49 @@ impl ClusterDiscovery {
355366
true => self.warehouse_manager.discover(&config.query.node_id).await,
356367
false => {
357368
self.warehouse_manager
358-
.list_warehouse_cluster_nodes(&self.cluster_id, &self.cluster_id)
369+
.list_warehouse_cluster_nodes(
370+
&config.query.warehouse_id,
371+
&config.query.cluster_id,
372+
)
359373
.await
360374
}
361375
};
362376

363377
self.create_cluster_with_try_connect(config, nodes).await
364378
}
365379

380+
pub async fn single_node_cluster(&self, config: &InnerConfig) -> Result<Arc<Cluster>> {
381+
match self
382+
.warehouse_manager
383+
.get_node_info(&config.query.node_id)
384+
.await?
385+
{
386+
None => {
387+
let mut node_info = NodeInfo::create(
388+
config.query.node_id.clone(),
389+
config.query.node_secret.clone(),
390+
config.query.num_cpus,
391+
format!(
392+
"{}:{}",
393+
config.query.http_handler_host, config.query.http_handler_port
394+
),
395+
config.query.flight_api_address.clone(),
396+
config.query.discovery_address.clone(),
397+
String::new(),
398+
String::new(),
399+
);
400+
401+
node_info.cluster_id = config.query.cluster_id.clone();
402+
node_info.warehouse_id = config.query.warehouse_id.clone();
403+
Ok(Cluster::empty(node_info))
404+
}
405+
Some(v) => Ok(Cluster::create(
406+
vec![Arc::new(v)],
407+
config.query.node_id.clone(),
408+
)),
409+
}
410+
}
411+
366412
pub async fn find_node_by_warehouse(
367413
self: Arc<Self>,
368414
warehouse: &str,

0 commit comments

Comments
 (0)