Skip to content

Commit 7682668

Browse files
committed
* Added to balancer notifying mechanism for listening in table client event about removing some nodes and closing sessions on them
* Removed from public client interfaces `closer.Closer` (for exclude undefined behaviour on client-side)
1 parent b83211c commit 7682668

File tree

15 files changed

+81
-52
lines changed

15 files changed

+81
-52
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
* Added listening node status from session and close session early if node not found in balancer
1+
* Added to balancer notifying mechanism for listening in table client event about removing some nodes and closing sessions on them
2+
* Removed from public client interfaces `closer.Closer` (for exclude undefined behaviour on client-side)
23

34
## v3.37.5
45
* Refactoring of `xsql` errors checking

coordination/coordination.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,10 @@ package coordination
33
import (
44
"context"
55

6-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
76
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
87
)
98

109
type Client interface {
11-
closer.Closer
12-
1310
CreateNode(ctx context.Context, path string, config NodeConfig) (err error)
1411
AlterNode(ctx context.Context, path string, config NodeConfig) (err error)
1512
DropNode(ctx context.Context, path string) (err error)

discovery/discovery.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"strings"
77

8-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
98
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
109
)
1110

@@ -19,8 +18,6 @@ func (w WhoAmI) String() string {
1918
}
2019

2120
type Client interface {
22-
closer.Closer
23-
2421
Discover(ctx context.Context) ([]endpoint.Endpoint, error)
2522
WhoAmI(ctx context.Context) (*WhoAmI, error)
2623
}

internal/balancer/balancer.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/ydb-platform/ydb-go-sdk/v3/config"
1010
"github.com/ydb-platform/ydb-go-sdk/v3/discovery"
1111
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
1314
discoveryBuilder "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery"
1415
discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config"
@@ -21,16 +22,29 @@ import (
2122

2223
var ErrNoEndpoints = xerrors.Wrap(fmt.Errorf("no endpoints"))
2324

25+
type discoveryClient interface {
26+
closer.Closer
27+
discovery.Client
28+
}
29+
2430
type Balancer struct {
2531
driverConfig config.Config
2632
balancerConfig balancerConfig.Config
2733
pool *conn.Pool
28-
discovery discovery.Client
34+
discovery discoveryClient
2935
discoveryRepeater repeater.Repeater
3036
localDCDetector func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error)
3137

3238
mu xsync.RWMutex
3339
connectionsState *connectionsState
40+
41+
onDiscovery []func(ctx context.Context, endpoints []endpoint.Info)
42+
}
43+
44+
func (b *Balancer) OnDiscovery(onDiscovery func(ctx context.Context, endpoints []endpoint.Info)) {
45+
b.mu.WithLock(func() {
46+
b.onDiscovery = append(b.onDiscovery, onDiscovery)
47+
})
3448
}
3549

3650
func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
@@ -83,8 +97,16 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
8397
info := balancerConfig.Info{SelfLocation: localDC}
8498
state := newConnectionsState(connections, b.balancerConfig.IsPreferConn, info, b.balancerConfig.AllowFalback)
8599

100+
endpointsInfo := make([]endpoint.Info, len(endpoints))
101+
for i, e := range endpoints {
102+
endpointsInfo[i] = e
103+
}
104+
86105
b.mu.WithLock(func() {
87106
b.connectionsState = state
107+
for _, onDiscovery := range b.onDiscovery {
108+
onDiscovery(ctx, endpointsInfo)
109+
}
88110
})
89111
}
90112

