|
1 | 1 | use actix_web::http::header; |
2 | 2 | use anyhow::anyhow; |
3 | 3 | use bytes::Bytes; |
4 | | -use tokio::{runtime::Handle, task::JoinError}; |
| 4 | +use std::{io::Read, path::Path, pin::Pin}; |
| 5 | +use tokio::{ |
| 6 | + io::{AsyncRead, BufReader}, |
| 7 | + runtime::Handle, |
| 8 | + task::JoinError, |
| 9 | +}; |
5 | 10 | use tracing::instrument; |
6 | 11 | use walker_common::compression::{Compression, DecompressionOptions, Detector}; |
7 | 12 |
|
@@ -95,6 +100,36 @@ fn detect(content_type: Option<header::ContentType>, bytes: &[u8]) -> Result<Com |
95 | 100 | }) |
96 | 101 | } |
97 | 102 |
|
| 103 | +/// Take a file, return a wrapped [`AsyncRead`], and wrap that with the required compression decoder. |
| 104 | +pub async fn decompress_async_read( |
| 105 | + path: impl AsRef<Path>, |
| 106 | +) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>> { |
| 107 | + let path = path.as_ref(); |
| 108 | + let source = tokio::fs::File::open(path).await?; |
| 109 | + let source = BufReader::new(source); |
| 110 | + |
| 111 | + Ok(match path.extension().and_then(|ext| ext.to_str()) { |
| 112 | + None | Some("sql") => Box::pin(source), |
| 113 | + Some("xz") => Box::pin(async_compression::tokio::bufread::LzmaDecoder::new(source)), |
| 114 | + Some("gz") => Box::pin(async_compression::tokio::bufread::GzipDecoder::new(source)), |
| 115 | + Some(ext) => anyhow::bail!("Unsupported file type ({ext})"), |
| 116 | + }) |
| 117 | +} |
| 118 | + |
| 119 | +/// Take a file, return a wrapped [`Read`], and wrap that with the required compression decoder. |
| 120 | +pub fn decompress_read(path: impl AsRef<Path>) -> anyhow::Result<Box<dyn Read + Send>> { |
| 121 | + let path = path.as_ref(); |
| 122 | + let source = std::fs::File::open(path)?; |
| 123 | + let source = std::io::BufReader::new(source); |
| 124 | + |
| 125 | + Ok(match path.extension().and_then(|ext| ext.to_str()) { |
| 126 | + None | Some("sql") => Box::new(source), |
| 127 | + Some("xz") => Box::new(lzma_rust2::XzReader::new(source, false)), |
| 128 | + Some("gz") => Box::new(flate2::read::GzDecoder::new(source)), |
| 129 | + Some(ext) => anyhow::bail!("Unsupported file type ({ext})"), |
| 130 | + }) |
| 131 | +} |
| 132 | + |
98 | 133 | #[cfg(test)] |
99 | 134 | mod test { |
100 | 135 | use crate::decompress::decompress_async; |
|
0 commit comments