Skip to content

Commit 6ce1deb

Browse files
authored
* Added retries to initial discovering (#409)
1 parent 8dfeab1 commit 6ce1deb

File tree

5 files changed

+30
-18
lines changed

5 files changed

+30
-18
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added retries to initial discovering
2+
13
## v3.38.2
24
* Added missing `RetentionPeriod` parameter for topic description
35
* Fixed reconnect problem for topic client

connection.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ func open(ctx context.Context, opts ...Option) (_ Connection, err error) {
413413
}()
414414

415415
if c.pool == nil {
416-
c.pool = conn.NewPool(ctx, c.config)
416+
c.pool = conn.NewPool(c.config)
417417
}
418418

419419
if c.userInfo != nil {
@@ -425,6 +425,12 @@ func open(ctx context.Context, opts ...Option) (_ Connection, err error) {
425425
))
426426
}
427427

428+
if t := c.config.DialTimeout(); t > 0 {
429+
var cancel context.CancelFunc
430+
ctx, cancel = context.WithTimeout(ctx, t)
431+
defer cancel()
432+
}
433+
428434
c.balancer, err = balancer.New(ctx,
429435
c.config, c.pool,
430436
append(

internal/balancer/balancer.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/repeater"
1818
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1919
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
20+
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
2021
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2122
)
2223

@@ -146,6 +147,12 @@ func New(
146147
pool *conn.Pool,
147148
opts ...discoveryConfig.Option,
148149
) (b *Balancer, err error) {
150+
if t := c.DialTimeout(); t > 0 {
151+
var cancel context.CancelFunc
152+
ctx, cancel = context.WithTimeout(ctx, t)
153+
defer cancel()
154+
}
155+
149156
onDone := trace.DriverOnBalancerInit(
150157
c.Trace(),
151158
&ctx,
@@ -181,9 +188,17 @@ func New(
181188
endpointsToConnections(pool, []endpoint.Endpoint{discoveryEndpoint}),
182189
nil, balancerConfig.Info{}, false)
183190
} else {
184-
if err = b.clusterDiscovery(ctx); err != nil {
191+
// initialization of balancer state
192+
if err = retry.Retry(ctx, func(ctx context.Context) (err error) {
193+
if err = b.clusterDiscovery(ctx); err != nil {
194+
return xerrors.WithStackTrace(err)
195+
}
196+
return nil
197+
}, retry.WithIdempotent(true)); err != nil {
185198
return nil, xerrors.WithStackTrace(err)
186199
}
200+
201+
// run background discovering
187202
if d := discoveryConfig.Interval(); d > 0 {
188203
b.discoveryRepeater = repeater.New(d, func(ctx context.Context) (err error) {
189204
ctx, cancel := context.WithTimeout(ctx, d)
@@ -197,14 +212,6 @@ func New(
197212
}
198213
}
199214

200-
var cancel context.CancelFunc
201-
if t := c.DialTimeout(); t > 0 {
202-
ctx, cancel = context.WithTimeout(ctx, c.DialTimeout())
203-
} else {
204-
ctx, cancel = context.WithCancel(ctx)
205-
}
206-
defer cancel()
207-
208215
return b, nil
209216
}
210217

internal/balancer/local_dc_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func TestLocalDCDiscovery(t *testing.T) {
144144
r := &Balancer{
145145
driverConfig: cfg,
146146
balancerConfig: *cfg.Balancer(),
147-
pool: conn.NewPool(ctx, cfg),
147+
pool: conn.NewPool(cfg),
148148
discovery: discoveryMock{endpoints: []endpoint.Endpoint{
149149
&mock.Endpoint{AddrField: "a:123", LocationField: "a"},
150150
&mock.Endpoint{AddrField: "b:234", LocationField: "b"},

internal/conn/pool.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func (p *Pool) Release(ctx context.Context) error {
144144
return nil
145145
}
146146

147-
func (p *Pool) connParker(ctx context.Context, ttl, interval time.Duration) {
147+
func (p *Pool) connParker(ttl, interval time.Duration) {
148148
ticker := time.NewTicker(interval)
149149
defer ticker.Stop()
150150
for {
@@ -156,7 +156,7 @@ func (p *Pool) connParker(ctx context.Context, ttl, interval time.Duration) {
156156
if time.Since(c.LastUsage()) > ttl {
157157
switch c.GetState() {
158158
case Online, Banned:
159-
_ = c.park(ctx)
159+
_ = c.park(context.Background())
160160
default:
161161
// nop
162162
}
@@ -176,10 +176,7 @@ func (p *Pool) collectConns() []*conn {
176176
return conns
177177
}
178178

179-
func NewPool(
180-
ctx context.Context,
181-
config Config,
182-
) *Pool {
179+
func NewPool(config Config) *Pool {
183180
p := &Pool{
184181
usages: 1,
185182
config: config,
@@ -188,7 +185,7 @@ func NewPool(
188185
done: make(chan struct{}),
189186
}
190187
if ttl := config.ConnectionTTL(); ttl > 0 {
191-
go p.connParker(ctx, ttl, ttl/2)
188+
go p.connParker(ttl, ttl/2)
192189
}
193190
return p
194191
}

0 commit comments

Comments
 (0)