
Commit 16755e6

liamphmurphy (Liam Brannigan) authored and committed
serialize fsck files_removed metric as a string for compatibility with Spark
Signed-off-by: Liam Murphy <[email protected]>
Signed-off-by: Liam Brannigan <[email protected]>
1 parent 463d56c commit 16755e6

File tree: 3 files changed (+59, -2 lines changed)

crates/core/src/operations/filesystem_check.rs

Lines changed: 23 additions & 1 deletion
@@ -22,7 +22,7 @@ use futures::future::BoxFuture;
 use futures::StreamExt;
 pub use object_store::path::Path;
 use object_store::ObjectStore;
-use serde::Serialize;
+use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
 use url::{ParseError, Url};
 use uuid::Uuid;
 
@@ -56,6 +56,10 @@ pub struct FileSystemCheckMetrics {
     /// Was this a dry run
     pub dry_run: bool,
     /// Files that wrere removed successfully
+    #[serde(
+        serialize_with = "serialize_vec_string",
+        deserialize_with = "deserialize_vec_string"
+    )]
     pub files_removed: Vec<String>,
 }
 
@@ -66,6 +70,24 @@ struct FileSystemCheckPlan {
     pub files_to_remove: Vec<Add>,
 }
 
+// Custom serialization function that serializes metric details as a string
+fn serialize_vec_string<S>(value: &Vec<String>, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    let json_string = serde_json::to_string(value).map_err(serde::ser::Error::custom)?;
+    serializer.serialize_str(&json_string)
+}
+
+// Custom deserialization that parses a JSON string into MetricDetails
+fn deserialize_vec_string<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let s: String = Deserialize::deserialize(deserializer)?;
+    serde_json::from_str(&s).map_err(DeError::custom)
+}
+
 fn is_absolute_path(path: &str) -> DeltaResult<bool> {
     match Url::parse(path) {
         Ok(_) => Ok(true),
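
With this serializer in place, files_removed is emitted as a JSON-encoded string rather than a nested array, which matches how Spark exposes operationMetrics (a string-to-string map) in DESCRIBE HISTORY. A minimal sketch of the resulting shape, using a made-up file name and showing only the dry_run and files_removed fields from the struct above:

import json

# Illustrative payload only: the file name is invented, but the field names
# (dry_run, files_removed) match FileSystemCheckMetrics in the diff above.
raw_metrics = '{"dry_run": false, "files_removed": "[\\"part-00001-example.parquet\\"]"}'

metrics = json.loads(raw_metrics)
print(type(metrics["files_removed"]))        # <class 'str'> -- a JSON string, not a list
print(json.loads(metrics["files_removed"]))  # ['part-00001-example.parquet']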

python/deltalake/table.py

Lines changed: 7 additions & 1 deletion
@@ -69,6 +69,8 @@
 NOT_SUPPORTED_READER_VERSION = 2
 SUPPORTED_READER_FEATURES = {"timestampNtz"}
 
+FSCK_METRICS_FILES_REMOVED_LABEL = "files_removed"
+
 FilterLiteralType = Tuple[str, str, Any]
 FilterConjunctionType = List[FilterLiteralType]
 FilterDNFType = List[FilterConjunctionType]
@@ -1432,7 +1434,11 @@ def repair(
             commit_properties,
             post_commithook_properties,
         )
-        return json.loads(metrics)
+        deserialized_metrics = json.loads(metrics)
+        deserialized_metrics[FSCK_METRICS_FILES_REMOVED_LABEL] = json.loads(
+            deserialized_metrics[FSCK_METRICS_FILES_REMOVED_LABEL]
+        )
+        return deserialized_metrics
 
     def transaction_versions(self) -> Dict[str, Transaction]:
         return self._table.transaction_versions()
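
For callers of the Python binding, the extra json.loads above means repair() still returns a plain dict with files_removed as a list, even though the Rust side now serializes that metric as a string. A rough usage sketch, assuming a hypothetical table path:

from deltalake import DeltaTable

# Hypothetical path; any existing Delta table behaves the same way.
dt = DeltaTable("/tmp/example_table")

# After this change the wrapper decodes the string-encoded metric, so
# files_removed comes back as a Python list rather than a JSON string.
metrics = dt.repair(dry_run=True)
print(metrics["dry_run"])        # True
print(metrics["files_removed"])  # e.g. [] or ['part-00000-....parquet']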

python/tests/pyspark_integration/test_write_to_pyspark.py

Lines changed: 29 additions & 0 deletions
@@ -1,5 +1,6 @@
 """Tests that deltalake(delta-rs) can write to tables written by PySpark."""
 
+import os
 import pathlib
 
 import pyarrow as pa
@@ -169,3 +170,31 @@ def test_spark_read_z_ordered_history(tmp_path: pathlib.Path):
     )
 
     assert latest_operation_metrics["operationMetrics"] is not None
+
+
+@pytest.mark.pyspark
+@pytest.mark.integration
+def test_spark_read_repair_run(tmp_path):
+    ids = ["1"] * 10
+    values = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
+
+    id_array = pa.array(ids, type=pa.string())
+    value_array = pa.array(values, type=pa.int32())
+
+    pa_table = pa.Table.from_arrays([id_array, value_array], names=["id", "value"])
+
+    write_deltalake(tmp_path, pa_table, mode="append")
+    write_deltalake(tmp_path, pa_table, mode="append")
+    dt = DeltaTable(tmp_path)
+    os.remove(dt.file_uris()[0])
+
+    dt.repair(dry_run=False)
+    spark = get_spark()
+
+    history_df = spark.sql(f"DESCRIBE HISTORY '{tmp_path}'")
+
+    latest_operation_metrics = (
+        history_df.orderBy(history_df.version.desc()).select("operationMetrics").first()
+    )
+
+    assert latest_operation_metrics["operationMetrics"] is not None
