14
14
//! Context traits are sealed, and thus can only be implemented by data types in the
15
15
//! core hyperactor crate.
16
16
17
+ use std:: mem:: take;
18
+ use std:: sync:: Arc ;
17
19
use std:: sync:: Mutex ;
18
20
use std:: sync:: OnceLock ;
19
21
@@ -24,12 +26,16 @@ use crate::ActorId;
24
26
use crate :: Instance ;
25
27
use crate :: PortId ;
26
28
use crate :: accum;
29
+ use crate :: accum:: ErasedCommReducer ;
30
+ use crate :: accum:: ReducerOpts ;
27
31
use crate :: accum:: ReducerSpec ;
28
32
use crate :: attrs:: Attrs ;
33
+ use crate :: config;
29
34
use crate :: data:: Serialized ;
30
35
use crate :: mailbox;
31
36
use crate :: mailbox:: MailboxSender ;
32
37
use crate :: mailbox:: MessageEnvelope ;
38
+ use crate :: time:: Alarm ;
33
39
34
40
/// A mailbox context provides a mailbox.
35
41
pub trait Mailbox : crate :: private:: Sealed + Send + Sync {
@@ -58,7 +64,12 @@ pub(crate) trait MailboxExt: Mailbox {
58
64
fn post ( & self , dest : PortId , headers : Attrs , data : Serialized ) ;
59
65
60
66
/// Split a port, using a provided reducer spec, if provided.
61
- fn split ( & self , port_id : PortId , reducer_spec : Option < ReducerSpec > ) -> anyhow:: Result < PortId > ;
67
+ fn split (
68
+ & self ,
69
+ port_id : PortId ,
70
+ reducer_spec : Option < ReducerSpec > ,
71
+ reducer_opts : Option < ReducerOpts > ,
72
+ ) -> anyhow:: Result < PortId > ;
62
73
}
63
74
64
75
// Tracks mailboxes that have emitted a `CanSend::post` warning due to
@@ -90,7 +101,12 @@ impl<T: Mailbox + Send + Sync> MailboxExt for T {
90
101
MailboxSender :: post ( self . mailbox ( ) , envelope, return_handle) ;
91
102
}
92
103
93
- fn split ( & self , port_id : PortId , reducer_spec : Option < ReducerSpec > ) -> anyhow:: Result < PortId > {
104
+ fn split (
105
+ & self ,
106
+ port_id : PortId ,
107
+ reducer_spec : Option < ReducerSpec > ,
108
+ reducer_opts : Option < ReducerOpts > ,
109
+ ) -> anyhow:: Result < PortId > {
94
110
fn post ( mailbox : & mailbox:: Mailbox , port_id : PortId , msg : Serialized ) {
95
111
mailbox:: MailboxSender :: post (
96
112
mailbox,
@@ -122,24 +138,70 @@ impl<T: Mailbox + Send + Sync> MailboxExt for T {
122
138
post ( & mailbox, port_id. clone ( ) , serialized) ;
123
139
Ok ( ( ) )
124
140
} ) ,
125
- Some ( r) => {
126
- let buffer = Mutex :: new ( mailbox:: SplitPortBuffer :: default ( ) ) ;
127
- Box :: new ( move |serialized : Serialized | {
141
+ Some ( reducer) => {
142
+ let buffer: Arc < Mutex < UpdateBuffer > > =
143
+ Arc :: new ( Mutex :: new ( UpdateBuffer :: new ( reducer) ) ) ;
144
+
145
+ let alarm = Alarm :: new ( ) ;
146
+
147
+ {
148
+ let mut sleeper = alarm. sleeper ( ) ;
149
+ let buffer = Arc :: clone ( & buffer) ;
150
+ let port_id = port_id. clone ( ) ;
151
+ let mailbox = mailbox. clone ( ) ;
152
+ tokio:: spawn ( async move {
153
+ while sleeper. sleep ( ) . await {
154
+ let mut buf = buffer. lock ( ) . unwrap ( ) ;
155
+ match buf. reduce ( ) {
156
+ None => ( ) ,
157
+ Some ( Ok ( reduced) ) => post ( & mailbox, port_id. clone ( ) , reduced) ,
158
+ // We simply ignore errors here, and let them be propagated
159
+ // later in the enqueueing function.
160
+ //
161
+ // If this is the last update, then this strategy will cause a hang.
162
+ // We should obtain a supervisor here from our send context and notify
163
+ // it.
164
+ Some ( Err ( e) ) => tracing:: error!(
165
+ "error while reducing update: {}; waiting until the next send to propagate" ,
166
+ e
167
+ ) ,
168
+ }
169
+ }
170
+ } ) ;
171
+ }
172
+
173
+ // Note: alarm is held in the closure while the port is active;
174
+ // when it is dropped, the alarm terminates, and so does the sleeper
175
+ // task.
176
+ let alarm = Mutex :: new ( alarm) ;
177
+
178
+ // Default to global configuration if not specified.
179
+ let reducer_opts = reducer_opts. unwrap_or_default ( ) ;
180
+
181
+ Box :: new ( move |update : Serialized | {
128
182
// Hold the lock until messages are sent. This is to avoid another
129
183
// invocation of this method trying to send message concurrently and
130
184
// cause messages delivered out of order.
185
+ //
186
+ // We also always acquire alarm *after* the buffer, to avoid deadlocks.
131
187
let mut buf = buffer. lock ( ) . unwrap ( ) ;
132
- if let Some ( buffered) = buf. push ( serialized) {
133
- let reduced = r. reduce_updates ( buffered) . map_err ( |( e, mut b) | {
134
- (
135
- b. pop ( )
136
- . expect ( "there should be at least one update from buffer" ) ,
137
- e,
138
- )
139
- } ) ?;
140
- post ( & mailbox, port_id. clone ( ) , reduced) ;
188
+ let was_empty = buf. is_empty ( ) ;
189
+ match buf. push ( update) {
190
+ None if was_empty => {
191
+ alarm
192
+ . lock ( )
193
+ . unwrap ( )
194
+ . arm ( reducer_opts. max_update_interval ( ) ) ;
195
+ Ok ( ( ) )
196
+ }
197
+ None => Ok ( ( ) ) ,
198
+ Some ( Ok ( reduced) ) => {
199
+ alarm. lock ( ) . unwrap ( ) . disarm ( ) ;
200
+ post ( & mailbox, port_id. clone ( ) , reduced) ;
201
+ Ok ( ( ) )
202
+ }
203
+ Some ( Err ( e) ) => Err ( ( buf. pop ( ) . unwrap ( ) , e) ) ,
141
204
}
142
- Ok ( ( ) )
143
205
} )
144
206
}
145
207
} ;
@@ -153,3 +215,52 @@ impl<T: Mailbox + Send + Sync> MailboxExt for T {
153
215
Ok ( split_port)
154
216
}
155
217
}
218
+
219
+ struct UpdateBuffer {
220
+ buffered : Vec < Serialized > ,
221
+ reducer : Box < dyn ErasedCommReducer + Send + Sync + ' static > ,
222
+ }
223
+
224
+ impl UpdateBuffer {
225
+ fn new ( reducer : Box < dyn ErasedCommReducer + Send + Sync + ' static > ) -> Self {
226
+ Self {
227
+ buffered : Vec :: new ( ) ,
228
+ reducer,
229
+ }
230
+ }
231
+
232
+ fn is_empty ( & self ) -> bool {
233
+ self . buffered . is_empty ( )
234
+ }
235
+
236
+ fn pop ( & mut self ) -> Option < Serialized > {
237
+ self . buffered . pop ( )
238
+ }
239
+
240
+ /// Push a new item to the buffer, and optionally return any items that should
241
+ /// be flushed.
242
+ fn push ( & mut self , serialized : Serialized ) -> Option < anyhow:: Result < Serialized > > {
243
+ let limit = config:: global:: get ( config:: SPLIT_MAX_BUFFER_SIZE ) ;
244
+
245
+ self . buffered . push ( serialized) ;
246
+ if self . buffered . len ( ) >= limit {
247
+ self . reduce ( )
248
+ } else {
249
+ None
250
+ }
251
+ }
252
+
253
+ fn reduce ( & mut self ) -> Option < anyhow:: Result < Serialized > > {
254
+ if self . buffered . is_empty ( ) {
255
+ None
256
+ } else {
257
+ match self . reducer . reduce_updates ( take ( & mut self . buffered ) ) {
258
+ Ok ( reduced) => Some ( Ok ( reduced) ) ,
259
+ Err ( ( e, b) ) => {
260
+ self . buffered = b;
261
+ Some ( Err ( e) )
262
+ }
263
+ }
264
+ }
265
+ }
266
+ }
0 commit comments