@@ -154,28 +154,20 @@ async fn find_actor(
154
154
}
155
155
156
156
// Check if actor is connectable and get runner_id
157
- let runner_info = {
158
- let get_runner_fut = ctx. op ( pegboard:: ops:: actor:: get_runner:: Input {
159
- actor_ids : vec ! [ actor_id] ,
160
- } ) ;
161
- let output = tokio:: time:: timeout ( Duration :: from_secs ( 5 ) , get_runner_fut) . await ??;
162
- output. actors . into_iter ( ) . find ( |a| a. actor_id == actor_id)
163
- } ;
164
-
165
- let Some ( runner_info) = runner_info else {
166
- return Err ( errors:: ActorNotFound {
167
- actor_id,
168
- port_name : port_name. to_string ( ) ,
169
- }
170
- . build ( ) ) ;
171
- } ;
172
-
173
- if !runner_info. is_connectable {
157
+ let get_runner_fut = ctx. op ( pegboard:: ops:: actor:: get_runner:: Input {
158
+ actor_ids : vec ! [ actor_id] ,
159
+ } ) ;
160
+ let res = tokio:: time:: timeout ( Duration :: from_secs ( 5 ) , get_runner_fut) . await ??;
161
+ let runner_info = res. actors . into_iter ( ) . next ( ) . filter ( |x| x. is_connectable ) ;
162
+
163
+ let runner_id = if let Some ( runner_info) = runner_info {
164
+ runner_info. runner_id
165
+ } else {
174
166
tracing:: info!( ?actor_id, "waiting for actor to become ready" ) ;
175
167
176
168
// Wait for ready, fail, or destroy
177
169
tokio:: select! {
178
- res = ready_sub. next( ) => { res?; } ,
170
+ res = ready_sub. next( ) => { res?. runner_id } ,
179
171
res = fail_sub. next( ) => {
180
172
let msg = res?;
181
173
return Err ( msg. error. clone( ) . build( ) ) ;
@@ -185,45 +177,19 @@ async fn find_actor(
185
177
return Err ( pegboard:: errors:: Actor :: DestroyedWhileWaitingForReady . build( ) ) ;
186
178
}
187
179
// Ready timeout
188
- _ = tokio:: time:: sleep( ACTOR_READY_TIMEOUT ) => {
180
+ _ = tokio:: time:: sleep( ACTOR_READY_TIMEOUT ) => {
189
181
return Err ( errors:: ActorReadyTimeout { actor_id } . build( ) ) ;
190
182
}
191
183
}
184
+ } ;
192
185
193
- // TODO: Is this needed? Can't we just re-check the actor exists if it fails to connect?
194
- // Verify actor is connectable again
195
- let runner_info = {
196
- let get_runner_fut = ctx. op ( pegboard:: ops:: actor:: get_runner:: Input {
197
- actor_ids : vec ! [ actor_id] ,
198
- } ) ;
199
- let output = tokio:: time:: timeout ( Duration :: from_secs ( 5 ) , get_runner_fut) . await ??;
200
- output. actors . into_iter ( ) . find ( |a| a. actor_id == actor_id)
201
- } ;
202
-
203
- let Some ( runner_info) = runner_info else {
204
- return Err ( errors:: ActorNotFound {
205
- actor_id,
206
- port_name : port_name. to_string ( ) ,
207
- }
208
- . build ( ) ) ;
209
- } ;
210
-
211
- if !runner_info. is_connectable {
212
- return Err ( errors:: ActorNotFound {
213
- actor_id,
214
- port_name : port_name. to_string ( ) ,
215
- }
216
- . build ( ) ) ;
217
- } ;
218
- }
219
-
220
- tracing:: debug!( ?actor_id, runner_id = ?runner_info. runner_id, "actor ready" ) ;
186
+ tracing:: debug!( ?actor_id, ?runner_id, "actor ready" ) ;
221
187
222
188
// Return pegboard-gateway instance
223
189
let gateway = pegboard_gateway:: PegboardGateway :: new (
224
190
ctx. clone ( ) ,
225
191
actor_id,
226
- runner_info . runner_id ,
192
+ runner_id,
227
193
port_name. to_string ( ) ,
228
194
) ;
229
195
Ok ( Some ( RoutingOutput :: CustomServe ( std:: sync:: Arc :: new (
0 commit comments