@@ -9,13 +9,12 @@ use std::{
99
1010use anyhow:: Context ;
1111use async_zip:: {
12- write:: {
12+ tokio :: write:: {
1313 EntryStreamWriter ,
1414 ZipFileWriter ,
1515 } ,
1616 Compression ,
1717 ZipEntryBuilder ,
18- ZipEntryBuilderExt ,
1918} ;
2019use bytes:: Bytes ;
2120use common:: {
@@ -723,7 +722,7 @@ impl<'a, 'b> ZipSnapshotTableUpload<'a, 'b> {
723722 table_name : TableName ,
724723 ) -> anyhow:: Result < Self > {
725724 let source_path = format ! ( "{table_name}/documents.jsonl" ) ;
726- let builder = ZipEntryBuilder :: new ( source_path. clone ( ) , Compression :: Deflate )
725+ let builder = ZipEntryBuilder :: new ( source_path. into ( ) , Compression :: Deflate )
727726 . unix_permissions ( ZIP_ENTRY_PERMISSIONS ) ;
728727 let entry_writer = zip_writer. write_entry_stream ( builder. build ( ) ) . await ?;
729728 Ok ( Self { entry_writer } )
@@ -736,11 +735,8 @@ impl<'a, 'b> ZipSnapshotTableUpload<'a, 'b> {
736735
737736 async fn write_json_line ( & mut self , json : JsonValue ) -> anyhow:: Result < ( ) > {
738737 let buf = serde_json:: to_vec ( & json) ?;
739- self . entry_writer . compat_mut_write ( ) . write_all ( & buf) . await ?;
740- self . entry_writer
741- . compat_mut_write ( )
742- . write_all ( & AFTER_DOCUMENTS_CLEAN )
743- . await ?;
738+ self . entry_writer . write_all ( & buf) . await ?;
739+ self . entry_writer . write_all ( & AFTER_DOCUMENTS_CLEAN ) . await ?;
744740 Ok ( ( ) )
745741 }
746742
@@ -756,7 +752,7 @@ struct ZipSnapshotUpload<'a> {
756752
757753impl < ' a > ZipSnapshotUpload < ' a > {
758754 async fn new ( out : & ' a mut ChannelWriter ) -> anyhow:: Result < Self > {
759- let writer = ZipFileWriter :: new ( out) ;
755+ let writer = ZipFileWriter :: with_tokio ( out) ;
760756 let mut zip_snapshot_upload = Self { writer } ;
761757 zip_snapshot_upload
762758 . write_full_file ( format ! ( "README.md" ) , README_MD_CONTENTS )
@@ -765,13 +761,10 @@ impl<'a> ZipSnapshotUpload<'a> {
765761 }
766762
767763 async fn write_full_file ( & mut self , path : String , contents : & str ) -> anyhow:: Result < ( ) > {
768- let builder = ZipEntryBuilder :: new ( path, Compression :: Deflate )
764+ let builder = ZipEntryBuilder :: new ( path. into ( ) , Compression :: Deflate )
769765 . unix_permissions ( ZIP_ENTRY_PERMISSIONS ) ;
770766 let mut entry_writer = self . writer . write_entry_stream ( builder. build ( ) ) . await ?;
771- entry_writer
772- . compat_mut_write ( )
773- . write_all ( contents. as_bytes ( ) )
774- . await ?;
767+ entry_writer. write_all ( contents. as_bytes ( ) ) . await ?;
775768 entry_writer. close ( ) . await ?;
776769 Ok ( ( ) )
777770 }
@@ -781,11 +774,11 @@ impl<'a> ZipSnapshotUpload<'a> {
781774 path : String ,
782775 mut contents : BoxStream < ' _ , std:: io:: Result < Bytes > > ,
783776 ) -> anyhow:: Result < ( ) > {
784- let builder = ZipEntryBuilder :: new ( path, Compression :: Deflate )
777+ let builder = ZipEntryBuilder :: new ( path. into ( ) , Compression :: Deflate )
785778 . unix_permissions ( ZIP_ENTRY_PERMISSIONS ) ;
786779 let mut entry_writer = self . writer . write_entry_stream ( builder. build ( ) ) . await ?;
787780 while let Some ( chunk) = contents. try_next ( ) . await ? {
788- entry_writer. compat_mut_write ( ) . write_all ( & chunk) . await ?;
781+ entry_writer. write_all ( & chunk) . await ?;
789782 }
790783 entry_writer. close ( ) . await ?;
791784 Ok ( ( ) )
@@ -817,23 +810,21 @@ impl<'a> ZipSnapshotUpload<'a> {
817810 generated_schema : GeneratedSchema < T > ,
818811 ) -> anyhow:: Result < ( ) > {
819812 let generated_schema_path = format ! ( "{table_name}/generated_schema.jsonl" ) ;
820- let builder = ZipEntryBuilder :: new ( generated_schema_path. clone ( ) , Compression :: Deflate )
813+ let builder = ZipEntryBuilder :: new ( generated_schema_path. into ( ) , Compression :: Deflate )
821814 . unix_permissions ( ZIP_ENTRY_PERMISSIONS ) ;
822815 let mut entry_writer = self . writer . write_entry_stream ( builder. build ( ) ) . await ?;
823816 let generated_schema_str = generated_schema. inferred_shape . to_string ( ) ;
824817 entry_writer
825- . compat_mut_write ( )
826818 . write_all ( serde_json:: to_string ( & generated_schema_str) ?. as_bytes ( ) )
827819 . await ?;
828- entry_writer. compat_mut_write ( ) . write_all ( b"\n " ) . await ?;
820+ entry_writer. write_all ( b"\n " ) . await ?;
829821 for ( override_id, override_export_context) in generated_schema. overrides . into_iter ( ) {
830822 let override_json =
831823 json ! ( { override_id. encode( ) : JsonValue :: from( override_export_context) } ) ;
832824 entry_writer
833- . compat_mut_write ( )
834825 . write_all ( serde_json:: to_string ( & override_json) ?. as_bytes ( ) )
835826 . await ?;
836- entry_writer. compat_mut_write ( ) . write_all ( b"\n " ) . await ?;
827+ entry_writer. write_all ( b"\n " ) . await ?;
837828 }
838829 entry_writer. close ( ) . await ?;
839830 Ok ( ( ) )
@@ -857,6 +848,7 @@ mod tests {
857848 use anyhow:: Context ;
858849 use bytes:: Bytes ;
859850 use common:: {
851+ async_zip_ext:: ZipEntryReaderExt ,
860852 document:: {
861853 ParsedDocument ,
862854 ResolvedDocument ,
@@ -1048,16 +1040,20 @@ mod tests {
10481040 . await ?
10491041 . context ( "object missing from storage" ) ?;
10501042 let stored_bytes = storage_stream. collect_as_bytes ( ) . await ?;
1051- let mut zip_reader = async_zip:: read:: mem:: ZipFileReader :: new ( & stored_bytes) . await ?;
1043+ let zip_reader =
1044+ async_zip:: base:: read:: mem:: ZipFileReader :: new ( stored_bytes. into ( ) ) . await ?;
10521045 let mut zip_entries = BTreeMap :: new ( ) ;
10531046 let filenames: Vec < _ > = zip_reader
1047+ . file ( )
10541048 . entries ( )
1055- . into_iter ( )
1056- . map ( |entry| entry. filename ( ) . to_string ( ) )
1049+ . iter ( )
1050+ . map ( |entry| entry. filename ( ) . as_str ( ) . unwrap ( ) . to_string ( ) )
10571051 . collect ( ) ;
10581052 for ( i, filename) in filenames. into_iter ( ) . enumerate ( ) {
1059- let entry_reader = zip_reader. entry_reader ( i) . await ?;
1060- let entry_contents = String :: from_utf8 ( entry_reader. read_to_end_crc ( ) . await ?) ?;
1053+ let mut entry_reader = zip_reader. reader_with_entry ( i) . await ?;
1054+ let entry_contents = entry_reader
1055+ . read_to_string_checked_bypass_async_zip_crc_bug ( )
1056+ . await ?;
10611057 zip_entries. insert ( filename, entry_contents) ;
10621058 }
10631059 assert_eq ! ( zip_entries, expected_export_entries) ;
@@ -1141,16 +1137,20 @@ mod tests {
11411137 . await ?
11421138 . context ( "object missing from storage" ) ?;
11431139 let stored_bytes = storage_stream. collect_as_bytes ( ) . await ?;
1144- let mut zip_reader = async_zip:: read:: mem:: ZipFileReader :: new ( & stored_bytes) . await ?;
1140+ let zip_reader =
1141+ async_zip:: base:: read:: mem:: ZipFileReader :: new ( stored_bytes. into ( ) ) . await ?;
11451142 let mut zip_entries = BTreeMap :: new ( ) ;
11461143 let filenames: Vec < _ > = zip_reader
1144+ . file ( )
11471145 . entries ( )
1148- . into_iter ( )
1149- . map ( |entry| entry. filename ( ) . to_string ( ) )
1146+ . iter ( )
1147+ . map ( |entry| entry. filename ( ) . as_str ( ) . unwrap ( ) . to_string ( ) )
11501148 . collect ( ) ;
11511149 for ( i, filename) in filenames. into_iter ( ) . enumerate ( ) {
1152- let entry_reader = zip_reader. entry_reader ( i) . await ?;
1153- let entry_contents = String :: from_utf8 ( entry_reader. read_to_end_crc ( ) . await ?) ?;
1150+ let mut entry_reader = zip_reader. reader_with_entry ( i) . await ?;
1151+ let entry_contents = entry_reader
1152+ . read_to_string_checked_bypass_async_zip_crc_bug ( )
1153+ . await ?;
11541154 zip_entries. insert ( filename, entry_contents) ;
11551155 }
11561156 assert_eq ! ( zip_entries, expected_export_entries) ;
0 commit comments