
Commit 6604c06

Add test_store_parquet_metadata_modes()
1 parent a0d0777 commit 6604c06

File tree

1 file changed: +204 −0 lines changed


testing/test_awswrangler/test_data_lake.py

Lines changed: 204 additions & 0 deletions
@@ -1924,3 +1924,207 @@ def test_to_parquet_modes(database, table, path, external_schema):
     assert len(df3.columns) == 3
     assert len(df3.index) == 4
     assert df3.c1.sum() == 6
+
+
+def test_store_parquet_metadata_modes(database, table, path, external_schema):
+
+    # Round 1 - Warm up
+    df = pd.DataFrame({"c0": [0, None]}, dtype="Int64")
+    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite")["paths"]
+    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.store_parquet_metadata(
+        path=path,
+        dataset=True,
+        mode="overwrite",
+        database=database,
+        table=table,
+        description="c0",
+        parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))},
+        columns_comments={"c0": "0"},
+    )
+    df2 = wr.athena.read_sql_table(table, database)
+    assert df.shape == df2.shape
+    assert df.c0.sum() == df2.c0.sum()
+    parameters = wr.catalog.get_table_parameters(database, table)
+    assert len(parameters) == 5
+    assert parameters["num_cols"] == str(len(df2.columns))
+    assert parameters["num_rows"] == str(len(df2.index))
+    assert wr.catalog.get_table_description(database, table) == "c0"
+    comments = wr.catalog.get_columns_comments(database, table)
+    assert len(comments) == len(df.columns)
+    assert comments["c0"] == "0"
+
+    # Round 2 - Overwrite
+    df = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16")
+    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite")["paths"]
+    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.store_parquet_metadata(
+        path=path,
+        dataset=True,
+        mode="overwrite",
+        database=database,
+        table=table,
+        description="c1",
+        parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))},
+        columns_comments={"c1": "1"},
+    )
+    df2 = wr.athena.read_sql_table(table, database)
+    assert df.shape == df2.shape
+    assert df.c1.sum() == df2.c1.sum()
+    parameters = wr.catalog.get_table_parameters(database, table)
+    assert len(parameters) == 5
+    assert parameters["num_cols"] == str(len(df2.columns))
+    assert parameters["num_rows"] == str(len(df2.index))
+    assert wr.catalog.get_table_description(database, table) == "c1"
+    comments = wr.catalog.get_columns_comments(database, table)
+    assert len(comments) == len(df.columns)
+    assert comments["c1"] == "1"
+
+    # Round 3 - Append
+    df = pd.DataFrame({"c1": [None, 2, None]}, dtype="Int16")
+    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="append")["paths"]
+    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.store_parquet_metadata(
+        path=path,
+        dataset=True,
+        mode="append",
+        database=database,
+        table=table,
+        description="c1",
+        parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index) * 2)},
+        columns_comments={"c1": "1"},
+    )
+    df2 = wr.athena.read_sql_table(table, database)
+    assert len(df.columns) == len(df2.columns)
+    assert len(df.index) * 2 == len(df2.index)
+    assert df.c1.sum() + 1 == df2.c1.sum()
+    parameters = wr.catalog.get_table_parameters(database, table)
+    assert len(parameters) == 5
+    assert parameters["num_cols"] == str(len(df2.columns))
+    assert parameters["num_rows"] == str(len(df2.index))
+    assert wr.catalog.get_table_description(database, table) == "c1"
+    comments = wr.catalog.get_columns_comments(database, table)
+    assert len(comments) == len(df.columns)
+    assert comments["c1"] == "1"
+
+    # Round 4 - Append + New Column
+    df = pd.DataFrame({"c2": ["a", None, "b"], "c1": [None, 1, None]})
+    df["c1"] = df["c1"].astype("Int16")
+    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="append")["paths"]
+    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.store_parquet_metadata(
+        path=path,
+        dataset=True,
+        mode="append",
+        database=database,
+        table=table,
+        description="c1+c2",
+        parameters={"num_cols": "2", "num_rows": "9"},
+        columns_comments={"c1": "1", "c2": "2"},
+    )
+    df2 = wr.athena.read_sql_table(table, database)
+    assert len(df2.columns) == 2
+    assert len(df2.index) == 9
+    assert df2.c1.sum() == 4
+    parameters = wr.catalog.get_table_parameters(database, table)
+    assert len(parameters) == 5
+    assert parameters["num_cols"] == "2"
+    assert parameters["num_rows"] == "9"
+    assert wr.catalog.get_table_description(database, table) == "c1+c2"
+    comments = wr.catalog.get_columns_comments(database, table)
+    assert len(comments) == len(df.columns)
+    assert comments["c1"] == "1"
+    assert comments["c2"] == "2"
+
+    # Round 5 - Overwrite Partitioned
+    df = pd.DataFrame({"c0": ["foo", None], "c1": [0, 1]})
+    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite", partition_cols=["c1"])["paths"]
+    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.store_parquet_metadata(
+        path=path,
+        dataset=True,
+        mode="overwrite",
+        database=database,
+        table=table,
+        description="c0+c1",
+        parameters={"num_cols": "2", "num_rows": "2"},
+        columns_comments={"c0": "zero", "c1": "one"},
+    )
+    df2 = wr.athena.read_sql_table(table, database)
+    assert df.shape == df2.shape
+    assert df.c1.sum() == df2.c1.astype(int).sum()
+    parameters = wr.catalog.get_table_parameters(database, table)
+    assert len(parameters) == 5
+    assert parameters["num_cols"] == "2"
+    assert parameters["num_rows"] == "2"
+    assert wr.catalog.get_table_description(database, table) == "c0+c1"
+    comments = wr.catalog.get_columns_comments(database, table)
+    assert len(comments) == len(df.columns)
+    assert comments["c0"] == "zero"
+    assert comments["c1"] == "one"
+
+    # Round 6 - Overwrite Partitions
+    df = pd.DataFrame({"c0": [None, "boo"], "c1": [0, 2]})
+    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite_partitions", partition_cols=["c1"])[
+        "paths"
+    ]
+    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.store_parquet_metadata(
+        path=path,
+        dataset=True,
+        mode="append",
+        database=database,
+        table=table,
+        description="c0+c1",
+        parameters={"num_cols": "2", "num_rows": "3"},
+        columns_comments={"c0": "zero", "c1": "one"},
+    )
+    df2 = wr.athena.read_sql_table(table, database)
+    assert len(df2.columns) == 2
+    assert len(df2.index) == 3
+    assert df2.c1.astype(int).sum() == 3
+    parameters = wr.catalog.get_table_parameters(database, table)
+    assert len(parameters) == 5
+    assert parameters["num_cols"] == "2"
+    assert parameters["num_rows"] == "3"
+    assert wr.catalog.get_table_description(database, table) == "c0+c1"
+    comments = wr.catalog.get_columns_comments(database, table)
+    assert len(comments) == len(df.columns)
+    assert comments["c0"] == "zero"
+    assert comments["c1"] == "one"
+
+    # Round 7 - Overwrite Partitions + New Column
+    df = pd.DataFrame({"c0": ["bar", None], "c1": [1, 3], "c2": [True, False]})
+    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite_partitions", partition_cols=["c1"])[
+        "paths"
+    ]
+    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.store_parquet_metadata(
+        path=path,
+        dataset=True,
+        mode="append",
+        database=database,
+        table=table,
+        description="c0+c1+c2",
+        parameters={"num_cols": "3", "num_rows": "4"},
+        columns_comments={"c0": "zero", "c1": "one", "c2": "two"},
+    )
+    df2 = wr.athena.read_sql_table(table, database)
+    assert len(df2.columns) == 3
+    assert len(df2.index) == 4
+    assert df2.c1.astype(int).sum() == 6
+    parameters = wr.catalog.get_table_parameters(database, table)
+    assert len(parameters) == 5
+    assert parameters["num_cols"] == "3"
+    assert parameters["num_rows"] == "4"
+    assert wr.catalog.get_table_description(database, table) == "c0+c1+c2"
+    comments = wr.catalog.get_columns_comments(database, table)
+    assert len(comments) == len(df.columns)
+    assert comments["c0"] == "zero"
+    assert comments["c1"] == "one"
+    assert comments["c2"] == "two"
+    engine = wr.catalog.get_engine("aws-data-wrangler-redshift")
+    df3 = wr.db.read_sql_table(con=engine, table=table, schema=external_schema)
+    assert len(df3.columns) == 3
+    assert len(df3.index) == 4
+    assert df3.c1.astype(int).sum() == 6
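
Note: the API this test exercises, wr.s3.store_parquet_metadata, infers a table's schema and partitions from Parquet files already sitting on S3 and registers them in the Glue Catalog, instead of deriving them from an in-memory DataFrame as wr.s3.to_parquet does. A minimal standalone sketch of that workflow, using only calls that appear in the diff above (the bucket, database, and table names here are hypothetical placeholders):

    import pandas as pd
    import awswrangler as wr

    # Hypothetical names -- replace with resources that exist in your account.
    path = "s3://my-bucket/my-dataset/"
    database = "my_glue_database"
    table = "my_table"

    # 1. Write a partitioned Parquet dataset to S3 (no catalog interaction yet).
    df = pd.DataFrame({"c0": ["foo", "boo"], "c1": [0, 1]})
    wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite", partition_cols=["c1"])

    # 2. Crawl the files and register the inferred schema in the Glue Catalog.
    wr.s3.store_parquet_metadata(
        path=path,
        dataset=True,
        mode="overwrite",
        database=database,
        table=table,
        description="Example table",
        parameters={"source": "sketch"},  # arbitrary key/value table metadata
        columns_comments={"c0": "a string column", "c1": "the partition key"},
    )

    # 3. The table is now queryable through Athena.
    print(wr.athena.read_sql_table(table, database))

The test's seven rounds walk this same flow through the supported catalog modes ("overwrite", "append") combined with the S3 write modes ("overwrite", "append", "overwrite_partitions"), checking after each round that the Athena-visible data, table parameters, description, and column comments all match what was stored.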

0 commit comments
