Skip to content

Commit 634b0d8

Browse files
committed
fixed pool behaviour on error when item must be deleted
1 parent d38e680 commit 634b0d8

File tree

4 files changed

+205
-129
lines changed

4 files changed

+205
-129
lines changed

internal/pool/pool.go

Lines changed: 143 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,16 @@ type (
3232
Item
3333
}
3434
Config[PT ItemConstraint[T], T any] struct {
35-
trace *Trace
36-
clock clockwork.Clock
37-
limit int
38-
createTimeout time.Duration
39-
createItem func(ctx context.Context) (PT, error)
40-
closeTimeout time.Duration
41-
closeItem func(ctx context.Context, item PT)
42-
idleTimeToLive time.Duration
43-
itemUsageLimit uint64
35+
trace *Trace
36+
clock clockwork.Clock
37+
limit int
38+
createTimeout time.Duration
39+
createItemFunc func(ctx context.Context) (PT, error)
40+
mustDeleteItemFunc func(item PT, err error) bool
41+
closeTimeout time.Duration
42+
closeItemFunc func(ctx context.Context, item PT)
43+
idleTimeToLive time.Duration
44+
itemUsageLimit uint64
4445
}
4546
itemInfo[PT ItemConstraint[T], T any] struct {
4647
idle *xlist.Element[PT]
@@ -54,8 +55,7 @@ type (
5455
Pool[PT ItemConstraint[T], T any] struct {
5556
config Config[PT, T]
5657

57-
createItem func(ctx context.Context) (PT, error)
58-
closeItem func(ctx context.Context, item PT)
58+
createItemFunc func(ctx context.Context) (PT, error)
5959

6060
mu xsync.RWMutex
6161
createInProgress int // KIKIMR-9163: in-create-process counter
@@ -71,15 +71,21 @@ type (
7171

7272
func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(ctx context.Context) (PT, error)) Option[PT, T] {
7373
return func(c *Config[PT, T]) {
74-
c.createItem = f
74+
c.createItemFunc = f
75+
}
76+
}
77+
78+
func WithMustDeleteItemFunc[PT ItemConstraint[T], T any](f func(item PT, err error) bool) Option[PT, T] {
79+
return func(c *Config[PT, T]) {
80+
c.mustDeleteItemFunc = f
7581
}
7682
}
7783

7884
func WithSyncCloseItem[PT ItemConstraint[T], T any]() Option[PT, T] {
7985
return func(c *Config[PT, T]) {
80-
c.closeItem = func(ctx context.Context, item PT) {
81-
_ = item.Close(ctx)
82-
}
86+
//c.closeItemFunc = func(ctx context.Context, item PT) {
87+
// _ = item.Close(ctx)
88+
//}
8389
}
8490
}
8591

@@ -131,12 +137,22 @@ func New[PT ItemConstraint[T], T any](
131137
) *Pool[PT, T] {
132138
p := &Pool[PT, T]{
133139
config: Config[PT, T]{
134-
trace: &Trace{},
135-
clock: clockwork.NewRealClock(),
136-
limit: DefaultLimit,
137-
createItem: defaultCreateItem[T, PT],
140+
trace: &Trace{},
141+
clock: clockwork.NewRealClock(),
142+
limit: DefaultLimit,
143+
createItemFunc: func(ctx context.Context) (PT, error) {
144+
var item T
145+
146+
return &item, nil
147+
},
148+
closeItemFunc: func(ctx context.Context, item PT) {
149+
_ = item.Close(ctx)
150+
},
138151
createTimeout: defaultCreateTimeout,
139152
closeTimeout: defaultCloseTimeout,
153+
mustDeleteItemFunc: func(item PT, err error) bool {
154+
return !xerrors.IsRetryObjectValid(err)
155+
},
140156
},
141157
index: make(map[PT]itemInfo[PT, T]),
142158
idle: xlist.New[PT](),
@@ -168,23 +184,11 @@ func New[PT ItemConstraint[T], T any](
168184
}
169185
}
170186

171-
p.createItem = makeAsyncCreateItemFunc(p)
172-
if p.config.closeItem != nil {
173-
p.closeItem = p.config.closeItem
174-
} else {
175-
p.closeItem = makeAsyncCloseItemFunc[PT, T](p)
176-
}
187+
p.createItemFunc = makeAsyncCreateItemFunc(p)
177188

178189
return p
179190
}
180191

181-
// defaultCreateItem returns a new item
182-
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error) {
183-
var item T
184-
185-
return &item, nil
186-
}
187-
188192
// makeAsyncCreateItemFunc wraps the createItem function with timeout handling
189193
func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
190194
p *Pool[PT, T],
@@ -228,7 +232,7 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
228232
defer cancelCreate()
229233
}
230234

231-
newItem, err := p.config.createItem(createCtx)
235+
newItem, err := p.config.createItemFunc(createCtx)
232236
if newItem != nil {
233237
p.mu.WithLock(func() {
234238
var useCounter uint64
@@ -279,6 +283,80 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
279283
}
280284
}
281285

286+
type (
287+
closeItemOptions struct {
288+
withLock bool
289+
withDeleteFromPool bool
290+
withNotifyStats bool
291+
wg *sync.WaitGroup
292+
}
293+
closeItemOption func(*closeItemOptions)
294+
)
295+
296+
func closeItemWithLock() closeItemOption {
297+
return func(o *closeItemOptions) {
298+
o.withLock = true
299+
}
300+
}
301+
302+
func closeItemWithDeleteFromPool() closeItemOption {
303+
return func(o *closeItemOptions) {
304+
o.withDeleteFromPool = true
305+
}
306+
}
307+
308+
func closeItemNotifyStats() closeItemOption {
309+
return func(o *closeItemOptions) {
310+
o.withNotifyStats = true
311+
}
312+
}
313+
314+
func closeItemWithWaitGroup(wg *sync.WaitGroup) closeItemOption {
315+
return func(o *closeItemOptions) {
316+
o.wg = wg
317+
}
318+
}
319+
320+
func (p *Pool[PT, T]) closeItem(ctx context.Context, item PT, opts ...closeItemOption) {
321+
options := closeItemOptions{}
322+
for _, opt := range opts {
323+
opt(&options)
324+
}
325+
if options.withLock {
326+
p.mu.Lock()
327+
defer p.mu.Unlock()
328+
}
329+
330+
if options.withDeleteFromPool {
331+
if options.withNotifyStats {
332+
p.changeState(func() Stats {
333+
delete(p.index, item)
334+
335+
return p.stats()
336+
})
337+
} else {
338+
delete(p.index, item)
339+
}
340+
}
341+
342+
if t := p.config.closeTimeout; t > 0 {
343+
var cancel context.CancelFunc
344+
ctx, cancel = context.WithTimeout(ctx, t)
345+
defer cancel()
346+
}
347+
348+
if options.wg != nil {
349+
options.wg.Add(1)
350+
go func() {
351+
defer options.wg.Done()
352+
353+
p.config.closeItemFunc(ctx, item)
354+
}()
355+
} else {
356+
p.config.closeItemFunc(ctx, item)
357+
}
358+
}
359+
282360
func (p *Pool[PT, T]) stats() Stats {
283361
return Stats{
284362
Limit: p.config.limit,
@@ -296,24 +374,6 @@ func (p *Pool[PT, T]) Stats() Stats {
296374
return p.stats()
297375
}
298376

299-
func makeAsyncCloseItemFunc[PT ItemConstraint[T], T any](
300-
p *Pool[PT, T],
301-
) func(ctx context.Context, item PT) {
302-
return func(ctx context.Context, item PT) {
303-
go func() {
304-
closeItemCtx, closeItemCancel := xcontext.WithDone(xcontext.ValueOnly(ctx), p.done)
305-
defer closeItemCancel()
306-
307-
if d := p.config.closeTimeout; d > 0 {
308-
closeItemCtx, closeItemCancel = xcontext.WithTimeout(closeItemCtx, d)
309-
defer closeItemCancel()
310-
}
311-
312-
_ = item.Close(closeItemCtx)
313-
}()
314-
}
315-
}
316-
317377
func (p *Pool[PT, T]) changeState(changeState func() Stats) {
318378
if stats, onChange := changeState(), p.config.trace.OnChange; onChange != nil {
319379
onChange(stats)
@@ -357,7 +417,15 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item
357417
}
358418

359419
defer func() {
360-
_ = p.putItem(ctx, item)
420+
if finalErr == nil || !p.config.mustDeleteItemFunc(item, finalErr) {
421+
_ = p.putItem(ctx, item)
422+
} else {
423+
p.closeItem(ctx, item,
424+
closeItemWithLock(),
425+
closeItemNotifyStats(),
426+
closeItemWithDeleteFromPool(),
427+
)
428+
}
361429
}()
362430

363431
err = f(ctx, item)
@@ -434,14 +502,13 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
434502
p.waitQ.Clear()
435503

436504
var wg sync.WaitGroup
437-
wg.Add(p.idle.Len())
438505

439506
for el := p.idle.Front(); el != nil; el = el.Next() {
440-
go func(item PT) {
441-
defer wg.Done()
442-
p.closeItem(ctx, item)
443-
}(el.Value)
444507
delete(p.index, el.Value)
508+
509+
p.closeItem(ctx, el.Value,
510+
closeItemWithWaitGroup(&wg),
511+
)
445512
}
446513

447514
wg.Wait()
@@ -664,14 +731,11 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
664731

665732
if (p.config.itemUsageLimit > 0 && *info.useCounter > p.config.itemUsageLimit) ||
666733
(p.config.idleTimeToLive > 0 && p.config.clock.Since(info.lastUsage) > p.config.idleTimeToLive) {
667-
p.closeItem(ctx, item)
668-
p.mu.WithLock(func() {
669-
p.changeState(func() Stats {
670-
delete(p.index, item)
671-
672-
return p.stats()
673-
})
674-
})
734+
p.closeItem(ctx, item,
735+
closeItemWithLock(),
736+
closeItemNotifyStats(),
737+
closeItemWithDeleteFromPool(),
738+
)
675739

676740
continue
677741
}
@@ -680,7 +744,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
680744
}
681745
}
682746

683-
item, err := p.createItem(ctx)
747+
item, err := p.createItemFunc(ctx)
684748
if item != nil {
685749
return item, nil
686750
}
@@ -814,38 +878,31 @@ func (p *Pool[PT, T]) putItem(ctx context.Context, item PT) (finalErr error) {
814878
}
815879
select {
816880
case <-p.done:
817-
p.closeItem(ctx, item)
818-
p.mu.WithLock(func() {
819-
p.changeState(func() Stats {
820-
delete(p.index, item)
821-
822-
return p.stats()
823-
})
824-
})
881+
p.closeItem(ctx, item,
882+
closeItemWithLock(),
883+
closeItemNotifyStats(),
884+
closeItemWithDeleteFromPool(),
885+
)
825886

826887
return xerrors.WithStackTrace(errClosedPool)
827888
default:
828889
p.mu.Lock()
829890
defer p.mu.Unlock()
830891

831892
if !item.IsAlive() {
832-
p.closeItem(ctx, item)
833-
p.changeState(func() Stats {
834-
delete(p.index, item)
835-
836-
return p.stats()
837-
})
893+
p.closeItem(ctx, item,
894+
closeItemNotifyStats(),
895+
closeItemWithDeleteFromPool(),
896+
)
838897

839898
return xerrors.WithStackTrace(errItemIsNotAlive)
840899
}
841900

842901
if p.idle.Len() >= p.config.limit {
843-
p.closeItem(ctx, item)
844-
p.changeState(func() Stats {
845-
delete(p.index, item)
846-
847-
return p.stats()
848-
})
902+
p.closeItem(ctx, item,
903+
closeItemNotifyStats(),
904+
closeItemWithDeleteFromPool(),
905+
)
849906

850907
return xerrors.WithStackTrace(errPoolIsOverflow)
851908
}

0 commit comments

Comments
 (0)