Skip to content

Commit e99fe45

Browse files
authored
Merge pull request #1 from Basekick-Labs/fix/202-null-handling-buffered-merge
fix(ingestion): pad missing columns with None in buffered merge (#202)
2 parents 017e8c1 + d36ac67 commit e99fe45

File tree

2 files changed

+20
-3
lines changed

2 files changed

+20
-3
lines changed

src/arc_client/ingestion/async_buffered.py

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -122,7 +122,11 @@ async def _flush_all_unlocked(self) -> None:
122122
await self._flush_measurement_unlocked(measurement)
123123

124124
def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list[Any]]:
125-
"""Merge multiple columnar batches into one."""
125+
"""Merge multiple columnar batches into one.
126+
127+
When batches have different column sets (sparse columns), missing
128+
positions are filled with None so all columns have equal length.
129+
"""
126130
if not batches:
127131
return {}
128132

@@ -133,12 +137,17 @@ def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list
133137
for batch in batches:
134138
all_columns.update(batch.keys())
135139

140+
# Merge each column, padding missing columns with None
136141
merged: dict[str, list[Any]] = {}
137142
for col_name in all_columns:
138143
merged[col_name] = []
139144
for batch in batches:
140145
if col_name in batch:
141146
merged[col_name].extend(batch[col_name])
147+
else:
148+
# Column missing from this batch — pad with None
149+
batch_len = len(batch.get("time", next(iter(batch.values()))))
150+
merged[col_name].extend([None] * batch_len)
142151

143152
return merged
144153

src/arc_client/ingestion/buffered.py

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -167,7 +167,11 @@ def _flush_all(self) -> None:
167167
self._flush_measurement(measurement)
168168

169169
def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list[Any]]:
170-
"""Merge multiple columnar batches into one."""
170+
"""Merge multiple columnar batches into one.
171+
172+
When batches have different column sets (sparse columns), missing
173+
positions are filled with None so all columns have equal length.
174+
"""
171175
if not batches:
172176
return {}
173177

@@ -179,13 +183,17 @@ def _merge_columnar(self, batches: list[dict[str, list[Any]]]) -> dict[str, list
179183
for batch in batches:
180184
all_columns.update(batch.keys())
181185

182-
# Merge each column
186+
# Merge each column, padding missing columns with None
183187
merged: dict[str, list[Any]] = {}
184188
for col_name in all_columns:
185189
merged[col_name] = []
186190
for batch in batches:
187191
if col_name in batch:
188192
merged[col_name].extend(batch[col_name])
193+
else:
194+
# Column missing from this batch — pad with None
195+
batch_len = len(batch.get("time", next(iter(batch.values()))))
196+
merged[col_name].extend([None] * batch_len)
189197

190198
return merged
191199

0 commit comments

Comments
 (0)