Skip to content

Commit e5f7670

Browse files
fix(glue): add glue.schema-columns property to skip StorageDescriptor columns
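Introduces a new catalog property `glue.schema-columns` (default: `true`) that controls whether Iceberg schema columns are written to the Glue StorageDescriptor on table create, register, and update (commit) operations. When set to `false`, the StorageDescriptor is populated with the table location only, which avoids the Glue API payload size limit that is hit for tables with very large schemas (e.g. ~3600 fields). Also introduces `glue.lakeformation-enabled` (default: `false`): combining `glue.schema-columns=false` with `glue.lakeformation-enabled=true` is rejected with an error, because Lake Formation column-level access control requires StorageDescriptor columns. Fixes #701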
1 parent 52dbce5 commit e5f7670

File tree

2 files changed

+107
-12
lines changed

2 files changed

+107
-12
lines changed

catalog/glue/glue.go

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,21 @@ const (
6565
SkipArchive = "glue.skip-archive"
6666
SkipArchiveDefault = true
6767

68+
// GlueSchemaColumns controls whether schema columns are written to the Glue
69+
// StorageDescriptor on table create/update. Set to "false" to omit columns,
70+
// which avoids exceeding Glue's API payload limit for tables with very large
71+
// schemas. Note: disabling columns breaks Athena column discovery via Glue
72+
// and is incompatible with Lake Formation column-level access control.
73+
// Cannot be set to "false" when GlueLakeFormationEnabled is "true".
74+
GlueSchemaColumns = "glue.schema-columns"
75+
GlueSchemaColumnsDefault = true
76+
77+
// GlueLakeFormationEnabled enables Lake Formation access control for the catalog.
78+
// When set to "true", schema columns cannot be omitted from the StorageDescriptor
79+
// (i.e. GlueSchemaColumns must remain at its default value of "true").
80+
GlueLakeFormationEnabled = "glue.lakeformation-enabled"
81+
GlueLakeFormationEnabledDefault = false
82+
6883
AccessKeyID = "glue.access-key-id"
6984
SecretAccessKey = "glue.secret-access-key"
7085
SessionToken = "glue.session-token"
@@ -171,6 +186,19 @@ func NewCatalog(opts ...Option) *Catalog {
171186
}
172187
}
173188

189+
// schemaColumnsEnabled returns whether schema columns should be included in the
190+
// Glue StorageDescriptor, and an error if the configuration is invalid.
191+
func (c *Catalog) schemaColumnsEnabled() (bool, error) {
192+
include := c.props.GetBool(GlueSchemaColumns, GlueSchemaColumnsDefault)
193+
if !include && c.props.GetBool(GlueLakeFormationEnabled, GlueLakeFormationEnabledDefault) {
194+
return false, fmt.Errorf("%s=false is incompatible with %s=true: "+
195+
"Lake Formation column-level access control requires StorageDescriptor columns",
196+
GlueSchemaColumns, GlueLakeFormationEnabled)
197+
}
198+
199+
return include, nil
200+
}
201+
174202
// ListTables returns a list of Iceberg tables in the given Glue database.
175203
//
176204
// The namespace should just contain the Glue database name.
@@ -250,6 +278,11 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier,
250278
return nil, errors.New("loaded filesystem IO does not support writing")
251279
}
252280

281+
includeColumns, err := c.schemaColumnsEnabled()
282+
if err != nil {
283+
return nil, err
284+
}
285+
253286
compression := staged.Table.Properties().Get(table.MetadataCompressionKey, table.MetadataCompressionDefault)
254287
if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation(), compression); err != nil {
255288
return nil, err
@@ -258,7 +291,7 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier,
258291
_, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
259292
CatalogId: c.catalogId,
260293
DatabaseName: aws.String(database),
261-
TableInput: constructTableInput(tableName, staged.Table, nil),
294+
TableInput: constructTableInput(tableName, staged.Table, nil, includeColumns),
262295
})
263296
if err != nil {
264297
return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err)
@@ -269,6 +302,11 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier,
269302

270303
// RegisterTable registers a new table using existing metadata.
271304
func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier, metadataLocation string) (*table.Table, error) {
305+
includeColumns, err := c.schemaColumnsEnabled()
306+
if err != nil {
307+
return nil, err
308+
}
309+
272310
database, tableName, err := identifierToGlueTable(identifier)
273311
if err != nil {
274312
return nil, err
@@ -290,7 +328,7 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier
290328
_, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
291329
CatalogId: c.catalogId,
292330
DatabaseName: aws.String(database),
293-
TableInput: constructTableInput(tableName, tbl, nil),
331+
TableInput: constructTableInput(tableName, tbl, nil, includeColumns),
294332
})
295333
if err != nil {
296334
return nil, fmt.Errorf("failed to register table %s.%s: %w", database, tableName, err)
@@ -300,6 +338,11 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier
300338
}
301339

302340
func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
341+
includeColumns, err := c.schemaColumnsEnabled()
342+
if err != nil {
343+
return nil, "", err
344+
}
345+
303346
// Load current table
304347
database, tableName, err := identifierToGlueTable(identifier)
305348
if err != nil {
@@ -338,7 +381,7 @@ func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier,
338381
_, err = c.glueSvc.UpdateTable(ctx, &glue.UpdateTableInput{
339382
CatalogId: c.catalogId,
340383
DatabaseName: aws.String(database),
341-
TableInput: constructTableInput(tableName, staged.Table, currentGlueTable),
384+
TableInput: constructTableInput(tableName, staged.Table, currentGlueTable, includeColumns),
342385
// use `VersionId` to implement optimistic locking
343386
VersionId: currentGlueTable.VersionId,
344387
SkipArchive: aws.Bool(c.props.GetBool(SkipArchive, SkipArchiveDefault)),
@@ -350,7 +393,7 @@ func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier,
350393
_, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
351394
CatalogId: c.catalogId,
352395
DatabaseName: aws.String(database),
353-
TableInput: constructTableInput(tableName, staged.Table, nil),
396+
TableInput: constructTableInput(tableName, staged.Table, nil, includeColumns),
354397
})
355398
if err != nil {
356399
return nil, "", err
@@ -761,15 +804,19 @@ func constructParameters(staged *table.Table, previousGlueTable *types.Table) ma
761804
return parameters
762805
}
763806

764-
func constructTableInput(tableName string, staged *table.Table, previousGlueTable *types.Table) *types.TableInput {
807+
func constructTableInput(tableName string, staged *table.Table, previousGlueTable *types.Table, includeColumns bool) *types.TableInput {
808+
sd := &types.StorageDescriptor{
809+
Location: aws.String(staged.Location()),
810+
}
811+
if includeColumns {
812+
sd.Columns = schemasToGlueColumns(staged.Metadata())
813+
}
814+
765815
tableInput := &types.TableInput{
766-
Name: aws.String(tableName),
767-
TableType: aws.String(glueTableType),
768-
Parameters: constructParameters(staged, previousGlueTable),
769-
StorageDescriptor: &types.StorageDescriptor{
770-
Location: aws.String(staged.Location()),
771-
Columns: schemasToGlueColumns(staged.Metadata()),
772-
},
816+
Name: aws.String(tableName),
817+
TableType: aws.String(glueTableType),
818+
Parameters: constructParameters(staged, previousGlueTable),
819+
StorageDescriptor: sd,
773820
}
774821

775822
if comment, ok := staged.Properties()[PropsKeyDescription]; ok {

catalog/glue/glue_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,54 @@ func TestGlueCheckTableNotExists(t *testing.T) {
12961296
assert.False(exists)
12971297
}
12981298

1299+
func TestConstructTableInputSchemaColumns(t *testing.T) {
1300+
schema := iceberg.NewSchemaWithIdentifiers(1, []int{1},
1301+
iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.Int64Type{}, Required: true},
1302+
iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.StringType{}},
1303+
)
1304+
1305+
builder, err := table.NewMetadataBuilder(2)
1306+
require.NoError(t, err)
1307+
require.NoError(t, builder.SetLoc("s3://bucket/db/tbl"))
1308+
require.NoError(t, builder.AddSchema(schema))
1309+
require.NoError(t, builder.SetCurrentSchemaID(1))
1310+
require.NoError(t, builder.AddPartitionSpec(iceberg.UnpartitionedSpec, true))
1311+
require.NoError(t, builder.SetDefaultSpecID(iceberg.UnpartitionedSpec.ID()))
1312+
require.NoError(t, builder.AddSortOrder(&table.UnsortedSortOrder))
1313+
require.NoError(t, builder.SetDefaultSortOrderID(table.UnsortedSortOrderID))
1314+
meta, err := builder.Build()
1315+
require.NoError(t, err)
1316+
1317+
tbl := table.New([]string{"db", "tbl"}, meta, "s3://bucket/db/tbl/metadata/v1.json", nil, nil)
1318+
1319+
t.Run("columns included by default", func(t *testing.T) {
1320+
input := constructTableInput("tbl", tbl, nil, true)
1321+
require.NotNil(t, input.StorageDescriptor)
1322+
assert := require.New(t)
1323+
assert.Len(input.StorageDescriptor.Columns, 2)
1324+
})
1325+
1326+
t.Run("columns omitted when disabled", func(t *testing.T) {
1327+
input := constructTableInput("tbl", tbl, nil, false)
1328+
require.NotNil(t, input.StorageDescriptor)
1329+
assert := require.New(t)
1330+
assert.Empty(input.StorageDescriptor.Columns)
1331+
})
1332+
}
1333+
1334+
func TestSchemaColumnsLakeFormationIncompatibility(t *testing.T) {
1335+
cat := &Catalog{
1336+
props: iceberg.Properties{
1337+
GlueSchemaColumns: "false",
1338+
GlueLakeFormationEnabled: "true",
1339+
},
1340+
}
1341+
_, err := cat.schemaColumnsEnabled()
1342+
require.Error(t, err)
1343+
require.Contains(t, err.Error(), GlueSchemaColumns)
1344+
require.Contains(t, err.Error(), GlueLakeFormationEnabled)
1345+
}
1346+
12991347
func cleanupTable(t *testing.T, ctlg catalog.Catalog, tbIdent table.Identifier, awsCfg aws.Config) {
13001348
t.Helper()
13011349

0 commit comments

Comments
 (0)