diff --git a/catalog/glue/glue.go b/catalog/glue/glue.go index bd95d3eba..16ae462df 100644 --- a/catalog/glue/glue.go +++ b/catalog/glue/glue.go @@ -65,6 +65,21 @@ const ( SkipArchive = "glue.skip-archive" SkipArchiveDefault = true + // GlueSchemaColumns controls whether schema columns are written to the Glue + // StorageDescriptor on table create/update. Set to "false" to omit columns, + // which avoids exceeding Glue's API payload limit for tables with very large + // schemas. Note: disabling columns breaks Athena column discovery via Glue + // and is incompatible with Lake Formation column-level access control. + // Cannot be set to "false" when GlueLakeFormationEnabled is "true". + GlueSchemaColumns = "glue.schema-columns" + GlueSchemaColumnsDefault = true + + // GlueLakeFormationEnabled enables Lake Formation access control for the catalog. + // When set to "true", schema columns cannot be omitted from the StorageDescriptor + // (i.e. GlueSchemaColumns must remain at its default value of "true"). + GlueLakeFormationEnabled = "glue.lakeformation-enabled" + GlueLakeFormationEnabledDefault = false + AccessKeyID = "glue.access-key-id" SecretAccessKey = "glue.secret-access-key" SessionToken = "glue.session-token" @@ -171,6 +186,19 @@ func NewCatalog(opts ...Option) *Catalog { } } +// schemaColumnsEnabled returns whether schema columns should be included in the +// Glue StorageDescriptor, and an error if the configuration is invalid. +func (c *Catalog) schemaColumnsEnabled() (bool, error) { + include := c.props.GetBool(GlueSchemaColumns, GlueSchemaColumnsDefault) + if !include && c.props.GetBool(GlueLakeFormationEnabled, GlueLakeFormationEnabledDefault) { + return false, fmt.Errorf("%s=false is incompatible with %s=true: "+ + "Lake Formation column-level access control requires StorageDescriptor columns", + GlueSchemaColumns, GlueLakeFormationEnabled) + } + + return include, nil +} + // ListTables returns a list of Iceberg tables in the given Glue database. // // The namespace should just contain the Glue database name. @@ -250,6 +278,11 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, return nil, errors.New("loaded filesystem IO does not support writing") } + includeColumns, err := c.schemaColumnsEnabled() + if err != nil { + return nil, err + } + compression := staged.Table.Properties().Get(table.MetadataCompressionKey, table.MetadataCompressionDefault) if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation(), compression); err != nil { return nil, err @@ -258,7 +291,7 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, _, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), - TableInput: constructTableInput(tableName, staged.Table, nil), + TableInput: constructTableInput(tableName, staged.Table, nil, includeColumns), }) if err != nil { return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err) @@ -269,6 +302,11 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, // RegisterTable registers a new table using existing metadata. func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier, metadataLocation string) (*table.Table, error) { + includeColumns, err := c.schemaColumnsEnabled() + if err != nil { + return nil, err + } + database, tableName, err := identifierToGlueTable(identifier) if err != nil { return nil, err @@ -290,7 +328,7 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier _, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), - TableInput: constructTableInput(tableName, tbl, nil), + TableInput: constructTableInput(tableName, tbl, nil, includeColumns), }) if err != nil { return nil, fmt.Errorf("failed to register table %s.%s: %w", database, tableName, err) @@ -300,6 +338,11 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier } func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) { + includeColumns, err := c.schemaColumnsEnabled() + if err != nil { + return nil, "", err + } + // Load current table database, tableName, err := identifierToGlueTable(identifier) if err != nil { @@ -338,7 +381,7 @@ func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier, _, err = c.glueSvc.UpdateTable(ctx, &glue.UpdateTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), - TableInput: constructTableInput(tableName, staged.Table, currentGlueTable), + TableInput: constructTableInput(tableName, staged.Table, currentGlueTable, includeColumns), // use `VersionId` to implement optimistic locking VersionId: currentGlueTable.VersionId, SkipArchive: aws.Bool(c.props.GetBool(SkipArchive, SkipArchiveDefault)), @@ -350,7 +393,7 @@ func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier, _, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{ CatalogId: c.catalogId, DatabaseName: aws.String(database), - TableInput: constructTableInput(tableName, staged.Table, nil), + TableInput: constructTableInput(tableName, staged.Table, nil, includeColumns), }) if err != nil { return nil, "", err @@ -761,15 +804,19 @@ func constructParameters(staged *table.Table, previousGlueTable *types.Table) ma return parameters } -func constructTableInput(tableName string, staged *table.Table, previousGlueTable *types.Table) *types.TableInput { +func constructTableInput(tableName string, staged *table.Table, previousGlueTable *types.Table, includeColumns bool) *types.TableInput { + sd := &types.StorageDescriptor{ + Location: aws.String(staged.Location()), + } + if includeColumns { + sd.Columns = schemasToGlueColumns(staged.Metadata()) + } + tableInput := &types.TableInput{ - Name: aws.String(tableName), - TableType: aws.String(glueTableType), - Parameters: constructParameters(staged, previousGlueTable), - StorageDescriptor: &types.StorageDescriptor{ - Location: aws.String(staged.Location()), - Columns: schemasToGlueColumns(staged.Metadata()), - }, + Name: aws.String(tableName), + TableType: aws.String(glueTableType), + Parameters: constructParameters(staged, previousGlueTable), + StorageDescriptor: sd, } if comment, ok := staged.Properties()[PropsKeyDescription]; ok { diff --git a/catalog/glue/glue_test.go b/catalog/glue/glue_test.go index ddadcdf58..5eaebc164 100644 --- a/catalog/glue/glue_test.go +++ b/catalog/glue/glue_test.go @@ -1296,6 +1296,54 @@ func TestGlueCheckTableNotExists(t *testing.T) { assert.False(exists) } +func TestConstructTableInputSchemaColumns(t *testing.T) { + schema := iceberg.NewSchemaWithIdentifiers(1, []int{1}, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.Int64Type{}, Required: true}, + iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.StringType{}}, + ) + + builder, err := table.NewMetadataBuilder(2) + require.NoError(t, err) + require.NoError(t, builder.SetLoc("s3://bucket/db/tbl")) + require.NoError(t, builder.AddSchema(schema)) + require.NoError(t, builder.SetCurrentSchemaID(1)) + require.NoError(t, builder.AddPartitionSpec(iceberg.UnpartitionedSpec, true)) + require.NoError(t, builder.SetDefaultSpecID(iceberg.UnpartitionedSpec.ID())) + require.NoError(t, builder.AddSortOrder(&table.UnsortedSortOrder)) + require.NoError(t, builder.SetDefaultSortOrderID(table.UnsortedSortOrderID)) + meta, err := builder.Build() + require.NoError(t, err) + + tbl := table.New([]string{"db", "tbl"}, meta, "s3://bucket/db/tbl/metadata/v1.json", nil, nil) + + t.Run("columns included by default", func(t *testing.T) { + input := constructTableInput("tbl", tbl, nil, true) + require.NotNil(t, input.StorageDescriptor) + assert := require.New(t) + assert.Len(input.StorageDescriptor.Columns, 2) + }) + + t.Run("columns omitted when disabled", func(t *testing.T) { + input := constructTableInput("tbl", tbl, nil, false) + require.NotNil(t, input.StorageDescriptor) + assert := require.New(t) + assert.Empty(input.StorageDescriptor.Columns) + }) +} + +func TestSchemaColumnsLakeFormationIncompatibility(t *testing.T) { + cat := &Catalog{ + props: iceberg.Properties{ + GlueSchemaColumns: "false", + GlueLakeFormationEnabled: "true", + }, + } + _, err := cat.schemaColumnsEnabled() + require.Error(t, err) + require.Contains(t, err.Error(), GlueSchemaColumns) + require.Contains(t, err.Error(), GlueLakeFormationEnabled) +} + func cleanupTable(t *testing.T, ctlg catalog.Catalog, tbIdent table.Identifier, awsCfg aws.Config) { t.Helper()