Skip to content

Commit 28e0006

Browse files
committed
* Returned safe-thread copy of endpoint.Endpoint to trace callbacks
* Checked cluster close/empty on keeper goroutine
1 parent baf2707 commit 28e0006

File tree

7 files changed

+64
-11
lines changed

7 files changed

+64
-11
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
## 3.11.8
22
* Added `trace.EndpointInfo.LastUpdated()` timestamp
33
* Refactored `endpoint.Endpoint` (split to struct `endopint` and interface `Endpoint`)
4+
* Returned safe-thread copy of `endpoint.Endpoint` to trace callbacks
45
* Added `endpoint.Endpoint.Touch()` func for refresh endpoint info
56
* Added `conn.conn.onClose` slice for call optional funcs on close step
67
* Added removing `conn.Conn` from `conn.Pool` on `conn.Conn.Close()` call
8+
* Checked cluster close/empty on keeper goroutine
79

810
## 3.11.7
911
* Removed internal alias-type `errors.IssuesIterator`

internal/cluster/cluster.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ func (c *cluster) Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, er
209209
if err != nil {
210210
onDone(nil, err)
211211
} else {
212-
onDone(cc.Endpoint(), nil)
212+
onDone(cc.Endpoint().Copy(), nil)
213213
}
214214
}()
215215

@@ -234,7 +234,7 @@ func (c *cluster) Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, er
234234

