Skip to content

Commit 3b0f083

Browse files
committed
Merge branch 'master' into wip_defrag
2 parents f624216 + 7e1e21e commit 3b0f083

File tree

21 files changed

+1621
-438
lines changed

21 files changed

+1621
-438
lines changed

.mergify.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ pull_request_rules:
99
- "#approved-reviews-by>=2"
1010
- and:
1111
- "#approved-reviews-by>=1"
12-
- "author~=^(d80tb7|dave[-]gantenbein|dejanzele|eleanorpratt|geaere|JamesMurkin|mauriceyap|masipauskas|MustafaI|zuqq|richscott|robertdavidsmith|samclark|suprjinx)"
12+
- "author~=^(d80tb7|dave[-]gantenbein|dejanzele|eleanorpratt|geaere|JamesMurkin|mauriceyap|masipauskas|MustafaI|zuqq|richscott|robertdavidsmith|samclark|suprjinx|EnricoMi)$"
1313
title:
14-
Two are checks required.
14+
Two approvals required, or one if author is a maintainer.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#sample config
2+
---
3+
queues:
4+
- "queue1"
5+
ids:
6+
- "01JD2FF8TTJAXCNFB0F91053ZD"
7+
jobsets:
8+
- "jobset1"
9+
queueCounts:
10+
queue1: 1000000

cmd/lookout/dbloadtester/main.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package main
2+
3+
import (
4+
"os"
5+
"os/signal"
6+
"syscall"
7+
8+
"github.com/spf13/pflag"
9+
"github.com/spf13/viper"
10+
11+
"github.com/armadaproject/armada/internal/common"
12+
"github.com/armadaproject/armada/internal/common/armadacontext"
13+
log "github.com/armadaproject/armada/internal/common/logging"
14+
"github.com/armadaproject/armada/internal/common/profiling"
15+
"github.com/armadaproject/armada/internal/lookout/configuration"
16+
"github.com/armadaproject/armada/internal/lookout/dbloadtester"
17+
)
18+
19+
const (
20+
customConfigLocation string = "config"
21+
customParamsLocation string = "params"
22+
)
23+
24+
func init() {
25+
pflag.StringSlice(
26+
customConfigLocation,
27+
[]string{},
28+
"path to the configuration for the Lookout under test",
29+
)
30+
pflag.StringSlice(
31+
customParamsLocation,
32+
[]string{},
33+
"parameters to the load test, defining the shape of the load",
34+
)
35+
pflag.Parse()
36+
}
37+
38+
func makeContext() (*armadacontext.Context, func()) {
39+
ctx := armadacontext.Background()
40+
ctx, cancel := armadacontext.WithCancel(ctx)
41+
42+
c := make(chan os.Signal, 1)
43+
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
44+
45+
go func() {
46+
select {
47+
case <-c:
48+
cancel()
49+
case <-ctx.Done():
50+
}
51+
}()
52+
53+
return ctx, func() {
54+
signal.Stop(c)
55+
cancel()
56+
}
57+
}
58+
59+
func main() {
60+
common.BindCommandlineArguments()
61+
62+
var config configuration.LookoutConfig
63+
userSpecifiedConfigs := viper.GetStringSlice(customConfigLocation)
64+
common.LoadConfig(&config, "./config/lookout", userSpecifiedConfigs)
65+
66+
var args dbloadtester.ReadTestConfig
67+
argsInput := viper.GetStringSlice(customParamsLocation)
68+
common.LoadConfig(&args, "./cmd/lookout/dbloadtester", argsInput)
69+
70+
// Expose profiling endpoints if enabled.
71+
err := profiling.SetupPprof(config.Profiling, armadacontext.Background(), nil)
72+
if err != nil {
73+
log.Fatalf("Pprof setup failed, exiting, %v", err)
74+
}
75+
76+
ctx, cleanup := makeContext()
77+
defer cleanup()
78+
79+
results, err := dbloadtester.DoQueries(config, args, ctx)
80+
if err != nil {
81+
log.Fatalf("error running queries, exiting, %v", err)
82+
}
83+
84+
log.Infof("results: \n%v", results)
85+
}

