Skip to content

Commit 0dc7d6d

Browse files
committed
Add a configurable upgrade for pingora-ketama that reduces runtime cpu and memory
1 parent 26dd823 commit 0dc7d6d

File tree

11 files changed

+547
-49
lines changed

11 files changed

+547
-49
lines changed

.bleep

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
c21b8668856d3114acb49e2d054357369a1e6663
1+
581a224bab00d4d8a554bf59136e2b62ad54a64b

pingora-ketama/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@ keywords = ["hash", "hashing", "consistent", "pingora"]
1111

1212
[dependencies]
1313
crc32fast = "1.3"
14+
i_key_sort = { version = "0.10.1", optional = true, features = ["allow_multithreading"] }
1415

1516
[dev-dependencies]
1617
criterion = "0.7"
1718
csv = "1.2"
1819
dhat = "0.3"
1920
env_logger = "0.11"
2021
log = { workspace = true }
21-
rand = "0.8"
22+
rand = "0.9.2"
2223

2324
[[bench]]
2425
name = "simple"
@@ -30,3 +31,4 @@ harness = false
3031

3132
[features]
3233
heap-prof = []
34+
v2 = ["i_key_sort"]

pingora-ketama/benches/simple.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use pingora_ketama::{Bucket, Continuum};
22

33
use criterion::{criterion_group, criterion_main, Criterion};
4-
use rand::distributions::Alphanumeric;
5-
use rand::{thread_rng, Rng};
4+
use rand::{
5+
distr::{Alphanumeric, SampleString},
6+
rng,
7+
};
68

79
#[cfg(feature = "heap-prof")]
810
#[global_allocator]
@@ -19,11 +21,8 @@ fn buckets() -> Vec<Bucket> {
1921
}
2022

2123
fn random_string() -> String {
22-
thread_rng()
23-
.sample_iter(&Alphanumeric)
24-
.take(30)
25-
.map(char::from)
26-
.collect()
24+
let mut rand = rng();
25+
Alphanumeric.sample_string(&mut rand, 30)
2726
}
2827

