Skip to content

Commit 0440b44

Browse files
authored
Add option to set row group size and compression (#334)
1 parent 4b7bf99 commit 0440b44

File tree

3 files changed

+85
-2
lines changed

3 files changed

+85
-2
lines changed

server/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ async fn main() -> anyhow::Result<()> {
9090
{
9191
if std::env::var(DEBUG_PYROSCOPE_URL).is_ok() {
9292
let url = std::env::var(DEBUG_PYROSCOPE_URL).ok();
93-
Some(start_profiling(url.unwrap()));
93+
start_profiling(url.unwrap());
9494
}
9595
}
9696

server/src/option.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,12 @@ pub struct Server {
195195

196196
/// Server should check for update or not
197197
pub check_update: bool,
198+
199+
/// Rows in Parquet Rowgroup
200+
pub row_group_size: usize,
201+
202+
/// Parquet compression algorithm
203+
pub parquet_compression: Compression,
198204
}
199205

200206
impl FromArgMatches for Server {
@@ -231,6 +237,24 @@ impl FromArgMatches for Server {
231237
.get_one::<bool>(Self::CHECK_UPDATE)
232238
.cloned()
233239
.expect("default for check update");
240+
self.row_group_size = m
241+
.get_one::<usize>(Self::ROW_GROUP_SIZE)
242+
.cloned()
243+
.expect("default for row_group size");
244+
self.parquet_compression = match m
245+
.get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
246+
.expect("default for compression algo")
247+
.as_str()
248+
{
249+
"uncompressed" => Compression::UNCOMPRESSED,
250+
"snappy" => Compression::SNAPPY,
251+
"gzip" => Compression::GZIP,
252+
"lzo" => Compression::LZO,
253+
"brotli" => Compression::BROTLI,
254+
"lz4" => Compression::LZ4,
255+
"zstd" => Compression::ZSTD,
256+
_ => unreachable!(),
257+
};
234258

235259
Ok(())
236260
}
@@ -246,6 +270,8 @@ impl Server {
246270
pub const USERNAME: &str = "username";
247271
pub const PASSWORD: &str = "password";
248272
pub const CHECK_UPDATE: &str = "check-update";
273+
pub const ROW_GROUP_SIZE: &str = "row-group-size";
274+
pub const PARQUET_COMPRESSION_ALGO: &str = "compression-algo";
249275
pub const DEFAULT_USERNAME: &str = "admin";
250276
pub const DEFAULT_PASSWORD: &str = "admin";
251277

@@ -334,6 +360,60 @@ impl Server {
334360
.value_parser(value_parser!(bool))
335361
.help("Password for the basic authentication on the server"),
336362
)
363+
.arg(
364+
Arg::new(Self::ROW_GROUP_SIZE)
365+
.long(Self::ROW_GROUP_SIZE)
366+
.env("P_PARQUET_ROW_GROUP_SIZE")
367+
.value_name("NUMBER")
368+
.required(false)
369+
.default_value("16384")
370+
.value_parser(value_parser!(usize))
371+
.help("Number of rows in a row groups"),
372+
)
373+
.arg(
374+
Arg::new(Self::PARQUET_COMPRESSION_ALGO)
375+
.long(Self::PARQUET_COMPRESSION_ALGO)
376+
.env("P_PARQUET_COMPRESSION_ALGO")
377+
.value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]")
378+
.required(false)
379+
.default_value("lz4")
380+
.value_parser([
381+
"uncompressed",
382+
"snappy",
383+
"gzip",
384+
"lzo",
385+
"brotli",
386+
"lz4",
387+
"zstd"])
388+
.help("Parquet compression algorithm"),
389+
)
390+
}
391+
}
392+
393+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
394+
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
395+
pub enum Compression {
396+
UNCOMPRESSED,
397+
SNAPPY,
398+
GZIP,
399+
LZO,
400+
BROTLI,
401+
#[default]
402+
LZ4,
403+
ZSTD,
404+
}
405+
406+
impl From<Compression> for datafusion::parquet::basic::Compression {
407+
fn from(value: Compression) -> Self {
408+
match value {
409+
Compression::UNCOMPRESSED => datafusion::parquet::basic::Compression::UNCOMPRESSED,
410+
Compression::SNAPPY => datafusion::parquet::basic::Compression::SNAPPY,
411+
Compression::GZIP => datafusion::parquet::basic::Compression::GZIP,
412+
Compression::LZO => datafusion::parquet::basic::Compression::LZO,
413+
Compression::BROTLI => datafusion::parquet::basic::Compression::BROTLI,
414+
Compression::LZ4 => datafusion::parquet::basic::Compression::LZ4,
415+
Compression::ZSTD => datafusion::parquet::basic::Compression::ZSTD,
416+
}
337417
}
338418
}
339419

server/src/storage/object_storage.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,10 @@ pub trait ObjectStorage: Sync + 'static {
321321
fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
322322
parquet_table.upsert(&parquet_path);
323323

324-
let props = WriterProperties::builder().build();
324+
let props = WriterProperties::builder()
325+
.set_max_row_group_size(CONFIG.parseable.row_group_size)
326+
.set_compression(CONFIG.parseable.parquet_compression.into())
327+
.build();
325328
let schema = Arc::new(record_reader.merged_schema());
326329
let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?;
327330

0 commit comments

Comments
 (0)