-
Notifications
You must be signed in to change notification settings - Fork 41
Send get events #38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Send get events #38
Conversation
…work fine work 1 consumer <-> 1 producer, but set/get event accepts multiple consumers
| tx, err := s.pool.Begin(ctx) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to begin transaction: %w", err) | ||
| } | ||
| defer tx.Rollback(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The transaction should start for the sequence:
- consume message
- record Recv() output
But not before
|
|
||
| return value, nil | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I trust recording the step outcome + reading the event do not need being done transactionally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, they don't, as it doesn't consume the event
|
|
||
| // Send and receive again the same workflows to verify idempotency | ||
| _, err = sendWf(context.Background(), sendWorkflowInput{ | ||
| DestinationID: receiveHandle.GetWorkflowID(), | ||
| Topic: "test-topic", | ||
| }, WithWorkflowID(handle.GetWorkflowID())) | ||
| if err != nil { | ||
| t.Fatalf("failed to send message with same workflow ID: %v", err) | ||
| } | ||
| receiveHandle2, err := receiveWf(context.Background(), "test-topic", WithWorkflowID(receiveHandle.GetWorkflowID())) | ||
| if err != nil { | ||
| t.Fatalf("failed to start receive workflow with same ID: %v", err) | ||
| } | ||
| result, err = receiveHandle2.GetResult(context.Background()) | ||
| if err != nil { | ||
| t.Fatalf("failed to get result from receive workflow with same ID: %v", err) | ||
| } | ||
| if result != "message1-message2-message3" { | ||
| t.Fatalf("expected received message to be 'message1-message2-message3', got '%s'", result) | ||
| } | ||
|
|
||
| // Get steps for both workflows and verify we have the expected number | ||
| sendSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), handle.GetWorkflowID()) | ||
| if err != nil { | ||
| t.Fatalf("failed to get steps for send workflow: %v", err) | ||
| } | ||
| receiveSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), receiveHandle.GetWorkflowID()) | ||
| if err != nil { | ||
| t.Fatalf("failed to get steps for receive workflow: %v", err) | ||
| } | ||
|
|
||
| // Verify the number of steps matches the number of send() and recv() calls | ||
| // sendWorkflow has 3 Send() calls, receiveWorkflow has 3 Recv() calls | ||
| expectedSendSteps := 3 | ||
| expectedReceiveSteps := 3 | ||
|
|
||
| if len(sendSteps) != expectedSendSteps { | ||
| t.Fatalf("expected %d steps in send workflow, got %d", expectedSendSteps, len(sendSteps)) | ||
| } | ||
| if len(receiveSteps) != expectedReceiveSteps { | ||
| t.Fatalf("expected %d steps in receive workflow, got %d", expectedReceiveSteps, len(receiveSteps)) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was testing the workflow idempotency, not the steps -- already tested elsewhere.
dbos.Launch()recover workflows.Recv()