Skip to content

Commit 49a5612

Browse files
committed
feat: add ResettableTimer for simplified timer management
1 parent 7ce86d7 commit 49a5612

File tree

4 files changed

+265
-0
lines changed

4 files changed

+265
-0
lines changed

internal/internal_time.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,102 @@ type (
4747
// If the timer is not started then it is a no-operation.
4848
RequestCancelTimer(timerID string)
4949
}
50+
51+
// ResettableTimer represents a timer that can be reset to restart its countdown.
52+
ResettableTimer interface {
53+
Future
54+
55+
// Reset - Cancels the current timer and starts a new one with the given duration.
56+
// If duration is not provided, uses the previous duration.
57+
// If the timer has already fired, Reset has no effect.
58+
Reset(d ...time.Duration)
59+
}
60+
61+
resettableTimerImpl struct {
62+
ctx Context
63+
timerCtx Context
64+
cancelTimer CancelFunc
65+
future Future
66+
settable Settable
67+
duration time.Duration
68+
isReady bool
69+
}
5070
)
71+
72+
// NewResettableTimer creates a new resettable timer that fires after duration d.
73+
// The timer can be reset using Reset() to restart the countdown.
74+
func NewResettableTimer(ctx Context, d time.Duration) ResettableTimer {
75+
rt := &resettableTimerImpl{
76+
ctx: ctx,
77+
duration: d,
78+
}
79+
rt.future, rt.settable = NewFuture(ctx)
80+
rt.startTimer(d)
81+
return rt
82+
}
83+
84+
func (rt *resettableTimerImpl) startTimer(d time.Duration) {
85+
rt.duration = d
86+
87+
if rt.cancelTimer != nil {
88+
rt.cancelTimer()
89+
}
90+
91+
rt.timerCtx, rt.cancelTimer = WithCancel(rt.ctx)
92+
93+
timer := NewTimer(rt.timerCtx, d)
94+
95+
Go(rt.ctx, func(ctx Context) {
96+
err := timer.Get(ctx, nil)
97+
98+
if !IsCanceledError(err) && !rt.isReady {
99+
rt.isReady = true
100+
rt.settable.Set(nil, err)
101+
}
102+
})
103+
}
104+
105+
func (rt *resettableTimerImpl) Reset(d ...time.Duration) {
106+
if rt.isReady {
107+
return
108+
}
109+
110+
duration := rt.duration
111+
if len(d) > 0 {
112+
duration = d[0]
113+
}
114+
115+
rt.startTimer(duration)
116+
}
117+
118+
// Future interface delegation methods
119+
120+
func (rt *resettableTimerImpl) Get(ctx Context, valuePtr interface{}) error {
121+
return rt.future.Get(ctx, valuePtr)
122+
}
123+
124+
func (rt *resettableTimerImpl) IsReady() bool {
125+
return rt.future.IsReady()
126+
}
127+
128+
// asyncFuture interface delegation methods (needed for Selector.AddFuture)
129+
130+
func (rt *resettableTimerImpl) GetAsync(callback *receiveCallback) (v interface{}, ok bool, err error) {
131+
return rt.future.(asyncFuture).GetAsync(callback)
132+
}
133+
134+
func (rt *resettableTimerImpl) RemoveReceiveCallback(callback *receiveCallback) {
135+
rt.future.(asyncFuture).RemoveReceiveCallback(callback)
136+
}
137+
138+
func (rt *resettableTimerImpl) ChainFuture(f Future) {
139+
rt.future.(asyncFuture).ChainFuture(f)
140+
}
141+
142+
func (rt *resettableTimerImpl) GetValueAndError() (v interface{}, err error) {
143+
return rt.future.(asyncFuture).GetValueAndError()
144+
}
145+
146+
func (rt *resettableTimerImpl) Set(value interface{}, err error) {
147+
rt.future.(asyncFuture).Set(value, err)
148+
}

