@@ -2,10 +2,9 @@ package repeater
22
33import (
44 "context"
5- "sync"
65 "time"
76
8- "github.com/ydb-platform/ydb-go-sdk/v3/testutil/timeutil "
7+ "github.com/ydb-platform/ydb-go-sdk/v3/trace "
98)
109
1110type Repeater interface {
@@ -19,50 +18,73 @@ type repeater struct {
1918 // Interval must be greater than zero; if not, Repeater will panic.
2019 interval time.Duration
2120
21+ name string
22+ trace trace.Driver
23+
2224 // Task is a function that must be executed periodically.
2325 task func (context.Context ) error
2426
25- timer timeutil.Timer
26- stopOnce sync.Once
27- stop chan struct {}
28- done chan struct {}
29- ctx context.Context
30- cancel context.CancelFunc
31- force chan struct {}
27+ stop chan struct {}
28+ done chan struct {}
29+ force chan struct {}
30+ }
31+
32+ type option func (r * repeater )
33+
34+ func WithName (name string ) option {
35+ return func (r * repeater ) {
36+ r .name = name
37+ }
38+ }
39+
40+ func WithTrace (trace trace.Driver ) option {
41+ return func (r * repeater ) {
42+ r .trace = trace
43+ }
44+ }
45+
46+ func WithInterval (interval time.Duration ) option {
47+ return func (r * repeater ) {
48+ r .interval = interval
49+ }
3250}
3351
34- // NewRepeater creates and begins to execute task periodically.
35- func NewRepeater (
52+ type event string
53+
54+ const (
55+ eventTick = event ("tick" )
56+ eventForce = event ("force" )
57+ )
58+
59+ // New creates and begins to execute task periodically.
60+ func New (
3661 ctx context.Context ,
37- interval time.Duration ,
3862 task func (ctx context.Context ) (err error ),
63+ opts ... option ,
3964) Repeater {
40- if interval <= 0 {
41- return nil
42- }
43- ctx , cancel := context .WithCancel (ctx )
4465 r := & repeater {
45- interval : interval ,
46- task : task ,
47- timer : timeutil .NewTimer (interval ),
48- stopOnce : sync.Once {},
49- stop : make (chan struct {}),
50- done : make (chan struct {}),
51- ctx : ctx ,
52- cancel : cancel ,
53- force : make (chan struct {}),
66+ task : task ,
67+ stop : make (chan struct {}),
68+ done : make (chan struct {}),
69+ force : make (chan struct {}),
70+ }
71+
72+ for _ , o := range opts {
73+ o (r )
74+ }
75+
76+ if r .interval <= 0 {
77+ return nil
5478 }
55- go r .worker ()
79+
80+ go r .worker (ctx , r .interval )
81+
5682 return r
5783}
5884
5985// Stop stops to execute its task.
6086func (r * repeater ) Stop () {
61- r .stopOnce .Do (func () {
62- close (r .stop )
63- r .cancel ()
64- <- r .done
65- })
87+ close (r .stop )
6688}
6789
6890func (r * repeater ) Force () {
@@ -72,30 +94,32 @@ func (r *repeater) Force() {
7294 }
7395}
7496
75- func (r * repeater ) singleTask () {
76- if err := r .task (r .ctx ); err != nil {
77- r .timer .Reset (time .Second )
78- } else {
79- r .timer .Reset (r .interval )
80- }
97+ func (r * repeater ) wakeUp (ctx context.Context , e event ) {
98+ var cancel context.CancelFunc
99+ ctx , cancel = context .WithCancel (ctx )
100+ defer cancel ()
101+
102+ trace .DriverOnRepeaterWakeUp (
103+ r .trace ,
104+ & ctx ,
105+ r .name ,
106+ string (e ),
107+ )(r .task (ctx ))
81108}
82109
83- func (r * repeater ) worker () {
84- defer func () {
85- close (r .done )
86- }()
87- r .singleTask ()
110+ func (r * repeater ) worker (ctx context.Context , interval time.Duration ) {
111+ defer close (r .done )
112+
88113 for {
89114 select {
115+ case <- ctx .Done ():
116+ return
90117 case <- r .stop :
91118 return
92- case <- r . timer . C ( ):
93- r .singleTask ( )
119+ case <- time . After ( interval ):
120+ r .wakeUp ( ctx , eventTick )
94121 case <- r .force :
95- if ! r .timer .Stop () {
96- <- r .timer .C ()
97- }
98- r .timer .Reset (r .interval )
122+ r .wakeUp (ctx , eventForce )
99123 }
100124 }
101125}
0 commit comments