Skip to content

Commit eec5c11

Browse files
authored
Merge pull request #709 from ydb-platform/bugfix-put-session
fix bug with no put session before second discovery
2 parents a4bfdff + b1dc8fc commit eec5c11

File tree

6 files changed

+27
-5
lines changed

6 files changed

+27
-5
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed bug with returning session into pool before second re-discovery
2+
13
## v3.44.0
24
* Added `table/options.WithCallOptions` options for append custom grpc call options into `session.{BulkUpsert,Execute,StreamExecuteScanQuery}`
35
* Supported fake transactions in `database/sql` driver over connector option `ydb.WithFakeTx(queryMode)` and connection string param `go_fake_tx`

connection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ func (c *Driver) Table() table.Client {
206206
c.tableOptions...,
207207
)...,
208208
),
209+
c.balancer.Nodes(),
209210
)
210211
return c.table.Close
211212
})

internal/balancer/balancer.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ type Balancer struct {
4343
onApplyDiscoveredEndpoints []func(ctx context.Context, endpoints []endpoint.Info)
4444
}
4545

46+
func (b *Balancer) Nodes() (nodes []uint32) {
47+
b.mu.WithLock(func() {
48+
nodes = make([]uint32, 0, len(b.connectionsState.connByNodeID))
49+
for nodeID := range b.connectionsState.connByNodeID {
50+
nodes = append(nodes, nodeID)
51+
}
52+
})
53+
return nodes
54+
}
55+
4656
func (b *Balancer) OnUpdate(onApplyDiscoveredEndpoints func(ctx context.Context, endpoints []endpoint.Info)) {
4757
b.mu.WithLock(func() {
4858
b.onApplyDiscoveredEndpoints = append(b.onApplyDiscoveredEndpoints, onApplyDiscoveredEndpoints)

internal/table/client.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,17 @@ type balancerNotifier interface {
3434
OnUpdate(onDiscovery func(ctx context.Context, endpoints []endpoint.Info))
3535
}
3636

37-
func New(balancer balancerNotifier, config config.Config) *Client {
37+
func New(balancer balancerNotifier, config config.Config, knownNodeIDs []uint32) *Client {
3838
return newClient(balancer, func(ctx context.Context, opts ...sessionBuilderOption) (s *session, err error) {
3939
return newSession(ctx, balancer, config, opts...)
40-
}, config)
40+
}, config, knownNodeIDs)
4141
}
4242

4343
func newClient(
4444
balancer balancerNotifier,
4545
builder sessionBuilder,
4646
config config.Config,
47+
knownNodeIDs []uint32,
4748
) *Client {
4849
var (
4950
ctx = context.Background()
@@ -55,7 +56,7 @@ func newClient(
5556
cc: balancer,
5657
build: builder,
5758
index: make(map[*session]sessionInfo),
58-
nodes: make(map[uint32]map[*session]struct{}),
59+
nodes: make(map[uint32]map[*session]struct{}, len(knownNodeIDs)*2),
5960
idle: list.New(),
6061
waitQ: list.New(),
6162
limit: config.SizeLimit(),
@@ -67,6 +68,9 @@ func newClient(
6768
},
6869
done: make(chan struct{}),
6970
}
71+
for _, nodeID := range knownNodeIDs {
72+
c.nodes[nodeID] = make(map[*session]struct{})
73+
}
7074
if balancer != nil {
7175
balancer.OnUpdate(c.updateNodes)
7276
}
@@ -268,6 +272,8 @@ func (c *Client) appendSessionToNodes(s *session) {
268272
nodeID := s.NodeID()
269273
if _, has := c.nodes[nodeID]; has {
270274
c.nodes[nodeID][s] = struct{}{}
275+
} else {
276+
fmt.Println("")
271277
}
272278
})
273279
}
@@ -286,8 +292,6 @@ func (c *Client) removeSessionFromNodes(s *session) {
286292
delete(sessions, s)
287293
if len(sessions) == 0 {
288294
delete(c.nodes, nodeID)
289-
} else {
290-
c.nodes[nodeID] = sessions
291295
}
292296
}
293297
})

internal/table/client_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ func TestSessionPoolRacyGet(t *testing.T) {
421421
config.WithSizeLimit(1),
422422
config.WithIdleThreshold(-1),
423423
),
424+
nil,
424425
)
425426
var (
426427
expSession *session
@@ -887,6 +888,7 @@ func newClientWithStubBuilder(
887888
cc: balancer,
888889
}).createSession,
889890
config.New(options...),
891+
nil,
890892
)
891893
}
892894

internal/table/session_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) {
378378
),
379379
),
380380
config.New(),
381+
nil,
381382
)
382383
ctx, cancel := context.WithTimeout(
383384
context.Background(),
@@ -470,6 +471,7 @@ func TestCreateTableRegression(t *testing.T) {
470471
),
471472
),
472473
config.New(),
474+
nil,
473475
)
474476

475477
ctx, cancel := context.WithTimeout(
@@ -564,6 +566,7 @@ func TestDescribeTableRegression(t *testing.T) {
564566
),
565567
),
566568
config.New(),
569+
nil,
567570
)
568571

569572
ctx, cancel := context.WithTimeout(

0 commit comments

Comments
 (0)