Skip to content

Commit a88d4fa

Browse files
committed
adding tests
Signed-off-by: Nikola Jokic <jokicnikola07@gmail.com>
1 parent 5025f51 commit a88d4fa

File tree

6 files changed

+219
-83
lines changed

6 files changed

+219
-83
lines changed

internal/lookout/conversions/convert_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ var (
4848
Pending: &baseTimeSwagger,
4949
RunID: "run-id",
5050
Started: &baseTimeSwagger,
51+
IngressAddresses: map[string]string{
52+
"80": "address1.svc.cluster.local",
53+
},
5154
},
5255
},
5356
State: string(lookout.JobFailed),
@@ -84,6 +87,9 @@ var (
8487
Pending: model.NewPostgreSQLTime(&baseTime),
8588
RunId: "run-id",
8689
Started: model.NewPostgreSQLTime(&baseTime),
90+
IngressAddresses: map[int32]string{
91+
80: "address1.svc.cluster.local",
92+
},
8793
},
8894
},
8995
State: string(lookout.JobFailed),

internal/lookout/repository/getjobs_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2529,3 +2529,32 @@ func TestGetJobsByClusterOfLatestRun(t *testing.T) {
25292529
})
25302530
require.NoError(t, err)
25312531
}
2532+
2533+
func TestGetJobsIncludesIngressAddresses(t *testing.T) {
2534+
err := withGetJobsSetup(func(converter *instructions.InstructionConverter, store *lookoutdb.LookoutDb, repo *SqlGetJobsRepository, testClock *clock.FakeClock) error {
2535+
runId := uuid.NewString()
2536+
ingressAddresses := map[int32]string{
2537+
80: "ingress.example.com",
2538+
}
2539+
2540+
job := NewJobSimulatorWithClock(converter, store, testClock).
2541+
Submit(queue, jobSet, owner, namespace, baseTime, basicJobOpts).
2542+
Lease(runId, cluster, node, baseTime).
2543+
Pending(runId, cluster, baseTime).
2544+
Running(runId, node, baseTime.Add(time.Minute)).
2545+
IngressInfo(runId, ingressAddresses, baseTime.Add(2*time.Minute)).
2546+
RunSucceeded(runId, baseTime.Add(3*time.Minute)).
2547+
Succeeded(baseTime.Add(3 * time.Minute)).
2548+
Build().
2549+
Job()
2550+
2551+
result, err := repo.GetJobs(armadacontext.TODO(), []*model.Filter{}, false, &model.Order{}, 0, 10)
2552+
require.NoError(t, err)
2553+
require.Len(t, result.Jobs, 1)
2554+
assert.Equal(t, job, result.Jobs[0])
2555+
require.Len(t, result.Jobs[0].Runs, 1)
2556+
assert.Equal(t, ingressAddresses, result.Jobs[0].Runs[0].IngressAddresses)
2557+
return nil
2558+
})
2559+
require.NoError(t, err)
2560+
}

internal/lookout/repository/util.go

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,16 @@ type JobOptions struct {
5555
}
5656

5757
type runPatch struct {
58-
runId string
59-
cluster *string
60-
exitCode *int32
61-
finished *time.Time
62-
jobRunState lookout.JobRunState
63-
node *string
64-
leased *time.Time
65-
pending *time.Time
66-
started *time.Time
58+
runId string
59+
cluster *string
60+
exitCode *int32
61+
finished *time.Time
62+
jobRunState lookout.JobRunState
63+
node *string
64+
leased *time.Time
65+
pending *time.Time
66+
started *time.Time
67+
ingressAddresses map[int32]string
6768
}
6869

6970
func NewJobSimulator(converter *instructions.InstructionConverter, store *lookoutdb.LookoutDb) *JobSimulator {
@@ -298,6 +299,27 @@ func (js *JobSimulator) Running(runId string, node string, timestamp time.Time)
298299
return js
299300
}
300301

