Skip to content

Commit 780e670

Browse files
committed
Add WriteZeroOptionalFields option for Snowflake compatibility
This commit adds a new WriterConfig option 'WriteZeroOptionalFields' that forces the thrift encoder to write optional fields even when they have zero values. This fixes compatibility issues with systems like Snowflake that require explicit null counts in statistics (NullCount=0) even when columns have no null values. Changes: - Added WriteZeroOptionalFields feature flag to thrift protocol - Made CompactProtocol configurable with SetFeatures method - Updated thrift encoder to respect writeZeroOptionalFields flag - Added WriteZeroOptionalFields config option to WriterConfig - Wired the option through to ColumnWriter protocol initialization The default behavior remains unchanged (false) for backward compatibility. Usage: writer := parquet.NewWriter(output, schema, parquet.WriteZeroOptionalFields(true), )
1 parent d6aa3e5 commit 780e670

File tree

6 files changed

+88
-50
lines changed

6 files changed

+88
-50
lines changed

config.go

Lines changed: 57 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -213,38 +213,40 @@ func (c *ReaderConfig) Validate() error {
213213
// CreatedBy: "my test program",
214214
// })
215215
type WriterConfig struct {
216-
CreatedBy string
217-
ColumnPageBuffers BufferPool
218-
ColumnIndexSizeLimit int
219-
PageBufferSize int
220-
WriteBufferSize int
221-
DataPageVersion int
222-
DataPageStatistics bool
223-
MaxRowsPerRowGroup int64
224-
KeyValueMetadata map[string]string
225-
Schema *Schema
226-
BloomFilters []BloomFilterColumn
227-
Compression compress.Codec
228-
Sorting SortingConfig
229-
SkipPageBounds [][]string
230-
Encodings map[Kind]encoding.Encoding
231-
DictionaryMaxBytes int64
232-
SchemaConfig *SchemaConfig
216+
CreatedBy string
217+
ColumnPageBuffers BufferPool
218+
ColumnIndexSizeLimit int
219+
PageBufferSize int
220+
WriteBufferSize int
221+
DataPageVersion int
222+
DataPageStatistics bool
223+
MaxRowsPerRowGroup int64
224+
KeyValueMetadata map[string]string
225+
Schema *Schema
226+
BloomFilters []BloomFilterColumn
227+
Compression compress.Codec
228+
Sorting SortingConfig
229+
SkipPageBounds [][]string
230+
Encodings map[Kind]encoding.Encoding
231+
DictionaryMaxBytes int64
232+
SchemaConfig *SchemaConfig
233+
WriteZeroOptionalFields bool
233234
}
234235

235236
// DefaultWriterConfig returns a new WriterConfig value initialized with the
236237
// default writer configuration.
237238
func DefaultWriterConfig() *WriterConfig {
238239
return &WriterConfig{
239-
CreatedBy: defaultCreatedBy(),
240-
ColumnPageBuffers: &defaultColumnBufferPool,
241-
ColumnIndexSizeLimit: DefaultColumnIndexSizeLimit,
242-
PageBufferSize: DefaultPageBufferSize,
243-
WriteBufferSize: DefaultWriteBufferSize,
244-
DataPageVersion: DefaultDataPageVersion,
245-
DataPageStatistics: DefaultDataPageStatistics,
246-
MaxRowsPerRowGroup: DefaultMaxRowsPerRowGroup,
247-
SchemaConfig: DefaultSchemaConfig(),
240+
CreatedBy: defaultCreatedBy(),
241+
ColumnPageBuffers: &defaultColumnBufferPool,
242+
ColumnIndexSizeLimit: DefaultColumnIndexSizeLimit,
243+
PageBufferSize: DefaultPageBufferSize,
244+
WriteBufferSize: DefaultWriteBufferSize,
245+
DataPageVersion: DefaultDataPageVersion,
246+
DataPageStatistics: DefaultDataPageStatistics,
247+
MaxRowsPerRowGroup: DefaultMaxRowsPerRowGroup,
248+
SchemaConfig: DefaultSchemaConfig(),
249+
WriteZeroOptionalFields: false,
248250
Sorting: SortingConfig{
249251
SortingBuffers: &defaultSortingBufferPool,
250252
},
@@ -288,22 +290,23 @@ func (c *WriterConfig) ConfigureWriter(config *WriterConfig) {
288290
}
289291

290292
*config = WriterConfig{
291-
CreatedBy: coalesceString(c.CreatedBy, config.CreatedBy),
292-
ColumnPageBuffers: coalesceBufferPool(c.ColumnPageBuffers, config.ColumnPageBuffers),
293-
ColumnIndexSizeLimit: coalesceInt(c.ColumnIndexSizeLimit, config.ColumnIndexSizeLimit),
294-
PageBufferSize: coalesceInt(c.PageBufferSize, config.PageBufferSize),
295-
WriteBufferSize: coalesceInt(c.WriteBufferSize, config.WriteBufferSize),
296-
DataPageVersion: coalesceInt(c.DataPageVersion, config.DataPageVersion),
297-
DataPageStatistics: coalesceBool(c.DataPageStatistics, config.DataPageStatistics),
298-
MaxRowsPerRowGroup: coalesceInt64(c.MaxRowsPerRowGroup, config.MaxRowsPerRowGroup),
299-
KeyValueMetadata: keyValueMetadata,
300-
Schema: coalesceSchema(c.Schema, config.Schema),
301-
BloomFilters: coalesceBloomFilters(c.BloomFilters, config.BloomFilters),
302-
Compression: coalesceCompression(c.Compression, config.Compression),
303-
Sorting: coalesceSortingConfig(c.Sorting, config.Sorting),
304-
SkipPageBounds: coalesceSkipPageBounds(c.SkipPageBounds, config.SkipPageBounds),
305-
Encodings: encodings,
306-
SchemaConfig: coalesceSchemaConfig(c.SchemaConfig, config.SchemaConfig),
293+
CreatedBy: coalesceString(c.CreatedBy, config.CreatedBy),
294+
ColumnPageBuffers: coalesceBufferPool(c.ColumnPageBuffers, config.ColumnPageBuffers),
295+
ColumnIndexSizeLimit: coalesceInt(c.ColumnIndexSizeLimit, config.ColumnIndexSizeLimit),
296+
PageBufferSize: coalesceInt(c.PageBufferSize, config.PageBufferSize),
297+
WriteBufferSize: coalesceInt(c.WriteBufferSize, config.WriteBufferSize),
298+
DataPageVersion: coalesceInt(c.DataPageVersion, config.DataPageVersion),
299+
DataPageStatistics: coalesceBool(c.DataPageStatistics, config.DataPageStatistics),
300+
MaxRowsPerRowGroup: coalesceInt64(c.MaxRowsPerRowGroup, config.MaxRowsPerRowGroup),
301+
KeyValueMetadata: keyValueMetadata,
302+
Schema: coalesceSchema(c.Schema, config.Schema),
303+
BloomFilters: coalesceBloomFilters(c.BloomFilters, config.BloomFilters),
304+
Compression: coalesceCompression(c.Compression, config.Compression),
305+
Sorting: coalesceSortingConfig(c.Sorting, config.Sorting),
306+
SkipPageBounds: coalesceSkipPageBounds(c.SkipPageBounds, config.SkipPageBounds),
307+
Encodings: encodings,
308+
SchemaConfig: coalesceSchemaConfig(c.SchemaConfig, config.SchemaConfig),
309+
WriteZeroOptionalFields: coalesceBool(c.WriteZeroOptionalFields, config.WriteZeroOptionalFields),
307310
}
308311
}
309312

@@ -688,6 +691,18 @@ func SkipPageBounds(path ...string) WriterOption {
688691
return writerOption(func(config *WriterConfig) { config.SkipPageBounds = append(config.SkipPageBounds, path) })
689692
}
690693

694+
// WriteZeroOptionalFields creates a configuration option which forces the writer
695+
// to always encode optional thrift fields even when they have zero values.
696+
// This is useful for compatibility with systems like Snowflake that require
697+
// explicit null counts in statistics even when they are zero.
698+
//
699+
// Defaults to false (zero-valued optional fields are omitted).
700+
func WriteZeroOptionalFields(enabled bool) WriterOption {
701+
return writerOption(func(config *WriterConfig) {
702+
config.WriteZeroOptionalFields = enabled
703+
})
704+
}
705+
691706
// DefaultEncodingFor creates a configuration option which sets the default encoding
692707
// used by a writer for columns with the specified primitive type where none were defined.
693708
//

encoding/thrift/compact.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import (
1414
// CompactProtocol is a Protocol implementation for the compact thrift protocol.
1515
//
1616
// https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#integer-encoding
17-
type CompactProtocol struct{}
17+
type CompactProtocol struct {
18+
features Features
19+
}
1820

1921
func (p *CompactProtocol) NewReader(r io.Reader) Reader {
2022
return &compactReader{protocol: p, binary: binaryReader{r: r}}
@@ -25,7 +27,15 @@ func (p *CompactProtocol) NewWriter(w io.Writer) Writer {
2527
}
2628

2729
func (p *CompactProtocol) Features() Features {
28-
return UseDeltaEncoding | CoalesceBoolFields
30+
if p.features == 0 {
31+
return UseDeltaEncoding | CoalesceBoolFields
32+
}
33+
return p.features
34+
}
35+
36+
// SetFeatures configures the protocol features.
37+
func (p *CompactProtocol) SetFeatures(features Features) {
38+
p.features = features
2939
}
3040

3141
type compactReader struct {

encoding/thrift/encode.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ encodeFields:
278278
}
279279
}
280280

281-
if !f.flags.have(required) && x.IsZero() {
281+
if !f.flags.have(required) && x.IsZero() && !flags.have(writeZeroOptionalFields) {
282282
continue encodeFields
283283
}
284284

encoding/thrift/protocol.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ const (
1616
// CoalesceBoolFields is advertised by protocols that allow encoders to
1717
// coalesce boolean values into field types.
1818
CoalesceBoolFields
19+
20+
// WriteZeroOptionalFields is advertised by protocols that allow encoders to
21+
// write optional fields even when they have zero values.
22+
WriteZeroOptionalFields
1923
)
2024

2125
// The Protocol interface abstracts the creation of low-level thrift readers and

encoding/thrift/struct.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@ const (
1616
optional flags = 1 << 3
1717
strict flags = 1 << 4
1818

19-
featuresBitOffset = 8
20-
useDeltaEncoding = flags(UseDeltaEncoding) << featuresBitOffset
21-
coalesceBoolFields = flags(CoalesceBoolFields) << featuresBitOffset
19+
featuresBitOffset = 8
20+
useDeltaEncoding = flags(UseDeltaEncoding) << featuresBitOffset
21+
coalesceBoolFields = flags(CoalesceBoolFields) << featuresBitOffset
22+
writeZeroOptionalFields = flags(WriteZeroOptionalFields) << featuresBitOffset
2223

2324
structFlags flags = enum | union | required | optional
2425
encodeFlags flags = strict | protocolFlags
2526
decodeFlags flags = strict | protocolFlags
26-
protocolFlags flags = useDeltaEncoding | coalesceBoolFields
27+
protocolFlags flags = useDeltaEncoding | coalesceBoolFields | writeZeroOptionalFields
2728
)
2829

2930
func (f flags) have(x flags) bool {

writer.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,14 @@ func newWriterRowGroup(w *writer, config *WriterConfig) *writerRowGroup {
751751
dictionaryMaxBytes: config.DictionaryMaxBytes,
752752
}
753753

754+
// Configure protocol features based on writer options
755+
c.header.protocol = &thrift.CompactProtocol{}
756+
if config.WriteZeroOptionalFields {
757+
c.header.protocol.SetFeatures(thrift.UseDeltaEncoding |
758+
thrift.CoalesceBoolFields |
759+
thrift.WriteZeroOptionalFields)
760+
}
761+
754762
c.header.encoder.Reset(c.header.protocol.NewWriter(&c.buffers.header))
755763

756764
if leaf.maxDefinitionLevel > 0 {
@@ -1404,7 +1412,7 @@ type ColumnWriter struct {
14041412
buffers *writerBuffers
14051413

14061414
header struct {
1407-
protocol thrift.CompactProtocol
1415+
protocol *thrift.CompactProtocol
14081416
encoder thrift.Encoder
14091417
}
14101418

0 commit comments

Comments
 (0)