
Commit dca983c

Fix index recovery combined with columns filter. #408
1 parent 4bb5a77 commit dca983c
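
This commit changes `_apply_index` in `awswrangler/s3/_read_parquet.py` so that index recovery tolerates a `columns` filter: string index columns recorded in the Parquet pandas metadata are now restored only when they are actually present in the frame that was read. A minimal sketch of the scenario being fixed, assuming a placeholder S3 path and the same `wr.s3.to_parquet` / `wr.s3.read_parquet` calls used in the tests below:

```python
import awswrangler as wr
import pandas as pd

path = "s3://my-bucket/prefix/0.parquet"  # placeholder path, not from the commit

# Write a frame with a named index; the index is recorded in the
# Parquet file's pandas metadata under "index_columns".
df = pd.DataFrame({"c0": [0, 1], "c1": [2, 3]}, dtype="Int64")
df.index.name = "foo"
wr.s3.to_parquet(df, path, index=True)

# Read back only "c0". The metadata still lists "foo" as an index column,
# but that column is not part of the selection, so the previous code
# attempted to set the index from a column the frame does not have.
# With this fix, index recovery is simply skipped for missing columns.
df2 = wr.s3.read_parquet([path], columns=["c0"])
```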

File tree

2 files changed: +46 / -4 lines changed


awswrangler/s3/_read_parquet.py

Lines changed: 9 additions & 4 deletions
```diff
@@ -170,19 +170,24 @@ def _read_parquet_metadata(
 
 def _apply_index(df: pd.DataFrame, metadata: Dict[str, Any]) -> pd.DataFrame:
     index_columns: List[Any] = metadata["index_columns"]
+    ignore_index: bool = True
+    _logger.debug("df.columns: %s", df.columns)
+
     if index_columns:
         if isinstance(index_columns[0], str):
-            df = df.set_index(keys=index_columns, drop=True, inplace=False, verify_integrity=False)
+            indexes: List[str] = [i for i in index_columns if i in df.columns]
+            if indexes:
+                df = df.set_index(keys=indexes, drop=True, inplace=False, verify_integrity=False)
+                ignore_index = False
         elif isinstance(index_columns[0], dict) and index_columns[0]["kind"] == "range":
             col = index_columns[0]
             if col["kind"] == "range":
                 df.index = pd.RangeIndex(start=col["start"], stop=col["stop"], step=col["step"])
+                ignore_index = False
                 if col["name"] is not None and col["name"].startswith("__index_level_") is False:
                     df.index.name = col["name"]
         df.index.names = [None if n is not None and n.startswith("__index_level_") else n for n in df.index.names]
-        ignore_index: bool = False
-    else:
-        ignore_index = True
+
     with warnings.catch_warnings():
         warnings.simplefilter("ignore", category=UserWarning)
         df._awswrangler_ignore_index = ignore_index  # pylint: disable=protected-access
```
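
The core of the change: `ignore_index` now starts as `True` and flips to `False` only when an index is actually restored, and the string index columns listed in the metadata are filtered down to those present in the frame. A self-contained sketch of that filtering logic, using plain pandas with an illustrative helper name and `df.attrs` standing in for the private `_awswrangler_ignore_index` flag:

```python
from typing import Any, Dict, List

import pandas as pd


def apply_index_sketch(df: pd.DataFrame, metadata: Dict[str, Any]) -> pd.DataFrame:
    """Illustrative re-statement of the patched logic for string index columns."""
    index_columns: List[Any] = metadata["index_columns"]
    ignore_index = True  # assume no index unless one can actually be restored
    if index_columns and isinstance(index_columns[0], str):
        # Keep only the index columns that survived the columns filter.
        indexes = [i for i in index_columns if i in df.columns]
        if indexes:
            df = df.set_index(keys=indexes, drop=True)
            ignore_index = False
    df.attrs["ignore_index"] = ignore_index  # stand-in for the private flag
    return df


# Example: metadata says "foo" is an index column, but the frame read with
# columns=["c0"] does not contain it, so no index is set and the flag stays True.
frame = pd.DataFrame({"c0": [0, 1]})
out = apply_index_sketch(frame, {"index_columns": ["foo"]})
assert out.attrs["ignore_index"] is True
```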

tests/test_s3_parquet.py

Lines changed: 37 additions & 0 deletions
```diff
@@ -325,6 +325,43 @@ def test_multi_index_recovery_nameless(path, use_threads):
     assert df.reset_index().equals(df2.reset_index())
 
 
+@pytest.mark.parametrize("use_threads", [True, False])
+@pytest.mark.parametrize("name", [None, "foo"])
+@pytest.mark.parametrize("pandas", [True, False])
+def test_index_columns(path, use_threads, name, pandas):
+    df = pd.DataFrame({"c0": [0, 1], "c1": [2, 3]}, dtype="Int64")
+    df.index.name = name
+    path_file = f"{path}0.parquet"
+    if pandas:
+        df.to_parquet(path_file, index=True)
+    else:
+        wr.s3.to_parquet(df, path_file, index=True)
+    wr.s3.wait_objects_exist(paths=[path_file], use_threads=use_threads)
+    df2 = wr.s3.read_parquet([path_file], columns=["c0"], use_threads=use_threads)
+    assert df[["c0"]].equals(df2)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+@pytest.mark.parametrize("name", [None, "foo"])
+@pytest.mark.parametrize("pandas", [True, False])
+@pytest.mark.parametrize("drop", [True, False])
+def test_range_index_columns(path, use_threads, name, pandas, drop):
+    df = pd.DataFrame({"c0": [0, 1], "c1": [2, 3]}, dtype="Int64", index=pd.RangeIndex(start=5, stop=7, step=1))
+    df.index.name = name
+    path_file = f"{path}0.parquet"
+    if pandas:
+        df.to_parquet(path_file, index=True)
+    else:
+        wr.s3.to_parquet(df, path_file, index=True)
+    wr.s3.wait_objects_exist(paths=[path_file], use_threads=use_threads)
+
+    name = "__index_level_0__" if name is None else name
+    columns = ["c0"] if drop else [name, "c0"]
+    df2 = wr.s3.read_parquet([path_file], columns=columns, use_threads=use_threads)
+
+    assert df[["c0"]].reset_index(level=0, drop=drop).equals(df2.reset_index(level=0, drop=drop))
+
+
 def test_to_parquet_dataset_sanitize(path):
     df = pd.DataFrame({"C0": [0, 1], "camelCase": [2, 3], "c**--2": [4, 5], "Par": ["a", "b"]})
```
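
A note on the second test: when a pandas index level has no name, PyArrow records it in the Parquet pandas metadata under the fallback name `__index_level_0__`. That is why the test substitutes that name before building the `columns` selection when `name` is `None`, and why the production code strips names starting with `__index_level_` when rebuilding `df.index.names`.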
