go.mod: 6 changes (4 additions & 2 deletions)
@@ -127,7 +127,7 @@ require (
github.com/opensearch-project/opensearch-go/v3 v3.1.0
github.com/ory/dockertest/v3 v3.12.0
github.com/oschwald/geoip2-golang v1.13.0
github.com/parquet-go/parquet-go v0.25.1
github.com/parquet-go/parquet-go v0.27.0
github.com/pebbe/zmq4 v1.4.0
github.com/pinecone-io/go-pinecone v1.1.1
github.com/pkg/sftp v1.13.9
@@ -264,6 +264,8 @@ require (
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/gomega v1.37.0 // indirect
github.com/parquet-go/bitpack v1.0.0 // indirect
github.com/parquet-go/jsonlite v1.0.0 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
@@ -282,6 +284,7 @@
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/timandy/routine v1.1.5 // indirect
github.com/twpayne/go-geom v1.6.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
@@ -490,7 +493,6 @@ require (
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/segmentio/asm v1.2.1 // indirect
github.com/segmentio/encoding v0.5.3
github.com/segmentio/ksuid v1.0.4 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
go.sum: 18 changes (14 additions & 4 deletions)
@@ -770,6 +770,8 @@ github.com/ClickHouse/ch-go v0.68.0 h1:zd2VD8l2aVYnXFRyhTyKCrxvhSz1AaY4wBUXu/f0G
github.com/ClickHouse/ch-go v0.68.0/go.mod h1:C89Fsm7oyck9hr6rRo5gqqiVtaIY6AjdD0WFMyNRQ5s=
github.com/ClickHouse/clickhouse-go/v2 v2.40.3 h1:46jB4kKwVDUOnECpStKMVXxvR0Cg9zeV9vdbPjtn6po=
github.com/ClickHouse/clickhouse-go/v2 v2.40.3/go.mod h1:qO0HwvjCnTB4BPL/k6EE3l4d9f/uF+aoimAhJX70eKA=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE=
@@ -834,6 +836,10 @@ github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm
github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM=
github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY=
github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -1915,8 +1921,12 @@ github.com/oschwald/geoip2-golang v1.13.0 h1:Q44/Ldc703pasJeP5V9+aFSZFmBN7DKHbNs
github.com/oschwald/geoip2-golang v1.13.0/go.mod h1:P9zG+54KPEFOliZ29i7SeYZ/GM6tfEL+rgSn03hYuUo=
github.com/oschwald/maxminddb-golang v1.13.1 h1:G3wwjdN9JmIK2o/ermkHM+98oX5fS+k5MbwsmL4MRQE=
github.com/oschwald/maxminddb-golang v1.13.1/go.mod h1:K4pgV9N/GcK694KSTmVSDTODk4IsCNThNdTmnaBZ/F8=
github.com/parquet-go/parquet-go v0.25.1 h1:l7jJwNM0xrk0cnIIptWMtnSnuxRkwq53S+Po3KG8Xgo=
github.com/parquet-go/parquet-go v0.25.1/go.mod h1:AXBuotO1XiBtcqJb/FKFyjBG4aqa3aQAAWF3ZPzCanY=
github.com/parquet-go/bitpack v1.0.0 h1:AUqzlKzPPXf2bCdjfj4sTeacrUwsT7NlcYDMUQxPcQA=
github.com/parquet-go/bitpack v1.0.0/go.mod h1:XnVk9TH+O40eOOmvpAVZ7K2ocQFrQwysLMnc6M/8lgs=
github.com/parquet-go/jsonlite v1.0.0 h1:87QNdi56wOfsE5bdgas0vRzHPxfJgzrXGml1zZdd7VU=
github.com/parquet-go/jsonlite v1.0.0/go.mod h1:nDjpkpL4EOtqs6NQugUsi0Rleq9sW/OtC1NnZEnxzF0=
github.com/parquet-go/parquet-go v0.27.0 h1:vHWK2xaHbj+v1DYps03yDRpEsdtOeKbhiXUaixoPb3g=
github.com/parquet-go/parquet-go v0.27.0/go.mod h1:navtkAYr2LGoJVp141oXPlO/sxLvaOe3la2JEoD8+rg=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/paulmach/orb v0.12.0 h1:z+zOwjmG3MyEEqzv92UN49Lg1JFYx0L9GpGKNVDKk1s=
github.com/paulmach/orb v0.12.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
@@ -2071,8 +2081,6 @@ github.com/sashabaranov/go-openai v1.41.2/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adO
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0=
github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/segmentio/encoding v0.5.3 h1:OjMgICtcSFuNvQCdwqMCv9Tg7lEOXGwm1J5RPQccx6w=
github.com/segmentio/encoding v0.5.3/go.mod h1:HS1ZKa3kSN32ZHVZ7ZLPLXWvOVIiZtyJnO1gPH1sKt0=
github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c=
github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE=
github.com/sergi/go-diff v1.4.0 h1:n/SP9D5ad1fORl+llWyN+D6qoUETXNZARKjyY2/KVCw=
@@ -2196,6 +2204,8 @@ github.com/twmb/franz-go/pkg/sr v1.6.0 h1:YcnD65hmdEuJljSM4O9Hldr/0oi+vrjPGHaRUu
github.com/twmb/franz-go/pkg/sr v1.6.0/go.mod h1:64CsHlsQnyFRq1sYPcCmlRrEG3PlLPb6cDddx2wGr28=
github.com/twmb/go-cache v1.2.1 h1:yUkLutow4S2x5NMbqFW24o14OsucoFI5Fzmlb6uBinM=
github.com/twmb/go-cache v1.2.1/go.mod h1:lArg9KhCl+GTFMikitLGhIBh/i11OK0lhSveqlMbbrY=
github.com/twpayne/go-geom v1.6.1 h1:iLE+Opv0Ihm/ABIcvQFGIiFBXd76oBIar9drAwHFhR4=
github.com/twpayne/go-geom v1.6.1/go.mod h1:Kr+Nly6BswFsKM5sd31YaoWS5PeDDH2NftJTK7Gd028=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/uptrace/bun v1.1.12 h1:sOjDVHxNTuM6dNGaba0wUuz7KvDE1BmNu9Gqs2gJSXQ=
internal/impl/parquet/processor_encode_test.go: 4 changes (2 additions & 2 deletions)
@@ -44,10 +44,10 @@ schema:

tctx := t.Context()
_, err = encodeProc.ProcessBatch(tctx, service.MessageBatch{
service.NewMessage([]byte(`{"id":12,"name":"foo"}`)),
service.NewMessage([]byte(`{"id":"bar","name":"foo"}`)),
})
require.Error(t, err)
assert.Contains(t, err.Error(), "cannot create parquet value of type FLOAT from go value of type int64")
assert.Contains(t, err.Error(), "encoding panic")
}

func TestParquetEncodeDecodeRoundTrip(t *testing.T) {
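The updated assertion suggests the encode processor now guards the parquet-go write path with a recover, the same pattern the old snowflake WriteFile used before its removal further down in this PR. A minimal sketch of such a guard, with the package and helper name assumed rather than taken from the processor code:

package parquetenc

import (
    "fmt"

    "github.com/parquet-go/parquet-go"
)

// encodeRows is a hypothetical helper: parquet-go panics when a value cannot
// be coerced to the column type (for example a string destined for a FLOAT
// column, as in the test above), so the write is wrapped in recover and the
// panic surfaces as an error containing "encoding panic".
func encodeRows(w *parquet.GenericWriter[any], rows []any) (err error) {
    defer func() {
        if r := recover(); r != nil {
            err = fmt.Errorf("encoding panic: %v", r)
        }
    }()
    if _, err = w.Write(rows); err != nil {
        return err
    }
    return w.Close()
}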
internal/impl/snowflake/streaming/integration_test.go: 2 changes (1 addition & 1 deletion)
@@ -67,7 +67,7 @@ func setup(t *testing.T) (*streaming.SnowflakeRestClient, *streaming.SnowflakeSe
clientOptions := streaming.ClientOptions{
Account: envOr("SNOWFLAKE_ACCOUNT", "wqkfxqq-redpanda_aws"),
URL: fmt.Sprintf("https://%s.snowflakecomputing.com", envOr("SNOWFLAKE_ACCOUNT", "wqkfxqq-redpanda_aws")),
User: envOr("SNOWFLAKE_USER", "TYLERTYLER_DB"),
User: envOr("SNOWFLAKE_USER", "TYLERROCKWOOD"),
Role: "ACCOUNTADMIN",
PrivateKey: parseResult.(*rsa.PrivateKey),
ConnectVersion: "",
internal/impl/snowflake/streaming/parquet.go: 141 changes (58 additions & 83 deletions)
@@ -12,13 +12,11 @@ package streaming

import (
"bytes"
"encoding/binary"
"errors"
"fmt"

"github.com/parquet-go/parquet-go"
"github.com/parquet-go/parquet-go/format"
"github.com/segmentio/encoding/thrift"

"github.com/redpanda-data/benthos/v4/public/service"
)
@@ -66,41 +64,40 @@ func objectMessageToRow(msg *service.Message, out []any, nameToPosition map[stri
return nil
}

func constructRowGroupFromObject(
// writeRowGroupFromObject converts a batch of object messages, writing each value straight to the
// concurrent row group's column writers as it is converted, then flushes (compresses) the row group.
func writeRowGroupFromObject(
batch service.MessageBatch,
schema *parquet.Schema,
transformers []*dataTransformer,
mode SchemaMode,
) ([]parquet.Row, []*statsBuffer, error) {
// We write all of our data in a columnar fashion, but need to pivot that data so that we can feed it into
// out parquet library (which sadly will redo the pivot - maybe we need a lower level abstraction...).
// So create a massive matrix that we will write stuff in columnar form, but then we don't need to move any
// data to create rows of the data via an in-place transpose operation.
//
// TODO: Consider caching/pooling this matrix as I expect many are similarily sized.
rg *parquet.ConcurrentRowGroupWriter,
) ([]*statsBuffer, error) {
rowWidth := len(schema.Fields())
matrix := make([]parquet.Value, len(batch)*rowWidth)
nameToPosition := make(map[string]int, rowWidth)
stats := make([]*statsBuffer, rowWidth)
buffers := make([]typedBuffer, rowWidth)
columnWriters := rg.ColumnWriters()

for idx, t := range transformers {
leaf, ok := schema.Lookup(t.name)
if !ok {
return nil, nil, fmt.Errorf("invariant failed: unable to find column %q", t.name)
return nil, fmt.Errorf("invariant failed: unable to find column %q", t.name)
}
buffers[idx] = t.bufferFactory()
buffers[idx].Prepare(matrix, leaf.ColumnIndex, rowWidth)
buffers[idx].Reset(columnWriters[leaf.ColumnIndex], leaf.ColumnIndex)
stats[idx] = &statsBuffer{}
nameToPosition[t.name] = idx
}
// First we need to shred our record into columns, snowflake's data model
// is thankfully a flat list of columns, so no dremel style record shredding
// is needed

// Shred records into columns: Snowflake's data model is a flat list of columns,
// so no Dremel-style record shredding is needed.
row := make([]any, rowWidth)
for _, msg := range batch {
err := objectMessageToRow(msg, row, nameToPosition, mode)
if err != nil {
return nil, nil, err
return nil, err
}
for i, v := range row {
t := transformers[i]
@@ -109,24 +106,21 @@ func constructRowGroupFromObject(
err = t.converter.ValidateAndConvert(s, v, b)
if err != nil {
if errors.Is(err, errNullValue) {
return nil, nil, &NonNullColumnError{msg, t.column.Name}
return nil, &NonNullColumnError{msg, t.column.Name}
}
// There is no special typed error for a validation error; there really isn't
// anything we can do about it.
return nil, nil, fmt.Errorf("invalid data for column %s: %w", t.name, err)
return nil, fmt.Errorf("invalid data for column %s: %w", t.name, err)
}
// reset the column to nil for the next row
row[i] = nil
}
}
// Now all our values have been written to each buffer - here is where we do our matrix
// transpose mentioned above
rows := make([]parquet.Row, len(batch))
for i := range rows {
rowStart := i * rowWidth
rows[i] = matrix[rowStart : rowStart+rowWidth]

// Flush compresses the row group data
if err := rg.Flush(); err != nil {
return nil, fmt.Errorf("failed to flush row group: %w", err)
}
return rows, stats, nil

return stats, nil
}
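To make the new shape concrete, here is a compressed sketch of the direct-to-column path both this function and the array variant below now follow. It is built from the v0.27 API used in this diff (BeginRowGroup, ColumnWriters, Flush) plus the assumption that the per-column writers satisfy parquet.ValueWriter; writeBatch and its pre-converted input are illustrative:

package parquetsketch

import "github.com/parquet-go/parquet-go"

// writeBatch writes already-converted values straight to the column writers
// of one concurrent row group, then flushes it. No row matrix or in-place
// transpose is needed anymore.
func writeBatch(w *parquet.GenericWriter[any], rows [][]parquet.Value) error {
    rg := w.BeginRowGroup()    // *parquet.ConcurrentRowGroupWriter
    cols := rg.ColumnWriters() // one writer per leaf column, in schema order
    for _, row := range rows {
        for i, v := range row {
            // Each value lands in its column writer as soon as it is converted.
            if _, err := cols[i].WriteValues([]parquet.Value{v}); err != nil {
                return err
            }
        }
    }
    // Flush compresses the buffered column data and seals the row group.
    return rg.Flush()
}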

// arrayMessageToRow converts a message into columnar form using the provided name to index mapping.
@@ -161,31 +155,35 @@ func arrayMessageToRow(msg *service.Message, out []any, mode SchemaMode) error {
return nil
}

func constructRowGroupFromArray(
// writeRowGroupFromArray converts a batch of array messages, writing each value straight to the
// concurrent row group's column writers as it is converted, then flushes (compresses) the row group.
func writeRowGroupFromArray(
batch service.MessageBatch,
schema *parquet.Schema,
transformers []*dataTransformer,
mode SchemaMode,
) ([]parquet.Row, []*statsBuffer, error) {
// TODO: Switch to using concurrent row groups to write this stuff
rg *parquet.ConcurrentRowGroupWriter,
) ([]*statsBuffer, error) {
rowWidth := len(schema.Fields())
matrix := make([]parquet.Value, len(batch)*rowWidth)
stats := make([]*statsBuffer, rowWidth)
buffers := make([]typedBuffer, rowWidth)
columnWriters := rg.ColumnWriters()

for idx, t := range transformers {
leaf, ok := schema.Lookup(t.name)
if !ok {
return nil, nil, fmt.Errorf("invariant failed: unable to find column %q", t.name)
return nil, fmt.Errorf("invariant failed: unable to find column %q", t.name)
}
buffers[idx] = t.bufferFactory()
buffers[idx].Prepare(matrix, leaf.ColumnIndex, rowWidth)
buffers[idx].Reset(columnWriters[leaf.ColumnIndex], leaf.ColumnIndex)
stats[idx] = &statsBuffer{}
}

row := make([]any, rowWidth)
for _, msg := range batch {
err := arrayMessageToRow(msg, row, mode)
if err != nil {
return nil, nil, err
return nil, err
}
for i, v := range row {
t := transformers[i]
@@ -194,29 +192,27 @@ func constructRowGroupFromArray(
err = t.converter.ValidateAndConvert(s, v, b)
if err != nil {
if errors.Is(err, errNullValue) {
return nil, nil, &NonNullColumnError{msg, t.column.Name}
return nil, &NonNullColumnError{msg, t.column.Name}
}
// There is no special typed error for a validation error; there really isn't
// anything we can do about it.
return nil, nil, fmt.Errorf("invalid data for column %s: %w", t.name, err)
return nil, fmt.Errorf("invalid data for column %s: %w", t.name, err)
}
// reset the column to nil for the next row
row[i] = nil
}
}
// Now all our values have been written to each buffer - here is where we do our matrix
// transpose mentioned above
rows := make([]parquet.Row, len(batch))
for i := range rows {
rowStart := i * rowWidth
rows[i] = matrix[rowStart : rowStart+rowWidth]

// Flush compresses the row group data
if err := rg.Flush(); err != nil {
return nil, fmt.Errorf("failed to flush row group: %w", err)
}
return rows, stats, nil

return stats, nil
}

type parquetWriter struct {
b *bytes.Buffer
w *parquet.GenericWriter[any]
b *bytes.Buffer
w *parquet.GenericWriter[any]
schema *parquet.Schema
}

func newParquetWriter(rpcnVersion string, schema *parquet.Schema) *parquetWriter {
@@ -230,53 +226,32 @@ func newParquetWriter(rpcnVersion string, schema *parquet.Schema) *parquetWriter
parquet.Compression(&parquet.Zstd),
parquet.WriteBufferSize(0),
)
return &parquetWriter{b, w}
return &parquetWriter{b, w, schema}
}

// WriteFile writes a new parquet file using the rows and metadata.
//
// NOTE: metadata is sticky - if you want the next file to remove metadata you need to set the value to the empty string
// to actually remove it. In the usage of this method in this package, the metadata keys are all always the same.
func (w *parquetWriter) WriteFile(rows []parquet.Row, metadata map[string]string) (out []byte, err error) {
// BeginRowGroup creates a new concurrent row group for parallel construction.
func (w *parquetWriter) BeginRowGroup() *parquet.ConcurrentRowGroupWriter {
return w.w.BeginRowGroup()
}

// Reset prepares the writer for a new file with the given metadata.
func (w *parquetWriter) Reset(metadata map[string]string) {
for k, v := range metadata {
w.w.SetKeyValueMetadata(k, v)
}
w.b.Reset()
w.w.Reset(w.b)
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("encoding panic: %v", r)
}
}()
_, err = w.w.WriteRows(rows)
if err != nil {
return
}
err = w.w.Close()
out = w.b.Bytes()
return
}

func readParquetMetadata(parquetFile []byte) (metadata format.FileMetaData, err error) {
if len(parquetFile) < 8 {
return format.FileMetaData{}, fmt.Errorf("too small of parquet file: %d", len(parquetFile))
}
trailingBytes := parquetFile[len(parquetFile)-8:]
if string(trailingBytes[4:]) != "PAR1" {
return metadata, fmt.Errorf("missing magic bytes, got: %q", trailingBytes[4:])
}
footerSize := int(binary.LittleEndian.Uint32(trailingBytes))
if len(parquetFile) < footerSize+8 {
return metadata, fmt.Errorf("too small of parquet file: %d, footer size: %d", len(parquetFile), footerSize)
}
footerBytes := parquetFile[len(parquetFile)-(footerSize+8) : len(parquetFile)-8]
if err := thrift.Unmarshal(new(thrift.CompactProtocol), footerBytes, &metadata); err != nil {
return metadata, fmt.Errorf("unable to extract parquet metadata: %w", err)
// Close finalizes the parquet file and returns the bytes.
func (w *parquetWriter) Close() ([]byte, *format.FileMetaData, error) {
if err := w.w.Close(); err != nil {
return nil, nil, err
}
return
return w.b.Bytes(), w.w.File().Metadata(), nil
}
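Taken together, the reworked writer replaces the old single-shot WriteFile with an explicit lifecycle. A hypothetical package-internal caller, stitched together only from the methods in this diff (Reset, BeginRowGroup, Close) and the writeRowGroupFromObject signature above; the "createdBy" metadata key is illustrative:

// buildFile sketches the new flow: reset with metadata, write one row group,
// then close to obtain both the file bytes and its footer. The footer now
// comes from the writer itself (w.w.File().Metadata()), replacing the removed
// readParquetMetadata thrift parsing.
func buildFile(
    version string,
    schema *parquet.Schema,
    batch service.MessageBatch,
    transformers []*dataTransformer,
    mode SchemaMode,
) ([]byte, error) {
    pw := newParquetWriter(version, schema)
    pw.Reset(map[string]string{"createdBy": "rpcn"})
    rg := pw.BeginRowGroup()
    // Per-column stats are gathered while the values are written.
    if _, err := writeRowGroupFromObject(batch, schema, transformers, mode, rg); err != nil {
        return nil, err
    }
    out, meta, err := pw.Close()
    if err != nil {
        return nil, err
    }
    _ = totalUncompressedSize(meta) // see the helper below
    return out, nil
}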

func totalUncompressedSize(metadata format.FileMetaData) int32 {
func totalUncompressedSize(metadata *format.FileMetaData) int32 {
var size int64
for _, rowGroup := range metadata.RowGroups {
size += rowGroup.TotalByteSize