
Commit 7f0980a

Merge pull request #1049 from ydb-platform/internal-pool-table-refactoring
add pool implementation
2 parents f3a504c + f118371 commit 7f0980a

File tree

2 files changed: +364 -0 lines changed


internal/pool/pool.go

Lines changed: 363 additions & 0 deletions
@@ -0,0 +1,363 @@
package pool

import (
	"container/list"
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
	"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
	"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
)

var (
	errClosedPool   = xerrors.Wrap(errors.New("pool client closed early"))
	errPoolOverflow = xerrors.Wrap(errors.New("pool overflow"))
	errNoProgress   = xerrors.Wrap(errors.New("no progress"))
	errNilPool      = xerrors.Wrap(errors.New("pool is not initialized"))
)

type Pool[T any] interface {
	Close(ctx context.Context) error
	With(ctx context.Context, f func(ctx context.Context, s *T) error) (err error)

	// private API
	get(ctx context.Context) (s *T, err error)
	put(el *T) error
}

type pool[T any] struct {
	mu            xsync.Mutex
	idle          *list.List // list<*T>
	waitQ         *list.List // list<*chan *T>
	limit         int
	waitChPool    sync.Pool
	done          chan struct{}
	wg            sync.WaitGroup
	create        func(ctx context.Context) (*T, error)
	close         func(ctx context.Context, s *T) error
	deleteTimeout time.Duration
}

func NewPool[T any](
	limit int,
	createEntity func(ctx context.Context) (*T, error),
	closeEntity func(ctx context.Context, s *T) error,
	deleteTimeout time.Duration,
) Pool[T] {
	return &pool[T]{
		limit: limit,
		waitChPool: sync.Pool{
			New: func() interface{} {
				ch := make(chan *T)

				return &ch
			},
		},
		create:        createEntity,
		close:         closeEntity,
		idle:          list.New(),
		waitQ:         list.New(),
		deleteTimeout: deleteTimeout,
		// done must be initialized here: Close() closes this channel to
		// signal shutdown, and closing a nil channel panics.
		done: make(chan struct{}),
	}
}

func (p *pool[T]) With(ctx context.Context, f func(ctx context.Context, s *T) error) (err error) {
	select {
	case <-p.done:
		return xerrors.WithStackTrace(errClosedPool)
	default:
		s, err := p.get(ctx)
		if err != nil {
			return err
		}

		defer func() {
			if err != nil {
				p.internalPoolSyncCloseSession(ctx, s)
			}
		}()

		err = f(ctx, s)
		if err != nil {
			return err
		}

		err = p.put(s)
		if err != nil {
			return err
		}

		return nil
	}
}

// checked
func (p *pool[T]) Close(ctx context.Context) error {
	if p == nil {
		return xerrors.WithStackTrace(errNilPool)
	}

	p.mu.WithLock(func() {
		select {
		case <-p.done:
			return
		default:
			close(p.done)

			p.limit = 0

			for el := p.waitQ.Front(); el != nil; el = el.Next() {
				ch := el.Value.(*chan *T)
				close(*ch)
			}

			for e := p.idle.Front(); e != nil; e = e.Next() {
				s := e.Value.(*T)
				// TODO: don't forget to do this in the session
				// s.SetStatus(table.SessionClosing)
				p.wg.Add(1)
				go func() {
					defer p.wg.Done()
					p.internalPoolSyncCloseSession(ctx, s)
				}()
			}
		}
	})

	p.wg.Wait()

	return nil
}

// checked
func (p *pool[T]) internalPoolSyncCloseSession(ctx context.Context, s *T) {
	var cancel context.CancelFunc
	ctx, cancel = xcontext.WithTimeout(ctx, p.deleteTimeout)
	defer cancel()

	_ = p.close(ctx, s)
}

// checked
// private API
func (p *pool[T]) get(ctx context.Context) (s *T, err error) {
	if p.isClosed() {
		return nil, xerrors.WithStackTrace(errClosedPool)
	}

	var (
		start = time.Now()
		i     = 0
	)

	const maxAttempts = 100
	for s == nil && err == nil && i < maxAttempts && !p.isClosed() {
		i++
		// First, we try to take a session from the idle list.
		p.mu.WithLock(func() {
			s = p.internalPoolRemoveFirstIdle()
		})

		if s != nil {
			return s, nil
		}

		// Second, we try to create a new session.
		s, err = p.create(ctx)
		if s == nil && err == nil {
			if err = ctx.Err(); err != nil {
				return nil, xerrors.WithStackTrace(err)
			}
			panic("both session and err are nil")
		}

		if s != nil {
			return s, err
		}

		// Third, we try to wait for a returned session - the pool is full.
		//
		// This should be done only if the number of currently waiting
		// goroutines is less than the maximum number of sessions. That is,
		// we want to be fair here and not lock more goroutines than we
		// could ship a session to.
		s, err = p.internalPoolWaitFromCh(ctx)
		if err != nil {
			err = xerrors.WithStackTrace(err)
		}
	}

	if s == nil && err == nil {
		if p.isClosed() {
			err = xerrors.WithStackTrace(errClosedPool)
		} else {
			err = xerrors.WithStackTrace(errNoProgress)
		}
	}
	if err != nil {
		var idle int

		p.mu.WithLock(func() {
			idle = p.idle.Len()
		})

		return s, xerrors.WithStackTrace(
			fmt.Errorf("failed to get session from pool "+
				"(attempts: %d, latency: %v, pool has %d idle): %w",
				i, time.Since(start), idle, err,
			),
		)
	}

	return s, nil
}

// checked
// internalPoolGetWaitCh returns a pointer to a channel of sessions.
//
// Note that returning a pointer reduces allocations on sync.Pool usage:
// sync.Pool.Get() returns an empty interface, which leads to an allocation
// for non-pointer values.
func (p *pool[T]) internalPoolGetWaitCh() *chan *T { //nolint:gocritic
	ch := p.waitChPool.Get()
	s, ok := ch.(*chan *T)
	if !ok {
		panic(fmt.Sprintf("%T is not a chan of sessions", ch))
	}

	return s
}

// checked
func (p *pool[T]) internalPoolWaitFromCh(ctx context.Context) (s *T, err error) {
	var (
		ch *chan *T
		el *list.Element // Element in the wait queue.
		ok bool
	)

	p.mu.WithLock(func() {
		ch = p.internalPoolGetWaitCh()
		el = p.waitQ.PushBack(ch)
	})

	select {
	case <-p.done:
		p.mu.WithLock(func() {
			p.waitQ.Remove(el)
		})

		return nil, xerrors.WithStackTrace(errClosedPool)

	case s, ok = <-*ch:
		// Note that a race may occur: some goroutine may try to write a
		// session into the channel after it was enqueued but before it is
		// read here. In that case we will receive nil here and retry.
		//
		// The same happens when a session is deleted - the nil value is
		// sent into the channel.
		if ok {
			// Put only a filled and not closed channel back into the pool.
			// That is, we need to avoid races on filling a reused channel
			// for the next waiter - a session could be lost for a long time.
			p.waitChPool.Put(ch)
		}

		return s, nil

	case <-ctx.Done():
		p.mu.WithLock(func() {
			p.waitQ.Remove(el)
		})

		return nil, xerrors.WithStackTrace(ctx.Err())
	}
}

// checked
// internalPoolRemoveFirstIdle removes the first session from the idle list
// and returns it; to be used only in functions that make the session busy.
// p.mu must be held.
func (p *pool[T]) internalPoolRemoveFirstIdle() *T {
	el := p.idle.Front()
	if el == nil {
		return nil
	}
	s := el.Value.(*T)
	if s != nil {
		p.idle.Remove(el)
	}

	return s
}

// checked
func (p *pool[T]) put(el *T) error {
	switch {
	case p.isClosed():
		return xerrors.WithStackTrace(errClosedPool)

	default:
		p.mu.Lock()
		defer p.mu.Unlock()

		if p.idle.Len() >= p.limit {
			return xerrors.WithStackTrace(errPoolOverflow)
		}

		if !p.internalPoolNotify(el) {
			p.idle.PushBack(el)
			// c.internalPoolPushIdle(s, c.clock.Now())
		}

		return nil
	}
}

// checked
// p.mu must be held.
func (p *pool[T]) internalPoolNotify(s *T) (notified bool) {
	for el := p.waitQ.Front(); el != nil; el = p.waitQ.Front() {
		// Some goroutine is waiting for a session.
		//
		// It could be in one of these states:
		// 1) Reached the select code and is awaiting a value in the channel.
		// 2) Reached the select code but is already in the branch of deadline
		//    cancellation. In this case it is blocked on p.mu.Lock().
		// 3) Has not reached the select code and thus is not reading yet from
		//    the channel.
		//
		// For cases (2) and (3) we close the channel to signal that the
		// goroutine missed something and may want to retry (especially for
		// case (3)).
		//
		// After that we take the next waiter and repeat the same.
		ch := p.waitQ.Remove(el).(*chan *T)
		select {
		case *ch <- s:
			// Case (1).
			return true

		case <-p.done:
			// Case (2) or (3).
			close(*ch)

		default:
			// Case (2) or (3).
			close(*ch)
		}
	}

	return false
}

// checked
func (p *pool[T]) isClosed() bool {
	select {
	case <-p.done:
		return true
	default:
		return false
	}
}
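
For orientation, here is a minimal in-package usage sketch of the API added above. Only NewPool, With, and Close come from pool.go; the conn type, the function name, and the address are hypothetical placeholders, and since the package is internal, real callers live inside the SDK.

// usage sketch (hypothetical, for illustration only - not part of this commit)
package pool

import (
	"context"
	"fmt"
	"time"
)

// conn is a hypothetical pooled entity used only for this illustration.
type conn struct{ addr string }

func usageSketch(ctx context.Context) error {
	p := NewPool[conn](
		10, // keep at most 10 idle entities
		func(ctx context.Context) (*conn, error) { // createEntity
			return &conn{addr: "localhost:2136"}, nil
		},
		func(ctx context.Context, c *conn) error { // closeEntity
			return nil
		},
		time.Second, // deleteTimeout applied while closing an entity
	)
	defer func() { _ = p.Close(ctx) }()

	// With takes an entity from the pool (or creates one), runs the callback,
	// and puts the entity back; on callback error the entity is closed instead.
	return p.With(ctx, func(ctx context.Context, c *conn) error {
		fmt.Println("working with", c.addr)

		return nil
	})
}

Because With hides the get/put pair, callers never hold an entity outside the callback, which keeps the private get and put methods out of the public surface of Pool[T].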

internal/pool/pool_test.go

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
package pool
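
The test file is only a package stub so far. A test along the following lines could exercise the basic With round trip; the TestWithRoundTrip name and the testItem type are hypothetical and not part of the commit.

// hypothetical test sketch (not part of this commit)
import (
	"context"
	"testing"
	"time"
)

type testItem struct{ id int }

func TestWithRoundTrip(t *testing.T) {
	created := 0
	p := NewPool[testItem](
		1, // limit of one idle item
		func(ctx context.Context) (*testItem, error) {
			created++

			return &testItem{id: created}, nil
		},
		func(ctx context.Context, it *testItem) error { return nil },
		time.Second,
	)
	defer func() { _ = p.Close(context.Background()) }()

	if err := p.With(context.Background(), func(ctx context.Context, it *testItem) error {
		return nil
	}); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if created != 1 {
		t.Fatalf("want exactly one created item, got %d", created)
	}
}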
