Skip to content

Commit f042f1e

Browse files
authored
✨ Job Management (#12)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Proprietary --> ### Description Added utilities for managing jobs ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update). Co-authored-by: acabarbaye <[email protected]>
1 parent 3a2c014 commit f042f1e

File tree

15 files changed

+787
-24
lines changed

15 files changed

+787
-24
lines changed

changes/20230103145501.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: Added utilities for dealing with jobs

utils/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/stretchr/testify v1.8.1
1212
go.uber.org/atomic v1.10.0
1313
go.uber.org/goleak v1.2.0
14+
golang.org/x/sync v0.1.0
1415
)
1516

1617
require (
@@ -55,7 +56,6 @@ require (
5556
github.com/yusufpapurcu/wmi v1.2.2 // indirect
5657
golang.org/x/net v0.4.0 // indirect
5758
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
58-
golang.org/x/sync v0.1.0 // indirect
5959
golang.org/x/sys v0.3.0 // indirect
6060
golang.org/x/text v0.5.0 // indirect
6161
google.golang.org/appengine v1.6.7 // indirect

utils/job/interfaces.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright (C) 2020-2023 Arm Limited or its affiliates and Contributors. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
// Package job defines utilities for managing jobs.
7+
package job
8+
9+
import (
10+
"context"
11+
12+
"github.com/ARM-software/embedded-development-services-client-utils/utils/resource"
13+
)
14+
15+
// Mocks are generated using `go generate ./...`
16+
// Add interfaces to the following command for a mock to be generated
17+
//go:generate mockgen -destination=../mocks/mock_$GOPACKAGE.go -package=mocks github.com/ARM-software/embedded-development-services-client-utils/utils/$GOPACKAGE IAsynchronousJob,IJobManager
18+
19+
// IAsynchronousJob defines a typical asynchronous job.
20+
type IAsynchronousJob interface {
21+
resource.IResource
22+
// GetDone returns whether a job has terminated.
23+
GetDone() bool
24+
// GetError returns whether a system error occurred.
25+
GetError() bool
26+
// GetFailure returns whether the job has failed.
27+
GetFailure() bool
28+
// GetSuccess returns whether the job has been successful.
29+
GetSuccess() bool
30+
// GetStatus returns the state the job is in. This is for information only and should not be relied upon as likely to change. Use flags for implementing a state machine.
31+
GetStatus() string
32+
}
33+
34+
// IJobManager defines a manager of asynchronous jobs
35+
type IJobManager interface {
36+
// HasJobCompleted calls the services to determine whether the job has completed.
37+
HasJobCompleted(ctx context.Context, job IAsynchronousJob) (completed bool, err error)
38+
39+
// WaitForJobCompletion waits for a job to complete.
40+
WaitForJobCompletion(ctx context.Context, job IAsynchronousJob) (err error)
41+
}

utils/job/manager.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright (C) 2020-2023 Arm Limited or its affiliates and Contributors. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package job
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"net/http"
12+
"time"
13+
14+
"golang.org/x/sync/errgroup"
15+
16+
"github.com/ARM-software/embedded-development-services-client-utils/utils/api"
17+
"github.com/ARM-software/embedded-development-services-client-utils/utils/messages"
18+
"github.com/ARM-software/golang-utils/utils/collection/pagination"
19+
"github.com/ARM-software/golang-utils/utils/commonerrors"
20+
"github.com/ARM-software/golang-utils/utils/parallelisation"
21+
)
22+
23+
type Manager struct {
24+
messageLoggerFactory messages.MessageLoggerFactory
25+
messagesPaginatorFactory messages.PaginatorFactory
26+
backOffPeriod time.Duration
27+
fetchJobStatusFunc func(ctx context.Context, jobName string) (IAsynchronousJob, *http.Response, error)
28+
}
29+
30+
func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob) (err error) {
31+
err = parallelisation.DetermineContextError(ctx)
32+
if err != nil {
33+
return
34+
}
35+
messageLogger, err := m.messageLoggerFactory.Create(ctx)
36+
if err != nil {
37+
return
38+
}
39+
defer func() {
40+
if messageLogger != nil {
41+
_ = messageLogger.Close()
42+
}
43+
}()
44+
messagePaginator, err := m.messagesPaginatorFactory.Create(ctx)
45+
if err != nil {
46+
return
47+
}
48+
defer func() {
49+
if messagePaginator != nil {
50+
_ = messagePaginator.Close()
51+
}
52+
}()
53+
54+
wait, gCtx := errgroup.WithContext(ctx)
55+
wait.Go(func() error {
56+
return messageLogger.LogMessagesCollection(gCtx, messagePaginator)
57+
58+
})
59+
wait.Go(func() error {
60+
return m.checkForMessageStreamExhaustion(gCtx, messagePaginator, job)
61+
})
62+
err = wait.Wait()
63+
if err != nil {
64+
messageLogger.LogError(err)
65+
}
66+
_, err = m.HasJobCompleted(ctx, job)
67+
return
68+
}
69+
70+
func (m *Manager) checkForMessageStreamExhaustion(ctx context.Context, paginator pagination.IGenericStreamPaginator, job IAsynchronousJob) error {
71+
72+
for {
73+
err := parallelisation.DetermineContextError(ctx)
74+
if err != nil {
75+
return err
76+
}
77+
completed, err := m.HasJobCompleted(ctx, job)
78+
if commonerrors.Any(err, commonerrors.ErrUndefined) {
79+
return err
80+
}
81+
if completed {
82+
err = paginator.DryUp()
83+
return err
84+
}
85+
parallelisation.SleepWithContext(ctx, m.backOffPeriod)
86+
}
87+
}
88+
89+
func (m *Manager) HasJobCompleted(ctx context.Context, job IAsynchronousJob) (completed bool, err error) {
90+
err = parallelisation.DetermineContextError(ctx)
91+
if err != nil {
92+
return
93+
}
94+
if job == nil {
95+
err = fmt.Errorf("%w: missing job", commonerrors.ErrUndefined)
96+
return
97+
}
98+
jobName, err := job.FetchName()
99+
if err != nil {
100+
return
101+
}
102+
jobType := job.FetchType()
103+
jobStatus, resp, apierr := m.fetchJobStatusFunc(ctx, jobName)
104+
if resp != nil {
105+
_ = resp.Body.Close()
106+
}
107+
err = api.CheckAPICallSuccess(ctx, fmt.Sprintf("could not fetch %v [%v]'s status", jobType, jobName), resp, apierr)
108+
if err != nil {
109+
return
110+
}
111+
if jobStatus.GetDone() {
112+
completed = true
113+
}
114+
if jobStatus.GetError() {
115+
err = fmt.Errorf("%w: %v [%v] errored: %v", commonerrors.ErrUnexpected, jobType, jobName, jobStatus.GetStatus())
116+
return
117+
}
118+
if jobStatus.GetFailure() {
119+
err = fmt.Errorf("%w: %v [%v] failed: %v", commonerrors.ErrInvalid, jobType, jobName, jobStatus.GetStatus())
120+
return
121+
}
122+
if jobStatus.GetSuccess() {
123+
return
124+
}
125+
if completed {
126+
err = fmt.Errorf("%w: %v [%v] completed but without success: %v", commonerrors.ErrUnexpected, jobType, jobName, jobStatus.GetStatus())
127+
return
128+
}
129+
return
130+
}
131+
132+
// NewJobManager creates a new job manager.
133+
func NewJobManager(logger *messages.MessageLoggerFactory, backOffPeriod time.Duration,
134+
fetchJobStatusFunc func(ctx context.Context, jobName string) (IAsynchronousJob, *http.Response, error),
135+
fetchFirstJobMessagesPageFunc func(context.Context) (pagination.IStaticPageStream, error),
136+
fetchNextJobMessagesPageFunc func(context.Context, pagination.IStaticPage) (pagination.IStaticPage, error),
137+
fetchFutureJobMessagesPageFunc func(context.Context, pagination.IStaticPageStream) (pagination.IStaticPageStream, error)) (IJobManager, error) {
138+
return newJobManagerFromMessageFactory(logger, backOffPeriod, fetchJobStatusFunc, messages.NewPaginatorFactory(messages.DefaultStreamExhaustionGracePeriod, backOffPeriod, fetchFirstJobMessagesPageFunc, fetchNextJobMessagesPageFunc, fetchFutureJobMessagesPageFunc))
139+
}
140+
141+
func newJobManagerFromMessageFactory(logger *messages.MessageLoggerFactory, backOffPeriod time.Duration,
142+
fetchJobStatusFunc func(ctx context.Context, jobName string) (IAsynchronousJob, *http.Response, error),
143+
messagePaginator *messages.PaginatorFactory) (*Manager, error) {
144+
if logger == nil {
145+
return nil, commonerrors.ErrNoLogger
146+
}
147+
if messagePaginator == nil {
148+
return nil, fmt.Errorf("%w: missing paginator factory", commonerrors.ErrUndefined)
149+
}
150+
return &Manager{
151+
messageLoggerFactory: *logger,
152+
messagesPaginatorFactory: *messagePaginator,
153+
backOffPeriod: backOffPeriod,
154+
fetchJobStatusFunc: fetchJobStatusFunc,
155+
}, nil
156+
}