cmd/lookoutingester/dbloadtester/main.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package main
22

33
import (
4-
"fmt"
54
"time"
65

76
"github.com/spf13/pflag"
@@ -19,7 +18,7 @@ func init() {
1918
pflag.StringSlice(
2019
"lookoutIngesterConfig",
2120
[]string{},
22-
"Fully qualified path to application configuration file (for multiple config files repeat this arg or separate paths with commas)",
21+
"path to the configuration for the lookout ingester under test",
2322
)
2423
pflag.Parse()
2524
}
@@ -74,7 +73,7 @@ func main() {
7473
if err != nil {
7574
log.Warn("Failed to marshal lookout ingester config for report output")
7675
}
77-
fmt.Printf(
76+
log.Infof(
7877
ReportTemplate,
7978
time.Now().Format("2006-01-02"),
8079
loadtesterConfig.TotalJobs,

internal/common/resource/resource.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,19 @@ func (a ComputeResources) Sub(b ComputeResources) {
204204
}
205205
}
206206

207+
func (a ComputeResources) Div(b map[string]int64) {
208+
for k, v := range b {
209+
existing, ok := a[k]
210+
if ok {
211+
if existing.Format == resource.DecimalSI {
212+
a[k] = resource.NewMilliQuantity(existing.MilliValue()/v, existing.Format).DeepCopy()
213+
} else {
214+
a[k] = resource.NewQuantity(existing.Value()/v, existing.Format).DeepCopy()
215+
}
216+
}
217+
}
218+
}
219+
207220
func (a ComputeResources) DeepCopy() ComputeResources {
208221
targetComputeResource := make(ComputeResources)
209222

internal/executor/domain/resources.go

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,6 @@ func EmptyUtilisationData() *UtilisationData {
1414
}
1515
}
1616

17-
func (a *UtilisationData) Max(b *UtilisationData) {
18-
a.CurrentUsage.Max(b.CurrentUsage)
19-
a.CumulativeUsage.Max(b.CumulativeUsage)
20-
}
21-
2217
func (u *UtilisationData) DeepCopy() *UtilisationData {
2318
return &UtilisationData{
2419
CurrentUsage: u.CurrentUsage.DeepCopy(),
@@ -29,3 +24,82 @@ func (u *UtilisationData) DeepCopy() *UtilisationData {
2924
func (u *UtilisationData) IsEmpty() bool {
3025
return len(u.CumulativeUsage) == 0 && len(u.CurrentUsage) == 0
3126
}
27+
28+
type UtilisationDataAggregation struct {
29+
maxUsage armadaresource.ComputeResources
30+
sumOfUsage armadaresource.ComputeResources
31+
numDataPoints map[string]int64
32+
cumulativeUsage armadaresource.ComputeResources
33+
}
34+
35+
func EmptyUtilisationDataAggregation() *UtilisationDataAggregation {
36+
return &UtilisationDataAggregation{
37+
maxUsage: armadaresource.ComputeResources{},
38+
sumOfUsage: armadaresource.ComputeResources{},
39+
numDataPoints: map[string]int64{},
40+
cumulativeUsage: armadaresource.ComputeResources{},
41+
}
42+
}
43+
44+
func NewUtilisationDataAggregation(u *UtilisationData) *UtilisationDataAggregation {
45+
numDataPoints := make(map[string]int64)
46+
47+
for k := range u.CurrentUsage {
48+
numDataPoints[k] = 1
49+
}
50+
51+
return &UtilisationDataAggregation{
52+
maxUsage: u.CurrentUsage.DeepCopy(),
53+
sumOfUsage: u.CurrentUsage.DeepCopy(),
54+
numDataPoints: numDataPoints,
55+
cumulativeUsage: u.CumulativeUsage.DeepCopy(),
56+
}
57+
}
58+
59+
func (u *UtilisationDataAggregation) GetMaxUsage() armadaresource.ComputeResources {
60+
return u.maxUsage
61+
}
62+
63+
func (u *UtilisationDataAggregation) GetAvgUsage() armadaresource.ComputeResources {
64+
avg := u.sumOfUsage.DeepCopy()
65+
avg.Div(u.numDataPoints)
66+
return avg
67+
}
68+
69+
func (u *UtilisationDataAggregation) GetCumulativeUsage() armadaresource.ComputeResources {
70+
return u.cumulativeUsage
71+
}
72+
73+
func (a *UtilisationDataAggregation) Add(b *UtilisationData) {
74+
a.maxUsage.Max(b.CurrentUsage)
75+
a.sumOfUsage.Add(b.CurrentUsage)
76+
a.cumulativeUsage.Max(b.CumulativeUsage)
77+
78+
for k := range b.CurrentUsage {
79+
existing, ok := a.numDataPoints[k]
80+
if ok {
81+
a.numDataPoints[k] = existing + 1
82+
} else {
83+
a.numDataPoints[k] = 1
84+
}
85+
}
86+
}
87+
88+
func (u *UtilisationDataAggregation) DeepCopy() *UtilisationDataAggregation {
89+
numDataPoints := map[string]int64{}
90+
91+
for k, v := range u.numDataPoints {
92+
numDataPoints[k] = v
93+
}
94+
95+
return &UtilisationDataAggregation{
96+
maxUsage: u.maxUsage.DeepCopy(),
97+
sumOfUsage: u.sumOfUsage.DeepCopy(),
98+
numDataPoints: numDataPoints,
99+
cumulativeUsage: u.cumulativeUsage.DeepCopy(),
100+
}
101+
}
102+
103+
func (u *UtilisationDataAggregation) IsEmpty() bool {
104+
return len(u.maxUsage) == 0
105+
}

internal/executor/domain/resources_test.go

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,37 +9,76 @@ import (
99
armadaresource "github.com/armadaproject/armada/internal/common/resource"
1010
)
1111

12-
func TestUtilisationData_Max(t *testing.T) {
13-
data := &UtilisationData{
14-
CurrentUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("1"), "memory": resource.MustParse("10")},
15-
CumulativeUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("5")},
12+
func compareQuantityMaps(a, b map[string]resource.Quantity) bool {
13+
if len(a) != len(b) {
14+
return false
15+
}
16+
for key, aVal := range a {
17+
bVal, exists := b[key]
18+
if !exists {
19+
return false
20+
}
21+
if aVal.String() != bVal.String() {
22+
return false
23+
}
24+
}
25+
return true
26+
}
27+
28+
func TestUtilisationDataAggregation(t *testing.T) {
29+
data := &UtilisationDataAggregation{
30+
maxUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("1"), "memory": resource.MustParse("10")},
31+
sumOfUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("1"), "memory": resource.MustParse("10")},
32+
numDataPoints: map[string]int64{"cpu": 1, "memory": 1},
33+
cumulativeUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("5")},
1634
}
1735
data2 := &UtilisationData{
1836
CurrentUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("2"), "memory": resource.MustParse("1")},
1937
CumulativeUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("10")},
2038
}
21-
expected := &UtilisationData{
22-
CurrentUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("2"), "memory": resource.MustParse("10")},
23-
CumulativeUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("10")},
39+
expected := &UtilisationDataAggregation{
40+
maxUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("2"), "memory": resource.MustParse("10")},
41+
sumOfUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("3"), "memory": resource.MustParse("11")},
42+
numDataPoints: map[string]int64{"cpu": 2, "memory": 2},
43+
cumulativeUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("10")},
2444
}
2545

