Skip to content

Commit 4c8cc15

Browse files
committed
x
1 parent 21fe5dc commit 4c8cc15

File tree

1 file changed

+99
-35
lines changed

1 file changed

+99
-35
lines changed

src/query/service/src/spillers/union_file.rs

Lines changed: 99 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -552,77 +552,141 @@ where
552552

553553
#[cfg(test)]
554554
mod tests {
555+
use databend_common_base::base::GlobalUniqName;
555556
use databend_common_base::runtime::GlobalIORuntime;
557+
use databend_common_catalog::table_context::TableContext;
558+
use databend_common_config::SpillConfig;
556559
use databend_common_exception::Result;
557560
use databend_common_expression::types::array::ArrayColumnBuilder;
558561
use databend_common_expression::types::number::Int32Type;
559562
use databend_common_expression::types::ArgType;
560563
use databend_common_expression::types::DataType;
561564
use databend_common_expression::types::StringType;
562-
use databend_common_expression::types::UInt64Type;
563565
use databend_common_expression::Column;
564566
use databend_common_expression::FromData;
565567
use databend_common_storage::DataOperator;
568+
use databend_storages_common_cache::TempDirManager;
566569
use parquet::file::properties::WriterProperties;
567570
use parquet::file::properties::WriterPropertiesPtr;
571+
use tempfile::TempDir;
568572

569573
use super::*;
570574
use crate::spillers::async_buffer::BufferPool;
571575
use crate::test_kits::ConfigBuilder;
572576
use crate::test_kits::TestFixture;
573577

574578
#[tokio::test(flavor = "multi_thread")]
575-
async fn test_xxx() -> Result<()> {
576-
let config = ConfigBuilder::create().off_log().build();
577-
let fixture = TestFixture::setup_with_config(&config).await?;
578-
let _ctx = fixture.new_query_ctx().await?;
579-
580-
let props = WriterProperties::default().into();
579+
async fn test_union_file_writer_without_local() -> Result<()> {
580+
let spill_dir = TempDir::new().expect("create spill temp dir");
581+
let mut config = ConfigBuilder::create().off_log().build();
582+
config.spill = SpillConfig::new_for_test(
583+
spill_dir.path().to_string_lossy().into_owned(),
584+
0.01,
585+
1 << 30,
586+
);
581587

582-
let block = DataBlock::new_from_columns(vec![
583-
UInt64Type::from_data(vec![7, 8, 9]),
584-
StringType::from_data(vec!["c", "d", "e"]),
585-
]);
588+
let fixture = TestFixture::setup_with_config(&config).await?;
589+
let ctx = fixture.new_query_ctx().await?;
586590

587-
let data_schema = block.infer_schema();
588591
let executor = GlobalIORuntime::instance();
589592
let memory = 1024 * 1024 * 100;
590593

591594
let pool = BufferPool::create(executor, memory, 3);
592595
let op = DataOperator::instance().operator();
593596

594-
let path = "path";
595-
let writer = op.writer(path).await?;
597+
let remote_path = format!(
598+
"{}/{}",
599+
ctx.query_id_spill_prefix(),
600+
GlobalUniqName::unique()
601+
);
602+
let writer = op.writer(&remote_path).await?;
596603
let remote = pool.buffer_write(writer);
597604

598-
// let dir = todo!();
599-
// let path = todo!();
605+
let mut writer = UnionFileWriter::without_local(remote_path.clone(), remote);
606+
let mut expected = b"hello union writer".to_vec();
607+
writer.write_all(&expected)?;
608+
let extra = b" write bytes";
609+
writer.write_all(extra)?;
610+
expected.extend_from_slice(extra);
611+
writer.flush()?;
600612

601-
// let file = SyncDmaFile::create(path, true)?;
602-
// let align = todo!();
603-
// let buf = DmaWriteBuf::new(align, 4 * 1024 * 1024);
613+
let file = writer.finish()?;
614+
assert!(file.local_path.is_none());
615+
assert_eq!(file.remote_offset, Some(0));
616+
assert_eq!(file.remote_size, expected.len() as u64);
604617

605-
let file = UnionFileWriter::without_local(path.to_string(), remote);
606-
let mut file_writer = FileWriter::new(props, &data_schema, file)?;
618+
let reader = op.reader(&file.remote_path).await?;
619+
let buffer = reader.read(0..file.remote_size).await?;
620+
assert_eq!(buffer.to_vec(), expected);
607621

608-
let mut row_groups = vec![];
609-
let row_group = file_writer.spill(vec![block])?;
610-
row_groups.push((*row_group).clone());
622+
Ok(())
623+
}
611624

612-
let (metadata, file) = file_writer.finish()?;
625+
#[tokio::test(flavor = "multi_thread")]
626+
async fn test_union_file_writer_with_local() -> Result<()> {
627+
let spill_dir = TempDir::new().expect("create spill temp dir");
628+
let mut config = ConfigBuilder::create().off_log().build();
629+
config.spill = SpillConfig::new_for_test(
630+
spill_dir.path().to_string_lossy().into_owned(),
631+
0.01,
632+
1 << 30,
633+
);
613634

614-
let input = FileReader {
615-
meta: metadata.into(),
616-
local: None,
617-
remote_reader: op.reader(&file.remote_path).await?,
618-
remote_offset: None,
619-
};
635+
let fixture = TestFixture::setup_with_config(&config).await?;
636+
let ctx = fixture.new_query_ctx().await?;
620637

621-
let builder = ArrowReaderBuilder::new(input).await?;
622-
let stream = builder.with_batch_size(usize::MAX).build()?;
638+
let executor = GlobalIORuntime::instance();
639+
let memory = 1024 * 1024 * 100;
640+
641+
let pool = BufferPool::create(executor, memory, 3);
642+
let op = DataOperator::instance().operator();
643+
644+
let remote_path = format!(
645+
"{}/{}",
646+
ctx.query_id_spill_prefix(),
647+
GlobalUniqName::unique()
648+
);
649+
let writer = op.writer(&remote_path).await?;
650+
let remote = pool.buffer_write(writer);
651+
652+
let query_id = ctx.get_id();
653+
let temp_dir = TempDirManager::instance()
654+
.get_disk_spill_dir(memory, &query_id)
655+
.expect("local spill directory should be available");
656+
let temp_path = temp_dir
657+
.new_file_with_size(0)?
658+
.expect("spill temp file should be allocated");
659+
660+
let dio = false;
661+
let file = SyncDmaFile::create(&temp_path, dio)?;
662+
let buf = DmaWriteBuf::new(temp_dir.block_alignment(), 4 * 1024 * 1024);
663+
664+
let mut union_writer = UnionFileWriter::new(
665+
temp_dir.clone(),
666+
temp_path,
667+
file,
668+
buf,
669+
remote_path.clone(),
670+
remote,
671+
);
672+
673+
assert!(union_writer.has_opening_local());
674+
675+
let mut expected = b"bytes on disk".to_vec();
676+
union_writer.write_all(&expected)?;
677+
let extra = b" via union writer";
678+
union_writer.write_all(extra)?;
679+
expected.extend_from_slice(extra);
680+
union_writer.flush()?;
681+
682+
let file = union_writer.finish()?;
683+
684+
let local_path = file.local_path.clone().expect("local path should exist");
685+
assert!(file.remote_offset.is_none());
686+
assert_eq!(file.remote_size, 0);
623687

624-
let blocks = load_blocks_from_stream(&data_schema, stream).await?;
625-
println!("{:?}", blocks);
688+
let local_bytes = std::fs::read(local_path.as_ref())?;
689+
assert_eq!(local_bytes, expected);
626690

627691
Ok(())
628692
}

0 commit comments

Comments
 (0)