Skip to content

Commit fa83f41

Browse files
authored
feat: add tags field to LastCheckpointHint (delta-io#1455)
## What changes are proposed in this pull request? Add optional tags field to LastCheckpointHint struct to support additional metadata about the last checkpoint as specified in Delta Lake protocol. (delta-io#1054) - Add tags: Option<HashMap<String, String>> field to LastCheckpointHint - Update all test cases to include tags: None - Maintain backward compatibility through optional field The tags field enables storing arbitrary string key-value pairs for checkpoint metadata while maintaining full backward compatibility with existing checkpoint files. ### This PR affects the following public APIs None ## How was this change tested? Test instances updated: 9 test functions that construct LastCheckpointHint passing in the default value None for the tags
1 parent 868c545 commit fa83f41

File tree

5 files changed

+81
-31
lines changed

5 files changed

+81
-31
lines changed

ffi/src/expressions/kernel_visitor.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,9 @@ fn visit_expression_literal_string_impl(
267267

268268
// We need to get parse.expand working to be able to macro everything below, see issue #255
269269

270+
// This is a function called by the engine to transform the engine expression into a kernel expression
271+
// The engine visitor calls this function with the argument (predicate, kernel_expression_visitor) where
272+
// kernel_expression_visitor is a pointer to the KernelExpressionVisitorState struct.
270273
#[no_mangle]
271274
pub extern "C" fn visit_expression_literal_int(
272275
state: &mut KernelExpressionVisitorState,

kernel/src/last_checkpoint_hint.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
//! Utities for reading the `_last_checkpoint` file. Maybe this file should instead go under
22
//! log_segment module since it should only really be used there? as hint for listing?
33
4+
use std::collections::HashMap;
5+
46
use crate::schema::Schema;
57
use crate::{DeltaResult, Error, StorageHandler, Version};
68
use delta_kernel_derive::internal_api;
@@ -34,6 +36,8 @@ pub(crate) struct LastCheckpointHint {
3436
pub(crate) checkpoint_schema: Option<Schema>,
3537
/// The checksum of the last checkpoint JSON.
3638
pub(crate) checksum: Option<String>,
39+
/// Additional metadata about the last checkpoint.
40+
pub(crate) tags: Option<HashMap<String, String>>,
3741
}
3842

3943
impl LastCheckpointHint {
@@ -67,4 +71,10 @@ impl LastCheckpointHint {
6771
}
6872
}
6973
}
74+
75+
/// Convert the LastCheckpointHint to JSON bytes
76+
#[cfg(test)]
77+
pub(crate) fn to_json_bytes(&self) -> Vec<u8> {
78+
serde_json::to_vec(self).expect("Failed to convert LastCheckpointHint to JSON bytes")
79+
}
7080
}

kernel/src/log_segment/tests.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ fn build_snapshot_with_correct_last_uuid_checkpoint() {
290290
num_of_add_files: None,
291291
checkpoint_schema: None,
292292
checksum: None,
293+
tags: None,
293294
};
294295

295296
let (storage, log_root) = build_log_with_paths_and_checkpoint(
@@ -381,6 +382,7 @@ fn build_snapshot_with_out_of_date_last_checkpoint() {
381382
num_of_add_files: None,
382383
checkpoint_schema: None,
383384
checksum: None,
385+
tags: None,
384386
};
385387

386388
let (storage, log_root) = build_log_with_paths_and_checkpoint(
@@ -424,6 +426,7 @@ fn build_snapshot_with_correct_last_multipart_checkpoint() {
424426
num_of_add_files: None,
425427
checkpoint_schema: None,
426428
checksum: None,
429+
tags: None,
427430
};
428431

429432
let (storage, log_root) = build_log_with_paths_and_checkpoint(
@@ -473,6 +476,7 @@ fn build_snapshot_with_missing_checkpoint_part_from_hint_fails() {
473476
num_of_add_files: None,
474477
checkpoint_schema: None,
475478
checksum: None,
479+
tags: None,
476480
};
477481

478482
let (storage, log_root) = build_log_with_paths_and_checkpoint(
@@ -516,6 +520,7 @@ fn build_snapshot_with_bad_checkpoint_hint_fails() {
516520
num_of_add_files: None,
517521
checkpoint_schema: None,
518522
checksum: None,
523+
tags: None,
519524
};
520525

521526
let (storage, log_root) = build_log_with_paths_and_checkpoint(
@@ -606,6 +611,7 @@ fn build_snapshot_with_out_of_date_last_checkpoint_and_incomplete_recent_checkpo
606611
num_of_add_files: None,
607612
checkpoint_schema: None,
608613
checksum: None,
614+
tags: None,
609615
};
610616

611617
let (storage, log_root) = build_log_with_paths_and_checkpoint(
@@ -714,6 +720,7 @@ fn build_snapshot_with_checkpoint_greater_than_time_travel_version() {
714720
num_of_add_files: None,
715721
checkpoint_schema: None,
716722
checksum: None,
723+
tags: None,
717724
};
718725
let (storage, log_root) = build_log_with_paths_and_checkpoint(
719726
&[
@@ -760,6 +767,7 @@ fn build_snapshot_with_start_checkpoint_and_time_travel_version() {
760767
num_of_add_files: None,
761768
checkpoint_schema: None,
762769
checksum: None,
770+
tags: None,
763771
};
764772

765773
let (storage, log_root) = build_log_with_paths_and_checkpoint(

kernel/src/scan/data_skipping.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ impl DataSkippingFilter {
137137
let select_stats_evaluator = engine
138138
.evaluation_handler()
139139
.new_expression_evaluator(
140-
// safety: kernel is very broken if we don't have the schema for Add actions
141140
get_log_add_schema().clone(),
142141
STATS_EXPR.clone(),
143142
DataType::STRING,

kernel/src/snapshot.rs

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -933,8 +933,41 @@ mod tests {
933933
assert!(cp.is_none());
934934
}
935935

936-
fn valid_last_checkpoint() -> Vec<u8> {
937-
r#"{"size":8,"sizeInBytes":21857,"version":1}"#.as_bytes().to_vec()
936+
fn valid_last_checkpoint() -> (Vec<u8>, LastCheckpointHint) {
937+
let checkpoint = LastCheckpointHint {
938+
version: 1,
939+
size: 8,
940+
parts: None,
941+
size_in_bytes: Some(21857),
942+
num_of_add_files: None,
943+
checkpoint_schema: None,
944+
checksum: None,
945+
tags: None,
946+
};
947+
let data = checkpoint.to_json_bytes();
948+
(data, checkpoint)
949+
}
950+
951+
fn valid_last_checkpoint_with_tags() -> (Vec<u8>, LastCheckpointHint) {
952+
use std::collections::HashMap;
953+
954+
let (_, base_checkpoint) = valid_last_checkpoint();
955+
956+
let mut tags = HashMap::new();
957+
tags.insert(
958+
"author".to_string(),
959+
"test_read_table_with_last_checkpoint".to_string(),
960+
);
961+
tags.insert("environment".to_string(), "snapshot_tests".to_string());
962+
tags.insert("created_by".to_string(), "delta-kernel-rs".to_string());
963+
964+
let checkpoint = LastCheckpointHint {
965+
tags: Some(tags),
966+
..base_checkpoint
967+
};
968+
969+
let data = checkpoint.to_json_bytes();
970+
(data, checkpoint)
938971
}
939972

940973
#[test]
@@ -967,42 +1000,39 @@ mod tests {
9671000
// in memory file system
9681001
let store = Arc::new(InMemory::new());
9691002

970-
// put a valid/invalid _last_checkpoint file
971-
let data = valid_last_checkpoint();
972-
let invalid_data = "invalid".as_bytes().to_vec();
973-
let path = Path::from("valid/_last_checkpoint");
974-
let invalid_path = Path::from("invalid/_last_checkpoint");
1003+
// Define test cases: (path, data, expected_result)
1004+
let (data, expected) = valid_last_checkpoint();
1005+
let (data_with_tags, expected_with_tags) = valid_last_checkpoint_with_tags();
1006+
let test_cases = vec![
1007+
("valid", data, Some(expected)),
1008+
("invalid", "invalid".as_bytes().to_vec(), None),
1009+
("valid_with_tags", data_with_tags, Some(expected_with_tags)),
1010+
];
9751011

1012+
// Write all test files to the in memory file system
9761013
tokio::runtime::Runtime::new()
9771014
.expect("create tokio runtime")
9781015
.block_on(async {
979-
store
980-
.put(&path, data.into())
981-
.await
982-
.expect("put _last_checkpoint");
983-
store
984-
.put(&invalid_path, invalid_data.into())
985-
.await
986-
.expect("put _last_checkpoint");
1016+
for (path_prefix, data, _) in &test_cases {
1017+
let path = Path::from(format!("{}/_last_checkpoint", path_prefix));
1018+
store
1019+
.put(&path, data.clone().into())
1020+
.await
1021+
.expect("put _last_checkpoint");
1022+
}
9871023
});
9881024

9891025
let executor = Arc::new(TokioBackgroundExecutor::new());
9901026
let storage = ObjectStoreStorageHandler::new(store, executor);
991-
let url = Url::parse("memory:///valid/").expect("valid url");
992-
let valid = LastCheckpointHint::try_read(&storage, &url).expect("read last checkpoint");
993-
let url = Url::parse("memory:///invalid/").expect("valid url");
994-
let invalid = LastCheckpointHint::try_read(&storage, &url).expect("read last checkpoint");
995-
let expected = LastCheckpointHint {
996-
version: 1,
997-
size: 8,
998-
parts: None,
999-
size_in_bytes: Some(21857),
1000-
num_of_add_files: None,
1001-
checkpoint_schema: None,
1002-
checksum: None,
1003-
};
1004-
assert_eq!(valid.unwrap(), expected);
1005-
assert!(invalid.is_none());
1027+
1028+
// Test reading all checkpoints from the in memory file system for cases where the data is valid, invalid and
1029+
// valid with tags.
1030+
for (path_prefix, _, expected_result) in test_cases {
1031+
let url = Url::parse(&format!("memory:///{}/", path_prefix)).expect("valid url");
1032+
let result =
1033+
LastCheckpointHint::try_read(&storage, &url).expect("read last checkpoint");
1034+
assert_eq!(result, expected_result);
1035+
}
10061036
}
10071037

10081038
#[test_log::test]

0 commit comments

Comments
 (0)