235235
// Insert inserts new connection into the cluster.
236236
func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudOption) (cc conn.Conn) {
237-
onDone := trace.DriverOnClusterInsert(c.config.Trace(), &ctx, e)
237+
onDone := trace.DriverOnClusterInsert(c.config.Trace(), &ctx, e.Copy())
238238
defer func() {
239239
if cc != nil {
240240
onDone(cc.GetState())
@@ -277,7 +277,7 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudO
277277

278278
// Update updates existing connection's runtime stats such that load factor and others.
279279
func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudOption) (cc conn.Conn) {
280-
onDone := trace.DriverOnClusterUpdate(c.config.Trace(), &ctx, e)
280+
onDone := trace.DriverOnClusterUpdate(c.config.Trace(), &ctx, e.Copy())
281281
defer func() {
282282
if cc != nil {
283283
onDone(cc.GetState())
@@ -323,7 +323,7 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudO
323323

324324
// Remove removes and closes previously inserted connection.
325325
func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudOption) (cc conn.Conn) {
326-
onDone := trace.DriverOnClusterRemove(c.config.Trace(), &ctx, e)
326+
onDone := trace.DriverOnClusterRemove(c.config.Trace(), &ctx, e.Copy())
327327
defer func() {
328328
if cc != nil {
329329
onDone(cc.GetState())

internal/conn/conn.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (c *conn) setState(ctx context.Context, s State) State {
9393
onDone := trace.DriverOnConnStateChange(
9494
trace.ContextDriver(ctx).Compose(c.config.Trace()),
9595
&ctx,
96-
c.endpoint,
96+
c.endpoint.Copy(),
9797
c.state,
9898
)
9999
c.state = s
@@ -124,7 +124,7 @@ func (c *conn) take(ctx context.Context) (cc *grpc.ClientConn, err error) {
124124
onDone := trace.DriverOnConnTake(
125125
trace.ContextDriver(ctx).Compose(c.config.Trace()),
126126
&ctx,
127-
c.endpoint,
127+
c.endpoint.Copy(),
128128
)
129129
defer func() {
130130
onDone(int(atomic.LoadInt32(&c.locks)), err)
@@ -161,7 +161,7 @@ func (c *conn) release(ctx context.Context) {
161161
onDone := trace.DriverOnConnRelease(
162162
trace.ContextDriver(ctx).Compose(c.config.Trace()),
163163
&ctx,
164-
c.endpoint,
164+
c.endpoint.Copy(),
165165
)
166166
atomic.AddInt32(&c.locks, -1)
167167
onDone(int(atomic.LoadInt32(&c.locks)))
@@ -214,7 +214,7 @@ func (c *conn) pessimize(ctx context.Context, err error) {
214214
trace.DriverOnPessimizeNode(
215215
trace.ContextDriver(ctx).Compose(c.config.Trace()),
216216
&ctx,
217-
c.endpoint,
217+
c.endpoint.Copy(),
218218
c.GetState(),
219219
err,
220220
)(c.SetState(ctx, Banned))
@@ -253,7 +253,7 @@ func (c *conn) Invoke(
253253
onDone := trace.DriverOnConnInvoke(
254254
trace.ContextDriver(ctx).Compose(c.config.Trace()),
255255
&ctx,
256-
c.endpoint,
256+
c.endpoint.Copy(),
257257
trace.Method(method),
258258
)
259259
defer func() {
@@ -320,7 +320,7 @@ func (c *conn) NewStream(
320320
streamRecv := trace.DriverOnConnNewStream(
321321
trace.ContextDriver(ctx).Compose(c.config.Trace()),
322322
&ctx,
323-
c.endpoint,
323+
c.endpoint.Copy(),
324324
trace.Method(method),
325325
)
326326
defer func() {

internal/discovery/discovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (d *client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, e
125125
defer func() {
126126
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
127127
for _, e := range endpoints {
128-
nodes = append(nodes, e)
128+
nodes = append(nodes, e.Copy())
129129
}
130130
onDone(location, nodes, err)
131131
}()

internal/endpoint/endpoint.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package endpoint
22

33
import (
44
"fmt"
5+
"sync"
56
"time"
67

78
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint/info"
@@ -10,6 +11,7 @@ import (
1011
type Endpoint interface {
1112
Info() info.Info
1213
String() string
14+
Copy() Endpoint
1315

1416
NodeID() uint32
1517
Address() string
@@ -22,6 +24,7 @@ type Endpoint interface {
2224
}
2325

2426
type endpoint struct {
27+
mu sync.RWMutex
2528
id uint32
2629
address string
2730
location string
@@ -33,7 +36,23 @@ type endpoint struct {
3336
lastUpdated time.Time
3437
}
3538

39+
func (e *endpoint) Copy() Endpoint {
40+
e.mu.RLock()
41+
defer e.mu.RUnlock()
42+
return &endpoint{
43+
id: e.id,
44+
address: e.address,
45+
location: e.location,
46+
services: append(make([]string, 0, len(e.services)), e.services...),
47+
loadFactor: e.loadFactor,
48+
local: e.local,
49+
lastUpdated: e.lastUpdated,
50+
}
51+
}
52+
3653
func (e *endpoint) String() string {
54+
e.mu.RLock()
55+
defer e.mu.RUnlock()
3756
return fmt.Sprintf(`{id:%d,address:"%s",local:%t,location:"%s",loadFactor:%f,lastUpdated:"%s"}`,
3857
e.id,
3958
e.address,
@@ -45,6 +64,8 @@ func (e *endpoint) String() string {
4564
}
4665

4766
func (e *endpoint) Info() info.Info {
67+
e.mu.RLock()
68+
defer e.mu.RUnlock()
4869
return info.Info{
4970
ID: e.id,
5071
Address: e.address,
@@ -54,30 +75,44 @@ func (e *endpoint) Info() info.Info {
5475
}
5576

5677
func (e *endpoint) NodeID() uint32 {
78+
e.mu.RLock()
79+
defer e.mu.RUnlock()
5780
return e.id
5881
}
5982

6083
func (e *endpoint) Address() (address string) {
84+
e.mu.RLock()
85+
defer e.mu.RUnlock()
6186
return e.address
6287
}
6388

6489
func (e *endpoint) Location() string {
90+
e.mu.RLock()
91+
defer e.mu.RUnlock()
6592
return e.location
6693
}
6794

6895
func (e *endpoint) LocalDC() bool {
96+
e.mu.RLock()
97+
defer e.mu.RUnlock()
6998
return e.local
7099
}
71100

72101
func (e *endpoint) LoadFactor() float32 {
102+
e.mu.RLock()
103+
defer e.mu.RUnlock()
73104
return e.loadFactor
74105
}
75106

76107
func (e *endpoint) LastUpdated() time.Time {
108+
e.mu.RLock()
109+
defer e.mu.RUnlock()
77110
return e.lastUpdated
78111
}
79112

80113
func (e *endpoint) Touch(opts ...option) {
114+
e.mu.Lock()
115+
defer e.mu.Unlock()
81116
e.lastUpdated = time.Now()
82117
for _, o := range opts {
83118
o(e)
@@ -88,36 +123,48 @@ type option func(e *endpoint)
88123

89124
func WithID(id uint32) option {
90125
return func(e *endpoint) {
126+
e.mu.Lock()
127+
defer e.mu.Unlock()
91128
e.id = id
92129
}
93130
}
94131

95132
func WithLocation(location string) option {
96133
return func(e *endpoint) {
134+
e.mu.Lock()
135+
defer e.mu.Unlock()
97136
e.location = location
98137
}
99138
}
100139

101140
func WithLocalDC(local bool) option {
102141
return func(e *endpoint) {
142+
e.mu.Lock()
143+
defer e.mu.Unlock()
103144
e.local = local
104145
}
105146
}
106147

107148
func WithLoadFactor(loadFactor float32) option {
108149
return func(e *endpoint) {
150+
e.mu.Lock()
151+
defer e.mu.Unlock()
109152
e.loadFactor = loadFactor
110153
}
111154
}
112155

113156
func WithServices(services []string) option {
114157
return func(e *endpoint) {
158+
e.mu.Lock()
159+
defer e.mu.Unlock()
115160
e.services = append(e.services, services...)
116161
}
117162
}
118163

119164
func WithLastUpdated(ts time.Time) option {
120165
return func(e *endpoint) {
166+
e.mu.Lock()
167+
defer e.mu.Unlock()
121168
e.lastUpdated = ts
122169
}
123170
}

internal/table/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"google.golang.org/grpc"
1111

12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/cluster"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/deadline"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/errors"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/table"
@@ -580,6 +581,8 @@ func (c *client) keeper() {
580581
if err != nil {
581582
switch {
582583
case
584+
errors.Is(err, cluster.ErrClusterClosed),
585+
errors.Is(err, cluster.ErrClusterEmpty),
583586
errors.IsOpError(err, errors.StatusBadSession),
584587
errors.IsTransportError(err, errors.TransportErrorDeadlineExceeded):
585588
toDelete = append(toDelete, s)

trace/driver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ type EndpointInfo interface {
9393
NodeID() uint32
9494
Address() string
9595
LocalDC() bool
96+
Location() string
9697
LastUpdated() time.Time
9798
}
9899

0 commit comments

Comments
 (0)