Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 59 additions & 12 deletions catalog/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ const (
SkipArchive = "glue.skip-archive"
SkipArchiveDefault = true

// GlueSchemaColumns controls whether schema columns are written to the Glue
// StorageDescriptor on table create/update. Set to "false" to omit columns,
// which avoids exceeding Glue's API payload limit for tables with very large
// schemas. Note: disabling columns breaks Athena column discovery via Glue
// and is incompatible with Lake Formation column-level access control.
// Cannot be set to "false" when GlueLakeFormationEnabled is "true".
GlueSchemaColumns = "glue.schema-columns"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a known property that exists outside of our usage of it here? or are we creating an entirely new usage/property here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is the case: it is something new.

This is a new property, it doesn't exist in iceberg-java or pyiceberg today. However, it follows the established glue.* namespace convention used consistently across implementations (e.g. glue.skip-archive, glue.id, glue.endpoint).

The Java community attempted to introduce something similar twice — glue.non-current-fields-disabled in apache/iceberg#11334 and apache/iceberg#12664 — both targeting the narrower problem of historical/non-current fields. Those PRs stalled and were closed due to inactivity without reaching consensus. I will try to get them merged later, as I think this has value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can try to get some pre-alignment in Slack, if you can point me in the right direction, so we can start this discussion from a broader context, not just the iceberg-go one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the best route here would be to send an email to the iceberg mailing list, linking this PR, and starting the discussion there.

GlueSchemaColumnsDefault = true

// GlueLakeFormationEnabled enables Lake Formation access control for the catalog.
// When set to "true", schema columns cannot be omitted from the StorageDescriptor
// (i.e. GlueSchemaColumns must remain at its default value of "true").
GlueLakeFormationEnabled = "glue.lakeformation-enabled"
GlueLakeFormationEnabledDefault = false

AccessKeyID = "glue.access-key-id"
SecretAccessKey = "glue.secret-access-key"
SessionToken = "glue.session-token"
Expand Down Expand Up @@ -171,6 +186,19 @@ func NewCatalog(opts ...Option) *Catalog {
}
}

// schemaColumnsEnabled reports whether schema columns should be written to the
// Glue StorageDescriptor.
//
// It reads the GlueSchemaColumns property (falling back to
// GlueSchemaColumnsDefault). When columns are disabled it additionally reads
// GlueLakeFormationEnabled (falling back to GlueLakeFormationEnabledDefault)
// and returns a non-nil error if Lake Formation is enabled, because that
// combination of settings is rejected as invalid.
func (c *Catalog) schemaColumnsEnabled() (bool, error) {
	// Columns requested: nothing further to validate.
	if c.props.GetBool(GlueSchemaColumns, GlueSchemaColumnsDefault) {
		return true, nil
	}

	// Columns disabled and Lake Formation not enabled: valid configuration.
	if !c.props.GetBool(GlueLakeFormationEnabled, GlueLakeFormationEnabledDefault) {
		return false, nil
	}

	// Columns disabled while Lake Formation is enabled: reject.
	return false, fmt.Errorf("%s=false is incompatible with %s=true: "+
		"Lake Formation column-level access control requires StorageDescriptor columns",
		GlueSchemaColumns, GlueLakeFormationEnabled)
}

// ListTables returns a list of Iceberg tables in the given Glue database.
//
// The namespace should just contain the Glue database name.
Expand Down Expand Up @@ -250,6 +278,11 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier,
return nil, errors.New("loaded filesystem IO does not support writing")
}

includeColumns, err := c.schemaColumnsEnabled()
if err != nil {
return nil, err
}

compression := staged.Table.Properties().Get(table.MetadataCompressionKey, table.MetadataCompressionDefault)
if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation(), compression); err != nil {
return nil, err
Expand All @@ -258,7 +291,7 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier,
_, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
CatalogId: c.catalogId,
DatabaseName: aws.String(database),
TableInput: constructTableInput(tableName, staged.Table, nil),
TableInput: constructTableInput(tableName, staged.Table, nil, includeColumns),
})
if err != nil {
return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err)
Expand All @@ -269,6 +302,11 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier,

// RegisterTable registers a new table using existing metadata.
func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier, metadataLocation string) (*table.Table, error) {
includeColumns, err := c.schemaColumnsEnabled()
if err != nil {
return nil, err
}

database, tableName, err := identifierToGlueTable(identifier)
if err != nil {
return nil, err
Expand All @@ -290,7 +328,7 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier
_, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
CatalogId: c.catalogId,
DatabaseName: aws.String(database),
TableInput: constructTableInput(tableName, tbl, nil),
TableInput: constructTableInput(tableName, tbl, nil, includeColumns),
})
if err != nil {
return nil, fmt.Errorf("failed to register table %s.%s: %w", database, tableName, err)
Expand All @@ -300,6 +338,11 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier
}

func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
includeColumns, err := c.schemaColumnsEnabled()
if err != nil {
return nil, "", err
}

