Skip to content

Commit f03bfd3

Browse files
kelindarCopilot
andauthored
Fix ticker leak and improve readme (#6)
* Add MIT License and enhance README with usage details; refactor event dispatcher for improved subscription handling * Refactor consumer's Listen method to optimize queue handling and improve readability * Add defer statement to stop ticker in group.Process method * Update default_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent b0d5892 commit f03bfd3

File tree

4 files changed

+115
-54
lines changed

4 files changed

+115
-54
lines changed

LICENSE

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2025 Roman Atachiants
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

README.md

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,26 @@
77
<a href="https://coveralls.io/github/kelindar/event"><img src="https://coveralls.io/repos/github/kelindar/event/badge.svg" alt="Coverage"></a>
88
</p>
99

10+
## Fast, In-Process Event Dispatcher
11+
12+
This package offers a high-performance, **in-process event dispatcher** for Go, ideal for decoupling modules and enabling asynchronous event handling. It supports both synchronous and asynchronous processing, focusing on speed and simplicity.
13+
- **High Performance:** Processes millions of events per second, about **10x faster** than channels.
14+
- **Generic:** Works with any type implementing the `Event` interface.
15+
- **Asynchronous:** Each subscriber runs in its own goroutine, ensuring non-blocking event handling.
16+
17+
**Use When:**
18+
- ✅ Decoupling modules within a single Go process.
19+
- ✅ Implementing lightweight pub/sub or event-driven patterns.
20+
- ✅ Needing high-throughput, low-latency event dispatching.
21+
- ✅ Preferring a simple, dependency-free solution.
22+
23+
**Not For:**
24+
- ❌ Inter-process/service communication (use Kafka, NATS, etc.).
25+
- ❌ Event persistence, durability, or advanced routing/filtering.
26+
- ❌ Cross-language/platform scenarios.
27+
- ❌ Event replay, dead-letter queues, or deduplication.
28+
- ❌ Heavy subscribe/unsubscribe churn or massive dynamic subscriber counts.
29+
1030
## Generic In-Process Pub/Sub
1131

1232
This repository contains a **simple, in-process event dispatcher** to be used to decouple internal modules. It provides a generic way to define events, publish and subscribe to them.
@@ -78,12 +98,18 @@ It should output something along these lines, where order is not guaranteed give
7898

7999
## Benchmarks
80100

101+
Please note that the benchmarks are run on a 13th Gen Intel(R) Core(TM) i7-13700K CPU, and results may vary based on the machine and environment. This one demonstrates the publishing throughput of the event dispatcher, at different number of event types and subscribers.
102+
81103
```
82104
cpu: 13th Gen Intel(R) Core(TM) i7-13700K
83-
BenchmarkEvent/1x1-24 164887946 14.55 ns/op 68.70 million/s 0 B/op 0 allocs/op
84-
BenchmarkEvent/1x10-24 28896586 80.06 ns/op 88.02 million/s 204 B/op 0 allocs/op
85-
BenchmarkEvent/1x100-24 1535168 1397 ns/op 71.57 million/s 26 B/op 0 allocs/op
86-
BenchmarkEvent/10x1-24 14288467 256.1 ns/op 39.03 million/s 0 B/op 0 allocs/op
87-
BenchmarkEvent/10x10-24 1624722 1265 ns/op 78.59 million/s 65 B/op 0 allocs/op
88-
BenchmarkEvent/10x100-24 164623 12767 ns/op 78.33 million/s 456 B/op 0 allocs/op
105+
BenchmarkEvent/1x1-24 14.72 ns/op 67.95 million/s 0 B/op 0 allocs/op
106+
BenchmarkEvent/1x10-24 84.61 ns/op 90.93 million/s 196 B/op 0 allocs/op
107+
BenchmarkEvent/1x100-24 1409 ns/op 70.95 million/s 10 B/op 0 allocs/op
108+
BenchmarkEvent/10x1-24 155.3 ns/op 64.38 million/s 0 B/op 0 allocs/op
109+
BenchmarkEvent/10x10-24 1315 ns/op 76.05 million/s 30 B/op 0 allocs/op
110+
BenchmarkEvent/10x100-24 13541 ns/op 73.84 million/s 210 B/op 0 allocs/op
89111
```
112+
113+
# License
114+
115+
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

default_test.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@ import (
1414
)
1515

1616
/*
17+
go test -bench=. -benchmem -benchtime=10s
1718
cpu: 13th Gen Intel(R) Core(TM) i7-13700K
18-
BenchmarkEvent/1x1-24 38709926 31.94 ns/op 30.89 million/s 1 B/op 0 allocs/op
19-
BenchmarkEvent/1x10-24 8107938 133.7 ns/op 74.76 million/s 45 B/op 0 allocs/op
20-
BenchmarkEvent/1x100-24 774168 1341 ns/op 72.65 million/s 373 B/op 0 allocs/op
21-
BenchmarkEvent/10x1-24 5755402 301.1 ns/op 32.98 million/s 7 B/op 0 allocs/op
22-
BenchmarkEvent/10x10-24 750022 1503 ns/op 64.47 million/s 438 B/op 0 allocs/op
23-
BenchmarkEvent/10x100-24 69363 14878 ns/op 67.11 million/s 3543 B/op 0 allocs/op
19+
BenchmarkEvent/1x1-24 814403188 14.72 ns/op 67.95 million/s 0 B/op 0 allocs/op
20+
BenchmarkEvent/1x10-24 161012098 84.61 ns/op 90.93 million/s 196 B/op 0 allocs/op
21+
BenchmarkEvent/1x100-24 7890922 1409 ns/op 70.95 million/s 10 B/op 0 allocs/op
22+
BenchmarkEvent/10x1-24 72358305 155.3 ns/op 64.38 million/s 0 B/op 0 allocs/op
23+
BenchmarkEvent/10x10-24 7632547 1315 ns/op 76.05 million/s 30 B/op 0 allocs/op
24+
BenchmarkEvent/10x100-24 832560 13541 ns/op 73.84 million/s 210 B/op 0 allocs/op
2425
*/
2526
func BenchmarkEvent(b *testing.B) {
2627
for _, topics := range []int{1, 10} {
@@ -52,6 +53,23 @@ func BenchmarkEvent(b *testing.B) {
5253
}
5354
}
5455

56+
/*
57+
cpu: 13th Gen Intel(R) Core(TM) i7-13700K
58+
BenchmarkSubcribeConcurrent-24 1826686 606.3 ns/op 1648 B/op 5 allocs/op
59+
*/
60+
func BenchmarkSubscribeConcurrent(b *testing.B) {
61+
d := NewDispatcher()
62+
b.ReportAllocs()
63+
b.ResetTimer()
64+
65+
b.RunParallel(func(pb *testing.PB) {
66+
for pb.Next() {
67+
unsub := Subscribe(d, func(ev MyEvent1) {})
68+
unsub()
69+
}
70+
})
71+
}
72+
5573
func TestDefaultPublish(t *testing.T) {
5674
var wg sync.WaitGroup
5775

event.go

Lines changed: 38 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type Event interface {
2222
// registry holds an immutable sorted array of event mappings
2323
type registry struct {
2424
keys []uint32 // Event types (sorted)
25-
subs []any // Corresponding subscribers
25+
grps []any // Corresponding subscribers
2626
}
2727

2828
// ------------------------------------- Dispatcher -------------------------------------
@@ -44,7 +44,7 @@ func NewDispatcher() *Dispatcher {
4444

4545
d.subs.Store(&registry{
4646
keys: make([]uint32, 0, 16),
47-
subs: make([]any, 0, 16),
47+
grps: make([]any, 0, 16),
4848
})
4949
return d
5050
}
@@ -82,7 +82,7 @@ func (d *Dispatcher) findGroup(eventType uint32) any {
8282
}
8383

8484
if left < len(keys) && keys[left] == eventType {
85-
return reg.subs[left]
85+
return reg.grps[left]
8686
}
8787
return nil
8888
}
@@ -116,35 +116,31 @@ func SubscribeTo[T Event](broker *Dispatcher, eventType uint32, handler func(T))
116116
grp := &group[T]{cond: sync.NewCond(new(sync.Mutex))}
117117
sub := grp.Add(handler)
118118

119-
// Copy-on-write with CAS loop: insert new entry in sorted position
120-
for {
121-
old := broker.subs.Load()
122-
idx := sort.Search(len(old.keys), func(i int) bool {
123-
return old.keys[i] >= eventType
124-
})
125-
126-
// Create new arrays with space for one more element
127-
newKeys := make([]uint32, len(old.keys)+1)
128-
newSubs := make([]any, len(old.subs)+1)
129-
130-
// Copy elements before insertion point
131-
copy(newKeys[:idx], old.keys[:idx])
132-
copy(newSubs[:idx], old.subs[:idx])
133-
134-
// Insert new element
135-
newKeys[idx] = eventType
136-
newSubs[idx] = grp
137-
138-
// Copy elements after insertion point
139-
copy(newKeys[idx+1:], old.keys[idx:])
140-
copy(newSubs[idx+1:], old.subs[idx:])
141-
142-
// Atomically swap the registry
143-
newReg := &registry{keys: newKeys, subs: newSubs}
144-
if broker.subs.CompareAndSwap(old, newReg) {
145-
break
146-
}
147-
}
119+
// Copy-on-write: insert new entry in sorted position
120+
old := broker.subs.Load()
121+
idx := sort.Search(len(old.keys), func(i int) bool {
122+
return old.keys[i] >= eventType
123+
})
124+
125+
// Create new arrays with space for one more element
126+
newKeys := make([]uint32, len(old.keys)+1)
127+
newGrps := make([]any, len(old.grps)+1)
128+
129+
// Copy elements before insertion point
130+
copy(newKeys[:idx], old.keys[:idx])
131+
copy(newGrps[:idx], old.grps[:idx])
132+
133+
// Insert new element
134+
newKeys[idx] = eventType
135+
newGrps[idx] = grp
136+
137+
// Copy elements after insertion point
138+
copy(newKeys[idx+1:], old.keys[idx:])
139+
copy(newGrps[idx+1:], old.grps[idx:])
140+
141+
// Atomically store the new registry (mutex ensures no concurrent writers)
142+
newReg := &registry{keys: newKeys, grps: newGrps}
143+
broker.subs.Store(newReg)
148144

149145
// Start processing
150146
go grp.Process(broker.df, broker.done)
@@ -205,14 +201,13 @@ func (s *consumer[T]) Listen(c *sync.Cond, fn func(T)) {
205201

206202
// Swap buffers and reset the current queue
207203
temp := s.queue
208-
s.queue = pending
204+
s.queue = pending[:0]
209205
pending = temp
210-
s.queue = s.queue[:0]
211206
c.L.Unlock()
212207

213208
// Outside of the critical section, process the work
214-
for i := 0; i < len(pending); i++ {
215-
fn(pending[i])
209+
for _, event := range pending {
210+
fn(event)
216211
}
217212
}
218213
}
@@ -228,6 +223,7 @@ type group[T Event] struct {
228223
// Process periodically broadcasts events
229224
func (s *group[T]) Process(interval time.Duration, done chan struct{}) {
230225
ticker := time.NewTicker(interval)
226+
defer ticker.Stop()
231227
for {
232228
select {
233229
case <-done:
@@ -250,7 +246,7 @@ func (s *group[T]) Broadcast(ev T) {
250246
// Add adds a subscriber to the list
251247
func (s *group[T]) Add(handler func(T)) *consumer[T] {
252248
sub := &consumer[T]{
253-
queue: make([]T, 0, 128),
249+
queue: make([]T, 0, 64),
254250
}
255251

256252
// Add the consumer to the list of active consumers
@@ -270,13 +266,13 @@ func (s *group[T]) Del(sub *consumer[T]) {
270266

271267
// Search and remove the subscriber
272268
sub.stop = true
273-
subs := make([]*consumer[T], 0, len(s.subs))
274-
for _, v := range s.subs {
275-
if v != sub {
276-
subs = append(subs, v)
269+
for i, v := range s.subs {
270+
if v == sub {
271+
copy(s.subs[i:], s.subs[i+1:])
272+
s.subs = s.subs[:len(s.subs)-1]
273+
break
277274
}
278275
}
279-
s.subs = subs
280276
}
281277

282278
// ------------------------------------- Debugging -------------------------------------

0 commit comments

Comments
 (0)