Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/modules/components/pages/outputs/aws_s3.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ output:
aws_s3:
bucket: "" # No default (required)
path: ${!counter()}-${!timestamp_unix_nano()}.txt
local_file_path: ""
tags: {}
content_type: application/octet-stream
metadata:
Expand All @@ -65,6 +66,7 @@ output:
aws_s3:
bucket: "" # No default (required)
path: ${!counter()}-${!timestamp_unix_nano()}.txt
local_file_path: ""
tags: {}
content_type: application/octet-stream
content_encoding: ""
Expand Down Expand Up @@ -204,6 +206,22 @@ path: ${!meta("kafka_key")}.json
path: ${!json("doc.namespace")}/${!json("doc.id")}.json
```

=== `local_file_path`

The path of the local file to upload.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].


*Type*: `string`

*Default*: `""`

```yml
# Examples

local_file_path: /tmp/file.json
```

=== `tags`

Key/value pairs to store with the object as tags.
Expand Down
38 changes: 36 additions & 2 deletions internal/impl/aws/output_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"os"
"slices"
"sort"
"strings"
Expand All @@ -40,6 +42,7 @@ const (
s3oFieldBucket = "bucket"
s3oFieldForcePathStyleURLs = "force_path_style_urls"
s3oFieldPath = "path"
s3oFieldLocalFilePath = "local_file_path"
s3oFieldTags = "tags"
s3oFieldChecksumAlgorithm = "checksum_algorithm"
s3oFieldContentType = "content_type"
Expand Down Expand Up @@ -67,6 +70,7 @@ type s3oConfig struct {
Bucket string

Path *service.InterpolatedString
LocalFilePath *service.InterpolatedString
Tags []s3TagPair
ContentType *service.InterpolatedString
ContentEncoding *service.InterpolatedString
Expand Down Expand Up @@ -100,6 +104,10 @@ func s3oConfigFromParsed(pConf *service.ParsedConfig) (conf s3oConfig, err error
return
}

if conf.LocalFilePath, err = pConf.FieldInterpolatedString(s3oFieldLocalFilePath); err != nil {
return
}

var tagMap map[string]*service.InterpolatedString
if tagMap, err = pConf.FieldInterpolatedStringMap(s3oFieldTags); err != nil {
return
Expand Down Expand Up @@ -245,6 +253,10 @@ output:
Example(`${!counter()}-${!timestamp_unix_nano()}.txt`).
Example(`${!meta("kafka_key")}.json`).
Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`),
service.NewInterpolatedStringField(s3oFieldLocalFilePath).
Description("The path of the local file to upload.").
Default(``).
Example(`/tmp/file.json`),
service.NewInterpolatedStringMapField(s3oFieldTags).
Description("Key/value pairs to store with the object as tags.").
Default(map[string]any{}).
Expand Down Expand Up @@ -470,15 +482,15 @@ func (a *amazonS3Writer) WriteBatch(wctx context.Context, msg service.MessageBat
return fmt.Errorf("storage class interpolation: %w", err)
}

mBytes, err := m.AsBytes()
uploadBody, err := a.getUploadBody(m)
if err != nil {
return err
}

uploadInput := &s3.PutObjectInput{
Bucket: &a.conf.Bucket,
Key: aws.String(key),
Body: bytes.NewReader(mBytes),
Body: uploadBody,
ContentType: aws.String(contentType),
ContentEncoding: contentEncoding,
CacheControl: cacheControl,
Expand Down Expand Up @@ -527,6 +539,28 @@ func (a *amazonS3Writer) WriteBatch(wctx context.Context, msg service.MessageBat
})
}

func (a *amazonS3Writer) getUploadBody(m *service.Message) (io.Reader, error) {
localFilePath, err := a.conf.LocalFilePath.TryString(m)
if err != nil {
return nil, fmt.Errorf("local file path interpolation error: %w", err)
}

if localFilePath != "" {
file, err := os.Open(localFilePath)
if err != nil {
return nil, fmt.Errorf("local file read error: %w", err)
}
return file, nil
}

mBytes, err := m.AsBytes()
if err != nil {
return nil, err
}

return bytes.NewReader(mBytes), nil
}

func (*amazonS3Writer) Close(context.Context) error {
return nil
}