Skip to content

Commit 692c27a

Browse files
committed
add create session goroutine to table client spawned goroutines background worker
1 parent f6e35cf commit 692c27a

File tree

4 files changed

+139
-136
lines changed

4 files changed

+139
-136
lines changed

internal/table/client.go

Lines changed: 70 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import (
66
"errors"
77
"fmt"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1213
"google.golang.org/grpc"
1314
grpcCodes "google.golang.org/grpc/codes"
1415

16+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/background"
1517
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
1618
"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer"
1719
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
@@ -107,9 +109,9 @@ type Client struct {
107109
mu xsync.Mutex
108110
waitChPool sync.Pool
109111
testHookGetWaitCh func() // nil except some tests.
110-
wgClosed sync.WaitGroup
112+
spawnedGoroutines background.Worker
111113
touching bool
112-
closed bool
114+
closed uint32
113115
}
114116

115117
func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ table.ClosableSession, err error) {
@@ -124,9 +126,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
124126

125127
ch := make(chan result)
126128

127-
go func() {
128-
defer close(ch)
129-
129+
c.spawnedGoroutines.Start("CreateSession", func(ctx context.Context) {
130130
var (
131131
s Session
132132
err error
@@ -149,10 +149,10 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
149149
}: // nop
150150
case <-ctx.Done():
151151
if s != nil {
152-
_ = s.Close(ctx)
152+
_ = s.Close(xcontext.WithoutDeadline(ctx))
153153
}
154154
}
155-
}()
155+
})
156156

157157
select {
158158
case r := <-ch:
@@ -205,9 +205,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
205205
}
206206

207207
func (c *Client) isClosed() bool {
208-
c.mu.Lock()
209-
defer c.mu.Unlock()
210-
return c.closed
208+
return atomic.LoadUint32(&c.closed) != 0
211209
}
212210

213211
func isCreateSessionErrorRetriable(err error) bool {
@@ -259,9 +257,7 @@ func (c *Client) createSession(ctx context.Context) (s Session, err error) {
259257

260258
ch := make(chan result)
261259

262-
go func() {
263-
defer close(ch)
264-
260+
c.spawnedGoroutines.Start("createSession", func(ctx context.Context) {
265261
var (
266262
s Session
267263
err error
@@ -292,35 +288,36 @@ func (c *Client) createSession(ctx context.Context) (s Session, err error) {
292288

293289
if s != nil {
294290
s.OnClose(func() {
295-
c.mu.Lock()
296-
defer c.mu.Unlock()
297-
298-
info, has := c.index[s]
299-
if !has {
300-
return
301-
}
291+
c.mu.WithLock(func() {
292+
if c.isClosed() {
293+
return
294+
}
302295

303-
delete(c.index, s)
296+
info, has := c.index[s]
297+
if !has {
298+
return
299+
}
304300

305-
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "remove")
301+
delete(c.index, s)
306302

307-
if c.closed {
308-
return
309-
}
303+
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "remove")
310304

311-
c.notify(nil)
305+
c.notify(nil)
312306

313-
if info.idle != nil {
314-
panic("session closed while still in idle client")
315-
}
307+
if info.idle != nil {
308+
panic("session closed while still in idle client")
309+
}
310+
})
316311
})
317312
}
318313

319314
c.mu.WithLock(func() {
320315
c.createInProgress--
321-
if s != nil && !c.closed {
322-
c.index[s] = sessionInfo{}
323-
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "append")
316+
if s != nil {
317+
if !c.isClosed() {
318+
c.index[s] = sessionInfo{}
319+
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "append")
320+
}
324321
}
325322
})
326323

@@ -330,9 +327,11 @@ func (c *Client) createSession(ctx context.Context) (s Session, err error) {
330327
err: err,
331328
}: // nop
332329
case <-ctx.Done():
333-
// nop
330+
if s != nil {
331+
_ = s.Close(xcontext.WithoutDeadline(ctx))
332+
}
334333
}
335-
}()
334+
})
336335

