Skip to content

Commit 083e254

Browse files
authored
timer duration change and loose deterministic check (#342)
* timer duration change and loose deterministic check * update doc for Sleep() and NewTimer()
1 parent 11ed3e5 commit 083e254

File tree

5 files changed

+30
-9
lines changed

5 files changed

+30
-9
lines changed

internal/internal_event_handlers.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ package internal
2525
import (
2626
"errors"
2727
"fmt"
28+
"math"
2829
"time"
2930

3031
"github.com/uber-go/tally"
@@ -327,15 +328,15 @@ func (wc *workflowEnvironmentImpl) NewTimer(d time.Duration, callback resultHand
327328
callback(nil, fmt.Errorf("negative duration provided %v", d))
328329
return nil
329330
}
330-
if d.Seconds() == 0 {
331+
if d == 0 {
331332
callback(nil, nil)
332333
return nil
333334
}
334335

335336
timerID := wc.GenerateSequenceID()
336337
startTimerAttr := &m.StartTimerDecisionAttributes{}
337338
startTimerAttr.TimerId = common.StringPtr(timerID)
338-
startTimerAttr.StartToFireTimeoutSeconds = common.Int64Ptr(int64(d.Seconds()))
339+
startTimerAttr.StartToFireTimeoutSeconds = common.Int64Ptr(int64(math.Ceil(d.Seconds())))
339340

340341
decision := wc.decisionsHelper.startTimer(startTimerAttr)
341342
decision.setData(&scheduledTimer{callback: callback})

internal/internal_task_handlers.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"errors"
2929
"fmt"
3030
"reflect"
31+
"strings"
3132
"sync"
3233
"time"
3334

@@ -719,6 +720,14 @@ matchLoop:
719720
return nil
720721
}
721722

723+
func lastPartOfActivityName(name string) string {
724+
lastDotIdx := strings.LastIndex(name, ".")
725+
if lastDotIdx < 0 || lastDotIdx == len(name)-1 {
726+
return name
727+
}
728+
return name[lastDotIdx+1:]
729+
}
730+
722731
func isDecisionMatchEvent(d *s.Decision, e *s.HistoryEvent, strictMode bool) bool {
723732
switch d.GetDecisionType() {
724733
case s.DecisionTypeScheduleActivityTask:
@@ -729,7 +738,7 @@ func isDecisionMatchEvent(d *s.Decision, e *s.HistoryEvent, strictMode bool) boo
729738
decisionAttributes := d.ScheduleActivityTaskDecisionAttributes
730739

731740
if eventAttributes.GetActivityId() != decisionAttributes.GetActivityId() ||
732-
eventAttributes.ActivityType.GetName() != decisionAttributes.ActivityType.GetName() ||
741+
lastPartOfActivityName(eventAttributes.ActivityType.GetName()) != lastPartOfActivityName(decisionAttributes.ActivityType.GetName()) ||
733742
(strictMode && eventAttributes.TaskList.GetName() != decisionAttributes.TaskList.GetName()) ||
734743
(strictMode && bytes.Compare(eventAttributes.Input, decisionAttributes.Input) != 0) {
735744
return false
@@ -758,7 +767,7 @@ func isDecisionMatchEvent(d *s.Decision, e *s.HistoryEvent, strictMode bool) boo
758767
decisionAttributes := d.StartTimerDecisionAttributes
759768

760769
if eventAttributes.GetTimerId() != decisionAttributes.GetTimerId() ||
761-
eventAttributes.GetStartToFireTimeoutSeconds() != decisionAttributes.GetStartToFireTimeoutSeconds() {
770+
(strictMode && eventAttributes.GetStartToFireTimeoutSeconds() != decisionAttributes.GetStartToFireTimeoutSeconds()) {
762771
return false
763772
}
764773

internal/internal_task_handlers_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
346346
createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
347347
createTestEventActivityTaskScheduled(2, &s.ActivityTaskScheduledEventAttributes{
348348
ActivityId: common.StringPtr("0"),
349-
ActivityType: &s.ActivityType{Name: common.StringPtr("Greeter_Activity")},
349+
ActivityType: &s.ActivityType{Name: common.StringPtr("pkg.Greeter_Activity")},
350350
TaskList: &s.TaskList{Name: &taskList},
351351
}),
352352
}
@@ -370,6 +370,13 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
370370
t.Error(err)
371371
t.Nil(request)
372372
t.Contains(err.Error(), "nondeterministic")
373+
374+
// now with different package name to activity type
375+
testEvents[1].ActivityTaskScheduledEventAttributes.ActivityType.Name = common.StringPtr("new-package.Greeter_Activity")
376+
task = createWorkflowTask(testEvents, 2, "HelloWorld_Workflow")
377+
request, _, err = taskHandler.ProcessWorkflowTask(task, nil, false)
378+
t.NoError(err)
379+
t.NotNil(request)
373380
}
374381

375382
func (t *TaskHandlersTestSuite) TestWorkflowTask_CancelActivityBeforeSent() {

internal/workflow.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,8 @@ func Now(ctx Context) time.Time {
433433
// this NewTimer() to get the timer instead of the Go lang library one(timer.NewTimer()). You can cancel the pending
434434
// timer by cancel the Context (using context from workflow.WithCancel(ctx)) and that will cancel the timer. After timer
435435
// is canceled, the returned Future become ready, and Future.Get() will return *CanceledError.
436-
// The current timer resolution implementation is in seconds but is subjected to change.
436+
// The current timer resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
437+
// subjected to change in the future.
437438
func NewTimer(ctx Context, d time.Duration) Future {
438439
future, settable := NewFuture(ctx)
439440
if d <= 0 {
@@ -465,7 +466,8 @@ func NewTimer(ctx Context, d time.Duration) Future {
465466
// Sleep() returns nil if the duration d is passed, or it returns *CanceledError if the ctx is canceled. There are 2
466467
// reasons the ctx could be canceled: 1) your workflow code cancel the ctx (with workflow.WithCancel(ctx));
467468
// 2) your workflow itself is canceled by external request.
468-
// The current timer resolution implementation is in seconds but is subjected to change.
469+
// The current timer resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
470+
// subjected to change in the future.
469471
func Sleep(ctx Context, d time.Duration) (err error) {
470472
t := NewTimer(ctx, d)
471473
err = t.Get(ctx, nil)

workflow/deterministic_wrappers.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ func Now(ctx Context) time.Time {
104104
// this NewTimer() to get the timer instead of the Go lang library one(timer.NewTimer()). You can cancel the pending
105105
// timer by cancel the Context (using context from workflow.WithCancel(ctx)) and that will cancel the timer. After timer
106106
// is canceled, the returned Future become ready, and Future.Get() will return *CanceledError.
107-
// The current timer resolution implementation is in seconds but is subjected to change.
107+
// The current timer resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
108+
// subjected to change in the future.
108109
func NewTimer(ctx Context, d time.Duration) Future {
109110
return internal.NewTimer(ctx, d)
110111
}
@@ -115,7 +116,8 @@ func NewTimer(ctx Context, d time.Duration) Future {
115116
// Sleep() returns nil if the duration d is passed, or it returns *CanceledError if the ctx is canceled. There are 2
116117
// reasons the ctx could be canceled: 1) your workflow code cancel the ctx (with workflow.WithCancel(ctx));
117118
// 2) your workflow itself is canceled by external request.
118-
// The current timer resolution implementation is in seconds but is subjected to change.
119+
// The current timer resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
120+
// subjected to change in the future.
119121
func Sleep(ctx Context, d time.Duration) (err error) {
120122
return internal.Sleep(ctx, d)
121123
}

0 commit comments

Comments
 (0)