Skip to content

Commit 57d10e2

Browse files
committed
simplified code
1 parent c38d148 commit 57d10e2

File tree

9 files changed

+74
-73
lines changed

9 files changed

+74
-73
lines changed

context.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"time"
66

7+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
78
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
89
)
910

@@ -22,5 +23,5 @@ func WithOperationCancelAfter(ctx context.Context, operationCancelAfter time.Dur
2223

2324
// WithPreferredNodeID allows to set preferred node to get session from
2425
func WithPreferredNodeID(ctx context.Context, nodeID uint32) context.Context {
25-
return operation.WithPreferredNodeID(ctx, nodeID)
26+
return endpoint.WithNodeID(ctx, nodeID)
2627
}

internal/conn/middleware.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55

66
"google.golang.org/grpc"
7-
8-
balancerContext "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
97
)
108

119
var _ grpc.ClientConnInterface = (*middleware)(nil)
@@ -32,16 +30,6 @@ func (m *middleware) NewStream(
3230
return m.newStream(ctx, desc, method, opts...)
3331
}
3432

35-
func ModifyConn(cc grpc.ClientConnInterface, nodeID uint32) grpc.ClientConnInterface {
36-
if nodeID != 0 {
37-
return WithContextModifier(cc, func(ctx context.Context) context.Context {
38-
return balancerContext.WithNodeID(ctx, nodeID)
39-
})
40-
}
41-
42-
return cc
43-
}
44-
4533
func WithContextModifier(
4634
cc grpc.ClientConnInterface,
4735
modifyCtx func(ctx context.Context) context.Context,

internal/operation/context.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
type (
99
ctxOperationTimeoutKey struct{}
1010
ctxOperationCancelAfterKey struct{}
11-
ctxWithPreferredNodeIDKey struct{}
1211
)
1312

1413
// WithTimeout returns a copy of parent context in which YDB operation timeout
@@ -34,10 +33,6 @@ func WithCancelAfter(ctx context.Context, operationCancelAfter time.Duration) co
3433
return context.WithValue(ctx, ctxOperationCancelAfterKey{}, operationCancelAfter)
3534
}
3635

