Skip to content

Commit 29e2acd

Browse files
feat(eth-pubsub): Implement in-memory EventBus for real-time topic management and event distribution (#1855)
1 parent ae20488 commit 29e2acd

File tree

3 files changed

+330
-0
lines changed

3 files changed

+330
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5656
- [#1838](https://github.com/NibiruChain/nibiru/pull/1838) - feat(eth): Go-ethereum, crypto, encoding, and unit tests for evm/types
5757
- [#1841](https://github.com/NibiruChain/nibiru/pull/1841) - feat(eth): Collections encoders for bytes, Ethereum addresses, and Ethereum hashes
5858
- [#1847](https://github.com/NibiruChain/nibiru/pull/1847) - fix(docker-chaosnet): release snapshot docker build failed CI.
59+
- [#1855](https://github.com/NibiruChain/nibiru/pull/1855) - feat(eth-pubsub): Implement in-memory EventBus for real-time topic management and event distribution
5960

6061
#### Dapp modules: perp, spot, etc
6162

eth/rpc/pubsub/pubsub.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Copyright (c) 2023-2024 Nibi, Inc.
2+
package pubsub
3+
4+
import (
5+
"sync"
6+
"sync/atomic"
7+
8+
"github.com/pkg/errors"
9+
10+
coretypes "github.com/cometbft/cometbft/rpc/core/types"
11+
)
12+
13+
type UnsubscribeFunc func()
14+
15+
// EventBus manages topics and subscriptions. A "topic" is a named channel of
16+
// communication. A "subscription" is the action taken by a subscriber to express
17+
// interest in receiving messages broadcasted from a specific topic.
18+
type EventBus interface {
19+
// AddTopic: Adds a new topic with the specified name and message source
20+
AddTopic(name string, src <-chan coretypes.ResultEvent) error
21+
// RemoveTopic: Removes the specified topic and all its related data,
22+
// ensuring clean up of resources.
23+
RemoveTopic(name string)
24+
Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error)
25+
Topics() []string
26+
}
27+
28+
// memEventBus is an implemention of the `EventBus` interface.
29+
type memEventBus struct {
30+
topics map[string]<-chan coretypes.ResultEvent
31+
topicsMux *sync.RWMutex
32+
subscribers map[string]map[uint64]chan<- coretypes.ResultEvent
33+
subscribersMux *sync.RWMutex
34+
currentUniqueID uint64
35+
}
36+
37+
// NewEventBus returns a fresh imlpemention of `memEventBus`, which implements
38+
// the `EventBus` interface for managing Ethereum topics and subscriptions.
39+
func NewEventBus() EventBus {
40+
return &memEventBus{
41+
topics: make(map[string]<-chan coretypes.ResultEvent),
42+
topicsMux: new(sync.RWMutex),
43+
subscribers: make(map[string]map[uint64]chan<- coretypes.ResultEvent),
44+
subscribersMux: new(sync.RWMutex),
45+
}
46+
}
47+
48+
// GenUniqueID atomically increments and returns a unique identifier for a new subscriber.
49+
// This ID is used internally to manage subscriber-specific channels.
50+
func (m *memEventBus) GenUniqueID() uint64 {
51+
return atomic.AddUint64(&m.currentUniqueID, 1)
52+
}
53+
54+
// Topics returns a list of all topics currently managed by the EventBus. The
55+
// list is safe for concurrent access and is a snapshot of current topic names.
56+
func (m *memEventBus) Topics() (topics []string) {
57+
m.topicsMux.RLock()
58+
defer m.topicsMux.RUnlock()
59+
60+
topics = make([]string, 0, len(m.topics))
61+
for topicName := range m.topics {
62+
topics = append(topics, topicName)
63+
}
64+
65+
return topics
66+
}
67+
68+
// AddTopic adds a new topic with the specified name and message source
69+
func (m *memEventBus) AddTopic(name string, src <-chan coretypes.ResultEvent) error {
70+
m.topicsMux.RLock()
71+
_, ok := m.topics[name]
72+
m.topicsMux.RUnlock()
73+
74+
if ok {
75+
return errors.New("topic already registered")
76+
}
77+
78+
m.topicsMux.Lock()
79+
m.topics[name] = src
80+
m.topicsMux.Unlock()
81+
82+
go m.publishTopic(name, src)
83+
84+
return nil
85+
}
86+
87+
// RemoveTopic: Removes the specified topic and all its related data, ensuring
88+
// clean up of resources.
89+
func (m *memEventBus) RemoveTopic(name string) {
90+
m.topicsMux.Lock()
91+
delete(m.topics, name)
92+
m.topicsMux.Unlock()
93+
}
94+
95+
// Subscribe attempts to create a subscription to the specified topic. It returns
96+
// a channel to receive messages, a function to unsubscribe, and an error if the
97+
// topic does not exist.
98+
func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) {
99+
m.topicsMux.RLock()
100+
_, ok := m.topics[name]
101+
m.topicsMux.RUnlock()
102+
103+
if !ok {
104+
return nil, nil, errors.Errorf("topic not found: %s", name)
105+
}
106+
107+
ch := make(chan coretypes.ResultEvent)
108+
m.subscribersMux.Lock()
109+
defer m.subscribersMux.Unlock()
110+
111+
id := m.GenUniqueID()
112+
if _, ok := m.subscribers[name]; !ok {
113+
m.subscribers[name] = make(map[uint64]chan<- coretypes.ResultEvent)
114+
}
115+
m.subscribers[name][id] = ch
116+
117+
unsubscribe := func() {
118+
m.subscribersMux.Lock()
119+
defer m.subscribersMux.Unlock()
120+
delete(m.subscribers[name], id)
121+
}
122+
123+
return ch, unsubscribe, nil
124+
}
125+
126+
func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) {
127+
for {
128+
msg, ok := <-src
129+
if !ok {
130+
m.closeAllSubscribers(name)
131+
m.topicsMux.Lock()
132+
delete(m.topics, name)
133+
m.topicsMux.Unlock()
134+
return
135+
}
136+
m.publishAllSubscribers(name, msg)
137+
}
138+
}
139+
140+
// closeAllSubscribers closes all subscriber channels associated with the
141+
// specified topic and removes the topic from the subscribers map. This function
142+
// is typically called when a topic is deleted or no longer available to ensure
143+
// all resources are released properly and to prevent goroutine leaks. It ensures
144+
// thread-safe execution by locking around the operation.
145+
func (m *memEventBus) closeAllSubscribers(name string) {
146+
m.subscribersMux.Lock()
147+
defer m.subscribersMux.Unlock()
148+
149+
subscribers := m.subscribers[name]
150+
delete(m.subscribers, name)
151+
// #nosec G705
152+
for _, sub := range subscribers {
153+
close(sub)
154+
}
155+
}
156+
157+
// publishAllSubscribers sends a message to all subscribers of the specified
158+
// topic. It uses a non-blocking send operation to deliver the message to
159+
// subscriber channels. If a subscriber's channel is not ready to receive the
160+
// message (i.e., the channel is full), the message is skipped for that
161+
// subscriber to avoid blocking the publisher. This function ensures thread-safe
162+
// access to subscribers by using a read lock.
163+
func (m *memEventBus) publishAllSubscribers(name string, msg coretypes.ResultEvent) {
164+
m.subscribersMux.RLock()
165+
defer m.subscribersMux.RUnlock()
166+
subscribers := m.subscribers[name]
167+
// #nosec G705
168+
for _, sub := range subscribers {
169+
select {
170+
case sub <- msg:
171+
default:
172+
}
173+
}
174+
}

eth/rpc/pubsub/pubsub_test.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package pubsub
2+
3+
import (
4+
"log"
5+
"sort"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
rpccore "github.com/cometbft/cometbft/rpc/core/types"
11+
"github.com/stretchr/testify/require"
12+
"github.com/stretchr/testify/suite"
13+
)
14+
15+
// subscribeAndPublish: Helper function used to perform concurrent subscription
16+
// and publishing actions. It concurrently subscribes multiple clients to the
17+
// specified topic and simultanesouly sends an empty message to the topic channel
18+
// for each subscription.
19+
func subscribeAndPublish(t *testing.T, eb EventBus, topic string, topicChan chan rpccore.ResultEvent) {
20+
var (
21+
wg sync.WaitGroup
22+
subscribersCount = 50
23+
emptyMsg = rpccore.ResultEvent{}
24+
)
25+
for i := 0; i < subscribersCount; i++ {
26+
wg.Add(1)
27+
// concurrently subscribe to the topic
28+
go func() {
29+
defer wg.Done()
30+
_, _, err := eb.Subscribe(topic)
31+
require.NoError(t, err)
32+
}()
33+
34+
// send events to the topic
35+
wg.Add(1)
36+
go func() {
37+
defer wg.Done()
38+
topicChan <- emptyMsg
39+
}()
40+
}
41+
wg.Wait()
42+
}
43+
44+
type SuitePubsub struct {
45+
suite.Suite
46+
}
47+
48+
func TestSuitePubsub(t *testing.T) {
49+
suite.Run(t, new(SuitePubsub))
50+
}
51+
52+
func (s *SuitePubsub) TestAddTopic() {
53+
q := NewEventBus()
54+
// dummy vars
55+
topicA := "guard"
56+
topicB := "cream"
57+
58+
s.NoError(q.AddTopic(topicA, make(<-chan rpccore.ResultEvent)))
59+
s.NoError(q.AddTopic(topicB, make(<-chan rpccore.ResultEvent)))
60+
s.Error(q.AddTopic(topicB, make(<-chan rpccore.ResultEvent)))
61+
62+
topics := q.Topics()
63+
sort.Strings(topics) // cream should be first
64+
s.Require().EqualValues([]string{topicB, topicA}, topics)
65+
}
66+
67+
func (s *SuitePubsub) TestSubscribe() {
68+
q := NewEventBus()
69+
70+
// dummy vars
71+
topicA := "0xfoo"
72+
topicB := "blockchain"
73+
74+
srcA := make(chan rpccore.ResultEvent)
75+
err := q.AddTopic(topicA, srcA)
76+
s.NoError(err)
77+
78+
srcB := make(chan rpccore.ResultEvent)
79+
err = q.AddTopic(topicB, srcB)
80+
s.NoError(err)
81+
82+
// subscriber channels
83+
subChanA, _, err := q.Subscribe(topicA)
84+
s.NoError(err)
85+
subChanB1, _, err := q.Subscribe(topicB)
86+
s.NoError(err)
87+
subChanB2, _, err := q.Subscribe(topicB)
88+
s.NoError(err)
89+
90+
wg := new(sync.WaitGroup)
91+
wg.Add(4)
92+
93+
emptyMsg := rpccore.ResultEvent{}
94+
go func() {
95+
defer wg.Done()
96+
msg := <-subChanA
97+
log.Println(topicA+":", msg)
98+
s.EqualValues(emptyMsg, msg)
99+
}()
100+
101+
go func() {
102+
defer wg.Done()
103+
msg := <-subChanB1
104+
log.Println(topicB+":", msg)
105+
s.EqualValues(emptyMsg, msg)
106+
}()
107+
108+
go func() {
109+
defer wg.Done()
110+
msg := <-subChanB2
111+
log.Println(topicB+"2:", msg)
112+
s.EqualValues(emptyMsg, msg)
113+
}()
114+
115+
go func() {
116+
defer wg.Done()
117+
118+
time.Sleep(time.Second)
119+
120+
close(srcA)
121+
close(srcB)
122+
}()
123+
124+
wg.Wait()
125+
time.Sleep(time.Second)
126+
}
127+
128+
// TestConcurrentSubscribeAndPublish: Stress tests the module to make sure that
129+
// operations are handled properly under concurrent access.
130+
func (s *SuitePubsub) TestConcurrentSubscribeAndPublish() {
131+
var (
132+
wg sync.WaitGroup
133+
eb = NewEventBus()
134+
topicName = "topic-name"
135+
topicCh = make(chan rpccore.ResultEvent)
136+
runsCount = 5
137+
)
138+
139+
err := eb.AddTopic(topicName, topicCh)
140+
s.Require().NoError(err)
141+
142+
for i := 0; i < runsCount; i++ {
143+
subscribeAndPublish(s.T(), eb, topicName, topicCh)
144+
}
145+
146+
// close channel to make test end
147+
wg.Add(1)
148+
go func() {
149+
defer wg.Done()
150+
time.Sleep(2 * time.Second)
151+
close(topicCh)
152+
}()
153+
154+
wg.Wait()
155+
}

0 commit comments

Comments
 (0)