Skip to content

Commit 4143d0c

Browse files
Updated interaction with transactions for expiry and more!
1 parent 5acd9ce commit 4143d0c

File tree

1 file changed

+40
-8
lines changed

1 file changed

+40
-8
lines changed

internal/session/interact.go

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,19 +95,21 @@ func (m *Manager) ExpireSession(username string) error {
9595
if !ok {
9696
continue
9797
}
98+
99+
/* make sure to set status to pending (shouldn't it be already set?) */
98100
txResult.Status = types.StatusPending
99101

100102
/* convert transactions into PostgreSQL compatible parameters */
101-
txnPQ, err := ConvertTransactiontoStoreParams(*txResult)
103+
txnPQ, err := ConvertTransactionPendingtoStoreParams(*txResult)
102104
if err != nil {
103-
m.errCh<-fmt.Errorf("failed to convert transaction to archive format: %w", err)
105+
m.errCh <- fmt.Errorf("failed to convert pending transaction to pending archive format: %w", err)
104106
continue
105107
}
106108

107109
/* store transaction in PostgreSQL with retries */
108110
var storeErr error
109111
for retries := 0; retries < 3; retries++ {
110-
if _, err := m.archivalPQ.CreateTransactionPQ(context.Background(), txnPQ); err != nil {
112+
if _, err := m.archivalPQ.CreatePendingTransactionPQ(context.Background(), txnPQ); err != nil {
111113
storeErr = err
112114
time.Sleep(time.Second * time.Duration(retries+1))
113115
continue
@@ -116,7 +118,7 @@ func (m *Manager) ExpireSession(username string) error {
116118
break
117119
}
118120
if storeErr != nil {
119-
m.errCh<-fmt.Errorf("failed to archive transaction %s after retries: %w", txResult.ID, storeErr)
121+
m.errCh <- fmt.Errorf("failed to archive transaction %s after retries: %w", txResult.ID, storeErr)
120122
continue
121123
}
122124
}
@@ -128,6 +130,36 @@ func (m *Manager) ExpireSession(username string) error {
128130
session.Status = StatusExpired
129131
}
130132

133+
/* get transaction results from Redis */
134+
results, err := m.getTransactionResultsRedis(session, 10000)
135+
if err != nil {
136+
m.errCh <- fmt.Errorf("failed to get transaction results from Redis: %w", err)
137+
} else {
138+
for _, txResult := range results {
139+
if txResult.Status == types.StatusSuccess || txResult.Status == types.StatusFailed {
140+
pqParams, err := ConvertTransactionResulttoStoreParams(txResult)
141+
if err != nil {
142+
m.errCh <- fmt.Errorf("failed to convert transaction result to archive format: %w", err)
143+
continue
144+
}
145+
var storeErr error
146+
for retries := 0; retries < 3; retries++ {
147+
if _, err := m.archivalPQ.CreateResultsTransactionPQ(context.Background(), pqParams); err != nil {
148+
storeErr = err
149+
time.Sleep(time.Second * time.Duration(retries+1))
150+
continue
151+
}
152+
storeErr = nil
153+
break
154+
}
155+
if storeErr != nil {
156+
m.errCh <- fmt.Errorf("failed to archive transaction result %s after retries: %w", txResult.ID, storeErr)
157+
continue
158+
}
159+
}
160+
}
161+
}
162+
131163
/* remove session from sessionOrder Linked List */
132164
if session.listElem != nil {
133165
m.sessionOrder.Remove(session.listElem)
@@ -148,19 +180,19 @@ func (m *Manager) ExpireSession(username string) error {
148180
break
149181
}
150182
if storeErr != nil {
151-
m.errCh<-fmt.Errorf("failed to archive session after retries: %w", storeErr)
183+
m.errCh <- fmt.Errorf("failed to archive session after retries: %w", storeErr)
152184
}
153185
} else {
154186
/* handle err */
155-
m.errCh<-fmt.Errorf("failed to convert session to archive format: %w", err)
187+
m.errCh <- fmt.Errorf("failed to convert session to archive format: %w", err)
156188
}
157189

158190
/* delete both session and transaction results from Redis */
159191
sessionKey := fmt.Sprintf("session:%s", session.ID)
160192
txResultsKey := fmt.Sprintf("session:%s:txresults", session.ID)
161193
result := m.redis.Del(context.Background(), sessionKey, txResultsKey)
162194
if result.Err() != nil {
163-
m.errCh<-fmt.Errorf("Failed to delete session from Redis: %w", result.Err())
195+
m.errCh <- fmt.Errorf("failed to delete session from Redis: %w", result.Err())
164196
}
165197

166198
/* remove session from sessionsMap */
@@ -175,7 +207,7 @@ func (m *Manager) AddTransaction(session *Session, txn *types.Transaction) error
175207
session.TransactionQueue.PushBack(txn)
176208

177209
/* store transaction to Redis as a pending transaction */
178-
if err := m.SaveTransactionResultsRedis(session, txn, "txpending"); err != nil {
210+
if err := m.SavePendingTransaction(session, txn); err != nil {
179211
return fmt.Errorf("failed to save transaction to Redis: %w", err)
180212
}
181213

0 commit comments

Comments
 (0)