Description
Hi
We are using the workflows library, and we have started to experience a lot of stuck workflows: they don't proceed and just hang in the same state, often the initial one.
Quick disclaimer: it is very likely that we are doing something wrong, so we're not sure this is a bug in this library.
Our Problem (we think)
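Our current suspicion: if storing the workflow state is "too slow", can the response event from a handler race the storing of that state? In start(), TestCommand is sent before the handler returns, but the workflow state is only persisted after it returns. If TestCommandHandled arrives before that save has finished, there is no stored state for it to be matched to.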
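To make the suspected race concrete, here is a minimal, self-contained TypeScript sketch. It is a toy model under stated assumptions, not how @node-ts/bus-core is implemented: the state store is a plain Map, and `simulateStart`, `saveDelayMs` and `responseDelayMs` are hypothetical names. The response event simply checks whether the state exists at the moment it arrives.

```typescript
// Toy model of the suspected race; NOT @node-ts/bus-core internals.
type LookupResult = 'state found' | 'no state found'

const delay = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms))

// Hypothetical helper: the start handler has just sent TestCommand and returned.
// From that moment two things run concurrently:
//   1. the workflow state returned by the handler is persisted (takes saveDelayMs)
//   2. TestCommandHandled comes back (after responseDelayMs) and looks the state up
async function simulateStart(saveDelayMs: number, responseDelayMs: number): Promise<LookupResult> {
  const store = new Map<string, { isStateUpdated: boolean }>()
  const workflowId = 'wf-1' // hypothetical id used to correlate the response with the state

  // The response handler: only finds the state if the save has already completed
  const response = (async (): Promise<LookupResult> => {
    await delay(responseDelayMs)
    return store.has(workflowId) ? 'state found' : 'no state found'
  })()

  // Slow persistence, comparable to DelayedPersistence.saveWorkflowState
  await delay(saveDelayMs)
  store.set(workflowId, { isStateUpdated: true })

  return response
}
```

With a 600 ms save (as in DelayedPersistence) and a response that comes back after 10 ms, `simulateStart(600, 10)` resolves to `'no state found'`; swapping the delays gives `'state found'`. Nothing in the toy model orders the two operations, which is exactly the gap the suspicion is about.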
Minimal Example
I have made the following TestWorkflow:
```ts
export class TestWorkflow extends Workflow<TestWorkflowState> {
  workflowName = 'AwsAccountUpdateWorkflow'

  constructor(private readonly bus: BusInstance) {
    super()
  }

  configureWorkflow(mapper: WorkflowMapper<TestWorkflowState, TestWorkflow>): void {
    mapper
      .withState(TestWorkflowState)
      .startedBy(TestStartCommand, 'start')
      .when(TestCommandHandled, 'handlesHandled')
  }

  async start(command: TestStartCommand, wfState: TestWorkflowState): Promise<Partial<TestWorkflowState>> {
    console.log('Ok ok. We have started. I will now publish a command')
    await this.bus.send(new TestCommand())
    // Emulate slow workflow handling
    // console.log("Should be on the queue.... I'll go to sleep")
    // await sleep(1000)
    console.log("Let's save that state")
    // The returned state is persisted only after this handler returns
    return { ...wfState, isStateUpdated: true }
  }

  async handlesHandled(event: TestCommandHandled, wfState: TestWorkflowState): Promise<Partial<TestWorkflowState>> {
    console.log('Now handling the handled command because I got a handlesHandled event', {
      stateUpdated: wfState.isStateUpdated,
    })
    const newState: TestWorkflowState = {
      ...wfState,
    }
    await this.bus.publish(new TestEnded())
    return this.completeWorkflow(newState)
  }
}

// Resolves after `ms` milliseconds
export const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => {
    setTimeout(resolve, ms)
  })
```
We have also wrapped PostgresPersistence to simulate a slow database connection: every call to saveWorkflowState is delayed by 600 ms.
```ts
// Wraps PostgresPersistence and delays every save to simulate a slow database connection
class DelayedPersistence implements Persistence {
  private dbConfig: PostgresPersistence

  constructor() {
    this.dbConfig = getDbConfig()
  }

  dispose(): Promise<void> {
    return this.dbConfig.dispose()
  }

  getWorkflowState<WorkflowStateType extends WorkflowState, MessageType extends Message>(
    workflowStateConstructor: ClassConstructor<WorkflowStateType>,
    messageMap: MessageWorkflowMapping<MessageType, WorkflowStateType>,
    message: MessageType,
    messageOptions: MessageAttributes,
    includeCompleted?: boolean,
  ): Promise<WorkflowStateType[]> {
    return this.dbConfig.getWorkflowState(
      workflowStateConstructor,
      messageMap,
      message,
      messageOptions,
      includeCompleted,
    )
  }

  initialize(): Promise<void> {
    return this.dbConfig.initialize()
  }

  initializeWorkflow<TWorkflowState extends WorkflowState>(
    workflowStateConstructor: ClassConstructor<TWorkflowState>,
    messageWorkflowMappings: MessageWorkflowMapping<Message, WorkflowState>[],
  ): Promise<void> {
    return this.dbConfig.initializeWorkflow(workflowStateConstructor, messageWorkflowMappings)
  }

  prepare(coreDependencies: CoreDependencies): void {
    this.dbConfig.prepare(coreDependencies)
  }

  async saveWorkflowState<WorkflowStateType extends WorkflowState>(workflowState: WorkflowStateType): Promise<void> {
    // Hold every save back by 600 ms before delegating to Postgres
    await sleep(600)
    return this.dbConfig.saveWorkflowState(workflowState)
  }
}
```
This setup uses the in-memory queue; maybe that behaves differently from RabbitMQ in this regard.
The Issue
When I run this workflow, I can see the following message in the logs:
No existing workflow state found for message. Ignoring.
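If, as the log line suggests, the ignored message is consumed rather than retried, the workflow can never leave its initial state, even once the slow save eventually lands. The toy sketch below models only that consequence, assuming ignored events are never redelivered; `ToyWorkflowStore`, `handleResponse` and `runResponseBeforeSave` are hypothetical names, not part of @node-ts/bus-core.

```typescript
// Toy model of what happens when the response event is ignored; NOT @node-ts/bus-core code.
interface ToyState {
  workflowId: string
  isStateUpdated: boolean
  completed: boolean
}

class ToyWorkflowStore {
  private readonly states = new Map<string, ToyState>()

  save(state: ToyState): void {
    this.states.set(state.workflowId, { ...state })
  }

  find(workflowId: string): ToyState | undefined {
    return this.states.get(workflowId)
  }
}

// Mimics handlesHandled: complete the workflow if its state exists, otherwise drop the event.
// Assumption: a dropped event is never redelivered.
function handleResponse(store: ToyWorkflowStore, workflowId: string, log: string[]): void {
  const state = store.find(workflowId)
  if (state === undefined) {
    log.push('No existing workflow state found for message. Ignoring.')
    return
  }
  store.save({ ...state, completed: true })
}

// TestCommandHandled arrives first, then the slow save of the start state lands.
function runResponseBeforeSave(): { finalState: ToyState | undefined; log: string[] } {
  const store = new ToyWorkflowStore()
  const log: string[] = []
  handleResponse(store, 'wf-1', log)
  store.save({ workflowId: 'wf-1', isStateUpdated: true, completed: false })
  return { finalState: store.find('wf-1'), log }
}
```

`runResponseBeforeSave()` returns a final state with `isStateUpdated: true` but `completed: false`, plus a single "Ignoring" log entry, which matches the symptom of workflows hanging right after their start step.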
The Question
Are we doing something completely wrong?
We are using: "@node-ts/bus-core": "^1.0.0"