// Load current table
database, tableName, err := identifierToGlueTable(identifier)
if err != nil {
Expand Down Expand Up @@ -338,7 +381,7 @@ func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier,
_, err = c.glueSvc.UpdateTable(ctx, &glue.UpdateTableInput{
CatalogId: c.catalogId,
DatabaseName: aws.String(database),
TableInput: constructTableInput(tableName, staged.Table, currentGlueTable),
TableInput: constructTableInput(tableName, staged.Table, currentGlueTable, includeColumns),
// use `VersionId` to implement optimistic locking
VersionId: currentGlueTable.VersionId,
SkipArchive: aws.Bool(c.props.GetBool(SkipArchive, SkipArchiveDefault)),
Expand All @@ -350,7 +393,7 @@ func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier,
_, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
CatalogId: c.catalogId,
DatabaseName: aws.String(database),
TableInput: constructTableInput(tableName, staged.Table, nil),
TableInput: constructTableInput(tableName, staged.Table, nil, includeColumns),
})
if err != nil {
return nil, "", err
Expand Down Expand Up @@ -761,15 +804,19 @@ func constructParameters(staged *table.Table, previousGlueTable *types.Table) ma
return parameters
}

func constructTableInput(tableName string, staged *table.Table, previousGlueTable *types.Table) *types.TableInput {
func constructTableInput(tableName string, staged *table.Table, previousGlueTable *types.Table, includeColumns bool) *types.TableInput {
sd := &types.StorageDescriptor{
Location: aws.String(staged.Location()),
}
if includeColumns {
sd.Columns = schemasToGlueColumns(staged.Metadata())
}

tableInput := &types.TableInput{
Name: aws.String(tableName),
TableType: aws.String(glueTableType),
Parameters: constructParameters(staged, previousGlueTable),
StorageDescriptor: &types.StorageDescriptor{
Location: aws.String(staged.Location()),
Columns: schemasToGlueColumns(staged.Metadata()),
},
Name: aws.String(tableName),
TableType: aws.String(glueTableType),
Parameters: constructParameters(staged, previousGlueTable),
StorageDescriptor: sd,
}

if comment, ok := staged.Properties()[PropsKeyDescription]; ok {
Expand Down
48 changes: 48 additions & 0 deletions catalog/glue/glue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,54 @@ func TestGlueCheckTableNotExists(t *testing.T) {
assert.False(exists)
}

// TestConstructTableInputSchemaColumns checks that constructTableInput fills
// the StorageDescriptor columns when includeColumns is true and leaves them
// empty when it is false. The StorageDescriptor itself must be non-nil in
// both cases.
func TestConstructTableInputSchemaColumns(t *testing.T) {
	// Two-field schema, so the "included" case expects exactly two columns.
	sc := iceberg.NewSchemaWithIdentifiers(1, []int{1},
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.Int64Type{}, Required: true},
		iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.StringType{}},
	)

	mb, err := table.NewMetadataBuilder(2)
	require.NoError(t, err)
	require.NoError(t, mb.SetLoc("s3://bucket/db/tbl"))
	require.NoError(t, mb.AddSchema(sc))
	require.NoError(t, mb.SetCurrentSchemaID(1))
	require.NoError(t, mb.AddPartitionSpec(iceberg.UnpartitionedSpec, true))
	require.NoError(t, mb.SetDefaultSpecID(iceberg.UnpartitionedSpec.ID()))
	require.NoError(t, mb.AddSortOrder(&table.UnsortedSortOrder))
	require.NoError(t, mb.SetDefaultSortOrderID(table.UnsortedSortOrderID))
	metadata, err := mb.Build()
	require.NoError(t, err)

	tbl := table.New([]string{"db", "tbl"}, metadata, "s3://bucket/db/tbl/metadata/v1.json", nil, nil)

	cases := []struct {
		name           string
		includeColumns bool
	}{
		{name: "columns included by default", includeColumns: true},
		{name: "columns omitted when disabled", includeColumns: false},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := constructTableInput("tbl", tbl, nil, tc.includeColumns)
			require.NotNil(t, got.StorageDescriptor)

			if tc.includeColumns {
				require.Len(t, got.StorageDescriptor.Columns, 2)

				return
			}

			require.Empty(t, got.StorageDescriptor.Columns)
		})
	}
}

func TestSchemaColumnsLakeFormationIncompatibility(t *testing.T) {
cat := &Catalog{
props: iceberg.Properties{
GlueSchemaColumns: "false",
GlueLakeFormationEnabled: "true",
},
}
_, err := cat.schemaColumnsEnabled()
require.Error(t, err)
require.Contains(t, err.Error(), GlueSchemaColumns)
require.Contains(t, err.Error(), GlueLakeFormationEnabled)
}

func cleanupTable(t *testing.T, ctlg catalog.Catalog, tbIdent table.Identifier, awsCfg aws.Config) {
t.Helper()

Expand Down
Loading