Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sdks/python/apache_beam/io/parquetio.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ def process(self, row, w=DoFn.WindowParam, pane=DoFn.PaneInfoParam):

# reorder the data in columnar format.
for i, n in enumerate(self._schema.names):
self._buffer[i].append(row[n])
# Handle missing nullable fields by using None as default value
field = self._schema.field(i)
if field.nullable and n not in row:
self._buffer[i].append(None)
else:
self._buffer[i].append(row[n])

def finish_bundle(self):
if len(self._buffer[0]) > 0:
Expand Down
68 changes: 68 additions & 0 deletions sdks/python/apache_beam/io/parquetio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,74 @@ def test_schema_read_write(self):
| Map(stable_repr))
assert_that(readback, equal_to([stable_repr(r) for r in rows]))

def test_write_with_nullable_fields_missing_data(self):
  """Round-trip rows whose dicts omit some nullable schema fields.

  Regression test for https://github.com/apache/beam/issues/35791:
  WriteToParquet used to raise KeyError when a nullable field was
  absent from an input dictionary.
  """
  # Every field is nullable, so an absent key must be written as null.
  schema = pa.schema([
      pa.field("id", pa.int64(), nullable=True),
      pa.field("name", pa.string(), nullable=True),
      pa.field("age", pa.int64(), nullable=True),
      pa.field("email", pa.string(), nullable=True),
  ])

  # A mix of complete rows, rows carrying explicit None values, and
  # rows that simply omit one or more nullable keys.
  rows = [
      {
          'id': 1, 'name': 'Alice', 'age': 30
      },  # missing 'email'
      {
          'id': 2, 'name': 'Bob', 'age': 25, 'email': 'bob@example.com'
      },  # all fields present
      {
          'id': 3, 'name': 'Charlie', 'age': None, 'email': None
      },  # explicit None values
      {
          'id': 4, 'name': 'David'
      },  # missing 'age' and 'email'
  ]

  with TemporaryDirectory() as tmp_dirname:
    path = os.path.join(tmp_dirname, 'nullable_test')

    # Writing must not raise KeyError for the absent nullable fields.
    with TestPipeline() as p:
      _ = (
          p
          | Create(rows)
          | WriteToParquet(
              path, schema, num_shards=1, shard_name_template=''))

    # On read-back, every field that was absent materializes as None.
    expected = [
        {
            'id': 1, 'name': 'Alice', 'age': 30, 'email': None
        },
        {
            'id': 2, 'name': 'Bob', 'age': 25, 'email': 'bob@example.com'
        },
        {
            'id': 3, 'name': 'Charlie', 'age': None, 'email': None
        },
        {
            'id': 4, 'name': 'David', 'age': None, 'email': None
        },
    ]
    with TestPipeline() as p:
      readback = (
          p
          | ReadFromParquet(path + '*')
          | Map(json.dumps, sort_keys=True))
      assert_that(
          readback,
          equal_to([json.dumps(r, sort_keys=True) for r in expected]))

def test_batched_read(self):
with TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname + "tmp_filename")
Expand Down
Loading