@@ -2,6 +2,7 @@ package pool
22
33import (
44 "context"
5+ "sync"
56 "time"
67
78 "golang.org/x/sync/errgroup"
6263 stats * safeStats
6364
6465 spawnCancel context.CancelFunc
66+
67+ wg * sync.WaitGroup
6568 }
6669 option [PT Item [T ], T any ] func (p * Pool [PT , T ])
6770)
@@ -205,7 +208,9 @@ func New[PT Item[T], T any](
205208 }
206209
207210 var spawnCtx context.Context
211+ p .wg = & sync.WaitGroup {}
208212 spawnCtx , p .spawnCancel = xcontext .WithCancel (xcontext .ValueOnly (ctx ))
213+ p .wg .Add (1 )
209214 go p .spawnItems (spawnCtx )
210215
211216 return p
@@ -215,13 +220,17 @@ func New[PT Item[T], T any](
215220// It ensures that pool would always have amount of connections equal to configured limit.
216221// If item creation ended with error it will be retried infinity with configured interval until success.
217222func (p * Pool [PT , T ]) spawnItems (ctx context.Context ) {
223+ defer p .wg .Done ()
218224 for {
219225 select {
226+ case <- ctx .Done ():
227+ return
220228 case <- p .done :
221229 return
222230 case <- p .itemTokens :
223231 // got token, must create item
224232 for {
233+ p .wg .Add (1 )
225234 err := p .trySpawn (ctx )
226235 if err == nil {
227236 break
@@ -234,12 +243,15 @@ func (p *Pool[PT, T]) spawnItems(ctx context.Context) {
234243}
235244
236245func (p * Pool [PT , T ]) trySpawn (ctx context.Context ) error {
246+ defer p .wg .Done ()
237247 item , err := p .createItem (ctx )
238248 if err != nil {
239249 return err
240250 }
241251 // item was created successfully, put it in queue
242252 select {
253+ case <- ctx .Done ():
254+ return ctx .Err ()
243255 case <- p .done :
244256 return nil
245257 case p .queue <- item :
@@ -523,6 +535,9 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
523535 // Due to multiple senders queue is not closed here,
524536 // we're just making sure to drain it fully to close any existing item.
525537 close (p .done )
538+
539+ p .wg .Wait ()
540+
526541 var g errgroup.Group
527542shutdownLoop:
528543 for {
0 commit comments