diff --git a/client.go b/client.go
index cb2bffe..e0d080a 100644
--- a/client.go
+++ b/client.go
@@ -5,6 +5,7 @@ import (
     jsonfile "github.com/cloudquery/filetypes/v4/json"
     "github.com/cloudquery/filetypes/v4/parquet"
     "github.com/cloudquery/filetypes/v4/types"
+    "github.com/cloudquery/filetypes/v4/xlsx"
 )
 
 type Client struct {
@@ -17,6 +18,7 @@ var (
     _ types.FileType = (*csvfile.Client)(nil)
     _ types.FileType = (*jsonfile.Client)(nil)
     _ types.FileType = (*parquet.Client)(nil)
+    _ types.FileType = (*xlsx.Client)(nil)
 )
 
 // NewClient creates a new client for the given spec
@@ -49,6 +51,9 @@ func NewClient(spec *FileSpec) (*Client, error) {
     case FormatTypeParquet:
         client, err = parquet.NewClient(parquet.WithSpec(*spec.parquetSpec))
+    case FormatTypeXLSX:
+        client, err = xlsx.NewClient()
+
     default:
         // shouldn't be possible as Validate checks for type
         panic("unknown format " + spec.Format)
diff --git a/go.mod b/go.mod
index cc83cf4..6681150 100644
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,7 @@ require (
     github.com/invopop/jsonschema v0.13.0
     github.com/stretchr/testify v1.10.0
     github.com/wk8/go-ordered-map/v2 v2.1.8
+    github.com/xuri/excelize/v2 v2.9.1
 )
 
 require (
@@ -38,11 +39,17 @@ require (
     github.com/oapi-codegen/runtime v1.1.1 // indirect
     github.com/pierrec/lz4/v4 v4.1.22 // indirect
     github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
+    github.com/richardlehane/mscfb v1.0.4 // indirect
+    github.com/richardlehane/msoleps v1.0.4 // indirect
     github.com/rs/zerolog v1.34.0 // indirect
     github.com/samber/lo v1.49.1 // indirect
     github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 // indirect
     github.com/thoas/go-funk v0.9.3 // indirect
+    github.com/tiendc/go-deepcopy v1.6.0 // indirect
+    github.com/xuri/efp v0.0.1 // indirect
+    github.com/xuri/nfp v0.0.1 // indirect
     github.com/zeebo/xxh3 v1.0.2 // indirect
+    golang.org/x/crypto v0.39.0 // indirect
     golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
     golang.org/x/mod v0.25.0 // indirect
     golang.org/x/net v0.41.0 // indirect
diff --git a/go.sum b/go.sum
index f60fe43..cc341d5 100644
--- a/go.sum
+++ b/go.sum
@@ -88,6 +88,11 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/richardlehane/mscfb v1.0.4 h1:WULscsljNPConisD5hR0+OyZjwK46Pfyr6mPu5ZawpM=
+github.com/richardlehane/mscfb v1.0.4/go.mod h1:YzVpcZg9czvAuhk9T+a3avCpcFPMUWm7gK3DypaEsUk=
+github.com/richardlehane/msoleps v1.0.1/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg=
+github.com/richardlehane/msoleps v1.0.4 h1:WuESlvhX3gH2IHcd8UqyCuFY5yiq/GR/yqaSM/9/g00=
+github.com/richardlehane/msoleps v1.0.4/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg=
 github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
 github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY=
 github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ=
@@ -107,8 +112,16 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
 github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
 github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
+github.com/tiendc/go-deepcopy v1.6.0 h1:0UtfV/imoCwlLxVsyfUd4hNHnB3drXsfle+wzSCA5Wo=
+github.com/tiendc/go-deepcopy v1.6.0/go.mod h1:toXoeQoUqXOOS/X4sKuiAoSk6elIdqc0pN7MTgOOo2I=
 github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
 github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
+github.com/xuri/efp v0.0.1 h1:fws5Rv3myXyYni8uwj2qKjVaRP30PdjeYe2Y6FDsCL8=
+github.com/xuri/efp v0.0.1/go.mod h1:ybY/Jr0T0GTCnYjKqmdwxyxn2BQf2RcQIIvex5QldPI=
+github.com/xuri/excelize/v2 v2.9.1 h1:VdSGk+rraGmgLHGFaGG9/9IWu1nj4ufjJ7uwMDtj8Qw=
+github.com/xuri/excelize/v2 v2.9.1/go.mod h1:x7L6pKz2dvo9ejrRuD8Lnl98z4JLt0TGAwjhW+EiP8s=
+github.com/xuri/nfp v0.0.1 h1:MDamSGatIvp8uOmDP8FnmjuQpu90NzdJxo7242ANR9Q=
+github.com/xuri/nfp v0.0.1/go.mod h1:WwHg+CVyzlv/TX9xqBFXEZAuxOPxn2k1GNHwG41IIUQ=
 github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
 github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
 github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
@@ -127,8 +140,12 @@ go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFh
 go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps=
 go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
 go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
+golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
+golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
+golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ=
+golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs=
 golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
 golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
 golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
diff --git a/spec.go b/spec.go
index f0c559c..f44f98c 100644
--- a/spec.go
+++ b/spec.go
@@ -9,6 +9,7 @@ import (
     "github.com/cloudquery/filetypes/v4/csv"
     jsonfile "github.com/cloudquery/filetypes/v4/json"
     "github.com/cloudquery/filetypes/v4/parquet"
+    "github.com/cloudquery/filetypes/v4/xlsx"
 )
 
 type FormatType string
@@ -17,6 +18,7 @@ const (
     FormatTypeCSV     = "csv"
     FormatTypeJSON    = "json"
     FormatTypeParquet = "parquet"
+    FormatTypeXLSX    = "xlsx"
 )
 
 // Compression type.
@@ -41,6 +43,7 @@ type FileSpec struct {
     csvSpec     *csv.CSVSpec
     jsonSpec    *jsonfile.JSONSpec
     parquetSpec *parquet.ParquetSpec
+    xlsxSpec    *xlsx.Spec
 }
 
 func (s *FileSpec) SetDefaults() {
@@ -51,6 +54,8 @@ func (s *FileSpec) SetDefaults() {
         s.jsonSpec.SetDefaults()
     case FormatTypeParquet:
         s.parquetSpec.SetDefaults()
+    case FormatTypeXLSX:
+        s.xlsxSpec.SetDefaults()
     }
 }
 
@@ -68,10 +73,14 @@ func (s *FileSpec) Validate() error {
         return s.jsonSpec.Validate()
     case FormatTypeParquet:
         if s.Compression != CompressionTypeNone {
-            return errors.New("compression is not supported for parquet format") // This won't work even if we wanted to, because parquet writer prematurely closes the file handle
+            return fmt.Errorf("compression is not supported for the %s format", s.Format)
         }
-
         return s.parquetSpec.Validate()
+    case FormatTypeXLSX:
+        if s.Compression != CompressionTypeNone {
+            return fmt.Errorf("compression is not supported for the %s format", s.Format)
+        }
+        return s.xlsxSpec.Validate()
     default:
         return fmt.Errorf("unknown format %s", s.Format)
     }
@@ -96,6 +105,9 @@ func (s *FileSpec) UnmarshalSpec() error {
     case FormatTypeParquet:
         s.parquetSpec = &parquet.ParquetSpec{}
         return dec.Decode(s.parquetSpec)
+    case FormatTypeXLSX:
+        s.xlsxSpec = &xlsx.Spec{}
+        return dec.Decode(s.xlsxSpec)
     default:
         return fmt.Errorf("unknown format %s", s.Format)
     }
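For orientation, here is a minimal, hedged sketch of how the new format is selected end to end once this lands: the caller builds a FileSpec with Format set to FormatTypeXLSX and runs it through the usual UnmarshalSpec, SetDefaults, Validate and NewClient sequence shown above. Constructing the spec directly (rather than decoding a plugin's JSON config), and the assumption that UnmarshalSpec tolerates an empty format spec, are illustrative simplifications and not part of this change.

package main

import (
    "log"

    filetypes "github.com/cloudquery/filetypes/v4"
)

func main() {
    // In a destination plugin the FileSpec would normally come from the user's
    // config; it is constructed directly here only for illustration.
    spec := &filetypes.FileSpec{Format: filetypes.FormatTypeXLSX}
    if err := spec.UnmarshalSpec(); err != nil { // populates the typed xlsx spec (assumed to accept an empty format_spec)
        log.Fatal(err)
    }
    spec.SetDefaults()
    if err := spec.Validate(); err != nil {
        log.Fatal(err)
    }
    client, err := filetypes.NewClient(spec)
    if err != nil {
        log.Fatal(err)
    }
    _ = client // ready to write or read tables in the xlsx format
}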
diff --git a/xlsx/client.go b/xlsx/client.go
new file mode 100644
index 0000000..35431e2
--- /dev/null
+++ b/xlsx/client.go
@@ -0,0 +1,17 @@
+package xlsx
+
+type Options func(*Client)
+
+// Client is an xlsx client.
+type Client struct {
+}
+
+func NewClient(options ...Options) (*Client, error) {
+    c := &Client{}
+
+    for _, option := range options {
+        option(c)
+    }
+
+    return c, nil
+}
diff --git a/xlsx/read.go b/xlsx/read.go
new file mode 100644
index 0000000..1424d42
--- /dev/null
+++ b/xlsx/read.go
@@ -0,0 +1,95 @@
+package xlsx
+
+import (
+    "bytes"
+    "fmt"
+
+    "github.com/apache/arrow-go/v18/arrow"
+    "github.com/apache/arrow-go/v18/arrow/array"
+    "github.com/apache/arrow-go/v18/arrow/memory"
+    "github.com/cloudquery/filetypes/v4/types"
+    "github.com/cloudquery/plugin-sdk/v4/schema"
+    "github.com/goccy/go-json"
+    "github.com/xuri/excelize/v2"
+)
+
+func (cl *Client) Read(r types.ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
+    file, err := excelize.OpenReader(r)
+    if err != nil {
+        return fmt.Errorf("failed to open xlsx reader: %w", err)
+    }
+
+    sheetName := defaultSheetName
+    rows, err := file.GetRows(sheetName)
+    if err != nil {
+        return fmt.Errorf("failed to get rows from sheet %s: %w", sheetName, err)
+    }
+
+    for _, row := range rows[1:] { // skip the header row written by WriteHeader
+        rb := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
+        for i, field := range rb.Fields() {
+            err := appendValue(field, row[i])
+            if err != nil {
+                return fmt.Errorf("failed to read row for table %s: %w", table.Name, err)
+            }
+        }
+        res <- rb.NewRecord()
+    }
+    return nil
+}
+
+func appendValue(builder array.Builder, value any) error {
+    if value == nil {
+        builder.AppendNull()
+        return nil
+    }
+    // Cell values returned by excelize.GetRows arrive as strings, so let the
+    // target builder parse the written representation; empty cells become nulls.
+    if s, ok := value.(string); ok {
+        if s == "" {
+            builder.AppendNull()
+            return nil
+        }
+        return builder.AppendValueFromString(s)
+    }
+    switch bldr := builder.(type) {
+    case array.ListLikeBuilder:
+        lst := value.([]any)
+        if lst == nil {
+            bldr.AppendNull()
+            return nil
+        }
+        bldr.Append(true)
+        valBuilder := bldr.ValueBuilder()
+        for _, v := range lst {
+            if err := appendValue(valBuilder, v); err != nil {
+                return err
+            }
+        }
+        return nil
+    case *array.StructBuilder:
+        m := value.(map[string]any)
+        bldr.Append(true)
+        bldrType := bldr.Type().(*arrow.StructType)
+        for k, v := range m {
+            idx, _ := bldrType.FieldIdx(k)
+            fieldBldr := bldr.FieldBuilder(idx)
+            if err := appendValue(fieldBldr, v); err != nil {
+                return err
+            }
+        }
+        return nil
+    case *array.MonthIntervalBuilder, *array.DayTimeIntervalBuilder, *array.MonthDayNanoIntervalBuilder:
+        b, err := json.Marshal(value)
+        if err != nil {
+            return err
+        }
+        dec := json.NewDecoder(bytes.NewReader(b))
+        return bldr.UnmarshalOne(dec)
+    case *array.Int8Builder, *array.Int16Builder, *array.Int32Builder, *array.Int64Builder:
+        return bldr.AppendValueFromString(fmt.Sprintf("%d", int64(value.(float64))))
+    case *array.Uint8Builder, *array.Uint16Builder, *array.Uint32Builder, *array.Uint64Builder:
+        return bldr.AppendValueFromString(fmt.Sprintf("%d", uint64(value.(float64))))
+    }
+    return builder.AppendValueFromString(fmt.Sprintf("%v", value))
+}
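Because excelize's GetRows hands every cell back as a string, the read path above leans on the Arrow builders' AppendValueFromString to turn the written representation back into typed values. A minimal, standalone sketch of that mechanism, independent of any CloudQuery table:

package main

import (
    "fmt"

    "github.com/apache/arrow-go/v18/arrow/array"
    "github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
    // Builders parse the string form that was written to the sheet.
    bldr := array.NewInt64Builder(memory.DefaultAllocator)
    defer bldr.Release()

    for _, cell := range []string{"1", "2", "3"} {
        if err := bldr.AppendValueFromString(cell); err != nil {
            panic(err)
        }
    }

    arr := bldr.NewInt64Array()
    defer arr.Release()
    fmt.Println(arr) // [1 2 3]
}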
diff --git a/xlsx/spec.go b/xlsx/spec.go
new file mode 100644
index 0000000..258a983
--- /dev/null
+++ b/xlsx/spec.go
@@ -0,0 +1,23 @@
+package xlsx
+
+import (
+    "github.com/invopop/jsonschema"
+)
+
+type Spec struct{}
+
+func (Spec) JSONSchema() *jsonschema.Schema {
+    properties := jsonschema.NewProperties()
+    return &jsonschema.Schema{
+        Description:          "CloudQuery XLSX file output spec.",
+        Properties:           properties,
+        Type:                 "object",
+        AdditionalProperties: jsonschema.FalseSchema, // "additionalProperties": false
+    }
+}
+
+func (s *Spec) SetDefaults() {}
+
+func (s *Spec) Validate() error {
+    return nil
+}
diff --git a/xlsx/spec_test.go b/xlsx/spec_test.go
new file mode 100644
index 0000000..fcd6230
--- /dev/null
+++ b/xlsx/spec_test.go
@@ -0,0 +1,25 @@
+package xlsx
+
+import (
+    "testing"
+
+    "github.com/cloudquery/codegen/jsonschema"
+    "github.com/stretchr/testify/require"
+)
+
+func TestSpec_JSONSchema(t *testing.T) {
+    schema, err := jsonschema.Generate(Spec{})
+    require.NoError(t, err)
+
+    jsonschema.TestJSONSchema(t, string(schema), []jsonschema.TestCase{
+        {
+            Name: "empty",
+            Spec: `{}`,
+        },
+        {
+            Name: "extra keyword",
+            Err:  true,
+            Spec: `{"extra":true}`,
+        },
+    })
+}
diff --git a/xlsx/testdata/TestWriteRead-default.xlsx b/xlsx/testdata/TestWriteRead-default.xlsx
new file mode 100644
index 0000000..081df40
Binary files /dev/null and b/xlsx/testdata/TestWriteRead-default.xlsx differ
diff --git a/xlsx/write.go b/xlsx/write.go
new file mode 100644
index 0000000..f0918e2
--- /dev/null
+++ b/xlsx/write.go
@@ -0,0 +1,132 @@
+package xlsx
+
+import (
+    "fmt"
+    "io"
+
+    "github.com/apache/arrow-go/v18/arrow"
+    "github.com/apache/arrow-go/v18/arrow/array"
+    "github.com/apache/arrow-go/v18/arrow/memory"
+    "github.com/cloudquery/filetypes/v4/types"
+    "github.com/cloudquery/plugin-sdk/v4/schema"
+    "github.com/xuri/excelize/v2"
+)
+
+const (
+    defaultSheetName = "data"
+)
+
+type Handle struct {
+    w      io.Writer
+    schema *arrow.Schema
+
+    idx  int
+    file *excelize.File
+}
+
+var _ types.Handle = (*Handle)(nil)
+
+func (cl *Client) WriteHeader(w io.Writer, t *schema.Table) (types.Handle, error) {
+    file := excelize.NewFile()
+
+    err := file.SetSheetName("Sheet1", defaultSheetName)
+    if err != nil {
+        return nil, fmt.Errorf("failed to rename default sheet: %w", err)
+    }
+
+    var cells []any
+    for _, name := range t.Columns.Names() {
+        cells = append(cells, name)
+    }
+
+    if err = file.SetSheetRow(defaultSheetName, "A1", &cells); err != nil {
+        return nil, fmt.Errorf("failed to set header row: %w", err)
+    }
+
+    return &Handle{
+        w:      w,
+        schema: convertSchema(t.ToArrowSchema()),
+        idx:    2,
+        file:   file,
+    }, nil
+}
+
+func (h *Handle) WriteContent(records []arrow.Record) error {
+    for _, record := range records {
+        record := h.castToString(record)
+        for i := 0; i < int(record.NumRows()); i++ {
+            cellName, err := excelize.CoordinatesToCellName(1, h.idx)
+            if err != nil {
+                return fmt.Errorf("failed to convert coordinates to cell name: %w", err)
+            }
+            var cells []any
+            for j := 0; j < int(record.NumCols()); j++ {
+                cells = append(cells, record.Column(j).GetOneForMarshal(i))
+            }
+            h.idx += 1
+            if err := h.file.SetSheetRow(defaultSheetName, cellName, &cells); err != nil {
+                return fmt.Errorf("failed to set sheet row: %w", err)
+            }
+        }
+    }
+
+    return nil
+}
+
+func (h *Handle) WriteFooter() error {
+    return h.file.Write(h.w)
+}
+
+func convertSchema(sch *arrow.Schema) *arrow.Schema {
+    oldFields := sch.Fields()
+    fields := make([]arrow.Field, len(oldFields))
+    copy(fields, oldFields)
+    for i, f := range fields {
+        if !isTypeSupported(f.Type) {
+            fields[i].Type = arrow.BinaryTypes.String
+        }
+    }
+
+    md := sch.Metadata()
+    newSchema := arrow.NewSchema(fields, &md)
+    return newSchema
+}
+
+func isTypeSupported(t arrow.DataType) bool {
+    switch t.(type) {
+    case *arrow.BooleanType,
+        *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type,
+        *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type,
+        *arrow.Float32Type, *arrow.Float64Type,
+        *arrow.StringType,
+        *arrow.TimestampType,
+        *arrow.Date32Type, *arrow.Date64Type,
+        *arrow.Decimal128Type, *arrow.Decimal256Type,
+        *arrow.BinaryType:
+        return true
+    }
+
+    return false
+}
+
+func (h *Handle) castToString(rec arrow.Record) arrow.Record {
+    cols := make([]arrow.Array, h.schema.NumFields())
+    for c := 0; c < h.schema.NumFields(); c++ {
+        col := rec.Column(c)
+        if isTypeSupported(col.DataType()) {
+            cols[c] = col
+            continue
+        }
+
+        sb := array.NewStringBuilder(memory.DefaultAllocator)
+        for i := 0; i < col.Len(); i++ {
+            if col.IsNull(i) {
+                sb.AppendNull()
+                continue
+            }
+            sb.Append(col.ValueStr(i))
+        }
+        cols[c] = sb.NewArray()
+    }
+    return array.NewRecord(h.schema, cols, rec.NumRows())
+}
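The Handle above is driven in three steps (WriteHeader, then WriteContent for each batch, then WriteFooter), which is presumably what the types.WriteAll helper used in the tests below wraps. A hedged sketch of writing one table to a local file; the helper name, the path, and the provenance of table and records are illustrative and not part of this change:

package example

import (
    "os"

    "github.com/apache/arrow-go/v18/arrow"
    "github.com/cloudquery/filetypes/v4/xlsx"
    "github.com/cloudquery/plugin-sdk/v4/schema"
)

// writeWorkbook is a hypothetical helper: table and records are assumed to come
// from the caller, e.g. a destination plugin batch.
func writeWorkbook(path string, table *schema.Table, records []arrow.Record) error {
    f, err := os.Create(path)
    if err != nil {
        return err
    }
    defer f.Close()

    cl, err := xlsx.NewClient()
    if err != nil {
        return err
    }

    h, err := cl.WriteHeader(f, table) // renames the default sheet to "data" and writes the column names
    if err != nil {
        return err
    }
    if err := h.WriteContent(records); err != nil {
        return err
    }
    return h.WriteFooter() // serializes the workbook to the underlying writer
}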
diff --git a/xlsx/write_read_test.go b/xlsx/write_read_test.go
new file mode 100644
index 0000000..52fe26c
--- /dev/null
+++ b/xlsx/write_read_test.go
@@ -0,0 +1,114 @@
+package xlsx
+
+import (
+    "bufio"
+    "bytes"
+    "io"
+    "testing"
+    "time"
+
+    "github.com/apache/arrow-go/v18/arrow"
+    "github.com/bradleyjkemp/cupaloy/v2"
+    "github.com/cloudquery/filetypes/v4/types"
+    "github.com/cloudquery/plugin-sdk/v4/plugin"
+    "github.com/cloudquery/plugin-sdk/v4/schema"
+    "github.com/stretchr/testify/require"
+)
+
+func TestWriteRead(t *testing.T) {
+    cases := []struct {
+        name        string
+        options     []Options
+        outputCount int
+    }{
+        {name: "default", outputCount: 2},
+    }
+
+    for _, tc := range cases {
+        t.Run(tc.name, func(t *testing.T) {
+            table := schema.TestTable("test", schema.TestSourceOptions{})
+            sourceName := "test-source"
+            syncTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
+            opts := schema.GenTestDataOptions{
+                SourceName: sourceName,
+                SyncTime:   syncTime,
+                MaxRows:    2,
+                StableTime: time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC),
+            }
+            tg := schema.NewTestDataGenerator(0)
+            record := tg.Generate(table, opts)
+
+            cl, err := NewClient(tc.options...)
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            var b bytes.Buffer
+            writer := bufio.NewWriter(&b)
+            reader := bufio.NewReader(&b)
+
+            if err := types.WriteAll(cl, writer, table, []arrow.Record{record}); err != nil {
+                t.Fatal(err)
+            }
+            writer.Flush()
+
+            rawBytes, err := io.ReadAll(reader)
+            if err != nil {
+                t.Fatal(err)
+            }
+            snap := cupaloy.New(
+                cupaloy.SnapshotFileExtension(".xlsx"),
+                cupaloy.SnapshotSubdirectory("testdata"),
+            )
+            snap.SnapshotT(t, string(rawBytes))
+
+            byteReader := bytes.NewReader(rawBytes)
+
+            ch := make(chan arrow.Record)
+            var readErr error
+            go func() {
+                readErr = cl.Read(byteReader, table, ch)
+                close(ch)
+            }()
+            received := make([]arrow.Record, 0, tc.outputCount)
+            for got := range ch {
+                received = append(received, got)
+            }
+            require.Empty(t, plugin.RecordsDiff(table.ToArrowSchema(), []arrow.Record{record}, received))
+            require.NoError(t, readErr)
+            require.Equalf(t, tc.outputCount, len(received), "got %d row(s), want %d", len(received), tc.outputCount)
+        })
+    }
+}
+
+func BenchmarkWrite(b *testing.B) {
+    table := schema.TestTable("test", schema.TestSourceOptions{})
+    sourceName := "test-source"
+    syncTime := time.Now().UTC().Round(time.Second)
+    opts := schema.GenTestDataOptions{
+        SourceName: sourceName,
+        SyncTime:   syncTime,
+        MaxRows:    1000,
+    }
+    tg := schema.NewTestDataGenerator(0)
+    record := tg.Generate(table, opts)
+
+    cl, err := NewClient()
+    if err != nil {
+        b.Fatal(err)
+    }
+    var buf bytes.Buffer
+    writer := bufio.NewWriter(&buf)
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        if err := types.WriteAll(cl, writer, table, []arrow.Record{record}); err != nil {
+            b.Fatal(err)
+        }
+
+        err = writer.Flush()
+        if err != nil {
+            b.Fatal(err)
+        }
+        buf.Reset()
+    }
+}
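Reading such a workbook back into Arrow records follows the goroutine-and-channel pattern used in TestWriteRead above. A sketch under the assumptions that *os.File satisfies types.ReaderAtSeeker and that the same table definition used for writing is available; the helper name and path are illustrative:

package example

import (
    "os"

    "github.com/apache/arrow-go/v18/arrow"
    "github.com/cloudquery/filetypes/v4/xlsx"
    "github.com/cloudquery/plugin-sdk/v4/schema"
)

// readWorkbook is a hypothetical helper mirroring the channel pattern in TestWriteRead.
func readWorkbook(path string, table *schema.Table) ([]arrow.Record, error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    cl, err := xlsx.NewClient()
    if err != nil {
        return nil, err
    }

    ch := make(chan arrow.Record)
    var readErr error
    go func() {
        readErr = cl.Read(f, table, ch) // expects the "data" sheet written by WriteHeader/WriteContent
        close(ch)
    }()

    var records []arrow.Record
    for rec := range ch {
        records = append(records, rec)
    }
    return records, readErr
}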