26-
max := data
27-
max.Max(data2)
28-
assert.Equal(t, expected.CurrentUsage, max.CurrentUsage)
29-
assert.Equal(t, expected.CumulativeUsage, max.CumulativeUsage)
46+
aggregated := data
47+
aggregated.Add(data2)
48+
assert.True(t, compareQuantityMaps(expected.maxUsage, aggregated.maxUsage))
49+
assert.True(t, compareQuantityMaps(expected.sumOfUsage, aggregated.sumOfUsage))
50+
assert.Equal(t, expected.numDataPoints, aggregated.numDataPoints)
51+
assert.True(t, compareQuantityMaps(expected.cumulativeUsage, aggregated.cumulativeUsage))
3052
}
3153

32-
func TestUtilisationData_Max_WithEmpty(t *testing.T) {
54+
func TestUtilisationDataAggregation_WithEmpty(t *testing.T) {
3355
currentUsage := armadaresource.ComputeResources{"cpu": resource.MustParse("1"), "memory": resource.MustParse("1")}
3456
cumulativeUsage := armadaresource.ComputeResources{"cpu": resource.MustParse("10")}
3557
data := &UtilisationData{
3658
CurrentUsage: currentUsage.DeepCopy(),
3759
CumulativeUsage: cumulativeUsage.DeepCopy(),
3860
}
39-
max := EmptyUtilisationData()
40-
max.Max(data)
41-
assert.Equal(t, data.CurrentUsage, max.CurrentUsage)
42-
assert.Equal(t, data.CumulativeUsage, max.CumulativeUsage)
61+
aggregated := EmptyUtilisationDataAggregation()
62+
aggregated.Add(data)
63+
assert.Equal(t, data.CurrentUsage, aggregated.maxUsage)
64+
assert.Equal(t, data.CumulativeUsage, aggregated.cumulativeUsage)
65+
66+
assert.True(t, compareQuantityMaps(data.CurrentUsage, aggregated.maxUsage))
67+
assert.True(t, compareQuantityMaps(data.CurrentUsage, aggregated.sumOfUsage))
68+
assert.True(t, compareQuantityMaps(data.CumulativeUsage, aggregated.cumulativeUsage))
69+
}
70+
71+
func TestUtilisationData_GetAvg(t *testing.T) {
72+
data := &UtilisationDataAggregation{
73+
maxUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("100m"), "memory": resource.MustParse("100Mi")},
74+
sumOfUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("160m"), "memory": resource.MustParse("110Mi")},
75+
numDataPoints: map[string]int64{"cpu": 2, "memory": 2},
76+
cumulativeUsage: armadaresource.ComputeResources{"cpu": resource.MustParse("160m")},
77+
}
78+
79+
expected := armadaresource.ComputeResources{"cpu": resource.MustParse("80m"), "memory": resource.MustParse("55Mi")}
80+
81+
assert.True(t, compareQuantityMaps(expected, data.GetAvgUsage()))
4382
}
4483

