Skip to content

Commit 7c2994d

Browse files
Added seperation of pending and active transactions
1 parent 7ab3139 commit 7c2994d

File tree

1 file changed

+122
-16
lines changed

1 file changed

+122
-16
lines changed

internal/session/stream_user.go

Lines changed: 122 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (m *Manager) handleSessionChangeEvent(conn *websocket.Conn, sessionID strin
102102
/* prepare the message payload */
103103
message := StreamMessage{
104104
Type: "session_update",
105-
Data: map[string]interface{}{
105+
Data: map[string]any{
106106
"session_id": sessionID,
107107
"session": session,
108108
"event_type": msg.Payload,
@@ -117,8 +117,8 @@ func (m *Manager) handleSessionChangeEvent(conn *websocket.Conn, sessionID strin
117117

118118
/* ==== User Transaction List ==== */
119119

120-
/* send current user transactions */
121-
func (m *Manager) sendCurrentUserTransactions(conn *websocket.Conn, sessionID string, limit int) error {
120+
/* send current user results transactions */
121+
func (m *Manager) sendCurrentUserTransactionsResults(conn *websocket.Conn, sessionID string, limit int) error {
122122
ctx := context.Background()
123123

124124
/* get latest transactions from Redis */
@@ -142,7 +142,43 @@ func (m *Manager) sendCurrentUserTransactions(conn *websocket.Conn, sessionID st
142142
/* prepare the message payload */
143143
message := StreamMessage{
144144
Type: "transaction_update",
145-
Data: map[string]interface{}{
145+
Data: map[string]any{
146+
"session_id": sessionID,
147+
"transactions": transactions,
148+
},
149+
Timestamp: time.Now(),
150+
}
151+
152+
/* send the message to the client */
153+
return conn.WriteJSON(message)
154+
}
155+
156+
/* send current user pending transactions */
157+
func (m *Manager) sendCurrentUserTransactionsPending(conn *websocket.Conn, sessionID string, limit int) error {
158+
ctx := context.Background()
159+
160+
/* get latest transactions from Redis */
161+
key := fmt.Sprintf("session:%s:txpending", sessionID)
162+
values, err := m.redis.LRange(ctx, key, int64(-limit), -1).Result()
163+
if err != nil {
164+
return fmt.Errorf("failed to get transaction results: %w", err)
165+
}
166+
167+
/* convert each JSON string back into a Transaction */
168+
transactions := make([]types.Transaction, 0, len(values))
169+
for _, val := range values {
170+
var tx types.Transaction
171+
if err := json.Unmarshal([]byte(val), &tx); err != nil {
172+
/* skip malformed results */
173+
continue
174+
}
175+
transactions = append(transactions, tx)
176+
}
177+
178+
/* prepare the message payload */
179+
message := StreamMessage{
180+
Type: "transaction_update",
181+
Data: map[string]any{
146182
"session_id": sessionID,
147183
"transactions": transactions,
148184
},
@@ -153,8 +189,8 @@ func (m *Manager) sendCurrentUserTransactions(conn *websocket.Conn, sessionID st
153189
return conn.WriteJSON(message)
154190
}
155191

156-
/* listen for transaction changes in Redis */
157-
func (m *Manager) listenForTransactionsChanges(ctx context.Context, conn *websocket.Conn, sessionID string) {
192+
/* listen for results transaction changes in Redis */
193+
func (m *Manager) listenForTransactionsChangesResults(ctx context.Context, conn *websocket.Conn, sessionID string) {
158194
/* subscribe to both keyspace and keyevent notifications */
159195
keyspacePattern := fmt.Sprintf("__keyspace@0__:session:%s:txresults", sessionID)
160196
keyeventPattern := fmt.Sprintf("__keyevent@0__:rpush:session:%s:txresults", sessionID)
@@ -178,7 +214,39 @@ func (m *Manager) listenForTransactionsChanges(ctx context.Context, conn *websoc
178214
return
179215
case msg := <-ch:
180216
/* changes in transactions stored in Redis detected; handle the event */
181-
if err := m.handleTransactionChangeEvent(conn, sessionID, msg); err != nil {
217+
if err := m.handleTransactionChangeEventResults(conn, sessionID, msg); err != nil {
218+
m.errCh <- fmt.Errorf("error handling transaction change: %w", err)
219+
}
220+
}
221+
}
222+
}
223+
224+
/* listen for pending transaction changes in Redis */
225+
func (m *Manager) listenForTransactionsChangesPending(ctx context.Context, conn *websocket.Conn, sessionID string) {
226+
/* subscribe to both keyspace and keyevent notifications */
227+
keyspacePattern := fmt.Sprintf("__keyspace@0__:session:%s:txpending", sessionID)
228+
keyeventPattern := fmt.Sprintf("__keyevent@0__:rpush:session:%s:txpending", sessionID)
229+
230+
/* subscribe to Redis keyspace and keyevent */
231+
pubsub, err := m.redis.PSubscribe(ctx, keyspacePattern, keyeventPattern)
232+
if err != nil {
233+
m.errCh <- fmt.Errorf("failed to subscribe to redis events: %w", err)
234+
return
235+
}
236+
237+
defer pubsub.Close()
238+
239+
/* Redis update channel */
240+
ch := pubsub.Channel()
241+
242+
/* handling transaction changes */
243+
for {
244+
select {
245+
case <-ctx.Done():
246+
return
247+
case msg := <-ch:
248+
/* changes in transactions stored in Redis detected; handle the event */
249+
if err := m.handleTransactionChangeEventPending(conn, sessionID, msg); err != nil {
182250
m.errCh <- fmt.Errorf("error handling transaction change: %w", err)
183251
}
184252
}
@@ -190,8 +258,8 @@ func (m *Manager) listenForTransactionsChanges(ctx context.Context, conn *websoc
190258
The whole frontend will be updated even if one transaction changes it's state (for example, setting active to expired).
191259
*/
192260

193-
/* handle transaction change event */
194-
func (m *Manager) handleTransactionChangeEvent(conn *websocket.Conn, sessionID string, msg *redis.Message) error {
261+
/* handle transaction results change event */
262+
func (m *Manager) handleTransactionChangeEventResults(conn *websocket.Conn, sessionID string, msg *redis.Message) error {
195263
ctx := context.Background()
196264

197265
/* get latest transactions */
@@ -215,7 +283,45 @@ func (m *Manager) handleTransactionChangeEvent(conn *websocket.Conn, sessionID s
215283
/* prepare the message payload */
216284
message := StreamMessage{
217285
Type: "transaction_update",
218-
Data: map[string]interface{}{
286+
Data: map[string]any{
287+
"session_id": sessionID,
288+
"transactions": transactions,
289+
"event_type": msg.Payload,
290+
"event_source": "redis_keyspace",
291+
},
292+
Timestamp: time.Now(),
293+
}
294+
295+
/* send the message to the client */
296+
return conn.WriteJSON(message)
297+
}
298+
299+
/* handle transaction pending change event */
300+
func (m *Manager) handleTransactionChangeEventPending(conn *websocket.Conn, sessionID string, msg *redis.Message) error {
301+
ctx := context.Background()
302+
303+
/* get latest transactions */
304+
key := fmt.Sprintf("session:%s:txpending", sessionID)
305+
values, err := m.redis.LRange(ctx, key, -100, -1).Result()
306+
if err != nil {
307+
return fmt.Errorf("failed to get pending transactions: %w", err)
308+
}
309+
310+
/* convert each JSON string back into a Transaction */
311+
transactions := make([]types.Transaction, 0, len(values))
312+
for _, val := range values {
313+
var tx types.Transaction
314+
if err := json.Unmarshal([]byte(val), &tx); err != nil {
315+
/* skip malformed results */
316+
continue
317+
}
318+
transactions = append(transactions, tx)
319+
}
320+
321+
/* prepare the message payload */
322+
message := StreamMessage{
323+
Type: "transaction_update",
324+
Data: map[string]any{
219325
"session_id": sessionID,
220326
"transactions": transactions,
221327
"event_type": msg.Payload,
@@ -253,9 +359,9 @@ func (m *Manager) sendCurrentArchivedSessions(conn *websocket.Conn, username str
253359
exists := len(sessions) > 0
254360

255361
/* convert to plain JSON-compatible slices */
256-
var outgoing []map[string]interface{}
362+
var outgoing []map[string]any
257363
for _, session := range sessions {
258-
outgoing = append(outgoing, map[string]interface{}{
364+
outgoing = append(outgoing, map[string]any{
259365
"id": session.ID,
260366
"username": session.Username,
261367
"ip": session.Ip.String,
@@ -272,7 +378,7 @@ func (m *Manager) sendCurrentArchivedSessions(conn *websocket.Conn, username str
272378

273379
message := StreamMessage{
274380
Type: "session_state",
275-
Data: map[string]interface{}{
381+
Data: map[string]any{
276382
"username": username,
277383
"exists": exists,
278384
"sessions": outgoing,
@@ -308,9 +414,9 @@ func (m *Manager) sendCurrentArchivedPendingTransactions(conn *websocket.Conn, u
308414

309415
exists := len(transactions) > 0
310416

311-
var outgoing []map[string]interface{}
417+
var outgoing []map[string]any
312418
for _, tx := range transactions {
313-
outgoing = append(outgoing, map[string]interface{}{
419+
outgoing = append(outgoing, map[string]any{
314420
"id": tx.ID,
315421
"session_id": tx.SessionID,
316422
"timestamp": tx.Timestamp,
@@ -329,7 +435,7 @@ func (m *Manager) sendCurrentArchivedPendingTransactions(conn *websocket.Conn, u
329435

330436
message := StreamMessage{
331437
Type: "transactions_state",
332-
Data: map[string]interface{}{
438+
Data: map[string]any{
333439
"username": username,
334440
"exists": exists,
335441
"transactions": outgoing,

0 commit comments

Comments
 (0)