
Commit 4b4801c

Add Delta Compact Function (#142)
1 parent 2320dc6 commit 4b4801c

4 files changed: +101 −1 lines changed


spark_utils/delta_lake/functions.py

Lines changed: 65 additions & 1 deletion
@@ -24,7 +24,8 @@
 Helper functions for Delta Lake
 """
 import re
-from typing import Iterator
+from typing import Iterator, Optional
+
 
 from delta import DeltaTable
 from pyspark.sql import SparkSession
@@ -157,3 +158,66 @@ def get_table_info(
         return [table_col for table_col in cols if table_col.name not in parts_names], parts, table_path
 
     return cols, [], table_path
+
+
+def delta_compact(
+    spark_session: SparkSession,
+    path: str,
+    retain_hours: float = 48,
+    compact_from_predicate: Optional[str] = None,
+    target_file_size_bytes: Optional[int] = None,
+    vacuum_only: bool = True,
+    refresh_cache: bool = False,
+) -> None:
+    """
+    Runs bin-packing optimization to reduce the number of files and increase the average file size in the table's physical storage.
+    Refreshes the Delta cache after optimize/vacuum have finished.
+    https://docs.delta.io/latest/optimizations-oss.html#optimizations
+
+    :param spark_session: Spark session that will perform the operation.
+    :param path: Path to the Delta table, either a filesystem path or a Hive table name.
+    :param retain_hours: Age of data to retain; defaults to 48 hours.
+    :param compact_from_predicate: Optional SQL predicate string selecting the subset of data to compact.
+    :param target_file_size_bytes: Optional target file size in bytes. Defaults to the system default (1 GB for Delta 2.1) if not provided.
+    :param vacuum_only: If True, only run a vacuum operation, without compaction.
+    :param refresh_cache: If True, refresh the table cache for this Spark session.
+    :return:
+    """
+    spark_session.conf.set("spark.databricks.delta.optimize.repartition.enabled", "true")
+
+    table_to_compact = (
+        DeltaTable.forPath(sparkSession=spark_session, path=path)
+        if "://" in path
+        else DeltaTable.forName(sparkSession=spark_session, tableOrViewName=path)
+    )
+
+    if not vacuum_only:
+        if target_file_size_bytes:
+            spark_session.conf.set("spark.databricks.delta.optimize.minFileSize", str(target_file_size_bytes))
+            spark_session.conf.set("spark.databricks.delta.optimize.maxFileSize", str(target_file_size_bytes))
+
+        if compact_from_predicate:
+            table_to_compact.optimize().where(compact_from_predicate).executeCompaction()
+        else:
+            table_to_compact.optimize().executeCompaction()
+
+    table_path = f"delta.`{path}`" if "://" in path else path
+    current_interval = int(
+        re.search(
+            r"\b\d+\b",
+            table_to_compact.detail().head().properties.get("delta.logRetentionDuration", "interval 168 hours"),
+        ).group()
+    )
+
+    if current_interval != round(retain_hours):
+        spark_session.sql(
+            f"ALTER TABLE {table_path} SET TBLPROPERTIES ('delta.logRetentionDuration'='interval {round(retain_hours)} hours')"
+        )
+
+    table_to_compact.vacuum(retentionHours=retain_hours)
+
+    if refresh_cache:
+        if "://" in path:
+            spark_session.sql(f"refresh {path}")
+        else:
+            spark_session.sql(f"refresh table {path}")

test/common.py

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+import pathlib
+
+from pyspark.sql import SparkSession
+
+
+def generate_table(spark_session: SparkSession, suffix: str, dir=None) -> str:
+    test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/{suffix}" if not dir else dir
+    df = spark_session.range(100)
+
+    for _ in range(10):
+        df.write.format("delta").mode("overwrite").save(test_data_path)
+
+    return test_data_path

test/conftest.py

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ def spark_session():
         additional_configs={
             "spark.driver.extraJavaOptions": java_17_launch_options,
             "spark.executor.extraJavaOptions": java_17_launch_options,
+            "spark.databricks.delta.retentionDurationCheck.enabled": "false",
         }
     ).get_session()
 

test/test_delta_lake_functions.py

Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+from spark_utils.delta_lake.functions import delta_compact
+from glob import glob
+from pyspark.sql import SparkSession
+
+from test.common import generate_table
+
+
+def test_delta_compact(spark_session: SparkSession):
+    test_data_path = generate_table(spark_session, "compact")
+
+    delta_compact(
+        spark_session=spark_session,
+        path=f"file://{test_data_path}",
+        retain_hours=0,
+        vacuum_only=False,
+    )
+
+    num_parquet_files = len(glob(f"{test_data_path}/*.parquet"))
+    num_log_files = len(glob(f"{test_data_path}/_delta_log/*.json"))
+
+    # logs are cleaned on a daily basis, so we cannot test the log retention
+    assert num_parquet_files == 1 and num_log_files == 14
