Skip to content

Commit 739260a

Browse files
authored
🐛 [job] Wait for job to start before progressing with messages (#74)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Proprietary --> ### Description - the job manager was not handling the fact a job could be queued for some time and so, no messages would be accessible. ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent 1105dc9 commit 739260a

File tree

8 files changed

+137
-19
lines changed

8 files changed

+137
-19
lines changed

changes/20240617160218.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:bug: [`job`] Wait for job to start before progressing with messages

utils/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ module github.com/ARM-software/embedded-development-services-client-utils/utils
33
go 1.22
44

55
require (
6-
github.com/ARM-software/embedded-development-services-client/client v1.31.3
7-
github.com/ARM-software/golang-utils/utils v1.66.1
6+
github.com/ARM-software/embedded-development-services-client/client v1.31.4
7+
github.com/ARM-software/golang-utils/utils v1.68.0
88
github.com/go-faker/faker/v4 v4.4.2
99
github.com/go-logr/logr v1.4.2
1010
github.com/golang/mock v1.6.0

utils/go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
bitbucket.org/creachadair/stringset v0.0.9/go.mod h1:t+4WcQ4+PXTa8aQdNKe40ZP6iwesoMFWAxPGd3UGjyY=
2-
github.com/ARM-software/embedded-development-services-client/client v1.31.3 h1:OJTaRUnmfdVBZWaY6ALg3FQ0SRlbASM4bHmL97yxvsU=
3-
github.com/ARM-software/embedded-development-services-client/client v1.31.3/go.mod h1:DfoiGBGolbmt6mud6JG0fCEd2UpMyFaeYsB9d7cVSNo=
4-
github.com/ARM-software/golang-utils/utils v1.66.1 h1:fuaJ6uB5hbKXHVeaitZK5KeDCIryN0WAgkwnbbxUX70=
5-
github.com/ARM-software/golang-utils/utils v1.66.1/go.mod h1:PFxtFy3iRiGvmH6X9EUflQet+VcuUS3WZ0qD/rtTu2A=
2+
github.com/ARM-software/embedded-development-services-client/client v1.31.4 h1:sUtf53bEPaME7GBqPAFCBoVRDdAn6brZAuVnlYfsKns=
3+
github.com/ARM-software/embedded-development-services-client/client v1.31.4/go.mod h1:ZNNMwjBLtXKIUHed1p3EYy71CzFWeUv8C/HLgqlNvY0=
4+
github.com/ARM-software/golang-utils/utils v1.68.0 h1:Sp92FvRhGhzfnvIKHDdLb8slrhcidBNFzB9GkpcsN3E=
5+
github.com/ARM-software/golang-utils/utils v1.68.0/go.mod h1:pjIYW6jPUXH7/kxkZEPyzRwiux+NUG/BFJFn8zaJ/0A=
66
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
77
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
88
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=

utils/job/interfaces.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,16 @@ type IAsynchronousJob interface {
2929
GetSuccess() bool
3030
// GetStatus returns the state the job is in. This is for information only and should not be relied upon as likely to change. Use flags for implementing a state machine.
3131
GetStatus() string
32+
// GetQueued returns whether the job is being queued and has not started just yet
33+
GetQueued() bool
3234
}
3335

3436
// IJobManager defines a manager of asynchronous jobs
3537
type IJobManager interface {
3638
// HasJobCompleted calls the services to determine whether the job has completed.
3739
HasJobCompleted(ctx context.Context, job IAsynchronousJob) (completed bool, err error)
38-
40+
// HasJobStarted calls the services to determine whether the job has started.
41+
HasJobStarted(ctx context.Context, job IAsynchronousJob) (completed bool, err error)
3942
// WaitForJobCompletion waits for a job to complete.
4043
WaitForJobCompletion(ctx context.Context, job IAsynchronousJob) (err error)
4144
}
Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package job
6+
package jobtest
77

88
import (
99
"github.com/go-faker/faker/v4"
@@ -18,6 +18,10 @@ type MockAsynchronousJob struct {
1818
failure bool
1919
}
2020

21+
func (m *MockAsynchronousJob) GetQueued() bool {
22+
return !m.done
23+
}
24+
2125
func (m *MockAsynchronousJob) FetchType() string {
2226
return "Mock Asynchronous Job"
2327
}
@@ -42,7 +46,7 @@ func (m *MockAsynchronousJob) GetStatus() string {
4246
return faker.Word()
4347
}
4448

45-
func newMockAsynchronousJob(done bool, failure bool) (IAsynchronousJob, error) {
49+
func newMockAsynchronousJob(done bool, failure bool) (*MockAsynchronousJob, error) {
4650
r, err := resourcetests.NewMockResource()
4751
if err != nil {
4852
return nil, err
@@ -54,14 +58,18 @@ func newMockAsynchronousJob(done bool, failure bool) (IAsynchronousJob, error) {
5458
}, nil
5559
}
5660

57-
func NewMockUndoneAsynchronousJob() (IAsynchronousJob, error) {
61+
func NewMockUndoneAsynchronousJob() (*MockAsynchronousJob, error) {
62+
return newMockAsynchronousJob(false, false)
63+
}
64+
65+
func NewMockQueuedAsynchronousJob() (*MockAsynchronousJob, error) {
5866
return newMockAsynchronousJob(false, false)
5967
}
6068

61-
func NewMockSuccessfulAsynchronousJob() (IAsynchronousJob, error) {
69+
func NewMockSuccessfulAsynchronousJob() (*MockAsynchronousJob, error) {
6270
return newMockAsynchronousJob(true, false)
6371
}
6472

65-
func NewMockFailedAsynchronousJob() (IAsynchronousJob, error) {
73+
func NewMockFailedAsynchronousJob() (*MockAsynchronousJob, error) {
6674
return newMockAsynchronousJob(true, true)
6775
}

utils/job/manager.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ import (
1717
"github.com/ARM-software/embedded-development-services-client-utils/utils/messages"
1818
"github.com/ARM-software/golang-utils/utils/collection/pagination"
1919
"github.com/ARM-software/golang-utils/utils/commonerrors"
20+
"github.com/ARM-software/golang-utils/utils/logs"
2021
"github.com/ARM-software/golang-utils/utils/parallelisation"
22+
"github.com/ARM-software/golang-utils/utils/retry"
2123
)
2224

2325
type Manager struct {
@@ -53,6 +55,30 @@ func (m *Manager) FetchJobMessagesFirstPage(ctx context.Context, job IAsynchrono
5355
return
5456
}
5557

58+
func (m *Manager) waitForJobToStart(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob) (err error) {
59+
err = parallelisation.DetermineContextError(ctx)
60+
if err != nil {
61+
return
62+
}
63+
64+
jobName, err := job.FetchName()
65+
if err != nil {
66+
return
67+
}
68+
notStartedError := fmt.Errorf("%w: job [%v] has not started", commonerrors.ErrCondition, jobName)
69+
err = retry.RetryOnError(ctx, logs.NewPlainLogrLoggerFromLoggers(logger), retry.DefaultExponentialBackoffRetryPolicyConfiguration(), func() error {
70+
started, subErr := m.HasJobStarted(ctx, job)
71+
if subErr != nil {
72+
return subErr
73+
}
74+
if started {
75+
return nil
76+
}
77+
return notStartedError
78+
}, fmt.Sprintf("Waiting for job [%v] to start...", jobName), notStartedError)
79+
return
80+
}
81+
5682
func (m *Manager) createMessagePaginator(ctx context.Context, job IAsynchronousJob) (paginator pagination.IStreamPaginatorAndPageFetcher, err error) {
5783
paginator, err = m.messagesPaginatorFactory.Create(ctx, func(subCtx context.Context) (pagination.IStaticPageStream, error) {
5884
return m.FetchJobMessagesFirstPage(subCtx, job)
@@ -74,6 +100,10 @@ func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob
74100
_ = messageLogger.Close()
75101
}
76102
}()
103+
err = m.waitForJobToStart(ctx, messageLogger, job)
104+
if err != nil {
105+
return
106+
}
77107
messagePaginator, err := m.createMessagePaginator(ctx, job)
78108
if err != nil {
79109
return
@@ -119,6 +149,36 @@ func (m *Manager) checkForMessageStreamExhaustion(ctx context.Context, paginator
119149
}
120150
}
121151

152+
func (m *Manager) HasJobStarted(ctx context.Context, job IAsynchronousJob) (started bool, err error) {
153+
err = parallelisation.DetermineContextError(ctx)
154+
if err != nil {
155+
return
156+
}
157+
if job == nil {
158+
err = fmt.Errorf("%w: missing job", commonerrors.ErrUndefined)
159+
return
160+
}
161+
jobName, err := job.FetchName()
162+
if err != nil {
163+
return
164+
}
165+
jobType := job.FetchType()
166+
jobStatus, resp, apierr := m.fetchJobStatusFunc(ctx, jobName)
167+
if resp != nil {
168+
_ = resp.Body.Close()
169+
}
170+
err = api.CheckAPICallSuccess(ctx, fmt.Sprintf("could not fetch %v [%v]'s status", jobType, jobName), resp, apierr)
171+
if err != nil {
172+
return
173+
}
174+
if jobStatus.GetDone() {
175+
started = true
176+
} else {
177+
started = !jobStatus.GetQueued()
178+
}
179+
return
180+
}
181+
122182
func (m *Manager) HasJobCompleted(ctx context.Context, job IAsynchronousJob) (completed bool, err error) {
123183
err = parallelisation.DetermineContextError(ctx)
124184
if err != nil {

utils/job/manager_test.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/stretchr/testify/require"
1919
"go.uber.org/goleak"
2020

21+
"github.com/ARM-software/embedded-development-services-client-utils/utils/job/jobtest"
2122
"github.com/ARM-software/embedded-development-services-client-utils/utils/logging"
2223
"github.com/ARM-software/embedded-development-services-client-utils/utils/messages"
2324
pagination2 "github.com/ARM-software/embedded-development-services-client-utils/utils/pagination"
@@ -41,17 +42,22 @@ func TestManager_HasJobCompleted(t *testing.T) {
4142
expectedError: commonerrors.ErrUndefined,
4243
},
4344
{
44-
jobFunc: NewMockFailedAsynchronousJob,
45+
jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob),
4546
expectCompleted: true,
4647
expectedError: commonerrors.ErrInvalid,
4748
},
4849
{
49-
jobFunc: NewMockUndoneAsynchronousJob,
50+
jobFunc: mapFunc(jobtest.NewMockUndoneAsynchronousJob),
5051
expectCompleted: false,
5152
expectedError: nil,
5253
},
5354
{
54-
jobFunc: NewMockSuccessfulAsynchronousJob,
55+
jobFunc: mapFunc(jobtest.NewMockQueuedAsynchronousJob),
56+
expectCompleted: false,
57+
expectedError: nil,
58+
},
59+
{
60+
jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob),
5561
expectCompleted: true,
5662
expectedError: nil,
5763
},
@@ -92,11 +98,11 @@ func TestManager_checkForMessageStreamExhaustion(t *testing.T) {
9298
expectedError: commonerrors.ErrUndefined,
9399
},
94100
{
95-
jobFunc: NewMockFailedAsynchronousJob,
101+
jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob),
96102
expectedError: nil,
97103
},
98104
{
99-
jobFunc: NewMockSuccessfulAsynchronousJob,
105+
jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob),
100106
expectedError: nil,
101107
},
102108
}
@@ -132,18 +138,29 @@ func TestManager_checkForMessageStreamExhaustion(t *testing.T) {
132138
}
133139
}
134140

141+
func mapFunc(f func() (*jobtest.MockAsynchronousJob, error)) func() (IAsynchronousJob, error) {
142+
return func() (IAsynchronousJob, error) {
143+
job, err := f()
144+
return job, err
145+
}
146+
}
147+
135148
func TestManager_WaitForJobCompletion(t *testing.T) {
136149
defer goleak.VerifyNone(t)
137150
tests := []struct {
138151
jobFunc func() (IAsynchronousJob, error)
139152
expectedError error
140153
}{
141154
{
142-
jobFunc: NewMockFailedAsynchronousJob,
155+
jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob),
143156
expectedError: commonerrors.ErrInvalid,
144157
},
145158
{
146-
jobFunc: NewMockSuccessfulAsynchronousJob,
159+
jobFunc: mapFunc(jobtest.NewMockQueuedAsynchronousJob),
160+
expectedError: commonerrors.ErrCondition,
161+
},
162+
{
163+
jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob),
147164
expectedError: nil,
148165
},
149166
}

utils/mocks/mock_job.go

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)