Skip to content

Commit e450861

Browse files
authored
Make API more usable for generic types (#2)
* Make API more usable for generic types * 1.19
1 parent 0a1be6a commit e450861

File tree

6 files changed

+129
-71
lines changed

6 files changed

+129
-71
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ jobs:
99
runs-on: ubuntu-latest
1010
strategy:
1111
matrix:
12-
go: ["1.18"]
12+
go: ["1.19"]
1313
steps:
1414
- name: Set up Go ${{ matrix.go }}
1515
uses: actions/setup-go@v3

README.md

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,39 +14,37 @@ This repository contains a **simple, in-process event dispatcher** to be used to
1414
```go
1515
// Various event types
1616
const EventA = 0x01
17-
const EventB = 0x02
1817

1918
// Event type for testing purposes
2019
type myEvent struct{
21-
kind uint32
2220
Data string
2321
}
2422

2523
// Type returns the event type
2624
func (ev myEvent) Type() uint32 {
27-
return ev.kind
25+
return EventA
2826
}
2927
```
3028

31-
When publishing events, you can create a `Dispatcher[T]` which allows to `Publish()` and `Subscribe()` to various event types.
29+
When publishing events, you can create a `Dispatcher` which is then used as a target of generic `event.Publish[T]()` and `event.Subscribe[T]()` functions to publish and subscribe to various event types respectively.
3230

3331
```go
34-
bus := event.NewDispatcher[Event]()
32+
bus := event.NewDispatcher()
3533

3634
// Subcribe to event A, and automatically unsubscribe at the end
37-
defer bus.Subscribe(EventA, func(e Event) {
35+
defer event.Subscribe(bus, func(e Event) {
3836
println("(consumer 1)", e.Data)
3937
})()
4038

4139
// Subcribe to event A, and automatically unsubscribe at the end
42-
defer bus.Subscribe(EventA, func(e Event) {
40+
defer event.Subscribe(bus, func(e Event) {
4341
println("(consumer 2)", e.Data)
4442
})()
4543

4644
// Publish few events
47-
bus.Publish(newEventA("event 1"))
48-
bus.Publish(newEventA("event 2"))
49-
bus.Publish(newEventA("event 3"))
45+
event.Publish(bus, newEventA("event 1"))
46+
event.Publish(bus, newEventA("event 2"))
47+
event.Publish(bus, newEventA("event 3"))
5048
```
5149

5250
It should output something along these lines, where order is not guaranteed given that both subscribers are processing messages asyncrhonously.

event.go

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ package event
55

66
import (
77
"context"
8+
"fmt"
9+
"reflect"
10+
"strings"
811
"sync"
912
)
1013

@@ -16,56 +19,65 @@ type Event interface {
1619
// ------------------------------------- Dispatcher -------------------------------------
1720

1821
// Dispatcher represents an event dispatcher.
19-
type Dispatcher[T Event] struct {
22+
type Dispatcher struct {
2023
subs sync.Map
2124
}
2225

2326
// NewDispatcher creates a new dispatcher of events.
24-
func NewDispatcher[T Event]() *Dispatcher[T] {
25-
return &Dispatcher[T]{}
27+
func NewDispatcher() *Dispatcher {
28+
return &Dispatcher{}
2629
}
2730

28-
// loadOrStore finds a subscriber group or creates a new one
29-
func (d *Dispatcher[T]) loadOrStore(key uint32) *group[T] {
30-
s, _ := d.subs.LoadOrStore(key, new(group[T]))
31-
return s.(*group[T])
31+
// Subscribe subscribes to an event, the type of the event will be automatically
32+
// inferred from the provided type. Must be constant for this to work.
33+
func Subscribe[T Event](broker *Dispatcher, handler func(T)) context.CancelFunc {
34+
var event T
35+
return SubscribeTo(broker, event.Type(), handler)
3236
}
3337

34-
// Publish writes an event into the dispatcher
35-
func (d *Dispatcher[T]) Publish(ev T) {
36-
if g, ok := d.subs.Load(ev.Type()); ok {
37-
g.(*group[T]).Broadcast(ev)
38-
}
39-
}
40-
41-
// Subscribe subscribes to an callback event
42-
func (d *Dispatcher[T]) Subscribe(eventType uint32, handler func(T)) context.CancelFunc {
38+
// SubscribeTo subscribes to an event with the specified event type.
39+
func SubscribeTo[T Event](broker *Dispatcher, eventType uint32, handler func(T)) context.CancelFunc {
4340
ctx, cancel := context.WithCancel(context.Background())
4441
sub := &consumer[T]{
4542
queue: make(chan T, 1024),
4643
exec: handler,
4744
}
4845

4946
// Add to consumer group, if it doesn't exist it will create one
50-
group := d.loadOrStore(eventType)
47+
s, _ := broker.subs.LoadOrStore(eventType, new(group[T]))
48+
group := groupOf[T](eventType, s)
5149
group.Add(ctx, sub)
5250

5351
// Return unsubscribe function
5452
return func() {
55-
d.unsubscribe(eventType, sub) // Remove from the list
56-
cancel() // Stop async processing
53+
group.Del(sub)
54+
cancel() // Stop async processing
5755
}
5856
}
5957

60-
// Count counts the number of subscribers
61-
func (d *Dispatcher[T]) count(eventType uint32) int {
62-
return len(d.loadOrStore(eventType).subs)
58+
// Publish writes an event into the dispatcher
59+
func Publish[T Event](broker *Dispatcher, ev T) {
60+
if s, ok := broker.subs.Load(ev.Type()); ok {
61+
group := groupOf[T](ev.Type(), s)
62+
group.Broadcast(ev)
63+
}
64+
}
65+
66+
// Count counts the number of subscribers, this is for testing only.
67+
func (d *Dispatcher) count(eventType uint32) int {
68+
if group, ok := d.subs.Load(eventType); ok {
69+
return group.(interface{ Count() int }).Count()
70+
}
71+
return 0
6372
}
6473

65-
// unsubscribe removes the subscriber from the list of subscribers
66-
func (d *Dispatcher[T]) unsubscribe(eventType uint32, sub *consumer[T]) {
67-
group := d.loadOrStore(eventType)
68-
group.Del(sub)
74+
// groupOf casts the subscriber group to the specified generic type
75+
func groupOf[T Event](eventType uint32, subs any) *group[T] {
76+
if group, ok := subs.(*group[T]); ok {
77+
return group
78+
}
79+
80+
panic(errConflict[T](eventType, subs))
6981
}
7082

7183
// ------------------------------------- Subscriber List -------------------------------------
@@ -127,3 +139,27 @@ func (s *group[T]) Del(sub *consumer[T]) {
127139
}
128140
s.subs = subs
129141
}
142+
143+
// ------------------------------------- Debugging -------------------------------------
144+
145+
// Count returns the number of subscribers in this group
146+
func (s *group[T]) Count() int {
147+
return len(s.subs)
148+
}
149+
150+
// String returns string representation of the type
151+
func (s *group[T]) String() string {
152+
typ := reflect.TypeOf(s).String()
153+
idx := strings.LastIndex(typ, "/")
154+
typ = typ[idx+1 : len(typ)-1]
155+
return typ
156+
}
157+
158+
// errConflict returns a conflict message
159+
func errConflict[T any](eventType uint32, existing any) string {
160+
var want T
161+
return fmt.Sprintf(
162+
"conflicting event type, want=<%T>, registered=<%s>, event=0x%v",
163+
want, existing, eventType,
164+
)
165+
}

event_test.go

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,61 +11,62 @@ import (
1111

1212
/*
1313
cpu: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
14-
BenchmarkEvent/1-consumers-8 10021444 119.1 ns/op 10021301 msg 0 B/op 0 allocs/op
15-
BenchmarkEvent/10-consumers-8 799999 1595 ns/op 7999915 msg 0 B/op 0 allocs/op
16-
BenchmarkEvent/100-consumers-8 99048 14308 ns/op 9904769 msg 0 B/op 0 allocs/op
14+
BenchmarkEvent/1-consumers-8 10240418 116.8 ns/op 10240023 msg 0 B/op 0 allocs/op
15+
BenchmarkEvent/10-consumers-8 923197 1396 ns/op 9231961 msg 0 B/op 0 allocs/op
16+
BenchmarkEvent/100-consumers-8 97951 12699 ns/op 9795055 msg 0 B/op 0 allocs/op
1717
*/
1818
func BenchmarkEvent(b *testing.B) {
1919
for _, subs := range []int{1, 10, 100} {
2020
b.Run(fmt.Sprintf("%d-consumers", subs), func(b *testing.B) {
2121
var count uint64
22-
d := NewDispatcher[testEvent]()
22+
d := NewDispatcher()
2323
for i := 0; i < subs; i++ {
24-
defer d.Subscribe(TestEventType, func(ev testEvent) {
24+
defer Subscribe(d, func(ev MyEvent1) {
2525
atomic.AddUint64(&count, 1)
2626
})()
2727
}
2828

2929
b.ReportAllocs()
3030
b.ResetTimer()
3131
for n := 0; n < b.N; n++ {
32-
d.Publish(testEvent{})
32+
Publish(d, MyEvent1{})
3333
}
3434
b.ReportMetric(float64(count), "msg")
3535
})
3636
}
3737
}
3838

3939
func TestPublish(t *testing.T) {
40-
d := NewDispatcher[testEvent]()
40+
d := NewDispatcher()
4141
var wg sync.WaitGroup
4242

4343
// Subscribe
4444
var count int64
45-
defer d.Subscribe(TestEventType, func(ev testEvent) {
45+
defer Subscribe(d, func(ev MyEvent1) {
4646
atomic.AddInt64(&count, 1)
4747
wg.Done()
4848
})()
4949

5050
// Publish
5151
wg.Add(2)
52-
d.Publish(testEvent{})
53-
d.Publish(testEvent{})
52+
Publish(d, MyEvent1{})
53+
Publish(d, MyEvent1{})
5454

5555
// Wait and check
5656
wg.Wait()
5757
assert.Equal(t, int64(2), count)
5858
}
5959

6060
func TestUnsubscribe(t *testing.T) {
61-
d := NewDispatcher[testEvent]()
62-
unsubscribe := d.Subscribe(TestEventType, func(ev testEvent) {
61+
d := NewDispatcher()
62+
assert.Equal(t, 0, d.count(TypeEvent1))
63+
unsubscribe := Subscribe(d, func(ev MyEvent1) {
6364
// Nothing
6465
})
6566

66-
assert.Equal(t, 1, d.count(TestEventType))
67+
assert.Equal(t, 1, d.count(TypeEvent1))
6768
unsubscribe()
68-
assert.Equal(t, 0, d.count(TestEventType))
69+
assert.Equal(t, 0, d.count(TypeEvent1))
6970
}
7071

7172
func TestConcurrent(t *testing.T) {
@@ -74,8 +75,8 @@ func TestConcurrent(t *testing.T) {
7475
var wg sync.WaitGroup
7576
wg.Add(1)
7677

77-
d := NewDispatcher[testEvent]()
78-
defer d.Subscribe(TestEventType, func(ev testEvent) {
78+
d := NewDispatcher()
79+
defer Subscribe(d, func(ev MyEvent1) {
7980
if current := atomic.AddInt64(&count, 1); current == max {
8081
wg.Done()
8182
}
@@ -84,24 +85,49 @@ func TestConcurrent(t *testing.T) {
8485
// Asynchronously publish
8586
go func() {
8687
for i := 0; i < max; i++ {
87-
d.Publish(testEvent{})
88+
Publish(d, MyEvent1{})
8889
}
8990
}()
9091

91-
defer d.Subscribe(TestEventType, func(ev testEvent) {
92+
defer Subscribe(d, func(ev MyEvent1) {
9293
// Subscriber that does nothing
9394
})()
9495

9596
wg.Wait()
9697
assert.Equal(t, max, int(count))
9798
}
9899

99-
// ------------------------------------- Test Event -------------------------------------
100+
func TestSubscribeDifferentType(t *testing.T) {
101+
d := NewDispatcher()
102+
assert.Panics(t, func() {
103+
SubscribeTo(d, TypeEvent1, func(ev MyEvent1) {})
104+
SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})
105+
})
106+
}
107+
108+
func TestPublishDifferentType(t *testing.T) {
109+
d := NewDispatcher()
110+
assert.Panics(t, func() {
111+
SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})
112+
Publish(d, MyEvent1{})
113+
})
114+
}
100115

101-
const TestEventType = 0xff
116+
// ------------------------------------- Test Events -------------------------------------
102117

103-
type testEvent struct{}
118+
const (
119+
TypeEvent1 = 0x1
120+
TypeEvent2 = 0x2
121+
)
104122

105-
func (testEvent) Type() uint32 {
106-
return TestEventType
123+
type MyEvent1 struct {
124+
Number int
107125
}
126+
127+
func (t MyEvent1) Type() uint32 { return TypeEvent1 }
128+
129+
type MyEvent2 struct {
130+
Text string
131+
}
132+
133+
func (t MyEvent2) Type() uint32 { return TypeEvent2 }

example/main.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,41 +11,39 @@ import (
1111

1212
// Various event types
1313
const EventA = 0x01
14-
const EventB = 0x02
1514

1615
// Event type for testing purposes
1716
type Event struct {
18-
kind uint32
1917
Data string
2018
}
2119

2220
// Type returns the event type
2321
func (ev Event) Type() uint32 {
24-
return ev.kind
22+
return EventA
2523
}
2624

2725
// newEventA creates a new instance of an event
2826
func newEventA(data string) Event {
29-
return Event{kind: EventA, Data: data}
27+
return Event{Data: data}
3028
}
3129

3230
func main() {
33-
bus := event.NewDispatcher[Event]()
31+
bus := event.NewDispatcher()
3432

3533
// Subcribe to event A, and automatically unsubscribe at the end
36-
defer bus.Subscribe(EventA, func(e Event) {
34+
defer event.SubscribeTo(bus, EventA, func(e Event) {
3735
println("(consumer 1)", e.Data)
3836
})()
3937

4038
// Subcribe to event A, and automatically unsubscribe at the end
41-
defer bus.Subscribe(EventA, func(e Event) {
39+
defer event.SubscribeTo(bus, EventA, func(e Event) {
4240
println("(consumer 2)", e.Data)
4341
})()
4442

4543
// Publish few events
46-
bus.Publish(newEventA("event 1"))
47-
bus.Publish(newEventA("event 2"))
48-
bus.Publish(newEventA("event 3"))
44+
event.Publish(bus, newEventA("event 1"))
45+
event.Publish(bus, newEventA("event 2"))
46+
event.Publish(bus, newEventA("event 3"))
4947

5048
time.Sleep(10 * time.Millisecond)
5149
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/kelindar/event
22

3-
go 1.18
3+
go 1.19
44

55
require github.com/stretchr/testify v1.6.1
66

0 commit comments

Comments
 (0)