@@ -5,10 +5,11 @@ use std::sync::{Arc, LazyLock};
55use itertools:: Itertools ;
66use url:: Url ;
77
8- use crate :: actions:: { get_commit_schema, ADD_NAME } ;
9- use crate :: actions:: { visitors:: SidecarVisitor , SIDECAR_NAME } ;
8+ use crate :: actions:: visitors:: SidecarVisitor ;
9+ use crate :: actions:: SIDECAR_NAME ;
10+ use crate :: actions:: { Add , Sidecar , ADD_NAME } ;
1011use crate :: log_replay:: ActionsBatch ;
11- use crate :: schema:: SchemaRef ;
12+ use crate :: schema:: { SchemaRef , StructField , StructType , ToSchema } ;
1213use crate :: utils:: require;
1314use crate :: { DeltaResult , Engine , Error , FileMeta , RowVisitor } ;
1415
@@ -49,11 +50,11 @@ impl ManifestPhase {
4950 log_root : Url ,
5051 engine : Arc < dyn Engine > ,
5152 ) -> DeltaResult < Self > {
52- #[ allow( clippy:: unwrap_used) ]
5353 static MANIFEST_READ_SCHMEA : LazyLock < SchemaRef > = LazyLock :: new ( || {
54- get_commit_schema ( )
55- . project ( & [ ADD_NAME , SIDECAR_NAME ] )
56- . unwrap ( )
54+ Arc :: new ( StructType :: new_unchecked ( [
55+ StructField :: nullable ( ADD_NAME , Add :: to_schema ( ) ) ,
56+ StructField :: nullable ( SIDECAR_NAME , Sidecar :: to_schema ( ) ) ,
57+ ] ) )
5758 } ) ;
5859
5960 let files = vec ! [ manifest_file. clone( ) ] ;
@@ -148,43 +149,37 @@ impl Iterator for ManifestPhase {
148149#[ cfg( test) ]
149150mod tests {
150151 use super :: * ;
151- use crate :: engine:: default:: executor:: tokio:: TokioBackgroundExecutor ;
152152 use crate :: engine:: default:: DefaultEngine ;
153153 use crate :: log_replay:: LogReplayProcessor ;
154154 use crate :: scan:: log_replay:: ScanLogReplayProcessor ;
155155 use crate :: scan:: state_info:: StateInfo ;
156+ use crate :: SnapshotRef ;
156157 use object_store:: local:: LocalFileSystem ;
157- use std:: path :: PathBuf ;
158- use std :: sync :: Arc as StdArc ;
158+ use std:: sync :: Arc ;
159+ use tempfile :: TempDir ;
159160
160161 fn load_test_table (
161162 table_name : & str ,
162- ) -> DeltaResult < (
163- StdArc < DefaultEngine < TokioBackgroundExecutor > > ,
164- StdArc < crate :: Snapshot > ,
165- url:: Url ,
166- ) > {
167- let mut path = PathBuf :: from ( env ! ( "CARGO_MANIFEST_DIR" ) ) ;
168- path. push ( "tests/data" ) ;
169- path. push ( table_name) ;
170-
171- let path = std:: fs:: canonicalize ( path)
172- . map_err ( |e| crate :: Error :: Generic ( format ! ( "Failed to canonicalize path: {}" , e) ) ) ?;
173-
174- let url = url:: Url :: from_directory_path ( path)
163+ ) -> DeltaResult < ( Arc < dyn Engine > , SnapshotRef , Url , TempDir ) > {
164+ let test_dir = test_utils:: load_test_data ( "tests/data" , table_name)
165+ . map_err ( |e| crate :: Error :: Generic ( format ! ( "Failed to load test data: {}" , e) ) ) ?;
166+ let test_path = test_dir. path ( ) . join ( table_name) ;
167+
168+ let url = url:: Url :: from_directory_path ( & test_path)
175169 . map_err ( |_| crate :: Error :: Generic ( "Failed to create URL from path" . to_string ( ) ) ) ?;
176170
177- let store = StdArc :: new ( LocalFileSystem :: new ( ) ) ;
178- let engine = StdArc :: new ( DefaultEngine :: new ( store) ) ;
171+ let store = Arc :: new ( LocalFileSystem :: new ( ) ) ;
172+ let engine = Arc :: new ( DefaultEngine :: new ( store) ) ;
179173 let snapshot = crate :: Snapshot :: builder_for ( url. clone ( ) ) . build ( engine. as_ref ( ) ) ?;
180174
181- Ok ( ( engine, snapshot, url) )
175+ Ok ( ( engine, snapshot, url, test_dir ) )
182176 }
183177
184178 #[ test]
185179 fn test_manifest_phase_with_checkpoint ( ) -> DeltaResult < ( ) > {
186180 // Use a table with v2 checkpoints where adds might be in sidecars
187- let ( engine, snapshot, log_root) = load_test_table ( "v2-checkpoints-json-with-sidecars" ) ?;
181+ let ( engine, snapshot, log_root, _tempdir) =
182+ load_test_table ( "v2-checkpoints-json-with-sidecars" ) ?;
188183 let log_segment = snapshot. log_segment ( ) ;
189184
190185 // Check if there are any checkpoint parts
@@ -193,7 +188,7 @@ mod tests {
193188 return Ok ( ( ) ) ;
194189 }
195190
196- let state_info = StdArc :: new ( StateInfo :: try_new (
191+ let state_info = Arc :: new ( StateInfo :: try_new (
197192 snapshot. schema ( ) ,
198193 snapshot. table_configuration ( ) ,
199194 None ,
@@ -210,10 +205,9 @@ mod tests {
210205 ManifestPhase :: new ( manifest_file, log_root. clone ( ) , engine. clone ( ) ) ?;
211206
212207 // Count batches and collect results
213- let mut batch_count = 0 ;
214208 let mut file_paths = Vec :: new ( ) ;
215209
216- while let Some ( result) = manifest_phase. next ( ) {
210+ for result in manifest_phase {
217211 let batch = result?;
218212 let metadata = processor. process_actions_batch ( batch) ?;
219213 let paths = metadata. visit_scan_files (
@@ -223,18 +217,8 @@ mod tests {
223217 } ,
224218 ) ?;
225219 file_paths. extend ( paths) ;
226- batch_count += 1 ;
227220 }
228-
229- // For v2 checkpoints with sidecars, the manifest might not contain adds directly.
230- // In this test table, all adds are in sidecars, so manifest should be empty.
231- assert_eq ! (
232- batch_count, 1 ,
233- "Single manifest file should produce exactly 1 batch"
234- ) ;
235-
236221 // Verify the manifest itself contains no add files (they're all in sidecars)
237- file_paths. sort ( ) ;
238222 assert_eq ! (
239223 file_paths. len( ) , 0 ,
240224 "For this v2 checkpoint with sidecars, manifest should contain 0 add files (all in sidecars)"
@@ -245,7 +229,8 @@ mod tests {
245229
246230 #[ test]
247231 fn test_manifest_phase_collects_sidecars ( ) -> DeltaResult < ( ) > {
248- let ( engine, snapshot, log_root) = load_test_table ( "v2-checkpoints-json-with-sidecars" ) ?;
232+ let ( engine, snapshot, log_root, _tempdir) =
233+ load_test_table ( "v2-checkpoints-json-with-sidecars" ) ?;
249234 let log_segment = snapshot. log_segment ( ) ;
250235
251236 if log_segment. checkpoint_parts . is_empty ( ) {
@@ -256,8 +241,6 @@ mod tests {
256241 let checkpoint_file = & log_segment. checkpoint_parts [ 0 ] ;
257242 let manifest_file = checkpoint_file. location . clone ( ) ;
258243
259- let schema = crate :: actions:: get_commit_schema ( ) . project ( & [ crate :: actions:: ADD_NAME ] ) ?;
260-
261244 let mut manifest_phase =
262245 ManifestPhase :: new ( manifest_file, log_root. clone ( ) , engine. clone ( ) ) ?;
263246
0 commit comments