Skip to content

Commit 5ad072f

Browse files
mgmt: publish message rate handled by session along with log message (#247)
* mgmt: add message per second handled by session along with log message * make messagetracker featured struct with less locks --------- Co-authored-by: Omer Akram <omer@xconn.io>
1 parent 4f0a596 commit 5ad072f

File tree

1 file changed

+88
-2
lines changed

1 file changed

+88
-2
lines changed

peer.go

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"net"
99
"sync"
10+
"sync/atomic"
1011
"time"
1112

1213
"github.com/gobwas/ws"
@@ -34,9 +35,78 @@ func NewBaseSession(id uint64, realm, authID, authRole, authMethod string, authE
3435
authExtra: authExtra,
3536
client: cl,
3637
serializer: serializer,
38+
tracker: newMessageTracker(),
3739
}
3840
}
3941

42+
type messageTracker struct {
43+
started bool
44+
rxCount uint64
45+
txCount uint64
46+
rxRate uint64
47+
txRate uint64
48+
stopTrackCh chan struct{}
49+
mu sync.RWMutex
50+
}
51+
52+
func newMessageTracker() *messageTracker {
53+
return &messageTracker{
54+
stopTrackCh: make(chan struct{}),
55+
}
56+
}
57+
58+
func (m *messageTracker) IncRx() {
59+
atomic.AddUint64(&m.rxCount, 1)
60+
}
61+
62+
func (m *messageTracker) IncTx() {
63+
atomic.AddUint64(&m.txCount, 1)
64+
}
65+
66+
func (m *messageTracker) Rates() (rxRate, txRate uint64) {
67+
m.mu.RLock()
68+
defer m.mu.RUnlock()
69+
return m.rxRate, m.txRate
70+
}
71+
72+
func (m *messageTracker) Start(interval time.Duration) bool {
73+
m.mu.Lock()
74+
defer m.mu.Unlock()
75+
76+
if m.started {
77+
return false
78+
}
79+
m.started = true
80+
81+
go m.trackLoop(interval)
82+
return true
83+
}
84+
85+
func (m *messageTracker) trackLoop(interval time.Duration) {
86+
ticker := time.NewTicker(interval)
87+
defer ticker.Stop()
88+
89+
for {
90+
select {
91+
case <-ticker.C:
92+
rx := atomic.SwapUint64(&m.rxCount, 0)
93+
tx := atomic.SwapUint64(&m.txCount, 0)
94+
95+
m.mu.Lock()
96+
m.rxRate = rx / uint64(interval.Seconds())
97+
m.txRate = tx / uint64(interval.Seconds())
98+
m.mu.Unlock()
99+
100+
case <-m.stopTrackCh:
101+
return
102+
}
103+
}
104+
}
105+
106+
func (m *messageTracker) Stop() {
107+
close(m.stopTrackCh)
108+
}
109+
40110
type baseSession struct {
41111
id uint64
42112
realm string
@@ -52,6 +122,8 @@ type baseSession struct {
52122
publishLogs bool
53123
topic string
54124

125+
tracker *messageTracker
126+
55127
sync.Mutex
56128
}
57129

@@ -64,6 +136,7 @@ func (b *baseSession) AuthExtra() map[string]any {
64136
}
65137

66138
func (b *baseSession) EnableLogPublishing(session *Session, topic string) {
139+
b.tracker.Start(time.Second)
67140
b.Lock()
68141
defer b.Unlock()
69142

@@ -73,6 +146,7 @@ func (b *baseSession) EnableLogPublishing(session *Session, topic string) {
73146
}
74147

75148
func (b *baseSession) DisableLogPublishing() {
149+
b.tracker.Stop()
76150
b.Lock()
77151
defer b.Unlock()
78152

@@ -99,12 +173,18 @@ func (b *baseSession) ReadMessage() (messages.Message, error) {
99173
var pub *PublishRequest
100174
b.Lock()
101175
if b.publishLogs {
102-
pub = b.session.Publish(b.topic).Arg(constructReceivedMsgLog(msg))
176+
rxRate, txRate := b.tracker.Rates()
177+
pub = b.session.Publish(b.topic).Arg(map[string]any{
178+
"message": constructReceivedMsgLog(msg),
179+
"rx_rate": rxRate,
180+
"tx_rate": txRate,
181+
})
103182
}
104183
b.Unlock()
105184

106185
if pub != nil {
107186
_ = pub.Do()
187+
b.tracker.IncRx()
108188
}
109189

110190
return msg, nil
@@ -119,12 +199,18 @@ func (b *baseSession) constructWriteMessage(message messages.Message) ([]byte, e
119199
var pub *PublishRequest
120200
b.Lock()
121201
if b.publishLogs {
122-
pub = b.session.Publish(b.topic).Arg(constructSendingMsgLog(message))
202+
rxRate, txRate := b.tracker.Rates()
203+
pub = b.session.Publish(b.topic).Arg(map[string]any{
204+
"message": constructSendingMsgLog(message),
205+
"rx_rate": rxRate,
206+
"tx_rate": txRate,
207+
})
123208
}
124209
b.Unlock()
125210

126211
if pub != nil {
127212
_ = pub.Do()
213+
b.tracker.IncTx()
128214
}
129215

130216
return payload, nil

0 commit comments

Comments
 (0)