diff --git a/docs/modules/components/pages/outputs/aws_s3.adoc b/docs/modules/components/pages/outputs/aws_s3.adoc index f3a85a5cde..4600528085 100644 --- a/docs/modules/components/pages/outputs/aws_s3.adoc +++ b/docs/modules/components/pages/outputs/aws_s3.adoc @@ -41,6 +41,7 @@ output: aws_s3: bucket: "" # No default (required) path: ${!counter()}-${!timestamp_unix_nano()}.txt + local_file_path: "" tags: {} content_type: application/octet-stream metadata: @@ -65,6 +66,7 @@ output: aws_s3: bucket: "" # No default (required) path: ${!counter()}-${!timestamp_unix_nano()}.txt + local_file_path: "" tags: {} content_type: application/octet-stream content_encoding: "" @@ -204,6 +206,22 @@ path: ${!meta("kafka_key")}.json path: ${!json("doc.namespace")}/${!json("doc.id")}.json ``` +=== `local_file_path` + +The path of the local file to upload. +This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. + + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +local_file_path: /tmp/file.json +``` + === `tags` Key/value pairs to store with the object as tags. diff --git a/internal/impl/aws/output_s3.go b/internal/impl/aws/output_s3.go index 5666a44855..c4544631f6 100644 --- a/internal/impl/aws/output_s3.go +++ b/internal/impl/aws/output_s3.go @@ -18,7 +18,9 @@ import ( "bytes" "context" "fmt" + "io" "net/url" + "os" "slices" "sort" "strings" @@ -40,6 +42,7 @@ const ( s3oFieldBucket = "bucket" s3oFieldForcePathStyleURLs = "force_path_style_urls" s3oFieldPath = "path" + s3oFieldLocalFilePath = "local_file_path" s3oFieldTags = "tags" s3oFieldChecksumAlgorithm = "checksum_algorithm" s3oFieldContentType = "content_type" @@ -67,6 +70,7 @@ type s3oConfig struct { Bucket string Path *service.InterpolatedString + LocalFilePath *service.InterpolatedString Tags []s3TagPair ContentType *service.InterpolatedString ContentEncoding *service.InterpolatedString @@ -100,6 +104,10 @@ func s3oConfigFromParsed(pConf *service.ParsedConfig) (conf s3oConfig, err error return } + if conf.LocalFilePath, err = pConf.FieldInterpolatedString(s3oFieldLocalFilePath); err != nil { + return + } + var tagMap map[string]*service.InterpolatedString if tagMap, err = pConf.FieldInterpolatedStringMap(s3oFieldTags); err != nil { return @@ -245,6 +253,10 @@ output: Example(`${!counter()}-${!timestamp_unix_nano()}.txt`). Example(`${!meta("kafka_key")}.json`). Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`), + service.NewInterpolatedStringField(s3oFieldLocalFilePath). + Description("The path of the local file to upload."). + Default(``). + Example(`/tmp/file.json`), service.NewInterpolatedStringMapField(s3oFieldTags). Description("Key/value pairs to store with the object as tags."). Default(map[string]any{}). @@ -470,7 +482,7 @@ func (a *amazonS3Writer) WriteBatch(wctx context.Context, msg service.MessageBat return fmt.Errorf("storage class interpolation: %w", err) } - mBytes, err := m.AsBytes() + uploadBody, err := a.getUploadBody(m) if err != nil { return err } @@ -478,7 +490,7 @@ func (a *amazonS3Writer) WriteBatch(wctx context.Context, msg service.MessageBat uploadInput := &s3.PutObjectInput{ Bucket: &a.conf.Bucket, Key: aws.String(key), - Body: bytes.NewReader(mBytes), + Body: uploadBody, ContentType: aws.String(contentType), ContentEncoding: contentEncoding, CacheControl: cacheControl, @@ -527,6 +539,28 @@ func (a *amazonS3Writer) WriteBatch(wctx context.Context, msg service.MessageBat }) } +func (a *amazonS3Writer) getUploadBody(m *service.Message) (io.Reader, error) { + localFilePath, err := a.conf.LocalFilePath.TryString(m) + if err != nil { + return nil, fmt.Errorf("local file path interpolation error: %w", err) + } + + if localFilePath != "" { + file, err := os.Open(localFilePath) + if err != nil { + return nil, fmt.Errorf("local file read error: %w", err) + } + return file, nil + } + + mBytes, err := m.AsBytes() + if err != nil { + return nil, err + } + + return bytes.NewReader(mBytes), nil +} + func (*amazonS3Writer) Close(context.Context) error { return nil }