Skip to content

Commit c6ab847

Browse files
committed
marked truncated result as retryable error + added closing sessions if node removed from discovery results
1 parent e0268b8 commit c6ab847

File tree

9 files changed

+102
-54
lines changed

9 files changed

+102
-54
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
* Marked the truncated result as retryable error
2+
* Added closing sessions if node removed from discovery results
3+
14
## v3.37.6
25
* Added to balancer notifying mechanism for listening in table client event about removing some nodes and closing sessions on them
36
* Removed from public client interfaces `closer.Closer` (for exclude undefined behaviour on client-side)

internal/balancer/balancer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type Balancer struct {
4141
onDiscovery []func(ctx context.Context, endpoints []endpoint.Info)
4242
}
4343

44-
func (b *Balancer) OnDiscovery(onDiscovery func(ctx context.Context, endpoints []endpoint.Info)) {
44+
func (b *Balancer) OnUpdate(onDiscovery func(ctx context.Context, endpoints []endpoint.Info)) {
4545
b.mu.WithLock(func() {
4646
b.onDiscovery = append(b.onDiscovery, onDiscovery)
4747
})

internal/endpoint/endpoint.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func withLastUpdated(ts time.Time) Option {
150150
}
151151
}
152152

153-
func New(address string, opts ...Option) Endpoint {
153+
func New(address string, opts ...Option) *endpoint {
154154
e := &endpoint{
155155
address: address,
156156
lastUpdated: time.Now(),

internal/table/client.go

Lines changed: 85 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type sessionBuilder func(ctx context.Context, opts ...sessionBuilderOption) (*se
3232
type balancerNotifier interface {
3333
grpc.ClientConnInterface
3434

35-
OnDiscovery(onDiscovery func(ctx context.Context, endpoints []endpoint.Info))
35+
OnUpdate(onDiscovery func(ctx context.Context, endpoints []endpoint.Info))
3636
}
3737

3838
func New(balancer balancerNotifier, config config.Config) *Client {
@@ -55,6 +55,7 @@ func newClient(
5555
cc: balancer,
5656
build: builder,
5757
index: make(map[*session]sessionInfo),
58+
nodes: make(map[uint32]map[*session]struct{}),
5859
idle: list.New(),
5960
waitq: list.New(),
6061
limit: config.SizeLimit(),
@@ -67,7 +68,7 @@ func newClient(
6768
done: make(chan struct{}),
6869
}
6970
if balancer != nil {
70-
balancer.OnDiscovery(c.onDiscovery)
71+
balancer.OnUpdate(c.updateNodes)
7172
}
7273
if idleThreshold := config.IdleThreshold(); idleThreshold > 0 {
7374
c.spawnedGoroutines.Add(1)
@@ -88,6 +89,7 @@ type Client struct {
8889
// read-write fields
8990
mu xsync.Mutex
9091
index map[*session]sessionInfo
92+
nodes map[uint32]map[*session]struct{}
9193
createInProgress int // KIKIMR-9163: in-create-process counter
9294
limit int // Upper bound for Client size.
9395
idle *list.List // list<*session>
@@ -118,7 +120,7 @@ func withCreateSessionOnClose(onClose func(s *session)) createSessionOption {
118120
}
119121
}
120122

121-
func (c *Client) onDiscovery(ctx context.Context, endpoints []endpoint.Info) {
123+
func (c *Client) updateNodes(ctx context.Context, endpoints []endpoint.Info) {
122124
nodeIDs := make([]uint32, len(endpoints))
123125
for i, e := range endpoints {
124126
nodeIDs[i] = e.NodeID()
@@ -127,26 +129,17 @@ func (c *Client) onDiscovery(ctx context.Context, endpoints []endpoint.Info) {
127129
return nodeIDs[i] < nodeIDs[j]
128130
})
129131
c.mu.WithLock(func() {
130-
touched := make(map[*session]struct{}, len(c.index))
131-
for e := c.idle.Front(); e != nil; e = e.Next() {
132-
s := e.Value.(*session)
133-
nodeID := s.NodeID()
132+
for nodeID := range c.nodes {
134133
if sort.Search(len(nodeIDs), func(i int) bool {
135134
return nodeIDs[i] >= nodeID
136135
}) == len(nodeIDs) {
137-
c.internalPoolAsyncCloseSession(ctx, s)
138-
}
139-
touched[s] = struct{}{}
140-
}
141-
for s := range c.index {
142-
if _, has := touched[s]; has {
143-
continue
144-
}
145-
nodeID := s.NodeID()
146-
if sort.Search(len(nodeIDs), func(i int) bool {
147-
return nodeIDs[i] >= nodeID
148-
}) == len(nodeIDs) {
149-
s.SetStatus(options.SessionClosing)
136+
for s := range c.nodes[nodeID] {
137+
if c.index[s].idle != nil {
138+
c.internalPoolAsyncCloseSession(ctx, s)
139+
} else {
140+
s.SetStatus(options.SessionClosing)
141+
}
142+
}
150143
}
151144
}
152145
})
@@ -256,13 +249,51 @@ func (c *Client) createSession(ctx context.Context, opts ...createSessionOption)
256249
}
257250
}
258251

252+
func (c *Client) appendSessionToNodes(s *session) {
253+
c.mu.WithLock(func() {
254+
nodeID := s.NodeID()
255+
sessions, has := c.nodes[nodeID]
256+
if !has {
257+
sessions = make(map[*session]struct{})
258+
}
259+
sessions[s] = struct{}{}
260+
c.nodes[nodeID] = sessions
261+
})
262+
}
263+
264+
func (c *Client) removeSessionFromNodes(s *session) {
265+
c.mu.WithLock(func() {
266+
nodeID := s.NodeID()
267+
sessions, has := c.nodes[nodeID]
268+
if !has {
269+
sessions = make(map[*session]struct{})
270+
}
271+
delete(sessions, s)
272+
if len(sessions) == 0 {
273+
delete(c.nodes, nodeID)
274+
} else {
275+
c.nodes[nodeID] = sessions
276+
}
277+
})
278+
}
279+
259280
func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ table.ClosableSession, err error) {
260281
if c == nil {
261282
return nil, xerrors.WithStackTrace(errNilClient)
262283
}
263284
var s *session
285+
createSession := func(ctx context.Context) (*session, error) {
286+
s, err = c.createSession(ctx,
287+
withCreateSessionOnCreate(c.appendSessionToNodes),
288+
withCreateSessionOnClose(c.removeSessionFromNodes),
289+
)
290+
if err != nil {
291+
return nil, xerrors.WithStackTrace(err)
292+
}
293+
return s, nil
294+
}
264295
if !c.config.AutoRetry() {
265-
s, err = c.createSession(ctx)
296+
s, err = createSession(ctx)
266297
if err != nil {
267298
return nil, xerrors.WithStackTrace(err)
268299
}
@@ -272,7 +303,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
272303
err = retry.Retry(
273304
ctx,
274305
func(ctx context.Context) (err error) {
275-
s, err = c.createSession(ctx)
306+
s, err = createSession(ctx)
276307
if err != nil {
277308
return xerrors.WithStackTrace(err)
278309
}
@@ -328,40 +359,45 @@ func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err
328359
})
329360
}()
330361

331-
s, err = c.createSession(meta.WithAllowFeatures(ctx,
332-
meta.HintSessionBalancer,
333-
), withCreateSessionOnCreate(func(s *session) {
334-
c.mu.WithLock(func() {
335-
c.index[s] = sessionInfo{
336-
touched: timeutil.Now(),
337-
}
338-
trace.TableOnPoolSessionAdd(c.config.Trace(), s)
339-
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "append")
340-
})
341-
}), withCreateSessionOnClose(func(s *session) {
342-
c.mu.WithLock(func() {
343-
info, has := c.index[s]
344-
if !has {
345-
panic("session not found in pool")
346-
}
362+
s, err = c.createSession(
363+
meta.WithAllowFeatures(ctx,
364+
meta.HintSessionBalancer,
365+
),
366+
withCreateSessionOnCreate(c.appendSessionToNodes),
367+
withCreateSessionOnClose(c.removeSessionFromNodes),
368+
withCreateSessionOnCreate(func(s *session) {
369+
c.mu.WithLock(func() {
370+
c.index[s] = sessionInfo{
371+
touched: timeutil.Now(),
372+
}
373+
trace.TableOnPoolSessionAdd(c.config.Trace(), s)
374+
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "append")
375+
})
376+
}), withCreateSessionOnClose(func(s *session) {
377+
c.mu.WithLock(func() {
378+
info, has := c.index[s]
379+
if !has {
380+
panic("session not found in pool")
381+
}
347382

348-
delete(c.index, s)
383+
delete(c.index, s)
349384

350-
trace.TableOnPoolSessionRemove(c.config.Trace(), s)
351-
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "remove")
385+
trace.TableOnPoolSessionRemove(c.config.Trace(), s)
386+
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "remove")
352387

353-
if !c.isClosed() {
354-
c.internalPoolNotify(nil)
355-
}
388+
if !c.isClosed() {
389+
c.internalPoolNotify(nil)
390+
}
356391

357-
if info.idle != nil {
358-
c.idle.Remove(info.idle)
359-
}
360-
})
361-
}))
392+
if info.idle != nil {
393+
c.idle.Remove(info.idle)
394+
}
395+
})
396+
}))
362397
if err != nil {
363398
return nil, xerrors.WithStackTrace(err)
364399
}
400+
365401
return s, nil
366402
}
367403

internal/table/scanner/scanner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
2424
)
2525

26-
var errTruncated = xerrors.Wrap(errors.New("truncated result"))
26+
var errTruncated = xerrors.Retryable(errors.New("truncated result"))
2727

2828
type scanner struct {
2929
set *Ydb.ResultSet

internal/table/session.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config config.
129129
id: result.GetSessionId(),
130130
tableService: c,
131131
config: config,
132+
status: options.SessionReady,
132133
}
133134

134135
for _, o := range opts {

internal/xsql/conn.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,14 @@ func (c *conn) checkClosed(err error) error {
102102
}
103103

104104
func (c *conn) isClosed() bool {
105-
return atomic.LoadUint32(&c.closed) == 1
105+
if atomic.LoadUint32(&c.closed) == 1 {
106+
return true
107+
}
108+
if c.session.Status() != options.SessionReady.String() {
109+
c.setClosed()
110+
return true
111+
}
112+
return false
106113
}
107114

108115
func (c *conn) setClosed() {

table/table.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ type Client interface {
6767

6868
type SessionInfo interface {
6969
ID() string
70+
Status() string
7071
}
7172

7273
type Session interface {

testutil/driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func NewBalancer(opts ...balancerOption) *balancerStub {
291291
return c
292292
}
293293

294-
func (b *balancerStub) OnDiscovery(func(context.Context, []endpoint.Info)) {
294+
func (b *balancerStub) OnUpdate(func(context.Context, []endpoint.Info)) {
295295
}
296296

297297
type clientConn struct {

0 commit comments

Comments
 (0)