Skip to content

Commit fe09230

Browse files
authored
Add reaper routine to cleanup old transactions (#30)
* Add reaper routine to cleanup old transactions * Updates txm documentation to include reaper routine
1 parent f0f59c6 commit fe09230

File tree

6 files changed

+813
-78
lines changed

6 files changed

+813
-78
lines changed

documentation/relayer/transaction-manager.md

Lines changed: 103 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,23 @@ flowchart TD
2525
TXM[SuiTxm]
2626
Broadcaster[Broadcaster]
2727
Confirmer[Confirmer]
28+
Reaper[Reaper]
2829
StateStore[StateStore]
2930
RetryManager[RetryManager]
3031
GasManager[GasManager]
3132
PTBClient[PTBClient]
3233
3334
TXM --> Broadcaster
3435
TXM --> Confirmer
36+
TXM --> Reaper
3537
TXM --> RetryManager
3638
TXM --> GasManager
3739
3840
Broadcaster --> StateStore
3941
Broadcaster --> PTBClient
4042
Confirmer --> StateStore
4143
Confirmer --> PTBClient
44+
Reaper --> StateStore
4245
GasManager --> PTBClient
4346
4447
RetryManager --> Broadcaster
@@ -77,9 +80,10 @@ sequenceDiagram
7780
participant T as TXM
7881
participant B as Broadcaster
7982
participant F as Confirmer
83+
participant RP as Reaper
8084
participant S as StateStore
8185
participant N as SuiNode
82-
participant R as RetryManager
86+
participant RM as RetryManager
8387
8488
C->>T: Enqueue Transaction
8589
T->>N: Validate Gas
@@ -98,13 +102,18 @@ sequenceDiagram
98102
alt Success Path
99103
F->>S: Update to Finalized
100104
else Retry Path
101-
F->>R: Check Retry
102-
R-->>F: Retry Decision
105+
F->>RM: Check Retry
106+
RM-->>F: Retry Decision
103107
F->>S: Update to Retriable
104108
S->>B: Re-queue Transaction
105109
else Failure Path
106110
F->>S: Update to Failed
107111
end
112+
113+
Note over RP,S: Periodic Cleanup
114+
RP->>S: Get Old Finalized/Failed Transactions
115+
S-->>RP: Old Transactions List
116+
RP->>S: Delete Old Transactions
108117
```
109118

110119
## Core Components
@@ -178,9 +187,10 @@ The TXM implements proper service lifecycle management:
178187
// Start the service and launch background goroutines
179188
func (txm *SuiTxm) Start(ctx context.Context) error {
180189
return txm.Starter.StartOnce("SuiTxm", func() error {
181-
txm.done.Add(2) // broadcaster and confirmer goroutines
190+
txm.done.Add(3) // broadcaster, confirmer, and reaper goroutines
182191
go txm.broadcastLoop()
183192
go txm.confirmerLoop()
193+
go txm.reaperLoop()
184194
return nil
185195
})
186196
}
@@ -395,7 +405,58 @@ func handleTransactionError(ctx context.Context, txm *SuiTxm, tx SuiTx, result *
395405
}
396406
```
397407

398-
### 4. Retry Manager
408+
### 4. Reaper Routine
409+
410+
The reaper routine performs periodic cleanup of old transactions to prevent memory/storage bloat and maintain optimal performance. It runs as a background goroutine alongside the broadcaster and confirmer.
411+
412+
#### Responsibilities
413+
414+
- **Periodic Cleanup**: Remove old finalized and failed transactions based on configured retention periods
415+
- **Storage Optimization**: Prevent unbounded growth of transaction storage
416+
- **Performance Maintenance**: Keep transaction queries fast by limiting dataset size
417+
- **Resource Management**: Free memory and storage resources from completed transactions
418+
419+
#### Implementation Details
420+
421+
The reaper runs on a jittered ticker similar to the confirmer to avoid thundering herd problems.
422+
423+
#### Cleanup Logic
424+
425+
The cleanup function implements the core reaper logic by querying for finalized and failed transactions, calculating their age based on the `LastUpdatedAt` timestamp, and removing those that exceed the configured retention period.
426+
427+
#### Cleanup Criteria
428+
429+
The reaper applies specific criteria for transaction cleanup:
430+
431+
- **Finalized Transactions**: Removed after `TransactionRetentionSecs` seconds from last update
432+
- **Failed Transactions**: Removed after `TransactionRetentionSecs` seconds from last update
433+
- **Active Transactions**: Never cleaned up (Pending, Submitted, Retriable states)
434+
- **Time Calculation**: Uses `LastUpdatedAt` timestamp to determine age
435+
- **Boundary Logic**: Only transactions where `timeDiff > TransactionRetentionSecs` are cleaned up
436+
437+
#### Safety Features
438+
439+
The reaper includes several safety mechanisms:
440+
441+
- **State Filtering**: Only targets terminal states (Finalized, Failed)
442+
- **Conservative Timing**: Uses strict greater-than comparison for retention period
443+
- **Error Isolation**: Individual transaction cleanup failures don't stop the entire process
444+
- **Graceful Shutdown**: Responds to stop signals and context cancellation
445+
- **Logging**: Comprehensive logging for monitoring and debugging
446+
447+
#### Configuration Integration
448+
449+
The reaper uses the existing TXM configuration structure with `ReaperPollSecs` controlling how often the reaper runs and `TransactionRetentionSecs` determining how long to keep finalized/failed transactions. Default values are 10 seconds for both polling interval and retention period.
450+
451+
#### Performance Considerations
452+
453+
- **Jittered Timing**: Prevents multiple instances from running cleanup simultaneously
454+
- **Batch Processing**: Processes all eligible transactions in a single pass
455+
- **Efficient Queries**: Uses state-based filtering to minimize database load
456+
- **Non-blocking**: Runs independently without affecting transaction processing
457+
- **Resource Bounded**: Only processes existing transactions, no unbounded operations
458+
459+
### 5. Retry Manager
399460

400461
Implements sophisticated retry logic using pluggable strategy functions for different error types.
401462

@@ -474,7 +535,7 @@ customStrategy := func(tx *SuiTx, errorMsg string, maxRetries int) (bool, RetryS
474535
retryManager.RegisterStrategyFunc(customStrategy)
475536
```
476537

477-
### 5. State Store
538+
### 6. State Store
478539

479540
Manages transaction state persistence with thread-safe operations and comprehensive transaction lifecycle support.
480541

@@ -559,7 +620,7 @@ type InMemoryStore struct {
559620

560621
<!-- tabs:end -->
561622

562-
### 6. Gas Manager
623+
### 7. Gas Manager
563624

564625
Handles gas estimation and gas bumping for Sui transactions with a focus on cost optimization and retry success.
565626

@@ -631,33 +692,6 @@ const (
631692
gasManager := NewSuiGasManager(logger, ptbClient, maxGasBudget, percentualIncrease)
632693
```
633694

634-
### 7. Reaper Routine
635-
636-
> **Note**: The reaper routine is not yet implemented in the current codebase. This section describes the planned implementation.
637-
638-
The reaper routine will perform periodic cleanup of old transactions to prevent memory/storage bloat and maintain optimal performance.
639-
640-
#### Planned Cleanup Criteria
641-
642-
- **Finalized Transactions**: Remove after configured retention period
643-
- **Failed Transactions**: Remove after error analysis period
644-
- **Stale Transactions**: Remove transactions stuck in intermediate states beyond timeout
645-
646-
#### Future Configuration
647-
648-
```go
649-
type ReaperConfig struct {
650-
CleanupInterval time.Duration // How often to run cleanup
651-
FinalizedRetention time.Duration // How long to keep finalized transactions
652-
FailedRetention time.Duration // How long to keep failed transactions
653-
MaxBatchSize int // Maximum transactions to process per cleanup
654-
}
655-
```
656-
657-
#### Implementation Status
658-
659-
The reaper routine is planned for future implementation and will be added as a third goroutine in the TXM service lifecycle.
660-
661695
## Configuration
662696

663697
### Configuration Structure
@@ -666,27 +700,31 @@ The TXM uses a comprehensive configuration structure:
666700

667701
```go
668702
type Config struct {
669-
BroadcastChanSize uint // Size of the broadcast channel buffer
670-
RequestType string // Default request type for transactions
671-
ConfirmPollSecs uint // Polling interval for confirmer in seconds
672-
DefaultMaxGasAmount uint64 // Default maximum gas amount
673-
MaxTxRetryAttempts uint64 // Maximum retry attempts per transaction
674-
TransactionTimeout string // Transaction timeout duration
675-
MaxConcurrentRequests uint64 // Maximum concurrent requests
703+
BroadcastChanSize uint // Size of the broadcast channel buffer
704+
RequestType string // Default request type for transactions
705+
ConfirmPollSecs uint // Polling interval for confirmer in seconds
706+
ReaperPollSecs uint64 // Polling interval for reaper in seconds
707+
TransactionRetentionSecs uint64 // How long to keep finalized/failed transactions
708+
DefaultMaxGasAmount uint64 // Default maximum gas amount
709+
MaxTxRetryAttempts uint64 // Maximum retry attempts per transaction
710+
TransactionTimeout string // Transaction timeout duration
711+
MaxConcurrentRequests uint64 // Maximum concurrent requests
676712
}
677713
```
678714

679715
### Default Configuration
680716

681717
```go
682718
var DefaultConfigSet = Config{
683-
BroadcastChanSize: 100, // Channel buffer size
684-
RequestType: "WaitForLocalExecution", // Wait for local execution
685-
ConfirmPollSecs: 2, // Poll every 2 seconds
686-
DefaultMaxGasAmount: 200000, // 200k gas units
687-
MaxTxRetryAttempts: 5, // Up to 5 retries
688-
TransactionTimeout: "10s", // 10 second timeout
689-
MaxConcurrentRequests: 5, // 5 concurrent requests
719+
BroadcastChanSize: 100, // Channel buffer size
720+
RequestType: "WaitForLocalExecution", // Wait for local execution
721+
ConfirmPollSecs: 2, // Poll every 2 seconds
722+
ReaperPollSecs: 10, // Reaper runs every 10 seconds
723+
TransactionRetentionSecs: 10, // Keep transactions for 10 seconds
724+
DefaultMaxGasAmount: 200000, // 200k gas units
725+
MaxTxRetryAttempts: 5, // Up to 5 retries
726+
TransactionTimeout: "10s", // 10 second timeout
727+
MaxConcurrentRequests: 5, // 5 concurrent requests
690728
}
691729
```
692730

@@ -713,16 +751,20 @@ Different environments may require different configurations:
713751
```go
714752
// High-throughput production configuration
715753
productionConfig := Config{
716-
BroadcastChanSize: 500, // Larger buffer for high volume
717-
ConfirmPollSecs: 1, // Faster polling
718-
MaxConcurrentRequests: 20, // More concurrent processing
754+
BroadcastChanSize: 500, // Larger buffer for high volume
755+
ConfirmPollSecs: 1, // Faster polling
756+
ReaperPollSecs: 300, // Less frequent cleanup (5 minutes)
757+
TransactionRetentionSecs: 3600, // Keep transactions for 1 hour
758+
MaxConcurrentRequests: 20, // More concurrent processing
719759
}
720760

721761
// Development configuration
722762
devConfig := Config{
723-
BroadcastChanSize: 50, // Smaller buffer
724-
ConfirmPollSecs: 5, // Slower polling to reduce load
725-
MaxConcurrentRequests: 2, // Limited concurrency
763+
BroadcastChanSize: 50, // Smaller buffer
764+
ConfirmPollSecs: 5, // Slower polling to reduce load
765+
ReaperPollSecs: 30, // More frequent cleanup for testing
766+
TransactionRetentionSecs: 60, // Keep transactions for 1 minute
767+
MaxConcurrentRequests: 2, // Limited concurrency
726768
}
727769
```
728770

@@ -841,6 +883,12 @@ type TxmMetrics struct {
841883
// Resource usage
842884
StoreSize prometheus.Gauge
843885
GoroutineCount prometheus.Gauge
886+
887+
// Reaper metrics
888+
CleanupRuns prometheus.Counter
889+
TransactionsDeleted prometheus.Counter
890+
CleanupErrors prometheus.Counter
891+
CleanupDuration prometheus.Histogram
844892
}
845893
```
846894

@@ -875,6 +923,7 @@ func (txm *SuiTxm) HealthCheck() error {
875923
**Transaction Enqueueing**: Support for both Move function calls and PTBs
876924
**Broadcaster Routine**: Intelligent batching and transaction submission
877925
**Confirmer Routine**: Status monitoring with jittered polling
926+
**Reaper Routine**: Periodic cleanup of old finalized and failed transactions
878927
**Retry Manager**: Pluggable retry strategies with error classification
879928
**Gas Manager**: Gas estimation and intelligent gas bumping
880929
**State Store Interface**: Comprehensive transaction state management
@@ -883,8 +932,6 @@ func (txm *SuiTxm) HealthCheck() error {
883932
### Future Enhancements
884933

885934
🔄 **Exponential Backoff**: Currently marked as TODO in retry strategies
886-
🔄 **Reaper Routine**: Planned for transaction cleanup and storage optimization
887-
888935
🔄 **Metrics and Monitoring**: Integration with prometheus metrics
889936

890937
## Related Documentation

relayer/txm/config.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,24 @@ const (
99
// DefaultRequestType is the default request type for transactions.
1010
DefaultRequestType = "WaitForLocalExecution"
1111

12-
DefaultMaxGasAmount = 200000
13-
DefaultMaxTxRetryAttempts = 5
14-
DefaultTransactionTimeout = "10s"
15-
DefaultMaxConcurrentRequests = 5
12+
DefaultMaxGasAmount = 200000
13+
DefaultMaxTxRetryAttempts = 5
14+
DefaultTransactionTimeout = "10s"
15+
DefaultMaxConcurrentRequests = 5
16+
DefaultReaperPollSecs = 10
17+
DefaultTransactionRetentionSecs = 10
1618
)
1719

1820
type Config struct {
19-
BroadcastChanSize uint
20-
RequestType string
21-
ConfirmPollSecs uint
22-
DefaultMaxGasAmount uint64
23-
MaxTxRetryAttempts uint64
24-
TransactionTimeout string
25-
MaxConcurrentRequests uint64
21+
BroadcastChanSize uint
22+
RequestType string
23+
ConfirmPollSecs uint
24+
DefaultMaxGasAmount uint64
25+
MaxTxRetryAttempts uint64
26+
TransactionTimeout string
27+
MaxConcurrentRequests uint64
28+
ReaperPollSecs uint64
29+
TransactionRetentionSecs uint64
2630
}
2731

2832
var DefaultConfigSet = Config{
@@ -35,4 +39,7 @@ var DefaultConfigSet = Config{
3539

3640
TransactionTimeout: DefaultTransactionTimeout,
3741
MaxConcurrentRequests: DefaultMaxConcurrentRequests,
42+
43+
ReaperPollSecs: DefaultReaperPollSecs,
44+
TransactionRetentionSecs: DefaultTransactionRetentionSecs,
3845
}

relayer/txm/reaper.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package txm
2+
3+
import (
4+
"github.com/smartcontractkit/chainlink-common/pkg/services"
5+
)
6+
7+
func (txm *SuiTxm) reaperLoop() {
8+
defer txm.done.Done()
9+
txm.lggr.Infow("Starting reaper loop")
10+
11+
loopCtx, cancel := services.StopRChan(txm.stopChannel).NewCtx()
12+
defer cancel()
13+
14+
basePeriod := txm.configuration.ReaperPollSecs
15+
ticker, jitteredDuration := GetTicker(uint(basePeriod))
16+
17+
txm.lggr.Infow("Created reaper ticker",
18+
"basePeriod", basePeriod,
19+
"jitteredDuration", jitteredDuration.String())
20+
21+
for {
22+
select {
23+
case <-txm.stopChannel:
24+
txm.lggr.Infow("Reaper loop stopped")
25+
return
26+
case <-loopCtx.Done():
27+
txm.lggr.Infow("Loop context cancelled. Reaper loop stopped")
28+
return
29+
case <-ticker.C:
30+
txm.lggr.Debugw("Ticker fired, cleaning up transactions")
31+
cleanupTransactions(txm)
32+
}
33+
}
34+
}
35+
36+
func cleanupTransactions(txm *SuiTxm) {
37+
txm.lggr.Debugw("Cleaning up transactions")
38+
39+
// Get all finalized and failed transactions, never in-flight transactions
40+
states := []TransactionState{StateFinalized, StateFailed}
41+
42+
finalizedTransactions, err := txm.transactionRepository.GetTransactionsByStates(states)
43+
if err != nil {
44+
txm.lggr.Errorw("Error getting finalized transactions", "error", err)
45+
return
46+
}
47+
48+
currentTimestamp := GetCurrentUnixTimestamp()
49+
50+
for _, tx := range finalizedTransactions {
51+
txm.lggr.Debugw("Cleaning up finalized transaction", "transactionID", tx.TransactionID)
52+
timeDiff := currentTimestamp - tx.LastUpdatedAt
53+
txm.lggr.Debugw("Time difference", "timeDiff", timeDiff)
54+
if timeDiff > txm.configuration.TransactionRetentionSecs {
55+
txm.lggr.Debugw("Cleaning up finalized transaction", "transactionID", tx.TransactionID)
56+
err := txm.transactionRepository.DeleteTransaction(tx.TransactionID)
57+
if err != nil {
58+
txm.lggr.Errorw("Error deleting transaction", "transactionID", tx.TransactionID, "error", err)
59+
}
60+
}
61+
}
62+
}

0 commit comments

Comments
 (0)