Skip to content

Commit d76f26a

Browse files
committed
remove node session counting
1 parent 40c1c54 commit d76f26a

File tree

12 files changed

+45
-181
lines changed

12 files changed

+45
-181
lines changed

connection.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package ydb
33
import (
44
"context"
55
"errors"
6-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
76
"os"
87
"sync"
98

@@ -226,9 +225,6 @@ func (c *connection) Table() table.Client {
226225
)...,
227226
),
228227
)
229-
c.balancer.OnUpdateEndpoints(func(nodes []endpoint.Info) {
230-
c.table.UpdateNodes(nodes)
231-
})
232228
return c.table.Close
233229
})
234230
// may be nil if driver closed early

internal/balancer/balancer.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package balancer
33
import (
44
"context"
55
"fmt"
6+
67
"google.golang.org/grpc"
78

89
"github.com/ydb-platform/ydb-go-sdk/v3/config"
@@ -30,14 +31,6 @@ type Balancer struct {
3031

3132
mu xsync.RWMutex
3233
connectionsState *connectionsState
33-
34-
onUpdateEndpoints []func(endpoints []endpoint.Info)
35-
}
36-
37-
func (b *Balancer) OnUpdateEndpoints(cb func(nodes []endpoint.Info)) {
38-
b.mu.WithLock(func() {
39-
b.onUpdateEndpoints = append(b.onUpdateEndpoints, cb)
40-
})
4134
}
4235

4336
func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
@@ -92,13 +85,6 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
9285

9386
b.mu.WithLock(func() {
9487
b.connectionsState = state
95-
nodes := make([]endpoint.Info, len(endpoints))
96-
for i := range endpoints {
97-
nodes[i] = endpoints[i]
98-
}
99-
for _, cb := range b.onUpdateEndpoints {
100-
cb(nodes)
101-
}
10288
})
10389
}
10490

internal/balancer/ctx.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,14 @@ func WithEndpoint(ctx context.Context, endpoint Endpoint) context.Context {
1414
return context.WithValue(ctx, ctxEndpointKey{}, endpoint)
1515
}
1616

17-
type nodeIDer struct {
18-
nodeID uint32
19-
}
17+
type nodeID uint32
2018

