Skip to content

Commit 8e877d3

Browse files
committed
PR Review
1 parent e10d4f1 commit 8e877d3

File tree

3 files changed

+66
-34
lines changed

3 files changed

+66
-34
lines changed

claimutils/claim/claimer.go

Lines changed: 58 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"context"
88
"errors"
99
"fmt"
10-
"sync/atomic"
10+
"sync"
1111

1212
"github.com/go-logr/logr"
1313
"github.com/ironcore-dev/ironcore/api/core/v1alpha1"
@@ -26,15 +26,19 @@ type Claimer interface {
2626
Claim(ctx context.Context, resources v1alpha1.ResourceList) (Claims, error)
2727
Release(ctx context.Context, claims Claims) error
2828
Start(ctx context.Context) error
29+
WaitUntilStarted(ctx context.Context) error
2930
}
3031

31-
func NewResourceClaimer(log logr.Logger, started chan struct{}, plugins ...Plugin) (*claimer, error) {
32+
func NewResourceClaimer(log logr.Logger, plugins ...Plugin) (*claimer, error) {
3233
c := claimer{
33-
log: log,
34-
started: started,
35-
plugins: map[string]Plugin{},
34+
log: log,
35+
plugins: map[string]Plugin{},
36+
3637
toClaim: make(chan claimReq, 1),
3738
toRelease: make(chan releaseReq, 1),
39+
40+
started: make(chan struct{}),
41+
shutdown: make(chan struct{}),
3842
}
3943

4044
for _, plugin := range plugins {
@@ -74,8 +78,9 @@ type claimer struct {
7478
toClaim chan claimReq
7579
toRelease chan releaseReq
7680

77-
running atomic.Bool
78-
started chan struct{}
81+
startOnce sync.Once
82+
started chan struct{}
83+
shutdown chan struct{}
7984
}
8085

8186
func (c *claimer) checkPluginsForResources(resources v1alpha1.ResourceList) error {
@@ -107,13 +112,14 @@ func (c *claimer) checkPluginsForClaims(claims Claims) error {
107112
}
108113

109114
func (c *claimer) Start(ctx context.Context) error {
110-
if !c.running.CompareAndSwap(false, true) {
111-
return ErrAlreadyStarted
112-
}
115+
var called bool
116+
c.startOnce.Do(func() {
117+
called = true
118+
go c.run(ctx)
119+
})
113120

114-
go c.run(ctx)
115-
if c.started != nil {
116-
close(c.started)
121+
if !called {
122+
return ErrAlreadyStarted
117123
}
118124

119125
<-ctx.Done()
@@ -123,21 +129,23 @@ func (c *claimer) Start(ctx context.Context) error {
123129

124130
func (c *claimer) run(ctx context.Context) {
125131
defer func() {
126-
c.running.Store(false)
127-
close(c.toClaim)
128-
close(c.toRelease)
129-
130132
for req := range c.toClaim {
131133
req.resultChan <- claimRes{err: ctx.Err()}
132134
}
133135
for req := range c.toRelease {
134136
req.resultChan <- ctx.Err()
135137
}
138+
139+
close(c.toClaim)
140+
close(c.toRelease)
136141
}()
137142

143+
close(c.started)
144+
138145
for {
139146
select {
140147
case <-ctx.Done():
148+
close(c.shutdown)
141149
return
142150
case req := <-c.toClaim:
143151
res := claimRes{}
@@ -154,6 +162,22 @@ func (c *claimer) run(ctx context.Context) {
154162
}
155163
}
156164

165+
func (c *claimer) ensureRunning() error {
166+
select {
167+
case <-c.started:
168+
default:
169+
return ErrNotStarted
170+
}
171+
172+
select {
173+
case <-c.shutdown:
174+
return ErrNotStarted
175+
default:
176+
}
177+
178+
return nil
179+
}
180+
157181
func (c *claimer) claim(resources v1alpha1.ResourceList) (Claims, error) {
158182
var insufficientResourceErrors []error
159183
for resourceName := range resources {
@@ -192,8 +216,8 @@ func (c *claimer) Claim(ctx context.Context, resources v1alpha1.ResourceList) (C
192216
return nil, errors.Join(ErrMissingPlugins, err)
193217
}
194218

195-
if !c.running.Load() {
196-
return nil, ErrNotStarted
219+
if err := c.ensureRunning(); err != nil {
220+
return nil, err
197221
}
198222

199223
req := claimReq{
@@ -202,6 +226,8 @@ func (c *claimer) Claim(ctx context.Context, resources v1alpha1.ResourceList) (C
202226
}
203227
select {
204228
case c.toClaim <- req:
229+
case <-c.shutdown:
230+
return nil, ErrNotStarted
205231
case <-ctx.Done():
206232
return nil, ctx.Err()
207233
}
@@ -235,16 +261,17 @@ func (c *claimer) Release(ctx context.Context, claims Claims) error {
235261
return errors.Join(ErrMissingPlugins, err)
236262
}
237263

238-
if !c.running.Load() {
239-
return ErrNotStarted
264+
if err := c.ensureRunning(); err != nil {
265+
return err
240266
}
241-
242267
req := releaseReq{
243268
claims: claims,
244269
resultChan: make(chan error, 1),
245270
}
246271
select {
247272
case c.toRelease <- req:
273+
case <-c.shutdown:
274+
return ErrNotStarted
248275
case <-ctx.Done():
249276
return ctx.Err()
250277
}
@@ -256,3 +283,12 @@ func (c *claimer) Release(ctx context.Context, claims Claims) error {
256283
return res
257284
}
258285
}
286+
287+
func (c *claimer) WaitUntilStarted(ctx context.Context) error {
288+
select {
289+
case <-c.started:
290+
return nil
291+
case <-ctx.Done():
292+
return ctx.Err()
293+
}
294+
}

claimutils/claim/claimer_test.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,9 @@ func (m *mockReader) Read() ([]pci.Address, error) {
2727

2828
var _ = Describe("Resource Claimer", func() {
2929
It("should claim composite resources", func(ctx SpecContext) {
30-
31-
claimerStarted := make(chan struct{})
32-
3330
By("init plugin")
3431
resourceClaimer, err := claim.NewResourceClaimer(
3532
log.FromContext(ctx),
36-
claimerStarted,
3733
gpu.NewGPUClaimPlugin(log.FromContext(ctx), "nvidia.com/gpu", &mockReader{
3834
devices: []pci.Address{
3935
{},
@@ -59,9 +55,9 @@ var _ = Describe("Resource Claimer", func() {
5955
})
6056

6157
By("waiting until claimer is started")
62-
Eventually(claimerStarted).To(BeClosed())
58+
Expect(resourceClaimer.WaitUntilStarted(ctx)).To(Succeed())
6359

64-
By("failing if not not existing resource is claimed")
60+
By("failing if nonexistent resource is claimed")
6561
resourceClaim, err := resourceClaimer.Claim(ctx, v1alpha1.ResourceList{
6662
"not_existing_plugin": resource.MustParse("1"),
6763
})

claimutils/gpu/gpu_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ var _ = Describe("GPU Claimer", func() {
4747
Expect(plugin.Init()).Should(MatchError(testErr))
4848
})
4949

50-
It("should error if no resource left (not init)", func(ctx SpecContext) {
50+
It("should error if no resource left after init", func(ctx SpecContext) {
5151
By("init plugin")
5252
plugin := gpu.NewGPUClaimPlugin(log.FromContext(ctx), "test-plugin", &MockReader{}, nil)
5353
Expect(plugin.Init()).ShouldNot(HaveOccurred())
@@ -130,20 +130,20 @@ var _ = Describe("GPU Claimer", func() {
130130
gpuClaim1, err := plugin.Claim(resource.MustParse("1"))
131131
Expect(err).ToNot(HaveOccurred())
132132

133-
ociAddress1, ok := gpuClaim1.(gpu.Claim)
133+
pciAddress1, ok := gpuClaim1.(gpu.Claim)
134134
Expect(ok).To(BeTrue())
135-
Expect(ociAddress1.PCIAddresses()).To(HaveLen(1))
135+
Expect(pciAddress1.PCIAddresses()).To(HaveLen(1))
136136

137137
By("claim resources again")
138138
gpuClaim2, err := plugin.Claim(resource.MustParse("1"))
139139
Expect(err).ToNot(HaveOccurred())
140140

141-
ociAddress2, ok := gpuClaim2.(gpu.Claim)
141+
pciAddress2, ok := gpuClaim2.(gpu.Claim)
142142
Expect(ok).To(BeTrue())
143-
Expect(ociAddress2.PCIAddresses()).To(HaveLen(1))
143+
Expect(pciAddress2.PCIAddresses()).To(HaveLen(1))
144144

145145
By("ensure claims are not equal")
146-
Expect(ociAddress1.PCIAddresses()[0]).NotTo(Equal(ociAddress2.PCIAddresses()[0]))
146+
Expect(pciAddress1.PCIAddresses()[0]).NotTo(Equal(pciAddress2.PCIAddresses()[0]))
147147
})
148148

149149
It("should handle zero-quantity claims", func(ctx SpecContext) {

0 commit comments

Comments
 (0)