Skip to content

Commit 11698cd

Browse files
arinda-arifirainia
andauthored
fix: bug breaking deploy on resource manager failure (#500) (#508)
* refactor: change external optimus sensor task id similar with internal sensor task id (#491) * refactor: change external optimus sensor task name similar with internal sensor name * fix: cross optimus task dependencies in airflow is not properly * refactor: revert cross optimus sensor variable name Co-authored-by: Anwar Hidayat <anwarhidayat14@gmail.com> * fix: bug caused by error when getting dependencies from external resource manager * fix: bug on unknown dependencies being updated Co-authored-by: Arinda Arif <79823430+arinda-arif@users.noreply.github.com> Co-authored-by: Anwar Hidayat <15167551+irainia@users.noreply.github.com>
1 parent 5b3896d commit 11698cd

File tree

3 files changed

+33
-87
lines changed

3 files changed

+33
-87
lines changed

job/dependency_resolver.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -314,12 +314,12 @@ func (d *dependencyResolver) getExternalDependenciesByJobName(ctx context.Contex
314314
return input1
315315
}
316316

317-
unresolvedStaticDependenciesPerJobName, unknownDependenciesFromInternal := d.getUnresolvedStaticDependencies(jobSpecs, internalJobDependencies)
318-
staticExternalDependencyPerJobName, unknownDependenciesFromExternal, err := d.externalDependencyResolver.FetchStaticExternalDependenciesPerJobName(ctx, unresolvedStaticDependenciesPerJobName)
317+
unresolvedStaticDependenciesPerJobName, unknownStaticDependenciesFromInternal := d.getUnresolvedStaticDependencies(jobSpecs, internalJobDependencies)
318+
staticExternalDependencyPerJobName, unknownStaticDependenciesFromExternal, err := d.externalDependencyResolver.FetchStaticExternalDependenciesPerJobName(ctx, unresolvedStaticDependenciesPerJobName)
319319
if err != nil {
320320
return nil, nil, err
321321
}
322-
unknownDependencies := mergeUnknownDependencies(unknownDependenciesFromInternal, unknownDependenciesFromExternal)
322+
unknownStaticDependencies := mergeUnknownDependencies(unknownStaticDependenciesFromInternal, unknownStaticDependenciesFromExternal)
323323

324324
unresolvedInferredDependenciesPerJobName, err := d.getUnresolvedInferredDependencies(ctx, projectID, jobSpecs, internalJobDependencies)
325325
if err != nil {
@@ -331,29 +331,17 @@ func (d *dependencyResolver) getExternalDependenciesByJobName(ctx context.Contex
331331
}
332332

333333
externalDependenciesByJobName := mergeExternalDependencies(staticExternalDependencyPerJobName, inferredExternalDependencyPerJobName)
334-
return externalDependenciesByJobName, unknownDependencies, nil
334+
return externalDependenciesByJobName, unknownStaticDependencies, nil
335335
}
336336

337337
func (d *dependencyResolver) getUnresolvedStaticDependencies(jobSpecs []models.JobSpec, internalJobDependencies map[uuid.UUID][]models.JobSpec) (map[string][]models.UnresolvedJobDependency, []models.UnknownDependency) {
338-
convertToUnknownDependencies := func(unknownStaticDependencyNames []string, jobName, projectName string) []models.UnknownDependency {
339-
unknownDependencies := make([]models.UnknownDependency, len(unknownStaticDependencyNames))
340-
for i := 0; i < len(unknownStaticDependencyNames); i++ {
341-
unknownDependencies[i] = models.UnknownDependency{
342-
JobName: jobName,
343-
DependencyProjectName: projectName,
344-
DependencyJobName: unknownStaticDependencyNames[i],
345-
}
346-
}
347-
return unknownDependencies
348-
}
349-
350338
unresolvedStaticDependenciesPerJobName := make(map[string][]models.UnresolvedJobDependency)
351339
var unknownDependencies []models.UnknownDependency
352340
for _, jobSpec := range jobSpecs {
353341
resolvedDependencies := internalJobDependencies[jobSpec.ID]
354342
unresolvedStaticDependencies, unknownStaticDependencyNames := d.identifyUnresolvedStaticDependencies(jobSpec.Dependencies, resolvedDependencies)
355343

356-
unknownDependenciesPerJob := convertToUnknownDependencies(unknownStaticDependencyNames, jobSpec.Name, jobSpec.GetProjectSpec().Name)
344+
unknownDependenciesPerJob := d.convertToUnknownDependencies(unknownStaticDependencyNames, jobSpec.Name, jobSpec.GetProjectSpec().Name)
357345
unknownDependencies = append(unknownDependencies, unknownDependenciesPerJob...)
358346

359347
if len(unresolvedStaticDependencies) > 0 {
@@ -363,6 +351,18 @@ func (d *dependencyResolver) getUnresolvedStaticDependencies(jobSpecs []models.J
363351
return unresolvedStaticDependenciesPerJobName, unknownDependencies
364352
}
365353

354+
func (*dependencyResolver) convertToUnknownDependencies(unknownStaticDependencyNames []string, jobName, projectName string) []models.UnknownDependency {
355+
unknownDependencies := make([]models.UnknownDependency, len(unknownStaticDependencyNames))
356+
for i := 0; i < len(unknownStaticDependencyNames); i++ {
357+
unknownDependencies[i] = models.UnknownDependency{
358+
JobName: jobName,
359+
DependencyProjectName: projectName,
360+
DependencyJobName: unknownStaticDependencyNames[i],
361+
}
362+
}
363+
return unknownDependencies
364+
}
365+
366366
func (*dependencyResolver) identifyUnresolvedStaticDependencies(jobDependencies map[string]models.JobSpecDependency, resolvedStaticDependencies []models.JobSpec) ([]models.UnresolvedJobDependency, []string) {
367367
var unresolvedStaticDependencies []models.UnresolvedJobDependency
368368
var unknownStaticDependencyNames []string

job/external_dependency_resolver.go

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@ func (e *externalDependencyResolver) FetchInferredExternalDependenciesPerJobName
4545
}
4646
externalDependencyPerJobName := make(map[string]models.ExternalDependency)
4747
for jobName, filters := range unresolvedDependenciesPerJobName {
48-
optimusDependencies, err := e.fetchInferredOptimusDependencies(ctx, filters)
49-
if err != nil {
50-
return nil, err
51-
}
48+
optimusDependencies := e.fetchInferredOptimusDependencies(ctx, filters)
5249
// external dependency types other than optimus will be called in this line, and used in the line below
5350
externalDependencyPerJobName[jobName] = models.ExternalDependency{
5451
OptimusDependencies: optimusDependencies,
@@ -65,11 +62,9 @@ func (e *externalDependencyResolver) FetchStaticExternalDependenciesPerJobName(c
6562
var unknownDependencies []models.UnknownDependency
6663
externalDependencyPerJobName := make(map[string]models.ExternalDependency)
6764
for jobName, toBeResolvedDependency := range unresolvedDependenciesPerJobName {
68-
optimusDependencies, unresolvedFromExternal, err := e.fetchStaticOptimusDependencies(ctx, toBeResolvedDependency)
69-
if err != nil {
70-
return nil, nil, err
71-
}
72-
unknownDependencies = convertUnresolvedToUnknownDependencies(jobName, unresolvedFromExternal)
65+
optimusDependencies, unresolvedFromExternal := e.fetchStaticOptimusDependencies(ctx, toBeResolvedDependency)
66+
unknownDependenciesFromUnresolved := e.convertUnresolvedToUnknownDependencies(jobName, unresolvedFromExternal)
67+
unknownDependencies = append(unknownDependencies, unknownDependenciesFromUnresolved...)
7368
// external dependency types other than optimus will be called in this line, and used in the line below
7469
externalDependencyPerJobName[jobName] = models.ExternalDependency{
7570
OptimusDependencies: optimusDependencies,
@@ -79,7 +74,7 @@ func (e *externalDependencyResolver) FetchStaticExternalDependenciesPerJobName(c
7974
return externalDependencyPerJobName, unknownDependencies, nil
8075
}
8176

82-
func convertUnresolvedToUnknownDependencies(jobName string, unresolvedDependencies []models.UnresolvedJobDependency) []models.UnknownDependency {
77+
func (*externalDependencyResolver) convertUnresolvedToUnknownDependencies(jobName string, unresolvedDependencies []models.UnresolvedJobDependency) []models.UnknownDependency {
8378
unknownDependencies := make([]models.UnknownDependency, len(unresolvedDependencies))
8479
for i := 0; i < len(unresolvedDependencies); i++ {
8580
unknownDependencies[i] = models.UnknownDependency{
@@ -91,43 +86,37 @@ func convertUnresolvedToUnknownDependencies(jobName string, unresolvedDependenci
9186
return unknownDependencies
9287
}
9388

94-
func (e *externalDependencyResolver) fetchInferredOptimusDependencies(ctx context.Context, unresolvedDependencies []models.UnresolvedJobDependency) ([]models.OptimusDependency, error) {
89+
func (e *externalDependencyResolver) fetchInferredOptimusDependencies(ctx context.Context, unresolvedDependencies []models.UnresolvedJobDependency) []models.OptimusDependency {
9590
var optimusDependencies []models.OptimusDependency
9691
for _, unresolvedDependency := range unresolvedDependencies {
97-
dependencies, err := e.fetchOptimusDependencies(ctx, unresolvedDependency)
98-
if err != nil {
99-
return nil, err
100-
}
92+
dependencies := e.fetchOptimusDependencies(ctx, unresolvedDependency)
10193
optimusDependencies = append(optimusDependencies, dependencies...)
10294
}
103-
return optimusDependencies, nil
95+
return optimusDependencies
10496
}
10597

106-
func (e *externalDependencyResolver) fetchStaticOptimusDependencies(ctx context.Context, unresolvedDependencies []models.UnresolvedJobDependency) ([]models.OptimusDependency, []models.UnresolvedJobDependency, error) {
98+
func (e *externalDependencyResolver) fetchStaticOptimusDependencies(ctx context.Context, unresolvedDependencies []models.UnresolvedJobDependency) ([]models.OptimusDependency, []models.UnresolvedJobDependency) {
10799
var optimusDependencies []models.OptimusDependency
108100
var unresolvedFromExternal []models.UnresolvedJobDependency
109101
for _, toBeResolvedDependency := range unresolvedDependencies {
110-
dependencies, err := e.fetchOptimusDependencies(ctx, toBeResolvedDependency)
111-
if err != nil {
112-
return nil, nil, err
113-
}
102+
dependencies := e.fetchOptimusDependencies(ctx, toBeResolvedDependency)
114103
if len(dependencies) == 0 {
115104
unresolvedFromExternal = append(unresolvedFromExternal, toBeResolvedDependency)
116105
continue
117106
}
118107
optimusDependencies = append(optimusDependencies, dependencies...)
119108
}
120-
return optimusDependencies, unresolvedFromExternal, nil
109+
return optimusDependencies, unresolvedFromExternal
121110
}
122111

123-
func (e *externalDependencyResolver) fetchOptimusDependencies(ctx context.Context, unresolvedDependency models.UnresolvedJobDependency) ([]models.OptimusDependency, error) {
112+
func (e *externalDependencyResolver) fetchOptimusDependencies(ctx context.Context, unresolvedDependency models.UnresolvedJobDependency) []models.OptimusDependency {
124113
var dependencies []models.OptimusDependency
125-
for _, getter := range e.optimusResourceManagers {
126-
deps, err := getter.GetOptimusDependencies(ctx, unresolvedDependency)
114+
for _, manager := range e.optimusResourceManagers {
115+
deps, err := manager.GetOptimusDependencies(ctx, unresolvedDependency)
127116
if err != nil {
128-
return nil, err
117+
continue
129118
}
130119
dependencies = append(dependencies, deps...)
131120
}
132-
return dependencies, nil
121+
return dependencies
133122
}

job/external_dependency_resolver_test.go

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package job_test
22

33
import (
44
"context"
5-
"errors"
65
"testing"
76

87
"github.com/stretchr/testify/suite"
@@ -35,26 +34,6 @@ func (e *ExternalDependencyResolverTestSuite) TestFetchInferredExternalDependenc
3534
e.Error(actualError)
3635
})
3736

38-
// TODO: this unit test need to be changed if the dependency is not from Optimus only, but currently it is
39-
e.Run("should return nil and error if error when fetching optimus dependencies from optimus dependency getter", func() {
40-
optimusResourceManager := mock.NewResourceManager(e.T())
41-
optimusResourceManagers := []resourcemgr.ResourceManager{optimusResourceManager}
42-
externalDependencyResolver := job.NewTestExternalDependencyResolver(optimusResourceManagers)
43-
44-
ctx := context.Background()
45-
unresolvedDependencies := []models.UnresolvedJobDependency{
46-
{ResourceDestination: "urn1"},
47-
}
48-
unresolvedDependenciesPerJobName := map[string][]models.UnresolvedJobDependency{"job1": {unresolvedDependencies[0]}}
49-
50-
optimusResourceManager.On("GetOptimusDependencies", ctx, unresolvedDependencies[0]).Return(nil, errors.New("random error"))
51-
52-
actualDependencies, actualError := externalDependencyResolver.FetchInferredExternalDependenciesPerJobName(ctx, unresolvedDependenciesPerJobName)
53-
54-
e.Nil(actualDependencies)
55-
e.Error(actualError)
56-
})
57-
5837
e.Run("should return external dependency and nil if no error is encountered", func() {
5938
optimusResourceManager := mock.NewResourceManager(e.T())
6039
optimusResourceManagers := []resourcemgr.ResourceManager{optimusResourceManager}
@@ -110,28 +89,6 @@ func (e *ExternalDependencyResolverTestSuite) TestFetchStaticExternalDependencie
11089
e.Error(actualError)
11190
})
11291

113-
// TODO: this unit test need to be changed if the dependency is not from Optimus only, but currently it is
114-
e.Run("should return nil, nil and error if error when fetching optimus dependencies from optimus dependency getter", func() {
115-
optimusResourceManager := mock.NewResourceManager(e.T())
116-
optimusResourceManagers := []resourcemgr.ResourceManager{optimusResourceManager}
117-
externalDependencyResolver := job.NewTestExternalDependencyResolver(optimusResourceManagers)
118-
119-
ctx := context.Background()
120-
121-
unresolvedDependencies := []models.UnresolvedJobDependency{
122-
{ProjectName: "project2", JobName: "job2"},
123-
}
124-
unresolvedDependenciesPerJobName := map[string][]models.UnresolvedJobDependency{"job1": {unresolvedDependencies[0]}}
125-
126-
optimusResourceManager.On("GetOptimusDependencies", ctx, unresolvedDependencies[0]).Return(nil, errors.New("random error"))
127-
128-
actualExternalDependencies, actualUnknownDependencies, actualError := externalDependencyResolver.FetchStaticExternalDependenciesPerJobName(ctx, unresolvedDependenciesPerJobName)
129-
130-
e.Nil(actualExternalDependencies)
131-
e.Nil(actualUnknownDependencies)
132-
e.Error(actualError)
133-
})
134-
13592
e.Run("should return external dependency and nil if no error is encountered", func() {
13693
optimusResourceManager := mock.NewResourceManager(e.T())
13794
optimusResourceManagers := []resourcemgr.ResourceManager{optimusResourceManager}

0 commit comments

Comments
 (0)