Skip to content

Commit 4532155

Browse files
committed
feat(infra): add object tagging support for PutObject and ListObjects
1 parent 263a75b commit 4532155

File tree

6 files changed

+166
-58
lines changed

6 files changed

+166
-58
lines changed

backend/infra/contract/storage/option.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,23 @@ type PutOption struct {
3838
ContentDisposition *string
3939
ContentLanguage *string
4040
Expires *time.Time
41+
Tagging map[string]string
4142
ObjectSize int64
4243
}
4344

4445
type PutOptFn func(option *PutOption)
4546

47+
func WithTagging(tag map[string]string) PutOptFn {
48+
return func(o *PutOption) {
49+
if len(tag) > 0 {
50+
o.Tagging = make(map[string]string, len(tag))
51+
for k, v := range tag {
52+
o.Tagging[k] = v
53+
}
54+
}
55+
}
56+
}
57+
4658
func WithContentType(v string) PutOptFn {
4759
return func(o *PutOption) {
4860
o.ContentType = &v

backend/infra/contract/storage/storage.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,20 @@ import (
2424

2525
//go:generate mockgen -destination ../../../internal/mock/infra/contract/storage/storage_mock.go -package mock -source storage.go Factory
2626
type Storage interface {
27+
// PutObject puts the object with the specified key.
2728
PutObject(ctx context.Context, objectKey string, content []byte, opts ...PutOptFn) error
29+
// PutObjectWithReader puts the object with the specified key.
2830
PutObjectWithReader(ctx context.Context, objectKey string, content io.Reader, opts ...PutOptFn) error
31+
// GetObject returns the object with the specified key.
2932
GetObject(ctx context.Context, objectKey string) ([]byte, error)
33+
// DeleteObject deletes the object with the specified key.
3034
DeleteObject(ctx context.Context, objectKey string) error
35+
// GetObjectUrl returns a presigned URL for the object.
36+
// The URL is valid for the specified duration.
3137
GetObjectUrl(ctx context.Context, objectKey string, opts ...GetOptFn) (string, error)
3238
// ListObjects returns all objects with the specified prefix.
3339
// It may return a large number of objects, consider using ListObjectsPaginated for better performance.
34-
ListObjects(ctx context.Context, prefix string) ([]*FileInfo, error)
35-
40+
ListObjects(ctx context.Context, prefix string, withTagging bool) ([]*FileInfo, error)
3641
// ListObjectsPaginated returns objects with pagination support.
3742
// Use this method when dealing with large number of objects.
3843
ListObjectsPaginated(ctx context.Context, input *ListObjectsPaginatedInput) (*ListObjectsPaginatedOutput, error)
@@ -50,6 +55,8 @@ type ListObjectsPaginatedInput struct {
5055
Prefix string
5156
PageSize int
5257
Cursor string
58+
// Include objects tagging in the listing
59+
WithTagging bool
5360
}
5461

5562
type ListObjectsPaginatedOutput struct {
@@ -64,4 +71,5 @@ type FileInfo struct {
6471
LastModified time.Time
6572
ETag string
6673
Size int64
74+
Tagging map[string]string
6775
}

backend/infra/impl/storage/minio/minio.go

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -99,34 +99,45 @@ func (m *minioClient) test() {
9999
ctx := context.Background()
100100
objectName := fmt.Sprintf("test-file-%d.txt", rand.Int())
101101

102-
m.ListObjects(ctx, "")
102+
err := m.PutObject(ctx, objectName, []byte("hello content"),
103+
storage.WithContentType("text/plain"), storage.WithTagging(map[string]string{
104+
"uid": "7543149965070155780",
105+
"conversation_id": "7543149965070155781",
106+
"type": "user",
107+
}))
108+
if err != nil {
109+
logs.CtxErrorf(ctx, "upload file failed: %v", err)
110+
}
103111

104-
err := m.PutObject(ctx, objectName, []byte("hello content"), storage.WithContentType("text/plain"))
112+
logs.CtxInfof(ctx, "upload file success")
113+
114+
files, err := m.ListObjects(ctx, "test-file-", true)
105115
if err != nil {
106-
log.Fatalf("upload file failed: %v", err)
116+
logs.CtxErrorf(ctx, "list objects failed: %v", err)
107117
}
108-
log.Printf("upload file success")
118+
119+
logs.CtxInfof(ctx, "list objects success, files.len: %v", len(files))
109120

110121
url, err := m.GetObjectUrl(ctx, objectName)
111122
if err != nil {
112-
log.Fatalf("get file url failed: %v", err)
123+
logs.CtxErrorf(ctx, "get file url failed: %v", err)
113124
}
114125

115-
log.Printf("get file url success, url: %s", url)
126+
logs.CtxInfof(ctx, "get file url success, url: %s", url)
116127

117128
content, err := m.GetObject(ctx, objectName)
118129
if err != nil {
119-
log.Fatalf("download file failed: %v", err)
130+
logs.CtxErrorf(ctx, "download file failed: %v", err)
120131
}
121132

122-
log.Printf("download file success, content: %s", string(content))
133+
logs.CtxInfof(ctx, "download file success, content: %s", string(content))
123134

124135
err = m.DeleteObject(ctx, objectName)
125136
if err != nil {
126137
log.Fatalf("delete object failed: %v", err)
127138
}
128139

129-
log.Printf("delete object success")
140+
logs.CtxInfof(ctx, "delete object success")
130141
}
131142

132143
func (m *minioClient) PutObject(ctx context.Context, objectKey string, content []byte, opts ...storage.PutOptFn) error {
@@ -161,6 +172,10 @@ func (m *minioClient) PutObjectWithReader(ctx context.Context, objectKey string,
161172
minioOpts.Expires = *option.Expires
162173
}
163174

175+
if option.Tagging != nil {
176+
minioOpts.UserTags = option.Tagging
177+
}
178+
164179
_, err := m.client.PutObject(ctx, m.bucketName, objectKey,
165180
content, option.ObjectSize, minioOpts)
166181
if err != nil {
@@ -223,7 +238,7 @@ func (m *minioClient) ListObjectsPaginated(ctx context.Context, input *storage.L
223238
return nil, fmt.Errorf("page size must be positive")
224239
}
225240

226-
files, err := m.ListObjects(ctx, input.Prefix)
241+
files, err := m.ListObjects(ctx, input.Prefix, input.WithTagging)
227242
if err != nil {
228243
return nil, err
229244
}
@@ -235,10 +250,11 @@ func (m *minioClient) ListObjectsPaginated(ctx context.Context, input *storage.L
235250
}, nil
236251
}
237252

238-
func (m *minioClient) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) {
253+
func (m *minioClient) ListObjects(ctx context.Context, prefix string, withTagging bool) ([]*storage.FileInfo, error) {
239254
opts := minio.ListObjectsOptions{
240-
Prefix: prefix,
241-
Recursive: true,
255+
Prefix: prefix,
256+
Recursive: true,
257+
WithMetadata: withTagging,
242258
}
243259

244260
objectCh := m.client.ListObjects(ctx, m.bucketName, opts)
@@ -248,14 +264,17 @@ func (m *minioClient) ListObjects(ctx context.Context, prefix string) ([]*storag
248264
if object.Err != nil {
249265
return nil, object.Err
250266
}
267+
251268
files = append(files, &storage.FileInfo{
252269
Key: object.Key,
253270
LastModified: object.LastModified,
254271
ETag: object.ETag,
255272
Size: object.Size,
273+
Tagging: object.UserTags,
256274
})
257275

258-
logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d", object.Key, object.LastModified, object.ETag, object.Size)
276+
logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d, tagging = %v",
277+
object.Key, object.LastModified, object.ETag, object.Size, object.UserTags)
259278
}
260279

261280
return files, nil

backend/infra/impl/storage/s3/s3.go

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,21 @@ import (
2121
"context"
2222
"fmt"
2323
"io"
24+
"net/url"
25+
"strings"
26+
"sync"
2427
"time"
2528

2629
"github.com/aws/aws-sdk-go-v2/aws"
2730
"github.com/aws/aws-sdk-go-v2/config"
2831
"github.com/aws/aws-sdk-go-v2/credentials"
2932
"github.com/aws/aws-sdk-go-v2/service/s3"
33+
"github.com/aws/aws-sdk-go-v2/service/s3/types"
3034

3135
"github.com/coze-dev/coze-studio/backend/infra/contract/storage"
3236
"github.com/coze-dev/coze-studio/backend/infra/impl/storage/proxy"
3337
"github.com/coze-dev/coze-studio/backend/pkg/logs"
38+
"github.com/coze-dev/coze-studio/backend/pkg/taskgroup"
3439
)
3540

3641
type s3Client struct {
@@ -178,6 +183,11 @@ func (t *s3Client) PutObjectWithReader(ctx context.Context, objectKey string, co
178183
input.ContentLength = aws.Int64(option.ObjectSize)
179184
}
180185

186+
if option.Tagging != nil {
187+
tagging := mapToQueryParams(option.Tagging)
188+
input.Tagging = aws.String(tagging)
189+
}
190+
181191
// upload object
182192
_, err := client.PutObject(ctx, input)
183193
return err
@@ -239,49 +249,36 @@ func (t *s3Client) GetObjectUrl(ctx context.Context, objectKey string, opts ...s
239249
return req.URL, nil
240250
}
241251

242-
func (t *s3Client) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) {
243-
client := t.client
244-
bucket := t.bucketName
252+
func (t *s3Client) ListObjects(ctx context.Context, prefix string, withTagging bool) ([]*storage.FileInfo, error) {
245253
const (
246254
DefaultPageSize = 100
247255
MaxListObjects = 10000
248256
)
249257

250-
input := &s3.ListObjectsV2Input{
251-
Bucket: aws.String(bucket),
252-
Prefix: aws.String(prefix),
253-
MaxKeys: aws.Int32(DefaultPageSize),
254-
}
258+
var files []*storage.FileInfo
259+
var cursor string
260+
for {
261+
output, err := t.ListObjectsPaginated(ctx, &storage.ListObjectsPaginatedInput{
262+
Prefix: prefix,
263+
PageSize: DefaultPageSize,
264+
WithTagging: withTagging,
265+
Cursor: cursor,
266+
})
255267

256-
paginator := s3.NewListObjectsV2Paginator(client, input)
268+
cursor = output.Cursor
257269

258-
var files []*storage.FileInfo
259-
for paginator.HasMorePages() {
260-
page, err := paginator.NextPage(ctx)
261270
if err != nil {
262-
return nil, fmt.Errorf("failed to get page, %v", err)
271+
return nil, err
263272
}
264-
for _, obj := range page.Contents {
265-
f := &storage.FileInfo{}
266-
if obj.Key != nil {
267-
f.Key = *obj.Key
268-
}
269-
if obj.LastModified != nil {
270-
f.LastModified = *obj.LastModified
271-
}
272-
if obj.ETag != nil {
273-
f.ETag = *obj.ETag
274-
}
275-
if obj.Size != nil {
276-
f.Size = *obj.Size
277-
}
278-
279-
files = append(files, f)
280273

281-
}
274+
files = append(files, output.Files...)
282275

283276
if len(files) >= MaxListObjects {
284-
logs.CtxErrorf(ctx, "[ListObjects] max list objects reached, total: %d", len(files))
277+
logs.CtxErrorf(ctx, "list objects failed, max list objects: %d", MaxListObjects)
278+
break
279+
}
280+
281+
if !output.IsTruncated {
285282
break
286283
}
287284
}
@@ -340,5 +337,53 @@ func (t *s3Client) ListObjectsPaginated(ctx context.Context, input *storage.List
340337
output.Cursor = *p.NextContinuationToken
341338
}
342339

340+
if input.WithTagging {
341+
taskGroup := taskgroup.NewTaskGroup(ctx, 5)
342+
mu := sync.Mutex{}
343+
344+
for idx := range files {
345+
f := files[idx]
346+
taskGroup.Go(func() error {
347+
tagging, err := client.GetObjectTagging(ctx, &s3.GetObjectTaggingInput{
348+
Bucket: aws.String(bucket),
349+
Key: aws.String(f.Key),
350+
})
351+
if err != nil {
352+
return err
353+
}
354+
355+
mu.Lock()
356+
f.Tagging = tagsToMap(tagging.TagSet)
357+
mu.Unlock()
358+
return nil
359+
})
360+
}
361+
362+
if err := taskGroup.Wait(); err != nil {
363+
return nil, err
364+
}
365+
}
366+
343367
return output, nil
344368
}
369+
370+
func mapToQueryParams(tagging map[string]string) string {
371+
queryParams := ""
372+
for k, v := range tagging {
373+
queryParams += fmt.Sprintf("%s=%s&", url.QueryEscape(k), url.QueryEscape(v))
374+
}
375+
return strings.TrimSuffix(queryParams, "&")
376+
}
377+
378+
func tagsToMap(tags []types.Tag) map[string]string {
379+
if len(tags) == 0 {
380+
return nil
381+
}
382+
m := make(map[string]string, len(tags))
383+
for _, tag := range tags {
384+
if tag.Key != nil && tag.Value != nil {
385+
m[*tag.Key] = *tag.Value
386+
}
387+
}
388+
return m
389+
}

0 commit comments

Comments
 (0)