Skip to content

Commit 4067609

Browse files
committed
Improve channel handling
1 parent 23f2eaf commit 4067609

File tree

2 files changed

+37
-15
lines changed

2 files changed

+37
-15
lines changed

claimutils/claim/claimer.go

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"context"
88
"errors"
99
"fmt"
10-
"sync"
10+
"sync/atomic"
1111

1212
"github.com/go-logr/logr"
1313
"github.com/ironcore-dev/ironcore/api/core/v1alpha1"
@@ -16,6 +16,8 @@ import (
1616
var (
1717
ErrMissingPlugins = errors.New("no plugin for resource")
1818
ErrReleaseClaim = errors.New("failed to release claim")
19+
ErrAlreadyStarted = errors.New("claimer already started")
20+
ErrNotStarted = errors.New("claimer not running")
1921
)
2022

2123
type Claims map[v1alpha1.ResourceName]ResourceClaim
@@ -69,6 +71,8 @@ type claimer struct {
6971

7072
toClaim chan claimReq
7173
toRelease chan releaseReq
74+
75+
running atomic.Bool
7276
}
7377

7478
func (c *claimer) checkPluginsForResources(resources v1alpha1.ResourceList) error {
@@ -100,16 +104,14 @@ func (c *claimer) checkPluginsForClaims(claims Claims) error {
100104
}
101105

102106
func (c *claimer) Start(ctx context.Context) error {
103-
var wg sync.WaitGroup
104-
105-
wg.Add(1)
106-
go func() {
107-
defer wg.Done()
108-
c.run(ctx)
109-
}()
107+
if !c.running.CompareAndSwap(false, true) {
108+
return ErrAlreadyStarted
109+
}
110110

111-
wg.Wait()
111+
go c.run(ctx)
112112

113+
<-ctx.Done()
114+
c.running.Store(false)
113115
return nil
114116
}
115117

@@ -171,11 +173,19 @@ func (c *claimer) Claim(ctx context.Context, resources v1alpha1.ResourceList) (C
171173
return nil, errors.Join(ErrMissingPlugins, err)
172174
}
173175

176+
if !c.running.Load() {
177+
return nil, ErrNotStarted
178+
}
179+
174180
req := claimReq{
175181
resources: resources,
176182
resultChan: make(chan claimRes, 1),
177183
}
178-
c.toClaim <- req
184+
select {
185+
case c.toClaim <- req:
186+
case <-ctx.Done():
187+
return nil, ctx.Err()
188+
}
179189

180190
select {
181191
case <-ctx.Done():
@@ -206,11 +216,19 @@ func (c *claimer) Release(ctx context.Context, claims Claims) error {
206216
return errors.Join(ErrMissingPlugins, err)
207217
}
208218

219+
if !c.running.Load() {
220+
return ErrNotStarted
221+
}
222+
209223
req := releaseReq{
210224
claims: claims,
211225
resultChan: make(chan error, 1),
212226
}
213-
c.toRelease <- req
227+
select {
228+
case c.toRelease <- req:
229+
case <-ctx.Done():
230+
return ctx.Err()
231+
}
214232

215233
select {
216234
case <-ctx.Done():

claimutils/claim/claimer_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,14 @@ var _ = Describe("Resource Claimer", func() {
4949
}()
5050

5151
By("failing if not not existing resource is claimed")
52-
resourceClaim, err := resourceClaimer.Claim(ctx, v1alpha1.ResourceList{
53-
"not_existing_plugin": resource.MustParse("1"),
54-
})
55-
Expect(err).To(MatchError(claim.ErrMissingPlugins))
52+
var resourceClaim claim.Claims
53+
Eventually(func() error {
54+
var err error
55+
resourceClaim, err = resourceClaimer.Claim(ctx, v1alpha1.ResourceList{
56+
"not_existing_plugin": resource.MustParse("1"),
57+
})
58+
return err
59+
}).Should(MatchError(claim.ErrMissingPlugins))
5660
Expect(resourceClaim).To(BeNil())
5761

5862
By("claiming correct resource")

0 commit comments

Comments
 (0)