internal/balancer/connections_state.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,7 @@ func (s *connectionsState) preferConnection(ctx context.Context) conn.Conn {
7979
if e, hasPreferEndpoint := ContextEndpoint(ctx); hasPreferEndpoint {
8080
c := s.connByNodeID[e.NodeID()]
8181
if c != nil && isOkConnection(c, true) {
82-
e.Choose(true)
8382
return c
84-
} else {
85-
e.Choose(false)
8683
}
8784
}
8885

internal/balancer/ctx.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ type (
88

99
type Endpoint interface {
1010
NodeID() uint32
11-
12-
// Choose calls from balancer if this endpoint to request chosen or not
13-
Choose(chosen bool)
1411
}
1512

1613
func WithEndpoint(ctx context.Context, endpoint Endpoint) context.Context {

internal/table/client.go

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ import (
44
"container/list"
55
"context"
66
"fmt"
7+
"sort"
78
"sync"
89
"sync/atomic"
910
"time"
1011

1112
"google.golang.org/grpc"
1213

14+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1315
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
1416
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
1517
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
@@ -27,14 +29,20 @@ type sessionBuilderOption func(s *session)
2729
// sessionBuilder is the interface that holds logic of creating sessions.
2830
type sessionBuilder func(ctx context.Context, opts ...sessionBuilderOption) (*session, error)
2931

30-
func New(cc grpc.ClientConnInterface, config config.Config) *Client {
31-
return newClient(cc, func(ctx context.Context, opts ...sessionBuilderOption) (s *session, err error) {
32-
return newSession(ctx, cc, config, opts...)
32+
type balancerNotifier interface {
33+
grpc.ClientConnInterface
34+
35+
OnDiscovery(onDiscovery func(ctx context.Context, endpoints []endpoint.Info))
36+
}
37+
38+
func New(balancer balancerNotifier, config config.Config) *Client {
39+
return newClient(balancer, func(ctx context.Context, opts ...sessionBuilderOption) (s *session, err error) {
40+
return newSession(ctx, balancer, config, opts...)
3341
}, config)
3442
}
3543

3644
func newClient(
37-
cc grpc.ClientConnInterface,
45+
balancer balancerNotifier,
3846
builder sessionBuilder,
3947
config config.Config,
4048
) *Client {
@@ -44,7 +52,7 @@ func newClient(
4452
)
4553
c := &Client{
4654
config: config,
47-
cc: cc,
55+
cc: balancer,
4856
build: builder,
4957
index: make(map[*session]sessionInfo),
5058
idle: list.New(),
@@ -58,6 +66,7 @@ func newClient(
5866
},
5967
done: make(chan struct{}),
6068
}
69+
balancer.OnDiscovery(c.onDiscovery)
6170
if idleThreshold := config.IdleThreshold(); idleThreshold > 0 {
6271
c.spawnedGoroutines.Add(1)
6372
go c.internalPoolGC(ctx, idleThreshold)
@@ -107,6 +116,40 @@ func withCreateSessionOnClose(onClose func(s *session)) createSessionOption {
107116
}
108117
}
109118

119+
func (c *Client) onDiscovery(ctx context.Context, endpoints []endpoint.Info) {
120+
nodeIDs := make([]uint32, len(endpoints))
121+
for i, e := range endpoints {
122+
nodeIDs[i] = e.NodeID()
123+
}
124+
sort.Slice(nodeIDs, func(i, j int) bool {
125+
return nodeIDs[i] < nodeIDs[j]
126+
})
127+
c.mu.WithLock(func() {
128+
touched := make(map[*session]struct{}, len(c.index))
129+
for e := c.idle.Front(); e != nil; e = e.Next() {
130+
s := e.Value.(*session)
131+
nodeID := s.NodeID()
132+
if sort.Search(len(nodeIDs), func(i int) bool {
133+
return nodeIDs[i] >= nodeID
134+
}) == len(nodeIDs) {
135+
c.internalPoolAsyncCloseSession(ctx, s)
136+
}
137+
touched[s] = struct{}{}
138+
}
139+
for s := range c.index {
140+
if _, has := touched[s]; has {
141+
continue
142+
}
143+
nodeID := s.NodeID()
144+
if sort.Search(len(nodeIDs), func(i int) bool {
145+
return nodeIDs[i] >= nodeID
146+
}) == len(nodeIDs) {
147+
s.SetStatus(options.SessionClosing)
148+
}
149+
}
150+
})
151+
}
152+
110153
func (c *Client) createSession(ctx context.Context, opts ...createSessionOption) (s *session, err error) {
111154
if c.isClosed() {
112155
return nil, errClosedClient

internal/table/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -880,16 +880,16 @@ type StubBuilder struct {
880880

881881
func newClientWithStubBuilder(
882882
t testing.TB,
883-
cc grpc.ClientConnInterface,
883+
balancer balancerNotifier,
884884
stubLimit int,
885885
options ...config.Option,
886886
) *Client {
887887
return newClient(
888-
cc,
888+
balancer,
889889
(&StubBuilder{
890890
T: t,
891891
Limit: stubLimit,
892-
cc: cc,
892+
cc: balancer,
893893
}).createSession,
894894
config.New(options...),
895895
)

internal/table/session.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,6 @@ type session struct {
5353
closeOnce sync.Once
5454
}
5555

56-
func (s *session) Choose(chosen bool) {
57-
if s == nil {
58-
return
59-
}
60-
if !chosen {
61-
s.SetStatus(options.SessionClosing)
62-
}
63-
}
64-
6556
func (s *session) NodeID() uint32 {
6657
if s == nil {
6758
return 0

ratelimiter/ratelimiter.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,10 @@ import (
44
"context"
55
"time"
66

7-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
87
"github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter/options"
98
)
109

1110
type Client interface {
12-
closer.Closer
13-
1411
CreateResource(
1512
ctx context.Context,
1613
coordinationNodePath string,

0 commit comments

Comments
 (0)