Commit c43b507

Merge pull request #259 from kevinbarbour/configurable-file-cleanup
feat: Add configurable file cleanup service to prevent storage exhaustion
2 parents 73a9720 + 1c327d1 commit c43b507

7 files changed: +840 -0 lines changed

docs/_data/docs-menu.yml

Lines changed: 2 additions & 0 deletions
@@ -46,6 +46,8 @@
       link: /ops/merging/
     - name: File Options
       link: /ops/file-options/
+    - name: File Cleanup
+      link: /ops/cleanup/

 - label: Production
   items:

docs/ops/cleanup.md

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
+---
+layout: page
+title: File Cleanup
+---
+
+# File Cleanup
+
+ACH Gateway includes an optional cleanup feature that automatically removes old processed files from the storage directory. This helps prevent file descriptor exhaustion and disk space issues in long-running deployments.
+
+## Overview
+
+When ACH files are processed at cutoff times, they are moved from the `mergable/<shard-name>/` directory to isolated directories with timestamps (e.g., `<shard-name>-20240115-143000/`). After successful upload to the ODFI and audit storage, these directories remain on disk indefinitely by default.
+
+The cleanup feature periodically scans for these isolated directories and removes those that:
+1. Are older than the configured retention duration
+2. Have been successfully processed (contain an `uploaded/` subdirectory)
+
+## Configuration
+
+The cleanup feature is configured per shard in the `Upload.merging.cleanup` section:
+
+```yaml
+Upload:
+  merging:
+    directory: "./storage/"
+    cleanup:
+      enabled: true # Enable/disable cleanup
+      retentionDuration: "24h" # How long to keep files after processing
+      checkInterval: "1h" # How often to run cleanup
+```
+
+### Configuration Options
+
+- **enabled**: Boolean flag to enable or disable the cleanup feature (default: `false`)
+- **retentionDuration**: Duration string specifying how long to retain processed files (e.g., "24h", or "168h" for 7 days)
+- **checkInterval**: Duration string specifying how often to run the cleanup process (e.g., "1h", "30m")
+
+### Example Configurations
+
+#### Basic cleanup - 24 hour retention
+```yaml
+Upload:
+  merging:
+    cleanup:
+      enabled: true
+      retentionDuration: "24h"
+      checkInterval: "1h"
+```
+
+#### Aggressive cleanup - 1 hour retention
+```yaml
+Upload:
+  merging:
+    cleanup:
+      enabled: true
+      retentionDuration: "1h"
+      checkInterval: "15m"
+```
+
+#### Weekly cleanup - 7 day retention
+```yaml
+Upload:
+  merging:
+    cleanup:
+      enabled: true
+      retentionDuration: "168h" # 7 days
+      checkInterval: "24h" # Daily check
+```
+
+## Safety Features
+
+The cleanup service includes several safety features to prevent accidental data loss:
+
+1. **Pattern Matching**: Only directories matching the pattern `<shard-name>-YYYYMMDD-HHMMSS` are considered for cleanup
+2. **Upload Verification**: Directories are only deleted if they contain an `uploaded/` subdirectory with files, indicating successful processing
+3. **Age Check**: Files must be older than the retention duration based on the timestamp in the directory name
+4. **Active Directory Protection**: The `mergable/` directories used for active processing are never deleted
+
+## Monitoring
+
+The cleanup service exposes Prometheus metrics for monitoring:
+
+- `achgateway_cleanup_runs_total`: Total number of cleanup runs executed (labeled by shard and status)
+- `achgateway_cleanup_directories_deleted_total`: Total number of directories deleted (labeled by shard)
+- `achgateway_cleanup_errors_total`: Total number of errors during cleanup (labeled by shard and error type)
+
+## Logging
+
+The cleanup service logs its activities at various levels:
+
+- **INFO**: Service start/stop, cleanup run summaries, directory deletions
+- **DEBUG**: Individual directory checks
+- **WARN**: Non-fatal errors during directory checks
+- **ERROR**: Fatal errors preventing cleanup
+
+Example log entries:
+```
+INFO starting cleanup service shard=production checkInterval=1h retentionDuration=24h
+INFO starting cleanup run shard=production
+INFO deleting directory shard=production directory=production-20240114-143000
+INFO completed cleanup run shard=production deleted=5 errors=0 duration=1.2s
+```

examples/getting-started/config.yml

Lines changed: 4 additions & 0 deletions
@@ -71,3 +71,7 @@ ACHGateway:
           return: "/returned/"
     merging:
       directory: "./storage/"
+      cleanup:
+        enabled: false # Set to true to enable automatic cleanup
+        retentionDuration: "24h" # Keep files for 24 hours after processing
+        checkInterval: "1h" # Check for files to clean up every hour

internal/pipeline/aggregate.go

Lines changed: 26 additions & 0 deletions
@@ -64,6 +64,7 @@ type aggregator struct {
 	preuploadTransformers []transform.PreUpload
 	outputFormatter output.Formatter
 	alerters alerting.Alerters
+	cleanupService *CleanupService
 }

 func newAggregator(logger log.Logger, eventEmitter events.Emitter, shard service.Shard, uploadAgents service.UploadAgents, errorAlerting service.ErrorAlerting) (*aggregator, error) {
@@ -107,6 +108,21 @@ func newAggregator(logger log.Logger, eventEmitter events.Emitter, shard service
 		return nil, fmt.Errorf("error setting up alerters: %v", err)
 	}

+	// Create cleanup service if configured
+	var cleanupService *CleanupService
+	if uploadAgents.Merging.Cleanup != nil && uploadAgents.Merging.Cleanup.Enabled {
+		// Get the storage from merger (it's a filesystemMerging which has storage)
+		if fm, ok := merger.(*filesystemMerging); ok {
+			cleanupService, err = NewCleanupService(logger, fm.storage, shard, uploadAgents.Merging.Cleanup)
+			if err != nil {
+				return nil, fmt.Errorf("error creating cleanup service: %v", err)
+			}
+			logger.Info().With(log.Fields{
+				"shard": log.String(shard.Name),
+			}).Log("setup cleanup service")
+		}
+	}
+
 	return &aggregator{
 		logger: logger,
 		eventEmitter: eventEmitter,
@@ -119,10 +135,16 @@ func newAggregator(logger log.Logger, eventEmitter events.Emitter, shard service
 		preuploadTransformers: preuploadTransformers,
 		outputFormatter: outputFormatter,
 		alerters: alerters,
+		cleanupService: cleanupService,
 	}, nil
 }

 func (xfagg *aggregator) Start(ctx context.Context) {
+	// Start cleanup service if configured
+	if xfagg.cleanupService != nil {
+		xfagg.cleanupService.Start(ctx)
+	}
+
 	for {
 		select {
 		// process automated cutoff time triggering
@@ -154,6 +176,10 @@ func (xfagg *aggregator) Shutdown() {
 		"shard": log.String(xfagg.shard.Name),
 	}).Log("shutting down xfer aggregation")

+	if xfagg.cleanupService != nil {
+		xfagg.cleanupService.Stop()
+	}
+
 	if xfagg.auditStorage != nil {
 		xfagg.auditStorage.Close()
 	}
