Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/replaytests/branch_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func sampleBranchWorkflow2(ctx workflow.Context) error {
}
ctx = workflow.WithActivityOptions(ctx, ao)

for i := 1; i <= 4; i++ {
for i := 1; i <= 2; i++ {
activityInput := fmt.Sprintf("branch %d of 4", i)
future := workflow.ExecuteActivity(ctx, sampleActivity, activityInput)
futures = append(futures, future)
Expand Down
90 changes: 90 additions & 0 deletions test/replaytests/continue_as_new.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
[
{
"eventId": 1,
"timestamp": 1699856700704442400,
"eventType": "WorkflowExecutionStarted",
"version": 4,
"taskId": 882931375,
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "fx.SimpleSignalWorkflow"
},
"taskList": {
"name": "fx-worker"
},
"executionStartToCloseTimeoutSeconds": 600,
"taskStartToCloseTimeoutSeconds": 10,
"continuedExecutionRunId": "a664f402-bfe9-4739-945c-9cbc637548f1",
"initiator": "CronSchedule",
"continuedFailureReason": "cadenceInternal:Timeout START_TO_CLOSE",
"originalExecutionRunId": "d0baf930-6a83-4740-b773-71aaa696eed1",
"firstExecutionRunId": "e85fa1b9-8899-40ce-8af9-7e0f93ed7ae5",
"firstScheduleTimeNano": "2023-05-22T15:45:26.535595761-07:00",
"cronSchedule": "* * * * *",
"firstDecisionTaskBackoffSeconds": 60,
"PartitionConfig": {
"isolation-group": "dca11"
}
}
},
{
"eventId": 2,
"timestamp": 1699856760713586608,
"eventType": "DecisionTaskScheduled",
"version": 4,
"taskId": 882931383,
"decisionTaskScheduledEventAttributes": {
"taskList": {
"name": "fx-worker"
},
"startToCloseTimeoutSeconds": 10
}
},
{
"eventId": 3,
"timestamp": 1699856760741837021,
"eventType": "DecisionTaskStarted",
"version": 4,
"taskId": 882931387,
"decisionTaskStartedEventAttributes": {
"scheduledEventId": 2,
"identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4",
"requestId": "bb0ee926-13d1-4af4-9f9c-51433333ad04"
}
},
{
"eventId": 4,
"timestamp": 1699856760773459755,
"eventType": "DecisionTaskCompleted",
"version": 4,
"taskId": 882931391,
"decisionTaskCompletedEventAttributes": {
"scheduledEventId": 2,
"startedEventId": 3,
"identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4",
"binaryChecksum": "uDeploy:dc3e318b30a49e8bb88f462a50fe3a01dd210a3a"
}
},
{
"eventId": 5,
"timestamp": 1699857360713649962,
"eventType": "WorkflowExecutionContinuedAsNew",
"version": 4,
"taskId": 882931394,
"workflowExecutionContinuedAsNewEventAttributes": {
"newExecutionRunId": "06c2468c-2d2d-44f7-ac7a-ff3c383f6e90",
"workflowType": {
"name": "fx.SimpleSignalWorkflow"
},
"taskList": {
"name": "fx-worker"
},
"executionStartToCloseTimeoutSeconds": 600,
"taskStartToCloseTimeoutSeconds": 10,
"decisionTaskCompletedEventId": -23,
"backoffStartIntervalInSeconds": 60,
"initiator": "CronSchedule",
"failureReason": "cadenceInternal:Timeout START_TO_CLOSE"
}
}
]
26 changes: 26 additions & 0 deletions test/replaytests/continue_as_new_wf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package replaytests

import (
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
)

// ContinueAsNewWorkflow is a sample Cadence workflows that can receive a signal
func ContinueAsNewWorkflow(ctx workflow.Context) error {
selector := workflow.NewSelector(ctx)
var signalResult string
signalName := "helloWorldSignal"
for {
signalChan := workflow.GetSignalChannel(ctx, signalName)
selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
c.Receive(ctx, &signalResult)
workflow.GetLogger(ctx).Info("Received age signalResult from signal!", zap.String("signal", signalName), zap.String("value", signalResult))
})
workflow.GetLogger(ctx).Info("Waiting for signal on channel.. " + signalName)
// Wait for signal
selector.Select(ctx)
if signalResult == "kill" {
return nil
}
}
}
21 changes: 21 additions & 0 deletions test/replaytests/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,17 @@ func TestBranchWorkflowWithExtraBranch(t *testing.T) {
assert.ErrorContains(t, err, "nondeterministic workflow")
}

// ####### my test
func TestSequentialStepsWorkflow(t *testing.T) {
replayer := worker.NewWorkflowReplayer()

replayer.RegisterWorkflowWithOptions(sequantialStepsWorkflow, workflow.RegisterOptions{Name: "sequentialStepsWorkflow"})

// branch.json file contains history of a run with 3 activity calls
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "sequential.json")
assert.ErrorContains(t, err, "nondeterministic workflow")
}

func TestParallel(t *testing.T) {
replayer := worker.NewWorkflowReplayer()

Expand All @@ -170,3 +181,13 @@ func TestParallel2(t *testing.T) {
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch2.json")
require.NoError(t, err)
}

// Runs a history which ends with WorkflowExecutionContinuedAsNew. Replay fails because of the additional checks done
// for continue as new case by replayWorkflowHistory().
// This should not have any error because it's a valid continue as new case.
func TestContinueAsNew(t *testing.T) {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflowWithOptions(ContinueAsNewWorkflow, workflow.RegisterOptions{Name: "fx.SimpleSignalWorkflow"})
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "continue_as_new.json")
assert.ErrorContains(t, err, "replay workflow doesn't return the same result as the last event")
}
54 changes: 54 additions & 0 deletions test/replaytests/sequantial_steps_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package replaytests

import (
"fmt"
"time"

"go.uber.org/cadence/workflow"
)

/**
* This sample workflow executes sample activity 3 times sequentially.
*/

// sampleBranchWorkflow workflow decider
func sequantialStepsWorkflow(ctx workflow.Context) error {
// starts activities in parallel
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
HeartbeatTimeout: time.Second * 20,
}
ctx = workflow.WithActivityOptions(ctx, ao)

for i := 1; i <= 3; i++ {
activityInput := fmt.Sprintf("step %d", i)
err := workflow.ExecuteActivity(ctx, sampleActivity, activityInput).Get(ctx, nil)
if err != nil {
return fmt.Errorf("Failed to execute sampleActivity %dth time, err: $v", err)
}
}

workflow.GetLogger(ctx).Info("Workflow completed.")
return nil
}