Skip to content

Commit 459c878

Browse files
authored
Merge pull request #4 from anantadwi13/dev
implement func wrapper
2 parents f6db0d7 + 73c8480 commit 459c878

File tree

4 files changed

+376
-1
lines changed

4 files changed

+376
-1
lines changed

.github/workflows/code-coverage.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: Test and coverage
33
on: [push, pull_request]
44

55
env:
6-
PACKAGES: io
6+
PACKAGES: io, wrapper
77

88
jobs:
99
test:

wrapper/func.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package wrapper
2+
3+
import (
4+
"context"
5+
"errors"
6+
"reflect"
7+
"sync"
8+
"sync/atomic"
9+
)
10+
11+
var (
12+
ErrAlreadyShutdown = errors.New("already shutdown")
13+
)
14+
15+
type HandleFunc func(ctx context.Context, wrapperData *Data)
16+
17+
type Option func(wrapperData *Data)
18+
19+
type Middleware func(next HandleFunc) HandleFunc
20+
21+
type FuncManager interface {
22+
// Run will run the fn synchronously
23+
Run(ctx context.Context, fn HandleFunc, opts ...Option)
24+
// RunAsync will run the fn inside goroutine. No need to spawn the goroutine
25+
RunAsync(ctx context.Context, fn HandleFunc, opts ...Option)
26+
// Wait will wait for the func manager is shutdown
27+
Wait() <-chan struct{}
28+
// Shutdown will force shutdown when the ctx is done
29+
Shutdown(ctx context.Context) error
30+
}
31+
32+
type Data struct {
33+
dataLock sync.RWMutex
34+
data map[interface{}]interface{}
35+
}
36+
37+
func (d *Data) Get(key interface{}) interface{} {
38+
d.dataLock.RLock()
39+
defer d.dataLock.RUnlock()
40+
if d.data == nil {
41+
return nil
42+
}
43+
return d.data[key]
44+
}
45+
46+
func (d *Data) Set(key interface{}, val interface{}) error {
47+
if key == nil {
48+
return errors.New("nil key")
49+
}
50+
if !reflect.TypeOf(key).Comparable() {
51+
return errors.New("key is not comparable")
52+
}
53+
d.dataLock.Lock()
54+
defer d.dataLock.Unlock()
55+
if d.data == nil {
56+
d.data = make(map[interface{}]interface{})
57+
}
58+
d.data[key] = val
59+
return nil
60+
}
61+
62+
type key string
63+
64+
const (
65+
keyIdentifier = key("identifier")
66+
)
67+
68+
func WithOptionIdentifier(funcName string) Option {
69+
return func(data *Data) {
70+
_ = data.Set(keyIdentifier, funcName)
71+
}
72+
}
73+
74+
func GetIdentifier(wrapperData *Data) string {
75+
val, ok := wrapperData.Get(keyIdentifier).(string)
76+
if !ok {
77+
return ""
78+
}
79+
return val
80+
}
81+
82+
func WithMiddlewareRecoverPanic(onPanic func(recoverVal interface{}, wrapperData *Data)) Middleware {
83+
return func(next HandleFunc) HandleFunc {
84+
return func(ctx context.Context, wrapperData *Data) {
85+
defer func() {
86+
val := recover()
87+
if val != nil {
88+
if onPanic != nil {
89+
onPanic(val, wrapperData)
90+
}
91+
}
92+
}()
93+
next(ctx, wrapperData)
94+
}
95+
}
96+
}
97+
98+
type funcManager struct {
99+
wg sync.WaitGroup
100+
isShutdown int32
101+
shutdown chan struct{}
102+
mainCtx context.Context
103+
mainCtxCancel context.CancelFunc
104+
middlewares []Middleware
105+
}
106+
107+
func NewFuncManager(middlewares ...Middleware) FuncManager {
108+
ctx, cancel := context.WithCancel(context.Background())
109+
110+
m := &funcManager{
111+
shutdown: make(chan struct{}),
112+
mainCtx: ctx,
113+
mainCtxCancel: cancel,
114+
middlewares: middlewares,
115+
}
116+
117+
return m
118+
}
119+
120+
func (m *funcManager) Run(ctx context.Context, fn HandleFunc, opts ...Option) {
121+
if atomic.LoadInt32(&m.isShutdown) == 1 {
122+
return
123+
}
124+
125+
m.wg.Add(1)
126+
defer m.wg.Done()
127+
m.run(ctx, fn, opts...)
128+
}
129+
130+
func (m *funcManager) RunAsync(ctx context.Context, fn HandleFunc, opts ...Option) {
131+
if atomic.LoadInt32(&m.isShutdown) == 1 {
132+
return
133+
}
134+
135+
m.wg.Add(1)
136+
go func() {
137+
defer m.wg.Done()
138+
m.run(ctx, fn, opts...)
139+
}()
140+
}
141+
142+
func (m *funcManager) Wait() <-chan struct{} {
143+
return m.shutdown
144+
}
145+
146+
func (m *funcManager) Shutdown(ctx context.Context) error {
147+
if !atomic.CompareAndSwapInt32(&m.isShutdown, 0, 1) {
148+
return ErrAlreadyShutdown
149+
}
150+
151+
defer func() {
152+
close(m.shutdown)
153+
}()
154+
155+
m.mainCtxCancel()
156+
157+
done := make(chan struct{})
158+
go func() {
159+
m.wg.Wait()
160+
close(done)
161+
}()
162+
163+
select {
164+
case <-ctx.Done():
165+
return ctx.Err()
166+
case <-done:
167+
}
168+
169+
return nil
170+
}
171+
172+
func (m *funcManager) run(ctx context.Context, fn HandleFunc, opts ...Option) {
173+
ctx, cancel := context.WithCancel(ctx)
174+
defer cancel()
175+
176+
wrapperData := &Data{}
177+
178+
go func() {
179+
select {
180+
case <-ctx.Done():
181+
case <-m.mainCtx.Done():
182+
cancel()
183+
}
184+
}()
185+
186+
for _, opt := range opts {
187+
opt(wrapperData)
188+
}
189+
190+
for i := len(m.middlewares) - 1; i >= 0; i-- {
191+
fn = m.middlewares[i](fn)
192+
}
193+
194+
fn(ctx, wrapperData)
195+
}

