Skip to content

Commit 1401370

Browse files
Added handlers for streaming to dashboards
1 parent 3159aee commit 1401370

File tree

1 file changed

+130
-0
lines changed

1 file changed

+130
-0
lines changed

internal/session/handler.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ const (
8383
CtxStreamUserTransactions handlerCtxKey = "stream_user_transactions"
8484
CtxStreamAllSessions handlerCtxKey = "stream_all_sessions"
8585
CtxStreamAllTransactions handlerCtxKey = "stream_all_transactions"
86+
CtxStreamUserArchiveSession handlerCtxKey = "stream_user_archive_sessions"
87+
CtxStreamUserArchivePendingTransactions handlerCtxKey = "stream_user_archive_pending_transactions"
8688
)
8789

8890
/*
@@ -209,6 +211,134 @@ func (m *Manager) StreamUserTransactions(w http.ResponseWriter, r *http.Request)
209211
m.handleWebSocketCommands(conn, username, sessionID, ctxVal, cancel)
210212
}
211213

214+
/*
215+
get user archived sessions information
216+
requires user authentication from middleware
217+
user/
218+
*/
219+
func (m *Manager) StreamUserArchiveSessions(w http.ResponseWriter, r *http.Request) {
220+
/* get the username */
221+
username, ok := r.Context().Value(middleware.ContextKeyUsername).(string)
222+
if !ok {
223+
http.Error(w, "Invalid user context", http.StatusInternalServerError)
224+
return
225+
}
226+
227+
/* get the session id */
228+
sessionID, ok := r.Context().Value(middleware.ContextKeySessionID).(string)
229+
if !ok {
230+
http.Error(w, "Invalid session ID context", http.StatusInternalServerError)
231+
return
232+
}
233+
234+
m.mutex.RLock()
235+
session, exists := m.sessionsMap[username]
236+
m.mutex.RUnlock()
237+
238+
if !exists || session.ID.String() != sessionID {
239+
http.Error(w, "unauthorized", http.StatusUnauthorized)
240+
return
241+
}
242+
243+
/* user exists and verified, upgrade the websocket connection */
244+
conn, err := m.upgrader.Upgrade(w, r, nil)
245+
if err != nil {
246+
m.errCh <- fmt.Errorf("websocket upgrade error: %w", err)
247+
return
248+
}
249+
defer conn.Close()
250+
251+
/*
252+
context with cancel for web socket handlers
253+
this is the official context for a websocket connection
254+
cancelling this means closing components of the websocket handler
255+
*/
256+
ctx, cancel := context.WithCancel(context.Background())
257+
defer cancel()
258+
259+
/* sending initial session data */
260+
if err := m.sendCurrentArchivedSessions(conn, username, 1, 1); err != nil {
261+
m.errCh <- fmt.Errorf("error sending initial session: %w", err)
262+
return
263+
}
264+
265+
/*
266+
don't send updated stream of data
267+
archive is updated when session expires
268+
so user will have no access to the dashboard and can't see new
269+
*/
270+
271+
/* specify the handler context */
272+
ctxVal := context.WithValue(ctx, "type", CtxStreamUserArchiveSession)
273+
274+
/* handle web socket instructions from client */
275+
m.handleWebSocketCommands(conn, username, sessionID, ctxVal, cancel)
276+
}
277+
278+
/*
279+
get user archived pending transactions information
280+
requires user authentication from middleware
281+
user/
282+
*/
283+
func (m *Manager) StreamUserArchivePendingTransactions(w http.ResponseWriter, r *http.Request) {
284+
/* get the username */
285+
username, ok := r.Context().Value(middleware.ContextKeyUsername).(string)
286+
if !ok {
287+
http.Error(w, "Invalid user context", http.StatusInternalServerError)
288+
return
289+
}
290+
291+
/* get the session id */
292+
sessionID, ok := r.Context().Value(middleware.ContextKeySessionID).(string)
293+
if !ok {
294+
http.Error(w, "Invalid session ID context", http.StatusInternalServerError)
295+
return
296+
}
297+
298+
m.mutex.RLock()
299+
session, exists := m.sessionsMap[username]
300+
m.mutex.RUnlock()
301+
302+
if !exists || session.ID.String() != sessionID {
303+
http.Error(w, "unauthorized", http.StatusUnauthorized)
304+
return
305+
}
306+
307+
/* user exists and verified, upgrade the websocket connection */
308+
conn, err := m.upgrader.Upgrade(w, r, nil)
309+
if err != nil {
310+
m.errCh <- fmt.Errorf("websocket upgrade error: %w", err)
311+
return
312+
}
313+
defer conn.Close()
314+
315+
/*
316+
context with cancel for web socket handlers
317+
this is the official context for a websocket connection
318+
cancelling this means closing components of the websocket handler
319+
*/
320+
ctx, cancel := context.WithCancel(context.Background())
321+
defer cancel()
322+
323+
/* sending initial session data */
324+
if err := m.sendCurrentArchivedPendingTransactions(conn, username, 1, 10); err != nil {
325+
m.errCh <- fmt.Errorf("error sending initial session: %w", err)
326+
return
327+
}
328+
329+
/*
330+
don't send updated stream of data
331+
archive is updated when session expires
332+
so user will have no access to the dashboard and can't see new
333+
*/
334+
335+
/* specify the handler context */
336+
ctxVal := context.WithValue(ctx, "type", CtxStreamUserArchivePendingTransactions)
337+
338+
/* handle web socket instructions from client */
339+
m.handleWebSocketCommands(conn, username, sessionID, ctxVal, cancel)
340+
}
341+
212342
// /*
213343
// get all sessions in the system
214344
// requires admin authentication from middleware

0 commit comments

Comments
 (0)