Skip to content

Commit d812dab

Browse files
committed
update
1 parent 6aa30c0 commit d812dab

File tree

2 files changed

+116
-29
lines changed

2 files changed

+116
-29
lines changed

goroutine.go

Lines changed: 83 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"os/signal"
88
"sync"
9+
"sync/atomic"
910
"time"
1011
)
1112

@@ -43,14 +44,19 @@ func Notify(sig ...os.Signal) chan os.Signal {
4344
return terminate
4445
}
4546

46-
func NewDelayOnce(delay time.Duration, timeout time.Duration) *DelayOnce {
47+
func NewDelayOnce(delay time.Duration, timeout time.Duration, debugMode ...bool) *DelayOnce {
4748
if timeout <= delay {
4849
panic(`timeout must be greater than delay`)
4950
}
51+
var debug bool
52+
if len(debugMode) > 0 {
53+
debug = debugMode[0]
54+
}
5055
return &DelayOnce{
5156
mp: sync.Map{},
5257
delay: delay,
5358
timeout: timeout,
59+
debug: debug,
5460
}
5561
}
5662

@@ -65,12 +71,14 @@ type DelayOnce struct {
6571
mp sync.Map
6672
delay time.Duration
6773
timeout time.Duration
74+
debug bool
6875
}
6976

7077
type eventSession struct {
7178
cancel context.CancelFunc
7279
time time.Time
7380
mutex sync.RWMutex
81+
stop chan struct{}
7482
}
7583

7684
func (e *eventSession) Renew(t time.Time) {
@@ -86,58 +94,109 @@ func (e *eventSession) Time() time.Time {
8694
return t
8795
}
8896

89-
func (d *DelayOnce) checkAndStore(parentCtx context.Context, key string) (exit bool, ctx context.Context) {
97+
func (e *eventSession) Cancel() <-chan struct{} {
98+
e.cancel()
99+
return e.stop
100+
}
101+
102+
func (d *DelayOnce) checkAndStore(parentCtx context.Context, key string) (*eventSession, context.Context) {
90103
v, loaded := d.mp.Load(key)
91104
if loaded {
92105
session := v.(*eventSession)
93106
if time.Since(session.Time()) < d.timeout { // 超过 d.timeout 后重新处理,d.timeout 内记录当前时间
94107
session.Renew(time.Now())
95108
d.mp.Store(key, session)
96-
return true, nil
109+
return nil, nil
110+
}
111+
112+
if d.debug {
113+
log.Println(`[DelayOnce] cancel -------------> ` + key)
114+
}
115+
116+
<-session.Cancel()
117+
118+
if d.debug {
119+
log.Println(`[DelayOnce] canceled -------------> ` + key)
97120
}
98-
session.cancel()
99121
}
100-
var cancel context.CancelFunc
101-
ctx, cancel = context.WithCancel(parentCtx)
102-
d.mp.Store(key, &eventSession{
122+
ctx, cancel := context.WithCancel(parentCtx)
123+
session := &eventSession{
103124
cancel: cancel,
104125
time: time.Now(),
105-
})
106-
return false, ctx
126+
stop: make(chan struct{}, 1),
127+
}
128+
d.mp.Store(key, session)
129+
return session, ctx
107130
}
108131

109132
func (d *DelayOnce) Do(parentCtx context.Context, key string, f func() error) (isNew bool) {
110-
exit, ctx := d.checkAndStore(parentCtx, key)
111-
if exit {
133+
session, ctx := d.checkAndStore(parentCtx, key)
134+
if session == nil {
112135
return false
113136
}
114137
go func(key string) {
115138
for {
116139
t := time.NewTicker(time.Second)
117140
defer t.Stop()
118141
select {
119-
case <-ctx.Done():
142+
case <-ctx.Done(): // 如果先进入“<-t.C”分支,会等“<-t.C”分支内的代码执行完毕后才有机会执行本分支
120143
d.mp.Delete(key)
144+
session.stop <- struct{}{}
145+
close(session.stop)
146+
if d.debug {
147+
log.Println(`[DelayOnce] close -------------> ` + key)
148+
}
121149
return
122150
case <-t.C:
123-
if err := d.exec(key, f); err != nil {
124-
log.Println(key+`:`, err)
151+
if time.Since(session.Time()) > d.delay { // 时间超过d.delay才触发
152+
err := f()
153+
session.Cancel()
154+
if err != nil {
155+
log.Println(key+`:`, err)
156+
}
125157
}
126158
}
127159
}
128160
}(key)
129161
return true
130162
}
131163

132-
func (d *DelayOnce) exec(key string, f func() error) (err error) {
133-
v, ok := d.mp.Load(key)
134-
if !ok {
135-
return
136-
}
137-
session := v.(*eventSession)
138-
if time.Since(session.Time()) > d.delay { // 时间超过d.delay才触发
139-
err = f()
164+
func (d *DelayOnce) DoWithState(parentCtx context.Context, key string, f func(func() bool) error) (isNew bool) {
165+
session, ctx := d.checkAndStore(parentCtx, key)
166+
if session == nil {
167+
return false
140168
}
141-
session.cancel()
142-
return
169+
go func(key string) {
170+
var state int32
171+
isAbort := func() bool {
172+
return atomic.LoadInt32(&state) > 0
173+
}
174+
go func() {
175+
<-ctx.Done()
176+
atomic.AddInt32(&state, 1)
177+
}()
178+
for {
179+
t := time.NewTicker(time.Second)
180+
defer t.Stop()
181+
select {
182+
case <-ctx.Done(): // 如果先进入“<-t.C”分支,会等“<-t.C”分支内的代码执行完毕后才有机会执行本分支
183+
d.mp.Delete(key)
184+
session.stop <- struct{}{}
185+
close(session.stop)
186+
if d.debug {
187+
log.Println(`[DelayOnce] close -------------> ` + key)
188+
}
189+
return
190+
case <-t.C:
191+
if time.Since(session.Time()) > d.delay { // 时间超过d.delay才触发
192+
err := f(isAbort)
193+
session.Cancel()
194+
if err != nil {
195+
log.Println(key+`:`, err)
196+
}
197+
}
198+
}
199+
}
200+
}(key)
201+
return true
143202
}

goroutine_test.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com
33
import (
44
"context"
55
"fmt"
6+
"log"
67
"sync"
78
"testing"
89
"time"
@@ -18,15 +19,15 @@ func TestLoop(t *testing.T) {
1819
cancel()
1920
}
2021

21-
func TestDelayOnce(t *testing.T) {
22-
d := NewDelayOnce(time.Second*2, time.Hour)
22+
func TestDelayOnceNormal(t *testing.T) {
23+
d := NewDelayOnce(time.Second*2, time.Hour, true)
2324
ctx := context.TODO()
2425
wg := sync.WaitGroup{}
2526
for i := 0; i < 10; i++ {
26-
fmt.Println(`Trigger`, time.Now())
27-
isNew := d.Do(ctx, `key`, func() error {
27+
log.Println(`Trigger key_normal`)
28+
isNew := d.Do(ctx, `key_normal`, func() error {
2829
defer wg.Done()
29-
fmt.Println(`Execute`, time.Now())
30+
log.Println(`Execute key_normal`)
3031
return nil
3132
})
3233
if isNew {
@@ -36,3 +37,30 @@ func TestDelayOnce(t *testing.T) {
3637
}
3738
wg.Wait()
3839
}
40+
41+
func TestDelayOnceTimeout(t *testing.T) {
42+
d := NewDelayOnce(time.Second*2, time.Second*5, true)
43+
ctx := context.TODO()
44+
wg := sync.WaitGroup{}
45+
for i := 0; i < 10; i++ {
46+
log.Println(`Trigger key_timeout`)
47+
isNew := d.DoWithState(ctx, `key_timeout`, func(isAbort func() bool) error {
48+
defer wg.Done()
49+
for i := 0; i < 4; i++ {
50+
if isAbort() {
51+
log.Println(`------> Stop key_timeout`)
52+
return nil
53+
}
54+
log.Println(`Execute key_timeout`, i)
55+
time.Sleep(time.Second * 5)
56+
}
57+
log.Println(`Execute key_timeout`)
58+
return nil
59+
})
60+
if isNew {
61+
wg.Add(1)
62+
}
63+
time.Sleep(time.Second * 6)
64+
}
65+
wg.Wait()
66+
}

0 commit comments

Comments
 (0)