Skip to content

Commit b819d6d

Browse files
committed
Chore(cache): refactor/fix s3
- implement expiration (through proper meta data) - add a goroutine to run in the background
1 parent 7cb8c65 commit b819d6d

File tree

5 files changed

+185
-20
lines changed

5 files changed

+185
-20
lines changed

cmd/bot/main.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,12 @@ func main() {
124124
Client: client,
125125
}
126126

127-
cacheClient := &content.CacheClientS3{
128-
MC: mc,
129-
Bucket: cacheBucket,
130-
CTX: ctx,
131-
}
127+
cacheClient := content.NewCacheClientS3(ctx, mc, cacheBucket)
128+
129+
// Initialize and start the cleanup handler
130+
cleanup := content.NewS3Cleanup(ctx, mc, cacheBucket)
131+
cleanup.Start()
132+
defer cleanup.Stop()
132133

133134
if err := content.Start(githubToken, cacheClient); err != nil {
134135
return fmt.Errorf("failed to start service: %v", err)

internal/content/content.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ package content
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"log/slog"
78
"strings"
89

910
"github.com/ezeoleaf/larry/cache"
1011
"github.com/ezeoleaf/larry/config"
1112
"github.com/ezeoleaf/larry/provider/github"
1213
"github.com/till/golangoss-bluesky/internal/bluesky"
14+
"github.com/till/golangoss-bluesky/internal/utils"
1315
)
1416

