1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15+ use std:: io:: Cursor ;
16+ use std:: io:: Read ;
1517use std:: io:: Result ;
1618use std:: pin:: Pin ;
1719use std:: task:: Context ;
@@ -36,6 +38,7 @@ use futures::AsyncBufRead;
3638use futures:: AsyncRead ;
3739use log:: trace;
3840use pin_project:: pin_project;
41+ use zip:: ZipArchive ;
3942
4043use crate :: CompressAlgorithm ;
4144
@@ -73,6 +76,9 @@ impl From<CompressAlgorithm> for DecompressCodec {
7376 CompressAlgorithm :: Xz => DecompressCodec :: Xz ( XzDecoder :: new ( ) ) ,
7477 CompressAlgorithm :: Zlib => DecompressCodec :: Zlib ( ZlibDecoder :: new ( ) ) ,
7578 CompressAlgorithm :: Zstd => DecompressCodec :: Zstd ( ZstdDecoder :: new ( ) ) ,
79+ CompressAlgorithm :: Zip => {
80+ unreachable ! ( "Zip type requires additional judgment and use `decompress_all_zip`" )
81+ }
7682 }
7783 }
7884}
@@ -350,6 +356,25 @@ impl DecompressDecoder {
350356 main. extend_from_slice ( & tail) ;
351357 Ok ( main)
352358 }
359+
360+ pub fn decompress_all_zip ( compressed : & [ u8 ] ) -> databend_common_exception:: Result < Vec < u8 > > {
361+ let mut zip = ZipArchive :: new ( Cursor :: new ( compressed) ) . map_err ( |e| {
362+ ErrorCode :: InvalidCompressionData ( format ! ( "compression data invalid: {e}" ) )
363+ } ) ?;
364+ if zip. len ( ) > 1 {
365+ return Err ( ErrorCode :: InvalidCompressionData (
366+ "Zip only supports single file" ,
367+ ) ) ;
368+ }
369+ let mut file = zip. by_index ( 0 ) . map_err ( |e| {
370+ ErrorCode :: InvalidCompressionData ( format ! ( "compression data invalid: {e}" ) )
371+ } ) ?;
372+ let mut bytes = Vec :: new ( ) ;
373+ file. read_to_end ( & mut bytes) ?;
374+
375+ Ok ( bytes)
376+ }
377+
353378 // need to finish the decoding by adding a empty input
354379 pub fn decompress_batch (
355380 & mut self ,
@@ -577,6 +602,22 @@ mod tests {
577602 Ok ( ( ) )
578603 }
579604
605+ #[ tokio:: test]
606+ async fn test_decompress_reader_zip ( ) -> databend_common_exception:: Result < ( ) > {
607+ let _ = env_logger:: try_init ( ) ;
608+
609+ let mut rng = ThreadRng :: default ( ) ;
610+ let mut content = vec ! [ 0 ; 16 * 1024 * 1024 ] ;
611+ rng. fill_bytes ( & mut content) ;
612+
613+ let compressed_content = CompressCodec :: compress_all_zip ( & content) ?;
614+ let result = DecompressDecoder :: decompress_all_zip ( & compressed_content) ?;
615+
616+ assert_eq ! ( result, content) ;
617+
618+ Ok ( ( ) )
619+ }
620+
580621 #[ tokio:: test]
581622 async fn test_decompress_reader_ontime_gzip ( ) -> Result < ( ) > {
582623 let _ = env_logger:: try_init ( ) ;
0 commit comments