wrapper/func_test.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package wrapper
2+
3+
import (
4+
"context"
5+
"errors"
6+
"log"
7+
"sync"
8+
"sync/atomic"
9+
"testing"
10+
"time"
11+
)
12+
13+
func TestFlow1(t *testing.T) {
14+
checker := int32(14)
15+
wg := sync.WaitGroup{}
16+
ctx, cancel := context.WithCancel(context.Background())
17+
defer cancel()
18+
19+
wg.Add(3) // we will run 3 functions
20+
m := NewFuncManager(
21+
func(next HandleFunc) HandleFunc {
22+
return func(ctx context.Context, wrapperData *Data) {
23+
defer wg.Done()
24+
25+
switch GetIdentifier(wrapperData) {
26+
case "check-shutdown", "check-panic", "":
27+
atomic.AddInt32(&checker, -1)
28+
}
29+
atomic.AddInt32(&checker, -1)
30+
log.Println("previous", GetIdentifier(wrapperData))
31+
next(ctx, wrapperData)
32+
atomic.AddInt32(&checker, -1)
33+
log.Println("after", GetIdentifier(wrapperData))
34+
}
35+
},
36+
WithMiddlewareRecoverPanic(func(recoverVal interface{}, wrapperData *Data) {
37+
atomic.AddInt32(&checker, -1)
38+
log.Println("panic", GetIdentifier(wrapperData), recoverVal)
39+
}),
40+
)
41+
42+
runnerCtx, runnerCtxCancel := context.WithTimeout(context.Background(), 1*time.Second)
43+
defer runnerCtxCancel()
44+
45+
m.RunAsync(runnerCtx, func(ctx context.Context, wrapperData *Data) {
46+
<-ctx.Done()
47+
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
48+
atomic.AddInt32(&checker, -1)
49+
}
50+
panic("trigger panic" + ctx.Err().Error())
51+
}, WithOptionIdentifier("check-panic"))
52+
53+
m.RunAsync(
54+
context.Background(),
55+
func(ctx context.Context, wrapperData *Data) {
56+
select {
57+
case <-time.After(20 * time.Second):
58+
panic("will not be triggered")
59+
case <-ctx.Done():
60+
atomic.AddInt32(&checker, -1)
61+
log.Println("will be here")
62+
}
63+
},
64+
WithOptionIdentifier("check-shutdown"),
65+
)
66+
67+
m.Run(
68+
context.Background(),
69+
func(ctx context.Context, wrapperData *Data) {
70+
atomic.AddInt32(&checker, -1)
71+
log.Println("executed")
72+
},
73+
)
74+
75+
go func() {
76+
wg.Wait()
77+
cancel()
78+
}()
79+
80+
go func() {
81+
<-time.After(2 * time.Second)
82+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
83+
defer cancel()
84+
err := m.Shutdown(ctx)
85+
if err != nil {
86+
log.Println("shutdown error", err)
87+
return
88+
}
89+
}()
90+
91+
<-m.Wait()
92+
<-ctx.Done()
93+
94+
// running function after shutting down
95+
m.Run(context.Background(), func(ctx context.Context, wrapperData *Data) {
96+
atomic.AddInt32(&checker, -1) // will not be triggered
97+
})
98+
m.RunAsync(context.Background(), func(ctx context.Context, wrapperData *Data) {
99+
// todo
100+
atomic.AddInt32(&checker, -1) // will not be triggered
101+
})
102+
err := m.Shutdown(context.Background())
103+
if errors.Is(err, ErrAlreadyShutdown) {
104+
atomic.AddInt32(&checker, -1)
105+
}
106+
107+
if checker != 0 {
108+
t.Errorf("invalid checker, checker is not 0. checker: %d", checker)
109+
}
110+
}
111+
112+
func TestFlow2(t *testing.T) {
113+
checker := int32(2)
114+
wg := sync.WaitGroup{}
115+
m := NewFuncManager()
116+
117+
wg.Add(1)
118+
m.RunAsync(context.Background(), func(ctx context.Context, wrapperData *Data) {
119+
defer wg.Done()
120+
<-time.After(2 * time.Second)
121+
atomic.AddInt32(&checker, -1)
122+
})
123+
124+
ctxShutdown, cancelCtxShutdown := context.WithTimeout(context.Background(), 1*time.Second)
125+
defer cancelCtxShutdown()
126+
err := m.Shutdown(ctxShutdown)
127+
if errors.Is(err, context.DeadlineExceeded) {
128+
atomic.AddInt32(&checker, -1)
129+
}
130+
131+
wg.Wait()
132+
133+
if checker != 0 {
134+
t.Errorf("invalid checker, checker is not 0. checker: %d", checker)
135+
}
136+
}
137+
138+
func TestData(t *testing.T) {
139+
checker := int32(6)
140+
data := &Data{}
141+
142+
// key is nil
143+
err := data.Set(nil, 123)
144+
if err != nil {
145+
checker--
146+
}
147+
148+
// non comparable
149+
err = data.Set([]string{"a"}, 123)
150+
if err != nil {
151+
checker--
152+
}
153+
154+
// key as string
155+
err = data.Set("abc", 123)
156+
if err == nil {
157+
checker--
158+
}
159+
160+
// key as another string type
161+
type key string
162+
err = data.Set(key("abc"), 456)
163+
if err == nil {
164+
checker--
165+
}
166+
167+
if data.Get("abc") == 123 {
168+
checker--
169+
}
170+
if data.Get(key("abc")) == 456 {
171+
checker--
172+
}
173+
174+
if checker != 0 {
175+
t.Errorf("invalid checker, checker is not 0. checker: %d", checker)
176+
}
177+
}

wrapper/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module github.com/anantadwi13/go-sdk/wrapper
2+
3+
go 1.14

0 commit comments

Comments
 (0)