Skip to content

Commit bdc3ac2

Browse files
committed
Allow to set preferred node id to execute query
1 parent 4d80170 commit bdc3ac2

File tree

10 files changed

+213
-50
lines changed

10 files changed

+213
-50
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Allow to set preferred node id to execute query
2+
13
## v3.90.1
24
* Small broken change: added method `ID()` into `spans.Span` interface (need to implement in adapter)
35
* Fixed traceparent header for tracing grpc requests

context.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,8 @@ func WithOperationTimeout(ctx context.Context, operationTimeout time.Duration) c
1919
func WithOperationCancelAfter(ctx context.Context, operationCancelAfter time.Duration) context.Context {
2020
return operation.WithCancelAfter(ctx, operationCancelAfter)
2121
}
22+
23+
// WithPreferredNodeID allows to set preferred node to get session from
24+
func WithPreferredNodeID(ctx context.Context, nodeID uint32) context.Context {
25+
return operation.WithPreferredNodeID(ctx, nodeID)
26+
}

internal/conn/middleware.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55

66
"google.golang.org/grpc"
7+
8+
balancerContext "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
79
)
810

911
var _ grpc.ClientConnInterface = (*middleware)(nil)
@@ -30,6 +32,16 @@ func (m *middleware) NewStream(
3032
return m.newStream(ctx, desc, method, opts...)
3133
}
3234

35+
func ModifyConn(cc grpc.ClientConnInterface, nodeID uint32) grpc.ClientConnInterface {
36+
if nodeID != 0 {
37+
return WithContextModifier(cc, func(ctx context.Context) context.Context {
38+
return balancerContext.WithNodeID(ctx, nodeID)
39+
})
40+
}
41+
42+
return cc
43+
}
44+
3345
func WithContextModifier(
3446
cc grpc.ClientConnInterface,
3547
modifyCtx func(ctx context.Context) context.Context,

internal/operation/context.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
type (
99
ctxOperationTimeoutKey struct{}
1010
ctxOperationCancelAfterKey struct{}
11+
ctxWithPreferredNodeIDKey struct{}
1112
)
1213

1314
// WithTimeout returns a copy of parent context in which YDB operation timeout
@@ -33,6 +34,10 @@ func WithCancelAfter(ctx context.Context, operationCancelAfter time.Duration) co
3334
return context.WithValue(ctx, ctxOperationCancelAfterKey{}, operationCancelAfter)
3435
}
3536