2928
pub fn criterion_benchmark(c: &mut Criterion) {

pingora-ketama/src/lib.rs

Lines changed: 198 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,17 @@ use std::io::Write;
6262
use std::net::SocketAddr;
6363

6464
use crc32fast::Hasher;
65+
#[cfg(feature = "v2")]
66+
use i_key_sort::sort::one_key_cmp::OneKeyAndCmpSort;
67+
68+
/// This constant is copied from nginx. It will create 160 points per weight
69+
/// unit. For example, a weight of 2 will create 320 points on the ring.
70+
pub const DEFAULT_POINT_MULTIPLE: u32 = 160;
6571

6672
/// A [Bucket] represents a server for consistent hashing
6773
///
6874
/// A [Bucket] contains a [SocketAddr] to the server and a weight associated with it.
69-
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd)]
75+
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
7076
pub struct Bucket {
7177
// The node name.
7278
// TODO: UDS
@@ -94,28 +100,197 @@ impl Bucket {
94100

95101
// A point on the continuum.
96102
#[derive(Clone, Debug, Eq, PartialEq)]
97-
struct Point {
103+
struct PointV1 {
98104
// the index to the actual address
99105
node: u32,
100106
hash: u32,
101107
}
102108

103109
// We only want to compare the hash when sorting, so we implement these traits by hand.
104-
impl Ord for Point {
110+
impl Ord for PointV1 {
105111
fn cmp(&self, other: &Self) -> Ordering {
106112
self.hash.cmp(&other.hash)
107113
}
108114
}
109115

110-
impl PartialOrd for Point {
116+
impl PartialOrd for PointV1 {
111117
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
112118
Some(self.cmp(other))
113119
}
114120
}
115121

116-
impl Point {
122+
impl PointV1 {
117123
fn new(node: u32, hash: u32) -> Self {
118-
Point { node, hash }
124+
PointV1 { node, hash }
125+
}
126+
}
127+
128+
/// A point on the continuum.
129+
///
130+
/// We are trying to save memory here, so this struct is equivalent to a struct
131+
/// this this definition, but doesn't require using the "untrustworthy" compact
132+
/// repr. This does mean we have to do the memory layout manually though, but
133+
/// the benchmarks show there is no performance hit for it.
134+
///
135+
/// #[repr(Rust, packed)]
136+
/// struct Point {
137+
/// node: u16,
138+
/// hash: u32,
139+
/// }
140+
#[cfg(feature = "v2")]
141+
#[derive(Copy, Clone, Eq, PartialEq)]
142+
#[repr(transparent)]
143+
struct PointV2([u8; 6]);
144+
145+
#[cfg(feature = "v2")]
146+
impl PointV2 {
147+
fn new(node: u16, hash: u32) -> Self {
148+
let mut this = [0; 6];
149+
150+
this[0..4].copy_from_slice(&hash.to_ne_bytes());
151+
this[4..6].copy_from_slice(&node.to_ne_bytes());
152+
153+
Self(this)
154+
}
155+
156+
/// Return the hash of the point which is stored in the first 4 bytes (big endian).
157+
fn hash(&self) -> u32 {
158+
u32::from_ne_bytes(self.0[0..4].try_into().expect("There are exactly 4 bytes"))
159+
}
160+
161+
/// Return the node of the point which is stored in the last 2 bytes (big endian).
162+
fn node(&self) -> u16 {
163+
u16::from_ne_bytes(self.0[4..6].try_into().expect("There are exactly 2 bytes"))
164+
}
165+
}
166+
167+
#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
168+
pub enum Version {
169+
#[default]
170+
V1,
171+
#[cfg(feature = "v2")]
172+
V2 { point_multiple: u32 },
173+
}
174+
175+
impl Version {
176+
fn point_multiple(&self) -> u32 {
177+
match self {
178+
Version::V1 => DEFAULT_POINT_MULTIPLE,
179+
#[cfg(feature = "v2")]
180+
Version::V2 { point_multiple } => *point_multiple,
181+
}
182+
}
183+
}
184+
185+
enum RingBuilder {
186+
V1(Vec<PointV1>),
187+
#[cfg(feature = "v2")]
188+
V2(Vec<PointV2>),
189+
}
190+
191+
impl RingBuilder {
192+
fn new(version: Version, total_weight: u32) -> Self {
193+
match version {
194+
Version::V1 => RingBuilder::V1(Vec::with_capacity(
195+
(total_weight * DEFAULT_POINT_MULTIPLE) as usize,
196+
)),
197+
#[cfg(feature = "v2")]
198+
Version::V2 { point_multiple } => {
199+
RingBuilder::V2(Vec::with_capacity((total_weight * point_multiple) as usize))
200+
}
201+
}
202+
}
203+
204+
fn push(&mut self, node: u16, hash: u32) {
205+
match self {
206+
RingBuilder::V1(ring) => {
207+
ring.push(PointV1::new(node as u32, hash));
208+
}
209+
#[cfg(feature = "v2")]
210+
RingBuilder::V2(ring) => {
211+
ring.push(PointV2::new(node, hash));
212+
}
213+
}
214+
}
215+
216+
#[allow(unused)]
217+
fn sort(&mut self, addresses: &[SocketAddr]) {
218+
match self {
219+
RingBuilder::V1(ring) => {
220+
// Sort and remove any duplicates.
221+
ring.sort_unstable();
222+
ring.dedup_by(|a, b| a.hash == b.hash);
223+
}
224+
#[cfg(feature = "v2")]
225+
RingBuilder::V2(ring) => {
226+
ring.sort_by_one_key_then_by(
227+
true,
228+
|p| p.hash(),
229+
|p1, p2| addresses[p1.node() as usize].cmp(&addresses[p2.node() as usize]),
230+
);
231+
232+
//secondary_radix_sort(ring, |p| p.hash(), |p| addresses[p.node() as usize]);
233+
ring.dedup_by(|a, b| a.0[0..4] == b.0[0..4]);
234+
}
235+
}
236+
}
237+
}
238+
239+
impl From<RingBuilder> for VersionedRing {
240+
fn from(ring: RingBuilder) -> Self {
241+
match ring {
242+
RingBuilder::V1(ring) => VersionedRing::V1(ring.into_boxed_slice()),
243+
#[cfg(feature = "v2")]
244+
RingBuilder::V2(ring) => VersionedRing::V2(ring.into_boxed_slice()),
245+
}
246+
}
247+
}
248+
249+
enum VersionedRing {
250+
V1(Box<[PointV1]>),
251+
#[cfg(feature = "v2")]
252+
V2(Box<[PointV2]>),
253+
}
254+
255+
impl VersionedRing {
256+
/// Find the associated index for the given input.
257+
pub fn node_idx(&self, hash: u32) -> usize {
258+
// The `Result` returned here is either a match or the error variant
259+
// returns where the value would be inserted.
260+
let search_result = match self {
261+
VersionedRing::V1(ring) => ring.binary_search_by(|p| p.hash.cmp(&hash)),
262+
#[cfg(feature = "v2")]
263+
VersionedRing::V2(ring) => ring.binary_search_by(|p| p.hash().cmp(&hash)),
264+
};
265+
266+
match search_result {
267+
Ok(i) => i,
268+
Err(i) => {
269+
// We wrap around to the front if this value would be
270+
// inserted at the end.
271+
if i == self.len() {
272+
0
273+
} else {
274+
i
275+
}
276+
}
277+
}
278+
}
279+
280+
pub fn get(&self, index: usize) -> Option<usize> {
281+
match self {
282+
VersionedRing::V1(ring) => ring.get(index).map(|p| p.node as usize),
283+
#[cfg(feature = "v2")]
284+
VersionedRing::V2(ring) => ring.get(index).map(|p| p.node() as usize),
285+
}
286+
}
287+
288+
pub fn len(&self) -> usize {
289+
match self {
290+
VersionedRing::V1(ring) => ring.len(),
291+
#[cfg(feature = "v2")]
292+
VersionedRing::V2(ring) => ring.len(),
293+
}
119294
}
120295
}
121296

@@ -124,27 +299,27 @@ impl Point {
124299
/// A [Continuum] represents a ring of buckets where a node is associated with various points on
125300
/// the ring.
126301
pub struct Continuum {
127-
ring: Box<[Point]>,
302+
ring: VersionedRing,
128303
addrs: Box<[SocketAddr]>,
129304
}
130305

131306
impl Continuum {
132-
/// Create a new [Continuum] with the given list of buckets.
133307
pub fn new(buckets: &[Bucket]) -> Self {
134-
// This constant is copied from nginx. It will create 160 points per weight unit. For
135-
// example, a weight of 2 will create 320 points on the ring.
136-
const POINT_MULTIPLE: u32 = 160;
308+
Self::new_with_version(buckets, Version::default())
309+
}
137310

311+
/// Create a new [Continuum] with the given list of buckets.
312+
pub fn new_with_version(buckets: &[Bucket], version: Version) -> Self {
138313
if buckets.is_empty() {
139314
return Continuum {
140-
ring: Box::new([]),
315+
ring: VersionedRing::V1(Box::new([])),
141316
addrs: Box::new([]),
142317
};
143318
}
144319

145320
// The total weight is multiplied by the factor of points to create many points per node.
146321
let total_weight: u32 = buckets.iter().fold(0, |sum, b| sum + b.weight);
147-
let mut ring = Vec::with_capacity((total_weight * POINT_MULTIPLE) as usize);
322+
let mut ring = RingBuilder::new(version, total_weight);
148323
let mut addrs = Vec::with_capacity(buckets.len());
149324

150325
for bucket in buckets {
@@ -165,7 +340,7 @@ impl Continuum {
165340
hasher.update(hash_bytes.as_ref());
166341

167342
// A higher weight will add more points for this node.
168-
let num_points = bucket.weight * POINT_MULTIPLE;
343+
let num_points = bucket.weight * version.point_multiple();
169344

170345
// This is appended to the crc32 hash for each point.
171346
let mut prev_hash: u32 = 0;
@@ -176,45 +351,33 @@ impl Continuum {
176351
hasher.update(&prev_hash.to_le_bytes());
177352

178353
let hash = hasher.finalize();
179-
ring.push(Point::new(node as u32, hash));
354+
ring.push(node as u16, hash);
180355
prev_hash = hash;
181356
}
182357
}
183358

359+
let addrs = addrs.into_boxed_slice();
360+
184361
// Sort and remove any duplicates.
185-
ring.sort_unstable();
186-
ring.dedup_by(|a, b| a.hash == b.hash);
362+
ring.sort(&addrs);
187363

188364
Continuum {
189-
ring: ring.into_boxed_slice(),
190-
addrs: addrs.into_boxed_slice(),
365+
ring: ring.into(),
366+
addrs,
191367
}
192368
}
193369

194370
/// Find the associated index for the given input.
195371
pub fn node_idx(&self, input: &[u8]) -> usize {
196372
let hash = crc32fast::hash(input);
197-
198-
// The `Result` returned here is either a match or the error variant returns where the
199-
// value would be inserted.
200-
match self.ring.binary_search_by(|p| p.hash.cmp(&hash)) {
201-
Ok(i) => i,
202-
Err(i) => {
203-
// We wrap around to the front if this value would be inserted at the end.
204-
if i == self.ring.len() {
205-
0
206-
} else {
207-
i
208-
}
209-
}
210-
}
373+
self.ring.node_idx(hash)
211374
}
212375

213376
/// Hash the given `hash_key` to the server address.
214377
pub fn node(&self, hash_key: &[u8]) -> Option<SocketAddr> {
215378
self.ring
216379
.get(self.node_idx(hash_key)) // should we unwrap here?
217-
.map(|p| self.addrs[p.node as usize])
380+
.map(|n| self.addrs[n])
218381
}
219382

220383
/// Get an iterator of nodes starting at the original hashed node of the `hash_key`.
@@ -234,7 +397,7 @@ impl Continuum {
234397
// only update idx for non-empty ring otherwise we will panic on modulo 0
235398
*idx = (*idx + 1) % self.ring.len();
236399
}
237-
point.map(|p| &self.addrs[p.node as usize])
400+
point.map(|n| &self.addrs[n])
238401
}
239402
}
240403

0 commit comments

Comments
 (0)