Skip to content

Commit bf8a956

Browse files
committed
use separate context for spawner
1 parent 93a62f8 commit bf8a956

File tree

2 files changed

+34
-29
lines changed

2 files changed

+34
-29
lines changed

internal/pool/pool.go

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,9 @@ func New[PT Item[T], T any](
202202
onChange: p.trace.OnChange,
203203
}
204204

205+
createCtx := xcontext.ValueOnly(ctx)
205206
for i := 0; i < defaultSpawnGoroutinesNumber; i++ {
206-
go p.spawnItems(ctx)
207+
go p.spawnItems(createCtx)
207208
}
208209

209210
return p
@@ -213,45 +214,47 @@ func New[PT Item[T], T any](
213214
// It ensures that pool would always have amount of connections equal to configured limit.
214215
// If item creation ended with error it will be retried infinity with configured interval until success.
215216
func (p *Pool[PT, T]) spawnItems(ctx context.Context) {
216-
spawnLoop:
217217
for {
218218
select {
219-
case <-ctx.Done():
220-
break spawnLoop
221219
case <-p.done:
222-
break spawnLoop
220+
return
223221
case <-p.itemTokens:
224222
// got token, must create item
225223
for {
226-
item, err := p.createItem(ctx)
227-
if err != nil {
228-
select {
229-
case <-ctx.Done():
230-
break spawnLoop
231-
case <-p.done:
232-
break spawnLoop
233-
case <-time.After(defaultCreateRetryDelay):
234-
// try again.
235-
// token must always result in new item and not be lost.
236-
}
237-
} else {
238-
// item is created successfully, put it in queue
239-
select {
240-
case <-ctx.Done():
241-
break spawnLoop
242-
case <-p.done:
243-
break spawnLoop
244-
case p.queue <- item:
245-
p.stats.Idle().Inc()
246-
}
247-
248-
continue spawnLoop
224+
err := p.trySpawn(ctx)
225+
if err == nil {
226+
break
227+
}
228+
// spawn was unsuccessful, need to try again.
229+
// token must always result in new item and not be lost.
230+
timer := time.NewTimer(defaultCreateRetryDelay)
231+
select {
232+
case <-p.done:
233+
timer.Stop()
234+
return
235+
case <-timer.C:
249236
}
237+
timer.Stop()
250238
}
251239
}
252240
}
253241
}
254242

243+
func (p *Pool[PT, T]) trySpawn(ctx context.Context) error {
244+
item, err := p.createItem(ctx)
245+
if err != nil {
246+
return err
247+
}
248+
// item was created successfully, put it in queue
249+
select {
250+
case <-p.done:
251+
return nil
252+
case p.queue <- item:
253+
p.stats.Idle().Inc()
254+
}
255+
return nil
256+
}
257+
255258
// defaultCreateItem returns a new item
256259
func defaultCreateItem[T any, PT Item[T]](ctx context.Context) (PT, error) {
257260
var item T

internal/pool/pool_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ func (t *testItemV2) Close(context.Context) error {
6767
}
6868

6969
func (t *testItemV2) failAfter(d time.Duration) {
70-
<-time.After(d)
70+
timer := time.NewTimer(d)
71+
<-timer.C
7172
atomic.CompareAndSwapInt32(&t.dead, 0, 1)
73+
timer.Stop()
7274
}
7375

7476
func TestPool(t *testing.T) {

0 commit comments

Comments
 (0)