Skip to content

Commit 993d8f7

Browse files
authored
feat: Add retention size limit for rotated files (#1040)
* feat: Add retention size limit for rotated files This adds a new flag to the observer command to limit the size of the rotated files. When the rotated files exceed this size, the oldest files are deleted. Signed-off-by: Jakub Sztandera <[email protected]> * address review Signed-off-by: Jakub Sztandera <[email protected]> --------- Signed-off-by: Jakub Sztandera <[email protected]>
1 parent e7f84e7 commit 993d8f7

File tree

3 files changed

+62
-8
lines changed

3 files changed

+62
-8
lines changed

cmd/f3/observer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ var observerCmd = cli.Command{
6666
Usage: "The maximum length of time to keep the rotated files.",
6767
Value: 2 * 7 * 24 * time.Hour,
6868
},
69+
&cli.Uint64Flag{
70+
Name: "retentionSize",
71+
Usage: "The maximum size of the rotated files in megabytes. If not set, no limit is applied.",
72+
Value: 0,
73+
},
6974
&cli.StringFlag{
7075
Name: "dataSourceName",
7176
Usage: "The observer database DSN",
@@ -144,7 +149,9 @@ var observerCmd = cli.Command{
144149
observer.WithMaxBatchSize(cctx.Int("maxBatchSize")),
145150
observer.WithMaxBatchDelay(cctx.Duration("maxBatchDelay")),
146151
observer.WithChainExchangeMaxMessageAge(cctx.Duration("chainExchangeMaxMessageAge")),
152+
observer.WithMaxRetentionSize(cctx.Uint64("retentionSize") * 1024 * 1024),
147153
}
154+
148155
var identity crypto.PrivKey
149156
if cctx.IsSet("identity") {
150157
marshaledKey, err := os.ReadFile(cctx.String("identity"))

observer/observer.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ import (
88
"encoding/json"
99
"errors"
1010
"fmt"
11+
"io/fs"
1112
"net/http"
1213
"os"
1314
"path"
1415
"path/filepath"
16+
"sort"
1517
"sync"
1618
"time"
1719

@@ -527,7 +529,8 @@ func (o *Observer) rotateMessages(ctx context.Context) error {
527529
if err != nil {
528530
return err
529531
}
530-
var foundAtLeastOneParquet bool
532+
retainedSize := int64(0)
533+
var retained []fs.FileInfo
531534
for _, entry := range dir {
532535
if !entry.IsDir() && filepath.Ext(entry.Name()) == ".parquet" {
533536
info, err := entry.Info()
@@ -538,15 +541,42 @@ func (o *Observer) rotateMessages(ctx context.Context) error {
538541
if err := os.Remove(filepath.Join(o.rotatePath, entry.Name())); err != nil {
539542
logger.Errorw("Failed to remove retention policy for file", "file", entry.Name(), "err", err)
540543
} else {
541-
logger.Infow("Removed old file", "olderThan", o.retention, "file", entry.Name())
544+
logger.Infow("Removed file due to time retention policy", "olderThan", o.retention, "file", entry.Name())
542545
}
543546
} else {
544-
foundAtLeastOneParquet = true
547+
retainedSize += info.Size()
548+
retained = append(retained, info)
545549
}
546550
}
547551
}
548552

549-
return o.createOrReplaceMessagesView(ctx, foundAtLeastOneParquet)
553+
logger.Infow("Retention size", "retainedSize", retainedSize, "maxRetentionSize", o.maxRetentionSize)
554+
555+
if o.maxRetentionSize > 0 && retainedSize > o.maxRetentionSize {
556+
logger.Infow("Retention size exceeded, deleting oldest files", "retainedSize", retainedSize, "maxRetentionSize", o.maxRetentionSize)
557+
// sort retained by modification time, oldest last
558+
sort.Slice(retained, func(i, j int) bool {
559+
return retained[i].ModTime().After(retained[j].ModTime())
560+
})
561+
// iterate in reverse order to delete oldest first
562+
for i := len(retained) - 1; i >= 0; i-- {
563+
fi := retained[i]
564+
if retainedSize < o.maxRetentionSize {
565+
break
566+
}
567+
if err := os.Remove(filepath.Join(o.rotatePath, fi.Name())); err != nil {
568+
logger.Errorw("Failed to remove retention policy for file",
569+
"file", fi.Name(), "err", err)
570+
} else {
571+
logger.Infow("Removed file due to size retention policy",
572+
"size", fi.Size(), "file", fi.Name())
573+
retainedSize -= fi.Size()
574+
retained = retained[:i]
575+
}
576+
}
577+
}
578+
579+
return o.createOrReplaceMessagesView(ctx, len(retained) > 0)
550580
}
551581

552582
func (o *Observer) listenAndServeQueries() error {

observer/options.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package observer
33
import (
44
"context"
55
"fmt"
6+
"math"
67
"time"
78

89
"github.com/filecoin-project/go-f3/blssig"
@@ -13,7 +14,7 @@ import (
1314
"github.com/libp2p/go-libp2p/core/host"
1415
"github.com/libp2p/go-libp2p/core/peer"
1516
"github.com/multiformats/go-multiaddr"
16-
"github.com/multiformats/go-multiaddr-dns"
17+
madns "github.com/multiformats/go-multiaddr-dns"
1718
)
1819

1920
type Option func(*options) error
@@ -35,9 +36,10 @@ type options struct {
3536
queryServerListenAddress string
3637
queryServerReadTimeout time.Duration
3738

38-
rotatePath string
39-
rotateInterval time.Duration
40-
retention time.Duration
39+
rotatePath string
40+
rotateInterval time.Duration
41+
retention time.Duration
42+
maxRetentionSize int64
4143

4244
pubSub *pubsub.PubSub
4345
pubSubValidatorDisabled bool
@@ -81,6 +83,7 @@ func newOptions(opts ...Option) (*options, error) {
8183
finalityCertsMaxPollInterval: 2 * time.Minute,
8284
chainExchangeBufferSize: 1000,
8385
chainExchangeMaxMessageAge: 3 * time.Minute,
86+
maxRetentionSize: 0,
8487
}
8588
for _, apply := range opts {
8689
if err := apply(&opt); err != nil {
@@ -270,6 +273,20 @@ func WithRetention(retention time.Duration) Option {
270273
}
271274
}
272275

276+
// WithMaxRetentionSize sets the maximum size of the retention directory.
277+
// This is weakly enforced, and the directory may grow larger than this
278+
// size. If the directory grows larger than this size, the oldest files
279+
// will be deleted until the directory size is below this size.
280+
func WithMaxRetentionSize(size uint64) Option {
281+
return func(o *options) error {
282+
if size > math.MaxInt64 {
283+
return fmt.Errorf("max retention size must be less than or equal to %d", math.MaxInt64)
284+
}
285+
o.maxRetentionSize = int64(size)
286+
return nil
287+
}
288+
}
289+
273290
func WithDataSourceName(dataSourceName string) Option {
274291
return func(o *options) error {
275292
o.dataSourceName = dataSourceName

0 commit comments

Comments
 (0)