10
10
//! in hyperactor meshes.
11
11
12
12
use core:: slice:: GetDisjointMutIndex as _;
13
+ use std:: collections:: HashMap ;
14
+ use std:: fmt;
13
15
use std:: fmt:: Debug ;
14
16
use std:: hash:: Hash ;
15
17
use std:: marker:: PhantomData ;
16
18
use std:: mem:: replace;
17
19
use std:: mem:: take;
20
+ use std:: ops:: Deref ;
21
+ use std:: ops:: DerefMut ;
18
22
use std:: ops:: Range ;
23
+ use std:: time:: Duration ;
19
24
20
25
use enum_as_inner:: EnumAsInner ;
21
26
use hyperactor:: Bind ;
@@ -30,6 +35,8 @@ use hyperactor::accum::Accumulator;
30
35
use hyperactor:: accum:: CommReducer ;
31
36
use hyperactor:: accum:: ReducerFactory ;
32
37
use hyperactor:: accum:: ReducerSpec ;
38
+ use hyperactor:: mailbox:: MailboxError ;
39
+ use hyperactor:: mailbox:: PortReceiver ;
33
40
use hyperactor:: message:: Bind ;
34
41
use hyperactor:: message:: Bindings ;
35
42
use hyperactor:: message:: Unbind ;
@@ -50,7 +57,8 @@ use crate::v1::Name;
50
57
PartialEq ,
51
58
Eq ,
52
59
Hash ,
53
- EnumAsInner
60
+ EnumAsInner ,
61
+ strum:: Display
54
62
) ]
55
63
pub enum Status {
56
64
/// The resource does not exist.
@@ -65,12 +73,17 @@ pub enum Status {
65
73
Stopped ,
66
74
/// The resource has failed, with an error message.
67
75
Failed ( String ) ,
76
+ /// The resource has been declared failed after a timeout.
77
+ Timeout ( Duration ) ,
68
78
}
69
79
70
80
impl Status {
71
81
/// Returns whether the status is a terminating status.
72
82
pub fn is_terminating ( & self ) -> bool {
73
- matches ! ( self , Status :: Stopping | Status :: Stopped | Status :: Failed ( _) )
83
+ matches ! (
84
+ self ,
85
+ Status :: Stopping | Status :: Stopped | Status :: Failed ( _) | Status :: Timeout ( _)
86
+ )
74
87
}
75
88
}
76
89
@@ -128,6 +141,34 @@ pub struct GetRankStatus {
128
141
pub reply : PortRef < RankedValues < Status > > ,
129
142
}
130
143
144
+ impl GetRankStatus {
145
+ pub async fn wait (
146
+ mut rx : PortReceiver < RankedValues < Status > > ,
147
+ num_ranks : usize ,
148
+ max_idle_time : Duration ,
149
+ ) -> Result < RankedValues < Status > , RankedValues < Status > > {
150
+ let mut alarm = hyperactor:: time:: Alarm :: new ( ) ;
151
+ alarm. arm ( max_idle_time) ;
152
+ let mut statuses = RankedValues :: default ( ) ;
153
+ loop {
154
+ let mut sleeper = alarm. sleeper ( ) ;
155
+ tokio:: select! {
156
+ _ = sleeper. sleep( ) => return Err ( statuses) ,
157
+ new_statuses = rx. recv( ) => {
158
+ match new_statuses {
159
+ Ok ( new_statuses) => statuses = new_statuses,
160
+ Err ( _) => return Err ( statuses) ,
161
+ }
162
+ }
163
+ }
164
+ alarm. arm ( max_idle_time) ;
165
+ if statuses. rank ( num_ranks) == num_ranks {
166
+ break Ok ( statuses) ;
167
+ }
168
+ }
169
+ }
170
+ }
171
+
131
172
/// The state of a resource.
132
173
#[ derive( Clone , Debug , Serialize , Deserialize , Named , PartialEq , Eq ) ]
133
174
pub struct State < S > {
@@ -214,6 +255,14 @@ pub struct RankedValues<T> {
214
255
intervals : Vec < ( Range < usize > , T ) > ,
215
256
}
216
257
258
+ impl < T : PartialEq > PartialEq for RankedValues < T > {
259
+ fn eq ( & self , other : & Self ) -> bool {
260
+ self . intervals == other. intervals
261
+ }
262
+ }
263
+
264
+ impl < T : Eq > Eq for RankedValues < T > { }
265
+
217
266
impl < T > Default for RankedValues < T > {
218
267
fn default ( ) -> Self {
219
268
Self {
@@ -238,6 +287,28 @@ impl<T> RankedValues<T> {
238
287
}
239
288
}
240
289
290
+ impl < T : Clone > RankedValues < T > {
291
+ pub fn materialized_iter ( & self , until : usize ) -> impl Iterator < Item = & T > + ' _ {
292
+ assert_eq ! ( self . rank( until) , until, "insufficient rank" ) ;
293
+ self . iter ( )
294
+ . flat_map ( |( range, value) | std:: iter:: repeat ( value) . take ( range. end - range. start ) )
295
+ }
296
+ }
297
+
298
+ impl < T : Hash + Eq + Clone > RankedValues < T > {
299
+ /// Invert this ranked values into a [`ValuesByRank<T>`].
300
+ pub fn invert ( & self ) -> ValuesByRank < T > {
301
+ let mut inverted: HashMap < T , Vec < Range < usize > > > = HashMap :: new ( ) ;
302
+ for ( range, value) in self . iter ( ) {
303
+ inverted
304
+ . entry ( value. clone ( ) )
305
+ . or_default ( )
306
+ . push ( range. clone ( ) ) ;
307
+ }
308
+ ValuesByRank { values : inverted }
309
+ }
310
+ }
311
+
241
312
impl < T : Eq + Clone > RankedValues < T > {
242
313
/// Merge `other` into this set of ranked values. Values in `other` that overlap
243
314
/// with `self` take prededence.
@@ -298,6 +369,11 @@ impl<T: Eq + Clone> RankedValues<T> {
298
369
}
299
370
}
300
371
372
+ /// Merge the contents of this RankedValues into another RankedValues.
373
+ pub fn merge_into ( self , other : & mut Self ) {
374
+ other. merge_from ( self ) ;
375
+ }
376
+
301
377
fn append ( & mut self , range : Range < usize > , value : T ) {
302
378
if let Some ( last) = self . intervals . last_mut ( )
303
379
&& last. 0 . end == range. start
@@ -310,6 +386,15 @@ impl<T: Eq + Clone> RankedValues<T> {
310
386
}
311
387
}
312
388
389
+ impl RankedValues < Status > {
390
+ pub fn first_terminating ( & self ) -> Option < ( usize , Status ) > {
391
+ self . intervals
392
+ . iter ( )
393
+ . find ( |( _, status) | status. is_terminating ( ) )
394
+ . map ( |( range, status) | ( range. start , status. clone ( ) ) )
395
+ }
396
+ }
397
+
313
398
impl < T > From < ( usize , T ) > for RankedValues < T > {
314
399
fn from ( ( rank, value) : ( usize , T ) ) -> Self {
315
400
Self {
@@ -318,6 +403,67 @@ impl<T> From<(usize, T)> for RankedValues<T> {
318
403
}
319
404
}
320
405
406
+ impl < T > From < ( Range < usize > , T ) > for RankedValues < T > {
407
+ fn from ( ( range, value) : ( Range < usize > , T ) ) -> Self {
408
+ Self {
409
+ intervals : vec ! [ ( range, value) ] ,
410
+ }
411
+ }
412
+ }
413
+
414
+ /// An inverted index of RankedValues, providing all ranks for
415
+ /// which each unique T-typed value appears.
416
+ #[ derive( Clone , Debug ) ]
417
+ pub struct ValuesByRank < T > {
418
+ values : HashMap < T , Vec < Range < usize > > > ,
419
+ }
420
+
421
+ impl < T : Eq + Hash > PartialEq for ValuesByRank < T > {
422
+ fn eq ( & self , other : & Self ) -> bool {
423
+ self . values == other. values
424
+ }
425
+ }
426
+
427
+ impl < T : Eq + Hash > Eq for ValuesByRank < T > { }
428
+
429
+ impl < T : fmt:: Display > fmt:: Display for ValuesByRank < T > {
430
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
431
+ let mut first_value = true ;
432
+ for ( value, ranges) in self . iter ( ) {
433
+ if first_value {
434
+ first_value = false ;
435
+ } else {
436
+ write ! ( f, ";" ) ?;
437
+ }
438
+ write ! ( f, "{}=" , value) ?;
439
+ let mut first_range = true ;
440
+ for range in ranges. iter ( ) {
441
+ if first_range {
442
+ first_range = false ;
443
+ } else {
444
+ write ! ( f, "," ) ?;
445
+ }
446
+ write ! ( f, "{}..{}" , range. start, range. end) ?;
447
+ }
448
+ }
449
+ Ok ( ( ) )
450
+ }
451
+ }
452
+
453
+ impl < T > Deref for ValuesByRank < T > {
454
+ type Target = HashMap < T , Vec < Range < usize > > > ;
455
+
456
+ fn deref ( & self ) -> & Self :: Target {
457
+ & self . values
458
+ }
459
+ }
460
+
461
+ impl < T > DerefMut for ValuesByRank < T > {
462
+ fn deref_mut ( & mut self ) -> & mut Self :: Target {
463
+ & mut self . values
464
+ }
465
+ }
466
+
321
467
/// Enabled for test only because we have to guarantee that the input
322
468
/// iterator is well-formed.
323
469
#[ cfg( test) ]
@@ -425,4 +571,36 @@ mod tests {
425
571
assert_eq ! ( left. rank( 70 ) , 62 ) ;
426
572
assert_eq ! ( left. rank( 100 ) , 62 ) ;
427
573
}
574
+
575
+ #[ test]
576
+ fn test_equality ( ) {
577
+ assert_eq ! (
578
+ RankedValues :: from( ( 0 ..10 , 123 ) ) ,
579
+ RankedValues :: from( ( 0 ..10 , 123 ) )
580
+ ) ;
581
+ assert_eq ! (
582
+ RankedValues :: from( ( 0 ..10 , Status :: Failed ( "foo" . to_string( ) ) ) ) ,
583
+ RankedValues :: from( ( 0 ..10 , Status :: Failed ( "foo" . to_string( ) ) ) ) ,
584
+ ) ;
585
+ }
586
+
587
+ #[ test]
588
+ fn test_default_through_merging ( ) {
589
+ let values: RankedValues < usize > =
590
+ [ ( 0 ..10 , 1 ) , ( 15 ..20 , 1 ) , ( 30 ..50 , 1 ) ] . into_iter ( ) . collect ( ) ;
591
+
592
+ let mut default = RankedValues :: from ( ( 0 ..50 , 0 ) ) ;
593
+ default. merge_from ( values) ;
594
+
595
+ assert_eq ! (
596
+ default . iter( ) . cloned( ) . collect:: <Vec <_>>( ) ,
597
+ vec![
598
+ ( 0 ..10 , 1 ) ,
599
+ ( 10 ..15 , 0 ) ,
600
+ ( 15 ..20 , 1 ) ,
601
+ ( 20 ..30 , 0 ) ,
602
+ ( 30 ..50 , 1 )
603
+ ]
604
+ ) ;
605
+ }
428
606
}
0 commit comments