Skip to content

Commit 3f75f6c

Browse files
gcmsgclaude
andcommitted
feat: add data retention and auto-cleanup mechanism
Add configurable retention policies for reputation_events, invocations, and abuse_reports tables. Includes PruneEvents/PruneInvocations/ PruneResolvedReports store methods (SQLite + Postgres), a new retention.Service that coordinates cleanup, and a ticker goroutine in main.go. Defaults: 90d reputation events, 30d invocations, 365d resolved abuse reports, 1h cleanup interval. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 1b8f48b commit 3f75f6c

File tree

13 files changed

+439
-2
lines changed

13 files changed

+439
-2
lines changed

cmd/peerclawd/main.go

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/peerclaw/peerclaw-server/internal/observability"
2626
"github.com/peerclaw/peerclaw-server/internal/registry"
2727
"github.com/peerclaw/peerclaw-server/internal/reputation"
28+
"github.com/peerclaw/peerclaw-server/internal/retention"
2829
"github.com/peerclaw/peerclaw-server/internal/router"
2930
"github.com/peerclaw/peerclaw-server/internal/server"
3031
"github.com/peerclaw/peerclaw-server/internal/signaling"
@@ -152,8 +153,9 @@ func main() {
152153

153154
// Initialize invocation store.
154155
var invocationService *invocation.Service
156+
var invStore invocation.Store
155157
if sqlDB != nil {
156-
invStore := invocation.NewStore(cfg.Database.Driver, sqlDB)
158+
invStore = invocation.NewStore(cfg.Database.Driver, sqlDB)
157159
if err := invStore.Migrate(context.Background()); err != nil {
158160
logger.Error("failed to migrate invocation tables", "error", err)
159161
os.Exit(1)
@@ -164,8 +166,9 @@ func main() {
164166

165167
// Initialize review service.
166168
var reviewService *review.Service
169+
var revStore review.Store
167170
if sqlDB != nil {
168-
revStore := review.NewStore(cfg.Database.Driver, sqlDB)
171+
revStore = review.NewStore(cfg.Database.Driver, sqlDB)
169172
if err := revStore.Migrate(context.Background()); err != nil {
170173
logger.Error("failed to migrate review tables", "error", err)
171174
os.Exit(1)
@@ -440,6 +443,39 @@ func main() {
440443
logger.Info("email verification cleanup goroutine started", "interval", "1h")
441444
}
442445

446+
// Start data retention cleanup goroutine.
447+
if cfg.Retention.Enabled {
448+
retentionInterval, err := time.ParseDuration(cfg.Retention.CleanupInterval)
449+
if err != nil {
450+
retentionInterval = 1 * time.Hour
451+
}
452+
retentionSvc := retention.NewService(repStore, invStore, revStore, retention.Config{
453+
ReputationEventsDays: cfg.Retention.ReputationEventsDays,
454+
InvocationsDays: cfg.Retention.InvocationsDays,
455+
AbuseReportsDays: cfg.Retention.AbuseReportsDays,
456+
}, logger)
457+
go func() {
458+
ticker := time.NewTicker(retentionInterval)
459+
defer ticker.Stop()
460+
// Run once on startup.
461+
retentionSvc.RunOnce(ctx)
462+
for {
463+
select {
464+
case <-ctx.Done():
465+
return
466+
case <-ticker.C:
467+
retentionSvc.RunOnce(ctx)
468+
}
469+
}
470+
}()
471+
logger.Info("data retention cleanup started",
472+
"interval", retentionInterval,
473+
"reputation_events_days", cfg.Retention.ReputationEventsDays,
474+
"invocations_days", cfg.Retention.InvocationsDays,
475+
"abuse_reports_days", cfg.Retention.AbuseReportsDays,
476+
)
477+
}
478+
443479
var wg sync.WaitGroup
444480
errCh := make(chan error, 1)
445481

internal/config/config.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type Config struct {
2323
Auth AuthConfig `yaml:"auth"`
2424
UserAuth UserAuthConfig `yaml:"user_auth"`
2525
SMTP SMTPConfig `yaml:"smtp"`
26+
Retention RetentionConfig `yaml:"retention"`
2627
}
2728

2829
// SMTPConfig holds SMTP email settings.
@@ -35,6 +36,15 @@ type SMTPConfig struct {
3536
TLS bool `yaml:"tls"` // default true
3637
}
3738

39+
// RetentionConfig holds data retention and auto-cleanup settings.
40+
type RetentionConfig struct {
41+
Enabled bool `yaml:"enabled"`
42+
ReputationEventsDays int `yaml:"reputation_events_days"`
43+
InvocationsDays int `yaml:"invocations_days"`
44+
AbuseReportsDays int `yaml:"abuse_reports_days"`
45+
CleanupInterval string `yaml:"cleanup_interval"`
46+
}
47+
3848
// AuthConfig holds authentication settings.
3949
type AuthConfig struct {
4050
Required bool `yaml:"required"` // When true, reject unauthenticated requests. Default true.
@@ -196,6 +206,13 @@ func DefaultConfig() *Config {
196206
RefreshTTL: "168h",
197207
BcryptCost: 12,
198208
},
209+
Retention: RetentionConfig{
210+
Enabled: true,
211+
ReputationEventsDays: 90,
212+
InvocationsDays: 30,
213+
AbuseReportsDays: 365,
214+
CleanupInterval: "1h",
215+
},
199216
}
200217
}
201218

internal/invocation/postgres.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,18 @@ func (s *PostgresStore) CountInvocations(ctx context.Context) (int, error) {
329329
return count, err
330330
}
331331

332+
// PruneInvocations deletes invocation records older than the given time.
333+
func (s *PostgresStore) PruneInvocations(ctx context.Context, olderThan time.Time) (int64, error) {
334+
res, err := s.db.ExecContext(ctx,
335+
`DELETE FROM invocations WHERE created_at < $1`,
336+
olderThan.UTC(),
337+
)
338+
if err != nil {
339+
return 0, err
340+
}
341+
return res.RowsAffected()
342+
}
343+
332344
func (s *PostgresStore) Close() error {
333345
return nil
334346
}

internal/invocation/sqlite.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,18 @@ func (s *SQLiteStore) CountInvocations(ctx context.Context) (int, error) {
342342
return count, err
343343
}
344344

345+
// PruneInvocations deletes invocation records older than the given time.
346+
func (s *SQLiteStore) PruneInvocations(ctx context.Context, olderThan time.Time) (int64, error) {
347+
res, err := s.db.ExecContext(ctx,
348+
`DELETE FROM invocations WHERE created_at < ?`,
349+
olderThan.UTC().Format(time.RFC3339),
350+
)
351+
if err != nil {
352+
return 0, err
353+
}
354+
return res.RowsAffected()
355+
}
356+
345357
func (s *SQLiteStore) Close() error {
346358
return nil
347359
}

internal/invocation/store.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ type Store interface {
8383
// TopAgents returns the top agents by call count since the given time.
8484
TopAgents(ctx context.Context, since time.Time, limit int) ([]AgentCallStats, error)
8585

86+
// PruneInvocations deletes invocation records older than the given time.
87+
// Returns the number of deleted rows.
88+
PruneInvocations(ctx context.Context, olderThan time.Time) (int64, error)
89+
8690
// CountInvocations returns the total number of invocations.
8791
CountInvocations(ctx context.Context) (int, error)
8892

internal/reputation/postgres.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,18 @@ func (s *PostgresStore) ListStaleOnlineAgents(ctx context.Context, timeout time.
197197
return ids, rows.Err()
198198
}
199199

200+
// PruneEvents deletes reputation events older than the given time.
201+
func (s *PostgresStore) PruneEvents(ctx context.Context, olderThan time.Time) (int64, error) {
202+
res, err := s.db.ExecContext(ctx,
203+
`DELETE FROM reputation_events WHERE created_at < $1`,
204+
olderThan.UTC(),
205+
)
206+
if err != nil {
207+
return 0, err
208+
}
209+
return res.RowsAffected()
210+
}
211+
200212
// Close is a no-op since the db is shared with the registry store.
201213
func (s *PostgresStore) Close() error {
202214
return nil

internal/reputation/sqlite.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,18 @@ func (s *SQLiteStore) ListStaleOnlineAgents(ctx context.Context, timeout time.Du
216216
return ids, rows.Err()
217217
}
218218

219+
// PruneEvents deletes reputation events older than the given time.
220+
func (s *SQLiteStore) PruneEvents(ctx context.Context, olderThan time.Time) (int64, error) {
221+
res, err := s.db.ExecContext(ctx,
222+
`DELETE FROM reputation_events WHERE created_at < ?`,
223+
olderThan.UTC().Format(time.RFC3339),
224+
)
225+
if err != nil {
226+
return 0, err
227+
}
228+
return res.RowsAffected()
229+
}
230+
219231
// Close is a no-op since the db is shared with the registry store.
220232
func (s *SQLiteStore) Close() error {
221233
return nil

internal/reputation/store.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ type Store interface {
6060
// whose last heartbeat is older than the given timeout.
6161
ListStaleOnlineAgents(ctx context.Context, timeout time.Duration) ([]string, error)
6262

63+
// PruneEvents deletes reputation events older than the given time.
64+
// Returns the number of deleted rows.
65+
PruneEvents(ctx context.Context, olderThan time.Time) (int64, error)
66+
6367
// Migrate creates the required tables and columns.
6468
Migrate(ctx context.Context) error
6569

internal/retention/service.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package retention
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"time"
7+
8+
"github.com/peerclaw/peerclaw-server/internal/invocation"
9+
"github.com/peerclaw/peerclaw-server/internal/reputation"
10+
"github.com/peerclaw/peerclaw-server/internal/review"
11+
)
12+
13+
// Config holds retention policy settings.
14+
type Config struct {
15+
ReputationEventsDays int
16+
InvocationsDays int
17+
AbuseReportsDays int
18+
}
19+
20+
// PruneResult holds the number of rows deleted from each table.
21+
type PruneResult struct {
22+
ReputationEvents int64
23+
Invocations int64
24+
AbuseReports int64
25+
}
26+
27+
// Service coordinates data retention cleanup across stores.
28+
type Service struct {
29+
repStore reputation.Store
30+
invStore invocation.Store
31+
revStore review.Store
32+
config Config
33+
logger *slog.Logger
34+
}
35+
36+
// NewService creates a new retention service. Any store may be nil.
37+
func NewService(repStore reputation.Store, invStore invocation.Store, revStore review.Store, cfg Config, logger *slog.Logger) *Service {
38+
return &Service{
39+
repStore: repStore,
40+
invStore: invStore,
41+
revStore: revStore,
42+
config: cfg,
43+
logger: logger,
44+
}
45+
}
46+
47+
// RunOnce executes a single cleanup pass, deleting expired data from all configured stores.
48+
func (s *Service) RunOnce(ctx context.Context) (*PruneResult, error) {
49+
now := time.Now().UTC()
50+
result := &PruneResult{}
51+
52+
if s.repStore != nil && s.config.ReputationEventsDays > 0 {
53+
cutoff := now.AddDate(0, 0, -s.config.ReputationEventsDays)
54+
n, err := s.repStore.PruneEvents(ctx, cutoff)
55+
if err != nil {
56+
s.logger.Error("retention: failed to prune reputation events", "error", err)
57+
return result, err
58+
}
59+
result.ReputationEvents = n
60+
}
61+
62+
if s.invStore != nil && s.config.InvocationsDays > 0 {
63+
cutoff := now.AddDate(0, 0, -s.config.InvocationsDays)
64+
n, err := s.invStore.PruneInvocations(ctx, cutoff)
65+
if err != nil {
66+
s.logger.Error("retention: failed to prune invocations", "error", err)
67+
return result, err
68+
}
69+
result.Invocations = n
70+
}
71+
72+
if s.revStore != nil && s.config.AbuseReportsDays > 0 {
73+
cutoff := now.AddDate(0, 0, -s.config.AbuseReportsDays)
74+
n, err := s.revStore.PruneResolvedReports(ctx, cutoff)
75+
if err != nil {
76+
s.logger.Error("retention: failed to prune abuse reports", "error", err)
77+
return result, err
78+
}
79+
result.AbuseReports = n
80+
}
81+
82+
if result.ReputationEvents > 0 || result.Invocations > 0 || result.AbuseReports > 0 {
83+
s.logger.Info("retention cleanup completed",
84+
"reputation_events", result.ReputationEvents,
85+
"invocations", result.Invocations,
86+
"abuse_reports", result.AbuseReports,
87+
)
88+
}
89+
90+
return result, nil
91+
}

0 commit comments

Comments
 (0)