Skip to content

Commit 83dce81

Browse files
authored
chore(query): add cache id for distribute scheduler (#17708)
* chore(query): add cache id for distribute scheduler * chore(query): add cache id for distribute scheduler * chore(query): add cache id for distribute scheduler
1 parent e40b13c commit 83dce81

File tree

9 files changed

+112
-50
lines changed

9 files changed

+112
-50
lines changed

src/meta/types/src/cluster.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ pub struct NodeInfo {
4848

4949
#[serde(skip_serializing_if = "Option::is_none")]
5050
pub runtime_node_group: Option<String>,
51+
52+
pub cache_id: String,
5153
}
5254

5355
impl NodeInfo {
@@ -59,6 +61,7 @@ impl NodeInfo {
5961
flight_address: String,
6062
discovery_address: String,
6163
binary_version: String,
64+
cache_id: String,
6265
) -> NodeInfo {
6366
NodeInfo {
6467
id,
@@ -69,6 +72,7 @@ impl NodeInfo {
6972
flight_address,
7073
discovery_address,
7174
binary_version,
75+
cache_id,
7276
cluster_id: "".to_string(),
7377
warehouse_id: "".to_string(),
7478
node_type: NodeType::SystemManaged,
@@ -102,6 +106,7 @@ impl NodeInfo {
102106
binary_version: self.binary_version.clone(),
103107
node_type: self.node_type.clone(),
104108
node_group: self.node_group.clone(),
109+
cache_id: self.cache_id.clone(),
105110
cluster_id: String::new(),
106111
warehouse_id: String::new(),
107112
runtime_node_group: self.runtime_node_group.clone(),
@@ -120,6 +125,7 @@ impl NodeInfo {
120125
binary_version: self.binary_version.clone(),
121126
node_type: self.node_type.clone(),
122127
node_group: self.node_group.clone(),
128+
cache_id: self.cache_id.clone(),
123129
cluster_id: String::new(),
124130
warehouse_id: String::new(),
125131
runtime_node_group: None,

src/meta/types/tests/it/cluster.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ fn test_node_info_ip_port() -> anyhow::Result<()> {
3030
cluster_id: "".to_string(),
3131
warehouse_id: "".to_string(),
3232
runtime_node_group: None,
33+
cache_id: "".to_string(),
3334
};
3435

3536
let (ip, port) = n.ip_port()?;
@@ -56,6 +57,7 @@ fn test_serde_node_info() {
5657
cluster_id: String::new(),
5758
warehouse_id: String::new(),
5859
runtime_node_group: None,
60+
cache_id: "test_id".to_string(),
5961
};
6062

6163
let json_str = serde_json::to_string(&info).unwrap();

src/query/catalog/src/plan/partition.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::sync::Arc;
2424

2525
use databend_common_config::GlobalConfig;
2626
use databend_common_exception::Result;
27+
use databend_common_meta_types::NodeInfo;
2728
use databend_storages_common_table_meta::meta::BlockMeta;
2829
use databend_storages_common_table_meta::meta::Statistics;
2930
use parking_lot::RwLock;
@@ -126,9 +127,9 @@ impl Partitions {
126127
self.partitions.is_empty()
127128
}
128129

129-
pub fn reshuffle(&self, executors: Vec<String>) -> Result<HashMap<String, Partitions>> {
130+
pub fn reshuffle(&self, executors: Vec<Arc<NodeInfo>>) -> Result<HashMap<String, Partitions>> {
130131
let mut executors_sorted = executors;
131-
executors_sorted.sort();
132+
executors_sorted.sort_by(|left, right| left.cache_id.cmp(&right.cache_id));
132133

133134
let num_executors = executors_sorted.len();
134135
let partitions = match self.kind {
@@ -153,17 +154,17 @@ impl Partitions {
153154

154155
let mut executor_part = executors_sorted
155156
.iter()
156-
.map(|e| (e.clone(), Partitions::default()))
157+
.map(|e| (e.id.clone(), Partitions::default()))
157158
.collect::<HashMap<_, _>>();
158159

159160
let mut ring = executors_sorted
160161
.iter()
161162
.flat_map(|e| {
162163
let mut s = DefaultHasher::new();
163-
e.hash(&mut s);
164+
e.cache_id.hash(&mut s);
164165
(0..1 << scale).map(move |i| {
165166
i.hash(&mut s);
166-
(e, s.finish())
167+
(e.id.clone(), s.finish())
167168
})
168169
})
169170
.collect::<Vec<_>>();
@@ -172,16 +173,15 @@ impl Partitions {
172173

173174
for p in self.partitions.iter() {
174175
let k = p.hash();
175-
let idx = match ring.binary_search_by(|&(_, h)| h.cmp(&k)) {
176-
Err(i) => i,
177-
Ok(i) => i,
178-
};
176+
let idx = ring
177+
.binary_search_by(|&(_, h)| h.cmp(&k))
178+
.unwrap_or_else(|i| i);
179179
let executor = if idx == ring.len() {
180-
ring[0].0
180+
ring[0].0.clone()
181181
} else {
182-
ring[idx].0
182+
ring[idx].0.clone()
183183
};
184-
let part = executor_part.get_mut(executor).unwrap();
184+
let part = executor_part.get_mut(&executor).unwrap();
185185
part.partitions.push(p.clone());
186186
}
187187
return Ok(executor_part);
@@ -198,7 +198,7 @@ impl Partitions {
198198
.into_iter()
199199
.map(|executor| {
200200
(
201-
executor,
201+
executor.id.clone(),
202202
Partitions::create(PartitionsShuffleKind::Seq, self.partitions.clone()),
203203
)
204204
})
@@ -212,13 +212,13 @@ impl Partitions {
212212

213213
let local_id = &GlobalConfig::instance().query.node_id;
214214
for executor in executors_sorted.into_iter() {
215-
let parts = match &executor == local_id {
215+
let parts = match &executor.id == local_id {
216216
true => partitions.clone(),
217217
false => vec![],
218218
};
219219

220220
executor_part.insert(
221-
executor,
221+
executor.id.clone(),
222222
Partitions::create(PartitionsShuffleKind::Seq, parts),
223223
);
224224
}
@@ -246,7 +246,7 @@ impl Partitions {
246246
};
247247

248248
executor_part.insert(
249-
executor,
249+
executor.id.clone(),
250250
Partitions::create(PartitionsShuffleKind::Seq, parts),
251251
);
252252
}

src/query/catalog/tests/it/partitions.rs

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ use std::hash::Hasher;
2020
use std::io::Write;
2121
use std::sync::Arc;
2222

23+
use databend_common_base::base::GlobalUniqName;
2324
use databend_common_catalog::plan::compute_row_id_prefix;
2425
use databend_common_catalog::plan::split_prefix;
2526
use databend_common_catalog::plan::PartInfo;
2627
use databend_common_catalog::plan::PartInfoPtr;
2728
use databend_common_catalog::plan::PartInfoType;
2829
use databend_common_catalog::plan::Partitions;
2930
use databend_common_catalog::plan::PartitionsShuffleKind;
31+
use databend_common_meta_types::NodeInfo;
3032
use databend_storages_common_table_meta::meta::NUM_BLOCK_ID_BITS;
3133
use goldenfile::Mint;
3234

@@ -73,18 +75,34 @@ fn gen_parts(kind: PartitionsShuffleKind, size: usize) -> Partitions {
7375
Partitions::create(kind, parts)
7476
}
7577

78+
fn create_node(cache_id: String) -> Arc<NodeInfo> {
79+
Arc::new(NodeInfo::create(
80+
GlobalUniqName::unique(),
81+
String::new(),
82+
0,
83+
String::new(),
84+
String::new(),
85+
String::new(),
86+
String::new(),
87+
cache_id,
88+
))
89+
}
90+
7691
#[test]
7792
fn test_partition_reshuffle() {
7893
let mut mint = Mint::new("tests/it/testdata");
7994
let file = &mut mint.new_goldenfile("partition-reshuffle.txt").unwrap();
8095

8196
let executors_3 = vec![
82-
"node-1".to_string(),
83-
"node-2".to_string(),
84-
"node-3".to_string(),
97+
create_node("node-1".to_string()),
98+
create_node("node-2".to_string()),
99+
create_node("node-3".to_string()),
85100
];
86101

87-
let executors_2 = vec!["node-1".to_string(), "node-2".to_string()];
102+
let executors_2 = vec![
103+
create_node("node-1".to_string()),
104+
create_node("node-2".to_string()),
105+
];
88106

89107
// None.
90108
{
@@ -96,13 +114,13 @@ fn test_partition_reshuffle() {
96114
"PartitionsShuffleKind::Seq : 11 partitions of 3 executors"
97115
)
98116
.unwrap();
99-
let e1_parts = shuffle.get(&executors_3[0]).unwrap();
117+
let e1_parts = shuffle.get(&executors_3[0].id).unwrap();
100118
writeln!(file, "{:?}", e1_parts).unwrap();
101119

102-
let e2_parts = shuffle.get(&executors_3[1]).unwrap();
120+
let e2_parts = shuffle.get(&executors_3[1].id).unwrap();
103121
writeln!(file, "{:?}", e2_parts).unwrap();
104122

105-
let e3_parts = shuffle.get(&executors_3[2]).unwrap();
123+
let e3_parts = shuffle.get(&executors_3[2].id).unwrap();
106124
writeln!(file, "{:?}", e3_parts).unwrap();
107125
}
108126

@@ -116,13 +134,13 @@ fn test_partition_reshuffle() {
116134
"PartitionsShuffleKind::Seq : 2 partitions of 3 executors"
117135
)
118136
.unwrap();
119-
let e1_parts = shuffle.get(&executors_3[0]).unwrap();
137+
let e1_parts = shuffle.get(&executors_3[0].id).unwrap();
120138
writeln!(file, "{:?}", e1_parts).unwrap();
121139

122-
let e2_parts = shuffle.get(&executors_3[1]).unwrap();
140+
let e2_parts = shuffle.get(&executors_3[1].id).unwrap();
123141
writeln!(file, "{:?}", e2_parts).unwrap();
124142

125-
let e3_parts = shuffle.get(&executors_3[2]).unwrap();
143+
let e3_parts = shuffle.get(&executors_3[2].id).unwrap();
126144
writeln!(file, "{:?}", e3_parts).unwrap();
127145
}
128146

@@ -136,13 +154,13 @@ fn test_partition_reshuffle() {
136154
"PartitionsShuffleKind::Mod : 10 partitions of 3 executors"
137155
)
138156
.unwrap();
139-
let e1_parts = shuffle.get(&executors_3[0]).unwrap();
157+
let e1_parts = shuffle.get(&executors_3[0].id).unwrap();
140158
writeln!(file, "{:?}", e1_parts).unwrap();
141159

142-
let e2_parts = shuffle.get(&executors_3[1]).unwrap();
160+
let e2_parts = shuffle.get(&executors_3[1].id).unwrap();
143161
writeln!(file, "{:?}", e2_parts).unwrap();
144162

145-
let e3_parts = shuffle.get(&executors_3[2]).unwrap();
163+
let e3_parts = shuffle.get(&executors_3[2].id).unwrap();
146164
writeln!(file, "{:?}", e3_parts).unwrap();
147165
}
148166

@@ -156,13 +174,13 @@ fn test_partition_reshuffle() {
156174
"PartitionsShuffleKind::Mod : 11 partitions of 3 executors"
157175
)
158176
.unwrap();
159-
let e1_parts = shuffle.get(&executors_3[0]).unwrap();
177+
let e1_parts = shuffle.get(&executors_3[0].id).unwrap();
160178
writeln!(file, "{:?}", e1_parts).unwrap();
161179

162-
let e2_parts = shuffle.get(&executors_3[1]).unwrap();
180+
let e2_parts = shuffle.get(&executors_3[1].id).unwrap();
163181
writeln!(file, "{:?}", e2_parts).unwrap();
164182

165-
let e3_parts = shuffle.get(&executors_3[2]).unwrap();
183+
let e3_parts = shuffle.get(&executors_3[2].id).unwrap();
166184
writeln!(file, "{:?}", e3_parts).unwrap();
167185
}
168186

@@ -176,10 +194,10 @@ fn test_partition_reshuffle() {
176194
"PartitionsShuffleKind::Mod : 11 partitions of 2 executors"
177195
)
178196
.unwrap();
179-
let e1_parts = shuffle.get(&executors_2[0]).unwrap();
197+
let e1_parts = shuffle.get(&executors_2[0].id).unwrap();
180198
writeln!(file, "{:?}", e1_parts).unwrap();
181199

182-
let e2_parts = shuffle.get(&executors_2[1]).unwrap();
200+
let e2_parts = shuffle.get(&executors_2[1].id).unwrap();
183201
writeln!(file, "{:?}", e2_parts).unwrap();
184202
}
185203

@@ -193,10 +211,10 @@ fn test_partition_reshuffle() {
193211
"PartitionsShuffleKind::ConsistentHash : 11 partitions of 2 executors"
194212
)
195213
.unwrap();
196-
let e1_parts = shuffle.get(&executors_2[0]).unwrap();
214+
let e1_parts = shuffle.get(&executors_2[0].id).unwrap();
197215
writeln!(file, "{:?}", e1_parts).unwrap();
198216

199-
let e2_parts = shuffle.get(&executors_2[1]).unwrap();
217+
let e2_parts = shuffle.get(&executors_2[1].id).unwrap();
200218
writeln!(file, "{:?}", e2_parts).unwrap();
201219
}
202220

@@ -210,13 +228,13 @@ fn test_partition_reshuffle() {
210228
"PartitionsShuffleKind::ConsistentHash : 11 partitions of 3 executors"
211229
)
212230
.unwrap();
213-
let e1_parts = shuffle.get(&executors_3[0]).unwrap();
231+
let e1_parts = shuffle.get(&executors_3[0].id).unwrap();
214232
writeln!(file, "{:?}", e1_parts).unwrap();
215233

216-
let e2_parts = shuffle.get(&executors_3[1]).unwrap();
234+
let e2_parts = shuffle.get(&executors_3[1].id).unwrap();
217235
writeln!(file, "{:?}", e2_parts).unwrap();
218236

219-
let e3_parts = shuffle.get(&executors_3[2]).unwrap();
237+
let e3_parts = shuffle.get(&executors_3[2].id).unwrap();
220238
writeln!(file, "{:?}", e3_parts).unwrap();
221239
}
222240

@@ -230,10 +248,10 @@ fn test_partition_reshuffle() {
230248
"PartitionsShuffleKind::Rand: 11 partitions of 2 executors"
231249
)
232250
.unwrap();
233-
let e1_parts = shuffle.get(&executors_2[0]).unwrap();
251+
let e1_parts = shuffle.get(&executors_2[0].id).unwrap();
234252
writeln!(file, "{:?}", e1_parts.len()).unwrap();
235253

236-
let e2_parts = shuffle.get(&executors_2[1]).unwrap();
254+
let e2_parts = shuffle.get(&executors_2[1].id).unwrap();
237255
writeln!(file, "{:?}", e2_parts.len()).unwrap();
238256
}
239257

@@ -247,10 +265,10 @@ fn test_partition_reshuffle() {
247265
"PartitionsShuffleKind::Broadcast: 3 partitions of 2 executors"
248266
)
249267
.unwrap();
250-
let e1_parts = shuffle.get(&executors_2[0]).unwrap();
268+
let e1_parts = shuffle.get(&executors_2[0].id).unwrap();
251269
writeln!(file, "{:?}", e1_parts).unwrap();
252270

253-
let e2_parts = shuffle.get(&executors_2[1]).unwrap();
271+
let e2_parts = shuffle.get(&executors_2[1].id).unwrap();
254272
writeln!(file, "{:?}", e2_parts).unwrap();
255273
}
256274
}

src/query/management/tests/it/warehouse.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1464,6 +1464,7 @@ fn system_managed_node(id: &str) -> NodeInfo {
14641464
cluster_id: "".to_string(),
14651465
warehouse_id: "".to_string(),
14661466
runtime_node_group: None,
1467+
cache_id: id.to_string(),
14671468
}
14681469
}
14691470

@@ -1482,6 +1483,7 @@ fn self_managed_node(node_id: &str) -> NodeInfo {
14821483
cluster_id: "test-cluster-id".to_string(),
14831484
warehouse_id: "test-cluster-id".to_string(),
14841485
runtime_node_group: None,
1486+
cache_id: node_id.to_string(),
14851487
}
14861488
}
14871489

0 commit comments

Comments
 (0)