Skip to content

Commit baf2707

Browse files
committed
## 3.11.8
* Added `trace.EndpointInfo.LastUpdated()` timestamp * Refactored `endpoint.Endpoint` (split to struct `endopint` and interface `Endpoint`) * Added `endpoint.Endpoint.Touch()` func for refresh endpoint info * Added `conn.conn.onClose` slice for call optional funcs on close step * Added removing `conn.Conn` from `conn.Pool` on `conn.Conn.Close()` call
1 parent 950ce16 commit baf2707

File tree

18 files changed

+296
-225
lines changed

18 files changed

+296
-225
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## 3.11.8
2+
* Added `trace.EndpointInfo.LastUpdated()` timestamp
3+
* Refactored `endpoint.Endpoint` (split to struct `endopint` and interface `Endpoint`)
4+
* Added `endpoint.Endpoint.Touch()` func for refresh endpoint info
5+
* Added `conn.conn.onClose` slice for call optional funcs on close step
6+
* Added removing `conn.Conn` from `conn.Pool` on `conn.Conn.Close()` call
7+
18
## 3.11.7
29
* Removed internal alias-type `errors.IssuesIterator`
310

internal/cluster/cluster.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/cluster/entry"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
18-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint/info"
1918
"github.com/ydb-platform/ydb-go-sdk/v3/internal/errors"
2019
"github.com/ydb-platform/ydb-go-sdk/v3/internal/repeater"
2120
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -261,16 +260,14 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudO
261260
panic("ydb: can't insert already existing endpoint")
262261
}
263262

264-
var wait chan struct{}
265-
defer func() {
266-
if wait != nil {
267-
close(wait)
268-
}
269-
}()
263+
cc.Endpoint().Touch()
270264

271265
entry := entry.Entry{Conn: cc}
266+
272267
entry.InsertInto(c.balancer)
268+
273269
c.index[e.Address()] = entry
270+
274271
if e.NodeID() > 0 {
275272
c.endpoints[e.NodeID()] = cc
276273
}
@@ -307,6 +304,8 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudO
307304
panic("ydb: cluster entry with nil conn")
308305
}
309306

307+
entry.Conn.Endpoint().Touch()
308+
310309
delete(c.endpoints, e.NodeID())
311310
c.index[e.Address()] = entry
312311

@@ -316,7 +315,7 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudO
316315

317316
if entry.Handle != nil {
318317
// entry.Handle may be nil when connection is being tracked.
319-
c.balancer.Update(entry.Handle, info.Info{})
318+
c.balancer.Update(entry.Handle, e.Info())
320319
}
321320

322321
return entry.Conn

internal/conn/conn.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type conn struct {
4545
state State
4646
locks int32
4747
ttl timeutil.Timer
48+
onClose []func(Conn)
4849
}
4950

