1919//! help with allocation accounting.
2020
2121use datafusion_common:: { internal_err, Result } ;
22- use std:: { cmp:: Ordering , sync:: Arc } ;
22+ use std:: hash:: { Hash , Hasher } ;
23+ use std:: { cmp:: Ordering , sync:: atomic, sync:: Arc } ;
2324
2425mod pool;
2526pub mod proxy {
@@ -146,24 +147,76 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
146147/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
147148/// a particular `MemoryConsumer`;
148149///
/// Each `MemoryConsumer` is identifiable by a process-unique id, and is therefore not cloneable.
/// If you want a clone of a `MemoryConsumer`, you should look into [`MemoryConsumer::clone_with_new_id`],
/// but note that the new `MemoryConsumer` may be treated as a separate entity, depending on the pool used,
/// and is only guaranteed to share the name and inner properties.
154+ ///
149155/// For help with allocation accounting, see the [`proxy`] module.
150156///
151157/// [proxy]: datafusion_common::utils::proxy
152- #[ derive( Debug , PartialEq , Eq , Hash , Clone ) ]
#[derive(Debug)]
pub struct MemoryConsumer {
    /// Name given to this consumer when it was created.
    name: String,
    /// Whether allocations made by this consumer can be spilled to disk.
    can_spill: bool,
    /// Unique id of this consumer; equality is decided by this field alone.
    id: usize,
}

impl PartialEq for MemoryConsumer {
    /// Returns `true` exactly when both consumers carry the same id.
    ///
    /// # Panics
    ///
    /// In builds with debug assertions enabled, panics if two consumers share
    /// an id but differ in `name` or `can_spill`.
    fn eq(&self, other: &Self) -> bool {
        if self.id != other.id {
            return false;
        }

        // Sanity check (compiled out without debug assertions): consumers
        // with the same id are expected to agree on every other field.
        debug_assert_eq!(self.name, other.name);
        debug_assert_eq!(self.can_spill, other.can_spill);

        true
    }
}

impl Eq for MemoryConsumer {}

impl Hash for MemoryConsumer {
    /// Feeds `id`, then `name`, then `can_spill` into `state`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let Self {
            name,
            can_spill,
            id,
        } = self;

        id.hash(state);
        name.hash(state);
        can_spill.hash(state);
    }
}
157188
158189impl MemoryConsumer {
/// Hands out the next value of a counter shared by every caller.
///
/// The counter starts at zero and is advanced by one on each call; the
/// value returned is the one held *before* the increment. `fetch_add`
/// wraps around on overflow.
fn new_unique_id() -> usize {
    static NEXT_ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
    atomic::AtomicUsize::fetch_add(&NEXT_ID, 1, atomic::Ordering::Relaxed)
}
194+
159195 /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
160196 pub fn new ( name : impl Into < String > ) -> Self {
161197 Self {
162198 name : name. into ( ) ,
163199 can_spill : false ,
200+ id : Self :: new_unique_id ( ) ,
201+ }
202+ }
203+
204+ /// Returns a clone of this [`MemoryConsumer`] with a new unique id,
205+ /// which can be registered with a [`MemoryPool`],
206+ /// This new consumer is separate from the original.
207+ pub fn clone_with_new_id ( & self ) -> Self {
208+ Self {
209+ name : self . name . clone ( ) ,
210+ can_spill : self . can_spill ,
211+ id : Self :: new_unique_id ( ) ,
164212 }
165213 }
166214
215+ /// Return the unique id of this [`MemoryConsumer`]
216+ pub fn id ( & self ) -> usize {
217+ self . id
218+ }
219+
167220 /// Set whether this allocation can be spilled to disk
168221 pub fn with_can_spill ( self , can_spill : bool ) -> Self {
169222 Self { can_spill, ..self }
@@ -349,7 +402,7 @@ pub mod units {
349402 pub const KB : u64 = 1 << 10 ;
350403}
351404
352- /// Present size in human readable form
/// Present size in human-readable form
353406pub fn human_readable_size ( size : usize ) -> String {
354407 use units:: * ;
355408
@@ -374,6 +427,15 @@ pub fn human_readable_size(size: usize) -> String {
374427mod tests {
375428 use super :: * ;
376429
/// Creates 100 consumers and checks that no two of them report the same id.
#[test]
fn test_id_uniqueness() {
    use std::collections::HashSet;

    let mut ids = HashSet::new();
    // Consumers are built lazily, one per loop iteration.
    let consumers = std::iter::repeat_with(|| MemoryConsumer::new("test")).take(100);
    for consumer in consumers {
        // `insert` returns false when the id was already present.
        assert!(ids.insert(consumer.id()));
    }
}
438+
377439 #[ test]
378440 fn test_memory_pool_underflow ( ) {
379441 let pool = Arc :: new ( GreedyMemoryPool :: new ( 50 ) ) as _ ;
0 commit comments