Skip to content

Commit 461c69f

Browse files
authored
Devloadbalance (#4)
* 添加方案 * Update security.yaml * add p2c,WRR etc &&fmt * 优化负载的策略 * fix conflict
1 parent d5cb308 commit 461c69f

File tree

2 files changed

+87
-23
lines changed

2 files changed

+87
-23
lines changed

src/adapter/volo_adapter.rs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type DiscoverKey = <volo::discovery::StaticDiscover as Discover>::Key;
1616

1717
struct PickerCacheEntry {
1818
picker: Arc<dyn crate::strategy::Picker>,
19+
signature: u64,
1920
}
2021

2122
/// Volo LoadBalancer Adapter
@@ -209,24 +210,28 @@ impl<S: BalanceStrategy + 'static> LoadBalance<volo::discovery::StaticDiscover>
209210
discover: &volo::discovery::StaticDiscover,
210211
) -> Result<Self::InstanceIter, LoadBalanceError> {
211212
let discover_key = discover.key(endpoint);
213+
214+
// Get instances from service discovery first to avoid stale cache
215+
let instances = discover
216+
.discover(endpoint)
217+
.await
218+
.map_err(|e| LoadBalanceError::Discover(Box::new(e)))?;
219+
220+
let signature = instances_signature(&instances);
212221
let cache_key = self.get_cache_key(endpoint, &discover_key);
213222

214-
// Check cache
223+
// Check cache with signature guard
215224
{
216225
let cache = self.picker_cache.read();
217226
if let Some(entry) = cache.get(&cache_key) {
218-
return Ok(VoloInstanceIter {
219-
picker: entry.picker.clone(),
220-
});
227+
if entry.signature == signature {
228+
return Ok(VoloInstanceIter {
229+
picker: entry.picker.clone(),
230+
});
231+
}
221232
}
222233
}
223234

224-
// Get instances from service discovery
225-
let instances = discover
226-
.discover(endpoint)
227-
.await
228-
.map_err(|e| LoadBalanceError::Discover(Box::new(e)))?;
229-
230235
if instances.is_empty() {
231236
// When no available instances are found, return a custom error
232237
return Err(LoadBalanceError::from(Box::<
@@ -250,6 +255,7 @@ impl<S: BalanceStrategy + 'static> LoadBalance<volo::discovery::StaticDiscover>
250255
cache_key.clone(),
251256
PickerCacheEntry {
252257
picker: picker.clone(),
258+
signature,
253259
},
254260
);
255261
}
@@ -309,3 +315,20 @@ pub fn response_time_weighted() -> VoloLoadBalancer<crate::strategy::ResponseTim
309315
pub fn consistent_hash() -> VoloLoadBalancer<crate::strategy::ConsistentHash> {
310316
VoloLoadBalancer::new(crate::strategy::ConsistentHash::default())
311317
}
318+
319+
fn instances_signature(instances: &[Arc<Instance>]) -> u64 {
320+
let mut h = AHasher::default();
321+
for inst in instances {
322+
format!("{:?}", inst.address).hash(&mut h);
323+
inst.weight.hash(&mut h);
324+
if !inst.tags.is_empty() {
325+
let mut tags: Vec<_> = inst.tags.iter().collect();
326+
tags.sort_by(|a, b| a.0.cmp(b.0).then_with(|| a.1.cmp(b.1)));
327+
for (k, v) in tags {
328+
k.hash(&mut h);
329+
v.hash(&mut h);
330+
}
331+
}
332+
}
333+
h.finish()
334+
}

src/strategy.rs

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ impl Picker for LeastConnPicker {
317317
/// - Weighted selection based on node's recent response time (RTT)
318318
/// - Smaller RTT means higher weight
319319
/// - Also considers current load (in_flight)
320-
/// - Performance optimization: scans nodes once to find the highest score
320+
/// - Performance optimization: single-pass scan to find the highest score (O(n))
321321
#[derive(Clone, Debug)]
322322
pub struct ResponseTimeWeighted;
323323

@@ -337,17 +337,17 @@ impl Picker for RTWeightedPicker {
337337
if len == 0 {
338338
return Err(LoadBalanceError::NoAvailableNodes);
339339
}
340-
if len == 1 {
341-
return Ok(self.nodes[0].clone());
342-
}
343340

344-
let mut best_node = self.nodes[0].clone();
345-
let mut best_score = score(&self.nodes[0]);
341+
// Single pass O(n) selection; avoids allocation + sort on every pick
342+
let mut iter = self.nodes.iter();
343+
let first = iter.next().unwrap();
344+
let mut best_node = first.clone();
345+
let mut best_score = score(first);
346346

347-
for node in self.nodes.iter().skip(1) {
348-
let current_score = score(node);
349-
if current_score > best_score {
350-
best_score = current_score;
347+
for node in iter {
348+
let s = score(node);
349+
if s > best_score {
350+
best_score = s;
351351
best_node = node.clone();
352352
}
353353
}
@@ -402,14 +402,33 @@ impl ConsistentHashPicker {
402402
fn new(nodes: Arc<Vec<Arc<Node>>>, virtual_factor: usize) -> Self {
403403
let mut ring = Vec::new();
404404

405+
// Normalize weights to avoid exploding virtual nodes when weights are large.
406+
let weights: Vec<usize> = nodes.iter().map(|n| n.weight.max(1) as usize).collect();
407+
let gcd_w = weights
408+
.iter()
409+
.copied()
410+
.fold(
411+
0usize,
412+
|acc, w| if acc == 0 { w } else { gcd_usize(acc, w) },
413+
)
414+
.max(1);
415+
416+
// Hard cap to keep ring size reasonable while preserving relative weights.
417+
const MAX_VNODE_PER_NODE: usize = 1024;
418+
405419
// Create virtual nodes for each node
406420
for (i, node) in nodes.iter().enumerate() {
407-
let weight = node.weight.max(1) as usize; // Ensure weight is at least 1
408-
let vnode_count = weight * virtual_factor;
421+
let normalized = (weights[i] / gcd_w).max(1);
422+
let vnode_count = normalized
423+
.saturating_mul(virtual_factor)
424+
.min(MAX_VNODE_PER_NODE)
425+
.max(1);
426+
427+
let base_key = stable_node_key(node, i);
409428

410429
for j in 0..vnode_count {
411430
// Generate hash value using node address and virtual node index
412-
let key = format!("{}:{j}", node.endpoint.id);
431+
let key = format!("{base_key}#{j}");
413432
let hash = hash_str(&key);
414433
ring.push((hash, i));
415434
}
@@ -463,6 +482,28 @@ fn hash_str(s: &str) -> u64 {
463482
h.finish()
464483
}
465484

485+
fn gcd_usize(a: usize, b: usize) -> usize {
486+
if b == 0 {
487+
a
488+
} else {
489+
gcd_usize(b, a % b)
490+
}
491+
}
492+
493+
fn stable_node_key(node: &Arc<Node>, idx: usize) -> String {
494+
let addr = format_address(&node.endpoint.address);
495+
format!("id:{}|addr:{}|idx:{idx}", node.endpoint.id, addr)
496+
}
497+
498+
#[cfg(feature = "volo-adapter")]
499+
fn format_address(addr: &volo::net::Address) -> String {
500+
format!("{addr:?}")
501+
}
502+
503+
#[cfg(not(feature = "volo-adapter"))]
504+
fn format_address(addr: &String) -> String {
505+
addr.clone()
506+
}
466507
#[cfg(test)]
467508
mod tests {
468509
use super::*;

0 commit comments

Comments
 (0)