Skip to content

Commit d475dc6

Browse files
authored
Merge pull request #852 from ydb-platform/time-after
Removed `backoff.Backoff.Wait` interface method for exclude resource leak with bug-provoked usage of `time.After`
2 parents db24653 + ecd3ccb commit d475dc6

File tree

6 files changed

+38
-39
lines changed

6 files changed

+38
-39
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Removed `internal/backoff.Backoff.Wait` interface method for exclude resource leak with bug-provoked usage of `time.After` method
12
* Marked as deprecated `retry.WithDoRetryOptions` and `retry.WithDoTxRetryOptions`
23
* Added receiving first result set on construct `internal/table/scanner.NewStream()`
34
* Added experimental package `metrics` with SDK metrics

internal/backoff/backoff.go

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,11 @@ import (
44
"math"
55
"time"
66

7-
"github.com/jonboulle/clockwork"
8-
97
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand"
108
)
119

1210
// Backoff is the interface that contains logic of delaying operation retry.
1311
type Backoff interface {
14-
// Wait maps index of the retry to a channel which fulfillment means that
15-
// Delay is over.
16-
//
17-
// Note that retry index begins from 0 and 0-th index means that it is the
18-
// first retry attempt after an initial error.
19-
Wait(n int) <-chan time.Time
20-
2112
// Delay returns mapping of i to Delay.
2213
Delay(i int) time.Duration
2314
}
@@ -39,6 +30,8 @@ var (
3930
)
4031
)
4132

33+
var _ Backoff = (*logBackoff)(nil)
34+
4235
// logBackoff contains logarithmic Backoff policy.
4336
type logBackoff struct {
4437
// slotDuration is a size of a single time slot used in Backoff Delay
@@ -59,8 +52,6 @@ type logBackoff struct {
5952
// duration D; and R is a random sized part from [0,(D - F)].
6053
jitterLimit float64
6154

62-
clock clockwork.Clock
63-
6455
// generator of jitter
6556
r xrand.Rand
6657
}
@@ -85,12 +76,6 @@ func WithJitterLimit(jitterLimit float64) option {
8576
}
8677
}
8778

88-
func WithClock(clock clockwork.Clock) option {
89-
return func(b *logBackoff) {
90-
b.clock = clock
91-
}
92-
}
93-
9479
func WithSeed(seed int64) option {
9580
return func(b *logBackoff) {
9681
b.r = xrand.New(xrand.WithLock(), xrand.WithSeed(seed))
@@ -99,8 +84,7 @@ func WithSeed(seed int64) option {
9984

10085
func New(opts ...option) logBackoff {
10186
b := logBackoff{
102-
r: xrand.New(xrand.WithLock()),
103-
clock: clockwork.NewRealClock(),
87+
r: xrand.New(xrand.WithLock()),
10488
}
10589
for _, o := range opts {
10690
if o != nil {
@@ -110,11 +94,6 @@ func New(opts ...option) logBackoff {
11094
return b
11195
}
11296

113-
// Wait implements Backoff interface.
114-
func (b logBackoff) Wait(n int) <-chan time.Time {
115-
return b.clock.After(b.Delay(n))
116-
}
117-
11897
// Delay returns mapping of i to Delay.
11998
func (b logBackoff) Delay(i int) time.Duration {
12099
s := b.slotDuration

internal/repeater/repeater.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,6 @@ func (r *repeater) worker(ctx context.Context, tick clockwork.Ticker) {
168168
backoff.WithSlotDuration(500*time.Millisecond),
169169
backoff.WithCeiling(6),
170170
backoff.WithJitterLimit(1),
171-
backoff.WithClock(r.clock),
172171
)
173172

174173
// forceIndex defines delay index for force backoff
@@ -178,12 +177,16 @@ func (r *repeater) worker(ctx context.Context, tick clockwork.Ticker) {
178177
if forceIndex == 0 {
179178
return EventForce
180179
}
180+
181+
force := r.clock.NewTimer(force.Delay(forceIndex))
182+
defer force.Stop()
183+
181184
select {
182185
case <-ctx.Done():
183186
return EventCancel
184187
case <-tick.Chan():
185188
return EventTick
186-
case <-force.Wait(forceIndex):
189+
case <-force.Chan():
187190
return EventForce
188191
}
189192
}

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,18 @@ func (r *readerReconnector) ReadMessageBatch(ctx context.Context, opts ReadMessa
9494

9595
for {
9696
if attempt > 0 {
97-
select {
98-
case <-ctx.Done():
99-
return nil, ctx.Err()
100-
case <-backoff.Fast.Wait(attempt):
101-
// pass
97+
if err := func() error {
98+
t := r.clock.NewTimer(backoff.Fast.Delay(attempt))
99+
defer t.Stop()
100+
101+
select {
102+
case <-ctx.Done():
103+
return ctx.Err()
104+
case <-t.Chan():
105+
return nil
106+
}
107+
}(); err != nil {
108+
return nil, err
102109
}
103110
}
104111

@@ -211,13 +218,18 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) {
211218
request.reason,
212219
r.clock.Since(retriesStarted),
213220
); isRetriableErr {
214-
delay := retryBackoff.Delay(attempt)
215-
216-
select {
217-
case <-ctx.Done():
221+
if err := func() error {
222+
t := r.clock.NewTimer(retryBackoff.Delay(attempt))
223+
defer t.Stop()
224+
225+
select {
226+
case <-ctx.Done():
227+
return ctx.Err()
228+
case <-t.Chan():
229+
return nil
230+
}
231+
}(); err != nil {
218232
return
219-
case <-r.clock.After(delay):
220-
// pass
221233
}
222234
}
223235
}

internal/wait/wait.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package wait
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
78
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -11,8 +12,11 @@ import (
1112
// expiration.
1213
// It returns non-nil error if and only if deadline expiration branch wins.
1314
func waitBackoff(ctx context.Context, b backoff.Backoff, i int) error {
15+
t := time.NewTimer(b.Delay(i))
16+
defer t.Stop()
17+
1418
select {
15-
case <-b.Wait(i):
19+
case <-t.C:
1620
return nil
1721
case <-ctx.Done():
1822
if err := ctx.Err(); err != nil {

internal/xsql/errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type ErrConnAlreadyHaveTx struct {
2020
}
2121

2222
func (err *ErrConnAlreadyHaveTx) Error() string {
23-
return "conn already have an opened currentTx: " + err.currentTx
23+
return "conn already have an open currentTx: " + err.currentTx
2424
}
2525

2626
func (err *ErrConnAlreadyHaveTx) As(target interface{}) bool {

0 commit comments

Comments
 (0)