Skip to content

Commit 7c44edb

Browse files
committed
Chore(cache): refactor/fix s3
- implement expiratiob (through proper meta data) - add a goroutine to run in the background
1 parent 7cb8c65 commit 7c44edb

File tree

3 files changed

+148
-19
lines changed

3 files changed

+148
-19
lines changed

cmd/bot/main.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,12 @@ func main() {
124124
Client: client,
125125
}
126126

127-
cacheClient := &content.CacheClientS3{
128-
MC: mc,
129-
Bucket: cacheBucket,
130-
CTX: ctx,
131-
}
127+
cacheClient := content.NewCacheClientS3(ctx, mc, cacheBucket)
128+
129+
// Initialize and start the cleanup handler
130+
cleanup := content.NewS3Cleanup(ctx, mc, cacheBucket)
131+
cleanup.Start()
132+
defer cleanup.Stop()
132133

133134
if err := content.Start(githubToken, cacheClient); err != nil {
134135
return fmt.Errorf("failed to start service: %v", err)

internal/content/s3_cache.go

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,46 @@ import (
1313

1414
// CacheClientS3 is a small cache that is backed by an S3-compatible store
1515
type CacheClientS3 struct {
16-
MC *minio.Client
17-
Bucket string
18-
CTX context.Context
16+
mc *minio.Client
17+
bucket string
18+
ctx context.Context
19+
defaultExpiration time.Duration
1920
}
2021

21-
func (c *CacheClientS3) Set(key string, value interface{}, exp time.Duration) error {
22-
data, err := json.Marshal(value)
23-
if err != nil {
22+
// NewCacheClientS3 creates a new S3 cache client with default settings
23+
func NewCacheClientS3(ctx context.Context, mc *minio.Client, bucket string) *CacheClientS3 {
24+
return &CacheClientS3{
25+
mc: mc,
26+
bucket: bucket,
27+
ctx: ctx,
28+
defaultExpiration: 24 * time.Hour,
29+
}
30+
}
31+
32+
func (c *CacheClientS3) Set(key string, value any, exp time.Duration) error {
33+
var data bytes.Buffer
34+
if err := json.NewEncoder(&data).Encode(value); err != nil {
2435
return err
2536
}
2637

27-
r := bytes.NewReader(data)
38+
r := bytes.NewReader(data.Bytes())
39+
40+
// Use the provided expiration time or fall back to default
41+
expiration := exp
42+
if expiration == 0 {
43+
expiration = c.defaultExpiration
44+
}
45+
46+
// Calculate the expiration time
47+
expiresAt := time.Now().Add(expiration)
2848

29-
_, err = c.MC.PutObject(c.CTX, c.Bucket, key, r, int64(r.Len()), minio.PutObjectOptions{
30-
Expires: time.Now().Add(exp),
49+
// Set metadata to track expiration
50+
metadata := map[string]string{
51+
"expires-at": expiresAt.Format(time.RFC3339),
52+
}
53+
54+
_, err := c.mc.PutObject(c.ctx, c.bucket, key, r, int64(r.Len()), minio.PutObjectOptions{
55+
UserMetadata: metadata,
3156
})
3257
return err
3358
}
@@ -36,17 +61,27 @@ func (c *CacheClientS3) Set(key string, value interface{}, exp time.Duration) er
3661
// does not exist, in other case we can use minio.ToErrorResponse(err) to extract more details about the
3762
// potential S3 related error
3863
func (c *CacheClientS3) Get(key string) (string, error) {
39-
if _, err := c.MC.StatObject(c.CTX, c.Bucket, key, minio.StatObjectOptions{}); err != nil {
64+
// First check if object exists and get its metadata
65+
objInfo, err := c.mc.StatObject(c.ctx, c.bucket, key, minio.StatObjectOptions{})
66+
if err != nil {
4067
return "", redis.Nil
4168
}
4269

43-
object, err := c.MC.GetObject(c.CTX, c.Bucket, key, minio.GetObjectOptions{})
70+
if expiresAt, ok := objInfo.UserMetadata["expires-at"]; ok {
71+
expTime, err := time.Parse(time.RFC3339, expiresAt)
72+
if err == nil && time.Now().After(expTime) {
73+
// Object has expired, delete it and return not found
74+
_ = c.Del(key) // Ignore delete error
75+
return "", redis.Nil
76+
}
77+
}
78+
79+
object, err := c.mc.GetObject(c.ctx, c.bucket, key, minio.GetObjectOptions{})
4480
if err != nil {
4581
return "", err
4682
}
4783

4884
var val any
49-
5085
if err := json.NewDecoder(object).Decode(&val); err != nil {
5186
return "", err
5287
}
@@ -63,12 +98,11 @@ func (c *CacheClientS3) Get(key string) (string, error) {
6398
}
6499

65100
func (c *CacheClientS3) Del(key string) error {
66-
return c.MC.RemoveObject(c.CTX, c.Bucket, key, minio.RemoveObjectOptions{
101+
return c.mc.RemoveObject(c.ctx, c.bucket, key, minio.RemoveObjectOptions{
67102
ForceDelete: true,
68103
})
69104
}
70105

71106
func (c *CacheClientS3) Scan(key string, action func(context.Context, string) error) error {
72-
73107
return nil
74108
}

internal/content/s3_cleanup.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package content
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"time"
7+
8+
"github.com/minio/minio-go/v7"
9+
)
10+
11+
// S3Cleanup handles background cleanup of expired objects in S3
12+
type S3Cleanup struct {
13+
mc *minio.Client
14+
bucket string
15+
ctx context.Context
16+
// cleanupInterval is how often to run the cleanup routine
17+
cleanupInterval time.Duration
18+
// stopCleanup is used to signal the cleanup routine to stop
19+
stopCleanup chan struct{}
20+
}
21+
22+
// NewS3Cleanup creates a new S3 cleanup handler
23+
func NewS3Cleanup(ctx context.Context, mc *minio.Client, bucket string) *S3Cleanup {
24+
return &S3Cleanup{
25+
mc: mc,
26+
bucket: bucket,
27+
ctx: ctx,
28+
cleanupInterval: 24 * time.Hour,
29+
stopCleanup: make(chan struct{}),
30+
}
31+
}
32+
33+
// Start begins the background cleanup routine
34+
func (c *S3Cleanup) Start() {
35+
go c.cleanupRoutine()
36+
}
37+
38+
// Stop stops the background cleanup routine
39+
func (c *S3Cleanup) Stop() {
40+
close(c.stopCleanup)
41+
}
42+
43+
// cleanupRoutine periodically checks for and deletes expired objects
44+
func (c *S3Cleanup) cleanupRoutine() {
45+
ticker := time.NewTicker(c.cleanupInterval)
46+
defer ticker.Stop()
47+
48+
for {
49+
select {
50+
case <-ticker.C:
51+
if err := c.cleanupExpired(); err != nil {
52+
slog.Error("failed to cleanup expired objects", "error", err)
53+
}
54+
case <-c.stopCleanup:
55+
return
56+
}
57+
}
58+
}
59+
60+
// cleanupExpired scans the bucket for expired objects and deletes them
61+
func (c *S3Cleanup) cleanupExpired() error {
62+
filter := minio.ListObjectsOptions{
63+
Recursive: true,
64+
WithMetadata: true,
65+
}
66+
67+
objectsCh := c.mc.ListObjects(c.ctx, c.bucket, filter)
68+
for obj := range objectsCh {
69+
if obj.Err != nil {
70+
continue
71+
}
72+
73+
// Check if object has expiration metadata
74+
if expiresAt, ok := obj.UserMetadata["expires-at"]; !ok {
75+
continue
76+
} else {
77+
expTime, err := time.Parse(time.RFC3339, expiresAt)
78+
if err != nil {
79+
continue
80+
}
81+
if time.Now().After(expTime) {
82+
// Object has expired, delete it
83+
if err := c.mc.RemoveObject(c.ctx, c.bucket, obj.Key, minio.RemoveObjectOptions{
84+
ForceDelete: true,
85+
}); err != nil {
86+
slog.Error("failed to delete expired object",
87+
"key", obj.Key,
88+
"error", err)
89+
}
90+
}
91+
}
92+
}
93+
return nil
94+
}

0 commit comments

Comments
 (0)