internal/internal_workflow_test.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1345,3 +1345,156 @@ func (t *WorkflowOptionTest) TestKnowQueryType_WithHandlers() {
13451345
},
13461346
wo.KnownQueryTypes())
13471347
}
1348+
1349+
func (s *WorkflowUnitTest) Test_ResettableTimerFiresAfterExpiration() {
1350+
wf := func(ctx Context) error {
1351+
timer := NewResettableTimer(ctx, 5*time.Second)
1352+
1353+
timerFired := false
1354+
NewSelector(ctx).AddFuture(timer, func(f Future) {
1355+
err := f.Get(ctx, nil)
1356+
s.NoError(err)
1357+
timerFired = true
1358+
}).Select(ctx)
1359+
1360+
s.True(timerFired, "Timer should fire after 5 seconds with no resets")
1361+
return nil
1362+
}
1363+
1364+
env := s.NewTestWorkflowEnvironment()
1365+
env.ExecuteWorkflow(wf)
1366+
s.True(env.IsWorkflowCompleted())
1367+
s.NoError(env.GetWorkflowError())
1368+
}
1369+
1370+
func (s *WorkflowUnitTest) Test_ResettableTimerDoesNotFireWhenKeptReset() {
1371+
wf := func(ctx Context) error {
1372+
timer := NewResettableTimer(ctx, 2*time.Second)
1373+
activityCh := GetSignalChannel(ctx, "activity")
1374+
stopCh := GetSignalChannel(ctx, "stop")
1375+
1376+
timerFired := false
1377+
activityCount := 0
1378+
stopped := false
1379+
1380+
for !stopped {
1381+
selector := NewSelector(ctx)
1382+
1383+
selector.AddFuture(timer, func(f Future) {
1384+
timerFired = true
1385+
})
1386+
1387+
selector.AddReceive(activityCh, func(c Channel, more bool) {
1388+
var signal string
1389+
c.Receive(ctx, &signal)
1390+
activityCount++
1391+
timer.Reset()
1392+
})
1393+
1394+
selector.AddReceive(stopCh, func(c Channel, more bool) {
1395+
var stop bool
1396+
c.Receive(ctx, &stop)
1397+
stopped = true
1398+
})
1399+
1400+
selector.Select(ctx)
1401+
}
1402+
1403+
s.Equal(3, activityCount, "Should have received 3 activity signals")
1404+
s.False(timerFired, "Timer should NOT fire when continuously reset")
1405+
return nil
1406+
}
1407+
1408+
env := s.NewTestWorkflowEnvironment()
1409+
1410+
env.RegisterDelayedCallback(func() {
1411+
env.SignalWorkflow("activity", "scan1")
1412+
}, 500*time.Millisecond)
1413+
env.RegisterDelayedCallback(func() {
1414+
env.SignalWorkflow("activity", "scan2")
1415+
}, 1500*time.Millisecond)
1416+
env.RegisterDelayedCallback(func() {
1417+
env.SignalWorkflow("activity", "scan3")
1418+
}, 2500*time.Millisecond)
1419+
env.RegisterDelayedCallback(func() {
1420+
env.SignalWorkflow("stop", true)
1421+
}, 4*time.Second)
1422+
1423+
env.ExecuteWorkflow(wf)
1424+
s.True(env.IsWorkflowCompleted())
1425+
s.NoError(env.GetWorkflowError())
1426+
}
1427+
1428+
func (s *WorkflowUnitTest) Test_ResettableTimerFiresAfterResetsStop() {
1429+
wf := func(ctx Context) error {
1430+
timer := NewResettableTimer(ctx, 5*time.Second)
1431+
1432+
timer.Reset() // Still 5s
1433+
timer.Reset(2 * time.Second) // Now 2s
1434+
1435+
timerFired := false
1436+
NewSelector(ctx).AddFuture(timer, func(f Future) {
1437+
err := f.Get(ctx, nil)
1438+
s.NoError(err)
1439+
timerFired = true
1440+
}).Select(ctx)
1441+
1442+
s.True(timerFired, "Timer should eventually fire after resets stop")
1443+
return nil
1444+
}
1445+
1446+
env := s.NewTestWorkflowEnvironment()
1447+
env.ExecuteWorkflow(wf)
1448+
s.True(env.IsWorkflowCompleted())
1449+
s.NoError(env.GetWorkflowError())
1450+
}
1451+
1452+
func (s *WorkflowUnitTest) Test_ResettableTimerResetAfterFireIsNoop() {
1453+
wf := func(ctx Context) error {
1454+
timer := NewResettableTimer(ctx, 1*time.Second)
1455+
1456+
err := timer.Get(ctx, nil)
1457+
s.NoError(err)
1458+
s.True(timer.IsReady(), "Timer should be ready after firing")
1459+
1460+
timer.Reset(5 * time.Second)
1461+
s.True(timer.IsReady(), "Timer should still be ready (reset was no-op)")
1462+
1463+
timer.Reset()
1464+
s.True(timer.IsReady(), "Multiple resets after fire are all no-ops")
1465+
return nil
1466+
}
1467+
1468+
env := s.NewTestWorkflowEnvironment()
1469+
env.ExecuteWorkflow(wf)
1470+
s.True(env.IsWorkflowCompleted())
1471+
s.NoError(env.GetWorkflowError())
1472+
}
1473+
1474+
func (s *WorkflowUnitTest) Test_ResettableTimerResetWithDifferentDurations() {
1475+
wf := func(ctx Context) error {
1476+
timer := NewResettableTimer(ctx, 10*time.Second)
1477+
1478+
timer.Reset(1 * time.Second)
1479+
1480+
timerFired := false
1481+
startTime := Now(ctx)
1482+
1483+
NewSelector(ctx).AddFuture(timer, func(f Future) {
1484+
err := f.Get(ctx, nil)
1485+
s.NoError(err)
1486+
timerFired = true
1487+
}).Select(ctx)
1488+
1489+
elapsed := Now(ctx).Sub(startTime)
1490+
1491+
s.True(timerFired, "Timer should fire")
1492+
s.Less(elapsed, 5*time.Second, "Should fire with reset duration, not original")
1493+
return nil
1494+
}
1495+
1496+
env := s.NewTestWorkflowEnvironment()
1497+
env.ExecuteWorkflow(wf)
1498+
s.True(env.IsWorkflowCompleted())
1499+
s.NoError(env.GetWorkflowError())
1500+
}