4584
func TestUtilisationData_IsEmpty(t *testing.T) {

internal/executor/reporter/event.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ func CreateReturnLeaseEvent(pod *v1.Pod, reason string, debugMessage string, clu
318318
return sequence, nil
319319
}
320320

321-
func CreateJobUtilisationEvent(pod *v1.Pod, utilisationData *domain.UtilisationData, clusterId string) (*armadaevents.EventSequence, error) {
321+
func CreateJobUtilisationEvent(pod *v1.Pod, utilisationData *domain.UtilisationDataAggregation, clusterId string) (*armadaevents.EventSequence, error) {
322322
sequence := createEmptySequence(pod)
323323
jobId, runId, err := extractIds(pod)
324324
if err != nil {
@@ -345,8 +345,9 @@ func CreateJobUtilisationEvent(pod *v1.Pod, utilisationData *domain.UtilisationD
345345
},
346346
},
347347
},
348-
MaxResourcesForPeriod: utilisationData.CurrentUsage.ToProtoMap(),
349-
TotalCumulativeUsage: utilisationData.CumulativeUsage.ToProtoMap(),
348+
MaxResourcesForPeriod: utilisationData.GetMaxUsage().ToProtoMap(),
349+
AvgResourcesForPeriod: utilisationData.GetAvgUsage().ToProtoMap(),
350+
TotalCumulativeUsage: utilisationData.GetCumulativeUsage().ToProtoMap(),
350351
},
351352
},
352353
})

0 commit comments

Comments
 (0)