Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ type SchemaVisitorPerPrimitiveType[T any] interface {
VisitBinary() T
VisitUUID() T
VisitUnknown() T
VisitVariant() T
}

// Visit accepts a visitor and performs a post-order traversal of the given schema.
Expand Down Expand Up @@ -640,6 +641,8 @@ func visitField[T any](f NestedField, visitor SchemaVisitor[T]) T {
return perPrimitive.VisitFixed(t)
case UnknownType:
return perPrimitive.VisitUnknown()
case VariantType:
return perPrimitive.VisitVariant()
}
}

Expand Down
27 changes: 25 additions & 2 deletions table/arrow_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,12 @@ func (c convertToIceberg) Primitive(dt arrow.DataType) (result iceberg.NestedFie
case *arrow.FixedSizeBinaryType:
result.Type = iceberg.FixedTypeOf(dt.ByteWidth)
case arrow.ExtensionType:
if dt.ExtensionName() == "arrow.uuid" {
switch dt.ExtensionName() {
case "arrow.uuid":
result.Type = iceberg.PrimitiveTypes.UUID
} else {
case "parquet.variant":
result.Type = iceberg.PrimitiveTypes.Variant
default:
panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt))
}
default:
Expand Down Expand Up @@ -622,6 +625,22 @@ func (c convertToArrow) VisitUnknown() arrow.Field {
}
}

func (c convertToArrow) VisitVariant() arrow.Field {
// Use the Arrow Variant extension type (available since arrow-go v18.4.0)
// per the Parquet Variant specification.
// See: https://pkg.go.dev/github.com/apache/arrow-go/v18/arrow/extensions#VariantType
if c.useLargeTypes {
vt, _ := extensions.NewVariantType(arrow.StructOf(
arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.LargeBinary, Nullable: false},
arrow.Field{Name: "value", Type: arrow.BinaryTypes.LargeBinary, Nullable: false},
))

return arrow.Field{Type: vt}
}

return arrow.Field{Type: extensions.NewDefaultVariantType()}
}

var _ iceberg.SchemaVisitorPerPrimitiveType[arrow.Field] = convertToArrow{}

// SchemaToArrowSchema converts an Iceberg schema to an Arrow schema. If the metadata parameter
Expand Down Expand Up @@ -1166,6 +1185,10 @@ func (a *arrowStatsCollector) Primitive(dt iceberg.PrimitiveType) []tblutils.Sta
metMode = tblutils.MetricsMode{Typ: tblutils.MetricModeCounts}
}

if _, ok := dt.(iceberg.VariantType); ok {
metMode = tblutils.MetricsMode{Typ: tblutils.MetricModeCounts}
}

