Skip to content

Commit 925c91c

Browse files
authored
Merge pull request #248 from ydb-platform/force-log-backoff
* Implemented log backoff for force repeater wake-ups + maybe fix of race on reading trailing metadata
2 parents 6af8383 + 0eeae65 commit 925c91c

File tree

10 files changed

+325
-151
lines changed

10 files changed

+325
-151
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* Fixed bug with conversion of `time.Duration` from/to YDB value
77
* Removed from `ydb.Connection` embedding of `grpc.ClientConnInterface`
88
* Fixed stopping of repeater
9-
* Added hard minimal interval (1 second) between force wake up's
9+
* Added log backoff between force repeater wake-ups (from 500ms to 32s)
1010
* Renamed `trace.DriverRepeaterTick{Start,Done}Info` to `trace.DriverRepeaterWakeUp{Start,Done}Info`
1111
* Fixed unexpected `NullFlag` while parsing nil `JSONDocument` value
1212
* Removed `internal/conn/conn.streamUsages` and `internal/conn/conn.usages` (`internal/conn.conn` always touching last usage timestamp on API calls)

connection.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/ydb-platform/ydb-go-sdk/v3/config"
1212
"github.com/ydb-platform/ydb-go-sdk/v3/coordination"
1313
"github.com/ydb-platform/ydb-go-sdk/v3/discovery"
14-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/single"
1514
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
1615
internalCoordination "github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination"
1716
coordinationConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination/config"
@@ -391,13 +390,6 @@ func open(ctx context.Context, opts ...Option) (_ Connection, err error) {
391390
return nil, xerrors.WithStackTrace(errors.New("configuration: empty database"))
392391
}
393392

