11use async_compression:: tokio:: write:: ZstdEncoder ;
22use clap:: Parser ;
3- use futures_util :: future :: try_join_all ;
3+ use nix :: fcntl :: { AT_FDCWD , RenameFlags , renameat2 } ;
44use std:: boxed:: Box ;
55use std:: collections:: HashMap ;
6- use std:: hash:: Hasher ;
7- use std:: io:: Write ;
86use std:: os:: unix:: fs:: MetadataExt ;
97use std:: path:: { Path , PathBuf } ;
108use tokio:: fs;
119use tokio:: fs:: File ;
12- use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
10+ use tokio:: io:: { AsyncReadExt , AsyncWrite , AsyncWriteExt } ;
1311
1412use pkgsmgr:: types:: * ;
13+ use pkgsmgr:: utils:: Hasher ;
1514
1615#[ derive( Parser ) ]
1716#[ command( version, about, long_about = None ) ]
@@ -55,35 +54,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5554 println ! ( "Beginning hashing and compressing..." ) ;
5655 let mut hashes = HashMap :: new ( ) ;
5756
58- for file_group in files. chunks_mut ( 256 ) {
59- let futures: Vec < _ > = file_group
60- . iter_mut ( )
61- . map ( |file_path| {
62- compress_file (
63- file_path. to_path_buf ( ) ,
64- args. hash ,
65- args. compression ,
66- chunks_path,
67- )
68- } )
69- . collect ( ) ;
70-
71- for ( path, hash) in try_join_all ( futures) . await ? {
72- hashes. insert ( path, hash) ;
73- }
57+ for file_path in & files {
58+ let hash = hash_file ( file_path, args. hash ) . await ?;
59+
60+ compress ( file_path, args. compression , chunks_path, & hash) . await ?;
61+
62+ if fs:: hard_link ( & file_path, chunks_path. join ( & hash) )
63+ . await
64+ . is_err ( )
65+ {
66+ fs:: copy ( & file_path, chunks_path. join ( & hash) ) . await ?;
67+ } ;
68+
69+ hashes. insert ( file_path, hash) ;
7470 }
7571
7672 println ! ( "Generating manifest..." ) ;
7773 let mut manifest = "" . to_string ( ) ;
7874
7975 match args. compression {
80- Compression :: Zstd => manifest += "Compressed : zstd\n " ,
76+ Compression :: Zstd => manifest += "Compression : zstd\n " ,
8177 Compression :: None => ( ) ,
8278 }
79+ match args. hash {
80+ HashType :: Blake3 => manifest += "Hasher: blake3\n " ,
81+ HashType :: Xxh3_128 => manifest += "Hasher: xxh3_128\n " ,
82+ }
8383
8484 manifest += "---\n " ;
8585
86- for file in files {
86+ for file in & files {
8787 let hash = hashes
8888 . get ( & file)
8989 . expect ( "tried adding file to manifest that has no hash" ) ;
@@ -94,31 +94,52 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9494 let size = metadata. size ( ) / 1024 ;
9595 let path = file
9696 . strip_prefix ( & args. input_path )
97- . expect ( "tried adding file to manifest that is outside of input_path" ) ;
97+ . expect ( "tried adding file to manifest that is outside of input_path" )
98+ . to_str ( )
99+ . unwrap ( ) ;
100+
101+ manifest += & format ! ( "{mode};{size};{hash};{path}\n " ) ;
102+ }
103+
104+ // Atomically replace on-disk manifest
105+ let hash = & blake3:: hash ( manifest. as_bytes ( ) ) . to_hex ( ) . to_string ( ) ;
106+ let tmp_link_path = args. output_path . join ( "manifest.tmp" ) ;
107+ let main_link_path = args. output_path . join ( "manifest" ) ;
108+ let manifest_path = args. output_path . join ( hash) ;
98109
99- manifest += & format ! ( "{mode};{size};{hash};{path:?}\n " ) ;
110+ fs:: write ( manifest_path, manifest) . await ?;
111+ fs:: write ( & tmp_link_path, hash) . await ?;
112+
113+ if !& main_link_path. exists ( ) {
114+ fs:: write ( & main_link_path, "" ) . await ?;
100115 }
101116
102- fs:: write ( args. output_path . join ( "manifest" ) , manifest) . await ?;
117+ renameat2 (
118+ AT_FDCWD ,
119+ & tmp_link_path,
120+ AT_FDCWD ,
121+ & main_link_path,
122+ RenameFlags :: RENAME_EXCHANGE ,
123+ ) ?;
124+
125+ fs:: remove_file ( & tmp_link_path) . await ?;
103126
104127 Ok ( ( ) )
105128}
106129
107- async fn compress_file (
108- file_path : PathBuf ,
109- hash : HashType ,
110- compression : Compression ,
111- chunks_path : & Path ,
112- ) -> Result < ( PathBuf , String ) , Box < dyn std:: error:: Error > > {
130+ async fn hash_file (
131+ file_path : & Path ,
132+ hash_method : HashType ,
133+ ) -> Result < String , Box < dyn std:: error:: Error > > {
113134 let mut source_file = match File :: open ( & file_path) . await {
114135 Ok ( file) => file,
115136 Err ( e) => {
116137 eprintln ! ( "couldn't open source file: {}" , file_path. display( ) ) ;
117138 panic ! ( "{e}" )
118139 }
119140 } ;
120- let mut xxh_hasher = xxhash_rust :: xxh3 :: Xxh3Default :: new ( ) ;
121- let mut blake3_hasher = blake3 :: Hasher :: new ( ) ;
141+
142+ let mut hasher = Hasher :: new ( hash_method ) ;
122143
123144 let mut buf = [ 0 ; 8192 ] ;
124145 loop {
@@ -128,48 +149,55 @@ async fn compress_file(
128149 }
129150
130151 let chunk = & buf[ 0 ..n] ;
131- match hash {
132- HashType :: Blake3 => blake3_hasher. write_all ( chunk) ?,
133- HashType :: Xxh3_128 => xxh_hasher. write_all ( chunk) ?,
134- }
152+ hasher. write ( chunk) ;
135153 }
136154
137- let hash = match hash {
138- HashType :: Xxh3_128 => hex:: encode ( xxh_hasher. finish ( ) . to_le_bytes ( ) ) ,
139- HashType :: Blake3 => blake3_hasher. finalize ( ) . to_hex ( ) . to_string ( ) ,
140- } ;
155+ let hash = hasher. digest ( ) ;
141156
142- let chunk_path = & chunks_path. join ( & hash) ;
157+ Ok ( hash)
158+ }
143159
144- if !chunk_path. exists ( ) {
160+ async fn compress (
161+ file_path : & Path ,
162+ compression : Compression ,
163+ chunks_path : & Path ,
164+ hash : & str ,
165+ ) -> Result < ( ) , std:: io:: Error > {
166+ let compressed_chunk_filename = match compression {
167+ Compression :: Zstd => format ! ( "{hash}.zstd" ) ,
168+ Compression :: None => panic ! ( "Tried to compress on a non-compressable request." ) ,
169+ } ;
170+ let compressed_chunk_path = & chunks_path. join ( compressed_chunk_filename) ;
171+
172+ if !compressed_chunk_path. exists ( ) {
145173 let mut source_file = File :: open ( & file_path) . await . unwrap ( ) ;
146174 let temp_file_path = temp_file:: TempFile :: new ( ) ?;
147- let temp_file = File :: create ( & temp_file_path) . await ?;
148-
149- if compression == Compression :: Zstd {
150- let mut zstd = ZstdEncoder :: new ( temp_file) ;
175+ let mut temp_file = File :: create ( & temp_file_path) . await ?;
151176
152- let mut buf = [ 0 ; 8192 ] ;
153- loop {
154- let n = source_file. read ( & mut buf) . await ?;
155- if n == 0 {
156- break ;
157- }
177+ let mut compressor: Box < dyn AsyncWrite + Sync + Unpin > = match compression {
178+ Compression :: Zstd => Box :: new ( ZstdEncoder :: new ( & mut temp_file) ) ,
179+ Compression :: None => panic ! ( "Tried to copmress on a non-compressable request." ) ,
180+ } ;
158181
159- zstd. write_all ( & buf) . await ?;
182+ let mut buf = [ 0 ; 8192 ] ;
183+ loop {
184+ let n = source_file. read ( & mut buf) . await ?;
185+ if n == 0 {
186+ break ;
160187 }
161188
162- let mut zstd_path = chunk_path. clone ( ) ;
163- zstd_path. set_extension ( "zstd" ) ;
164- fs:: copy ( temp_file_path, zstd_path) . await ?;
189+ compressor. write_all ( & buf[ 0 ..n] ) . await ?;
165190 }
166191
167- if fs:: hard_link ( & file_path, chunk_path) . await . is_err ( ) {
168- fs:: copy ( & file_path, chunk_path) . await ?;
169- } ;
192+ // Finish compressing
193+ compressor. flush ( ) . await ?;
194+ compressor. shutdown ( ) . await ?;
195+
196+ // Move compressed from memory and onto disk
197+ fs:: copy ( temp_file_path, compressed_chunk_path) . await ?;
170198
171- println ! ( "Created chunk from path {file_path:?}" ) ;
199+ println ! ( "Compressed chunk from path {file_path:?}" ) ;
172200 } ;
173201
174- Ok ( ( file_path , hash ) )
202+ Ok ( ( ) )
175203}
0 commit comments