5051
func (c *conn) IsState(states ...State) bool {
@@ -79,7 +80,7 @@ func (c *conn) Endpoint() endpoint.Endpoint {
7980
if c != nil {
8081
return c.endpoint
8182
}
82-
return endpoint.Endpoint{}
83+
return nil
8384
}
8485

8586
func (c *conn) SetState(ctx context.Context, s State) State {
@@ -200,6 +201,9 @@ func (c *conn) Close(ctx context.Context) (err error) {
200201
c.closed = true
201202
err = c.close(ctx)
202203
c.setState(ctx, Destroyed)
204+
for _, f := range c.onClose {
205+
f(c)
206+
}
203207
return err
204208
}
205209

@@ -347,13 +351,24 @@ func (c *conn) NewStream(
347351
}, nil
348352
}
349353

350-
func New(endpoint endpoint.Endpoint, config Config) Conn {
354+
type option func(c *conn)
355+
356+
func withOnClose(onClose func(Conn)) option {
357+
return func(c *conn) {
358+
c.onClose = append(c.onClose, onClose)
359+
}
360+
}
361+
362+
func New(endpoint endpoint.Endpoint, config Config, opts ...option) Conn {
351363
c := &conn{
352364
state: Created,
353365
endpoint: endpoint,
354366
config: config,
355367
done: make(chan struct{}),
356368
}
369+
for _, o := range opts {
370+
o(c)
371+
}
357372
if ttl := config.ConnectionTTL(); ttl > 0 {
358373
c.ttl = timeutil.NewTimer(ttl)
359374
}

internal/conn/pool.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ func (p *pool) GetConn(e endpoint.Endpoint) Conn {
6161
if cc, ok := p.conns[e.Address()]; ok {
6262
return cc
6363
}
64-
cc := New(e, p.config)
64+
cc := New(e, p.config, withOnClose(func(c Conn) {
65+
p.mtx.Lock()
66+
defer p.mtx.Unlock()
67+
delete(p.conns, c.Endpoint().Address())
68+
}))
6569
p.conns[e.Address()] = cc
6670
return cc
6771
}

internal/discovery/discovery.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,9 @@ func (d *client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, e
123123

124124
var location string
125125
defer func() {
126-
nodes := make([]string, 0)
126+
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
127127
for _, e := range endpoints {
128-
nodes = append(nodes, e.Address())
128+
nodes = append(nodes, e)
129129
}
130130
onDone(location, nodes, err)
131131
}()

internal/endpoint/endpoint.go

Lines changed: 79 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,71 +1,134 @@
11
package endpoint
22

3-
type Endpoint struct {
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint/info"
8+
)
9+
10+
type Endpoint interface {
11+
Info() info.Info
12+
String() string
13+
14+
NodeID() uint32
15+
Address() string
16+
Location() string
17+
LocalDC() bool
18+
LoadFactor() float32
19+
LastUpdated() time.Time
20+
21+
Touch(opts ...option)
22+
}
23+
24+
type endpoint struct {
425
id uint32
526
address string
627
location string
728
services []string
829

930
loadFactor float32
1031
local bool
32+
33+
lastUpdated time.Time
34+
}
35+
36+
func (e *endpoint) String() string {
37+
return fmt.Sprintf(`{id:%d,address:"%s",local:%t,location:"%s",loadFactor:%f,lastUpdated:"%s"}`,
38+
e.id,
39+
e.address,
40+
e.local,
41+
e.location,
42+
e.loadFactor,
43+
e.lastUpdated.Format(time.RFC3339),
44+
)
45+
}
46+
47+
func (e *endpoint) Info() info.Info {
48+
return info.Info{
49+
ID: e.id,
50+
Address: e.address,
51+
LoadFactor: e.loadFactor,
52+
Local: e.local,
53+
}
1154
}
1255

13-
func (e Endpoint) NodeID() uint32 {
56+
func (e *endpoint) NodeID() uint32 {
1457
return e.id
1558
}
1659

17-
func (e Endpoint) Address() (address string) {
60+
func (e *endpoint) Address() (address string) {
1861
return e.address
1962
}
2063

21-
func (e Endpoint) Location() string {
64+
func (e *endpoint) Location() string {
2265
return e.location
2366
}
2467

25-
func (e Endpoint) LocalDC() bool {
68+
func (e *endpoint) LocalDC() bool {
2669
return e.local
2770
}
2871

29-
func (e Endpoint) LoadFactor() float32 {
72+
func (e *endpoint) LoadFactor() float32 {
3073
return e.loadFactor
3174
}
3275

33-
type option func(e *Endpoint)
76+
func (e *endpoint) LastUpdated() time.Time {
77+
return e.lastUpdated
78+
}
79+
80+
func (e *endpoint) Touch(opts ...option) {
81+
e.lastUpdated = time.Now()
82+
for _, o := range opts {
83+
o(e)
84+
}
85+
}
86+
87+
type option func(e *endpoint)
3488

3589
func WithID(id uint32) option {
36-
return func(e *Endpoint) {
90+
return func(e *endpoint) {
3791
e.id = id
3892
}
3993
}
4094

4195
func WithLocation(location string) option {
42-
return func(e *Endpoint) {
96+
return func(e *endpoint) {
4397
e.location = location
4498
}
4599
}
46100

47101
func WithLocalDC(local bool) option {
48-
return func(e *Endpoint) {
102+
return func(e *endpoint) {
49103
e.local = local
50104
}
51105
}
52106

53107
func WithLoadFactor(loadFactor float32) option {
54-
return func(e *Endpoint) {
108+
return func(e *endpoint) {
55109
e.loadFactor = loadFactor
56110
}
57111
}
58112

59113
func WithServices(services []string) option {
60-
return func(e *Endpoint) {
114+
return func(e *endpoint) {
61115
e.services = append(e.services, services...)
62116
}
63117
}
64118

65-
func New(address string, opts ...option) (e Endpoint) {
66-
e.address = address
119+
func WithLastUpdated(ts time.Time) option {
120+
return func(e *endpoint) {
121+
e.lastUpdated = ts
122+
}
123+
}
124+
125+
func New(address string, opts ...option) Endpoint {
126+
e := &endpoint{
127+
address: address,
128+
lastUpdated: time.Now(),
129+
}
67130
for _, o := range opts {
68-
o(&e)
131+
o(e)
69132
}
70-
return
133+
return e
71134
}

internal/meta/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package meta
22

33
const (
4-
Version = "ydb-go-sdk/3.11.7"
4+
Version = "ydb-go-sdk/3.11.8"
55
)

log/discovery.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@ func Discovery(log Logger, details trace.Details) (t trace.Discovery) {
1717
start := time.Now()
1818
return func(info trace.DiscoverDoneInfo) {
1919
if info.Error == nil {
20-
log.Debugf(`discover done {latency:"%s",endpoints:%v}`,
20+
endpoints := make([]string, 0, len(info.Endpoints))
21+
for _, e := range info.Endpoints {
22+
endpoints = append(endpoints, e.String())
23+
}
24+
log.Debugf(`discover done {latency:"%v",endpoints:%v}`,
2125
time.Since(start),
22-
info.Endpoints,
26+
endpoints,
2327
)
2428
} else {
25-
log.Errorf(`discover failed {latency:"%s",error:"%s"}`,
29+
log.Errorf(`discover failed {latency:"%v",error:"%s"}`,
2630
time.Since(start),
2731
info.Error,
2832
)
@@ -34,13 +38,13 @@ func Discovery(log Logger, details trace.Details) (t trace.Discovery) {
3438
start := time.Now()
3539
return func(info trace.WhoAmIDoneInfo) {
3640
if info.Error == nil {
37-
log.Debugf(`whoAmI done {latency:"%s",user:%v,groups:%v}`,
41+
log.Debugf(`whoAmI done {latency:"%v",user:%v,groups:%v}`,
3842
time.Since(start),
3943
info.User,
4044
info.Groups,
4145
)
4246
} else {
43-
log.Errorf(`whoAmI failed {latency:"%s",error:"%s"}`,
47+
log.Errorf(`whoAmI failed {latency:"%v",error:"%s"}`,
4448
time.Since(start),
4549
info.Error,
4650
)

0 commit comments

Comments
 (0)