Skip to content

Commit 869e69a

Browse files
committed
fallback if not found items by node id
1 parent cb4ea78 commit 869e69a

File tree

1 file changed

+17
-13
lines changed

1 file changed

+17
-13
lines changed

internal/pool/pool.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
444444
}
445445
}
446446

447-
// getWaitCh returns pointer to a channel of sessions.
447+
// getWaitCh returns pointer to a channel of items.
448448
//
449449
// Note that returning a pointer reduces allocations on sync.Pool usage –
450450
// sync.Client.Get() returns empty interface, which leads to allocation for
@@ -494,9 +494,7 @@ func (p *Pool[PT, T]) peekFirstIdleByNodeID(nodeID uint32) (item PT, touched tim
494494
return item, info.lastUsage
495495
}
496496

497-
// removes first session from idle and resets the keepAliveCount
498-
// to prevent session from dying in the internalPoolGC after it was returned
499-
// to be used only in outgoing functions that make session busy.
497+
// removes first item from idle to use only in outgoing functions that make item busy.
500498
// p.mu must be held.
501499
func (p *Pool[PT, T]) removeFirstIdle() PT {
502500
idle, _ := p.peekFirstIdle()
@@ -508,9 +506,7 @@ func (p *Pool[PT, T]) removeFirstIdle() PT {
508506
return idle
509507
}
510508

511-
// removes first session from idle and resets the keepAliveCount
512-
// to prevent session from dying in the internalPoolGC after it was returned
513-
// to be used only in outgoing functions that make session busy.
509+
// removes first item with preferred nodeID from idle to use only in outgoing functions that make item busy.
514510
// p.mu must be held.
515511
func (p *Pool[PT, T]) removeIdleByNodeID(nodeID uint32) PT {
516512
idle, _ := p.peekFirstIdleByNodeID(nodeID)
@@ -525,7 +521,7 @@ func (p *Pool[PT, T]) removeIdleByNodeID(nodeID uint32) PT {
525521
// p.mu must be held.
526522
func (p *Pool[PT, T]) notifyAboutIdle(idle PT) (notified bool) {
527523
for el := p.waitQ.Front(); el != nil; el = p.waitQ.Front() {
528-
// Some goroutine is waiting for a session.
524+
// Some goroutine is waiting for a item.
529525
//
530526
// It could be in this states:
531527
// 1) Reached the select code and awaiting for a value in channel.
@@ -566,7 +562,7 @@ func (p *Pool[PT, T]) notifyAboutIdle(idle PT) (notified bool) {
566562
func (p *Pool[PT, T]) removeIdle(item PT) itemInfo[PT, T] {
567563
info, has := p.index[item]
568564
if !has || info.idle == nil {
569-
panic("inconsistent session client index")
565+
panic("inconsistent item client index")
570566
}
571567

572568
p.changeState(func() Stats {
@@ -630,7 +626,15 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
630626

631627
if item := xsync.WithLock(&p.mu, func() PT { //nolint:nestif
632628
if hasPreferredNodeID {
633-
return p.removeIdleByNodeID(preferredNodeID)
629+
item := p.removeIdleByNodeID(preferredNodeID)
630+
if item != nil {
631+
return item
632+
}
633+
634+
if len(p.index)+p.createInProgress < p.config.limit {
635+
// for create item with preferred nodeID
636+
return nil
637+
}
634638
}
635639

636640
return p.removeFirstIdle()
@@ -746,15 +750,15 @@ func (p *Pool[PT, T]) waitFromCh(ctx context.Context) (item PT, finalErr error)
746750

747751
case item, ok := <-*ch:
748752
// Note that race may occur and some goroutine may try to write
749-
// session into channel after it was enqueued but before it being
753+
// item into channel after it was enqueued but before it being
750754
// read here. In that case we will receive nil here and will retry.
751755
//
752-
// The same way will work when some session become deleted - the
756+
// The same way will work when some item become deleted - the
753757
// nil value will be sent into the channel.
754758
if ok {
755759
// Put only filled and not closed channel back to the Client.
756760
// That is, we need to avoid races on filling reused channel
757-
// for the next waiter – session could be lost for a long time.
761+
// for the next waiter – item could be lost for a long time.
758762
p.putWaitCh(ch)
759763
}
760764

0 commit comments

Comments
 (0)