return []tblutils.StatisticsCollector{{
FieldID: a.fieldID,
IcebergTyp: dt,
Expand Down
1 change: 1 addition & 0 deletions table/arrow_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestArrowToIceberg(t *testing.T) {
{arrow.BinaryTypes.LargeBinary, iceberg.PrimitiveTypes.Binary, false, ""},
{arrow.BinaryTypes.BinaryView, nil, false, "unsupported arrow type for conversion - binary_view"},
{extensions.NewUUIDType(), iceberg.PrimitiveTypes.UUID, true, ""},
{extensions.NewDefaultVariantType(), iceberg.PrimitiveTypes.Variant, true, ""},
{arrow.StructOf(arrow.Field{
Name: "foo",
Type: arrow.BinaryTypes.String,
Expand Down
5 changes: 4 additions & 1 deletion table/internal/parquet_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,10 @@ func (p parquetFormat) DataFileStatsFromMeta(meta Metadata, statsCols map[int]St
panic(err)
}

fieldID := colMapping[colChunk.PathInSchema().String()]
fieldID, ok := colMapping[colChunk.PathInSchema().String()]
if !ok {
continue
}
statsCol := statsCols[fieldID]
if statsCol.Mode.Typ == MetricModeNone {
continue
Expand Down
6 changes: 6 additions & 0 deletions table/substrait/substrait.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ func (convertToSubstrait) VisitUnknown() types.Type {
return nil
}

func (convertToSubstrait) VisitVariant() types.Type {
// Variant types have no direct Substrait equivalent
// Returning nil indicates this type cannot be converted to Substrait
return nil
}

var _ iceberg.SchemaVisitorPerPrimitiveType[types.Type] = (*convertToSubstrait)(nil)

var (
Expand Down
125 changes: 125 additions & 0 deletions table/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@ import (

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/arrow-go/v18/arrow/extensions"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/apache/arrow-go/v18/parquet/variant"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/catalog/rest"
"github.com/apache/iceberg-go/internal/recipe"
iceio "github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/testcontainers/testcontainers-go/modules/compose"
)
Expand Down Expand Up @@ -363,6 +367,127 @@ func (s *SparkIntegrationTestSuite) TestUpdateSpec() {
)
}

func (s *SparkIntegrationTestSuite) TestVariantType() {
icebergSchema := iceberg.NewSchema(0,
iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32},
iceberg.NestedField{ID: 2, Name: "variant_col", Type: iceberg.PrimitiveTypes.Variant},
)

// Variant type requires format version 3
tbl, err := s.cat.CreateTable(
s.ctx,
catalog.ToIdentifier("default", "go_test_variant"),
icebergSchema,
catalog.WithProperties(iceberg.Properties{table.PropertyFormatVersion: "3"}),
)
s.Require().NoError(err)

arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, false)
s.Require().NoError(err)

// Build Arrow table with Variant data using the variant extension type
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(s.T(), 0)

// Create builders
intBldr := array.NewInt32Builder(mem)
defer intBldr.Release()

variantBldr := extensions.NewVariantBuilder(mem, extensions.NewDefaultVariantType())
defer variantBldr.Release()

// Row 1: integer value in variant
intBldr.Append(1)
variantBldr.Append(mustVariant(s.T(), int32(42)))

// Row 2: string value in variant
intBldr.Append(2)
variantBldr.Append(mustVariant(s.T(), "hello"))

// Row 3: null variant
intBldr.Append(3)
variantBldr.AppendNull()

// Row 4: object value in variant
intBldr.Append(4)
variantBldr.Append(mustVariant(s.T(), map[string]any{
"name": "test",
"value": int64(123),
}))

// Build arrays
idArr := intBldr.NewInt32Array()
defer idArr.Release()
variantArr := variantBldr.NewArray().(*extensions.VariantArray)
defer variantArr.Release()

// Create record batch
rec := array.NewRecord(arrowSchema, []arrow.Array{idArr, variantArr}, int64(idArr.Len()))
defer rec.Release()

// Convert to Arrow Table
arrTable := array.NewTableFromRecords(arrowSchema, []arrow.Record{rec})
s.Require().NotNil(arrTable)
defer arrTable.Release()

// Write data to Iceberg table
tx := tbl.NewTransaction()
s.Require().NoError(tx.AppendTable(s.ctx, arrTable, 1000, nil))
_, err = tx.Commit(s.ctx)
s.Require().NoError(err)

// Read back and verify the data
ctx := compute.WithAllocator(s.ctx, mem)
results, err := tbl.Scan().ToArrowTable(ctx)
s.Require().NoError(err)
defer results.Release()

s.EqualValues(4, results.NumRows())
s.EqualValues(2, results.NumCols())

// Verify schema
s.True(arrowSchema.Equal(results.Schema()),
"expected: %s\ngot: %s\n", arrowSchema, results.Schema())

// Verify ID column
idCol := results.Column(0).Data().Chunk(0).(*array.Int32)
s.EqualValues(1, idCol.Value(0))
s.EqualValues(2, idCol.Value(1))
s.EqualValues(3, idCol.Value(2))
s.EqualValues(4, idCol.Value(3))

// Verify Variant column
variantCol := results.Column(1).Data().Chunk(0).(*extensions.VariantArray)
s.False(variantCol.IsNull(0))
s.False(variantCol.IsNull(1))
s.True(variantCol.IsNull(2))
s.False(variantCol.IsNull(3))

// Verify variant values
val0, err := variantCol.Value(0)
s.Require().NoError(err)
s.EqualValues(int32(42), val0.Value())

val1, err := variantCol.Value(1)
s.Require().NoError(err)
s.EqualValues("hello", val1.Value())

val3, err := variantCol.Value(3)
s.Require().NoError(err)
objVal, ok := val3.Value().(variant.ObjectValue)
s.True(ok, "expected ObjectValue, got %T", val3.Value())
s.EqualValues(2, objVal.NumElements())
}

// mustVariant is a helper to create variant values from any supported type
func mustVariant(t *testing.T, v any) variant.Value {
var b variant.Builder
require.NoError(t, b.Append(v))
val, err := b.Build()
require.NoError(t, err)
return val
}

func TestSparkIntegration(t *testing.T) {
suite.Run(t, new(SparkIntegrationTestSuite))
}
18 changes: 18 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func (t *typeIFace) UnmarshalJSON(b []byte) error {
t.Type = BinaryType{}
case "unknown":
t.Type = UnknownType{}
case "variant":
t.Type = VariantType{}
default:
switch {
case strings.HasPrefix(typename, "fixed"):
Expand Down Expand Up @@ -743,6 +745,20 @@ func (UnknownType) primitive() {}
func (UnknownType) Type() string { return "unknown" }
func (UnknownType) String() string { return "unknown" }

// VariantType represents semi-structured data stored in a binary format.
// Requires format version 3+.
type VariantType struct{}

func (VariantType) Equals(other Type) bool {
_, ok := other.(VariantType)

return ok
}

func (VariantType) primitive() {}
func (VariantType) Type() string { return "variant" }
func (VariantType) String() string { return "variant" }

var PrimitiveTypes = struct {
Bool PrimitiveType
Int32 PrimitiveType
Expand All @@ -759,6 +775,7 @@ var PrimitiveTypes = struct {
Binary PrimitiveType
UUID PrimitiveType
Unknown PrimitiveType
Variant PrimitiveType
}{
Bool: BooleanType{},
Int32: Int32Type{},
Expand All @@ -775,6 +792,7 @@ var PrimitiveTypes = struct {
Binary: BinaryType{},
UUID: UUIDType{},
Unknown: UnknownType{},
Variant: VariantType{},
}

// PromoteType promotes the type being read from a file to a requested read type.
Expand Down
3 changes: 3 additions & 0 deletions types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestTypesBasic(t *testing.T) {
{"uuid", iceberg.PrimitiveTypes.UUID},
{"binary", iceberg.PrimitiveTypes.Binary},
{"unknown", iceberg.PrimitiveTypes.Unknown},
{"variant", iceberg.PrimitiveTypes.Variant},
{"fixed[5]", iceberg.FixedTypeOf(5)},
{"decimal(9, 4)", iceberg.DecimalTypeOf(9, 4)},
}
Expand Down Expand Up @@ -188,6 +189,7 @@ var NonParameterizedTypes = []iceberg.Type{
iceberg.PrimitiveTypes.String,
iceberg.PrimitiveTypes.Binary,
iceberg.PrimitiveTypes.UUID,
iceberg.PrimitiveTypes.Variant,
iceberg.PrimitiveTypes.Unknown,
}

Expand Down Expand Up @@ -221,6 +223,7 @@ func TestTypeStrings(t *testing.T) {
{iceberg.PrimitiveTypes.TimestampTzNs, "timestamptz_ns"},
{iceberg.PrimitiveTypes.String, "string"},
{iceberg.PrimitiveTypes.UUID, "uuid"},
{iceberg.PrimitiveTypes.Variant, "variant"},
{iceberg.PrimitiveTypes.Binary, "binary"},
{iceberg.PrimitiveTypes.Unknown, "unknown"},
{iceberg.FixedTypeOf(22), "fixed[22]"},
Expand Down
Loading