Skip to content

Commit 6c78337

Browse files
Added transactions streaming functions - tested completely
1 parent c4478fa commit 6c78337

File tree

1 file changed

+112
-3
lines changed

1 file changed

+112
-3
lines changed

internal/session/stream_user.go

Lines changed: 112 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@ package session
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"time"
78

9+
"github.com/PythonHacker24/linux-acl-management-backend/internal/types"
810
"github.com/gorilla/websocket"
911
"github.com/redis/go-redis/v9"
1012
)
1113

14+
/* ==== User Session ==== */
15+
1216
/* send current session of a user */
1317
func (m *Manager) sendCurrentSession(conn *websocket.Conn, sessionID string) error {
1418
ctx := context.Background()
@@ -50,10 +54,12 @@ func (m *Manager) sendCurrentSession(conn *websocket.Conn, sessionID string) err
5054

5155
/* send data regarding current session */
5256
func (m *Manager) listenForSessionChanges(ctx context.Context, conn *websocket.Conn, sessionID string) {
57+
/* subscribe to both keyspace and keyevent notifications */
5358
keyspacePattern := fmt.Sprintf("__keyspace@0__:session:%s", sessionID)
59+
keyeventPattern := fmt.Sprintf("__keyevent@0__:hset:session:%s", sessionID)
5460

55-
/* subscribe to Redis keyspace */
56-
pubsub, err := m.redis.PSubscribe(ctx, keyspacePattern)
61+
/* subscribe to Redis keyspace and keyevent */
62+
pubsub, err := m.redis.PSubscribe(ctx, keyspacePattern, keyeventPattern)
5763
if err != nil {
5864
m.errCh <- fmt.Errorf("failed to subscribe to redis events: %w", err)
5965
return
@@ -104,6 +110,109 @@ func (m *Manager) handleSessionChangeEvent(conn *websocket.Conn, sessionID strin
104110
Timestamp: time.Now(),
105111
}
106112

107-
/* send the message payload via websocket */
113+
/* send the message to the client */
114+
return conn.WriteJSON(message)
115+
}
116+
117+
/* ==== User Transaction List ==== */
118+
119+
/* sends list of current transactions of the user */
120+
func (m *Manager) sendCurrentUserTransactions(conn *websocket.Conn, username, sessionID string, limit int) error {
121+
/* get session from manager */
122+
m.mutex.RLock()
123+
session, exists := m.sessionsMap[username]
124+
m.mutex.RUnlock()
125+
126+
/* return if session doesn't exist */
127+
if !exists {
128+
return fmt.Errorf("session not found")
129+
}
130+
131+
/* get latest transactions from Redis */
132+
transactions, err := m.getTransactionResultsRedis(session, limit)
133+
if err != nil {
134+
return fmt.Errorf("failed to fetch transaction results from Redis: %w", err)
135+
}
136+
137+
/* create message payload for websocket */
138+
message := StreamMessage{
139+
Type: "user_transactions",
140+
Data: map[string]interface{}{
141+
"session_id": sessionID,
142+
"transactions": transactions,
143+
},
144+
Timestamp: time.Now(),
145+
}
146+
147+
return conn.WriteJSON(message)
148+
}
149+
150+
/* listen for transaction changes in Redis */
151+
func (m *Manager) listenForTransactionsChanges(ctx context.Context, conn *websocket.Conn, sessionID string) {
152+
/* subscribe to both keyspace and keyevent notifications */
153+
keyspacePattern := fmt.Sprintf("__keyspace@0__:session:%s:txresults", sessionID)
154+
keyeventPattern := fmt.Sprintf("__keyevent@0__:rpush:session:%s:txresults", sessionID)
155+
156+
/* subscribe to Redis keyspace and keyevent */
157+
pubsub, err := m.redis.PSubscribe(ctx, keyspacePattern, keyeventPattern)
158+
if err != nil {
159+
m.errCh <- fmt.Errorf("failed to subscribe to redis events: %w", err)
160+
return
161+
}
162+
163+
defer pubsub.Close()
164+
165+
/* Redis update channel */
166+
ch := pubsub.Channel()
167+
168+
/* handling transaction changes */
169+
for {
170+
select {
171+
case <-ctx.Done():
172+
return
173+
case msg := <-ch:
174+
/* changes in transactions stored in Redis detected; handle the event */
175+
if err := m.handleTransactionChangeEvent(conn, sessionID, msg); err != nil {
176+
m.errCh <- fmt.Errorf("error handling transaction change: %w", err)
177+
}
178+
}
179+
}
180+
}
181+
182+
/* handle transaction change event */
183+
func (m *Manager) handleTransactionChangeEvent(conn *websocket.Conn, sessionID string, msg *redis.Message) error {
184+
ctx := context.Background()
185+
186+
/* get latest transactions */
187+
key := fmt.Sprintf("session:%s:txresults", sessionID)
188+
values, err := m.redis.LRange(ctx, key, -100, -1).Result()
189+
if err != nil {
190+
return fmt.Errorf("failed to get transaction results: %w", err)
191+
}
192+
193+
/* convert each JSON string back into a Transaction */
194+
transactions := make([]types.Transaction, 0, len(values))
195+
for _, val := range values {
196+
var tx types.Transaction
197+
if err := json.Unmarshal([]byte(val), &tx); err != nil {
198+
/* skip malformed results */
199+
continue
200+
}
201+
transactions = append(transactions, tx)
202+
}
203+
204+
/* prepare the message payload */
205+
message := StreamMessage{
206+
Type: "transaction_update",
207+
Data: map[string]interface{}{
208+
"session_id": sessionID,
209+
"transactions": transactions,
210+
"event_type": msg.Payload,
211+
"event_source": "redis_keyspace",
212+
},
213+
Timestamp: time.Now(),
214+
}
215+
216+
/* send the message to the client */
108217
return conn.WriteJSON(message)
109218
}

0 commit comments

Comments
 (0)