Skip to content

Commit 96ee84e

Browse files
committed
2 parents 9820077 + dbc9de2 commit 96ee84e

File tree

6 files changed

+114
-49
lines changed

6 files changed

+114
-49
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
* Moved `internal/deadline.ContextWithoutDeadline` to `internal/xcontext.WithoutDeadline`
1+
* Refactored `db.Table().CreateSession(ctx)` (maked retryable with internal create session timeout)
2+
* Refactored `internal/table/client.createSession(ctx)` (got rid of unnecessary goroutine)
23
* Supported many user-agent records
34

45
## v3.30.0

internal/table/client.go

Lines changed: 93 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -117,17 +117,71 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
117117
if c == nil {
118118
return nil, xerrors.WithStackTrace(errNilClient)
119119
}
120+
createSession := func(ctx context.Context) (s Session, err error) {
121+
type result struct {
122+
s Session
123+
err error
124+
}
125+
126+
ch := make(chan result)
127+
128+
go func() {
129+
defer close(ch)
130+
131+
var (
132+
s Session
133+
err error
134+
)
135+
136+
createSessionCtx := xcontext.WithoutDeadline(ctx)
137+
138+
if timeout := c.config.CreateSessionTimeout(); timeout > 0 {
139+
var cancel context.CancelFunc
140+
createSessionCtx, cancel = context.WithTimeout(createSessionCtx, timeout)
141+
defer cancel()
142+
}
143+
144+
s, err = c.build(createSessionCtx)
145+
146+
select {
147+
case ch <- result{
148+
s: s,
149+
err: err,
150+
}: // nop
151+
case <-ctx.Done():
152+
if s != nil {
153+
_ = s.Close(ctx)
154+
}
155+
}
156+
}()
157+
158+
select {
159+
case r := <-ch:
160+
if r.err != nil {
161+
return nil, xerrors.WithStackTrace(r.err)
162+
}
163+
return r.s, nil
164+
case <-ctx.Done():
165+
return nil, ctx.Err()
166+
}
167+
}
120168
var s Session
121169
if !c.config.AutoRetry() {
122-
s, err = c.build(ctx)
123-
return s, xerrors.WithStackTrace(err)
170+
s, err = createSession(ctx)
171+
if err != nil {
172+
return nil, xerrors.WithStackTrace(err)
173+
}
174+
return s, nil
124175
}
125176
options := retryOptions(c.config.Trace(), opts...)
126177
err = retry.Retry(
127178
ctx,
128179
func(ctx context.Context) (err error) {
129180
s, err = c.build(ctx)
130-
return xerrors.WithStackTrace(err)
181+
if err != nil {
182+
return xerrors.WithStackTrace(err)
183+
}
184+
return nil
131185
},
132186
retry.WithIdempotent(true),
133187
retry.WithID("CreateSession"),
@@ -145,7 +199,10 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
145199
},
146200
}),
147201
)
148-
return s, xerrors.WithStackTrace(err)
202+
if err != nil {
203+
return nil, xerrors.WithStackTrace(err)
204+
}
205+
return s, nil
149206
}
150207

