Skip to content

Commit 64fdb80

Browse files
authored
Merge pull request #373 from ydb-platform/early-closing-session
add listening discovery results for closing sessions on removed nodes early
2 parents fe0929c + cd65fdd commit 64fdb80

File tree

15 files changed

+88
-47
lines changed

15 files changed

+88
-47
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
* Added to balancer notifying mechanism for listening in table client event about removing some nodes and closing sessions on them
2+
* Removed from public client interfaces `closer.Closer` (for exclude undefined behaviour on client-side)
3+
14
## v3.37.5
25
* Refactoring of `xsql` errors checking
36

coordination/coordination.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,10 @@ package coordination
33
import (
44
"context"
55

6-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
76
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
87
)
98

109
type Client interface {
11-
closer.Closer
12-
1310
CreateNode(ctx context.Context, path string, config NodeConfig) (err error)
1411
AlterNode(ctx context.Context, path string, config NodeConfig) (err error)
1512
DropNode(ctx context.Context, path string) (err error)

discovery/discovery.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"strings"
77

8-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
98
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
109
)
1110

@@ -19,8 +18,6 @@ func (w WhoAmI) String() string {
1918
}
2019

2120
type Client interface {
22-
closer.Closer
23-
2421
Discover(ctx context.Context) ([]endpoint.Endpoint, error)
2522
WhoAmI(ctx context.Context) (*WhoAmI, error)
2623
}

internal/balancer/balancer.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/ydb-platform/ydb-go-sdk/v3/config"
1010
"github.com/ydb-platform/ydb-go-sdk/v3/discovery"
1111
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
1314
discoveryBuilder "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery"
1415
discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config"
@@ -21,16 +22,29 @@ import (
2122

2223
var ErrNoEndpoints = xerrors.Wrap(fmt.Errorf("no endpoints"))
2324

25+
type discoveryClient interface {
26+
closer.Closer
27+
discovery.Client
28+
}
29+
2430
type Balancer struct {
2531
driverConfig config.Config
2632
balancerConfig balancerConfig.Config
2733
pool *conn.Pool
28-
discovery discovery.Client
34+
discovery discoveryClient
2935
discoveryRepeater repeater.Repeater
3036
localDCDetector func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error)
3137

3238
mu xsync.RWMutex
3339
connectionsState *connectionsState
40+
41+
onDiscovery []func(ctx context.Context, endpoints []endpoint.Info)
42+
}
43+
44+
func (b *Balancer) OnDiscovery(onDiscovery func(ctx context.Context, endpoints []endpoint.Info)) {
45+
b.mu.WithLock(func() {
46+
b.onDiscovery = append(b.onDiscovery, onDiscovery)
47+
})
3448
}
3549

3650
func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
@@ -83,8 +97,16 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
8397
info := balancerConfig.Info{SelfLocation: localDC}
8498
state := newConnectionsState(connections, b.balancerConfig.IsPreferConn, info, b.balancerConfig.AllowFalback)
8599

100+
endpointsInfo := make([]endpoint.Info, len(endpoints))
101+
for i, e := range endpoints {
102+
endpointsInfo[i] = e
103+
}
104+
86105
b.mu.WithLock(func() {
87106
b.connectionsState = state
107+
for _, onDiscovery := range b.onDiscovery {
108+
onDiscovery(ctx, endpointsInfo)
109+
}
88110
})
89111
}
90112

internal/balancer/ctx.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,6 @@ func WithEndpoint(ctx context.Context, endpoint Endpoint) context.Context {
1414
return context.WithValue(ctx, ctxEndpointKey{}, endpoint)
1515
}
1616

