Skip to content

Commit 5acf276

Browse files
committed
feat(infra): add object tagging support for PutObject and ListObjects
1 parent 263a75b commit 5acf276

File tree

6 files changed

+169
-67
lines changed

6 files changed

+169
-67
lines changed

backend/infra/contract/storage/option.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,23 @@ type PutOption struct {
3838
ContentDisposition *string
3939
ContentLanguage *string
4040
Expires *time.Time
41+
Tagging map[string]string
4142
ObjectSize int64
4243
}
4344

4445
type PutOptFn func(option *PutOption)
4546

47+
func WithTagging(tag map[string]string) PutOptFn {
48+
return func(o *PutOption) {
49+
if len(tag) > 0 {
50+
o.Tagging = make(map[string]string, len(tag))
51+
for k, v := range tag {
52+
o.Tagging[k] = v
53+
}
54+
}
55+
}
56+
}
57+
4658
func WithContentType(v string) PutOptFn {
4759
return func(o *PutOption) {
4860
o.ContentType = &v

backend/infra/contract/storage/storage.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,20 @@ import (
2424

2525
//go:generate mockgen -destination ../../../internal/mock/infra/contract/storage/storage_mock.go -package mock -source storage.go Factory
2626
type Storage interface {
27+
// PutObject puts the object with the specified key.
2728
PutObject(ctx context.Context, objectKey string, content []byte, opts ...PutOptFn) error
29+
// PutObjectWithReader puts the object with the specified key.
2830
PutObjectWithReader(ctx context.Context, objectKey string, content io.Reader, opts ...PutOptFn) error
31+
// GetObject returns the object with the specified key.
2932
GetObject(ctx context.Context, objectKey string) ([]byte, error)
33+
// DeleteObject deletes the object with the specified key.
3034
DeleteObject(ctx context.Context, objectKey string) error
35+
// GetObjectUrl returns a presigned URL for the object.
36+
// The URL is valid for the specified duration.
3137
GetObjectUrl(ctx context.Context, objectKey string, opts ...GetOptFn) (string, error)
32-
// ListObjects returns all objects with the specified prefix.
38+
// ListAllObjects returns all objects with the specified prefix.
3339
// It may return a large number of objects, consider using ListObjectsPaginated for better performance.
34-
ListObjects(ctx context.Context, prefix string) ([]*FileInfo, error)
35-
40+
ListAllObjects(ctx context.Context, prefix string, withTagging bool) ([]*FileInfo, error)
3641
// ListObjectsPaginated returns objects with pagination support.
3742
// Use this method when dealing with large number of objects.
3843
ListObjectsPaginated(ctx context.Context, input *ListObjectsPaginatedInput) (*ListObjectsPaginatedOutput, error)
@@ -50,6 +55,8 @@ type ListObjectsPaginatedInput struct {
5055
Prefix string
5156
PageSize int
5257
Cursor string
58+
// Include objects tagging in the listing
59+
WithTagging bool
5360
}
5461

5562
type ListObjectsPaginatedOutput struct {
@@ -64,4 +71,5 @@ type FileInfo struct {
6471
LastModified time.Time
6572
ETag string
6673
Size int64
74+
Tagging map[string]string
6775
}

backend/infra/impl/storage/minio/minio.go

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"context"
2222
"fmt"
2323
"io"
24-
"log"
2524
"math/rand"
2625
"net/url"
2726
"time"
@@ -99,34 +98,45 @@ func (m *minioClient) test() {
9998
ctx := context.Background()
10099
objectName := fmt.Sprintf("test-file-%d.txt", rand.Int())
101100

102-
m.ListObjects(ctx, "")
101+
err := m.PutObject(ctx, objectName, []byte("hello content"),
102+
storage.WithContentType("text/plain"), storage.WithTagging(map[string]string{
103+
"uid": "7543149965070155780",
104+
"conversation_id": "7543149965070155781",
105+
"type": "user",
106+
}))
107+
if err != nil {
108+
logs.CtxErrorf(ctx, "upload file failed: %v", err)
109+
}
103110

104-
err := m.PutObject(ctx, objectName, []byte("hello content"), storage.WithContentType("text/plain"))
111+
logs.CtxInfof(ctx, "upload file success")
112+
113+
files, err := m.ListAllObjects(ctx, "test-file-", true)
105114
if err != nil {
106-
log.Fatalf("upload file failed: %v", err)
115+
logs.CtxErrorf(ctx, "list objects failed: %v", err)
107116
}
108-
log.Printf("upload file success")
117+
118+
logs.CtxInfof(ctx, "list objects success, files.len: %v", len(files))
109119

110120
url, err := m.GetObjectUrl(ctx, objectName)
111121
if err != nil {
112-
log.Fatalf("get file url failed: %v", err)
122+
logs.CtxErrorf(ctx, "get file url failed: %v", err)
113123
}
114124

115-
log.Printf("get file url success, url: %s", url)
125+
logs.CtxInfof(ctx, "get file url success, url: %s", url)
116126

117127
content, err := m.GetObject(ctx, objectName)
118128
if err != nil {
119-
log.Fatalf("download file failed: %v", err)
129+
logs.CtxErrorf(ctx, "download file failed: %v", err)
120130
}
121131

122-
log.Printf("download file success, content: %s", string(content))
132+
logs.CtxInfof(ctx, "download file success, content: %s", string(content))
123133

124134
err = m.DeleteObject(ctx, objectName)
125135
if err != nil {
126-
log.Fatalf("delete object failed: %v", err)
136+
logs.CtxErrorf(ctx, "delete object failed: %v", err)
127137
}
128138

129-
log.Printf("delete object success")
139+
logs.CtxInfof(ctx, "delete object success")
130140
}
131141

132142
func (m *minioClient) PutObject(ctx context.Context, objectKey string, content []byte, opts ...storage.PutOptFn) error {
@@ -161,6 +171,10 @@ func (m *minioClient) PutObjectWithReader(ctx context.Context, objectKey string,
161171
minioOpts.Expires = *option.Expires
162172
}
163173

174+
if option.Tagging != nil {
175+
minioOpts.UserTags = option.Tagging
176+
}
177+
164178
_, err := m.client.PutObject(ctx, m.bucketName, objectKey,
165179
content, option.ObjectSize, minioOpts)
166180
if err != nil {
@@ -223,7 +237,7 @@ func (m *minioClient) ListObjectsPaginated(ctx context.Context, input *storage.L
223237
return nil, fmt.Errorf("page size must be positive")
224238
}
225239

226-
files, err := m.ListObjects(ctx, input.Prefix)
240+
files, err := m.ListAllObjects(ctx, input.Prefix, input.WithTagging)
227241
if err != nil {
228242
return nil, err
229243
}
@@ -235,10 +249,11 @@ func (m *minioClient) ListObjectsPaginated(ctx context.Context, input *storage.L
235249
}, nil
236250
}
237251

238-
func (m *minioClient) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) {
252+
func (m *minioClient) ListAllObjects(ctx context.Context, prefix string, withTagging bool) ([]*storage.FileInfo, error) {
239253
opts := minio.ListObjectsOptions{
240-
Prefix: prefix,
241-
Recursive: true,
254+
Prefix: prefix,
255+
Recursive: true,
256+
WithMetadata: withTagging,
242257
}
243258

244259
objectCh := m.client.ListObjects(ctx, m.bucketName, opts)
@@ -248,14 +263,17 @@ func (m *minioClient) ListObjects(ctx context.Context, prefix string) ([]*storag
248263
if object.Err != nil {
249264
return nil, object.Err
250265
}
266+
251267
files = append(files, &storage.FileInfo{
252268
Key: object.Key,
253269
LastModified: object.LastModified,
254270
ETag: object.ETag,
255271
Size: object.Size,
272+
Tagging: object.UserTags,
256273
})
257274

258-
logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d", object.Key, object.LastModified, object.ETag, object.Size)
275+
logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d, tagging = %v",
276+
object.Key, object.LastModified, object.ETag, object.Size, object.UserTags)
259277
}
260278

261279
return files, nil

backend/infra/impl/storage/s3/s3.go

Lines changed: 72 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,20 @@ import (
2121
"context"
2222
"fmt"
2323
"io"
24+
"net/url"
25+
"strings"
2426
"time"
2527

2628
"github.com/aws/aws-sdk-go-v2/aws"
2729
"github.com/aws/aws-sdk-go-v2/config"
2830
"github.com/aws/aws-sdk-go-v2/credentials"
2931
"github.com/aws/aws-sdk-go-v2/service/s3"
32+
"github.com/aws/aws-sdk-go-v2/service/s3/types"
3033

3134
"github.com/coze-dev/coze-studio/backend/infra/contract/storage"
3235
"github.com/coze-dev/coze-studio/backend/infra/impl/storage/proxy"
3336
"github.com/coze-dev/coze-studio/backend/pkg/logs"
37+
"github.com/coze-dev/coze-studio/backend/pkg/taskgroup"
3438
)
3539

3640
type s3Client struct {
@@ -178,6 +182,11 @@ func (t *s3Client) PutObjectWithReader(ctx context.Context, objectKey string, co
178182
input.ContentLength = aws.Int64(option.ObjectSize)
179183
}
180184

185+
if option.Tagging != nil {
186+
tagging := mapToQueryParams(option.Tagging)
187+
input.Tagging = aws.String(tagging)
188+
}
189+
181190
// upload object
182191
_, err := client.PutObject(ctx, input)
183192
return err
@@ -239,49 +248,36 @@ func (t *s3Client) GetObjectUrl(ctx context.Context, objectKey string, opts ...s
239248
return req.URL, nil
240249
}
241250

242-
func (t *s3Client) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) {
243-
client := t.client
244-
bucket := t.bucketName
251+
func (t *s3Client) ListAllObjects(ctx context.Context, prefix string, withTagging bool) ([]*storage.FileInfo, error) {
245252
const (
246253
DefaultPageSize = 100
247254
MaxListObjects = 10000
248255
)
249256

250-
input := &s3.ListObjectsV2Input{
251-
Bucket: aws.String(bucket),
252-
Prefix: aws.String(prefix),
253-
MaxKeys: aws.Int32(DefaultPageSize),
254-
}
255-
256-
paginator := s3.NewListObjectsV2Paginator(client, input)
257-
258257
var files []*storage.FileInfo
259-
for paginator.HasMorePages() {
260-
page, err := paginator.NextPage(ctx)
258+
var cursor string
259+
for {
260+
output, err := t.ListObjectsPaginated(ctx, &storage.ListObjectsPaginatedInput{
261+
Prefix: prefix,
262+
PageSize: DefaultPageSize,
263+
WithTagging: withTagging,
264+
Cursor: cursor,
265+
})
266+
261267
if err != nil {
262-
return nil, fmt.Errorf("failed to get page, %v", err)
268+
return nil, err
263269
}
264-
for _, obj := range page.Contents {
265-
f := &storage.FileInfo{}
266-
if obj.Key != nil {
267-
f.Key = *obj.Key
268-
}
269-
if obj.LastModified != nil {
270-
f.LastModified = *obj.LastModified
271-
}
272-
if obj.ETag != nil {
273-
f.ETag = *obj.ETag
274-
}
275-
if obj.Size != nil {
276-
f.Size = *obj.Size
277-
}
278-
279-
files = append(files, f)
280270

281-
}
271+
cursor = output.Cursor
272+
273+
files = append(files, output.Files...)
282274

283275
if len(files) >= MaxListObjects {
284-
logs.CtxErrorf(ctx, "[ListObjects] max list objects reached, total: %d", len(files))
276+
logs.CtxErrorf(ctx, "list objects failed, max list objects: %d", MaxListObjects)
277+
break
278+
}
279+
280+
if !output.IsTruncated {
285281
break
286282
}
287283
}
@@ -340,5 +336,49 @@ func (t *s3Client) ListObjectsPaginated(ctx context.Context, input *storage.List
340336
output.Cursor = *p.NextContinuationToken
341337
}
342338

339+
if input.WithTagging {
340+
taskGroup := taskgroup.NewTaskGroup(ctx, 5)
341+
for idx := range files {
342+
f := files[idx]
343+
taskGroup.Go(func() error {
344+
tagging, err := client.GetObjectTagging(ctx, &s3.GetObjectTaggingInput{
345+
Bucket: aws.String(bucket),
346+
Key: aws.String(f.Key),
347+
})
348+
if err != nil {
349+
return err
350+
}
351+
352+
f.Tagging = tagsToMap(tagging.TagSet)
353+
return nil
354+
})
355+
}
356+
357+
if err := taskGroup.Wait(); err != nil {
358+
return nil, err
359+
}
360+
}
361+
343362
return output, nil
344363
}
364+
365+
func mapToQueryParams(tagging map[string]string) string {
366+
queryParams := ""
367+
for k, v := range tagging {
368+
queryParams += fmt.Sprintf("%s=%s&", url.QueryEscape(k), url.QueryEscape(v))
369+
}
370+
return strings.TrimSuffix(queryParams, "&")
371+
}
372+
373+
func tagsToMap(tags []types.Tag) map[string]string {
374+
if len(tags) == 0 {
375+
return nil
376+
}
377+
m := make(map[string]string, len(tags))
378+
for _, tag := range tags {
379+
if tag.Key != nil && tag.Value != nil {
380+
m[*tag.Key] = *tag.Value
381+
}
382+
}
383+
return m
384+
}

0 commit comments

Comments
 (0)