Skip to content

Commit 420f4b4

Browse files
authored
feat(cluster): support warehouse level show processlist and kill query (#17249)
* feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse * feat(cluster): support proxy to warehouse
1 parent 82611b1 commit 420f4b4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+392
-187
lines changed

src/query/catalog/src/plan/partition.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@ pub enum PartitionsShuffleKind {
102102
// Bind the Partition to executor by partition.rand() order.
103103
Rand,
104104
// Bind the Partition to executor by broadcast
105-
Broadcast,
105+
BroadcastCluster,
106+
// Bind the Partition to warehouse executor by broadcast
107+
BroadcastWarehouse,
106108
}
107109

108110
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
@@ -190,7 +192,8 @@ impl Partitions {
190192
parts.shuffle(&mut rng);
191193
parts
192194
}
193-
PartitionsShuffleKind::Broadcast => {
195+
// the executors will be all nodes in the warehouse if a query is BroadcastWarehouse.
196+
PartitionsShuffleKind::BroadcastCluster | PartitionsShuffleKind::BroadcastWarehouse => {
194197
return Ok(executors_sorted
195198
.into_iter()
196199
.map(|executor| {

src/query/catalog/src/table.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ pub trait Table: Sync + Send {
9494
self.get_table_info().ident.table_id
9595
}
9696

97-
fn is_local(&self) -> bool {
98-
true
97+
fn distribution_level(&self) -> DistributionLevel {
98+
DistributionLevel::Local
9999
}
100100

101101
fn as_any(&self) -> &dyn Any;
@@ -450,7 +450,7 @@ pub trait Table: Sync + Send {
450450
false
451451
}
452452

453-
fn broadcast_truncate_to_cluster(&self) -> bool {
453+
fn broadcast_truncate_to_warehouse(&self) -> bool {
454454
false
455455
}
456456

@@ -678,3 +678,10 @@ pub struct ColumnRange {
678678
pub min: Bound,
679679
pub max: Bound,
680680
}
681+
682+
#[derive(Debug)]
683+
pub enum DistributionLevel {
684+
Local,
685+
Cluster,
686+
Warehouse,
687+
}

src/query/catalog/src/table_context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ pub trait TableContext: Send + Sync {
234234
fn get_settings(&self) -> Arc<Settings>;
235235
fn get_session_settings(&self) -> Arc<Settings>;
236236
fn get_cluster(&self) -> Arc<Cluster>;
237+
fn set_cluster(&self, cluster: Arc<Cluster>);
238+
async fn get_warehouse_cluster(&self) -> Result<Arc<Cluster>>;
237239
fn get_processes_info(&self) -> Vec<ProcessInfo>;
238240
fn get_queued_queries(&self) -> Vec<ProcessInfo>;
239241
fn get_queries_profile(&self) -> HashMap<String, Vec<PlanProfile>>;

src/query/catalog/tests/it/partitions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ fn test_partition_reshuffle() {
239239

240240
// Broadcast.
241241
{
242-
let partitions = gen_parts(PartitionsShuffleKind::Broadcast, 3);
242+
let partitions = gen_parts(PartitionsShuffleKind::BroadcastCluster, 3);
243243
let shuffle = partitions.reshuffle(executors_2.clone()).unwrap();
244244

245245
writeln!(

src/query/management/src/warehouse/warehouse_api.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,5 +132,7 @@ pub trait WarehouseApi: Sync + Send {
132132

133133
async fn discover(&self, node_id: &str) -> Result<Vec<NodeInfo>>;
134134

135+
async fn discover_warehouse_nodes(&self, node_id: &str) -> Result<Vec<NodeInfo>>;
136+
135137
async fn get_node_info(&self, node_id: &str) -> Result<Option<NodeInfo>>;
136138
}

src/query/management/src/warehouse/warehouse_mgr.rs

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1485,13 +1485,34 @@ impl WarehouseApi for WarehouseMgr {
14851485
return Err(ErrorCode::InvalidWarehouse("Warehouse name is empty."));
14861486
}
14871487

1488-
let warehouse_snapshot = self.warehouse_snapshot(&warehouse).await?;
1488+
let nodes_prefix = format!(
1489+
"{}/{}/",
1490+
self.cluster_node_key_prefix,
1491+
escape_for_key(&warehouse)?
1492+
);
14891493

1490-
Ok(warehouse_snapshot
1491-
.snapshot_nodes
1492-
.into_iter()
1493-
.map(|x| x.node_info)
1494-
.collect())
1494+
let values = self.metastore.prefix_list_kv(&nodes_prefix).await?;
1495+
1496+
let mut nodes_info = Vec::with_capacity(values.len());
1497+
for (node_key, value) in values {
1498+
let mut node_info = serde_json::from_slice::<NodeInfo>(&value.data)?;
1499+
1500+
let suffix = &node_key[nodes_prefix.len()..];
1501+
1502+
let Some((cluster, node)) = suffix.split_once('/') else {
1503+
return Err(ErrorCode::InvalidWarehouse(format!(
1504+
"Node key is invalid {:?}",
1505+
node_key
1506+
)));
1507+
};
1508+
1509+
node_info.id = unescape_for_key(node)?;
1510+
node_info.cluster_id = unescape_for_key(cluster)?;
1511+
node_info.warehouse_id = warehouse.to_string();
1512+
nodes_info.push(node_info);
1513+
}
1514+
1515+
Ok(nodes_info)
14951516
}
14961517

14971518
async fn add_warehouse_cluster(
@@ -2155,6 +2176,32 @@ impl WarehouseApi for WarehouseMgr {
21552176
.collect::<Vec<_>>())
21562177
}
21572178

2179+
async fn discover_warehouse_nodes(&self, node_id: &str) -> Result<Vec<NodeInfo>> {
2180+
let node_key = format!("{}/{}", self.node_key_prefix, escape_for_key(node_id)?);
2181+
2182+
let Some(seq) = self.metastore.get_kv(&node_key).await? else {
2183+
return Err(ErrorCode::NotFoundClusterNode(format!(
2184+
"Node {} is offline, Please restart this node.",
2185+
node_id
2186+
)));
2187+
};
2188+
2189+
let node = serde_json::from_slice::<NodeInfo>(&seq.data)?;
2190+
2191+
if !node.assigned_warehouse() {
2192+
return Ok(vec![node]);
2193+
}
2194+
2195+
let expect_version = DATABEND_COMMIT_VERSION.to_string();
2196+
2197+
Ok(self
2198+
.list_warehouse_nodes(node.warehouse_id.clone())
2199+
.await?
2200+
.into_iter()
2201+
.filter(|x| x.binary_version == expect_version)
2202+
.collect::<Vec<_>>())
2203+
}
2204+
21582205
async fn get_node_info(&self, node_id: &str) -> Result<Option<NodeInfo>> {
21592206
let node_key = format!("{}/{}", self.node_key_prefix, escape_for_key(node_id)?);
21602207
match self.metastore.get_kv(&node_key).await? {

src/query/service/src/clusters/cluster.rs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -273,17 +273,11 @@ impl ClusterDiscovery {
273273
Ok((lift_time, Arc::new(cluster_manager)))
274274
}
275275

276-
#[async_backtrace::framed]
277-
pub async fn discover(&self, config: &InnerConfig) -> Result<Arc<Cluster>> {
278-
let nodes = match config.query.cluster_id.is_empty() {
279-
true => self.warehouse_manager.discover(&config.query.node_id).await,
280-
false => {
281-
self.warehouse_manager
282-
.list_warehouse_cluster_nodes(&self.cluster_id, &self.cluster_id)
283-
.await
284-
}
285-
};
286-
276+
async fn create_cluster_with_try_connect(
277+
&self,
278+
config: &InnerConfig,
279+
nodes: Result<Vec<NodeInfo>>,
280+
) -> Result<Arc<Cluster>> {
287281
match nodes {
288282
Err(cause) => {
289283
metric_incr_cluster_error_count(
@@ -336,6 +330,37 @@ impl ClusterDiscovery {
336330
}
337331
}
338332

333+
pub async fn discover_warehouse_nodes(&self, config: &InnerConfig) -> Result<Arc<Cluster>> {
334+
let nodes = match config.query.cluster_id.is_empty() {
335+
true => {
336+
self.warehouse_manager
337+
.discover_warehouse_nodes(&config.query.node_id)
338+
.await
339+
}
340+
false => {
341+
self.warehouse_manager
342+
.list_warehouse_nodes(self.cluster_id.clone())
343+
.await
344+
}
345+
};
346+
347+
self.create_cluster_with_try_connect(config, nodes).await
348+
}
349+
350+
#[async_backtrace::framed]
351+
pub async fn discover(&self, config: &InnerConfig) -> Result<Arc<Cluster>> {
352+
let nodes = match config.query.cluster_id.is_empty() {
353+
true => self.warehouse_manager.discover(&config.query.node_id).await,
354+
false => {
355+
self.warehouse_manager
356+
.list_warehouse_cluster_nodes(&self.cluster_id, &self.cluster_id)
357+
.await
358+
}
359+
};
360+
361+
self.create_cluster_with_try_connect(config, nodes).await
362+
}
363+
339364
pub async fn find_node_by_warehouse(
340365
self: Arc<Self>,
341366
warehouse: &str,

src/query/service/src/interpreters/interpreter_kill.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,45 +31,46 @@ use crate::sessions::QueryContext;
3131
pub struct KillInterpreter {
3232
ctx: Arc<QueryContext>,
3333
plan: KillPlan,
34-
proxy_to_cluster: bool,
34+
proxy_to_warehouse: bool,
3535
}
3636

3737
impl KillInterpreter {
3838
pub fn try_create(ctx: Arc<QueryContext>, plan: KillPlan) -> Result<Self> {
3939
Ok(KillInterpreter {
4040
ctx,
4141
plan,
42-
proxy_to_cluster: true,
42+
proxy_to_warehouse: true,
4343
})
4444
}
4545

4646
pub fn from_flight(ctx: Arc<QueryContext>, plan: KillPlan) -> Result<Self> {
4747
Ok(KillInterpreter {
4848
ctx,
4949
plan,
50-
proxy_to_cluster: false,
50+
proxy_to_warehouse: false,
5151
})
5252
}
5353

5454
#[async_backtrace::framed]
55-
async fn kill_cluster_query(&self) -> Result<PipelineBuildResult> {
56-
let cluster = self.ctx.get_cluster();
55+
async fn kill_warehouse_query(&self) -> Result<PipelineBuildResult> {
5756
let settings = self.ctx.get_settings();
57+
let warehouse = self.ctx.get_warehouse_cluster().await?;
58+
5859
let flight_params = FlightParams {
5960
timeout: settings.get_flight_client_timeout()?,
6061
retry_times: settings.get_flight_max_retry_times()?,
6162
retry_interval: settings.get_flight_retry_interval()?,
6263
};
6364

64-
let mut message = HashMap::with_capacity(cluster.nodes.len());
65+
let mut message = HashMap::with_capacity(warehouse.nodes.len());
6566

66-
for node_info in &cluster.nodes {
67-
if node_info.id != cluster.local_id {
67+
for node_info in &warehouse.nodes {
68+
if node_info.id != warehouse.local_id {
6869
message.insert(node_info.id.clone(), self.plan.clone());
6970
}
7071
}
7172

72-
let res = cluster
73+
let res = warehouse
7374
.do_action::<_, bool>(KILL_QUERY, message, flight_params)
7475
.await?;
7576

@@ -85,8 +86,8 @@ impl KillInterpreter {
8586
#[async_backtrace::framed]
8687
async fn execute_kill(&self, session_id: &String) -> Result<PipelineBuildResult> {
8788
match self.ctx.get_session_by_id(session_id) {
88-
None => match self.proxy_to_cluster {
89-
true => self.kill_cluster_query().await,
89+
None => match self.proxy_to_warehouse {
90+
true => self.kill_warehouse_query().await,
9091
false => Err(ErrorCode::UnknownSession(format!(
9192
"Not found session id {}",
9293
session_id

src/query/service/src/interpreters/interpreter_set_priority.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,33 +30,33 @@ use crate::sessions::QueryContext;
3030
pub struct SetPriorityInterpreter {
3131
ctx: Arc<QueryContext>,
3232
plan: SetPriorityPlan,
33-
proxy_to_cluster: bool,
33+
proxy_to_warehouse: bool,
3434
}
3535

3636
impl SetPriorityInterpreter {
3737
pub fn try_create(ctx: Arc<QueryContext>, plan: SetPriorityPlan) -> Result<Self> {
3838
Ok(SetPriorityInterpreter {
3939
ctx,
4040
plan,
41-
proxy_to_cluster: true,
41+
proxy_to_warehouse: true,
4242
})
4343
}
4444

4545
pub fn from_flight(ctx: Arc<QueryContext>, plan: SetPriorityPlan) -> Result<Self> {
4646
Ok(SetPriorityInterpreter {
4747
ctx,
4848
plan,
49-
proxy_to_cluster: false,
49+
proxy_to_warehouse: false,
5050
})
5151
}
5252

5353
#[async_backtrace::framed]
54-
async fn set_cluster_priority(&self) -> Result<PipelineBuildResult> {
55-
let cluster = self.ctx.get_cluster();
54+
async fn set_warehouse_priority(&self) -> Result<PipelineBuildResult> {
55+
let warehouse = self.ctx.get_warehouse_cluster().await?;
5656

57-
let mut message = HashMap::with_capacity(cluster.nodes.len());
58-
for node_info in &cluster.nodes {
59-
if node_info.id != cluster.local_id {
57+
let mut message = HashMap::with_capacity(warehouse.nodes.len());
58+
for node_info in &warehouse.nodes {
59+
if node_info.id != warehouse.local_id {
6060
message.insert(node_info.id.clone(), self.plan.clone());
6161
}
6262
}
@@ -67,7 +67,7 @@ impl SetPriorityInterpreter {
6767
retry_times: settings.get_flight_max_retry_times()?,
6868
retry_interval: settings.get_flight_retry_interval()?,
6969
};
70-
let res = cluster
70+
let res = warehouse
7171
.do_action::<_, bool>(SET_PRIORITY, message, flight_params)
7272
.await?;
7373

@@ -96,8 +96,8 @@ impl Interpreter for SetPriorityInterpreter {
9696
async fn execute2(&self) -> Result<PipelineBuildResult> {
9797
let id = &self.plan.id;
9898
match self.ctx.get_session_by_id(id) {
99-
None => match self.proxy_to_cluster {
100-
true => self.set_cluster_priority().await,
99+
None => match self.proxy_to_warehouse {
100+
true => self.set_warehouse_priority().await,
101101
false => Err(ErrorCode::UnknownSession(format!(
102102
"Not found session id {}",
103103
id

src/query/service/src/interpreters/interpreter_system_action.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,23 +31,23 @@ use crate::sessions::QueryContext;
3131
pub struct SystemActionInterpreter {
3232
ctx: Arc<QueryContext>,
3333
plan: SystemPlan,
34-
proxy_to_cluster: bool,
34+
proxy_to_warehouse: bool,
3535
}
3636

3737
impl SystemActionInterpreter {
3838
pub fn try_create(ctx: Arc<QueryContext>, plan: SystemPlan) -> Result<Self> {
3939
Ok(SystemActionInterpreter {
4040
ctx,
4141
plan,
42-
proxy_to_cluster: true,
42+
proxy_to_warehouse: true,
4343
})
4444
}
4545

4646
pub fn from_flight(ctx: Arc<QueryContext>, plan: SystemPlan) -> Result<Self> {
4747
Ok(SystemActionInterpreter {
4848
ctx,
4949
plan,
50-
proxy_to_cluster: false,
50+
proxy_to_warehouse: false,
5151
})
5252
}
5353
}
@@ -65,11 +65,12 @@ impl Interpreter for SystemActionInterpreter {
6565
#[async_backtrace::framed]
6666
#[fastrace::trace]
6767
async fn execute2(&self) -> Result<PipelineBuildResult> {
68-
if self.proxy_to_cluster {
69-
let cluster = self.ctx.get_cluster();
70-
let mut message = HashMap::with_capacity(cluster.nodes.len());
71-
for node_info in &cluster.nodes {
72-
if node_info.id != cluster.local_id {
68+
if self.proxy_to_warehouse {
69+
let warehouse = self.ctx.get_warehouse_cluster().await?;
70+
71+
let mut message = HashMap::with_capacity(warehouse.nodes.len());
72+
for node_info in &warehouse.nodes {
73+
if node_info.id != warehouse.local_id {
7374
message.insert(node_info.id.clone(), self.plan.clone());
7475
}
7576
}
@@ -80,7 +81,7 @@ impl Interpreter for SystemActionInterpreter {
8081
retry_times: settings.get_flight_max_retry_times()?,
8182
retry_interval: settings.get_flight_retry_interval()?,
8283
};
83-
cluster
84+
warehouse
8485
.do_action::<_, ()>(SYSTEM_ACTION, message, flight_params)
8586
.await?;
8687
}

0 commit comments

Comments
 (0)