Skip to content

Commit 10c1fc1

Browse files
authored
Prevent parallel allocation to ensure spreading and reservations. (#490)
1 parent d295a05 commit 10c1fc1

File tree

10 files changed

+523
-226
lines changed

10 files changed

+523
-226
lines changed

cmd/metal-api/internal/datastore/machine.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package datastore
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"math"
78
"math/rand/v2"
9+
"time"
810

911
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
1012
"golang.org/x/exp/slices"
@@ -427,7 +429,7 @@ func (rs *RethinkStore) UpdateMachine(oldMachine *metal.Machine, newMachine *met
427429
// FindWaitingMachine returns an available, not allocated, waiting and alive machine of given size within the given partition.
428430
// TODO: the algorithm can be optimized / shortened by using a rethinkdb join command and then using .Sample(1)
429431
// but current implementation should have a slightly better readability.
430-
func (rs *RethinkStore) FindWaitingMachine(projectid, partitionid string, size metal.Size, placementTags []string) (*metal.Machine, error) {
432+
func (rs *RethinkStore) FindWaitingMachine(ctx context.Context, projectid, partitionid string, size metal.Size, placementTags []string) (*metal.Machine, error) {
431433
q := *rs.machineTable()
432434
q = q.Filter(map[string]interface{}{
433435
"allocation": nil,
@@ -440,6 +442,11 @@ func (rs *RethinkStore) FindWaitingMachine(projectid, partitionid string, size m
440442
"preallocated": false,
441443
})
442444

445+
if err := rs.sharedMutex.lock(ctx, partitionid, 10*time.Second); err != nil {
446+
return nil, fmt.Errorf("too many parallel machine allocations taking place, try again later")
447+
}
448+
defer rs.sharedMutex.unlock(ctx, partitionid)
449+
443450
var candidates metal.Machines
444451
err := rs.searchEntities(&q, &candidates)
445452
if err != nil {
@@ -635,7 +642,9 @@ func randomIndex(max int) int {
635642
if max <= 0 {
636643
return 0
637644
}
638-
return rand.N(max)
645+
// golangci-lint has an issue with math/rand/v2
646+
// here it provides sufficient randomness though because it's not used for cryptographic purposes
647+
return rand.N(max) //nolint:gosec
639648
}
640649

641650
func intersect[T comparable](a, b []T) []T {

cmd/metal-api/internal/datastore/machine_integration_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,19 @@
44
package datastore
55

66
import (
7+
"context"
8+
"log/slog"
9+
"strconv"
10+
"strings"
11+
"sync"
712
"testing"
13+
"time"
814

915
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
1016
"github.com/metal-stack/metal-lib/pkg/pointer"
17+
"github.com/stretchr/testify/assert"
1118
"github.com/stretchr/testify/require"
19+
r "gopkg.in/rethinkdb/rethinkdb-go.v6"
1220
)
1321

1422
type machineTestable struct{}
@@ -950,3 +958,110 @@ func TestRethinkStore_UpdateMachine(t *testing.T) {
950958
tests[i].run(t, tt)
951959
}
952960
}
961+
962+
func Test_FindWaitingMachine_NoConcurrentModificationErrors(t *testing.T) {
963+
964+
var (
965+
root = slog.Default()
966+
wg sync.WaitGroup
967+
size = metal.Size{Base: metal.Base{ID: "1"}}
968+
count int
969+
)
970+
971+
for _, initEntity := range []struct {
972+
entity metal.Entity
973+
table *r.Term
974+
}{
975+
{
976+
table: sharedDS.machineTable(),
977+
entity: &metal.Machine{
978+
Base: metal.Base{
979+
ID: "1",
980+
},
981+
PartitionID: "partition",
982+
SizeID: size.ID,
983+
State: metal.MachineState{
984+
Value: metal.AvailableState,
985+
},
986+
Waiting: true,
987+
PreAllocated: false,
988+
},
989+
},
990+
{
991+
table: sharedDS.eventTable(),
992+
entity: &metal.ProvisioningEventContainer{
993+
Base: metal.Base{
994+
ID: "1",
995+
},
996+
Liveliness: metal.MachineLivelinessAlive,
997+
},
998+
},
999+
} {
1000+
initEntity := initEntity
1001+
1002+
err := sharedDS.createEntity(initEntity.table, initEntity.entity)
1003+
require.NoError(t, err)
1004+
1005+
defer func() {
1006+
_, err := initEntity.table.Delete().RunWrite(sharedDS.session)
1007+
require.NoError(t, err)
1008+
}()
1009+
}
1010+
1011+
for i := 0; i < 100; i++ {
1012+
i := i
1013+
wg.Add(1)
1014+
1015+
log := root.With("worker", i)
1016+
1017+
go func() {
1018+
defer wg.Done()
1019+
1020+
for {
1021+
machine, err := sharedDS.FindWaitingMachine(context.Background(), "project", "partition", size, nil)
1022+
if err != nil {
1023+
if metal.IsConflict(err) {
1024+
t.Errorf("concurrent modification occurred, shared mutex is not working")
1025+
break
1026+
}
1027+
1028+
if strings.Contains(err.Error(), "no machine available") {
1029+
continue
1030+
}
1031+
1032+
if strings.Contains(err.Error(), "too many parallel") {
1033+
time.Sleep(10 * time.Millisecond)
1034+
continue
1035+
}
1036+
1037+
t.Errorf("unexpected error occurred: %s", err)
1038+
continue
1039+
}
1040+
1041+
log.Info("waiting machine found")
1042+
1043+
newMachine := *machine
1044+
newMachine.PreAllocated = false
1045+
if newMachine.Name == "" {
1046+
newMachine.Name = strconv.Itoa(0)
1047+
}
1048+
1049+
assert.Equal(t, strconv.Itoa(count), newMachine.Name, "concurrency occurred")
1050+
count++
1051+
newMachine.Name = strconv.Itoa(count)
1052+
1053+
err = sharedDS.updateEntity(sharedDS.machineTable(), &newMachine, machine)
1054+
if err != nil {
1055+
log.Error("unable to toggle back pre-allocation flag", "error", err)
1056+
t.Fail()
1057+
}
1058+
1059+
return
1060+
}
1061+
}()
1062+
}
1063+
1064+
wg.Wait()
1065+
1066+
assert.Equal(t, 100, count)
1067+
}

cmd/metal-api/internal/datastore/rethinkdb.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package datastore
22

33
import (
4+
"context"
45
"fmt"
56
"log/slog"
67
"reflect"
@@ -18,9 +19,21 @@ const (
1819
)
1920

2021
var tables = []string{
21-
"image", "size", "partition", "machine", "switch", "switchstatus", "event", "network", "ip", "migration", "filesystemlayout", "sizeimageconstraint",
22-
VRFIntegerPool.String(), VRFIntegerPool.String() + "info",
2322
ASNIntegerPool.String(), ASNIntegerPool.String() + "info",
23+
"event",
24+
"filesystemlayout",
25+
"image",
26+
"ip",
27+
"machine",
28+
"migration",
29+
"network",
30+
"partition",
31+
"sharedmutex",
32+
"size",
33+
"sizeimageconstraint",
34+
"switch",
35+
"switchstatus",
36+
VRFIntegerPool.String(), VRFIntegerPool.String() + "info",
2437
}
2538

2639
// A RethinkStore is the database access layer for rethinkdb.
@@ -40,6 +53,11 @@ type RethinkStore struct {
4053
VRFPoolRangeMax uint
4154
ASNPoolRangeMin uint
4255
ASNPoolRangeMax uint
56+
57+
sharedMutexCtx context.Context
58+
sharedMutexCancel context.CancelFunc
59+
sharedMutex *sharedMutex
60+
sharedMutexCheckInterval time.Duration
4361
}
4462

4563
// New creates a new rethink store.
@@ -55,6 +73,8 @@ func New(log *slog.Logger, dbhost string, dbname string, dbuser string, dbpass s
5573
VRFPoolRangeMax: DefaultVRFPoolRangeMax,
5674
ASNPoolRangeMin: DefaultASNPoolRangeMin,
5775
ASNPoolRangeMax: DefaultASNPoolRangeMax,
76+
77+
sharedMutexCheckInterval: defaultSharedMutexCheckInterval,
5878
}
5979
}
6080

@@ -241,7 +261,13 @@ func (rs *RethinkStore) Close() error {
241261
return err
242262
}
243263
}
264+
265+
if rs.sharedMutexCancel != nil {
266+
rs.sharedMutexCancel()
267+
}
268+
244269
rs.log.Info("Rethinkstore disconnected")
270+
245271
return nil
246272
}
247273

@@ -251,6 +277,13 @@ func (rs *RethinkStore) Connect() error {
251277
rs.dbsession = retryConnect(rs.log, []string{rs.dbhost}, rs.dbname, rs.dbuser, rs.dbpass)
252278
rs.log.Info("Rethinkstore connected")
253279
rs.session = rs.dbsession
280+
rs.sharedMutexCtx, rs.sharedMutexCancel = context.WithCancel(context.Background())
281+
var err error
282+
rs.sharedMutex, err = newSharedMutex(rs.sharedMutexCtx, rs.log, rs.dbsession, newMutexOptCheckInterval(rs.sharedMutexCheckInterval))
283+
if err != nil {
284+
return err
285+
}
286+
254287
return nil
255288
}
256289

@@ -262,8 +295,14 @@ func (rs *RethinkStore) Demote() error {
262295
if err != nil {
263296
return err
264297
}
298+
265299
rs.dbsession = retryConnect(rs.log, []string{rs.dbhost}, rs.dbname, DemotedUser, rs.dbpass)
266300
rs.session = rs.dbsession
301+
rs.sharedMutexCtx, rs.sharedMutexCancel = context.WithCancel(context.Background())
302+
rs.sharedMutex, err = newSharedMutex(rs.sharedMutexCtx, rs.log, rs.dbsession, newMutexOptCheckInterval(rs.sharedMutexCheckInterval))
303+
if err != nil {
304+
return err
305+
}
267306

268307
rs.log.Info("rethinkstore connected with demoted user")
269308
return nil

cmd/metal-api/internal/datastore/rethinkdb_integration_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"log/slog"
99
"os"
1010
"sort"
11+
"time"
1112

1213
"github.com/google/go-cmp/cmp"
1314
"github.com/google/go-cmp/cmp/cmpopts"
@@ -45,12 +46,15 @@ func startRethinkInitialized() (container testcontainers.Container, ds *RethinkS
4546
panic(err)
4647
}
4748

48-
rs := New(slog.Default(), c.IP+":"+c.Port, c.DB, c.User, c.Password)
49+
rs := New(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})), c.IP+":"+c.Port, c.DB, c.User, c.Password)
50+
4951
rs.VRFPoolRangeMin = 10000
5052
rs.VRFPoolRangeMax = 10010
5153
rs.ASNPoolRangeMin = 10000
5254
rs.ASNPoolRangeMax = 10010
5355

56+
rs.sharedMutexCheckInterval = 3 * time.Second
57+
5458
err = rs.Connect()
5559
if err != nil {
5660
panic(err)

0 commit comments

Comments
 (0)