Skip to content

Commit 28c9d7b

Browse files
Completed stream_user.go for basic websocket streaming
1 parent 40972ec commit 28c9d7b

File tree

1 file changed

+109
-0
lines changed

1 file changed

+109
-0
lines changed

internal/session/stream_user.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package session
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/gorilla/websocket"
9+
"github.com/redis/go-redis/v9"
10+
)
11+
12+
/* send current session of a user */
13+
func (m *Manager) sendCurrentSession(conn *websocket.Conn, sessionID string) error {
14+
ctx := context.Background()
15+
16+
/* get data for current session from Redis */
17+
key := fmt.Sprintf("session:%s", sessionID)
18+
sessionData, err := m.redis.HGetAll(ctx, key).Result()
19+
if err != nil {
20+
/* user session doen't exists */
21+
if err == redis.Nil {
22+
message := StreamMessage{
23+
Type: "session_state",
24+
Data: map[string]interface{}{
25+
"session_id": sessionID,
26+
"exists": false,
27+
},
28+
Timestamp: time.Now(),
29+
}
30+
return conn.WriteJSON(message)
31+
}
32+
/* error cannot be handed, return a error */
33+
return fmt.Errorf("failed to get session data: %w", err)
34+
}
35+
36+
/* session exists; convert Redis hash into session data */
37+
session := convertRedisHashToSession(sessionData)
38+
message := StreamMessage{
39+
Type: "session_state",
40+
Data: map[string]interface{}{
41+
"session_id": sessionID,
42+
"exists": true,
43+
"session": session,
44+
},
45+
Timestamp: time.Now(),
46+
}
47+
48+
return conn.WriteJSON(message)
49+
}
50+
51+
/* send data regarding current session */
52+
func (m *Manager) listenForSessionChanges(ctx context.Context, conn *websocket.Conn, sessionID string) {
53+
keyspacePattern := fmt.Sprintf("__keyspace@0__:session:%s", sessionID)
54+
55+
/* subscribe to Redis keyspace */
56+
pubsub, err := m.redis.PSubscribe(ctx, keyspacePattern)
57+
if err != nil {
58+
m.errCh <- fmt.Errorf("failed to subscribe to redis events: %w", err)
59+
return
60+
}
61+
62+
defer pubsub.Close()
63+
64+
/* Redis update channel */
65+
ch := pubsub.Channel()
66+
67+
/* handling session changes */
68+
for {
69+
select {
70+
case <-ctx.Done():
71+
return
72+
case msg := <-ch:
73+
/* changes in session stored in Redis detected; handle the event */
74+
if err := m.handleSessionChangeEvent(conn, sessionID, msg); err != nil {
75+
m.errCh <- fmt.Errorf("error handling session change: %w", err)
76+
}
77+
}
78+
}
79+
}
80+
81+
/* handle session change event */
82+
func (m *Manager) handleSessionChangeEvent(conn *websocket.Conn, sessionID string, msg *redis.Message) error {
83+
ctx := context.Background()
84+
85+
/* get session data from Redis */
86+
key := fmt.Sprintf("session:%s", sessionID)
87+
sessionData, err := m.redis.HGetAll(ctx, key).Result()
88+
if err != nil {
89+
return fmt.Errorf("failed to get updated session data: %w", err)
90+
}
91+
92+
/* convert session data from Redis hash to session */
93+
session := convertRedisHashToSession(sessionData)
94+
95+
/* prepare the message payload */
96+
message := StreamMessage{
97+
Type: "session_update",
98+
Data: map[string]interface{}{
99+
"session_id": sessionID,
100+
"session": session,
101+
"event_type": msg.Payload,
102+
"event_source": "redis_keyspace",
103+
},
104+
Timestamp: time.Now(),
105+
}
106+
107+
/* send the message payload via websocket */
108+
return conn.WriteJSON(message)
109+
}

0 commit comments

Comments
 (0)