Skip to content

Commit b34d32b

Browse files
authored
Allow duplicate weighted backend keys (#3319)
Allow duplicate weighted backend keys. Currently, if the proxy receives two backends with the same metadata, one of the backends will get dropped because the backend metadata is used as the key in a hash map. Attempting to then randomly distribute requests to the backends can panic when selecting the now non-existent backend. This is fixed by no longer using backend metadata as a hash map key, instead generating separate IDs that are stored in a vec to retain the declared order of backends while also being used to look up the backend and associated weight independently. Validated with new unit tests exercising duplicate backend keys, as well as a few around the invariants used to store the backends. Signed-off-by: Scott Fleener <[email protected]>
1 parent db64c7a commit b34d32b

File tree

9 files changed

+570
-227
lines changed

9 files changed

+570
-227
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1564,7 +1564,6 @@ name = "linkerd-distribute"
15641564
version = "0.1.0"
15651565
dependencies = [
15661566
"ahash",
1567-
"indexmap 2.6.0",
15681567
"linkerd-stack",
15691568
"parking_lot",
15701569
"rand",

linkerd/distribute/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ publish = false
77

88
[dependencies]
99
ahash = "0.8"
10-
indexmap = "2"
1110
linkerd-stack = { path = "../stack" }
1211
parking_lot = "0.12"
1312
rand = { version = "0.8", features = ["small_rng"] }

linkerd/distribute/src/keys.rs

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
use ahash::{HashMap, HashMapExt};
2+
use rand::{
3+
distributions::{WeightedError, WeightedIndex},
4+
prelude::Distribution as _,
5+
Rng,
6+
};
7+
use std::hash::Hash;
8+
9+
/// Uniquely identifies a key/backend pair for a distribution. This allows
10+
/// backends to have the same key and still participate in request distribution.
11+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
12+
pub(crate) struct KeyId {
13+
idx: usize,
14+
}
15+
16+
#[derive(Debug)]
17+
pub struct ServiceKeys<K> {
18+
ids: Vec<KeyId>,
19+
keys: HashMap<KeyId, K>,
20+
}
21+
22+
pub type WeightedServiceKeys<K> = ServiceKeys<WeightedKey<K>>;
23+
24+
#[derive(Debug, PartialEq, Eq, Hash)]
25+
pub struct WeightedKey<K> {
26+
pub key: K,
27+
pub weight: u32,
28+
}
29+
30+
pub(crate) struct WeightedKeySelector<'a, K> {
31+
keys: &'a WeightedServiceKeys<K>,
32+
index: WeightedIndex<u32>,
33+
}
34+
35+
// === impl KeyId ===
36+
37+
impl KeyId {
38+
pub(crate) fn new(idx: usize) -> Self {
39+
Self { idx }
40+
}
41+
}
42+
43+
// === impl ServiceKeys ===
44+
45+
// PartialEq, Eq, and Hash are all valid to implement for ServiceKeys since
46+
// there is a defined iteration order for the keys, but Hash cannot be automatically
47+
// derived for HashMap fields.
48+
impl<K: PartialEq> PartialEq for ServiceKeys<K> {
49+
fn eq(&self, other: &Self) -> bool {
50+
if self.ids != other.ids {
51+
return false;
52+
}
53+
54+
for id in &self.ids {
55+
if self.keys.get(id) != other.keys.get(id) {
56+
return false;
57+
}
58+
}
59+
60+
true
61+
}
62+
}
63+
64+
impl<K: Eq> Eq for ServiceKeys<K> {}
65+
66+
impl<K: Hash> Hash for ServiceKeys<K> {
67+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
68+
self.ids.hash(state);
69+
// Normally we would also hash the length, but self.ids and
70+
// self.keys have the same length
71+
for id in &self.ids {
72+
self.keys.get(id).hash(state);
73+
}
74+
}
75+
}
76+
77+
impl<K> ServiceKeys<K> {
78+
pub(crate) fn new(iter: impl Iterator<Item = K>) -> Self {
79+
let mut ids = Vec::new();
80+
let mut keys = HashMap::new();
81+
for (idx, key) in iter.enumerate() {
82+
let id = KeyId::new(idx);
83+
ids.push(id);
84+
keys.insert(id, key);
85+
}
86+
87+
Self { ids, keys }
88+
}
89+
90+
pub(crate) fn is_empty(&self) -> bool {
91+
self.ids.is_empty()
92+
}
93+
94+
pub(crate) fn len(&self) -> usize {
95+
self.ids.len()
96+
}
97+
98+
/// Returns the key `K` associated with the given [`KeyId`].
99+
///
100+
/// The output of using a [`KeyId`] not produced by the same instance of
101+
/// [`ServiceKeys`] is unspecified, and it is likely to panic.
102+
///
103+
/// # Panics
104+
///
105+
/// This will panic if no entry is associated with the given lookup key.
106+
pub(crate) fn get(&self, id: KeyId) -> &K {
107+
self.keys
108+
.get(&id)
109+
.expect("distribution lookup keys must be valid")
110+
}
111+
112+
fn try_get_id(&self, idx: usize) -> Option<KeyId> {
113+
self.ids.get(idx).copied()
114+
}
115+
116+
pub(crate) fn iter(&self) -> impl Iterator<Item = &KeyId> {
117+
self.ids.iter()
118+
}
119+
}
120+
121+
// === impl WeightedServiceKeys ===
122+
123+
impl<K> WeightedServiceKeys<K> {
124+
pub(crate) fn into_unweighted(self) -> ServiceKeys<K> {
125+
ServiceKeys {
126+
ids: self.ids,
127+
keys: self
128+
.keys
129+
.into_iter()
130+
.map(|(id, key)| (id, key.key))
131+
.collect(),
132+
}
133+
}
134+
135+
pub(crate) fn weighted_index(&self) -> Result<WeightedIndex<u32>, WeightedError> {
136+
WeightedIndex::new(self.ids.iter().map(|&id| self.get(id).weight))
137+
}
138+
139+
pub(crate) fn validate_weights(&self) -> Result<(), WeightedError> {
140+
self.weighted_index()?;
141+
Ok(())
142+
}
143+
144+
pub(crate) fn selector(&self) -> WeightedKeySelector<'_, K> {
145+
let index = self.weighted_index().expect("distribution must be valid");
146+
WeightedKeySelector { keys: self, index }
147+
}
148+
}
149+
150+
// === impl WeightedKeySelector ===
151+
152+
impl<K> WeightedKeySelector<'_, K> {
153+
pub(crate) fn select_weighted<R: Rng + ?Sized>(&self, rng: &mut R) -> KeyId {
154+
let idx = self.index.sample(rng);
155+
self.keys
156+
.try_get_id(idx)
157+
.expect("distrubtion must select a valid backend")
158+
}
159+
160+
pub(crate) fn disable_backend(&mut self, id: KeyId) -> Result<(), WeightedError> {
161+
self.index.update_weights(&[(id.idx, &0)])
162+
}
163+
}

