Skip to content

Commit e3929b8

Browse files
committed
Merge branch 'master' of https://github.com/kelindar/event
2 parents d91e1f2 + d167a27 commit e3929b8

File tree

4 files changed

+192
-56
lines changed

4 files changed

+192
-56
lines changed

README.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,11 @@ It should output something along these lines, where order is not guaranteed give
7979
## Benchmarks
8080

8181
```
82-
cpu: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
83-
BenchmarkEvent/1-consumers-8 10021444 119.1 ns/op 10021301 msg 0 B/op 0 allocs/op
84-
BenchmarkEvent/10-consumers-8 799999 1595 ns/op 7999915 msg 0 B/op 0 allocs/op
85-
BenchmarkEvent/100-consumers-8 99048 14308 ns/op 9904769 msg 0 B/op 0 allocs/op
82+
cpu: 13th Gen Intel(R) Core(TM) i7-13700K
83+
BenchmarkEvent/1x1-24 38709926 31.94 ns/op 30.89 million/s 1 B/op 0 allocs/op
84+
BenchmarkEvent/1x10-24 8107938 133.7 ns/op 74.76 million/s 45 B/op 0 allocs/op
85+
BenchmarkEvent/1x100-24 774168 1341 ns/op 72.65 million/s 373 B/op 0 allocs/op
86+
BenchmarkEvent/10x1-24 5755402 301.1 ns/op 32.98 million/s 7 B/op 0 allocs/op
87+
BenchmarkEvent/10x10-24 750022 1503 ns/op 64.47 million/s 438 B/op 0 allocs/op
88+
BenchmarkEvent/10x100-24 69363 14878 ns/op 67.11 million/s 3543 B/op 0 allocs/op
8689
```

default_test.go

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,33 +8,47 @@ import (
88
"sync"
99
"sync/atomic"
1010
"testing"
11+
"time"
1112

1213
"github.com/stretchr/testify/assert"
1314
)
1415