151208
func (c *Client) isClosed() bool {
@@ -175,17 +232,12 @@ type Session interface {
175232
table.ClosableSession
176233

177234
Status() string
178-
OnClose(f func(ctx context.Context))
235+
OnClose(f func())
179236

180237
isClosed() bool
181238
isClosing() bool
182239
}
183240

184-
type createSessionResult struct {
185-
s Session
186-
err error
187-
}
188-
189241
// p.mu must NOT be held.
190242
func (c *Client) createSession(ctx context.Context) (s Session, err error) {
191243
// pre-check the Client size
@@ -201,28 +253,37 @@ func (c *Client) createSession(ctx context.Context) (s Session, err error) {
201253
return nil, xerrors.WithStackTrace(errSessionPoolOverflow)
202254
}
203255

204-
resCh := make(chan createSessionResult, 1) // for non-block write
256+
type result struct {
257+
s Session
258+
err error
259+
}
260+
261+
ch := make(chan result)
205262

206263
go func() {
264+
defer close(ch)
265+
207266
var (
208267
s Session
209268
err error
210269
)
211270

212-
createSessionCtx, cancel := context.WithTimeout(
213-
meta.WithAllowFeatures(
214-
xcontext.WithoutDeadline(ctx),
215-
meta.HintSessionBalancer,
216-
),
217-
c.config.CreateSessionTimeout(),
271+
createSessionCtx := xcontext.WithoutDeadline(ctx)
272+
273+
createSessionCtx = meta.WithAllowFeatures(createSessionCtx,
274+
meta.HintSessionBalancer,
218275
)
219276

220-
onDone := trace.TableOnPoolSessionNew(c.config.Trace(), &ctx)
277+
if timeout := c.config.CreateSessionTimeout(); timeout > 0 {
278+
var cancel context.CancelFunc
279+
createSessionCtx, cancel = context.WithTimeout(createSessionCtx, timeout)
280+
defer cancel()
281+
}
282+
283+
onDone := trace.TableOnPoolSessionNew(c.config.Trace(), &createSessionCtx)
221284

222285
defer func() {
223286
onDone(s, err)
224-
cancel()
225-
close(resCh)
226287
}()
227288

228289
s, err = c.build(createSessionCtx)
@@ -231,7 +292,7 @@ func (c *Client) createSession(ctx context.Context) (s Session, err error) {
231292
}
232293

233294
if s != nil {
234-
s.OnClose(func(ctx context.Context) {
295+
s.OnClose(func() {
235296
c.mu.Lock()
236297
defer c.mu.Unlock()
237298

@@ -258,32 +319,30 @@ func (c *Client) createSession(ctx context.Context) (s Session, err error) {
258319

259320
c.mu.WithLock(func() {
260321
c.createInProgress--
261-
if s != nil {
322+
if s != nil && !c.closed {
262323
c.index[s] = sessionInfo{}
263324
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "append")
264325
}
265326
})
266327

267-
resCh <- createSessionResult{
328+
select {
329+
case ch <- result{
268330
s: s,
269331
err: err,
332+
}: // nop
333+
case <-ctx.Done():
334+
// nop
270335
}
271336
}()
272337

273338
select {
274-
case r := <-resCh:
275-
if r.s == nil && r.err == nil {
276-
panic("ydb: abnormal result of createSession()")
339+
case r := <-ch:
340+
if r.err != nil {
341+
return nil, xerrors.WithStackTrace(r.err)
277342
}
278-
return r.s, xerrors.WithStackTrace(r.err)
343+
return r.s, nil
279344
case <-ctx.Done():
280-
// read result from resCh for prevention of forgetting session
281-
go func() {
282-
if r, ok := <-resCh; ok && r.s != nil {
283-
_ = r.s.Close(ctx)
284-
}
285-
}()
286-
return nil, ctx.Err()
345+
return nil, xerrors.WithStackTrace(ctx.Err())
287346
}
288347
}
289348

internal/table/client_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -289,9 +289,9 @@ func TestSessionPoolClose(t *testing.T) {
289289
closed3 = false
290290
)
291291

292-
s1.OnClose(func(context.Context) { closed1 = true })
293-
s2.OnClose(func(context.Context) { closed2 = true })
294-
s3.OnClose(func(context.Context) { closed3 = true })
292+
s1.OnClose(func() { closed1 = true })
293+
s2.OnClose(func() { closed2 = true })
294+
s3.OnClose(func() { closed3 = true })
295295

296296
mustPutSession(t, p, s1)
297297
mustPutSession(t, p, s2)
@@ -524,7 +524,7 @@ func TestSessionPoolRacyGet(t *testing.T) {
524524
// Release the first create session request.
525525
// Created session must be stored in the Client.
526526
expSession = r1.session
527-
expSession.OnClose(func(context.Context) {
527+
expSession.OnClose(func() {
528528
t.Fatalf("unexpected first session close")
529529
})
530530
close(r1.release)
@@ -1057,7 +1057,7 @@ func TestSessionPoolKeepAliveMinSize(t *testing.T) {
10571057
sessionBuilder := func(t *testing.T, pool *Client) (Session, chan bool) {
10581058
s := mustCreateSession(t, pool)
10591059
closed := make(chan bool)
1060-
s.OnClose(func(context.Context) {
1060+
s.OnClose(func() {
10611061
close(closed)
10621062
})
10631063
return s, closed
@@ -1124,7 +1124,7 @@ func TestSessionPoolKeepAliveWithBadSession(t *testing.T) {
11241124
}()
11251125
s := mustCreateSession(t, p)
11261126
closed := make(chan bool)
1127-
s.OnClose(func(context.Context) {
1127+
s.OnClose(func() {
11281128
close(closed)
11291129
})
11301130
mustPutSession(t, p, s)

internal/table/retry_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestRetryerBadSession(t *testing.T) {
8080
p := SessionProviderFunc{
8181
OnGet: func(ctx context.Context) (Session, error) {
8282
s := simpleSession(t)
83-
s.OnClose(func(context.Context) {
83+
s.OnClose(func() {
8484
closed[s] = true
8585
})
8686
return s, nil
@@ -129,7 +129,7 @@ func TestRetryerSessionClosing(t *testing.T) {
129129
p := SessionProviderFunc{
130130
OnGet: func(ctx context.Context) (Session, error) {
131131
s := simpleSession(t)
132-
s.OnClose(func(context.Context) {
132+
s.OnClose(func() {
133133
closed[s] = true
134134
})
135135
return s, nil

internal/table/session.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type session struct {
5151
status options.SessionStatus
5252

5353
onCloseMtx xsync.RWMutex
54-
onClose []func(ctx context.Context)
54+
onClose []func()
5555
}
5656

5757
func (s *session) NodeID() uint32 {
@@ -142,7 +142,7 @@ func (s *session) ID() string {
142142
return s.id
143143
}
144144

145-
func (s *session) OnClose(cb func(ctx context.Context)) {
145+
func (s *session) OnClose(cb func()) {
146146
if s.isClosed() {
147147
return
148148
}
@@ -178,8 +178,8 @@ func (s *session) Close(ctx context.Context) (err error) {
178178
// call all close listeners before doing request
179179
// firstly this need to clear Client from this session
180180
s.onCloseMtx.WithRLock(func() {
181-
for _, cb := range s.onClose {
182-
cb(ctx)
181+
for _, onClose := range s.onClose {
182+
onClose()
183183
}
184184
})
185185

table/table.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,20 @@ type TxOperation func(ctx context.Context, tx TransactionActor) error
2929

3030
type ClosableSession interface {
3131
closer.Closer
32+
3233
Session
34+
35+
OnClose(f func())
3336
}
3437

3538
type Client interface {
3639
closer.Closer
3740

3841
// CreateSession returns session or error for manually control of session lifecycle
39-
// CreateSession do not provide retry loop for failed create session requests.
40-
// Best effort policy may be implements with outer retry loop includes CreateSession call
42+
//
43+
// CreateSession implements internal busy loop until one of the following conditions is met:
44+
// - context was canceled or deadlined
45+
// - session was created
4146
CreateSession(ctx context.Context, opts ...Option) (s ClosableSession, err error)
4247

4348
// Do provide the best effort for execute operation.

0 commit comments

Comments
 (0)