linkerd/distribute/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
#![forbid(unsafe_code)]
55

66
mod cache;
7+
mod keys;
78
mod params;
89
mod service;
910
mod stack;
1011

1112
pub use self::{
1213
cache::{BackendCache, NewBackendCache},
13-
params::{Backends, Distribution, WeightedKeys},
14+
keys::WeightedServiceKeys,
15+
params::{Backends, Distribution},
1416
service::Distribute,
1517
stack::NewDistribute,
1618
};

linkerd/distribute/src/params.rs

Lines changed: 21 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
use crate::{
2+
keys::{ServiceKeys, WeightedKey},
3+
WeightedServiceKeys,
4+
};
15
use ahash::AHashSet;
2-
use rand::distributions::{WeightedError, WeightedIndex};
6+
use rand::distributions::WeightedError;
37
use std::{fmt::Debug, hash::Hash, sync::Arc};
48

59
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -16,17 +20,11 @@ pub enum Distribution<K> {
1620
Empty,
1721

1822
/// A distribution that uses the first available backend in an ordered list.
19-
FirstAvailable(Arc<[K]>),
23+
FirstAvailable(Arc<ServiceKeys<K>>),
2024

2125
/// A distribution that uses the first available backend when randomly
2226
/// selecting over a weighted distribution of backends.
23-
RandomAvailable(Arc<WeightedKeys<K>>),
24-
}
25-
26-
#[derive(Debug, PartialEq, Eq, Hash)]
27-
pub struct WeightedKeys<K> {
28-
keys: Vec<K>,
29-
weights: Vec<u32>,
27+
RandomAvailable(Arc<WeightedServiceKeys<K>>),
3028
}
3129

3230
// === impl Backends ===
@@ -64,46 +62,29 @@ impl<K> Default for Distribution<K> {
6462
}
6563

6664
impl<K> Distribution<K> {
67-
pub fn first_available(keys: impl IntoIterator<Item = K>) -> Self {
68-
let keys: Arc<[K]> = keys.into_iter().collect();
65+
pub fn first_available(iter: impl IntoIterator<Item = K>) -> Self {
66+
let keys = ServiceKeys::new(iter.into_iter());
6967
if keys.is_empty() {
7068
return Self::Empty;
7169
}
72-
Self::FirstAvailable(keys)
70+
71+
Self::FirstAvailable(Arc::new(keys))
7372
}
7473

7574
pub fn random_available<T: IntoIterator<Item = (K, u32)>>(
7675
iter: T,
7776
) -> Result<Self, WeightedError> {
78-
let (keys, weights): (Vec<_>, Vec<_>) = iter.into_iter().filter(|(_, w)| *w > 0).unzip();
79-
if keys.len() < 2 {
80-
return Ok(Self::first_available(keys));
81-
}
82-
// Error if the distribution is invalid.
83-
let _index = WeightedIndex::new(weights.iter().copied())?;
84-
Ok(Self::RandomAvailable(Arc::new(WeightedKeys {
85-
keys,
86-
weights,
87-
})))
88-
}
89-
90-
pub(crate) fn keys(&self) -> &[K] {
91-
match self {
92-
Self::Empty => &[],
93-
Self::FirstAvailable(keys) => keys,
94-
Self::RandomAvailable(keys) => keys.keys(),
77+
let weighted_keys = WeightedServiceKeys::new(
78+
iter.into_iter()
79+
.map(|(key, weight)| WeightedKey { key, weight }),
80+
);
81+
if weighted_keys.len() < 2 {
82+
return Ok(Self::FirstAvailable(Arc::new(
83+
weighted_keys.into_unweighted(),
84+
)));
9585
}
96-
}
97-
}
98-
99-
// === impl WeightedKeys ===
100-
101-
impl<K> WeightedKeys<K> {
102-
pub(crate) fn keys(&self) -> &[K] {
103-
&self.keys
104-
}
10586

106-
pub(crate) fn index(&self) -> WeightedIndex<u32> {
107-
WeightedIndex::new(self.weights.iter().copied()).expect("distribution must be valid")
87+
weighted_keys.validate_weights()?;
88+
Ok(Self::RandomAvailable(Arc::new(weighted_keys)))
10889
}
10990
}

0 commit comments

Comments
 (0)