37+
func WithPreferredNodeID(ctx context.Context, nodeID uint32) context.Context {
38+
return context.WithValue(ctx, ctxWithPreferredNodeIDKey{}, nodeID)
39+
}
40+
3641
// ctxTimeout returns the timeout within given context after which
3742
// YDB should try to cancel operation and return result regardless of the cancelation.
3843
func ctxTimeout(ctx context.Context) (d time.Duration, ok bool) {
@@ -57,3 +62,16 @@ func ctxUntilDeadline(ctx context.Context) (time.Duration, bool) {
5762

5863
return 0, false
5964
}
65+
66+
func CtxPreferredNodeID(ctx context.Context) uint32 {
67+
x := ctx.Value(ctxWithPreferredNodeIDKey{})
68+
if x == nil {
69+
return 0
70+
}
71+
val, ok := x.(uint32)
72+
if !ok {
73+
return 0
74+
}
75+
76+
return val
77+
}

internal/pool/pool.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/jonboulle/clockwork"
1010

11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -20,6 +21,7 @@ type (
2021
Item interface {
2122
IsAlive() bool
2223
Close(ctx context.Context) error
24+
NodeID() uint32
2325
}
2426
ItemConstraint[T any] interface {
2527
*T
@@ -30,7 +32,7 @@ type (
3032
clock clockwork.Clock
3133
limit int
3234
createTimeout time.Duration
33-
createItem func(ctx context.Context) (PT, error)
35+
createItem func(ctx context.Context, preferredNodeID uint32) (PT, error)
3436
closeTimeout time.Duration
3537
closeItem func(ctx context.Context, item PT)
3638
idleTimeToLive time.Duration
@@ -48,7 +50,7 @@ type (
4850
Pool[PT ItemConstraint[T], T any] struct {
4951
config Config[PT, T]
5052

51-
createItem func(ctx context.Context) (PT, error)
53+
createItem func(ctx context.Context, preferredNodeID uint32) (PT, error)
5254
closeItem func(ctx context.Context, item PT)
5355

5456
mu xsync.RWMutex
@@ -63,7 +65,7 @@ type (
6365
Option[PT ItemConstraint[T], T any] func(c *Config[PT, T])
6466
)
6567

66-
func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(ctx context.Context) (PT, error)) Option[PT, T] {
68+
func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(context.Context, uint32) (PT, error)) Option[PT, T] {
6769
return func(c *Config[PT, T]) {
6870
c.createItem = f
6971
}
@@ -173,7 +175,7 @@ func New[PT ItemConstraint[T], T any](
173175
}
174176

175177
// defaultCreateItem returns a new item
176-
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error) {
178+
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context, uint32) (PT, error) {
177179
var item T
178180

179181
return &item, nil
@@ -182,8 +184,8 @@ func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error)
182184
// makeAsyncCreateItemFunc wraps the createItem function with timeout handling
183185
func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
184186
p *Pool[PT, T],
185-
) func(ctx context.Context) (PT, error) {
186-
return func(ctx context.Context) (PT, error) {
187+
) func(ctx context.Context, preferredNodeID uint32) (PT, error) {
188+
return func(ctx context.Context, preferredNodeID uint32) (PT, error) {
187189
if !xsync.WithLock(&p.mu, func() bool {
188190
if len(p.index)+p.createInProgress < p.config.limit {
189191
p.createInProgress++
@@ -222,7 +224,7 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
222224
defer cancelCreate()
223225
}
224226

225-
newItem, err := p.config.createItem(createCtx)
227+
newItem, err := p.config.createItem(createCtx, preferredNodeID)
226228
if newItem != nil {
227229
p.mu.WithLock(func() {
228230
var useCounter uint64
@@ -314,7 +316,7 @@ func (p *Pool[PT, T]) changeState(changeState func() Stats) {
314316
}
315317
}
316318

317-
func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error) (finalErr error) {
319+
func (p *Pool[PT, T]) try(ctx context.Context, f func(context.Context, PT) error) (finalErr error) {
318320
if onTry := p.config.trace.OnTry; onTry != nil {
319321
onDone := onTry(&ctx,
320322
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).try"),
@@ -460,8 +462,13 @@ func (p *Pool[PT, T]) putWaitCh(ch *chan PT) { //nolint:gocritic
460462
}
461463

462464
// p.mu must be held.
463-
func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) {
465+
func (p *Pool[PT, T]) peekFirstIdle(preferredNodeID uint32) (item PT, touched time.Time) {
464466
el := p.idle.Front()
467+
if preferredNodeID != 0 {
468+
for el != nil && el.Value.NodeID() != preferredNodeID {
469+
el = el.Next()
470+
}
471+
}
465472
if el == nil {
466473
return
467474
}
@@ -478,8 +485,8 @@ func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) {
478485
// to prevent session from dying in the internalPoolGC after it was returned
479486
// to be used only in outgoing functions that make session busy.
480487
// p.mu must be held.
481-
func (p *Pool[PT, T]) removeFirstIdle() PT {
482-
idle, _ := p.peekFirstIdle()
488+
func (p *Pool[PT, T]) removeFirstIdle(preferredNodeID uint32) PT {
489+
idle, _ := p.peekFirstIdle(preferredNodeID)
483490
if idle != nil {
484491
info := p.removeIdle(idle)
485492
p.index[idle] = info
@@ -585,6 +592,8 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
585592
}
586593
}
587594

595+
preferredNodeID := operation.CtxPreferredNodeID(ctx)
596+
588597
for ; attempt < maxAttempts; attempt++ {
589598
select {
590599
case <-p.done:
@@ -593,7 +602,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
593602
}
594603

595604
if item := xsync.WithLock(&p.mu, func() PT { //nolint:nestif
596-
return p.removeFirstIdle()
605+
return p.removeFirstIdle(preferredNodeID)
597606
}); item != nil {
598607
if item.IsAlive() {
599608
info := xsync.WithLock(&p.mu, func() itemInfo[PT, T] {
@@ -625,7 +634,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
625634
}
626635
}
627636

628-
item, err := p.createItem(ctx)
637+
item, err := p.createItem(ctx, preferredNodeID)
629638
if item != nil {
630639
return item, nil
631640
}

0 commit comments

Comments
 (0)