337336
select {
338337
case r := <-ch:
@@ -389,6 +388,9 @@ func (c *Client) get(ctx context.Context, opts ...getOption) (s Session, err err
389388
// Second, we try to create new session
390389
s, err = c.createSession(ctx)
391390
if s == nil && err == nil {
391+
if err = ctx.Err(); err != nil {
392+
return nil, xerrors.WithStackTrace(err)
393+
}
392394
panic("both of session and err are nil")
393395
}
394396
// got session or err is not recoverable
@@ -489,7 +491,7 @@ func (c *Client) Put(ctx context.Context, s Session) (err error) {
489491

490492
c.mu.WithLock(func() {
491493
switch {
492-
case c.closed:
494+
case c.isClosed():
493495
err = xerrors.WithStackTrace(errClosedClient)
494496

495497
case c.idle.Len() >= c.limit:
@@ -538,41 +540,41 @@ func (c *Client) Close(ctx context.Context) (err error) {
538540
}
539541

540542
var issues []error
541-
c.mu.WithLock(func() {
542-
keeperDone := c.keeperDone
543-
if ch := c.keeperStop; ch != nil {
544-
close(ch)
545-
}
546-
547-
if keeperDone != nil {
548-
<-keeperDone
549-
}
543+
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
544+
c.mu.WithLock(func() {
545+
keeperDone := c.keeperDone
546+
if ch := c.keeperStop; ch != nil {
547+
close(ch)
548+
}
550549

551-
for el := c.waitq.Front(); el != nil; el = el.Next() {
552-
ch := el.Value.(*chan Session)
553-
close(*ch)
554-
}
550+
if keeperDone != nil {
551+
<-keeperDone
552+
}
555553

556-
issues = make([]error, 0, len(c.index))
554+
for el := c.waitq.Front(); el != nil; el = el.Next() {
555+
ch := el.Value.(*chan Session)
556+
close(*ch)
557+
}
557558

558-
for e := c.idle.Front(); e != nil; e = e.Next() {
559-
if err = c.closeSession(
560-
ctx,
561-
e.Value.(Session),
562-
withCloseSessionAsync(),
563-
); err != nil {
564-
issues = append(issues, err)
559+
issues = make([]error, 0, len(c.index))
560+
for e := c.idle.Front(); e != nil; e = e.Next() {
561+
if err = c.closeSession(
562+
ctx,
563+
e.Value.(Session),
564+
withCloseSessionAsync(),
565+
); err != nil {
566+
issues = append(issues, err)
567+
}
565568
}
566-
}
567569

568-
c.limit = 0
569-
c.idle = list.New()
570-
c.waitq = list.New()
571-
c.index = make(map[Session]sessionInfo)
572-
c.closed = true
573-
})
570+
c.limit = 0
571+
c.idle = list.New()
572+
c.waitq = list.New()
573+
c.index = make(map[Session]sessionInfo)
574+
})
575+
}
574576

575-
c.wgClosed.Wait()
577+
_ = c.spawnedGoroutines.Close(ctx, errClosedClient)
576578

577579
if len(issues) > 0 {
578580
return xerrors.WithStackTrace(xerrors.NewWithIssues("table client closed with issues", issues...))
@@ -775,11 +777,7 @@ func (c *Client) keeper(ctx context.Context) {
775777
timer.Reset(delay)
776778
}
777779
for _, s := range toDelete {
778-
_ = c.closeSession(
779-
ctx,
780-
s,
781-
withCloseSessionLock(),
782-
)
780+
_ = c.closeSession(ctx, s)
783781
}
784782
if touchingDone != nil {
785783
close(touchingDone)
@@ -900,7 +898,6 @@ func (c *Client) CloseSession(ctx context.Context, s Session) error {
900898
return c.closeSession(
901899
ctx,
902900
s,
903-
withCloseSessionLock(),
904901
withCloseSessionAsync(),
905902
withCloseSessionTrace(),
906903
)
@@ -909,17 +906,10 @@ func (c *Client) CloseSession(ctx context.Context, s Session) error {
909906
type closeSessionOptionsHolder struct {
910907
withTrace bool
911908
withAsync bool
912-
withLock bool
913909
}
914910

915911
type closeSessionOption func(h *closeSessionOptionsHolder)
916912

917-
func withCloseSessionLock() closeSessionOption {
918-
return func(h *closeSessionOptionsHolder) {
919-
h.withLock = true
920-
}
921-
}
922-
923913
func withCloseSessionAsync() closeSessionOption {
924914
return func(h *closeSessionOptionsHolder) {
925915
h.withAsync = true
@@ -945,17 +935,7 @@ func (c *Client) closeSession(ctx context.Context, s Session, opts ...closeSessi
945935
defer onDone()
946936
}
947937

948-
if h.withLock {
949-
c.mu.WithLock(func() {
950-
c.wgClosed.Add(1)
951-
})
952-
} else {
953-
c.wgClosed.Add(1)
954-
}
955-
956938
f := func(s Session) {
957-
defer c.wgClosed.Done()
958-
959939
closeCtx, cancel := context.WithTimeout(
960940
xcontext.WithoutDeadline(ctx),
961941
c.config.DeleteTimeout(),
@@ -966,7 +946,9 @@ func (c *Client) closeSession(ctx context.Context, s Session, opts ...closeSessi
966946
}
967947

968948
if h.withAsync {
969-
go f(s)
949+
c.spawnedGoroutines.Start("closeSession", func(ctx context.Context) {
950+
f(s)
951+
})
970952
} else {
971953
f(s)
972954
}

0 commit comments

Comments
 (0)