Skip to content

Commit 82fd97e

Browse files
authored
Merge pull request #190 from ydb-platform/add-address-to-error-grom-take
* Added address to error description which thrown from `conn.take()`
2 parents dbbb04d + 20f189b commit 82fd97e

File tree

5 files changed

+50
-68
lines changed

5 files changed

+50
-68
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
* Renamed package `internal/db` to `internal/database` for exclude collisions with variable name `db`
1+
* Added `conn.Ping(ctx)` method for check availability of `conn.Conn`
2+
* Refactored `cluster.Cluster.Get(ctx)` to return only available connection (instead of returning any connection from balancer)
3+
* Added address to error description thrown from `conn.take()`
4+
* Renamed package `internal/db` to `internal/database` to exclude collisions with variable name `db`
25

36
## v3.18.0
47
* Added `go1.18` to test matrix

internal/cluster/cluster.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func parseOptions(opts ...crudOption) *crudOptionsHolder {
129129

130130
type Getter interface {
131131
// Get gets conn from cluster
132-
Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, err error)
132+
Get(ctx context.Context) (cc conn.Conn, err error)
133133
}
134134

135135
type Inserter interface {
@@ -247,17 +247,11 @@ func (c *cluster) Close(ctx context.Context) (err error) {
247247

248248
// Get returns next available connection.
249249
// It returns error on given deadline cancellation or when cluster become closed.
250-
func (c *cluster) Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, err error) {
250+
func (c *cluster) Get(ctx context.Context) (cc conn.Conn, err error) {
251251
var cancel context.CancelFunc
252252
ctx, cancel = context.WithTimeout(ctx, MaxGetConnTimeout)
253253
defer cancel()
254254

255-
options := parseOptions(opts...)
256-
if options.withLock {
257-
c.mu.Lock()
258-
defer c.mu.Unlock()
259-
}
260-
261255
if c.closed {
262256
return nil, errors.WithStackTrace(ErrClusterClosed)
263257
}
@@ -272,7 +266,9 @@ func (c *cluster) Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, er
272266
}()
273267

274268
if e, ok := ContextEndpoint(ctx); ok {
269+
c.mu.RLock()
275270
cc, ok = c.endpoints[e.NodeID()]
271+
c.mu.RUnlock()
276272
if ok && cc.IsState(
277273
conn.Created,
278274
conn.Online,
@@ -282,15 +278,22 @@ func (c *cluster) Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, er
282278
}
283279
}
284280

285-
c.balancerMtx.RLock()
286-
defer c.balancerMtx.RUnlock()
287-
288-
cc = c.config.Balancer().Next()
289-
if cc == nil {
290-
return nil, errors.WithStackTrace(ErrClusterEmpty)
281+
for {
282+
select {
283+
case <-ctx.Done():
284+
return nil, errors.WithStackTrace(ctx.Err())
285+
default:
286+
c.balancerMtx.RLock()
287+
cc = c.config.Balancer().Next()
288+
c.balancerMtx.RUnlock()
289+
if cc == nil {
290+
return nil, errors.WithStackTrace(ErrClusterEmpty)
291+
}
292+
if err = cc.Ping(ctx); err == nil {
293+
return cc, nil
294+
}
295+
}
291296
}
292-
293-
return cc, nil
294297
}
295298

296299
// Insert inserts new connection into the cluster.

internal/cluster/cluster_test.go

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"sort"
77
"testing"
8-
"time"
98

109
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
1110
"github.com/ydb-platform/ydb-go-sdk/v3/config"
@@ -17,53 +16,6 @@ import (
1716
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1817
)
1918

20-
func TestClusterFastRedial(t *testing.T) {
21-
ctx, cancel := context.WithCancel(context.Background())
22-
defer cancel()
23-
24-
l, b := stub.Balancer()
25-
c := &cluster{
26-
config: config.New(
27-
config.WithBalancer(b),
28-
),
29-
index: make(map[string]entry.Entry),
30-
endpoints: make(map[uint32]conn.Conn),
31-
pool: conn.NewPool(ctx, config.New()),
32-
}
33-
34-
pingConnects := func(size int) chan struct{} {
35-
done := make(chan struct{})
36-
go func() {
37-
for i := 0; i < size*10; i++ {
38-
cc, err := c.Get(context.Background())
39-
// enforce close bad connects to track them
40-
if err == nil && c != nil && cc.Endpoint().Address() == "bad:0" {
41-
_ = c.Remove(ctx, cc.Endpoint())
42-
}
43-
}
44-
close(done)
45-
}()
46-
return done
47-
}
48-
49-
ne := []endpoint.Endpoint{
50-
endpoint.New("foo:0"),
51-
endpoint.New("bad:0"),
52-
}
53-
mergeEndpointIntoCluster(
54-
ctx,
55-
c,
56-
[]endpoint.Endpoint{},
57-
ne,
58-
)
59-
select {
60-
case <-pingConnects(len(ne)):
61-
62-
case <-time.After(time.Second * 15):
63-
t.Fatalf("Time limit exceeded while %d endpoints in balance. Wait channel used", len(*l))
64-
}
65-
}
66-
6719
func TestClusterMergeEndpoints(t *testing.T) {
6820
ctx, cancel := context.WithCancel(context.Background())
6921
defer cancel()

internal/cmd/gtrace/main.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,15 @@ func main() {
6767
// We should respect Go suffixes like `_linux.go`.
6868
name, tags, ext := splitOSArchTags(&buildCtx, gofile)
6969
openFile := func(name string) (*os.File, func()) {
70-
p := filepath.Join(workDir, name)
7170
var f *os.File
7271
// nolint: gofumpt
7372
// nolint: nolintlint
74-
f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
73+
// nolint: gosec
74+
f, err = os.OpenFile(
75+
filepath.Join(workDir, filepath.Clean(name)),
76+
os.O_WRONLY|os.O_CREATE|os.O_TRUNC,
77+
0600,
78+
)
7579
if err != nil {
7680
log.Fatal(err)
7781
}

internal/conn/conn.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ import (
2121
var (
2222
// errOperationNotReady specified error when operation is not ready
2323
errOperationNotReady = errors.New(fmt.Errorf("operation is not ready yet"))
24+
2425
// errClosedConnection specified error when connection are closed early
2526
errClosedConnection = errors.New(fmt.Errorf("connection closed early"))
27+
28+
// errUnavailableConnection specified error when connection are closed early
29+
errUnavailableConnection = errors.New(fmt.Errorf("connection unavailable"))
2630
)
2731

2832
type Conn interface {
@@ -32,6 +36,7 @@ type Conn interface {
3236

3337
LastUsage() time.Time
3438

39+
Ping(ctx context.Context) error
3540
IsState(states ...State) bool
3641
GetState() State
3742
SetState(State) State
@@ -57,6 +62,17 @@ type conn struct {
5762
onClose []func(*conn)
5863
}
5964

65+
func (c *conn) Ping(ctx context.Context) error {
66+
cc, err := c.take(ctx)
67+
if err != nil {
68+
return errors.WithStackTrace(err)
69+
}
70+
if !isAvailable(cc) {
71+
return errors.WithStackTrace(errUnavailableConnection)
72+
}
73+
return nil
74+
}
75+
6076
func (c *conn) Release(ctx context.Context) (err error) {
6177
var (
6278
onDone = trace.DriverOnConnRelease(
@@ -212,7 +228,7 @@ func (c *conn) take(ctx context.Context) (cc *grpc.ClientConn, err error) {
212228

213229
cc, err = grpc.DialContext(ctx, address, c.config.GrpcDialOptions()...)
214230
if err != nil {
215-
return nil, errors.WithStackTrace(err)
231+
return nil, errors.WithStackTrace(fmt.Errorf("dial %s failed: %w", address, err))
216232
}
217233

218234
c.cc = cc
@@ -269,6 +285,10 @@ func isBroken(raw *grpc.ClientConn) bool {
269285
return s == connectivity.Shutdown || s == connectivity.TransientFailure
270286
}
271287

288+
func isAvailable(raw *grpc.ClientConn) bool {
289+
return raw != nil && raw.GetState() == connectivity.Ready
290+
}
291+
272292
// conn must be locked
273293
func (c *conn) close() (err error) {
274294
if c.cc == nil {

0 commit comments

Comments
 (0)