21-
func (n nodeIDer) NodeID() uint32 {
22-
return n.nodeID
19+
func (n nodeID) NodeID() uint32 {
20+
return uint32(n)
2321
}
2422

25-
func WithNodeID(ctx context.Context, nodeID uint32) context.Context {
26-
return WithEndpoint(ctx, nodeIDer{nodeID: nodeID})
23+
func WithNodeID(ctx context.Context, id uint32) context.Context {
24+
return WithEndpoint(ctx, nodeID(id))
2725
}
2826

2927
func ContextEndpoint(ctx context.Context) (e Endpoint, ok bool) {

internal/table/client.go

Lines changed: 11 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@ import (
44
"container/list"
55
"context"
66
"fmt"
7-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
8-
"math"
9-
"sort"
107
"sync"
118
"sync/atomic"
129
"time"
@@ -49,14 +46,13 @@ func newClient(
4946
onDone = trace.TableOnInit(config.Trace(), &ctx)
5047
)
5148
c := &Client{
52-
config: config,
53-
cc: cc,
54-
build: builder,
55-
index: make(map[*session]sessionInfo),
56-
nodeLoading: make(map[uint32]int),
57-
idle: list.New(),
58-
waitq: list.New(),
59-
limit: config.SizeLimit(),
49+
config: config,
50+
cc: cc,
51+
build: builder,
52+
index: make(map[*session]sessionInfo),
53+
idle: list.New(),
54+
waitq: list.New(),
55+
limit: config.SizeLimit(),
6056
waitChPool: sync.Pool{
6157
New: func() interface{} {
6258
ch := make(chan *session)
@@ -83,11 +79,10 @@ type Client struct {
8379
cc grpc.ClientConnInterface
8480
config config.Config
8581
index map[*session]sessionInfo
86-
createInProgress int // KIKIMR-9163: in-create-process counter
87-
limit int // Upper bound for Client size.
88-
idle *list.List // list<table.session>
89-
waitq *list.List // list<*chan table.session>
90-
nodeLoading map[uint32]int
82+
createInProgress int // KIKIMR-9163: in-create-process counter
83+
limit int // Upper bound for Client size.
84+
idle *list.List // list<table.session>
85+
waitq *list.List // list<*chan table.session>
9186
keeperWake chan struct{} // Set by internalPoolKeeper.
9287
keeperStop chan struct{}
9388
keeperDone chan struct{}
@@ -127,8 +122,6 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
127122
defer cancel()
128123
}
129124

130-
createSessionCtx = balancer.WithNodeID(createSessionCtx, c.nextNodeID())
131-
132125
s, err = c.build(createSessionCtx)
133126

134127
select {
@@ -161,21 +154,6 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
161154
}
162155
}
163156
var s *session
164-
defer func() {
165-
if s != nil {
166-
c.mu.WithLock(func() {
167-
c.nodeLoading[s.NodeID()]++
168-
})
169-
s.onClose = append(s.onClose, func(s *session) {
170-
c.mu.WithLock(func() {
171-
nodeID := s.NodeID()
172-
if _, has := c.nodeLoading[nodeID]; has {
173-
c.nodeLoading[nodeID]--
174-
}
175-
})
176-
})
177-
}
178-
}()
179157
if !c.config.AutoRetry() {
180158
s, err = createSession(ctx)
181159
if err != nil {
@@ -219,28 +197,10 @@ func (c *Client) isClosed() bool {
219197
return atomic.LoadUint32(&c.closed) != 0
220198
}
221199

222-
func (c *Client) nextNodeID() (nodeID uint32) {
223-
nodeID = math.MaxUint32
224-
c.mu.WithLock(func() {
225-
min := math.MaxInt
226-
for id, count := range c.nodeLoading {
227-
if count < min {
228-
min = count
229-
nodeID = id
230-
}
231-
}
232-
fmt.Println("nextNodeID =", nodeID, "min =", min)
233-
})
234-
return nodeID
235-
}
236-
237200
// c.mu must NOT be held.
238201
func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err error) {
239202
defer func() {
240203
if s != nil {
241-
c.mu.WithLock(func() {
242-
c.nodeLoading[s.NodeID()]++
243-
})
244204
s.onClose = append(s.onClose, func(s *session) {
245205
c.spawnedGoroutines.Start("onClose", func(ctx context.Context) {
246206
c.mu.WithLock(func() {
@@ -259,11 +219,6 @@ func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err
259219
if info.idle != nil {
260220
c.idle.Remove(info.idle)
261221
}
262-
263-
nodeID := s.NodeID()
264-
if _, has := c.nodeLoading[nodeID]; has {
265-
c.nodeLoading[nodeID]--
266-
}
267222
})
268223
})
269224
})
@@ -308,8 +263,6 @@ func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err
308263
defer cancel()
309264
}
310265

311-
createSessionCtx = balancer.WithNodeID(createSessionCtx, c.nextNodeID())
312-
313266
s, err = c.build(createSessionCtx)
314267
if s == nil && err == nil {
315268
panic("ydb: abnormal result of session build")
@@ -321,8 +274,6 @@ func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err
321274
c.index[s] = sessionInfo{}
322275
trace.TableOnPoolSessionAdd(c.config.Trace(), s)
323276
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "append")
324-
c.nodeLoading[s.NodeID()]++
325-
fmt.Printf("c.nodeLoading: %v", c.nodeLoading)
326277
}
327278
})
328279

@@ -356,28 +307,6 @@ func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err
356307
}
357308
}
358309

359-
func (c *Client) UpdateNodes(nodes []endpoint.Info) {
360-
sort.Slice(nodes, func(i, j int) bool {
361-
return nodes[i].NodeID() < nodes[j].NodeID()
362-
})
363-
c.mu.WithLock(func() {
364-
for _, node := range nodes {
365-
nodeID := node.NodeID()
366-
if _, has := c.nodeLoading[nodeID]; !has {
367-
c.nodeLoading[nodeID] = 0
368-
}
369-
}
370-
nodeLoading := c.nodeLoading
371-
for nodeID := range nodeLoading {
372-
if sort.Search(len(nodes), func(i int) bool {
373-
return nodes[i].NodeID() <= nodeID
374-
}) < len(nodes) {
375-
delete(c.nodeLoading, nodeID)
376-
}
377-
}
378-
})
379-
}
380-
381310
type getOptions struct {
382311
t trace.Table
383312
}

0 commit comments

Comments
 (0)