17-
type nodeID uint32
18-
19-
func (n nodeID) NodeID() uint32 {
20-
return uint32(n)
21-
}
22-
23-
func WithNodeID(ctx context.Context, id uint32) context.Context {
24-
return WithEndpoint(ctx, nodeID(id))
25-
}
26-
2717
func ContextEndpoint(ctx context.Context) (e Endpoint, ok bool) {
2818
if e, ok = ctx.Value(ctxEndpointKey{}).(Endpoint); ok {
2919
return e, true

internal/mock/conn.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ type Endpoint struct {
8282
LocalDCField bool
8383
}
8484

85+
func (e *Endpoint) Choose(bool) {
86+
}
87+
8588
func (e *Endpoint) NodeID() uint32 {
8689
return e.NodeIDField
8790
}

internal/table/client.go

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ import (
44
"container/list"
55
"context"
66
"fmt"
7+
"sort"
78
"sync"
89
"sync/atomic"
910
"time"
1011

1112
"google.golang.org/grpc"
1213

14+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1315
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
1416
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
1517
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
@@ -27,14 +29,20 @@ type sessionBuilderOption func(s *session)
2729
// sessionBuilder is the interface that holds logic of creating sessions.
2830
type sessionBuilder func(ctx context.Context, opts ...sessionBuilderOption) (*session, error)
2931

30-
func New(cc grpc.ClientConnInterface, config config.Config) *Client {
31-
return newClient(cc, func(ctx context.Context, opts ...sessionBuilderOption) (s *session, err error) {
32-
return newSession(ctx, cc, config, opts...)
32+
type balancerNotifier interface {
33+
grpc.ClientConnInterface
34+
35+
OnDiscovery(onDiscovery func(ctx context.Context, endpoints []endpoint.Info))
36+
}
37+
38+
func New(balancer balancerNotifier, config config.Config) *Client {
39+
return newClient(balancer, func(ctx context.Context, opts ...sessionBuilderOption) (s *session, err error) {
40+
return newSession(ctx, balancer, config, opts...)
3341
}, config)
3442
}
3543

3644
func newClient(
37-
cc grpc.ClientConnInterface,
45+
balancer balancerNotifier,
3846
builder sessionBuilder,
3947
config config.Config,
4048
) *Client {
@@ -44,7 +52,7 @@ func newClient(
4452
)
4553
c := &Client{
4654
config: config,
47-
cc: cc,
55+
cc: balancer,
4856
build: builder,
4957
index: make(map[*session]sessionInfo),
5058
idle: list.New(),
@@ -58,6 +66,9 @@ func newClient(
5866
},
5967
done: make(chan struct{}),
6068
}
69+
if balancer != nil {
70+
balancer.OnDiscovery(c.onDiscovery)
71+
}
6172
if idleThreshold := config.IdleThreshold(); idleThreshold > 0 {
6273
c.spawnedGoroutines.Add(1)
6374
go c.internalPoolGC(ctx, idleThreshold)
@@ -107,6 +118,40 @@ func withCreateSessionOnClose(onClose func(s *session)) createSessionOption {
107118
}
108119
}
109120

121+
func (c *Client) onDiscovery(ctx context.Context, endpoints []endpoint.Info) {
122+
nodeIDs := make([]uint32, len(endpoints))
123+
for i, e := range endpoints {
124+
nodeIDs[i] = e.NodeID()
125+
}
126+
sort.Slice(nodeIDs, func(i, j int) bool {
127+
return nodeIDs[i] < nodeIDs[j]
128+
})
129+
c.mu.WithLock(func() {
130+
touched := make(map[*session]struct{}, len(c.index))
131+
for e := c.idle.Front(); e != nil; e = e.Next() {
132+
s := e.Value.(*session)
133+
nodeID := s.NodeID()
134+
if sort.Search(len(nodeIDs), func(i int) bool {
135+
return nodeIDs[i] >= nodeID
136+
}) == len(nodeIDs) {
137+
c.internalPoolAsyncCloseSession(ctx, s)
138+
}
139+
touched[s] = struct{}{}
140+
}
141+
for s := range c.index {
142+
if _, has := touched[s]; has {
143+
continue
144+
}
145+
nodeID := s.NodeID()
146+
if sort.Search(len(nodeIDs), func(i int) bool {
147+
return nodeIDs[i] >= nodeID
148+
}) == len(nodeIDs) {
149+
s.SetStatus(options.SessionClosing)
150+
}
151+
}
152+
})
153+
}
154+
110155
func (c *Client) createSession(ctx context.Context, opts ...createSessionOption) (s *session, err error) {
111156
if c.isClosed() {
112157
return nil, errClosedClient

internal/table/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -880,16 +880,16 @@ type StubBuilder struct {
880880

881881
func newClientWithStubBuilder(
882882
t testing.TB,
883-
cc grpc.ClientConnInterface,
883+
balancer balancerNotifier,
884884
stubLimit int,
885885
options ...config.Option,
886886
) *Client {
887887
return newClient(
888-
cc,
888+
balancer,
889889
(&StubBuilder{
890890
T: t,
891891
Limit: stubLimit,
892-
cc: cc,
892+
cc: balancer,
893893
}).createSession,
894894
config.New(options...),
895895
)

internal/table/session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (s *session) Close(ctx context.Context) (err error) {
165165
onDone := trace.TableOnSessionDelete(s.config.Trace(), &ctx, s)
166166

167167
_, err = s.tableService.DeleteSession(
168-
balancer.WithEndpoint(ctx, s),
168+
ctx,
169169
&Ydb_Table.DeleteSessionRequest{
170170
SessionId: s.id,
171171
OperationParams: operation.Params(ctx,

ratelimiter/ratelimiter.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,10 @@ import (
44
"context"
55
"time"
66

7-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
87
"github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter/options"
98
)
109

1110
type Client interface {
12-
closer.Closer
13-
1411
CreateResource(
1512
ctx context.Context,
1613
coordinationNodePath string,

0 commit comments

Comments
 (0)