Skip to content

Commit dba6504

Browse files
Fixed interact for Redis and PQ functionality
1 parent db85759 commit dba6504

File tree

1 file changed

+72
-53
lines changed

1 file changed

+72
-53
lines changed

internal/session/interact.go

Lines changed: 72 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -25,29 +25,29 @@ func (m *Manager) CreateSession(username, ipAddress, userAgent string) error {
2525
}
2626

2727
/* Generate session metadata */
28-
sessionID := uuid.New().String()
28+
sessionID := uuid.New()
2929
now := time.Now()
3030

3131
/* create the session */
3232
session := &Session{
33-
ID: sessionID,
34-
Status: StatusActive,
35-
Username: username,
36-
IP: ipAddress,
37-
UserAgent: userAgent,
38-
Expiry: time.Now().Add(time.Duration(config.BackendConfig.AppInfo.SessionTimeout) * time.Hour),
39-
CreatedAt: now,
40-
LastActiveAt: now,
41-
Timer: time.AfterFunc(time.Duration(config.BackendConfig.AppInfo.SessionTimeout) * time.Hour,
33+
ID: sessionID,
34+
Status: StatusActive,
35+
Username: username,
36+
IP: ipAddress,
37+
UserAgent: userAgent,
38+
Expiry: time.Now().Add(time.Duration(config.BackendConfig.AppInfo.SessionTimeout) * time.Hour),
39+
CreatedAt: now,
40+
LastActiveAt: now,
41+
Timer: time.AfterFunc(time.Duration(config.BackendConfig.AppInfo.SessionTimeout)*time.Hour,
4242
func() { m.ExpireSession(username) },
4343
),
4444
CompletedCount: 0,
4545
FailedCount: 0,
46-
TransactionQueue: list.New(),
46+
TransactionQueue: list.New(),
4747
}
4848

4949
/* add session to active sessions map and list */
50-
element := m.sessionOrder.PushBack(session)
50+
element := m.sessionOrder.PushBack(session)
5151
session.listElem = element
5252

5353
/* store session into the manager */
@@ -71,81 +71,101 @@ func (m *Manager) ExpireSession(username string) {
7171
return
7272
}
7373

74+
session.Mutex.Lock()
75+
defer session.Mutex.Unlock()
76+
7477
/*
75-
delete the session from Redis
78+
delete the session from Redis
7679
check if any transactions are remaining in the queue
77-
if yes, label transactions and sessions pending
78-
if no, label session expired
80+
if yes, label transactions and sessions pending
81+
if no, label session expired
7982
push session and transactions to archive
8083
*/
8184

8285
/* check if transactions are remaining in the session queue */
8386
if session.TransactionQueue.Len() != 0 {
84-
/* transactions are pending, mark them pending */
87+
/* transactions are pending, mark them pending */
8588
for node := session.TransactionQueue.Front(); node != nil; node = node.Next() {
8689
/* work on transaction structure for *list.List() */
87-
txResult, ok := node.Value.(types.Transaction)
90+
txResult, ok := node.Value.(*types.Transaction)
8891
if !ok {
8992
continue
9093
}
91-
txResult.Status = types.StatusPending
92-
/* TODO: Push this all into PostgreSQL */
93-
txnPQ, err := ConvertTransactiontoStoreParams(txResult)
94-
if err != nil {
94+
txResult.Status = types.StatusPending
95+
96+
txnPQ, err := ConvertTransactiontoStoreParams(*txResult)
97+
if err != nil {
9598
/* error is conversion, continue the loop in good faith */
9699
/* need to handle these errors later */
100+
fmt.Printf("Failed to convert transaction to archive format: %v\n", err)
101+
continue
102+
}
103+
104+
/* store transaction in PostgreSQL */
105+
if _, err := m.archivalPQ.CreateTransactionPQ(context.Background(), txnPQ); err != nil {
106+
/* log error but continue processing other transactions */
107+
fmt.Printf("Failed to archive transaction %s: %v\n", txResult.ID, err)
108+
continue
97109
}
98-
if err == nil { m.archivalPQ.CreateTransactionPQ(context.Background(), txnPQ) }
99110
}
111+
112+
/* mark session as pending */
113+
session.Status = StatusPending
100114
} else {
101-
/* no empty transactions; mark the session as expired */
115+
/* empty transactions queue; mark the session as expired */
102116
session.Status = StatusExpired
103117
}
104118

105119
/* remove session from sessionOrder Linked List */
106120
if session.listElem != nil {
107121
m.sessionOrder.Remove(session.listElem)
108122
}
109-
123+
110124
/* convert all session parameters to PostgreSQL compatible parameters */
111125
archive, err := ConvertSessionToStoreParams(session)
112126
if err != nil {
113127
/* session conversion failed, leave it in good faith */
114128
/* handle err later */
129+
fmt.Printf("Failed to convert session to archive format: %v\n", err)
130+
return
115131
}
116132

117133
/* store session to the archive */
118-
if err == nil { m.archivalPQ.StoreSessionPQ(context.Background(), *archive) }
134+
if _, err := m.archivalPQ.StoreSessionPQ(context.Background(), *archive); err != nil {
135+
fmt.Printf("Failed to archive session: %v\n", err)
136+
return
137+
}
119138

120-
key := fmt.Sprintf("session:%s", session.ID)
121-
if err := m.redis.Del(context.Background(), key).Err(); err != nil {
122-
/* session deletion from redis failed, leave it in good faith */
123-
/* handle err later */
139+
/* delete both session and transaction results from Redis */
140+
sessionKey := fmt.Sprintf("session:%s", session.ID)
141+
txResultsKey := fmt.Sprintf("session:%s:txresults", session.ID)
142+
result := m.redis.Del(context.Background(), sessionKey, txResultsKey)
143+
if result.Err() != nil {
144+
fmt.Printf("Failed to delete session from Redis: %v\n", result.Err())
145+
} else {
146+
// Log the number of keys deleted
147+
deleted, _ := result.Result()
148+
fmt.Printf("Successfully deleted %d keys from Redis for session %s\n", deleted, session.ID)
124149
}
125150

126151
/* remove session from sessionsMap */
127152
delete(m.sessionsMap, username)
128153
}
129154

130-
/* add transaction to a session */
131-
func (m *Manager) AddTransaction(username string, txn interface{}) error {
132-
/* thread safety for the manager */
133-
m.mutex.Lock()
134-
defer m.mutex.Unlock()
135-
136-
/* get the session from sessions map with O(1) runtime */
137-
session, exists := m.sessionsMap[username]
138-
if !exists {
139-
return fmt.Errorf("session not found")
140-
}
141-
142-
/* thread safety for the session */
143-
session.Mutex.Lock()
144-
defer session.Mutex.Unlock()
145-
155+
/* add transaction to a session - assumes caller holds necessary locks */
156+
func (m *Manager) AddTransaction(session *Session, txn interface{}) error {
146157
/* push transaction into the queue from back */
147158
session.TransactionQueue.PushBack(txn)
148159

160+
/* convert transaction to correct type and save to Redis */
161+
if tx, ok := txn.(*types.Transaction); ok {
162+
if err := m.saveTransactionResultsRedis(session, *tx); err != nil {
163+
return fmt.Errorf("failed to save transaction to Redis: %w", err)
164+
}
165+
} else {
166+
return fmt.Errorf("invalid transaction type: expected *types.Transaction")
167+
}
168+
149169
return nil
150170
}
151171

@@ -173,11 +193,11 @@ func (m *Manager) refreshTimer(username string) error {
173193
if session.Timer != nil {
174194
session.Timer.Stop()
175195
}
176-
196+
177197
/* reset the session timer */
178-
session.Timer = time.AfterFunc(time.Duration(config.BackendConfig.AppInfo.SessionTimeout) * time.Hour,
179-
func() { m.ExpireSession(username) },
180-
)
198+
session.Timer = time.AfterFunc(time.Duration(config.BackendConfig.AppInfo.SessionTimeout)*time.Hour,
199+
func() { m.ExpireSession(username) },
200+
)
181201

182202
/* update Redis for session */
183203

@@ -201,10 +221,10 @@ func (m *Manager) toDashboardView(username string) (SessionView, error) {
201221
/* thread safety for the session */
202222
session.Mutex.Lock()
203223
defer session.Mutex.Unlock()
204-
224+
205225
/* can be directly served as JSON in handler */
206226
return SessionView{
207-
ID: session.ID,
227+
ID: session.ID.String(),
208228
Username: session.Username,
209229
IP: session.IP,
210230
UserAgent: session.UserAgent,
@@ -215,5 +235,4 @@ func (m *Manager) toDashboardView(username string) (SessionView, error) {
215235
FailedCount: session.FailedCount,
216236
PendingCount: session.TransactionQueue.Len(),
217237
}, nil
218-
}
219-
238+
}

0 commit comments

Comments
 (0)