2 changes: 1 addition & 1 deletion connector.yaml
@@ -62,7 +62,7 @@ specification:
time `Write` is called, a new record is added to the buffer. When the buffer is
full, all the records from it will be written to the S3 bucket, and an ack
function will be called for each record after being written.
- version: v0.9.1
+ version: v0.9.2
author: Meroxa, Inc.
source:
parameters:
16 changes: 11 additions & 5 deletions destination/writer/s3.go
@@ -81,17 +81,23 @@
}

// Write stores the batch on AWS S3 as a file
func (w *S3) Write(ctx context.Context, batch *Batch) error {

Check failure on line 84 in destination/writer/s3.go (GitHub Actions / golangci-lint): unnecessary leading newline (whitespace)

 	batchBytes, err := batch.Bytes()
 	if err != nil {
 		return err
 	}

-	key := fmt.Sprintf(
-		"%d.%s",
-		time.Now().UnixNano(),
-		batch.Format.Ext(),
-	)
+	// Get the ID from the first record's key
+	var key string
+	if len(batch.Records) > 0 {
+		// Convert the key bytes to string
+		keyBytes := batch.Records[0].Key.Bytes()
Contributor

Oof, that could go wrong. For instance, the key could be structured data, in which case you'd get a JSON string (which is not in line with the key naming guidelines). Also, the key is related to the first record, the rest of the batch might not be related to that. I'm not a fan of this change, tbh.
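
To illustrate the first concern, here is a minimal sketch of how a structured key would leak into the object name. The `Data`, `RawData`, and `StructuredData` types and the `objectKey` helper are hypothetical stand-ins written for this example, not the SDK's actual types; the sketch only assumes that a structured key serializes to JSON when its bytes are requested.

```go
package main

import "encoding/json"

// Data is a hypothetical stand-in for a record key that can be
// either raw bytes or structured data.
type Data interface {
	Bytes() []byte
}

// RawData returns its bytes unchanged.
type RawData []byte

func (d RawData) Bytes() []byte { return d }

// StructuredData is assumed to serialize to JSON when asked for bytes.
type StructuredData map[string]interface{}

func (d StructuredData) Bytes() []byte {
	b, err := json.Marshal(d)
	if err != nil {
		return nil
	}
	return b
}

// objectKey mirrors the approach in this diff: the key bytes are
// converted to a string and used directly as the file name.
//
// objectKey(RawData("abc"), "json")            yields abc.json
// objectKey(StructuredData{"id": 42}, "json")  yields {"id":42}.json
func objectKey(k Data, ext string) string {
	return string(k.Bytes()) + "." + ext
}
```

With a raw key the object name looks reasonable, but with a structured key the braces, quotes, and colon all end up in the object name.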

I'm thinking how else we could achieve what you're trying to do. We could introduce a metadata field (e.g. s3.key) and use that as the object key, if it's present. You could then use a processor to set that metadata field. The issue remains that the first record in the batch is the only record for which this field would be respected, unless we change the logic to group records based on the metadata field and create multiple objects.
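
A rough sketch of the grouping idea, under stated assumptions: the `Record` type here is a simplified, hypothetical stand-in for the connector's record type, `s3.key` is only the field name suggested above, and `groupByObjectKey` and `timestampKey` are illustrative helpers that do not exist in the connector.

```go
package main

import (
	"fmt"
	"time"
)

// Record is a hypothetical, simplified record; only the metadata
// map matters for this sketch.
type Record struct {
	Metadata map[string]string
	Payload  []byte
}

// MetadataS3Key is the metadata field name suggested in this comment.
// It is an assumption, not an existing connector constant.
const MetadataS3Key = "s3.key"

// groupByObjectKey partitions a batch by the s3.key metadata field.
// Records without the field (or with an empty value) are grouped
// under fallback. keys preserves the order in which each object key
// was first seen, so objects can be written deterministically.
func groupByObjectKey(records []Record, fallback string) (keys []string, groups map[string][]Record) {
	groups = make(map[string][]Record)
	for _, r := range records {
		key := r.Metadata[MetadataS3Key] // reading a nil map yields ""
		if key == "" {
			key = fallback
		}
		if _, seen := groups[key]; !seen {
			keys = append(keys, key)
		}
		groups[key] = append(groups[key], r)
	}
	return keys, groups
}

// timestampKey reproduces the existing timestamp-based naming and
// can serve as the fallback object key.
func timestampKey(ext string) string {
	return fmt.Sprintf("%d.%s", time.Now().UnixNano(), ext)
}
```

The writer would then serialize and upload one object per entry in `keys`, so every record's `s3.key` is respected rather than only the first record's.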

Author

@lovromazgon this makes sense. I actually ran into that exact issue of the key being structured data when the source was a Postgres database (I was previously testing with a Generator source). I can try the metadata approach to get by for the demo.

+		key = fmt.Sprintf("%s.%s", string(keyBytes), batch.Format.Ext())
+	} else {
+		// Fallback to timestamp if no records
+		key = fmt.Sprintf("%d.%s", time.Now().UnixNano(), batch.Format.Ext())
+	}

 	if w.KeyPrefix != "" {
 		key = path.Join(w.KeyPrefix, key)