37-
func WithPreferredNodeID(ctx context.Context, nodeID uint32) context.Context {
38-
return context.WithValue(ctx, ctxWithPreferredNodeIDKey{}, nodeID)
39-
}
40-
4136
// ctxTimeout returns the timeout within given context after which
4237
// YDB should try to cancel operation and return result regardless of the cancelation.
4338
func ctxTimeout(ctx context.Context) (d time.Duration, ok bool) {
@@ -62,16 +57,3 @@ func ctxUntilDeadline(ctx context.Context) (time.Duration, bool) {
6257

6358
return 0, false
6459
}
65-
66-
func CtxPreferredNodeID(ctx context.Context) uint32 {
67-
x := ctx.Value(ctxWithPreferredNodeIDKey{})
68-
if x == nil {
69-
return 0
70-
}
71-
val, ok := x.(uint32)
72-
if !ok {
73-
return 0
74-
}
75-
76-
return val
77-
}

internal/pool/pool.go

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
"github.com/jonboulle/clockwork"
1010

11-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1212
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1313
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1414
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -32,7 +32,7 @@ type (
3232
clock clockwork.Clock
3333
limit int
3434
createTimeout time.Duration
35-
createItem func(ctx context.Context, preferredNodeID uint32) (PT, error)
35+
createItem func(ctx context.Context) (PT, error)
3636
closeTimeout time.Duration
3737
closeItem func(ctx context.Context, item PT)
3838
idleTimeToLive time.Duration
@@ -65,7 +65,7 @@ type (
6565
Option[PT ItemConstraint[T], T any] func(c *Config[PT, T])
6666
)
6767

68-
func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(context.Context, uint32) (PT, error)) Option[PT, T] {
68+
func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(context.Context) (PT, error)) Option[PT, T] {
6969
return func(c *Config[PT, T]) {
7070
c.createItem = f
7171
}
@@ -175,7 +175,7 @@ func New[PT ItemConstraint[T], T any](
175175
}
176176

177177
// defaultCreateItem returns a new item
178-
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context, uint32) (PT, error) {
178+
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error) {
179179
var item T
180180

181181
return &item, nil
@@ -224,7 +224,7 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
224224
defer cancelCreate()
225225
}
226226

227-
newItem, err := p.config.createItem(createCtx, preferredNodeID)
227+
newItem, err := p.config.createItem(createCtx)
228228
if newItem != nil {
229229
p.mu.WithLock(func() {
230230
var useCounter uint64
@@ -462,12 +462,25 @@ func (p *Pool[PT, T]) putWaitCh(ch *chan PT) { //nolint:gocritic
462462
}
463463

464464
// p.mu must be held.
465-
func (p *Pool[PT, T]) peekFirstIdle(preferredNodeID uint32) (item PT, touched time.Time) {
465+
func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) {
466466
el := p.idle.Front()
467-
if preferredNodeID != 0 {
468-
for el != nil && el.Value.NodeID() != preferredNodeID {
469-
el = el.Next()
470-
}
467+
if el == nil {
468+
return
469+
}
470+
item = el.Value
471+
info, has := p.index[item]
472+
if !has || el != info.idle {
473+
panic(fmt.Sprintf("inconsistent index: (%v, %+v, %+v)", has, el, info.idle))
474+
}
475+
476+
return item, info.lastUsage
477+
}
478+
479+
// p.mu must be held.
480+
func (p *Pool[PT, T]) peekFirstIdleByNodeID(nodeID uint32) (item PT, touched time.Time) {
481+
el := p.idle.Front()
482+
for el != nil && el.Value.NodeID() != nodeID {
483+
el = el.Next()
471484
}
472485
if el == nil {
473486
return
@@ -485,8 +498,22 @@ func (p *Pool[PT, T]) peekFirstIdle(preferredNodeID uint32) (item PT, touched ti
485498
// to prevent session from dying in the internalPoolGC after it was returned
486499
// to be used only in outgoing functions that make session busy.
487500
// p.mu must be held.
488-
func (p *Pool[PT, T]) removeFirstIdle(preferredNodeID uint32) PT {
489-
idle, _ := p.peekFirstIdle(preferredNodeID)
501+
func (p *Pool[PT, T]) removeFirstIdle() PT {
502+
idle, _ := p.peekFirstIdle()
503+
if idle != nil {
504+
info := p.removeIdle(idle)
505+
p.index[idle] = info
506+
}
507+
508+
return idle
509+
}
510+
511+
// removes first session from idle and resets the keepAliveCount
512+
// to prevent session from dying in the internalPoolGC after it was returned
513+
// to be used only in outgoing functions that make session busy.
514+
// p.mu must be held.
515+
func (p *Pool[PT, T]) removeIdleByNodeID(nodeID uint32) PT {
516+
idle, _ := p.peekFirstIdleByNodeID(nodeID)
490517
if idle != nil {
491518
info := p.removeIdle(idle)
492519
p.index[idle] = info
@@ -592,7 +619,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
592619
}
593620
}
594621

595-
preferredNodeID := operation.CtxPreferredNodeID(ctx)
622+
preferredNodeID, hasPreferredNodeID := endpoint.ContextNodeID(ctx)
596623

597624
for ; attempt < maxAttempts; attempt++ {
598625
select {
@@ -602,7 +629,11 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
602629
}
603630

604631
if item := xsync.WithLock(&p.mu, func() PT { //nolint:nestif
605-
return p.removeFirstIdle(preferredNodeID)
632+
if hasPreferredNodeID {
633+
return p.removeIdleByNodeID(preferredNodeID)
634+
}
635+
636+
return p.removeFirstIdle()
606637
}); item != nil {
607638
if item.IsAlive() {
608639
info := xsync.WithLock(&p.mu, func() itemInfo[PT, T] {

internal/pool/pool_test.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
grpcStatus "google.golang.org/grpc/status"
2121

2222
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
23-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
23+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
2424
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
2525
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
2626
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -146,7 +146,7 @@ func caller() string {
146146
}
147147

148148
func mustGetItem[PT ItemConstraint[T], T any](t testing.TB, p *Pool[PT, T], nodeID uint32) PT {
149-
s, err := p.getItem(operation.WithPreferredNodeID(context.Background(), nodeID))
149+
s, err := p.getItem(endpoint.WithNodeID(context.Background(), nodeID))
150150
if err != nil {
151151
t.Helper()
152152
t.Fatalf("%s: %v", caller(), err)
@@ -184,12 +184,11 @@ func TestPool(t *testing.T) { //nolint:gocyclo
184184
require.NoError(t, err)
185185
})
186186
t.Run("RequireNodeIdFromPool", func(t *testing.T) {
187-
var nextNodeID uint32
188-
nextNodeID = 0
187+
nextNodeID := uint32(0)
189188
var newSessionCalled uint32
190189
p := New[*testItem, testItem](rootCtx,
191190
WithTrace[*testItem, testItem](defaultTrace),
192-
WithCreateItemFunc(func(context.Context, uint32) (*testItem, error) {
191+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
193192
newSessionCalled++
194193
var (
195194
nodeID = nextNodeID
@@ -260,11 +259,13 @@ func TestPool(t *testing.T) { //nolint:gocyclo
260259
var newSessionCalled uint32
261260
p := New[*testItem, testItem](rootCtx,
262261
WithTrace[*testItem, testItem](defaultTrace),
263-
WithCreateItemFunc(func(_ context.Context, nodeID uint32) (*testItem, error) {
262+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
264263
newSessionCalled++
265264
v := testItem{
266265
v: 0,
267266
onNodeID: func() uint32 {
267+
nodeID, _ := endpoint.ContextNodeID(ctx)
268+
268269
return nodeID
269270
},
270271
}
@@ -297,7 +298,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
297298
WithItemUsageLimit[*testItem, testItem](5),
298299
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
299300
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
300-
WithCreateItemFunc(func(context.Context, uint32) (*testItem, error) {
301+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
301302
atomic.AddInt64(&newCounter, 1)
302303

303304
var v testItem
@@ -321,7 +322,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
321322
var newCounter int64
322323
p := New(rootCtx,
323324
WithLimit[*testItem, testItem](1),
324-
WithCreateItemFunc(func(context.Context, uint32) (*testItem, error) {
325+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
325326
atomic.AddInt64(&newCounter, 1)
326327
var v testItem
327328

@@ -355,7 +356,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
355356
WithLimit[*testItem, testItem](3),
356357
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
357358
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
358-
WithCreateItemFunc(func(context.Context, uint32) (*testItem, error) {
359+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
359360
var (
360361
idx = created.Add(1) - 1
361362
v = testItem{
@@ -518,7 +519,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
518519
p := New[*testItem, testItem](rootCtx,
519520
WithLimit[*testItem, testItem](2),
520521
WithCreateItemTimeout[*testItem, testItem](0),
521-
WithCreateItemFunc[*testItem, testItem](func(ctx context.Context, _ uint32) (*testItem, error) {
522+
WithCreateItemFunc[*testItem, testItem](func(ctx context.Context) (*testItem, error) {
522523
v := testItem{
523524
v: 0,
524525
onClose: func() error {
@@ -598,7 +599,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
598599
p := New(rootCtx,
599600
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
600601
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
601-
WithCreateItemFunc(func(context.Context, uint32) (*testItem, error) {
602+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
602603
atomic.AddInt64(&counter, 1)
603604

604605
if atomic.LoadInt64(&counter) < 10 {
@@ -621,7 +622,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
621622
p := New(rootCtx,
622623
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
623624
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
624-
WithCreateItemFunc(func(context.Context, uint32) (*testItem, error) {
625+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
625626
atomic.AddInt64(&counter, 1)
626627

627628
if atomic.LoadInt64(&counter) < 10 {
@@ -645,7 +646,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
645646
p := New(rootCtx,
646647
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
647648
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
648-
WithCreateItemFunc(func(context.Context, uint32) (*testItem, error) {
649+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
649650
atomic.AddInt64(&counter, 1)
650651

651652
if atomic.LoadInt64(&counter) < 10 {
@@ -668,7 +669,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
668669
p := New(rootCtx,
669670
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
670671
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
671-
WithCreateItemFunc(func(context.Context, uint32) (*testItem, error) {
672+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
672673
atomic.AddInt64(&counter, 1)
673674

674675
if atomic.LoadInt64(&counter) < 10 {
@@ -816,7 +817,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
816817
)
817818
p := New(rootCtx,
818819
WithLimit[*testItem, testItem](1),
819-
WithCreateItemFunc(func(context.Context, uint32) (*testItem, error) {
820+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
820821
atomic.AddInt64(&createCounter, 1)
821822

822823
v := &testItem{
@@ -853,7 +854,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
853854
WithLimit[*testItem, testItem](1),
854855
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
855856
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
856-
WithCreateItemFunc(func(context.Context, uint32) (*testItem, error) {
857+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
857858
newItems.Add(1)
858859

859860
v := &testItem{
@@ -914,7 +915,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
914915
WithLimit[*testItem, testItem](1),
915916
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
916917
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
917-
WithCreateItemFunc(func(context.Context, uint32) (*testItem, error) {
918+
WithCreateItemFunc(func(ctx context.Context) (*testItem, error) {
918919
created.Add(1)
919920
v := testItem{
920921
v: 0,

internal/query/client.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
1313
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
14-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
1514
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
1615
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
1716
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
@@ -563,7 +562,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *
563562
pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()),
564563
pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()),
565564
pool.WithIdleTimeToLive[*Session, Session](cfg.SessionIdleTimeToLive()),
566-
pool.WithCreateItemFunc(func(ctx context.Context, nodeID uint32) (_ *Session, err error) {
565+
pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) {
567566
var (
568567
createCtx context.Context
569568
cancelCreate context.CancelFunc
@@ -576,7 +575,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *
576575
defer cancelCreate()
577576

578577
s, err := createSession(createCtx, client,
579-
session.WithConn(conn.ModifyConn(cc, nodeID)),
578+
session.WithConn(cc),
580579
session.WithDeleteTimeout(cfg.SessionDeleteTimeout()),
581580
session.WithTrace(cfg.Trace()),
582581
)

internal/query/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1590,7 +1590,7 @@ func testPool(
15901590
) *pool.Pool[*Session, Session] {
15911591
return pool.New[*Session, Session](ctx,
15921592
pool.WithLimit[*Session, Session](1),
1593-
pool.WithCreateItemFunc(func(ctx context.Context, _ uint32) (*Session, error) {
1593+
pool.WithCreateItemFunc(func(ctx context.Context) (*Session, error) {
15941594
return createSession(ctx)
15951595
}),
15961596
pool.WithSyncCloseItem[*Session, Session](),

0 commit comments

Comments
 (0)