302+
func (js *JobSimulator) IngressInfo(runId string, ingressAddresses map[int32]string, timestamp time.Time) *JobSimulator {
303+
ts := timestampOrNow(timestamp)
304+
ingressEvent := &armadaevents.EventSequence_Event{
305+
Created: ts,
306+
Event: &armadaevents.EventSequence_Event_StandaloneIngressInfo{
307+
StandaloneIngressInfo: &armadaevents.StandaloneIngressInfo{
308+
IngressAddresses: ingressAddresses,
309+
JobId: js.jobId,
310+
RunId: runId,
311+
},
312+
},
313+
}
314+
js.events = append(js.events, ingressEvent)
315+
316+
js.updateRun(js.job, &runPatch{
317+
runId: runId,
318+
ingressAddresses: ingressAddresses,
319+
})
320+
return js
321+
}
322+
301323
func (js *JobSimulator) RunSucceeded(runId string, timestamp time.Time) *JobSimulator {
302324
ts := timestampOrNow(timestamp)
303325
succeeded := protoutil.ToStdTime(ts)
@@ -667,15 +689,16 @@ func (js *JobSimulator) updateRun(job *model.Job, patch *runPatch) {
667689
cluster = *patch.cluster
668690
}
669691
job.Runs = append(job.Runs, &model.Run{
670-
Cluster: cluster,
671-
ExitCode: patch.exitCode,
672-
Finished: model.NewPostgreSQLTime(patch.finished),
673-
JobRunState: lookout.JobRunStateOrdinalMap[patch.jobRunState],
674-
Node: patch.node,
675-
Leased: model.NewPostgreSQLTime(patch.leased),
676-
Pending: model.NewPostgreSQLTime(patch.pending),
677-
RunId: patch.runId,
678-
Started: model.NewPostgreSQLTime(patch.started),
692+
Cluster: cluster,
693+
ExitCode: patch.exitCode,
694+
Finished: model.NewPostgreSQLTime(patch.finished),
695+
JobRunState: lookout.JobRunStateOrdinalMap[patch.jobRunState],
696+
Node: patch.node,
697+
Leased: model.NewPostgreSQLTime(patch.leased),
698+
Pending: model.NewPostgreSQLTime(patch.pending),
699+
RunId: patch.runId,
700+
Started: model.NewPostgreSQLTime(patch.started),
701+
IngressAddresses: patch.ingressAddresses,
679702
})
680703
job.RuntimeSeconds = calculateJobRuntime(model.NewPostgreSQLTime(patch.started), model.NewPostgreSQLTime(patch.finished), js.clock)
681704
}
@@ -703,6 +726,9 @@ func patchRun(run *model.Run, patch *runPatch) {
703726
if patch.started != nil {
704727
run.Started = model.NewPostgreSQLTime(patch.started)
705728
}
729+
if patch.ingressAddresses != nil {
730+
run.IngressAddresses = patch.ingressAddresses
731+
}
706732
}
707733

