Skip to content

Commit 62798f9

Browse files
Completed Transactions streaming
1 parent b9ea55b commit 62798f9

File tree

1 file changed

+100
-51
lines changed

1 file changed

+100
-51
lines changed

internal/session/handler.go

Lines changed: 100 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@ import (
2323
/* frontend safe handler for issuing transaction */
2424
func (m *Manager) IssueTransaction(w http.ResponseWriter, r *http.Request) {
2525
/* extract username from JWT Token */
26-
username := r.Context().Value(middleware.ContextKeyUsername)
26+
username, ok := r.Context().Value(middleware.ContextKeyUsername).(string)
27+
if !ok {
28+
http.Error(w, "Invalid user context", http.StatusInternalServerError)
29+
return
30+
}
2731

2832
/* acquire manager lock to access sessions map */
29-
m.mutex.Lock()
30-
session := m.sessionsMap[username.(string)]
31-
m.mutex.Unlock()
33+
m.mutex.RLock()
34+
session := m.sessionsMap[username]
35+
m.mutex.RUnlock()
3236

3337
if session == nil {
3438
http.Error(w, "Session not found", http.StatusNotFound)
@@ -53,7 +57,7 @@ func (m *Manager) IssueTransaction(w http.ResponseWriter, r *http.Request) {
5357
TargetPath: req.TargetPath,
5458
Entries: req.Entries,
5559
Status: types.StatusPending,
56-
ExecutedBy: username.(string),
60+
ExecutedBy: username,
5761
}
5862

5963
/* add transaction to session - session lock is already held */
@@ -63,19 +67,22 @@ func (m *Manager) IssueTransaction(w http.ResponseWriter, r *http.Request) {
6367
}
6468

6569
w.WriteHeader(http.StatusCreated)
66-
json.NewEncoder(w).Encode(map[string]string{
70+
if err := json.NewEncoder(w).Encode(map[string]string{
6771
"message": "Transaction scheduled",
6872
"txn_id": tx.ID.String(),
69-
})
73+
}); err != nil {
74+
http.Error(w, "Failed to encode response", http.StatusInternalServerError)
75+
return
76+
}
7077
}
7178

7279
type handlerCtxKey string
7380

7481
const (
75-
StreamUserSession handlerCtxKey = "stream_user_session"
76-
StreamUserTransactions handlerCtxKey = "stream_user_transactions"
77-
StreamAllSessions handlerCtxKey = "stream_all_sessions"
78-
StreamAllTransactions handlerCtxKey = "stream_all_transactions"
82+
CtxStreamUserSession handlerCtxKey = "stream_user_session"
83+
CtxStreamUserTransactions handlerCtxKey = "stream_user_transactions"
84+
CtxStreamAllSessions handlerCtxKey = "stream_all_sessions"
85+
CtxStreamAllTransactions handlerCtxKey = "stream_all_transactions"
7986
)
8087

8188
/*
@@ -85,8 +92,26 @@ user/
8592
*/
8693
func (m *Manager) StreamUserSession(w http.ResponseWriter, r *http.Request) {
8794

88-
/* username := r.Context().Value(middleware.ContextKeyUsername) */
89-
sessionID := r.Context().Value(middleware.ContextKeySessionID).(string)
95+
username, ok := r.Context().Value(middleware.ContextKeyUsername).(string)
96+
if !ok {
97+
http.Error(w, "Invalid user context", http.StatusInternalServerError)
98+
return
99+
}
100+
101+
sessionID, ok := r.Context().Value(middleware.ContextKeySessionID).(string)
102+
if !ok {
103+
http.Error(w, "Invalid session context", http.StatusInternalServerError)
104+
return
105+
}
106+
107+
m.mutex.RLock()
108+
session, exists := m.sessionsMap[username]
109+
m.mutex.RUnlock()
110+
111+
if !exists || session.ID.String() != sessionID {
112+
http.Error(w, "unauthorized", http.StatusUnauthorized)
113+
return
114+
}
90115

91116
/* add a check for sessionID belongs to user */
92117
conn, err := m.upgrader.Upgrade(w, r, nil)
@@ -96,8 +121,8 @@ func (m *Manager) StreamUserSession(w http.ResponseWriter, r *http.Request) {
96121
}
97122
defer conn.Close()
98123

99-
/*
100-
context with cancel for web socket handlers
124+
/*
125+
context with cancel for web socket handlers
101126
this is the official context for a websocket connection
102127
cancelling this means closing components of the websocket handler
103128
*/
@@ -114,47 +139,71 @@ func (m *Manager) StreamUserSession(w http.ResponseWriter, r *http.Request) {
114139
go m.listenForSessionChanges(ctx, conn, sessionID)
115140

116141
/* specify the handler context */
117-
ctxVal := context.WithValue(ctx, "type", StreamUserSession)
142+
ctxVal := context.WithValue(ctx, "type", CtxStreamUserSession)
118143

119144
/* handle web socket instructions from client */
120-
m.handleWebSocketCommands(conn, sessionID, ctxVal, cancel)
145+
m.handleWebSocketCommands(conn, username, sessionID, ctxVal, cancel)
121146
}
122147

123148
/*
124-
// get user transactions information
125-
// requires user authentication from middleware
126-
// user/
127-
// */
128-
// func (m *manager) streamusertransactions(w http.responsewriter, r *http.request) {
129-
// /* username := r.context().value(middleware.contextkeyusername) */
130-
// sessionid := r.context().value(middleware.contextkeysessionid)
131-
//
132-
// /* add a check for sessionid belongs to user */
133-
// conn, err := m.upgrader.upgrade(w, r, nil)
134-
// if err != nil {
135-
// m.errch <- fmt.errorf("websocket upgrade error: %w", err)
136-
// return
137-
// }
138-
// defer conn.close()
139-
//
140-
// /* context with cancel for web socket handlers */
141-
// ctx, cancel := context.withcancel(context.background())
142-
// defer cancel()
143-
//
144-
// /* sending initial list of transactions data */
145-
// if err := m.sendcurrenttransactions(conn, sessionid); err != nil {
146-
// // log.printf("error sending initial session: %v", err)
147-
// m.errch <- fmt.errorf("error sending initial session: %w", err)
148-
// return
149-
// }
150-
//
151-
// /* stream changes in transactions made in redis */
152-
// go m.listenfortransactionschanges(ctx, conn, sessionid)
153-
//
154-
// /* handle web socket instructions from client */
155-
// m.handlewebsocketcommands(conn, cancel)
156-
// }
157-
//
149+
get user transactions information
150+
requires user authentication from middleware
151+
user/
152+
*/
153+
func (m *Manager) StreamUserTransactions(w http.ResponseWriter, r *http.Request) {
154+
username, ok := r.Context().Value(middleware.ContextKeyUsername).(string)
155+
if !ok {
156+
http.Error(w, "Invalid user context", http.StatusInternalServerError)
157+
return
158+
}
159+
160+
sessionID, ok := r.Context().Value(middleware.ContextKeySessionID).(string)
161+
if !ok {
162+
http.Error(w, "Invalid session ID context", http.StatusInternalServerError)
163+
return
164+
}
165+
166+
m.mutex.RLock()
167+
session, exists := m.sessionsMap[username]
168+
m.mutex.RUnlock()
169+
170+
if !exists || session.ID.String() != sessionID {
171+
http.Error(w, "unauthorized", http.StatusUnauthorized)
172+
return
173+
}
174+
175+
/* add a check for sessionid belongs to user */
176+
conn, err := m.upgrader.Upgrade(w, r, nil)
177+
if err != nil {
178+
m.errCh <- fmt.Errorf("websocket upgrade error: %w", err)
179+
return
180+
}
181+
defer conn.Close()
182+
183+
/*
184+
context with cancel for web socket handlers
185+
this is the official context for a websocket connection
186+
cancelling this means closing components of the websocket handler
187+
*/
188+
ctx, cancel := context.WithCancel(context.Background())
189+
defer cancel()
190+
191+
/* sending initial list of transactions data */
192+
if err := m.sendCurrentUserTransactions(conn, username, sessionID, 100); err != nil {
193+
m.errCh <- fmt.Errorf("error sending initial transactions: %w", err)
194+
return
195+
}
196+
197+
/* stream changes in transactions made in redis */
198+
go m.listenForTransactionsChanges(ctx, conn, sessionID)
199+
200+
/* specify the handler context */
201+
ctxVal := context.WithValue(ctx, "type", CtxStreamUserTransactions)
202+
203+
/* handle web socket instructions from client */
204+
m.handleWebSocketCommands(conn, username, sessionID, ctxVal, cancel)
205+
}
206+
158207
// /*
159208
// get all sessions in the system
160209
// requires admin authentication from middleware

0 commit comments

Comments
 (0)