Skip to content

Commit a707415

Browse files
Updated users streaming
1 parent 2fae117 commit a707415

File tree

1 file changed

+24
-14
lines changed

1 file changed

+24
-14
lines changed

internal/session/stream_user.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -116,34 +116,39 @@ func (m *Manager) handleSessionChangeEvent(conn *websocket.Conn, sessionID strin
116116

117117
/* ==== User Transaction List ==== */
118118

119-
/* sends list of current transactions of the user */
119+
/* send current user transactions */
120120
func (m *Manager) sendCurrentUserTransactions(conn *websocket.Conn, username, sessionID string, limit int) error {
121-
/* get session from manager */
122-
m.mutex.RLock()
123-
session, exists := m.sessionsMap[username]
124-
m.mutex.RUnlock()
125-
126-
/* return if session doesn't exist */
127-
if !exists {
128-
return fmt.Errorf("session not found")
129-
}
121+
ctx := context.Background()
130122

131123
/* get latest transactions from Redis */
132-
transactions, err := m.getTransactionResultsRedis(session, limit)
124+
key := fmt.Sprintf("session:%s:txresults", sessionID)
125+
values, err := m.redis.LRange(ctx, key, int64(-limit), -1).Result()
133126
if err != nil {
134-
return fmt.Errorf("failed to fetch transaction results from Redis: %w", err)
127+
return fmt.Errorf("failed to get transaction results: %w", err)
135128
}
136129

137-
/* create message payload for websocket */
130+
/* convert each JSON string back into a Transaction */
131+
transactions := make([]types.Transaction, 0, len(values))
132+
for _, val := range values {
133+
var tx types.Transaction
134+
if err := json.Unmarshal([]byte(val), &tx); err != nil {
135+
/* skip malformed results */
136+
continue
137+
}
138+
transactions = append(transactions, tx)
139+
}
140+
141+
/* prepare the message payload */
138142
message := StreamMessage{
139-
Type: "user_transactions",
143+
Type: "transaction_update",
140144
Data: map[string]interface{}{
141145
"session_id": sessionID,
142146
"transactions": transactions,
143147
},
144148
Timestamp: time.Now(),
145149
}
146150

151+
/* send the message to the client */
147152
return conn.WriteJSON(message)
148153
}
149154

@@ -179,6 +184,11 @@ func (m *Manager) listenForTransactionsChanges(ctx context.Context, conn *websoc
179184
}
180185
}
181186

187+
/*
188+
currently, handleTransactionChangeEvent sends the complete JSON package whenever anything is updated.
189+
The whole frontend will be updated even if one transaction changes it's state (for example, setting active to expired).
190+
*/
191+
182192
/* handle transaction change event */
183193
func (m *Manager) handleTransactionChangeEvent(conn *websocket.Conn, sessionID string, msg *redis.Message) error {
184194
ctx := context.Background()

0 commit comments

Comments
 (0)