2222 Pool [T any ] struct {
2323 clock clockwork.Clock
2424
25- create func (ctx context.Context , onClose func (item * T )) (* T , error )
26- close func (ctx context.Context , item * T ) error
25+ createItem func (ctx context.Context , onClose func (item * T )) (* T , error )
26+ deleteItem func (ctx context.Context , item * T ) error
27+ checkErr func (err error ) bool
2728
2829 mu xsync.Mutex
2930 index map [* T ]itemInfo
@@ -42,17 +43,19 @@ type (
4243func New [T any ](
4344 limit int ,
4445 createItem func (ctx context.Context , onClose func (item * T )) (* T , error ),
45- closeItem func (ctx context.Context , item * T ) error ,
46+ deleteItem func (ctx context.Context , item * T ) error ,
47+ checkErr func (err error ) bool ,
4648 opts ... option [T ],
4749) * Pool [T ] {
4850 p := & Pool [T ]{
49- clock : clockwork .NewRealClock (),
50- create : createItem ,
51- close : closeItem ,
52- index : make (map [* T ]itemInfo ),
53- idle : list .New (),
54- waitQ : list .New (),
55- limit : limit ,
51+ clock : clockwork .NewRealClock (),
52+ createItem : createItem ,
53+ deleteItem : deleteItem ,
54+ checkErr : checkErr ,
55+ index : make (map [* T ]itemInfo ),
56+ idle : list .New (),
57+ waitQ : list .New (),
58+ limit : limit ,
5659 waitChPool : sync.Pool {
5760 New : func () interface {} {
5861 ch := make (chan * T )
@@ -70,30 +73,34 @@ func New[T any](
7073}
7174
7275func (p * Pool [T ]) try (ctx context.Context , f func (ctx context.Context , item * T ) error ) error {
73- t , err := p .get (ctx )
76+ item , err := p .get (ctx )
7477 if err != nil {
7578 return xerrors .WithStackTrace (err )
7679 }
7780
7881 defer func () {
7982 select {
8083 case <- p .done :
81- _ = p .close (ctx , t )
84+ _ = p .deleteItem (ctx , item )
8285 default :
8386 p .mu .Lock ()
8487 defer p .mu .Unlock ()
8588
8689 if p .idle .Len () >= p .limit {
87- _ = p .close (ctx , t )
90+ _ = p .deleteItem (ctx , item )
8891 }
8992
90- if ! p .notify (t ) {
91- p .pushIdle (t , p .clock .Now ())
93+ if ! p .notify (item ) {
94+ p .pushIdle (item , p .clock .Now ())
9295 }
9396 }
9497 }()
9598
96- if err = f (ctx , t ); err != nil {
99+ if err = f (ctx , item ); err != nil {
100+ if p .checkErr (err ) {
101+ _ = p .deleteItem (ctx , item )
102+ }
103+
97104 return xerrors .WithStackTrace (err )
98105 }
99106
@@ -116,7 +123,7 @@ func (p *Pool[T]) With(ctx context.Context, f func(ctx context.Context, item *T)
116123 return nil
117124}
118125
119- func (p * Pool [T ]) createItem (ctx context.Context ) (item * T , err error ) {
126+ func (p * Pool [T ]) newItem (ctx context.Context ) (item * T , err error ) {
120127 select {
121128 case <- p .done :
122129 return nil , xerrors .WithStackTrace (errClosedPool )
@@ -140,7 +147,7 @@ func (p *Pool[T]) createItem(ctx context.Context) (item *T, err error) {
140147 })
141148 }()
142149
143- item , err = p .create (ctx , p .removeItem )
150+ item , err = p .createItem (ctx , p .removeItem )
144151 if err != nil {
145152 return nil , xerrors .WithStackTrace (err )
146153 }
@@ -153,7 +160,7 @@ func (p *Pool[T]) removeItem(item *T) {
153160 p .mu .WithLock (func () {
154161 info , has := p .index [item ]
155162 if ! has {
156- panic ( "item not found in pool" )
163+ return
157164 }
158165
159166 delete (p .index , item )
@@ -187,7 +194,7 @@ func (p *Pool[T]) get(ctx context.Context) (item *T, err error) {
187194 }
188195
189196 // Second, we try to create item.
190- item , _ = p .createItem (ctx )
197+ item , _ = p .newItem (ctx )
191198 if item != nil {
192199 return item , nil
193200 }
@@ -276,7 +283,7 @@ func (p *Pool[T]) Close(ctx context.Context) (err error) {
276283 p .wg .Add (1 )
277284 go func () {
278285 defer p .wg .Done ()
279- _ = p .close (ctx , item )
286+ _ = p .deleteItem (ctx , item )
280287 }()
281288 }
282289 }
0 commit comments