1516
/*
1617
cpu: 13th Gen Intel(R) Core(TM) i7-13700K
17-
BenchmarkEmit/1-subs-24 13407880 87.10 ns/op 13.41 million 0 B/op 0 allocs/op
18-
BenchmarkEmit/10-subs-24 1000000 1012 ns/op 10.00 million 0 B/op 0 allocs/op
19-
BenchmarkEmit/100-subs-24 103896 11714 ns/op 10.39 million 0 B/op 0 allocs/op
18+
BenchmarkEvent/1x1-24 38709926 31.94 ns/op 30.89 million/s 1 B/op 0 allocs/op
19+
BenchmarkEvent/1x10-24 8107938 133.7 ns/op 74.76 million/s 45 B/op 0 allocs/op
20+
BenchmarkEvent/1x100-24 774168 1341 ns/op 72.65 million/s 373 B/op 0 allocs/op
21+
BenchmarkEvent/10x1-24 5755402 301.1 ns/op 32.98 million/s 7 B/op 0 allocs/op
22+
BenchmarkEvent/10x10-24 750022 1503 ns/op 64.47 million/s 438 B/op 0 allocs/op
23+
BenchmarkEvent/10x100-24 69363 14878 ns/op 67.11 million/s 3543 B/op 0 allocs/op
2024
*/
21-
func BenchmarkEmit(b *testing.B) {
22-
for _, subs := range []int{1, 10, 100} {
23-
b.Run(fmt.Sprintf("%d-subs", subs), func(b *testing.B) {
24-
var count uint64
25-
for i := 0; i < subs; i++ {
26-
defer On(func(ev MyEvent1) {
27-
atomic.AddUint64(&count, 1)
28-
})()
29-
}
25+
func BenchmarkEvent(b *testing.B) {
26+
for _, topics := range []int{1, 10} {
27+
for _, subs := range []int{1, 10, 100} {
28+
b.Run(fmt.Sprintf("%dx%d", topics, subs), func(b *testing.B) {
29+
var count atomic.Int64
30+
for i := 0; i < subs; i++ {
31+
for id := 10; id < 10+topics; id++ {
32+
defer OnType(uint32(id), func(ev MyEvent3) {
33+
count.Add(1)
34+
})()
35+
}
36+
}
3037

31-
b.ReportAllocs()
32-
b.ResetTimer()
33-
for n := 0; n < b.N; n++ {
34-
Emit(MyEvent1{})
35-
}
36-
b.ReportMetric(float64(count)/1e6, "million")
37-
})
38+
start := time.Now()
39+
b.ReportAllocs()
40+
b.ResetTimer()
41+
for n := 0; n < b.N; n++ {
42+
for id := 10; id < 10+topics; id++ {
43+
Emit(MyEvent3{ID: id})
44+
}
45+
}
46+
47+
elapsed := time.Since(start)
48+
rate := float64(count.Load()) / 1e6 / elapsed.Seconds()
49+
b.ReportMetric(rate, "million/s")
50+
})
51+
}
3852
}
3953
}
4054

event.go

Lines changed: 95 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"reflect"
1010
"strings"
1111
"sync"
12+
"time"
1213
)
1314

1415
// Event represents an event contract
@@ -21,11 +22,32 @@ type Event interface {
2122
// Dispatcher represents an event dispatcher.
2223
type Dispatcher struct {
2324
subs sync.Map
25+
done chan struct{} // Cancellation
26+
df time.Duration // Flush interval
2427
}
2528

2629
// NewDispatcher creates a new dispatcher of events.
2730
func NewDispatcher() *Dispatcher {
28-
return &Dispatcher{}
31+
return &Dispatcher{
32+
df: 500 * time.Microsecond,
33+
done: make(chan struct{}),
34+
}
35+
}
36+
37+
// Close closes the dispatcher
38+
func (d *Dispatcher) Close() error {
39+
close(d.done)
40+
return nil
41+
}
42+
43+
// isClosed returns whether the dispatcher is closed or not
44+
func (d *Dispatcher) isClosed() bool {
45+
select {
46+
case <-d.done:
47+
return true
48+
default:
49+
return false
50+
}
2951
}
3052

3153
// Subscribe subscribes to an event, the type of the event will be automatically
@@ -37,21 +59,25 @@ func Subscribe[T Event](broker *Dispatcher, handler func(T)) context.CancelFunc
3759

3860
// SubscribeTo subscribes to an event with the specified event type.
3961
func SubscribeTo[T Event](broker *Dispatcher, eventType uint32, handler func(T)) context.CancelFunc {
40-
ctx, cancel := context.WithCancel(context.Background())
41-
sub := &consumer[T]{
42-
queue: make(chan T, 1024),
43-
exec: handler,
62+
if broker.isClosed() {
63+
panic(errClosed)
4464
}
4565

4666
// Add to consumer group, if it doesn't exist it will create one
47-
s, _ := broker.subs.LoadOrStore(eventType, new(group[T]))
67+
s, loaded := broker.subs.LoadOrStore(eventType, &group[T]{
68+
cond: sync.NewCond(new(sync.Mutex)),
69+
})
4870
group := groupOf[T](eventType, s)
49-
group.Add(ctx, sub)
71+
sub := group.Add(handler)
72+
73+
// Start flushing asynchronously if we just created a new group
74+
if !loaded {
75+
go group.Process(broker.df, broker.done)
76+
}
5077

5178
// Return unsubscribe function
5279
return func() {
5380
group.Del(sub)
54-
cancel() // Stop async processing
5581
}
5682
}
5783

@@ -80,57 +106,97 @@ func groupOf[T Event](eventType uint32, subs any) *group[T] {
80106
panic(errConflict[T](eventType, subs))
81107
}
82108

83-
// ------------------------------------- Subscriber List -------------------------------------
109+
// ------------------------------------- Subscriber -------------------------------------
84110

85111
// consumer represents a consumer with a message queue
86112
type consumer[T Event] struct {
87-
queue chan T // Message buffer
88-
exec func(T) // Process callback
113+
queue []T // Current work queue
114+
stop bool // Stop signal
89115
}
90116

91117
// Listen listens to the event queue and processes events
92-
func (s *consumer[T]) Listen(ctx context.Context) {
118+
func (s *consumer[T]) Listen(c *sync.Cond, fn func(T)) {
119+
pending := make([]T, 0, 128)
120+
93121
for {
94-
select {
95-
case ev := <-s.queue:
96-
s.exec(ev)
97-
case <-ctx.Done():
98-
return
122+
c.L.Lock()
123+
for len(s.queue) == 0 {
124+
switch {
125+
case s.stop:
126+
c.L.Unlock()
127+
return
128+
default:
129+
c.Wait()
130+
}
131+
}
132+
133+
// Swap buffers and reset the current queue
134+
temp := s.queue
135+
s.queue = pending
136+
pending = temp
137+
s.queue = s.queue[:0]
138+
c.L.Unlock()
139+
140+
// Outside of the critical section, process the work
141+
for i := 0; i < len(pending); i++ {
142+
fn(pending[i])
99143
}
100144
}
101145
}
102146

147+
// ------------------------------------- Subscriber Group -------------------------------------
148+
103149
// group represents a consumer group
104150
type group[T Event] struct {
105-
sync.RWMutex
151+
cond *sync.Cond
106152
subs []*consumer[T]
107153
}
108154

155+
// Process periodically broadcasts events
156+
func (s *group[T]) Process(interval time.Duration, done chan struct{}) {
157+
ticker := time.NewTicker(interval)
158+
for {
159+
select {
160+
case <-done:
161+
return
162+
case <-ticker.C:
163+
s.cond.Broadcast()
164+
}
165+
}
166+
}
167+
109168
// Broadcast sends an event to all consumers
110169
func (s *group[T]) Broadcast(ev T) {
111-
s.RLock()
112-
defer s.RUnlock()
170+
s.cond.L.Lock()
113171
for _, sub := range s.subs {
114-
sub.queue <- ev
172+
sub.queue = append(sub.queue, ev)
115173
}
174+
s.cond.L.Unlock()
116175
}
117176

118177
// Add adds a subscriber to the list
119-
func (s *group[T]) Add(ctx context.Context, sub *consumer[T]) {
120-
go sub.Listen(ctx)
178+
func (s *group[T]) Add(handler func(T)) *consumer[T] {
179+
sub := &consumer[T]{
180+
queue: make([]T, 0, 128),
181+
}
121182

122183
// Add the consumer to the list of active consumers
123-
s.Lock()
184+
s.cond.L.Lock()
124185
s.subs = append(s.subs, sub)
125-
s.Unlock()
186+
s.cond.L.Unlock()
187+
188+
// Start listening
189+
go sub.Listen(s.cond, handler)
190+
return sub
126191
}
127192

128193
// Del removes a subscriber from the list
129194
func (s *group[T]) Del(sub *consumer[T]) {
130-
s.Lock()
131-
defer s.Unlock()
195+
s.cond.L.Lock()
196+
defer s.cond.L.Unlock()
132197

133198
// Search and remove the subscriber
199+
sub.stop = true
134200
subs := make([]*consumer[T], 0, len(s.subs))
135201
for _, v := range s.subs {
136202
if v != sub {
@@ -142,6 +208,8 @@ func (s *group[T]) Del(sub *consumer[T]) {
142208

143209
// ------------------------------------- Debugging -------------------------------------
144210

211+
var errClosed = fmt.Errorf("event dispatcher is closed")
212+
145213
// Count returns the number of subscribers in this group
146214
func (s *group[T]) Count() int {
147215
return len(s.subs)

event_test.go

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package event
55

66
import (
7+
"fmt"
78
"sync"
89
"sync/atomic"
910
"testing"
@@ -15,21 +16,22 @@ func TestPublish(t *testing.T) {
1516
d := NewDispatcher()
1617
var wg sync.WaitGroup
1718

18-
// Subscribe
19+
// Subscribe, must be received in order
1920
var count int64
2021
defer Subscribe(d, func(ev MyEvent1) {
21-
atomic.AddInt64(&count, 1)
22+
assert.Equal(t, int(atomic.AddInt64(&count, 1)), ev.Number)
2223
wg.Done()
2324
})()
2425

2526
// Publish
26-
wg.Add(2)
27-
Publish(d, MyEvent1{})
28-
Publish(d, MyEvent1{})
27+
wg.Add(3)
28+
Publish(d, MyEvent1{Number: 1})
29+
Publish(d, MyEvent1{Number: 2})
30+
Publish(d, MyEvent1{Number: 3})
2931

3032
// Wait and check
3133
wg.Wait()
32-
assert.Equal(t, int64(2), count)
34+
assert.Equal(t, int64(3), count)
3335
}
3436

3537
func TestUnsubscribe(t *testing.T) {
@@ -88,6 +90,49 @@ func TestPublishDifferentType(t *testing.T) {
8890
})
8991
}
9092

93+
func TestCloseDispatcher(t *testing.T) {
94+
d := NewDispatcher()
95+
defer SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})()
96+
97+
assert.NoError(t, d.Close())
98+
assert.Panics(t, func() {
99+
SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})
100+
})
101+
}
102+
103+
func TestMatrix(t *testing.T) {
104+
const amount = 1000
105+
for _, subs := range []int{1, 10, 100} {
106+
for _, topics := range []int{1, 10} {
107+
expected := subs * topics * amount
108+
t.Run(fmt.Sprintf("%dx%d", topics, subs), func(t *testing.T) {
109+
var count atomic.Int64
110+
var wg sync.WaitGroup
111+
wg.Add(expected)
112+
113+
d := NewDispatcher()
114+
for i := 0; i < subs; i++ {
115+
for id := 0; id < topics; id++ {
116+
defer SubscribeTo(d, uint32(id), func(ev MyEvent3) {
117+
count.Add(1)
118+
wg.Done()
119+
})()
120+
}
121+
}
122+
123+
for n := 0; n < amount; n++ {
124+
for id := 0; id < topics; id++ {
125+
go Publish(d, MyEvent3{ID: id})
126+
}
127+
}
128+
129+
wg.Wait()
130+
assert.Equal(t, expected, int(count.Load()))
131+
})
132+
}
133+
}
134+
}
135+
91136
// ------------------------------------- Test Events -------------------------------------
92137

93138
const (
@@ -106,3 +151,9 @@ type MyEvent2 struct {
106151
}
107152

108153
func (t MyEvent2) Type() uint32 { return TypeEvent2 }
154+
155+
type MyEvent3 struct {
156+
ID int
157+
}
158+
159+
func (t MyEvent3) Type() uint32 { return uint32(t.ID) }

0 commit comments

Comments
 (0)