Skip to content

Commit bca7636

Browse files
committed
update
1 parent c8e709b commit bca7636

File tree

2 files changed

+122
-0
lines changed

2 files changed

+122
-0
lines changed

goroutine.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"log"
77
"os"
88
"os/signal"
9+
"sync"
910
"time"
1011
)
1112

@@ -42,3 +43,104 @@ func Notify(sig ...os.Signal) chan os.Signal {
4243
signal.Notify(terminate, sig...)
4344
return terminate
4445
}
46+
47+
func NewDelayOnce(delay time.Duration, timeout time.Duration) *DelayOnce {
48+
if timeout <= delay {
49+
panic(`timeout must be greater than delay`)
50+
}
51+
return &DelayOnce{
52+
mp: sync.Map{},
53+
delay: delay,
54+
timeout: timeout,
55+
}
56+
}
57+
58+
// DelayOnce 触发之后延迟一定的时间后再执行。如果在延迟处理的时间段内再次触发,则延迟时间基于此处触发时间顺延
59+
// d := NewDelayOnce(time.Second*5, time.Hour)
60+
// ctx := context.TODO()
61+
// for i:=0; i<10; i++ {
62+
// d.Do(ctx, `key`,func() error { return nil })
63+
// }
64+
type DelayOnce struct {
65+
mp sync.Map
66+
delay time.Duration
67+
timeout time.Duration
68+
}
69+
70+
type eventSession struct {
71+
cancel context.CancelFunc
72+
time time.Time
73+
mutex sync.RWMutex
74+
}
75+
76+
func (e *eventSession) Renew(t time.Time) {
77+
e.mutex.Lock()
78+
e.time = t
79+
e.mutex.Unlock()
80+
}
81+
82+
func (e *eventSession) Time() time.Time {
83+
e.mutex.RLock()
84+
t := e.time
85+
e.mutex.RUnlock()
86+
return t
87+
}
88+
89+
func (d *DelayOnce) checkAndStore(parentCtx context.Context, key string) (exit bool, ctx context.Context) {
90+
v, loaded := d.mp.Load(key)
91+
if loaded {
92+
session := v.(*eventSession)
93+
if time.Since(session.Time()) < d.timeout { // 超过 d.timeout 后重新处理,d.timeout 内记录当前时间
94+
session.Renew(time.Now())
95+
d.mp.Store(key, session)
96+
return true, nil
97+
}
98+
session.cancel()
99+
}
100+
var cancel context.CancelFunc
101+
ctx, cancel = context.WithCancel(parentCtx)
102+
d.mp.Store(key, &eventSession{
103+
cancel: cancel,
104+
time: time.Now(),
105+
})
106+
return false, ctx
107+
}
108+
109+
func (d *DelayOnce) Do(parentCtx context.Context, key string, f func() error) (isNew bool) {
110+
exit, ctx := d.checkAndStore(parentCtx, key)
111+
if exit {
112+
return false
113+
}
114+
go func(key string) {
115+
for {
116+
t := time.NewTicker(time.Second)
117+
defer t.Stop()
118+
select {
119+
case <-ctx.Done():
120+
return
121+
case <-t.C:
122+
if err := d.exec(key, f); err != nil {
123+
log.Println(key+`:`, err)
124+
}
125+
}
126+
}
127+
}(key)
128+
return true
129+
}
130+
131+
func (d *DelayOnce) exec(key string, f func() error) (err error) {
132+
v, ok := d.mp.Load(key)
133+
if !ok {
134+
return
135+
}
136+
session := v.(*eventSession)
137+
if time.Since(session.Time()) > d.delay { // 时间超过d.delay才触发
138+
err = f()
139+
if err != nil {
140+
return
141+
}
142+
d.mp.Delete(key)
143+
session.cancel()
144+
}
145+
return
146+
}

goroutine_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
"testing"
78
"time"
89
)
@@ -16,3 +17,22 @@ func TestLoop(t *testing.T) {
1617
time.Sleep(time.Second * 5)
1718
cancel()
1819
}
20+
21+
func TestDelayOnce(t *testing.T) {
22+
d := NewDelayOnce(time.Second*2, time.Hour)
23+
ctx := context.TODO()
24+
wg := sync.WaitGroup{}
25+
for i := 0; i < 10; i++ {
26+
fmt.Println(`Trigger`, time.Now())
27+
isNew := d.Do(ctx, `key`, func() error {
28+
defer wg.Done()
29+
fmt.Println(`Execute`, time.Now())
30+
return nil
31+
})
32+
if isNew {
33+
wg.Add(1)
34+
}
35+
time.Sleep(time.Second * 2)
36+
}
37+
wg.Wait()
38+
}

0 commit comments

Comments
 (0)