Skip to content

Commit 230edc6

Browse files
committed
fixed race on write err in discovery.go + refactored repeater API
1 parent cfbe693 commit 230edc6

File tree

4 files changed

+31
-17
lines changed

4 files changed

+31
-17
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
* Removed `internal/lazy.Discovery` (discovery client always initialized)
99
* Fixed `trace.Table` event structs
1010
* Refactored grpc options for define dns-balancing configuration
11-
* Refactored `retry.Retry` signature (added `retry.WithID`, `retry.WithTrace` and `retry.WithIdempotent` opt-in args, required param `isIdempotentOperation` removed)
11+
* Refactored `retry.Retry` signature (added `retry.WithID`, `retry.WithTrace` and `retry.WithIdempotent` opt-in args, required param `isIdempotentOperation` removed)
12+
* Refactored package `internal/repeater`
1213

1314
## 3.9.4
1415
* Fixed data race on closing session pool

internal/discovery/discovery.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"net"
66
"strconv"
7-
"time"
87

98
"google.golang.org/protobuf/proto"
109

@@ -57,15 +56,14 @@ func New(
5756
repeater.NewRepeater(
5857
deadline.ContextWithoutDeadline(ctx),
5958
c.config.Interval(),
60-
func(ctx context.Context) {
59+
func(ctx context.Context) (err error) {
6160
next, err = c.Discover(ctx)
62-
// if nothing endpoint - re-discover after one second
63-
// and use old endpoint list
64-
if err != nil || len(next) == 0 {
65-
go func() {
66-
time.Sleep(time.Second)
67-
crudExplorer.Force()
68-
}()
61+
if err != nil {
62+
return err
63+
}
64+
65+
// if nothing endpoint - use old endpoint list
66+
if len(next) == 0 {
6967
return
7068
}
7169

@@ -94,7 +92,10 @@ func New(
9492
)
9593
},
9694
)
95+
9796
curr = next
97+
98+
return nil
9899
},
99100
),
100101
)

internal/repeater/repeat.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type repeater struct {
2020
interval time.Duration
2121

2222
// Task is a function that must be executed periodically.
23-
task func(context.Context)
23+
task func(context.Context) error
2424

2525
timer timeutil.Timer
2626
stopOnce sync.Once
@@ -35,7 +35,7 @@ type repeater struct {
3535
func NewRepeater(
3636
ctx context.Context,
3737
interval time.Duration,
38-
task func(ctx context.Context),
38+
task func(ctx context.Context) (err error),
3939
) Repeater {
4040
if interval <= 0 {
4141
return nil
@@ -72,22 +72,30 @@ func (r *repeater) Force() {
7272
}
7373
}
7474

75+
func (r *repeater) singleTask() {
76+
if err := r.task(r.ctx); err != nil {
77+
r.timer.Reset(time.Second)
78+
} else {
79+
r.timer.Reset(r.interval)
80+
}
81+
}
82+
7583
func (r *repeater) worker() {
7684
defer func() {
7785
close(r.done)
7886
}()
79-
r.task(r.ctx)
87+
r.singleTask()
8088
for {
8189
select {
8290
case <-r.stop:
8391
return
8492
case <-r.timer.C():
85-
r.task(r.ctx)
93+
r.singleTask()
8694
case <-r.force:
8795
if !r.timer.Stop() {
8896
<-r.timer.C()
8997
}
98+
r.timer.Reset(r.interval)
9099
}
91-
r.timer.Reset(r.interval)
92100
}
93101
}

internal/repeater/test/repeat_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ func TestRepeater(t *testing.T) {
2828

2929
exec := make(chan struct{})
3030
r := repeater.NewRepeater(ctx, 42*time.Second,
31-
func(_ context.Context) {
31+
func(_ context.Context) error {
3232
exec <- struct{}{}
33+
return nil
3334
})
3435

3536
timerC <- time.Now()
@@ -59,10 +60,11 @@ func TestRepeaterCancellation(t *testing.T) {
5960
defer cancel()
6061

6162
r := repeater.NewRepeater(ctx, 42*time.Second,
62-
func(ctx context.Context) {
63+
func(ctx context.Context) error {
6364
enter <- struct{}{}
6465
<-ctx.Done()
6566
exit <- struct{}{}
67+
return nil
6668
})
6769

6870
// Run callback in a separate goroutine to avoid deadlock.
@@ -104,12 +106,14 @@ func noRecv(ch interface{}, timeout time.Duration) error {
104106
}
105107

106108
func assertRecv(t *testing.T, timeout time.Duration, ch interface{}) {
109+
t.Helper()
107110
if err := recv(ch, timeout); err != nil {
108111
t.Fatalf("%s: %v", testutil.FileLine(2), err)
109112
}
110113
}
111114

112115
func assertNoRecv(t *testing.T, timeout time.Duration, ch interface{}) {
116+
t.Helper()
113117
if err := noRecv(ch, timeout); err != nil {
114118
t.Fatalf("%s: %v", testutil.FileLine(2), err)
115119
}

0 commit comments

Comments
 (0)