Skip to content

Commit 3566ba5

Browse files
authored
Spread only on same machine size. (#625)
1 parent d9f67a3 commit 3566ba5

File tree

4 files changed

+144
-15
lines changed

4 files changed

+144
-15
lines changed

cmd/metal-api/internal/datastore/machine.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ func (rs *RethinkStore) UpdateMachine(oldMachine *metal.Machine, newMachine *met
422422
// FindWaitingMachine returns an available, not allocated, waiting and alive machine of given size within the given partition.
423423
// TODO: the algorithm can be optimized / shortened by using a rethinkdb join command and then using .Sample(1)
424424
// but the current implementation should be slightly more readable.
425-
func (rs *RethinkStore) FindWaitingMachine(ctx context.Context, projectid, partitionid string, size metal.Size, placementTags []string) (*metal.Machine, error) {
425+
func (rs *RethinkStore) FindWaitingMachine(ctx context.Context, projectid, partitionid string, size metal.Size, placementTags []string, role metal.Role) (*metal.Machine, error) {
426426
q := *rs.machineTable()
427427
q = q.Filter(map[string]interface{}{
428428
"allocation": nil,
@@ -473,6 +473,7 @@ func (rs *RethinkStore) FindWaitingMachine(ctx context.Context, projectid, parti
473473
var partitionMachines metal.Machines
474474
err = rs.SearchMachines(&MachineSearchQuery{
475475
PartitionID: &partitionid,
476+
SizeID: &size.ID,
476477
}, &partitionMachines)
477478
if err != nil {
478479
return nil, err
@@ -487,12 +488,12 @@ func (rs *RethinkStore) FindWaitingMachine(ctx context.Context, projectid, parti
487488
return nil, err
488489
}
489490

490-
ok := checkSizeReservations(available, projectid, partitionMachines.WithSize(size.ID).ByProjectID(), reservations)
491+
ok := checkSizeReservations(available, projectid, partitionMachines.ByProjectID(), reservations)
491492
if !ok {
492493
return nil, errors.New("no machine available")
493494
}
494495

495-
projectMachines := partitionMachines.ByProjectID()[projectid]
496+
projectMachines := partitionMachines.WithRole(role).ByProjectID()[projectid]
496497

497498
spreadCandidates := spreadAcrossRacks(available, projectMachines, placementTags)
498499
if len(spreadCandidates) == 0 {
@@ -523,8 +524,6 @@ func checkSizeReservations(available metal.Machines, projectid string, machinesB
523524
)
524525

525526
for _, r := range reservations {
526-
r := r
527-
528527
// sum up the amount of reservations
529528
amount += r.Amount
530529

cmd/metal-api/internal/datastore/machine_integration_test.go

Lines changed: 121 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package datastore
55

66
import (
77
"context"
8+
"fmt"
89
"log/slog"
910
"os"
1011
"strconv"
@@ -993,8 +994,6 @@ func Test_FindWaitingMachine_NoConcurrentModificationErrors(t *testing.T) {
993994
},
994995
},
995996
} {
996-
initEntity := initEntity
997-
998997
err := sharedDS.createEntity(initEntity.table, initEntity.entity)
999998
require.NoError(t, err)
1000999

@@ -1004,8 +1003,7 @@ func Test_FindWaitingMachine_NoConcurrentModificationErrors(t *testing.T) {
10041003
}()
10051004
}
10061005

1007-
for i := 0; i < 100; i++ {
1008-
i := i
1006+
for i := range 100 {
10091007
wg.Add(1)
10101008

10111009
log := root.With("worker", i)
@@ -1014,7 +1012,7 @@ func Test_FindWaitingMachine_NoConcurrentModificationErrors(t *testing.T) {
10141012
defer wg.Done()
10151013

10161014
for {
1017-
machine, err := sharedDS.FindWaitingMachine(context.Background(), "project", "partition", size, nil)
1015+
machine, err := sharedDS.FindWaitingMachine(context.Background(), "project", "partition", size, nil, metal.RoleMachine)
10181016
if err != nil {
10191017
if metal.IsConflict(err) {
10201018
t.Errorf("concurrent modification occurred, shared mutex is not working")
@@ -1061,3 +1059,121 @@ func Test_FindWaitingMachine_NoConcurrentModificationErrors(t *testing.T) {
10611059

10621060
assert.Equal(t, 100, count)
10631061
}
1062+
1063+
// Test_FindWaitingMachine_RackSpreadingDistribution verifies that 100 consecutive
// allocations of size1 machines for one project end up evenly distributed over
// five racks (20 per rack), even though the project already owns allocated
// machines of other sizes ("firewall", "machine") in the same partition.
func Test_FindWaitingMachine_RackSpreadingDistribution(t *testing.T) {
1064+
var (
1065+
partitionID = "partition"
1066+
projectID = "project"
1067+
size1 = metal.Size{Base: metal.Base{ID: "1"}}
1068+
// fiveRacks maps an index to one of "rack-1" .. "rack-5" in round-robin order
fiveRacks = func(i int) string {
1069+
return "rack-" + strconv.FormatInt(int64((i%5)+1), 10)
1070+
}
1071+
)
1072+
1073+
// wipe the machine and event tables of the shared datastore when the test finishes
defer func() {
1074+
_, err := sharedDS.machineTable().Delete().RunWrite(sharedDS.session)
1075+
require.NoError(t, err)
1076+
_, err = sharedDS.eventTable().Delete().RunWrite(sharedDS.session)
1077+
require.NoError(t, err)
1078+
}()
1079+
1080+
// seed 200 unallocated, waiting machines of size1 (40 per rack), each with an
// event container that reports the machine as alive
for i := range 200 {
1081+
err := sharedDS.createEntity(sharedDS.machineTable(), &metal.Machine{
1082+
Base: metal.Base{
1083+
ID: strconv.Itoa(i),
1084+
},
1085+
PartitionID: partitionID,
1086+
SizeID: size1.ID,
1087+
State: metal.MachineState{
1088+
Value: metal.AvailableState,
1089+
},
1090+
Waiting: true,
1091+
PreAllocated: false,
1092+
RackID: fiveRacks(i),
1093+
})
1094+
require.NoError(t, err)
1095+
1096+
err = sharedDS.createEntity(sharedDS.eventTable(), &metal.ProvisioningEventContainer{
1097+
Base: metal.Base{
1098+
ID: strconv.Itoa(i),
1099+
},
1100+
Liveliness: metal.MachineLivelinessAlive,
1101+
})
1102+
require.NoError(t, err)
1103+
}
1104+
1105+
// just allocate some machines with different specs that should not influence later allocs
1106+
for i, spec := range []struct {
1107+
role metal.Role
1108+
size string
1109+
}{
1110+
{role: metal.RoleFirewall, size: "firewall"},
1111+
{role: metal.RoleFirewall, size: "firewall"},
1112+
{role: metal.RoleMachine, size: "machine"},
1113+
{role: metal.RoleMachine, size: "machine"},
1114+
// just to prove that it affects the algorithm:
1115+
// {role: metal.RoleMachine, size: size1.ID},
1116+
} {
1117+
err := sharedDS.createEntity(sharedDS.machineTable(), &metal.Machine{
1118+
Base: metal.Base{
1119+
ID: "allocated-" + strconv.Itoa(i),
1120+
},
1121+
PartitionID: partitionID,
1122+
SizeID: spec.size,
1123+
State: metal.MachineState{
1124+
Value: metal.AvailableState,
1125+
},
1126+
Allocation: &metal.MachineAllocation{
1127+
Project: projectID,
1128+
Role: spec.role,
1129+
},
1130+
RackID: fiveRacks(i),
1131+
})
1132+
require.NoError(t, err)
1133+
1134+
err = sharedDS.createEntity(sharedDS.eventTable(), &metal.ProvisioningEventContainer{
1135+
Base: metal.Base{
1136+
ID: "allocated-" + strconv.Itoa(i),
1137+
},
1138+
Liveliness: metal.MachineLivelinessAlive,
1139+
})
1140+
require.NoError(t, err)
1141+
}
1142+
1143+
// request 100 machines one after another and persist each allocation before
// the next request is made
for range 100 {
1144+
machine, err := sharedDS.FindWaitingMachine(context.Background(), projectID, partitionID, size1, nil, metal.RoleMachine)
1145+
require.NoError(t, err)
1146+
1147+
newMachine := *machine
1148+
newMachine.PreAllocated = false
1149+
newMachine.Allocation = &metal.MachineAllocation{
1150+
Project: projectID,
1151+
}
1152+
newMachine.Allocation.Role = metal.RoleMachine
1153+
newMachine.SizeID = size1.ID
1154+
1155+
err = sharedDS.updateEntity(sharedDS.machineTable(), &newMachine, machine)
1156+
if err != nil {
1157+
t.Errorf("unable to update machine: %s", err)
1158+
}
1159+
1160+
t.Logf("machine %s allocated in %s", newMachine.ID, newMachine.RackID)
1161+
}
1162+
1163+
// only the size1 machines allocated above are expected to match this query;
// the pre-allocated machines use other sizes
var ms metal.Machines
1164+
err := sharedDS.SearchMachines(&MachineSearchQuery{AllocationProject: &projectID, SizeID: &size1.ID, PartitionID: &partitionID}, &ms)
1165+
require.NoError(t, err)
1166+
1167+
require.Len(t, ms, 100)
1168+
1169+
machinesByRack := map[string]int{}
1170+
for _, m := range ms {
1171+
machinesByRack[m.RackID]++
1172+
}
1173+
1174+
// every rack that received machines must hold exactly 100/5 = 20 of them
for id, count := range machinesByRack {
1175+
assert.Equal(t, 100/5, count, "uneven machine distribution in %s", id)
1176+
}
1177+
1178+
// NOTE(review): writes the per-rack counts to stdout; looks like debug output — consider t.Log
fmt.Println(machinesByRack)
1179+
}

cmd/metal-api/internal/metal/machine.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,6 @@ func (ms Machines) WithSize(id string) Machines {
334334
var res Machines
335335

336336
for _, m := range ms {
337-
m := m
338-
339337
if m.SizeID != id {
340338
continue
341339
}
@@ -350,8 +348,6 @@ func (ms Machines) WithPartition(id string) Machines {
350348
var res Machines
351349

352350
for _, m := range ms {
353-
m := m
354-
355351
if m.PartitionID != id {
356352
continue
357353
}
@@ -362,6 +358,24 @@ func (ms Machines) WithPartition(id string) Machines {
362358
return res
363359
}
364360

361+
// WithRole returns the machines whose allocation has the given role.
// Machines without an allocation are skipped. The result is nil when no
// machine matches.
func (ms Machines) WithRole(role Role) Machines {
362+
var res Machines
363+
364+
for _, m := range ms {
365+
// unallocated machines carry no role and can never match
if m.Allocation == nil {
366+
continue
367+
}
368+
369+
if m.Allocation.Role != role {
370+
continue
371+
}
372+
373+
res = append(res, m)
374+
}
375+
376+
return res
377+
}
378+
365379
// MachineNetwork stores the Network details of the machine
366380
type MachineNetwork struct {
367381
NetworkID string `rethinkdb:"networkid" json:"networkid"`

cmd/metal-api/internal/service/machine-service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1434,7 +1434,7 @@ func findWaitingMachine(ctx context.Context, ds *datastore.RethinkStore, allocat
14341434
return nil, fmt.Errorf("partition cannot be found: %w", err)
14351435
}
14361436

1437-
machine, err := ds.FindWaitingMachine(ctx, allocationSpec.ProjectID, partition.ID, *size, allocationSpec.PlacementTags)
1437+
machine, err := ds.FindWaitingMachine(ctx, allocationSpec.ProjectID, partition.ID, *size, allocationSpec.PlacementTags, allocationSpec.Role)
14381438
if err != nil {
14391439
return nil, err
14401440
}

0 commit comments

Comments
 (0)