Skip to content

Commit b83211c

Browse files
committed
add listening node status from session and close it early
1 parent fe0929c commit b83211c

File tree

5 files changed

+21
-11
lines changed

5 files changed

+21
-11
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added listening node status from session and close session early if node not found in balancer
2+
13
## v3.37.5
24
* Refactoring of `xsql` errors checking
35

internal/balancer/connections_state.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ func (s *connectionsState) preferConnection(ctx context.Context) conn.Conn {
7979
if e, hasPreferEndpoint := ContextEndpoint(ctx); hasPreferEndpoint {
8080
c := s.connByNodeID[e.NodeID()]
8181
if c != nil && isOkConnection(c, true) {
82+
e.Choose(true)
8283
return c
84+
} else {
85+
e.Choose(false)
8386
}
8487
}
8588

internal/balancer/ctx.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,15 @@ type (
88

99
type Endpoint interface {
1010
NodeID() uint32
11+
12+
// Choose calls from balancer if this endpoint to request chosen or not
13+
Choose(chosen bool)
1114
}
1215

1316
func WithEndpoint(ctx context.Context, endpoint Endpoint) context.Context {
1417
return context.WithValue(ctx, ctxEndpointKey{}, endpoint)
1518
}
1619

17-
type nodeID uint32
18-
19-
func (n nodeID) NodeID() uint32 {
20-
return uint32(n)
21-
}
22-
23-
func WithNodeID(ctx context.Context, id uint32) context.Context {
24-
return WithEndpoint(ctx, nodeID(id))
25-
}
26-
2720
func ContextEndpoint(ctx context.Context) (e Endpoint, ok bool) {
2821
if e, ok = ctx.Value(ctxEndpointKey{}).(Endpoint); ok {
2922
return e, true

internal/mock/conn.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ type Endpoint struct {
8282
LocalDCField bool
8383
}
8484

85+
func (e *Endpoint) Choose(bool) {
86+
}
87+
8588
func (e *Endpoint) NodeID() uint32 {
8689
return e.NodeIDField
8790
}

internal/table/session.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ type session struct {
5353
closeOnce sync.Once
5454
}
5555

56+
func (s *session) Choose(chosen bool) {
57+
if s == nil {
58+
return
59+
}
60+
if !chosen {
61+
s.SetStatus(options.SessionClosing)
62+
}
63+
}
64+
5665
func (s *session) NodeID() uint32 {
5766
if s == nil {
5867
return 0
@@ -165,7 +174,7 @@ func (s *session) Close(ctx context.Context) (err error) {
165174
onDone := trace.TableOnSessionDelete(s.config.Trace(), &ctx, s)
166175

167176
_, err = s.tableService.DeleteSession(
168-
balancer.WithEndpoint(ctx, s),
177+
ctx,
169178
&Ydb_Table.DeleteSessionRequest{
170179
SessionId: s.id,
171180
OperationParams: operation.Params(ctx,

0 commit comments

Comments
 (0)