utils/job/manager_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright (C) 2020-2023 Arm Limited or its affiliates and Contributors. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package job
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"net/http"
12+
"net/http/httptest"
13+
"testing"
14+
"time"
15+
16+
"github.com/stretchr/testify/assert"
17+
"github.com/stretchr/testify/require"
18+
"go.uber.org/goleak"
19+
20+
"github.com/ARM-software/embedded-development-services-client-utils/utils/logging"
21+
"github.com/ARM-software/embedded-development-services-client-utils/utils/messages"
22+
"github.com/ARM-software/golang-utils/utils/commonerrors"
23+
)
24+
25+
func TestManager_HasJobCompleted(t *testing.T) {
26+
logger, err := logging.NewStandardClientLogger("test", nil)
27+
require.NoError(t, err)
28+
loggerF := messages.NewMessageLoggerFactory(logger, false, time.Nanosecond)
29+
30+
tests := []struct {
31+
jobFunc func() (IAsynchronousJob, error)
32+
expectCompleted bool
33+
expectedError error
34+
}{
35+
{
36+
jobFunc: func() (IAsynchronousJob, error) { return nil, nil },
37+
expectCompleted: false,
38+
expectedError: commonerrors.ErrUndefined,
39+
},
40+
{
41+
jobFunc: NewMockFailedAsynchronousJob,
42+
expectCompleted: true,
43+
expectedError: commonerrors.ErrInvalid,
44+
},
45+
{
46+
jobFunc: NewMockUndoneAsynchronousJob,
47+
expectCompleted: false,
48+
expectedError: nil,
49+
},
50+
{
51+
jobFunc: NewMockSuccessfulAsynchronousJob,
52+
expectCompleted: true,
53+
expectedError: nil,
54+
},
55+
}
56+
for i := range tests {
57+
test := tests[i]
58+
59+
t.Run(fmt.Sprintf("#%v", i), func(t *testing.T) {
60+
defer goleak.VerifyNone(t)
61+
job, err := test.jobFunc()
62+
factory, err := newJobManagerFromMessageFactory(loggerF, 100*time.Millisecond, func(context.Context, string) (IAsynchronousJob, *http.Response, error) {
63+
return job, httptest.NewRecorder().Result(), err
64+
}, messages.NewMockMessagePaginatorFactory())
65+
require.NoError(t, err)
66+
require.NotNil(t, factory)
67+
completed, err := factory.HasJobCompleted(context.TODO(), job)
68+
if test.expectedError == nil {
69+
assert.NoError(t, err)
70+
assert.Equal(t, test.expectCompleted, completed)
71+
} else {
72+
assert.Error(t, err)
73+
assert.True(t, commonerrors.Any(err, test.expectedError))
74+
assert.Equal(t, test.expectCompleted, completed)
75+
}
76+
})
77+
}
78+
}
79+
80+
func TestManager_checkForMessageStreamExhaustion(t *testing.T) {
81+
logger, err := logging.NewStandardClientLogger("test", nil)
82+
require.NoError(t, err)
83+
loggerF := messages.NewMessageLoggerFactory(logger, false, time.Nanosecond)
84+
85+
tests := []struct {
86+
jobFunc func() (IAsynchronousJob, error)
87+
expectedError error
88+
}{
89+
{
90+
jobFunc: func() (IAsynchronousJob, error) { return nil, nil },
91+
expectedError: commonerrors.ErrUndefined,
92+
},
93+
{
94+
jobFunc: NewMockFailedAsynchronousJob,
95+
expectedError: nil,
96+
},
97+
{
98+
jobFunc: NewMockSuccessfulAsynchronousJob,
99+
expectedError: nil,
100+
},
101+
}
102+
for i := range tests {
103+
test := tests[i]
104+
105+
t.Run(fmt.Sprintf("#%v", i), func(t *testing.T) {
106+
defer goleak.VerifyNone(t)
107+
job, err := test.jobFunc()
108+
factory, err := newJobManagerFromMessageFactory(loggerF, 100*time.Millisecond, func(context.Context, string) (IAsynchronousJob, *http.Response, error) {
109+
return job, httptest.NewRecorder().Result(), err
110+
}, messages.NewMockMessagePaginatorFactory())
111+
require.NoError(t, err)
112+
require.NotNil(t, factory)
113+
messagePaginator, err := factory.messagesPaginatorFactory.Create(context.TODO())
114+
assert.False(t, messagePaginator.IsRunningDry())
115+
116+
err = factory.checkForMessageStreamExhaustion(context.TODO(), messagePaginator, job)
117+
if test.expectedError == nil {
118+
assert.NoError(t, err)
119+
assert.True(t, messagePaginator.IsRunningDry())
120+
} else {
121+
assert.Error(t, err)
122+
assert.True(t, commonerrors.Any(err, test.expectedError))
123+
assert.False(t, messagePaginator.IsRunningDry())
124+
}
125+
})
126+
}
127+
}
128+
129+
func TestManager_WaitForJobCompletion(t *testing.T) {
130+
defer goleak.VerifyNone(t)
131+
tests := []struct {
132+
jobFunc func() (IAsynchronousJob, error)
133+
expectedError error
134+
}{
135+
{
136+
jobFunc: NewMockFailedAsynchronousJob,
137+
expectedError: commonerrors.ErrInvalid,
138+
},
139+
{
140+
jobFunc: NewMockSuccessfulAsynchronousJob,
141+
expectedError: nil,
142+
},
143+
}
144+
for i := range tests {
145+
test := tests[i]
146+
147+
t.Run(fmt.Sprintf("#%v", i), func(t *testing.T) {
148+
// t.Parallel()
149+
logger, err := logging.NewStandardClientLogger(fmt.Sprintf("test #%v", i), nil)
150+
require.NoError(t, err)
151+
loggerF := messages.NewMessageLoggerFactory(logger, false, time.Nanosecond)
152+
job, err := test.jobFunc()
153+
factory, err := newJobManagerFromMessageFactory(loggerF, time.Nanosecond, func(context.Context, string) (IAsynchronousJob, *http.Response, error) {
154+
return job, httptest.NewRecorder().Result(), err
155+
}, messages.NewMockMessagePaginatorFactory().UpdateRunOutTimeout(time.Nanosecond))
156+
require.NoError(t, err)
157+
require.NotNil(t, factory)
158+
err = factory.WaitForJobCompletion(context.TODO(), job)
159+
if test.expectedError == nil {
160+
assert.NoError(t, err)
161+
} else {
162+
assert.Error(t, err)
163+
assert.True(t, commonerrors.Any(err, test.expectedError))
164+
}
165+
})
166+
}
167+
}

0 commit comments

Comments
 (0)