Skip to content

Commit 84ce9b4

Browse files
Added archived sessions streaming
1 parent 19b634d commit 84ce9b4

File tree

1 file changed

+115
-0
lines changed

1 file changed

+115
-0
lines changed

internal/session/stream_user.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"time"
88

9+
"github.com/PythonHacker24/linux-acl-management-backend/internal/postgresql"
910
"github.com/PythonHacker24/linux-acl-management-backend/internal/types"
1011
"github.com/gorilla/websocket"
1112
"github.com/redis/go-redis/v9"
@@ -226,3 +227,117 @@ func (m *Manager) handleTransactionChangeEvent(conn *websocket.Conn, sessionID s
226227
/* send the message to the client */
227228
return conn.WriteJSON(message)
228229
}
230+
231+
/* ==== User Archived Session ==== */
232+
233+
/* send list of archived session of a user */
234+
func (m *Manager) sendCurrentArchivedSessions(conn *websocket.Conn, username string, page int, pageSize int) error {
235+
ctx := context.Background()
236+
237+
/* calculate LIMIT and OFFSET */
238+
limit := int32(pageSize)
239+
offset := int32((page - 1) * pageSize)
240+
241+
pqParams := &postgresql.GetSessionByUsernamePaginatedPQParams{
242+
Username: username,
243+
Limit: limit,
244+
Offset: offset,
245+
}
246+
247+
/* query sessions */
248+
sessions, err := m.archivalPQ.GetSessionByUsernamePaginatedPQ(ctx, *pqParams)
249+
if err != nil {
250+
return fmt.Errorf("failed to get sessions for username %s: %w", username, err)
251+
}
252+
253+
exists := len(sessions) > 0
254+
255+
/* convert to plain JSON-compatible slices */
256+
var outgoing []map[string]interface{}
257+
for _, session := range sessions {
258+
outgoing = append(outgoing, map[string]interface{}{
259+
"id": session.ID,
260+
"username": session.Username,
261+
"ip": session.Ip.String,
262+
"user_agent": session.UserAgent.String,
263+
"status": session.Status,
264+
"created_at": session.CreatedAt,
265+
"last_active_at": session.LastActiveAt,
266+
"expiry": session.Expiry,
267+
"completed_count": session.CompletedCount,
268+
"failed_count": session.FailedCount,
269+
"archived_at": session.ArchivedAt,
270+
})
271+
}
272+
273+
message := StreamMessage{
274+
Type: "session_state",
275+
Data: map[string]interface{}{
276+
"username": username,
277+
"exists": exists,
278+
"sessions": outgoing,
279+
"page": page,
280+
"pageSize": pageSize,
281+
},
282+
Timestamp: time.Now(),
283+
}
284+
285+
return conn.WriteJSON(message)
286+
}
287+
288+
/* ==== User Archived Transactions ==== */
289+
290+
/* send list of archived pending transactions */
291+
func (m *Manager) sendCurrentArchivedPendingTransactions(conn *websocket.Conn, username string, page int, pageSize int) error {
292+
ctx := context.Background()
293+
294+
limit := int32(pageSize)
295+
offset := int32((page - 1) * pageSize)
296+
297+
pqParams := &postgresql.GetPendingTransactionsByUserPaginatedPQParams{
298+
ExecutedBy: username,
299+
Limit: limit,
300+
Offset: offset,
301+
}
302+
303+
/* query transactions by executed_by */
304+
transactions, err := m.archivalPQ.GetPendingTransactionsByUserPaginatedPQ(ctx, *pqParams)
305+
if err != nil {
306+
return fmt.Errorf("failed to get transactions for user %s: %w", username, err)
307+
}
308+
309+
exists := len(transactions) > 0
310+
311+
var outgoing []map[string]interface{}
312+
for _, tx := range transactions {
313+
outgoing = append(outgoing, map[string]interface{}{
314+
"id": tx.ID,
315+
"session_id": tx.SessionID,
316+
"timestamp": tx.Timestamp,
317+
"operation": tx.Operation,
318+
"target_path": tx.TargetPath,
319+
"entries": tx.Entries, // JSONB will decode as []byte, so decode if needed
320+
"status": tx.Status,
321+
"error_msg": tx.ErrorMsg,
322+
"output": tx.Output,
323+
"executed_by": tx.ExecutedBy,
324+
"duration_ms": tx.DurationMs,
325+
"ExecStatus": tx.Execstatus,
326+
"created_at": tx.CreatedAt,
327+
})
328+
}
329+
330+
message := StreamMessage{
331+
Type: "transactions_state",
332+
Data: map[string]interface{}{
333+
"username": username,
334+
"exists": exists,
335+
"transactions": outgoing,
336+
"page": page,
337+
"pageSize": pageSize,
338+
},
339+
Timestamp: time.Now(),
340+
}
341+
342+
return conn.WriteJSON(message)
343+
}

0 commit comments

Comments
 (0)