@@ -1085,7 +1085,7 @@ mod tests {
1085
1085
1086
1086
// Returning the sequence number assigned to the message.
1087
1087
#[ derive( Debug ) ]
1088
- #[ hyperactor:: export( handlers = [ String ] ) ]
1088
+ #[ hyperactor:: export( handlers = [ String , Callback ] ) ]
1089
1089
struct GetSeqActor ( PortRef < ( String , SeqInfo ) > ) ;
1090
1090
1091
1091
#[ async_trait]
@@ -1111,6 +1111,28 @@ mod tests {
1111
1111
}
1112
1112
}
1113
1113
1114
+ // Unlike Handler<String>, where the sender provides the string message
1115
+ // directly, in Hanlder<Callback>, sender needs to provide a port, and
1116
+ // handler will reply that port with its own callback port. Then sender can
1117
+ // send the string message through thsi callback port.
1118
+ #[ derive( Clone , Debug , Serialize , Deserialize , Named ) ]
1119
+ struct Callback ( PortRef < PortRef < String > > ) ;
1120
+
1121
+ #[ async_trait]
1122
+ impl Handler < Callback > for GetSeqActor {
1123
+ async fn handle (
1124
+ & mut self ,
1125
+ cx : & Context < Self > ,
1126
+ message : Callback ,
1127
+ ) -> Result < ( ) , anyhow:: Error > {
1128
+ let ( handle, mut receiver) = cx. open_port :: < String > ( ) ;
1129
+ let callback_ref = handle. bind ( ) ;
1130
+ message. 0 . send ( cx, callback_ref) . unwrap ( ) ;
1131
+ let msg = receiver. recv ( ) . await . unwrap ( ) ;
1132
+ self . handle ( cx, msg) . await
1133
+ }
1134
+ }
1135
+
1114
1136
#[ async_timed_test( timeout_secs = 30 ) ]
1115
1137
async fn test_sequencing_actor_handle_basic ( ) {
1116
1138
let proc = Proc :: local ( ) ;
@@ -1151,6 +1173,37 @@ mod tests {
1151
1173
}
1152
1174
}
1153
1175
1176
+ // Verify that we can pass port refs between sender and destination actors
1177
+ // back and forward, and send messages through them without being deadlocked.
1178
+ #[ async_timed_test( timeout_secs = 30 ) ]
1179
+ async fn test_sequencing_actor_handle_callback ( ) {
1180
+ let config = config:: global:: lock ( ) ;
1181
+ let _guard = config. override_key ( config:: ENABLE_DEST_ACTOR_REORDERING_BUFFER , true ) ;
1182
+
1183
+ let proc = Proc :: local ( ) ;
1184
+ let ( client, _) = proc. instance ( "client" ) . unwrap ( ) ;
1185
+ let ( tx, mut rx) = client. open_port ( ) ;
1186
+
1187
+ let actor_handle = proc
1188
+ . spawn :: < GetSeqActor > ( "get_seq" , tx. bind ( ) )
1189
+ . await
1190
+ . unwrap ( ) ;
1191
+ let actor_ref: ActorRef < GetSeqActor > = actor_handle. bind ( ) ;
1192
+
1193
+ let ( callback_tx, mut callback_rx) = client. open_port ( ) ;
1194
+ actor_ref
1195
+ . send ( & client, Callback ( callback_tx. bind ( ) ) )
1196
+ . unwrap ( ) ;
1197
+ let msg_port_ref = callback_rx. recv ( ) . await . unwrap ( ) ;
1198
+ msg_port_ref. send ( & client, "finally" . to_string ( ) ) . unwrap ( ) ;
1199
+
1200
+ let session_id = client. sequencer ( ) . session_id ( ) ;
1201
+ assert_eq ! (
1202
+ rx. recv( ) . await . unwrap( ) ,
1203
+ ( "finally" . to_string( ) , SeqInfo { session_id, seq: 1 } )
1204
+ ) ;
1205
+ }
1206
+
1154
1207
// Adding a delay before sending the destination proc. Useful for tests
1155
1208
// requiring latency injection.
1156
1209
#[ derive( Debug ) ]
@@ -1254,7 +1307,7 @@ mod tests {
1254
1307
1255
1308
// By disabling the actor side re-ordering buffer, the mssages will
1256
1309
// be processed in the same order as they sent out.
1257
- let _guard = config. override_key ( config:: ENABLE_CLIENT_SEQ_ASSIGNMENT , false ) ;
1310
+ let _guard = config. override_key ( config:: ENABLE_DEST_ACTOR_REORDERING_BUFFER , false ) ;
1258
1311
assert_out_of_order_delivery (
1259
1312
vec ! [ ( "second" . to_string( ) , 2 ) , ( "first" . to_string( ) , 1 ) ] ,
1260
1313
latency_plan. clone ( ) ,
@@ -1263,7 +1316,7 @@ mod tests {
1263
1316
1264
1317
// By enabling the actor side re-ordering buffer, the mssages will
1265
1318
// be re-ordered before being processed.
1266
- let _guard = config. override_key ( config:: ENABLE_CLIENT_SEQ_ASSIGNMENT , true ) ;
1319
+ let _guard = config. override_key ( config:: ENABLE_DEST_ACTOR_REORDERING_BUFFER , true ) ;
1267
1320
assert_out_of_order_delivery (
1268
1321
vec ! [ ( "first" . to_string( ) , 1 ) , ( "second" . to_string( ) , 2 ) ] ,
1269
1322
latency_plan. clone ( ) ,
@@ -1281,7 +1334,7 @@ mod tests {
1281
1334
1282
1335
// By enabling the actor side re-ordering buffer, the mssages will
1283
1336
// be re-ordered before being processed.
1284
- let _guard = config. override_key ( config:: ENABLE_CLIENT_SEQ_ASSIGNMENT , true ) ;
1337
+ let _guard = config. override_key ( config:: ENABLE_DEST_ACTOR_REORDERING_BUFFER , true ) ;
1285
1338
let expected = ( 1 ..10000 )
1286
1339
. map ( |i| ( format ! ( "msg{i}" ) , i) )
1287
1340
. collect :: < Vec < _ > > ( ) ;
0 commit comments