Skip to content

Commit 16bd3b5

Browse files
authored
feat(infra): add object tagging support for PutObject and ListObjects (#1845)
1 parent aae865d commit 16bd3b5

File tree

6 files changed

+171
-67
lines changed

6 files changed

+171
-67
lines changed

backend/infra/contract/storage/option.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,23 @@ type PutOption struct {
3838
ContentDisposition *string
3939
ContentLanguage *string
4040
Expires *time.Time
41+
Tagging map[string]string
4142
ObjectSize int64
4243
}
4344

4445
type PutOptFn func(option *PutOption)
4546

47+
func WithTagging(tag map[string]string) PutOptFn {
48+
return func(o *PutOption) {
49+
if len(tag) > 0 {
50+
o.Tagging = make(map[string]string, len(tag))
51+
for k, v := range tag {
52+
o.Tagging[k] = v
53+
}
54+
}
55+
}
56+
}
57+
4658
func WithContentType(v string) PutOptFn {
4759
return func(o *PutOption) {
4860
o.ContentType = &v

backend/infra/contract/storage/storage.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,20 @@ import (
2424

2525
//go:generate mockgen -destination ../../../internal/mock/infra/contract/storage/storage_mock.go -package mock -source storage.go Factory
2626
type Storage interface {
27+
// PutObject puts the object with the specified key.
2728
PutObject(ctx context.Context, objectKey string, content []byte, opts ...PutOptFn) error
29+
// PutObjectWithReader puts the object with the specified key, reading its content from the given reader.
2830
PutObjectWithReader(ctx context.Context, objectKey string, content io.Reader, opts ...PutOptFn) error
31+
// GetObject returns the object with the specified key.
2932
GetObject(ctx context.Context, objectKey string) ([]byte, error)
33+
// DeleteObject deletes the object with the specified key.
3034
DeleteObject(ctx context.Context, objectKey string) error
35+
// GetObjectUrl returns a presigned URL for the object.
36+
// The URL is valid for the specified duration.
3137
GetObjectUrl(ctx context.Context, objectKey string, opts ...GetOptFn) (string, error)
32-
// ListObjects returns all objects with the specified prefix.
38+
// ListAllObjects returns all objects with the specified prefix.
3339
// It may return a large number of objects; consider using ListObjectsPaginated for better performance.
34-
ListObjects(ctx context.Context, prefix string) ([]*FileInfo, error)
35-
40+
ListAllObjects(ctx context.Context, prefix string, withTagging bool) ([]*FileInfo, error)
3641
// ListObjectsPaginated returns objects with pagination support.
3742
// Use this method when dealing with large number of objects.
3843
ListObjectsPaginated(ctx context.Context, input *ListObjectsPaginatedInput) (*ListObjectsPaginatedOutput, error)
@@ -50,6 +55,8 @@ type ListObjectsPaginatedInput struct {
5055
Prefix string
5156
PageSize int
5257
Cursor string
58+
// WithTagging, when true, includes each object's tags in the listing.
59+
WithTagging bool
5360
}
5461

5562
type ListObjectsPaginatedOutput struct {
@@ -64,4 +71,5 @@ type FileInfo struct {
6471
LastModified time.Time
6572
ETag string
6673
Size int64
74+
Tagging map[string]string
6775
}

backend/infra/impl/storage/minio/minio.go

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"context"
2222
"fmt"
2323
"io"
24-
"log"
2524
"math/rand"
2625
"net/url"
2726
"time"
@@ -99,34 +98,45 @@ func (m *minioClient) test() {
9998
ctx := context.Background()
10099
objectName := fmt.Sprintf("test-file-%d.txt", rand.Int())
101100

102-
m.ListObjects(ctx, "")
101+
err := m.PutObject(ctx, objectName, []byte("hello content"),
102+
storage.WithContentType("text/plain"), storage.WithTagging(map[string]string{
103+
"uid": "7543149965070155780",
104+
"conversation_id": "7543149965070155781",
105+
"type": "user",
106+
}))
107+
if err != nil {
108+
logs.CtxErrorf(ctx, "upload file failed: %v", err)
109+
}
103110

104-
err := m.PutObject(ctx, objectName, []byte("hello content"), storage.WithContentType("text/plain"))
111+
logs.CtxInfof(ctx, "upload file success")
112+
113+
files, err := m.ListAllObjects(ctx, "test-file-", true)
105114
if err != nil {
106-
log.Fatalf("upload file failed: %v", err)
115+
logs.CtxErrorf(ctx, "list objects failed: %v", err)
107116
}
108-
log.Printf("upload file success")
117+
118+
logs.CtxInfof(ctx, "list objects success, files.len: %v", len(files))
109119

110120
url, err := m.GetObjectUrl(ctx, objectName)
111121
if err != nil {
112-
log.Fatalf("get file url failed: %v", err)
122+
logs.CtxErrorf(ctx, "get file url failed: %v", err)
113123
}
114124

115-
log.Printf("get file url success, url: %s", url)
125+
logs.CtxInfof(ctx, "get file url success, url: %s", url)
116126

117127
content, err := m.GetObject(ctx, objectName)
118128
if err != nil {
119-
log.Fatalf("download file failed: %v", err)
129+
logs.CtxErrorf(ctx, "download file failed: %v", err)
120130
}
121131

122-
log.Printf("download file success, content: %s", string(content))
132+
logs.CtxInfof(ctx, "download file success, content: %s", string(content))
123133

124134
err = m.DeleteObject(ctx, objectName)
125135
if err != nil {
126-
log.Fatalf("delete object failed: %v", err)
136+
logs.CtxErrorf(ctx, "delete object failed: %v", err)
127137
}
128138

129-
log.Printf("delete object success")
139+
logs.CtxInfof(ctx, "delete object success")
130140
}
131141

132142
func (m *minioClient) PutObject(ctx context.Context, objectKey string, content []byte, opts ...storage.PutOptFn) error {
@@ -161,6 +171,10 @@ func (m *minioClient) PutObjectWithReader(ctx context.Context, objectKey string,
161171
minioOpts.Expires = *option.Expires
162172
}
163173

174+
if option.Tagging != nil {
175+
minioOpts.UserTags = option.Tagging
176+
}
177+
164178
_, err := m.client.PutObject(ctx, m.bucketName, objectKey,
165179
content, option.ObjectSize, minioOpts)
166180
if err != nil {
@@ -223,7 +237,7 @@ func (m *minioClient) ListObjectsPaginated(ctx context.Context, input *storage.L
223237
return nil, fmt.Errorf("page size must be positive")
224238
}
225239

226-
files, err := m.ListObjects(ctx, input.Prefix)
240+
files, err := m.ListAllObjects(ctx, input.Prefix, input.WithTagging)
227241
if err != nil {
228242
return nil, err
229243
}
@@ -235,10 +249,11 @@ func (m *minioClient) ListObjectsPaginated(ctx context.Context, input *storage.L
235249
}, nil
236250
}
237251

238-
func (m *minioClient) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) {
252+
func (m *minioClient) ListAllObjects(ctx context.Context, prefix string, withTagging bool) ([]*storage.FileInfo, error) {
239253
opts := minio.ListObjectsOptions{
240-
Prefix: prefix,
241-
Recursive: true,
254+
Prefix: prefix,
255+
Recursive: true,
256+
WithMetadata: withTagging,
242257
}
243258

244259
objectCh := m.client.ListObjects(ctx, m.bucketName, opts)
@@ -248,14 +263,17 @@ func (m *minioClient) ListObjects(ctx context.Context, prefix string) ([]*storag
248263
if object.Err != nil {
249264
return nil, object.Err
250265
}
266+
251267
files = append(files, &storage.FileInfo{
252268
Key: object.Key,
253269
LastModified: object.LastModified,
254270
ETag: object.ETag,
255271
Size: object.Size,
272+
Tagging: object.UserTags,
256273
})
257274

258-
logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d", object.Key, object.LastModified, object.ETag, object.Size)
275+
logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d, tagging = %v",
276+
object.Key, object.LastModified, object.ETag, object.Size, object.UserTags)
259277
}
260278

261279
return files, nil

backend/infra/impl/storage/s3/s3.go

Lines changed: 74 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,19 @@ import (
2121
"context"
2222
"fmt"
2323
"io"
24+
"net/url"
2425
"time"
2526

2627
"github.com/aws/aws-sdk-go-v2/aws"
2728
"github.com/aws/aws-sdk-go-v2/config"
2829
"github.com/aws/aws-sdk-go-v2/credentials"
2930
"github.com/aws/aws-sdk-go-v2/service/s3"
31+
"github.com/aws/aws-sdk-go-v2/service/s3/types"
3032

3133
"github.com/coze-dev/coze-studio/backend/infra/contract/storage"
3234
"github.com/coze-dev/coze-studio/backend/infra/impl/storage/proxy"
3335
"github.com/coze-dev/coze-studio/backend/pkg/logs"
36+
"github.com/coze-dev/coze-studio/backend/pkg/taskgroup"
3437
)
3538

3639
type s3Client struct {
@@ -178,6 +181,11 @@ func (t *s3Client) PutObjectWithReader(ctx context.Context, objectKey string, co
178181
input.ContentLength = aws.Int64(option.ObjectSize)
179182
}
180183

184+
if option.Tagging != nil {
185+
tagging := mapToQueryParams(option.Tagging)
186+
input.Tagging = aws.String(tagging)
187+
}
188+
181189
// upload object
182190
_, err := client.PutObject(ctx, input)
183191
return err
@@ -239,49 +247,36 @@ func (t *s3Client) GetObjectUrl(ctx context.Context, objectKey string, opts ...s
239247
return req.URL, nil
240248
}
241249

242-
func (t *s3Client) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) {
243-
client := t.client
244-
bucket := t.bucketName
250+
func (t *s3Client) ListAllObjects(ctx context.Context, prefix string, withTagging bool) ([]*storage.FileInfo, error) {
245251
const (
246252
DefaultPageSize = 100
247253
MaxListObjects = 10000
248254
)
249255

250-
input := &s3.ListObjectsV2Input{
251-
Bucket: aws.String(bucket),
252-
Prefix: aws.String(prefix),
253-
MaxKeys: aws.Int32(DefaultPageSize),
254-
}
255-
256-
paginator := s3.NewListObjectsV2Paginator(client, input)
257-
258256
var files []*storage.FileInfo
259-
for paginator.HasMorePages() {
260-
page, err := paginator.NextPage(ctx)
257+
var cursor string
258+
for {
259+
output, err := t.ListObjectsPaginated(ctx, &storage.ListObjectsPaginatedInput{
260+
Prefix: prefix,
261+
PageSize: DefaultPageSize,
262+
WithTagging: withTagging,
263+
Cursor: cursor,
264+
})
265+
261266
if err != nil {
262-
return nil, fmt.Errorf("failed to get page, %v", err)
267+
return nil, err
263268
}
264-
for _, obj := range page.Contents {
265-
f := &storage.FileInfo{}
266-
if obj.Key != nil {
267-
f.Key = *obj.Key
268-
}
269-
if obj.LastModified != nil {
270-
f.LastModified = *obj.LastModified
271-
}
272-
if obj.ETag != nil {
273-
f.ETag = *obj.ETag
274-
}
275-
if obj.Size != nil {
276-
f.Size = *obj.Size
277-
}
278-
279-
files = append(files, f)
280269

281-
}
270+
cursor = output.Cursor
271+
272+
files = append(files, output.Files...)
282273

283274
if len(files) >= MaxListObjects {
284-
logs.CtxErrorf(ctx, "[ListObjects] max list objects reached, total: %d", len(files))
275+
logs.CtxErrorf(ctx, "list objects failed, max list objects: %d", MaxListObjects)
276+
break
277+
}
278+
279+
if !output.IsTruncated {
285280
break
286281
}
287282
}
@@ -340,5 +335,52 @@ func (t *s3Client) ListObjectsPaginated(ctx context.Context, input *storage.List
340335
output.Cursor = *p.NextContinuationToken
341336
}
342337

338+
if input.WithTagging {
339+
taskGroup := taskgroup.NewTaskGroup(ctx, 5)
340+
for idx := range files {
341+
f := files[idx]
342+
taskGroup.Go(func() error {
343+
tagging, err := client.GetObjectTagging(ctx, &s3.GetObjectTaggingInput{
344+
Bucket: aws.String(bucket),
345+
Key: aws.String(f.Key),
346+
})
347+
if err != nil {
348+
return err
349+
}
350+
351+
f.Tagging = tagsToMap(tagging.TagSet)
352+
return nil
353+
})
354+
}
355+
356+
if err := taskGroup.Wait(); err != nil {
357+
return nil, err
358+
}
359+
}
360+
343361
return output, nil
344362
}
363+
364+
// mapToQueryParams serializes a tag map as a URL-encoded query string of the
// form "k1=v1&k2=v2", using url.Values.Encode (which sorts by key). It returns
// the empty string for a nil or empty map.
func mapToQueryParams(tagging map[string]string) string {
	if len(tagging) == 0 {
		return ""
	}

	query := make(url.Values, len(tagging))
	for key, value := range tagging {
		// One value per key, matching what url.Values.Set would store.
		query[key] = []string{value}
	}
	return query.Encode()
}
374+
375+
func tagsToMap(tags []types.Tag) map[string]string {
376+
if len(tags) == 0 {
377+
return nil
378+
}
379+
m := make(map[string]string, len(tags))
380+
for _, tag := range tags {
381+
if tag.Key != nil && tag.Value != nil {
382+
m[*tag.Key] = *tag.Value
383+
}
384+
}
385+
return m
386+
}

0 commit comments

Comments
 (0)