1517
var (
@@ -29,7 +31,7 @@ func Start(token string, cacheClient cache.Client) error {
2931
func Do(ctx context.Context, c bluesky.Client) error {
3032
p, err := provider.GetContentToPublish()
3133
if err != nil {
32-
slog.Error("error fetching content", slog.Any("err", err))
34+
utils.LogError(fmt.Errorf("error fetching content: %w", err))
3335
return ErrCouldNotContent
3436
}
3537

internal/content/s3_cache.go

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,46 @@ import (
1313

1414
// CacheClientS3 is a small cache that is backed by an S3-compatible store
1515
type CacheClientS3 struct {
16-
MC *minio.Client
17-
Bucket string
18-
CTX context.Context
16+
mc *minio.Client
17+
bucket string
18+
ctx context.Context
19+
defaultExpiration time.Duration
1920
}
2021

21-
func (c *CacheClientS3) Set(key string, value interface{}, exp time.Duration) error {
22-
data, err := json.Marshal(value)
23-
if err != nil {
22+
// NewCacheClientS3 creates a new S3 cache client with default settings
23+
func NewCacheClientS3(ctx context.Context, mc *minio.Client, bucket string) *CacheClientS3 {
24+
return &CacheClientS3{
25+
mc: mc,
26+
bucket: bucket,
27+
ctx: ctx,
28+
defaultExpiration: 24 * time.Hour,
29+
}
30+
}
31+
32+
func (c *CacheClientS3) Set(key string, value any, exp time.Duration) error {
33+
var data bytes.Buffer
34+
if err := json.NewEncoder(&data).Encode(value); err != nil {
2435
return err
2536
}
2637

27-
r := bytes.NewReader(data)
38+
r := bytes.NewReader(data.Bytes())
39+
40+
// Use the provided expiration time or fall back to default
41+
expiration := exp
42+
if expiration == 0 {
43+
expiration = c.defaultExpiration
44+
}
45+
46+
// Calculate the expiration time
47+
expiresAt := time.Now().Add(expiration)
2848

29-
_, err = c.MC.PutObject(c.CTX, c.Bucket, key, r, int64(r.Len()), minio.PutObjectOptions{
30-
Expires: time.Now().Add(exp),
49+
// Set metadata to track expiration
50+
metadata := map[string]string{
51+
"expires-at": expiresAt.Format(time.RFC3339),
52+
}
53+
54+
_, err := c.mc.PutObject(c.ctx, c.bucket, key, r, int64(r.Len()), minio.PutObjectOptions{
55+
UserMetadata: metadata,
3156
})
3257
return err
3358
}
@@ -36,17 +61,27 @@ func (c *CacheClientS3) Set(key string, value interface{}, exp time.Duration) er
3661
// does not exist, in other case we can use minio.ToErrorResponse(err) to extract more details about the
3762
// potential S3 related error
3863
func (c *CacheClientS3) Get(key string) (string, error) {
39-
if _, err := c.MC.StatObject(c.CTX, c.Bucket, key, minio.StatObjectOptions{}); err != nil {
64+
// First check if object exists and get its metadata
65+
objInfo, err := c.mc.StatObject(c.ctx, c.bucket, key, minio.StatObjectOptions{})
66+
if err != nil {
4067
return "", redis.Nil
4168
}
4269

43-
object, err := c.MC.GetObject(c.CTX, c.Bucket, key, minio.GetObjectOptions{})
70+
if expiresAt, ok := objInfo.UserMetadata["expires-at"]; ok {
71+
expTime, err := time.Parse(time.RFC3339, expiresAt)
72+
if err == nil && time.Now().After(expTime) {
73+
// Object has expired, delete it and return not found
74+
_ = c.Del(key) // Ignore delete error
75+
return "", redis.Nil
76+
}
77+
}
78+
79+
object, err := c.mc.GetObject(c.ctx, c.bucket, key, minio.GetObjectOptions{})
4480
if err != nil {
4581
return "", err
4682
}
4783

4884
var val any
49-
5085
if err := json.NewDecoder(object).Decode(&val); err != nil {
5186
return "", err
5287
}
@@ -63,12 +98,11 @@ func (c *CacheClientS3) Get(key string) (string, error) {
6398
}
6499

65100
func (c *CacheClientS3) Del(key string) error {
66-
return c.MC.RemoveObject(c.CTX, c.Bucket, key, minio.RemoveObjectOptions{
101+
return c.mc.RemoveObject(c.ctx, c.bucket, key, minio.RemoveObjectOptions{
67102
ForceDelete: true,
68103
})
69104
}
70105

71106
func (c *CacheClientS3) Scan(key string, action func(context.Context, string) error) error {
72-
73107
return nil
74108
}

internal/content/s3_cleanup.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package content
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/minio/minio-go/v7"
9+
"github.com/till/golangoss-bluesky/internal/utils"
10+
)
11+
12+
// S3Cleanup handles background cleanup of expired objects in S3
13+
type S3Cleanup struct {
14+
mc *minio.Client
15+
bucket string
16+
// cleanupInterval is how often to run the cleanup routine
17+
cleanupInterval time.Duration
18+
// stopCleanup is used to signal the cleanup routine to stop
19+
stopCleanup chan struct{}
20+
}
21+
22+
// NewS3Cleanup creates a new S3 cleanup handler
23+
func NewS3Cleanup(mc *minio.Client, bucket string) *S3Cleanup {
24+
return &S3Cleanup{
25+
mc: mc,
26+
bucket: bucket,
27+
cleanupInterval: 24 * time.Hour,
28+
stopCleanup: make(chan struct{}),
29+
}
30+
}
31+
32+
// Start begins the background cleanup routine
33+
func (c *S3Cleanup) Start(ctx context.Context) {
34+
go c.cleanupRoutine(ctx)
35+
}
36+
37+
// Stop stops the background cleanup routine
38+
func (c *S3Cleanup) Stop() {
39+
close(c.stopCleanup)
40+
}
41+
42+
// cleanupRoutine periodically checks for and deletes expired objects
43+
func (c *S3Cleanup) cleanupRoutine(ctx context.Context) {
44+
ticker := time.NewTicker(c.cleanupInterval)
45+
defer ticker.Stop()
46+
47+
for {
48+
select {
49+
case <-ticker.C:
50+
if err := c.cleanupExpired(ctx); err != nil {
51+
utils.LogErrorWithContext(ctx, fmt.Errorf("failed to cleanup expired objects: %w", err))
52+
}
53+
case <-c.stopCleanup:
54+
return
55+
}
56+
}
57+
}
58+
59+
// cleanupExpired scans the bucket for expired objects and deletes them
60+
func (c *S3Cleanup) cleanupExpired(ctx context.Context) error {
61+
filter := minio.ListObjectsOptions{
62+
Recursive: true,
63+
WithMetadata: true,
64+
}
65+
66+
objectsCh := c.mc.ListObjects(ctx, c.bucket, filter)
67+
for obj := range objectsCh {
68+
if obj.Err != nil {
69+
utils.LogErrorWithContext(ctx, fmt.Errorf("failed to list objects: %w", obj.Err))
70+
continue
71+
}
72+
73+
// Check if object has expiration metadata
74+
expiresAt, ok := obj.UserMetadata["expires-at"]
75+
if !ok {
76+
utils.LogErrorWithContext(
77+
ctx,
78+
fmt.Errorf("no expiration metadata found for object: %s", obj.Key),
79+
)
80+
if err := c.delete(ctx, obj); err != nil {
81+
return err
82+
}
83+
continue
84+
}
85+
86+
expTime, err := time.Parse(time.RFC3339, expiresAt)
87+
if err != nil {
88+
utils.LogError(fmt.Errorf("failed to parse expiration time (%s): %w", obj.Key, err))
89+
continue
90+
}
91+
if !time.Now().After(expTime) {
92+
continue
93+
}
94+
95+
// Object has expired, delete it
96+
if err := c.delete(ctx, obj); err != nil {
97+
return err
98+
}
99+
}
100+
return nil
101+
}
102+
103+
func (c *S3Cleanup) delete(ctx context.Context, obj minio.ObjectInfo) error {
104+
if err := c.mc.RemoveObject(ctx, c.bucket, obj.Key, minio.RemoveObjectOptions{
105+
ForceDelete: true,
106+
}); err != nil {
107+
return fmt.Errorf("failed to delete expired object (%s): %w", obj.Key, err)
108+
}
109+
return nil
110+
}

internal/utils/utils.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package utils
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
)
7+
8+
func LogError(err error) {
9+
slog.Error(err.Error(), LogAttr(err))
10+
}
11+
12+
func LogErrorWithContext(ctx context.Context, err error) {
13+
slog.ErrorContext(ctx, err.Error(), LogAttr(err))
14+
}
15+
16+
func LogAttr(err error) slog.Attr {
17+
return slog.Any("error", err)
18+
}

0 commit comments

Comments
 (0)