11//! Manifest phase for log replay - processes single-part checkpoint manifest files.
22
3- use std:: sync:: Arc ;
3+ use std:: sync:: { Arc , LazyLock } ;
44
5+ use itertools:: Itertools ;
56use url:: Url ;
67
7- use crate :: actions:: Sidecar ;
88use crate :: actions:: { get_all_actions_schema, visitors:: SidecarVisitor , SIDECAR_NAME } ;
9+ use crate :: actions:: { get_commit_schema, Sidecar , ADD_NAME } ;
910use crate :: expressions:: Transform ;
1011use crate :: log_replay:: ActionsBatch ;
1112use crate :: schema:: { Schema , SchemaRef , StructField , ToSchema } ;
13+ use crate :: utils:: require;
1214use crate :: { DeltaResult , Engine , Error , Expression , ExpressionEvaluator , FileMeta , RowVisitor } ;
1315
1416/// Phase that processes single-part checkpoint manifest files.
@@ -17,8 +19,9 @@ use crate::{DeltaResult, Engine, Error, Expression, ExpressionEvaluator, FileMet
1719pub ( crate ) struct ManifestPhase {
1820 actions : Box < dyn Iterator < Item = DeltaResult < ActionsBatch > > + Send > ,
1921 sidecar_visitor : SidecarVisitor ,
20- original_schema : SchemaRef ,
22+ manifest_file : FileMeta ,
2123 log_root : Url ,
24+ is_complete : bool ,
2225}
2326
2427/// Possible transitions after ManifestPhase completes.
@@ -39,12 +42,18 @@ impl ManifestPhase {
3942 /// - `manifest_file`: The checkpoint manifest file to process
4043 /// - `log_root`: Root URL for resolving sidecar paths
4144 /// - `engine`: Engine for reading files
42- /// - `base_schema`: Schema columns required by the processor (will be augmented with sidecar)
4345 pub fn new (
4446 manifest_file : FileMeta ,
4547 log_root : Url ,
4648 engine : Arc < dyn Engine > ,
4749 ) -> DeltaResult < Self > {
50+ #[ allow( clippy:: unwrap_used) ]
51+ static MANIFEST_READ_SCHMEA : LazyLock < SchemaRef > = LazyLock :: new ( || {
52+ get_commit_schema ( )
53+ . project ( & [ ADD_NAME , SIDECAR_NAME ] )
54+ . unwrap ( )
55+ } ) ;
56+
4857 let files = vec ! [ manifest_file. clone( ) ] ;
4958
5059 // Determine file type from extension
@@ -59,13 +68,13 @@ impl ManifestPhase {
5968 "json" => {
6069 engine
6170 . json_handler ( )
62- . read_json_files ( & files, sidecar_schema. clone ( ) , None ) ?
63- }
64- "parquet" => {
65- engine
66- . parquet_handler ( )
67- . read_parquet_files ( & files, sidecar_schema. clone ( ) , None ) ?
71+ . read_json_files ( & files, MANIFEST_READ_SCHMEA . clone ( ) , None ) ?
6872 }
73+ "parquet" => engine. parquet_handler ( ) . read_parquet_files (
74+ & files,
75+ MANIFEST_READ_SCHMEA . clone ( ) ,
76+ None ,
77+ ) ?,
6978 ext => {
7079 return Err ( Error :: generic ( format ! (
7180 "Unsupported checkpoint extension: {}" ,
@@ -80,8 +89,8 @@ impl ManifestPhase {
8089 actions : Box :: new ( actions) ,
8190 sidecar_visitor : SidecarVisitor :: default ( ) ,
8291 log_root,
83- original_schema ,
84- projector ,
92+ manifest_file ,
93+ is_complete : false ,
8594 } )
8695 }
8796
@@ -91,13 +100,20 @@ impl ManifestPhase {
91100 /// - `Sidecars`: Extracted sidecar files
92101 /// - `Done`: No sidecars found
93102 pub ( crate ) fn finalize ( self ) -> DeltaResult < AfterManifest > {
94- // TODO: Check that stream is exhausted. We can track a boolean flag on whether we saw a None yet.
95- let sidecars = self
103+ require ! (
104+ self . is_complete,
105+ Error :: generic( format!(
106+ "Finalized called on ManifestReader for file {:?}" ,
107+ self . manifest_file. location
108+ ) )
109+ ) ;
110+
111+ let sidecars: Vec < _ > = self
96112 . sidecar_visitor
97113 . sidecars
98114 . into_iter ( )
99115 . map ( |s| s. to_filemeta ( & self . log_root ) )
100- . collect :: < Result < Vec < _ > , _ > > ( ) ?;
116+ . try_collect ( ) ?;
101117
102118 if sidecars. is_empty ( ) {
103119 Ok ( AfterManifest :: Done )
@@ -111,18 +127,18 @@ impl Iterator for ManifestPhase {
111127 type Item = DeltaResult < ActionsBatch > ;
112128
113129 fn next ( & mut self ) -> Option < Self :: Item > {
114- self . actions . next ( ) . map ( |batch_result| {
130+ let result = self . actions . next ( ) . map ( |batch_result| {
115131 batch_result. and_then ( |batch| {
116- // Extract sidecar references from the batch
117132 self . sidecar_visitor . visit_rows_of ( batch. actions ( ) ) ?;
118-
119- // Return the batch
120- // TODO: un-select sidecar actions
121- // TODO: project out sidecar actions
122- let batch = self . projector . evaluate ( batch) ;
123133 Ok ( batch)
124134 } )
125- } )
135+ } ) ;
136+
137+ if result. is_none ( ) {
138+ self . is_complete = true ;
139+ }
140+
141+ result
126142 }
127143}
128144
@@ -187,10 +203,8 @@ mod tests {
187203 let checkpoint_file = & log_segment. checkpoint_parts [ 0 ] ;
188204 let manifest_file = checkpoint_file. location . clone ( ) ;
189205
190- let schema = crate :: actions:: get_commit_schema ( ) . project ( & [ crate :: actions:: ADD_NAME ] ) ?;
191-
192206 let mut manifest_phase =
193- ManifestPhase :: new ( manifest_file, log_root. clone ( ) , engine. clone ( ) , schema ) ?;
207+ ManifestPhase :: new ( manifest_file, log_root. clone ( ) , engine. clone ( ) ) ?;
194208
195209 // Count batches and collect results
196210 let mut batch_count = 0 ;
@@ -242,13 +256,13 @@ mod tests {
242256 let schema = crate :: actions:: get_commit_schema ( ) . project ( & [ crate :: actions:: ADD_NAME ] ) ?;
243257
244258 let mut manifest_phase =
245- ManifestPhase :: new ( manifest_file, log_root. clone ( ) , engine. clone ( ) , schema ) ?;
259+ ManifestPhase :: new ( manifest_file, log_root. clone ( ) , engine. clone ( ) ) ?;
246260
247261 // Drain the phase
248262 while manifest_phase. next ( ) . is_some ( ) { }
249263
250264 // Check if sidecars were collected
251- let next = manifest_phase. into_next ( ) ?;
265+ let next = manifest_phase. finalize ( ) ?;
252266
253267 match next {
254268 AfterManifest :: Sidecars { sidecars } => {
0 commit comments