Skip to content

Commit 06e8903

Browse files
authored
✨ Add the possibility to set a timeout for waiting job completion (#76)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Proprietary -->

### Description

- Add a timeout parameter for waiting on job completion and derive the maximum number of retries from its value.

### Test Coverage

<!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. -->

- [x] This change is covered by existing or additional automated tests.
- [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible.
- [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent 4cadac7 commit 06e8903

File tree

5 files changed

+97
-21
lines changed

5 files changed

+97
-21
lines changed

changes/20240619155022.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: Add the possibility to set a timeout when waiting for job completion

utils/job/interfaces.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package job
88

99
import (
1010
"context"
11+
"time"
1112

1213
"github.com/ARM-software/embedded-development-services-client-utils/utils/resource"
1314
)
@@ -43,6 +44,8 @@ type IJobManager interface {
4344
HasJobCompleted(ctx context.Context, job IAsynchronousJob) (completed bool, err error)
4445
// HasJobStarted calls the services to determine whether the job has started.
4546
HasJobStarted(ctx context.Context, job IAsynchronousJob) (completed bool, err error)
46-
// WaitForJobCompletion waits for a job to complete.
47+
// WaitForJobCompletion waits for a job to complete. It behaves like WaitForJobCompletionWithTimeout with the timeout set to 5 minutes.
4748
WaitForJobCompletion(ctx context.Context, job IAsynchronousJob) (err error)
49+
// WaitForJobCompletionWithTimeout waits for a job to complete and gives up with an error once the specified timeout has elapsed.
50+
WaitForJobCompletionWithTimeout(ctx context.Context, job IAsynchronousJob, timeout time.Duration) (err error)
4851
}

utils/job/manager.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package job
88
import (
99
"context"
1010
"fmt"
11+
"math"
1112
"net/http"
1213
"time"
1314

@@ -55,19 +56,24 @@ func (m *Manager) FetchJobMessagesFirstPage(ctx context.Context, job IAsynchrono
5556
return
5657
}
5758

58-
func waitForJobState(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob, jobState string, checkStateFunc func(context.Context, IAsynchronousJob) (bool, error)) (err error) {
59+
func waitForJobState(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob, jobState string, checkStateFunc func(context.Context, IAsynchronousJob) (bool, error), timeout time.Duration) (err error) {
5960
err = parallelisation.DetermineContextError(ctx)
6061
if err != nil {
6162
return
6263
}
6364

65+
subCtx, cancel := context.WithTimeout(ctx, timeout)
66+
defer cancel()
67+
retryCfg := retry.DefaultExponentialBackoffRetryPolicyConfiguration()
68+
retryCfg.RetryMax = int(float64(timeout.Milliseconds())/math.Max(float64(retryCfg.RetryWaitMin.Milliseconds()), 1)) + 1
69+
6470
jobName, err := job.FetchName()
6571
if err != nil {
6672
return
6773
}
6874
notStartedError := fmt.Errorf("%w: job [%v] has not reached the expected state [%v]", commonerrors.ErrCondition, jobName, jobState)
69-
err = retry.RetryOnError(ctx, logs.NewPlainLogrLoggerFromLoggers(logger), retry.DefaultExponentialBackoffRetryPolicyConfiguration(), func() error {
70-
inState, subErr := checkStateFunc(ctx, job)
75+
err = retry.RetryOnError(subCtx, logs.NewPlainLogrLoggerFromLoggers(logger), retryCfg, func() error {
76+
inState, subErr := checkStateFunc(subCtx, job)
7177
if subErr != nil {
7278
return subErr
7379
}
@@ -79,12 +85,12 @@ func waitForJobState(ctx context.Context, logger messages.IMessageLogger, job IA
7985
return
8086
}
8187

82-
func (m *Manager) waitForJobToStart(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob) error {
83-
return waitForJobState(ctx, logger, job, "start", m.HasJobStarted)
88+
func (m *Manager) waitForJobToStart(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob, timeout time.Duration) error {
89+
return waitForJobState(ctx, logger, job, "start", m.HasJobStarted, timeout)
8490
}
8591

86-
func (m *Manager) waitForJobToHaveMessagesAvailable(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob) error {
87-
return waitForJobState(ctx, logger, job, "have messages", m.areThereMessages)
92+
func (m *Manager) waitForJobToHaveMessagesAvailable(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob, timeout time.Duration) error {
93+
return waitForJobState(ctx, logger, job, "have messages", m.areThereMessages, timeout)
8894
}
8995

9096
func (m *Manager) createMessagePaginator(ctx context.Context, job IAsynchronousJob) (paginator pagination.IStreamPaginatorAndPageFetcher, err error) {
@@ -94,7 +100,11 @@ func (m *Manager) createMessagePaginator(ctx context.Context, job IAsynchronousJ
94100
return
95101
}
96102

97-
func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob) (err error) {
103+
func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob) error {
104+
return m.WaitForJobCompletionWithTimeout(ctx, job, 5*time.Minute)
105+
}
106+
107+
func (m *Manager) WaitForJobCompletionWithTimeout(ctx context.Context, job IAsynchronousJob, timeout time.Duration) (err error) {
98108
err = parallelisation.DetermineContextError(ctx)
99109
if err != nil {
100110
return
@@ -108,15 +118,17 @@ func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob
108118
_ = messageLogger.Close()
109119
}
110120
}()
111-
err = m.waitForJobToStart(ctx, messageLogger, job)
121+
subCtx, cancel := context.WithTimeout(ctx, timeout)
122+
defer cancel()
123+
err = m.waitForJobToStart(subCtx, messageLogger, job, timeout)
112124
if err != nil {
113125
return
114126
}
115-
err = m.waitForJobToHaveMessagesAvailable(ctx, messageLogger, job)
127+
err = m.waitForJobToHaveMessagesAvailable(subCtx, messageLogger, job, timeout)
116128
if err != nil {
117129
return
118130
}
119-
messagePaginator, err := m.createMessagePaginator(ctx, job)
131+
messagePaginator, err := m.createMessagePaginator(subCtx, job)
120132
if err != nil {
121133
return
122134
}
@@ -126,7 +138,7 @@ func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob
126138
}
127139
}()
128140

129-
wait, gCtx := errgroup.WithContext(ctx)
141+
wait, gCtx := errgroup.WithContext(subCtx)
130142
wait.Go(func() error {
131143
return messageLogger.LogMessagesCollection(gCtx, messagePaginator)
132144

@@ -138,7 +150,7 @@ func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob
138150
if err != nil {
139151
messageLogger.LogError(err)
140152
}
141-
_, err = m.HasJobCompleted(ctx, job)
153+
_, err = m.HasJobCompleted(subCtx, job)
142154
return
143155
}
144156

utils/job/manager_test.go

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
pagination2 "github.com/ARM-software/embedded-development-services-client-utils/utils/pagination"
2525
"github.com/ARM-software/golang-utils/utils/collection/pagination"
2626
"github.com/ARM-software/golang-utils/utils/commonerrors"
27+
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
28+
"github.com/ARM-software/golang-utils/utils/field"
2729
)
2830

2931
func TestManager_HasJobCompleted(t *testing.T) {
@@ -77,7 +79,7 @@ func TestManager_HasJobCompleted(t *testing.T) {
7779
assert.Equal(t, test.expectCompleted, completed)
7880
} else {
7981
assert.Error(t, err)
80-
assert.True(t, commonerrors.Any(err, test.expectedError))
82+
errortest.AssertError(t, err, test.expectedError)
8183
assert.Equal(t, test.expectCompleted, completed)
8284
}
8385
})
@@ -132,7 +134,7 @@ func TestManager_checkForMessageStreamExhaustion(t *testing.T) {
132134
assert.True(t, messagePaginator.IsRunningDry())
133135
} else {
134136
assert.Error(t, err)
135-
assert.True(t, commonerrors.Any(err, test.expectedError))
137+
errortest.AssertError(t, err, test.expectedError)
136138
}
137139
})
138140
}
@@ -149,15 +151,17 @@ func TestManager_WaitForJobCompletion(t *testing.T) {
149151
defer goleak.VerifyNone(t)
150152
tests := []struct {
151153
jobFunc func() (IAsynchronousJob, error)
152-
expectedError error
154+
expectedError []error
155+
timeout *time.Duration
153156
}{
154157
{
155158
jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob),
156-
expectedError: commonerrors.ErrInvalid,
159+
expectedError: []error{commonerrors.ErrInvalid},
157160
},
158161
{
159162
jobFunc: mapFunc(jobtest.NewMockQueuedAsynchronousJob),
160-
expectedError: commonerrors.ErrCondition,
163+
expectedError: []error{commonerrors.ErrCondition, commonerrors.ErrTimeout, commonerrors.ErrCancelled},
164+
timeout: field.ToOptionalDuration(500 * time.Millisecond),
161165
},
162166
{
163167
jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob),
@@ -178,17 +182,58 @@ func TestManager_WaitForJobCompletion(t *testing.T) {
178182

179183
require.NoError(t, err)
180184
require.NotNil(t, factory)
181-
err = factory.WaitForJobCompletion(context.TODO(), job)
185+
if test.timeout == nil {
186+
err = factory.WaitForJobCompletion(context.TODO(), job)
187+
} else {
188+
err = factory.WaitForJobCompletionWithTimeout(context.TODO(), job, *test.timeout)
189+
}
182190
if test.expectedError == nil {
183191
assert.NoError(t, err)
184192
} else {
185193
assert.Error(t, err)
186-
assert.True(t, commonerrors.Any(err, test.expectedError))
194+
errortest.AssertError(t, err, test.expectedError...)
187195
}
188196
})
189197
}
190198
}
191199

200+
func TestManager_WaitForJobCompletionTimeout(t *testing.T) {
201+
defer goleak.VerifyNone(t)
202+
tests := []struct {
203+
jobFunc func() (IAsynchronousJob, error)
204+
}{
205+
{
206+
jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob),
207+
},
208+
{
209+
jobFunc: mapFunc(jobtest.NewMockQueuedAsynchronousJob),
210+
},
211+
{
212+
jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob),
213+
},
214+
}
215+
for i := range tests {
216+
test := tests[i]
217+
218+
t.Run(fmt.Sprintf("#%v", i), func(t *testing.T) {
219+
// t.Parallel()
220+
logger, err := logging.NewStandardClientLogger(fmt.Sprintf("test #%v", i), nil)
221+
require.NoError(t, err)
222+
loggerF := messages.NewMessageLoggerFactory(logger, false, time.Nanosecond)
223+
job, err := test.jobFunc()
224+
runOut := time.Nanosecond
225+
factory, err := newMockJobManager(loggerF, time.Nanosecond, &runOut, job, err)
226+
227+
require.NoError(t, err)
228+
require.NotNil(t, factory)
229+
230+
err = factory.WaitForJobCompletionWithTimeout(context.TODO(), job, time.Nanosecond)
231+
assert.Error(t, err)
232+
errortest.AssertError(t, err, commonerrors.ErrTimeout, commonerrors.ErrCancelled, commonerrors.ErrCondition)
233+
})
234+
}
235+
}
236+
192237
func newMockJobManager(logger *messages.MessageLoggerFactory, backOffPeriod time.Duration, messagePaginatorRunOutTimeout *time.Duration, job IAsynchronousJob, errToReturn error) (*Manager, error) {
193238
pageNumber := rand.Intn(50) //nolint:gosec //causes G404: Use of weak random number generator
194239
messageStream := messages.NewMockMessagePaginatorFactory(pageNumber)

utils/mocks/mock_job.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)