@@ -62,11 +62,17 @@ use std::io::Write;
6262use std:: net:: SocketAddr ;
6363
6464use crc32fast:: Hasher ;
65+ #[ cfg( feature = "v2" ) ]
66+ use i_key_sort:: sort:: one_key_cmp:: OneKeyAndCmpSort ;
67+
68+ /// This constant is copied from nginx. It will create 160 points per weight
69+ /// unit. For example, a weight of 2 will create 320 points on the ring.
70+ pub const DEFAULT_POINT_MULTIPLE : u32 = 160 ;
6571
6672/// A [Bucket] represents a server for consistent hashing
6773///
6874/// A [Bucket] contains a [SocketAddr] to the server and a weight associated with it.
69- #[ derive( Clone , Debug , Eq , PartialEq , PartialOrd ) ]
75+ #[ derive( Clone , Debug , Eq , PartialEq , PartialOrd , Ord ) ]
7076pub struct Bucket {
7177 // The node name.
7278 // TODO: UDS
@@ -94,28 +100,197 @@ impl Bucket {
94100
// A single point on the V1 continuum: a ring position (`hash`) together with
// the index of the node that owns that position.
#[derive(Clone, Debug, Eq, PartialEq)]
struct PointV1 {
    // The index to the actual address.
    node: u32,
    // The position of this point on the ring.
    hash: u32,
}

impl PointV1 {
    // Build a point for the node at index `node` located at ring position `hash`.
    fn new(node: u32, hash: u32) -> Self {
        Self { node, hash }
    }
}

// Sorting must look at the hash only, so the ordering traits are written by
// hand instead of derived (a derive would also compare `node`).
impl PartialOrd for PointV1 {
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

impl Ord for PointV1 {
    fn cmp(&self, rhs: &Self) -> Ordering {
        Ord::cmp(&self.hash, &rhs.hash)
    }
}
127+
/// A point on the continuum.
///
/// We are trying to save memory here, so this struct is equivalent to a struct
/// with the definition below, but doesn't require using the "untrustworthy"
/// packed repr. This does mean we have to do the memory layout manually
/// though, but the benchmarks show there is no performance hit for it.
///
/// ```text
/// #[repr(Rust, packed)]
/// struct Point {
///     node: u16,
///     hash: u32,
/// }
/// ```
///
/// Layout: bytes `0..4` hold the hash and bytes `4..6` hold the node index,
/// both in the platform's native byte order. The derived equality therefore
/// compares hash and node together.
#[cfg(feature = "v2")]
#[derive(Copy, Clone, Eq, PartialEq)]
#[repr(transparent)]
struct PointV2([u8; 6]);

#[cfg(feature = "v2")]
impl PointV2 {
    /// Pack the node index and its ring hash into the 6-byte representation.
    fn new(node: u16, hash: u32) -> Self {
        // Destructuring fixed-size arrays keeps the layout explicit and has no
        // slice bounds checks or fallible conversions.
        let [h0, h1, h2, h3] = hash.to_ne_bytes();
        let [n0, n1] = node.to_ne_bytes();

        Self([h0, h1, h2, h3, n0, n1])
    }

    /// Return the hash of the point, which is stored in the first 4 bytes
    /// (native endian).
    fn hash(&self) -> u32 {
        let [h0, h1, h2, h3, ..] = self.0;
        u32::from_ne_bytes([h0, h1, h2, h3])
    }

    /// Return the node of the point, which is stored in the last 2 bytes
    /// (native endian).
    fn node(&self) -> u16 {
        let [.., n0, n1] = self.0;
        u16::from_ne_bytes([n0, n1])
    }
}
166+
167+ #[ derive( Copy , Clone , Debug , Eq , PartialEq , Default ) ]
168+ pub enum Version {
169+ #[ default]
170+ V1 ,
171+ #[ cfg( feature = "v2" ) ]
172+ V2 { point_multiple : u32 } ,
173+ }
174+
175+ impl Version {
176+ fn point_multiple ( & self ) -> u32 {
177+ match self {
178+ Version :: V1 => DEFAULT_POINT_MULTIPLE ,
179+ #[ cfg( feature = "v2" ) ]
180+ Version :: V2 { point_multiple } => * point_multiple,
181+ }
182+ }
183+ }
184+
185+ enum RingBuilder {
186+ V1 ( Vec < PointV1 > ) ,
187+ #[ cfg( feature = "v2" ) ]
188+ V2 ( Vec < PointV2 > ) ,
189+ }
190+
191+ impl RingBuilder {
192+ fn new ( version : Version , total_weight : u32 ) -> Self {
193+ match version {
194+ Version :: V1 => RingBuilder :: V1 ( Vec :: with_capacity (
195+ ( total_weight * DEFAULT_POINT_MULTIPLE ) as usize ,
196+ ) ) ,
197+ #[ cfg( feature = "v2" ) ]
198+ Version :: V2 { point_multiple } => {
199+ RingBuilder :: V2 ( Vec :: with_capacity ( ( total_weight * point_multiple) as usize ) )
200+ }
201+ }
202+ }
203+
204+ fn push ( & mut self , node : u16 , hash : u32 ) {
205+ match self {
206+ RingBuilder :: V1 ( ring) => {
207+ ring. push ( PointV1 :: new ( node as u32 , hash) ) ;
208+ }
209+ #[ cfg( feature = "v2" ) ]
210+ RingBuilder :: V2 ( ring) => {
211+ ring. push ( PointV2 :: new ( node, hash) ) ;
212+ }
213+ }
214+ }
215+
216+ #[ allow( unused) ]
217+ fn sort ( & mut self , addresses : & [ SocketAddr ] ) {
218+ match self {
219+ RingBuilder :: V1 ( ring) => {
220+ // Sort and remove any duplicates.
221+ ring. sort_unstable ( ) ;
222+ ring. dedup_by ( |a, b| a. hash == b. hash ) ;
223+ }
224+ #[ cfg( feature = "v2" ) ]
225+ RingBuilder :: V2 ( ring) => {
226+ ring. sort_by_one_key_then_by (
227+ true ,
228+ |p| p. hash ( ) ,
229+ |p1, p2| addresses[ p1. node ( ) as usize ] . cmp ( & addresses[ p2. node ( ) as usize ] ) ,
230+ ) ;
231+
232+ //secondary_radix_sort(ring, |p| p.hash(), |p| addresses[p.node() as usize]);
233+ ring. dedup_by ( |a, b| a. 0 [ 0 ..4 ] == b. 0 [ 0 ..4 ] ) ;
234+ }
235+ }
236+ }
237+ }
238+
239+ impl From < RingBuilder > for VersionedRing {
240+ fn from ( ring : RingBuilder ) -> Self {
241+ match ring {
242+ RingBuilder :: V1 ( ring) => VersionedRing :: V1 ( ring. into_boxed_slice ( ) ) ,
243+ #[ cfg( feature = "v2" ) ]
244+ RingBuilder :: V2 ( ring) => VersionedRing :: V2 ( ring. into_boxed_slice ( ) ) ,
245+ }
246+ }
247+ }
248+
249+ enum VersionedRing {
250+ V1 ( Box < [ PointV1 ] > ) ,
251+ #[ cfg( feature = "v2" ) ]
252+ V2 ( Box < [ PointV2 ] > ) ,
253+ }
254+
255+ impl VersionedRing {
256+ /// Find the associated index for the given input.
257+ pub fn node_idx ( & self , hash : u32 ) -> usize {
258+ // The `Result` returned here is either a match or the error variant
259+ // returns where the value would be inserted.
260+ let search_result = match self {
261+ VersionedRing :: V1 ( ring) => ring. binary_search_by ( |p| p. hash . cmp ( & hash) ) ,
262+ #[ cfg( feature = "v2" ) ]
263+ VersionedRing :: V2 ( ring) => ring. binary_search_by ( |p| p. hash ( ) . cmp ( & hash) ) ,
264+ } ;
265+
266+ match search_result {
267+ Ok ( i) => i,
268+ Err ( i) => {
269+ // We wrap around to the front if this value would be
270+ // inserted at the end.
271+ if i == self . len ( ) {
272+ 0
273+ } else {
274+ i
275+ }
276+ }
277+ }
278+ }
279+
280+ pub fn get ( & self , index : usize ) -> Option < usize > {
281+ match self {
282+ VersionedRing :: V1 ( ring) => ring. get ( index) . map ( |p| p. node as usize ) ,
283+ #[ cfg( feature = "v2" ) ]
284+ VersionedRing :: V2 ( ring) => ring. get ( index) . map ( |p| p. node ( ) as usize ) ,
285+ }
286+ }
287+
288+ pub fn len ( & self ) -> usize {
289+ match self {
290+ VersionedRing :: V1 ( ring) => ring. len ( ) ,
291+ #[ cfg( feature = "v2" ) ]
292+ VersionedRing :: V2 ( ring) => ring. len ( ) ,
293+ }
119294 }
120295}
121296
@@ -124,27 +299,27 @@ impl Point {
124299/// A [Continuum] represents a ring of buckets where a node is associated with various points on
125300/// the ring.
126301pub struct Continuum {
127- ring : Box < [ Point ] > ,
302+ ring : VersionedRing ,
128303 addrs : Box < [ SocketAddr ] > ,
129304}
130305
131306impl Continuum {
132- /// Create a new [Continuum] with the given list of buckets.
133307 pub fn new ( buckets : & [ Bucket ] ) -> Self {
134- // This constant is copied from nginx. It will create 160 points per weight unit. For
135- // example, a weight of 2 will create 320 points on the ring.
136- const POINT_MULTIPLE : u32 = 160 ;
308+ Self :: new_with_version ( buckets, Version :: default ( ) )
309+ }
137310
311+ /// Create a new [Continuum] with the given list of buckets.
312+ pub fn new_with_version ( buckets : & [ Bucket ] , version : Version ) -> Self {
138313 if buckets. is_empty ( ) {
139314 return Continuum {
140- ring : Box :: new ( [ ] ) ,
315+ ring : VersionedRing :: V1 ( Box :: new ( [ ] ) ) ,
141316 addrs : Box :: new ( [ ] ) ,
142317 } ;
143318 }
144319
145320 // The total weight is multiplied by the factor of points to create many points per node.
146321 let total_weight: u32 = buckets. iter ( ) . fold ( 0 , |sum, b| sum + b. weight ) ;
147- let mut ring = Vec :: with_capacity ( ( total_weight * POINT_MULTIPLE ) as usize ) ;
322+ let mut ring = RingBuilder :: new ( version , total_weight ) ;
148323 let mut addrs = Vec :: with_capacity ( buckets. len ( ) ) ;
149324
150325 for bucket in buckets {
@@ -165,7 +340,7 @@ impl Continuum {
165340 hasher. update ( hash_bytes. as_ref ( ) ) ;
166341
167342 // A higher weight will add more points for this node.
168- let num_points = bucket. weight * POINT_MULTIPLE ;
343+ let num_points = bucket. weight * version . point_multiple ( ) ;
169344
170345 // This is appended to the crc32 hash for each point.
171346 let mut prev_hash: u32 = 0 ;
@@ -176,45 +351,33 @@ impl Continuum {
176351 hasher. update ( & prev_hash. to_le_bytes ( ) ) ;
177352
178353 let hash = hasher. finalize ( ) ;
179- ring. push ( Point :: new ( node as u32 , hash) ) ;
354+ ring. push ( node as u16 , hash) ;
180355 prev_hash = hash;
181356 }
182357 }
183358
359+ let addrs = addrs. into_boxed_slice ( ) ;
360+
184361 // Sort and remove any duplicates.
185- ring. sort_unstable ( ) ;
186- ring. dedup_by ( |a, b| a. hash == b. hash ) ;
362+ ring. sort ( & addrs) ;
187363
188364 Continuum {
189- ring : ring. into_boxed_slice ( ) ,
190- addrs : addrs . into_boxed_slice ( ) ,
365+ ring : ring. into ( ) ,
366+ addrs,
191367 }
192368 }
193369
194370 /// Find the associated index for the given input.
195371 pub fn node_idx ( & self , input : & [ u8 ] ) -> usize {
196372 let hash = crc32fast:: hash ( input) ;
197-
198- // The `Result` returned here is either a match or the error variant returns where the
199- // value would be inserted.
200- match self . ring . binary_search_by ( |p| p. hash . cmp ( & hash) ) {
201- Ok ( i) => i,
202- Err ( i) => {
203- // We wrap around to the front if this value would be inserted at the end.
204- if i == self . ring . len ( ) {
205- 0
206- } else {
207- i
208- }
209- }
210- }
373+ self . ring . node_idx ( hash)
211374 }
212375
213376 /// Hash the given `hash_key` to the server address.
214377 pub fn node ( & self , hash_key : & [ u8 ] ) -> Option < SocketAddr > {
215378 self . ring
216379 . get ( self . node_idx ( hash_key) ) // should we unwrap here?
217- . map ( |p | self . addrs [ p . node as usize ] )
380+ . map ( |n | self . addrs [ n ] )
218381 }
219382
220383 /// Get an iterator of nodes starting at the original hashed node of the `hash_key`.
@@ -234,7 +397,7 @@ impl Continuum {
234397 // only update idx for non-empty ring otherwise we will panic on modulo 0
235398 * idx = ( * idx + 1 ) % self . ring . len ( ) ;
236399 }
237- point. map ( |p | & self . addrs [ p . node as usize ] )
400+ point. map ( |n | & self . addrs [ n ] )
238401 }
239402}
240403
0 commit comments