
Commit 3ebb24a
Fix arrow reading empty frames
During symbol concat we can end up with a Segment with zero columns. Convert that to arrow gracefully. This also uncovered that arrow normalization doesn't correctly construct pandas_metadata for empty dataframes; that is fixed and tested in this PR as well.

1 parent 4f3fdfd
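
For intuition, here is the pandas analogue of the operation that triggers the bug (a sketch only; arcticdb's concat pipeline is native and does not go through pandas). An inner join keeps just the columns common to all inputs, so with disjoint schemas the intersection is empty:

    import pandas as pd

    # An inner concat keeps only the shared columns; with disjoint schemas
    # the column intersection is empty.
    df_0 = pd.DataFrame({"col_0": [0]})
    df_1 = pd.DataFrame({"col_1": [1]})
    print(pd.concat([df_0, df_1], join="inner").shape)  # (2, 0)

arcticdb's inner concat for the same inputs yields a fully empty (0, 0) result, as the new test at the bottom of this diff asserts.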

4 files changed (+60, -2)

cpp/arcticdb/arrow/arrow_utils.cpp (5 additions, 0 deletions)

@@ -63,6 +63,11 @@ std::vector<sparrow::array> arrow_arrays_from_column(const Column& column, std::
 std::shared_ptr<std::vector<sparrow::record_batch>> segment_to_arrow_data(SegmentInMemory& segment) {
     const auto total_blocks = segment.num_blocks();
     const auto num_columns = segment.num_columns();
+    if (num_columns == 0) {
+        // We can't construct a record batch with no columns, so in this case we return an empty list
+        // of record batches, which needs special handling in Python.
+        return {};
+    }
     const auto column_blocks = segment.column(0).num_blocks();
     util::check(total_blocks == column_blocks * num_columns, "Expected regular block size");
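
Returning no record batches at all is the pragmatic choice here: even in pyarrow, where a zero-column record batch can technically be built, the row count is inferred from the arrays, so such a batch degenerates to 0 x 0 anyway. A small illustration in plain pyarrow (an analogy only; the C++ code above uses sparrow, not pyarrow):

    import pyarrow as pa

    # With no arrays there is nothing to infer a row count from,
    # so the batch collapses to 0 columns and 0 rows.
    batch = pa.record_batch([], names=[])
    print(batch.num_columns, batch.num_rows)  # 0 0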

python/arcticdb/version_store/_normalization.py (3 additions, 1 deletion)

@@ -742,7 +742,9 @@ def denormalize(self, item, norm_meta):
         index_type = pandas_meta.WhichOneof("index_type")
         if index_type == "index":
             index_meta = pandas_meta.index
-            if index_meta.is_physically_stored:
+            # Empty tables don't have `is_physically_stored=True`, but we still output them with an empty DatetimeIndex.
+            is_empty_table_with_datetime_index = len(item) == 0 and not index_meta.step
+            if index_meta.is_physically_stored or is_empty_table_with_datetime_index:
                 pandas_indexes = 1
                 if index_meta.tz:
                     timezones[0] = index_meta.tz
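
The `not index_meta.step` check appears to lean on proto3 defaults: an unset numeric field reads as 0, which is falsy, while a stored RangeIndex presumably always records a nonzero step, since pandas itself rejects a zero step. So on a zero-row frame, "no step recorded" implies the empty DatetimeIndex that normalization attaches at write time. A quick sketch of the two index shapes being distinguished (plain pandas, nothing arcticdb-specific):

    import pandas as pd

    # A zero-row RangeIndex still carries a nonzero step...
    range_idx = pd.RangeIndex(start=0, stop=0, step=1)
    assert range_idx.step == 1

    # ...and pandas refuses step=0 outright, so a recorded step is never 0.
    try:
        pd.RangeIndex(start=0, stop=0, step=0)
    except ValueError:
        pass  # step must not be zero

    # The other empty shape: the DatetimeIndex attached to empty frames at write time.
    dt_idx = pd.DatetimeIndex([])
    assert len(dt_idx) == 0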

python/arcticdb/version_store/_store.py (5 additions, 1 deletion)

@@ -2432,7 +2432,11 @@ def _adapt_read_res(self, read_result: ReadResult) -> VersionedItem:
             record_batches = []
             for record_batch in frame_data.extract_record_batches():
                 record_batches.append(pa.RecordBatch._import_from_c(record_batch.array(), record_batch.schema()))
-            table = pa.Table.from_batches(record_batches)
+            if len(record_batches) == 0:
+                # We get an empty list of record batches when output has no columns
+                table = pa.Table.from_arrays([])
+            else:
+                table = pa.Table.from_batches(record_batches)
             data = self._arrow_normalizer.denormalize(table, read_result.norm)
         else:
             data = self._normalizer.denormalize(read_result.frame_data, read_result.norm)
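
The new branch is needed because `pa.Table.from_batches` takes its schema either from an explicit argument or from the first batch, so an empty list leaves it with nothing to build from, while `pa.Table.from_arrays([])` constructs the genuinely empty table. A quick illustration in plain pyarrow (independent of arcticdb; the exact exception reflects current pyarrow behavior and may vary by version):

    import pyarrow as pa

    # No batches and no schema: nothing to derive the table's schema from.
    try:
        pa.Table.from_batches([])
    except ValueError:
        pass  # pyarrow requires a schema or at least one record batch

    # An empty from_arrays call yields the 0-column, 0-row table the read path wants.
    table = pa.Table.from_arrays([])
    print(table.shape)  # (0, 0)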

python/tests/unit/arcticdb/version_store/test_arrow.py (47 additions, 0 deletions)

@@ -75,6 +75,35 @@ def test_bool_columns(lmdb_version_store_arrow):
     assert_frame_equal_with_arrow(table, df)
 
 
+def test_read_empty(lmdb_version_store_arrow):
+    lib = lmdb_version_store_arrow
+    sym = "sym"
+    df = pd.DataFrame()
+    lib.write(sym, df)
+    table = lib.read(sym).data
+    expected = lib.read(sym, output_format=OutputFormat.PANDAS).data
+    # During normalization when doing the write we attach an empty DatetimeIndex to the DataFrame.
+    # We correctly see it in arrow.
+    assert table.column_names == ["index"]
+    assert table.shape == (0, 1)
+    # With no columns, arcticdb's read(output_format=PANDAS) produces a `pd.RangeIndex(start=0, stop=0, step=1)`
+    # column index, whereas pyarrow's to_pandas produces `pd.Index([])`.
+    expected.columns = pd.Index([])
+    assert_frame_equal_with_arrow(table, expected)
+
+
+def test_read_empty_with_columns(lmdb_version_store_arrow):
+    lib = lmdb_version_store_arrow
+    sym = "sym"
+    df = pd.DataFrame({"col_int": np.zeros(0, dtype=np.int32), "col_float": np.zeros(0, dtype=np.float64)})
+    lib.write(sym, df)
+    table = lib.read(sym).data
+    expected = lib.read(sym, output_format=OutputFormat.PANDAS).data
+    assert table.column_names == ["index", "col_int", "col_float"]
+    assert table.shape == (0, 3)
+    assert_frame_equal_with_arrow(table, expected)
+
+
 def test_column_filtering(lmdb_version_store_arrow):
     lib = lmdb_version_store_arrow
     df = pd.DataFrame({"x": np.arange(10), "y": np.arange(10.0, 20.0)})

@@ -925,3 +954,21 @@ def test_resample_row_slice_responsible_for_no_buckets(lmdb_version_store_tiny_s
     table = lib.read(sym, date_range=date_range, query_builder=q).data
     expected = pd.DataFrame({"to_sum": [6]}, index=[pd.Timestamp(0)])
     assert_frame_equal_with_arrow(table, expected)
+
+
+def test_symbol_concat_empty_intersection(lmdb_version_store_arrow):
+    # Tests a failing subset of test_symbol_concat_empty_column_intersection.
+    # TODO: Remove this test if we enable pipeline tests with arrow
+    lib = lmdb_version_store_arrow
+    sym_0 = "sym_0"
+    sym_1 = "sym_1"
+    df_0 = pd.DataFrame({"col_0": [0]})
+    df_1 = pd.DataFrame({"col_1": [1]})
+    lib.write(sym_0, df_0)
+    lib.write(sym_1, df_1)
+    q = QueryBuilder().concat("inner")
+    table = lib.batch_read_and_join([sym_0, sym_1], query_builder=q).data
+    assert table.column_names == []
+    assert table.shape == (0, 0)
+    expected = pd.DataFrame()
+    assert_frame_equal_with_arrow(table, expected)