708734
func prefixAnnotations(prefix string, annotations map[string]string) map[string]string {

internal/lookoutingester/instructions/instructions_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,26 @@ var expectedCancelledRun = model.UpdateJobRunInstruction{
173173
JobRunState: pointer.Int32(lookout.JobRunCancelledOrdinal),
174174
}
175175

176+
var standaloneIngressAddresses = map[int32]string{
177+
80: "ingress.example.com",
178+
}
179+
180+
var standaloneIngressInfoEvent = &armadaevents.EventSequence_Event{
181+
Created: testfixtures.BaseTimeProto,
182+
Event: &armadaevents.EventSequence_Event_StandaloneIngressInfo{
183+
StandaloneIngressInfo: &armadaevents.StandaloneIngressInfo{
184+
JobId: testfixtures.JobId,
185+
RunId: testfixtures.RunId,
186+
IngressAddresses: standaloneIngressAddresses,
187+
},
188+
},
189+
}
190+
191+
var expectedStandaloneIngressRun = model.UpdateJobRunInstruction{
192+
RunId: testfixtures.RunId,
193+
IngressAddresses: standaloneIngressAddresses,
194+
}
195+
176196
func TestConvert(t *testing.T) {
177197
submit, err := testfixtures.DeepCopy(testfixtures.Submit)
178198
assert.NoError(t, err)
@@ -402,6 +422,16 @@ func TestConvert(t *testing.T) {
402422
MessageIds: []pulsar.MessageID{pulsarutils.NewMessageId(1)},
403423
},
404424
},
425+
"job run ingress info": {
426+
events: &utils.EventsWithIds[*armadaevents.EventSequence]{
427+
Events: []*armadaevents.EventSequence{testfixtures.NewEventSequence(standaloneIngressInfoEvent)},
428+
MessageIds: []pulsar.MessageID{pulsarutils.NewMessageId(1)},
429+
},
430+
expected: &model.InstructionSet{
431+
JobRunsToUpdate: []*model.UpdateJobRunInstruction{&expectedStandaloneIngressRun},
432+
MessageIds: []pulsar.MessageID{pulsarutils.NewMessageId(1)},
433+
},
434+
},
405435
"job run preempted": {
406436
events: &utils.EventsWithIds[*armadaevents.EventSequence]{
407437
Events: []*armadaevents.EventSequence{testfixtures.NewEventSequence(testfixtures.JobRunPreempted)},

internal/lookoutingester/lookoutdb/insertion_test.go

Lines changed: 74 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package lookoutdb
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"regexp"
67
"sort"
@@ -45,6 +46,16 @@ var annotations = map[string]string{
4546
"b": "1",
4647
}
4748

49+
var (
50+
initialIngressAddresses = map[int32]string{
51+
80: "ingress.example.com",
52+
}
53+
updatedIngressAddresses = map[int32]string{
54+
80: "ingress.example.com",
55+
443: "ingress-backup.example.com",
56+
}
57+
)
58+
4859
var (
4960
baseTime, _ = time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z")
5061
updateTime, _ = time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:06.000Z")
@@ -87,17 +98,18 @@ type JobSpecRow struct {
8798
}
8899

89100
type JobRunRow struct {
90-
RunId string
91-
JobId string
92-
Cluster string
93-
Node *string
94-
Pending time.Time
95-
Started *time.Time
96-
Finished *time.Time
97-
JobRunState int32
98-
Error []byte
99-
Debug []byte
100-
ExitCode *int32
101+
RunId string
102+
JobId string
103+
Cluster string
104+
Node *string
105+
Pending time.Time
106+
Started *time.Time
107+
Finished *time.Time
108+
JobRunState int32
109+
Error []byte
110+
Debug []byte
111+
ExitCode *int32
112+
IngressAddresses map[int32]string
101113
}
102114

103115
type JobErrorRow struct {
@@ -116,21 +128,23 @@ func defaultInstructionSet() *model.InstructionSet {
116128
LastTransitionTimeSeconds: pointer.Int64(updateTime.Unix()),
117129
}},
118130
JobRunsToCreate: []*model.CreateJobRunInstruction{{
119-
RunId: RunId,
120-
JobId: JobId,
121-
Cluster: executorId,
122-
Leased: &updateTime,
123-
Pending: &updateTime,
124-
JobRunState: lookout.JobRunPendingOrdinal,
131+
RunId: RunId,
132+
JobId: JobId,
133+
Cluster: executorId,
134+
Leased: &updateTime,
135+
Pending: &updateTime,
136+
JobRunState: lookout.JobRunPendingOrdinal,
137+
IngressAddresses: cloneIngressAddresses(initialIngressAddresses),
125138
}},
126139
JobRunsToUpdate: []*model.UpdateJobRunInstruction{{
127-
RunId: RunId,
128-
Node: pointer.String(nodeName),
129-
Started: &startTime,
130-
Finished: &finishedTime,
131-
Debug: []byte(testfixtures.DebugMsg),
132-
JobRunState: pointer.Int32(lookout.JobRunSucceededOrdinal),
133-
ExitCode: pointer.Int32(0),
140+
RunId: RunId,
141+
Node: pointer.String(nodeName),
142+
Started: &startTime,
143+
Finished: &finishedTime,
144+
Debug: []byte(testfixtures.DebugMsg),
145+
JobRunState: pointer.Int32(lookout.JobRunSucceededOrdinal),
146+
ExitCode: pointer.Int32(0),
147+
IngressAddresses: cloneIngressAddresses(updatedIngressAddresses),
134148
}},
135149
JobErrorsToCreate: []*model.CreateJobErrorInstruction{{
136150
JobId: JobId,
@@ -183,11 +197,12 @@ var expectedJobAfterUpdate = JobRow{
183197
}
184198

185199
var expectedJobRun = JobRunRow{
186-
RunId: RunId,
187-
JobId: JobId,
188-
Cluster: executorId,
189-
Pending: updateTime,
190-
JobRunState: lookout.JobRunPendingOrdinal,
200+
RunId: RunId,
201+
JobId: JobId,
202+
Cluster: executorId,
203+
Pending: updateTime,
204+
JobRunState: lookout.JobRunPendingOrdinal,
205+
IngressAddresses: cloneIngressAddresses(initialIngressAddresses),
191206
}
192207

193208
var expectedJobError = JobErrorRow{
@@ -196,16 +211,17 @@ var expectedJobError = JobErrorRow{
196211
}
197212

198213
var expectedJobRunAfterUpdate = JobRunRow{
199-
RunId: RunId,
200-
JobId: JobId,
201-
Cluster: executorId,
202-
Node: pointer.String(nodeName),
203-
Pending: updateTime,
204-
Started: &startTime,
205-
Finished: &finishedTime,
206-
JobRunState: lookout.JobRunSucceededOrdinal,
207-
ExitCode: pointer.Int32(0),
208-
Debug: []byte(testfixtures.DebugMsg),
214+
RunId: RunId,
215+
JobId: JobId,
216+
Cluster: executorId,
217+
Node: pointer.String(nodeName),
218+
Pending: updateTime,
219+
Started: &startTime,
220+
Finished: &finishedTime,
221+
JobRunState: lookout.JobRunSucceededOrdinal,
222+
ExitCode: pointer.Int32(0),
223+
Debug: []byte(testfixtures.DebugMsg),
224+
IngressAddresses: cloneIngressAddresses(updatedIngressAddresses),
209225
}
210226

211227
func TestCreateJobsBatch(t *testing.T) {
@@ -1004,6 +1020,17 @@ func getJob(t *testing.T, db *pgxpool.Pool, jobId string) JobRow {
10041020
return job
10051021
}
10061022

1023+
func cloneIngressAddresses(source map[int32]string) map[int32]string {
1024+
if source == nil {
1025+
return nil
1026+
}
1027+
copy := make(map[int32]string, len(source))
1028+
for k, v := range source {
1029+
copy[k] = v
1030+
}
1031+
return copy
1032+
}
1033+
10071034
func getJobSpec(t *testing.T, db *pgxpool.Pool, jobId string) JobSpecRow {
10081035
jobSpec := JobSpecRow{}
10091036
r := db.QueryRow(
@@ -1036,9 +1063,11 @@ func getJobRun(t *testing.T, db *pgxpool.Pool, runId string) JobRunRow {
10361063
job_run_state,
10371064
error,
10381065
exit_code,
1039-
debug
1066+
debug,
1067+
ingress_addresses
10401068
FROM job_run WHERE run_id = $1`,
10411069
runId)
1070+
var ingressJSON []byte
10421071
err := r.Scan(
10431072
&run.RunId,
10441073
&run.JobId,
@@ -1051,8 +1080,13 @@ func getJobRun(t *testing.T, db *pgxpool.Pool, runId string) JobRunRow {
10511080
&run.Error,
10521081
&run.ExitCode,
10531082
&run.Debug,
1083+
&ingressJSON,
10541084
)
10551085
assert.NoError(t, err)
1086+
if len(ingressJSON) > 0 {
1087+
err = json.Unmarshal(ingressJSON, &run.IngressAddresses)
1088+
assert.NoError(t, err)
1089+
}
10561090
return run
10571091
}
10581092

0 commit comments

Comments
 (0)