workflow/deterministic_wrappers.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,17 @@ func NewTimer(ctx Context, d time.Duration) Future {
154154
return internal.NewTimer(ctx, d)
155155
}
156156

157+
// NewResettableTimer returns a timer that can be reset to restart its countdown. The timer becomes ready after the
158+
// specified duration d. You can reset the timer by calling timer.Reset() with no arguments to restart with the original
159+
// duration, or pass a new duration to change it. The timer's status can be checked using timer.Get(ctx, nil)
160+
// or timer.IsReady(). This is useful for implementing timeout patterns that should restart based on external
161+
// events. The workflow needs to use this NewResettableTimer() instead of creating new timers repeatedly.
162+
// The current timer resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
163+
// subjected to change in the future.
164+
func NewResettableTimer(ctx Context, d time.Duration) ResettableTimer {
165+
return internal.NewResettableTimer(ctx, d)
166+
}
167+
157168
// Sleep pauses the current workflow for at least the duration d. A negative or zero duration causes Sleep to return
158169
// immediately. Workflow code needs to use this Sleep() to sleep instead of the Go lang library one(timer.Sleep()).
159170
// You can cancel the pending sleep by cancel the Context (using context from workflow.WithCancel(ctx)).

workflow/workflow.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ type (
6262

6363
// GetVersionOption is used to specify options for GetVersion
6464
GetVersionOption = internal.GetVersionOption
65+
66+
// ResettableTimer represents a timer that can be reset to a new duration
67+
ResettableTimer = internal.ResettableTimer
6568
)
6669

6770
// Register - registers a workflow function with the framework.

0 commit comments

Comments
 (0)