394-
if single.IsSingle(c.config.Balancer()) {
395-
c.discoveryOptions = append(
396-
c.discoveryOptions,
397-
discoveryConfig.WithInterval(0),
398-
)
399-
}
400-
401393
if c.pool == nil {
402394
c.pool = conn.NewPool(
403395
ctx,

discovery/discovery_e2e_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func TestDiscovery(t *testing.T) {
6363
ydb.WithConnectionTTL(time.Second*1),
6464
ydb.WithMinTLSVersion(tls.VersionTLS10),
6565
ydb.WithLogger(
66-
trace.MatchDetails(`ydb\.(driver|discovery).*`),
66+
trace.MatchDetails(`ydb\.(driver|discovery|repeater).*`),
6767
ydb.WithNamespace("ydb"),
6868
ydb.WithOutWriter(os.Stdout),
6969
ydb.WithErrWriter(os.Stdout),

internal/backoff/backoff_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package backoff
2+
3+
import (
4+
"fmt"
5+
"math/rand"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestDelays(t *testing.T) {
13+
duration := func(s string) (d time.Duration) {
14+
d, err := time.ParseDuration(s)
15+
if err != nil {
16+
panic(err)
17+
}
18+
return d
19+
}
20+
b := New(
21+
WithSlotDuration(duration("500ms")),
22+
WithCeiling(6),
23+
WithJitterLimit(1),
24+
)
25+
for i, d := range map[int]time.Duration{
26+
0: duration("500ms"),
27+
1: duration("1s"),
28+
2: duration("2s"),
29+
3: duration("4s"),
30+
4: duration("8s"),
31+
5: duration("16s"),
32+
6: duration("32s"),
33+
7: duration("32s"),
34+
8: duration("32s"),
35+
} {
36+
t.Run(fmt.Sprintf("%v -> %v", i, d), func(t *testing.T) {
37+
require.Equal(t, d, b.Delay(i))
38+
})
39+
}
40+
}
41+
42+
func TestLogBackoff(t *testing.T) {
43+
type exp struct {
44+
eq time.Duration
45+
gte time.Duration
46+
lte time.Duration
47+
}
48+
for _, test := range []struct {
49+
name string
50+
backoff Backoff
51+
exp []exp
52+
seeds int64
53+
}{
54+
{
55+
backoff: New(
56+
WithSlotDuration(time.Second),
57+
WithCeiling(3),
58+
WithJitterLimit(0),
59+
),
60+
exp: []exp{
61+
{gte: 0, lte: time.Second}, // 1 << min(0, 3)
62+
{gte: 0, lte: 2 * time.Second}, // 1 << min(1, 3)
63+
{gte: 0, lte: 4 * time.Second}, // 1 << min(2, 3)
64+
{gte: 0, lte: 8 * time.Second}, // 1 << min(3, 3)
65+
{gte: 0, lte: 8 * time.Second}, // 1 << min(4, 3)
66+
{gte: 0, lte: 8 * time.Second}, // 1 << min(5, 3)
67+
{gte: 0, lte: 8 * time.Second}, // 1 << min(6, 3)
68+
},
69+
seeds: 1000,
70+
},
71+
{
72+
backoff: New(
73+
WithSlotDuration(time.Second),
74+
WithCeiling(3),
75+
WithJitterLimit(0.5),
76+
),
77+
exp: []exp{
78+
{gte: 500 * time.Millisecond, lte: time.Second}, // 1 << min(0, 3)
79+
{gte: 1 * time.Second, lte: 2 * time.Second}, // 1 << min(1, 3)
80+
{gte: 2 * time.Second, lte: 4 * time.Second}, // 1 << min(2, 3)
81+
{gte: 4 * time.Second, lte: 8 * time.Second}, // 1 << min(3, 3)
82+
{gte: 4 * time.Second, lte: 8 * time.Second}, // 1 << min(4, 3)
83+
{gte: 4 * time.Second, lte: 8 * time.Second}, // 1 << min(5, 3)
84+
{gte: 4 * time.Second, lte: 8 * time.Second}, // 1 << min(6, 3)
85+
},
86+
seeds: 1000,
87+
},
88+
{
89+
backoff: New(
90+
WithSlotDuration(time.Second),
91+
WithCeiling(3),
92+
WithJitterLimit(1),
93+
),
94+
exp: []exp{
95+
{eq: time.Second}, // 1 << min(0, 3)
96+
{eq: 2 * time.Second}, // 1 << min(1, 3)
97+
{eq: 4 * time.Second}, // 1 << min(2, 3)
98+
{eq: 8 * time.Second}, // 1 << min(3, 3)
99+
{eq: 8 * time.Second}, // 1 << min(4, 3)
100+
{eq: 8 * time.Second}, // 1 << min(5, 3)
101+
{eq: 8 * time.Second}, // 1 << min(6, 3)
102+
},
103+
},
104+
} {
105+
t.Run(test.name, func(t *testing.T) {
106+
if test.seeds == 0 {
107+
test.seeds = 1
108+
}
109+
for seed := int64(0); seed < test.seeds; seed++ {
110+
// Fix random to reproduce the tests.
111+
rand.Seed(seed)
112+
113+
for n, exp := range test.exp {
114+
act := test.backoff.Delay(n)
115+
if exp := exp.eq; exp != 0 {
116+
if exp != act {
117+
t.Fatalf(
118+
"unexpected Backoff delay: %s; want %s",
119+
act, exp,
120+
)
121+
}
122+
continue
123+
}
124+
if gte := exp.gte; act < gte {
125+
t.Errorf(
126+
"unexpected Backoff delay: %s; want >= %s",
127+
act, gte,
128+
)
129+
}
130+
if lte := exp.lte; act > lte {
131+
t.Errorf(
132+
"unexpected Backoff delay: %s; want <= %s",
133+
act, lte,
134+
)
135+
}
136+
}
137+
}
138+
})
139+
}
140+
}

internal/conn/conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func (c *conn) take(ctx context.Context) (cc *grpc.ClientConn, err error) {
197197
}
198198

199199
// prepend "ydb" scheme for grpc dns-xresolver to find the proper scheme
200-
// ydb:///", three slashes is ok. It need for good parse scheme in grpc resolver.
200+
// "ydb:///" with three slashes is intentional: the grpc resolver needs it to parse the scheme correctly.
201201
address := "ydb:///" + c.endpoint.Address()
202202

203203
cc, err = grpc.DialContext(ctx, address, c.config.GrpcDialOptions()...)

internal/repeater/repeater.go

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package repeater
22

33
import (
44
"context"
5-
"sync/atomic"
65
"time"
76

7+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
88
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
99
)
1010

@@ -25,10 +25,10 @@ type repeater struct {
2525
// Task is a function that must be executed periodically.
2626
task func(context.Context) error
2727

28-
stop context.CancelFunc
28+
cancel context.CancelFunc
2929
stopped chan struct{}
3030

31-
force int32
31+
force chan struct{}
3232
}
3333

3434
type option func(r *repeater)
@@ -54,8 +54,9 @@ func WithInterval(interval time.Duration) option {
5454
type event string
5555

5656
const (
57-
eventTick = event("tick")
58-
eventForce = event("force")
57+
eventTick = event("tick")
58+
eventForce = event("force")
59+
eventCancel = event("cancel")
5960
)
6061

6162
// New creates and begins to execute task periodically.
@@ -64,13 +65,14 @@ func New(
6465
task func(ctx context.Context) (err error),
6566
opts ...option,
6667
) *repeater {
67-
ctx, stop := context.WithCancel(context.Background())
68+
ctx, cancel := context.WithCancel(context.Background())
6869

6970
r := &repeater{
7071
interval: interval,
7172
task: task,
72-
stop: stop,
73+
cancel: cancel,
7374
stopped: make(chan struct{}),
75+
force: make(chan struct{}, 1),
7476
}
7577

7678
for _, o := range opts {
@@ -82,14 +84,24 @@ func New(
8284
return r
8385
}
8486

87+
func (r *repeater) stop(onCancel func()) {
88+
r.cancel()
89+
if onCancel != nil {
90+
onCancel()
91+
}
92+
<-r.stopped
93+
}
94+
8595
// Stop stops executing the task.
8696
func (r *repeater) Stop() {
87-
r.stop()
88-
<-r.stopped
97+
r.stop(nil)
8998
}
9099

91100
func (r *repeater) Force() {
92-
atomic.AddInt32(&r.force, 1)
101+
select {
102+
case r.force <- struct{}{}:
103+
default:
104+
}
93105
}
94106

95107
func (r *repeater) wakeUp(ctx context.Context, e event) (err error) {
@@ -108,9 +120,12 @@ func (r *repeater) wakeUp(ctx context.Context, e event) (err error) {
108120
onDone(err)
109121

110122
if err != nil {
111-
atomic.StoreInt32(&r.force, 1)
123+
r.Force()
112124
} else {
113-
atomic.StoreInt32(&r.force, 0)
125+
select {
126+
case <-r.force:
127+
default:
128+
}
114129
}
115130
}()
116131

@@ -123,21 +138,52 @@ func (r *repeater) worker(ctx context.Context, interval time.Duration) {
123138
tick := time.NewTicker(interval)
124139
defer tick.Stop()
125140

126-
force := time.NewTicker(time.Second) // minimal interval between force wakeup's (maybe configured?)
127-
defer force.Stop()
141+
// force returns backoff with delays [500ms...32s]
142+
force := backoff.New(
143+
backoff.WithSlotDuration(500*time.Millisecond), // 500ms slot: with ceiling 6 the delays span 500ms..32s (500ms << 6)
144+
backoff.WithCeiling(6),
145+
backoff.WithJitterLimit(1),
146+
)
147+
148+
// forceIndex defines delay index for force backoff
149+
forceIndex := 0
150+
151+
waitForceEvent := func() event {
152+
if forceIndex == 0 {
153+
return eventForce
154+
}
155+
select {
156+
case <-ctx.Done():
157+
return eventCancel
158+
case <-tick.C:
159+
return eventTick
160+
case <-force.Wait(forceIndex):
161+
return eventForce
162+
}
163+
}
164+
165+
// processEvent wakes up the task for the given event and updates forceIndex: incremented on error, reset to zero on success; cancel events are ignored
166+
processEvent := func(event event) {
167+
if event == eventCancel {
168+
return
169+
}
170+
if err := r.wakeUp(ctx, event); err != nil {
171+
forceIndex++
172+
} else {
173+
forceIndex = 0
174+
}
175+
}
128176

129177
for {
130178
select {
131179
case <-ctx.Done():
132180
return
133181

134182
case <-tick.C:
135-
_ = r.wakeUp(ctx, eventTick)
183+
processEvent(eventTick)
136184

137-
case <-force.C:
138-
if atomic.LoadInt32(&r.force) != 0 {
139-
_ = r.wakeUp(ctx, eventForce)
140-
}
185+
case <-r.force:
186+
processEvent(waitForceEvent())
141187
}
142188
}
143189
}

0 commit comments

Comments
 (0)