Skip to content

Commit cc6aa72

Browse files
authored
Populate currentAttemptScheduledTime on PollActivityTaskQueueResponse for standalone activities (#9333)
## What changed? Populate currentAttemptScheduledTime on PollActivityTaskQueueResponse for standalone activities ## Why? This field was being left empty, and it was breaking the .NET SDK which has strict nullness check ## How did you test it? - [X] built - [X] run locally and tested manually - [X] covered by existing tests - [ ] added new unit test(s) - [X] added new functional test(s)
1 parent 8d659fd commit cc6aa72

File tree

3 files changed

+175
-65
lines changed

3 files changed

+175
-65
lines changed

chasm/lib/activity/activity.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -220,14 +220,16 @@ func (a *Activity) GenerateRecordActivityTaskStartedResponse(
220220
lastHeartbeat, _ := a.LastHeartbeat.TryGet(ctx)
221221
requestData := a.RequestData.Get(ctx)
222222
attempt := a.LastAttempt.Get(ctx)
223+
223224
return &historyservice.RecordActivityTaskStartedResponse{
224-
StartedTime: attempt.GetStartedTime(),
225-
Attempt: attempt.GetCount(),
226-
Priority: a.GetPriority(),
227-
RetryPolicy: a.GetRetryPolicy(),
228-
ActivityRunId: key.RunID,
229-
WorkflowNamespace: namespace,
230-
HeartbeatDetails: lastHeartbeat.GetDetails(),
225+
StartedTime: attempt.GetStartedTime(),
226+
Attempt: attempt.GetCount(),
227+
Priority: a.GetPriority(),
228+
RetryPolicy: a.GetRetryPolicy(),
229+
ActivityRunId: key.RunID,
230+
WorkflowNamespace: namespace,
231+
HeartbeatDetails: lastHeartbeat.GetDetails(),
232+
CurrentAttemptScheduledTime: a.attemptScheduleTime(attempt),
231233
ScheduledEvent: &historypb.HistoryEvent{
232234
EventType: enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED,
233235
EventTime: a.GetScheduleTime(),
@@ -248,6 +250,27 @@ func (a *Activity) GenerateRecordActivityTaskStartedResponse(
248250
}, nil
249251
}
250252

253+
// attemptScheduleTime returns when the given attempt was scheduled to run:
254+
// the activity's original schedule time for the first attempt, or
255+
// calculated from attemptScheduleTimeForRetry on retries.
256+
func (a *Activity) attemptScheduleTime(attempt *activitypb.ActivityAttemptState) *timestamppb.Timestamp {
257+
if attempt.GetCount() == 1 {
258+
return a.GetScheduleTime()
259+
}
260+
return attemptScheduleTimeForRetry(attempt)
261+
}
262+
263+
// attemptScheduleTimeForRetry computes the time a retried attempt is scheduled to start,
264+
// as complete_time + retry_interval. Returns nil if either field is missing or zero.
265+
func attemptScheduleTimeForRetry(attempt *activitypb.ActivityAttemptState) *timestamppb.Timestamp {
266+
retryInterval := attempt.GetCurrentRetryInterval()
267+
completeTime := attempt.GetCompleteTime()
268+
if retryInterval != nil && retryInterval.AsDuration() > 0 && completeTime != nil {
269+
return timestamppb.New(completeTime.AsTime().Add(retryInterval.AsDuration()))
270+
}
271+
return nil
272+
}
273+
251274
// RecordCompleted applies the provided function to record activity completion.
252275
func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error {
253276
return applyFn(ctx)
@@ -643,14 +666,6 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) *apiactivitypb.
643666
heartbeat, _ := a.LastHeartbeat.TryGet(ctx)
644667
key := ctx.ExecutionKey()
645668

646-
// TODO(saa-preview): debating if we should persist next attempt schedule time for stronger consistency
647-
var nextAttemptScheduleTime *timestamppb.Timestamp
648-
interval := attempt.GetCurrentRetryInterval()
649-
completeTime := attempt.GetCompleteTime()
650-
if interval != nil && interval.AsDuration() > 0 && completeTime != nil {
651-
nextAttemptScheduleTime = timestamppb.New(completeTime.AsTime().Add(interval.AsDuration()))
652-
}
653-
654669
var closeTime *timestamppb.Timestamp
655670
var executionDuration *durationpb.Duration
656671
if a.LifecycleState(ctx) != chasm.LifecycleStateRunning {
@@ -684,7 +699,7 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) *apiactivitypb.
684699
LastHeartbeatTime: heartbeat.GetRecordedTime(),
685700
LastStartedTime: attempt.GetStartedTime(),
686701
LastWorkerIdentity: attempt.GetLastWorkerIdentity(),
687-
NextAttemptScheduleTime: nextAttemptScheduleTime,
702+
NextAttemptScheduleTime: attemptScheduleTimeForRetry(attempt),
688703
Priority: a.GetPriority(),
689704
RetryPolicy: a.GetRetryPolicy(),
690705
RunId: key.RunID,

chasm/lib/activity/statemachine.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,13 @@ var TransitionRescheduled = chasm.NewTransition(
100100
return err
101101
}
102102

103+
retryScheduledTime := attemptScheduleTimeForRetry(attempt).AsTime()
104+
103105
if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 {
104106
ctx.AddTask(
105107
a,
106108
chasm.TaskAttributes{
107-
ScheduledTime: currentTime.Add(timeout).Add(event.retryInterval),
109+
ScheduledTime: retryScheduledTime.Add(timeout),
108110
},
109111
&activitypb.ScheduleToStartTimeoutTask{
110112
Stamp: attempt.GetStamp(),
@@ -114,7 +116,7 @@ var TransitionRescheduled = chasm.NewTransition(
114116
ctx.AddTask(
115117
a,
116118
chasm.TaskAttributes{
117-
ScheduledTime: currentTime.Add(event.retryInterval),
119+
ScheduledTime: retryScheduledTime,
118120
},
119121
&activitypb.ActivityDispatchTask{
120122
Stamp: attempt.GetStamp(),

tests/standalone_activity_test.go

Lines changed: 140 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -274,58 +274,151 @@ func (s *standaloneActivityTestSuite) TestPollActivityTaskQueue() {
274274
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
275275
defer cancel()
276276

277-
activityID := testcore.RandomizeStr(t.Name())
278-
taskQueue := testcore.RandomizeStr(t.Name())
279-
namespace := s.Namespace().String()
277+
t.Run("FirstAttempt", func(t *testing.T) {
278+
activityID := testcore.RandomizeStr(t.Name())
279+
taskQueue := testcore.RandomizeStr(t.Name())
280+
namespace := s.Namespace().String()
280281

281-
startToCloseTimeout := durationpb.New(1 * time.Minute)
282-
scheduleToCloseTimeout := durationpb.New(2 * time.Minute)
283-
heartbeatTimeout := durationpb.New(20 * time.Second)
284-
priority := &commonpb.Priority{
285-
FairnessKey: "test-key",
286-
}
282+
startToCloseTimeout := durationpb.New(1 * time.Minute)
283+
scheduleToCloseTimeout := durationpb.New(2 * time.Minute)
284+
heartbeatTimeout := durationpb.New(20 * time.Second)
285+
priority := &commonpb.Priority{
286+
FairnessKey: "test-key",
287+
}
287288

288-
startResp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
289-
Namespace: namespace,
290-
ActivityId: activityID,
291-
ActivityType: s.tv.ActivityType(),
292-
Identity: s.tv.WorkerIdentity(),
293-
Input: defaultInput,
294-
TaskQueue: &taskqueuepb.TaskQueue{
295-
Name: taskQueue,
296-
},
297-
StartToCloseTimeout: startToCloseTimeout,
298-
ScheduleToCloseTimeout: scheduleToCloseTimeout,
299-
HeartbeatTimeout: heartbeatTimeout,
300-
RequestId: s.tv.RequestID(),
301-
Priority: priority,
302-
Header: defaultHeader,
289+
startResp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
290+
Namespace: namespace,
291+
ActivityId: activityID,
292+
ActivityType: s.tv.ActivityType(),
293+
Identity: s.tv.WorkerIdentity(),
294+
Input: defaultInput,
295+
TaskQueue: &taskqueuepb.TaskQueue{
296+
Name: taskQueue,
297+
},
298+
StartToCloseTimeout: startToCloseTimeout,
299+
ScheduleToCloseTimeout: scheduleToCloseTimeout,
300+
HeartbeatTimeout: heartbeatTimeout,
301+
RequestId: s.tv.RequestID(),
302+
Priority: priority,
303+
Header: defaultHeader,
304+
})
305+
require.NoError(t, err)
306+
307+
pollTaskResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
308+
Namespace: namespace,
309+
TaskQueue: &taskqueuepb.TaskQueue{
310+
Name: taskQueue,
311+
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
312+
},
313+
Identity: s.tv.WorkerIdentity(),
314+
})
315+
require.NoError(t, err)
316+
require.Equal(t, activityID, pollTaskResp.GetActivityId())
317+
require.Equal(t, namespace, pollTaskResp.GetWorkflowNamespace())
318+
protorequire.ProtoEqual(t, s.tv.ActivityType(), pollTaskResp.GetActivityType())
319+
require.Equal(t, startResp.GetRunId(), pollTaskResp.GetActivityRunId())
320+
protorequire.ProtoEqual(t, defaultInput, pollTaskResp.GetInput())
321+
require.False(t, pollTaskResp.GetStartedTime().AsTime().IsZero())
322+
require.False(t, pollTaskResp.GetScheduledTime().AsTime().IsZero())
323+
require.EqualValues(t, 1, pollTaskResp.Attempt)
324+
protorequire.ProtoEqual(t, startToCloseTimeout, pollTaskResp.GetStartToCloseTimeout())
325+
protorequire.ProtoEqual(t, scheduleToCloseTimeout, pollTaskResp.GetScheduleToCloseTimeout())
326+
protorequire.ProtoEqual(t, heartbeatTimeout, pollTaskResp.GetHeartbeatTimeout())
327+
protorequire.ProtoEqual(t, priority, pollTaskResp.GetPriority())
328+
protorequire.ProtoEqual(t, defaultHeader, pollTaskResp.GetHeader())
329+
require.NotNil(t, pollTaskResp.TaskToken)
330+
protorequire.ProtoEqual(t, pollTaskResp.GetScheduledTime(), pollTaskResp.GetCurrentAttemptScheduledTime()) // Equal on first attempt
303331
})
304-
require.NoError(t, err)
305332

306-
pollTaskResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
307-
Namespace: namespace,
308-
TaskQueue: &taskqueuepb.TaskQueue{
309-
Name: taskQueue,
310-
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
311-
},
312-
Identity: s.tv.WorkerIdentity(),
333+
t.Run("RetriedAttempt", func(t *testing.T) {
334+
activityID := testcore.RandomizeStr(t.Name())
335+
taskQueue := testcore.RandomizeStr(t.Name())
336+
namespace := s.Namespace().String()
337+
338+
startToCloseTimeout := durationpb.New(1 * time.Minute)
339+
scheduleToCloseTimeout := durationpb.New(2 * time.Minute)
340+
heartbeatTimeout := durationpb.New(20 * time.Second)
341+
priority := &commonpb.Priority{
342+
FairnessKey: "test-key",
343+
}
344+
345+
startResp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
346+
Namespace: namespace,
347+
ActivityId: activityID,
348+
ActivityType: s.tv.ActivityType(),
349+
Identity: s.tv.WorkerIdentity(),
350+
Input: defaultInput,
351+
TaskQueue: &taskqueuepb.TaskQueue{
352+
Name: taskQueue,
353+
},
354+
StartToCloseTimeout: startToCloseTimeout,
355+
ScheduleToCloseTimeout: scheduleToCloseTimeout,
356+
HeartbeatTimeout: heartbeatTimeout,
357+
RequestId: s.tv.RequestID(),
358+
Priority: priority,
359+
Header: defaultHeader,
360+
})
361+
require.NoError(t, err)
362+
363+
pollTaskResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
364+
Namespace: namespace,
365+
TaskQueue: &taskqueuepb.TaskQueue{
366+
Name: taskQueue,
367+
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
368+
},
369+
Identity: s.tv.WorkerIdentity(),
370+
})
371+
require.NoError(t, err)
372+
373+
nextRetryDelay := durationpb.New(1 * time.Second)
374+
_, err = s.FrontendClient().RespondActivityTaskFailed(ctx, &workflowservice.RespondActivityTaskFailedRequest{
375+
Namespace: s.Namespace().String(),
376+
TaskToken: pollTaskResp.TaskToken,
377+
Failure: &failurepb.Failure{
378+
Message: "retryable failure",
379+
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
380+
NonRetryable: false,
381+
NextRetryDelay: nextRetryDelay,
382+
}},
383+
},
384+
})
385+
require.NoError(t, err)
386+
387+
describeResp, err := s.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{
388+
Namespace: s.Namespace().String(),
389+
ActivityId: activityID,
390+
RunId: startResp.GetRunId(),
391+
})
392+
require.NoError(t, err)
393+
394+
pollTaskResp, err = s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
395+
Namespace: namespace,
396+
TaskQueue: &taskqueuepb.TaskQueue{
397+
Name: taskQueue,
398+
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
399+
},
400+
Identity: s.tv.WorkerIdentity(),
401+
})
402+
require.NoError(t, err)
403+
require.Equal(t, activityID, pollTaskResp.GetActivityId())
404+
require.Equal(t, namespace, pollTaskResp.GetWorkflowNamespace())
405+
protorequire.ProtoEqual(t, s.tv.ActivityType(), pollTaskResp.GetActivityType())
406+
require.Equal(t, startResp.GetRunId(), pollTaskResp.GetActivityRunId())
407+
protorequire.ProtoEqual(t, defaultInput, pollTaskResp.GetInput())
408+
require.False(t, pollTaskResp.GetStartedTime().AsTime().IsZero())
409+
require.False(t, pollTaskResp.GetScheduledTime().AsTime().IsZero())
410+
require.EqualValues(t, 2, pollTaskResp.Attempt)
411+
protorequire.ProtoEqual(t, startToCloseTimeout, pollTaskResp.GetStartToCloseTimeout())
412+
protorequire.ProtoEqual(t, scheduleToCloseTimeout, pollTaskResp.GetScheduleToCloseTimeout())
413+
protorequire.ProtoEqual(t, heartbeatTimeout, pollTaskResp.GetHeartbeatTimeout())
414+
protorequire.ProtoEqual(t, priority, pollTaskResp.GetPriority())
415+
protorequire.ProtoEqual(t, defaultHeader, pollTaskResp.GetHeader())
416+
require.NotNil(t, pollTaskResp.TaskToken)
417+
418+
expectedAttemptScheduledTime := timestamppb.New(
419+
describeResp.GetInfo().GetLastAttemptCompleteTime().AsTime().Add(nextRetryDelay.AsDuration()))
420+
protorequire.ProtoEqual(t, expectedAttemptScheduledTime, pollTaskResp.GetCurrentAttemptScheduledTime())
313421
})
314-
require.NoError(t, err)
315-
require.Equal(t, activityID, pollTaskResp.GetActivityId())
316-
require.Equal(t, namespace, pollTaskResp.GetWorkflowNamespace())
317-
protorequire.ProtoEqual(t, s.tv.ActivityType(), pollTaskResp.GetActivityType())
318-
require.Equal(t, startResp.GetRunId(), pollTaskResp.GetActivityRunId())
319-
protorequire.ProtoEqual(t, defaultInput, pollTaskResp.GetInput())
320-
require.False(t, pollTaskResp.GetStartedTime().AsTime().IsZero())
321-
require.False(t, pollTaskResp.GetScheduledTime().AsTime().IsZero())
322-
require.EqualValues(t, 1, pollTaskResp.Attempt)
323-
protorequire.ProtoEqual(t, startToCloseTimeout, pollTaskResp.GetStartToCloseTimeout())
324-
protorequire.ProtoEqual(t, scheduleToCloseTimeout, pollTaskResp.GetScheduleToCloseTimeout())
325-
protorequire.ProtoEqual(t, heartbeatTimeout, pollTaskResp.GetHeartbeatTimeout())
326-
protorequire.ProtoEqual(t, priority, pollTaskResp.GetPriority())
327-
protorequire.ProtoEqual(t, defaultHeader, pollTaskResp.GetHeader())
328-
require.NotNil(t, pollTaskResp.TaskToken)
329422
}
330423

331424
func (s *standaloneActivityTestSuite) TestStart() {

0 commit comments

Comments
 (0)