Skip to content

Commit 22f5b6b

Browse files
authored
Merge pull request #460 from mediar-ai/pr/event-flush-20260119-163051
fix: Add event flush for reliable delivery before runner returns
2 parents 1db9fe6 + a2e9f38 commit 22f5b6b

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

packages/workflow/src/events.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,43 @@ class EventTransport {
191191
}
192192
}
193193

194+
/**
195+
* Flush pending writes to ensure all events are delivered
196+
* Returns a promise that resolves when the stream is drained
197+
*/
198+
async flush(): Promise<void> {
199+
if (this.useStderr || !this.pipeStream) {
200+
return;
201+
}
202+
203+
return new Promise<void>((resolve) => {
204+
if (!this.pipeStream) {
205+
resolve();
206+
return;
207+
}
208+
209+
// If stream is already drained, resolve immediately
210+
if (this.pipeStream.writableNeedDrain === false) {
211+
resolve();
212+
return;
213+
}
214+
215+
// Wait for drain event
216+
const onDrain = () => {
217+
resolve();
218+
};
219+
220+
this.pipeStream.once('drain', onDrain);
221+
222+
// Also resolve on error or close to avoid hanging
223+
this.pipeStream.once('error', () => resolve());
224+
this.pipeStream.once('close', () => resolve());
225+
226+
// Timeout fallback - don't wait forever
227+
setTimeout(() => resolve(), 100);
228+
});
229+
}
230+
194231
/**
195232
* Close the transport
196233
*/
@@ -334,6 +371,14 @@ export const emit = {
334371
raw(event: Omit<WorkflowEvent, '__mcp_event__' | 'timestamp'>): void {
335372
transport.send(event);
336373
},
374+
375+
/**
376+
* Flush all pending events to ensure they are delivered
377+
* Call this before workflow exits to ensure all events are sent
378+
*/
379+
async flush(): Promise<void> {
380+
await transport.flush();
381+
},
337382
};
338383

339384
/**

packages/workflow/src/runner.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ export class WorkflowRunner {
140140
this.state.context.data = result.result;
141141
this.state.lastStepId = step.config.id;
142142
this.state.lastStepIndex = i;
143+
await emit.flush(); // Ensure events are delivered before returning
143144
return {
144145
status: 'executed_without_error',
145146
lastStepId: this.state.lastStepId,
@@ -210,6 +211,7 @@ export class WorkflowRunner {
210211
this.state.lastStepIndex = i;
211212

212213
// Return error result
214+
await emit.flush(); // Ensure events are delivered before returning
213215
return {
214216
status: 'executed_with_error',
215217
lastStepId: this.state.lastStepId,
@@ -219,6 +221,7 @@ export class WorkflowRunner {
219221
}
220222
}
221223

224+
await emit.flush(); // Ensure events are delivered before returning
222225
return {
223226
status: 'executed_without_error',
224227
lastStepId: this.state.lastStepId,

0 commit comments

Comments
 (0)