@@ -159,15 +159,15 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
159
159
}
160
160
}
161
161
protocol:: ToServer :: ToServerEvents ( events) => {
162
- let last_event_idx = events. last ( ) . map ( |event| event. index ) ;
163
-
164
162
// NOTE: This should not be parallelized because signals should be sent in order
165
163
// Forward to actor workflows
166
- for event in events {
164
+ for event in & events {
167
165
let actor_id =
168
166
crate :: utils:: event_actor_id ( & event. inner ) . to_string ( ) ;
169
167
let res = ctx
170
- . signal ( crate :: workflows:: actor:: Event { inner : event. inner } )
168
+ . signal ( crate :: workflows:: actor:: Event {
169
+ inner : event. inner . clone ( ) ,
170
+ } )
171
171
. to_workflow :: < crate :: workflows:: actor:: Workflow > ( )
172
172
. tag ( "actor_id" , & actor_id)
173
173
. send ( )
@@ -186,20 +186,29 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
186
186
}
187
187
}
188
188
189
- // Ack every 500 events
190
- if let Some ( last_event_idx) = last_event_idx {
191
- if last_event_idx > state. last_event_ack_idx . saturating_add ( 500 ) {
192
- state. last_event_ack_idx = last_event_idx;
189
+ if !events. is_empty ( ) {
190
+ ctx. activity ( InsertEventsInput {
191
+ events : events. clone ( ) ,
192
+ } )
193
+ . await ?;
193
194
194
- ctx. activity ( SendMessageToRunnerInput {
195
- runner_id : input. runner_id ,
196
- message : protocol:: ToClient :: ToClientAckEvents (
197
- protocol:: ToClientAckEvents {
198
- last_event_idx : state. last_event_ack_idx ,
199
- } ,
200
- ) ,
201
- } )
202
- . await ?;
195
+ // Ack every 500 events
196
+ let last_event_idx = events. last ( ) . map ( |event| event. index ) ;
197
+ if let Some ( last_event_idx) = last_event_idx {
198
+ if last_event_idx > state. last_event_ack_idx . saturating_add ( 500 )
199
+ {
200
+ state. last_event_ack_idx = last_event_idx;
201
+
202
+ ctx. activity ( SendMessageToRunnerInput {
203
+ runner_id : input. runner_id ,
204
+ message : protocol:: ToClient :: ToClientAckEvents (
205
+ protocol:: ToClientAckEvents {
206
+ last_event_idx : state. last_event_ack_idx ,
207
+ } ,
208
+ ) ,
209
+ } )
210
+ . await ?;
211
+ }
203
212
}
204
213
}
205
214
}
@@ -822,7 +831,6 @@ async fn ack_commands(ctx: &ActivityCtx, input: &AckCommandsInput) -> Result<()>
822
831
823
832
#[ derive( Debug , Serialize , Deserialize , Hash ) ]
824
833
struct InsertEventsInput {
825
- runner_id : Id ,
826
834
events : Vec < protocol:: EventWrapper > ,
827
835
}
828
836
0 commit comments