diff --git a/.typos.toml b/.typos.toml
index 015175dee80dc..3ee56eaf62946 100644
--- a/.typos.toml
+++ b/.typos.toml
@@ -25,6 +25,10 @@
 "creat" = "creat"
 "crate" = "crate"
 
+[default.extend-identifiers]
+## External
+WRONLY = "WRONLY"
+
 [files]
 extend-exclude = [
     "**/Cargo.toml",
diff --git a/src/common/base/src/base/dma.rs b/src/common/base/src/base/dma.rs
index a45c2e8cadd1d..8d6d7ace71f69 100644
--- a/src/common/base/src/base/dma.rs
+++ b/src/common/base/src/base/dma.rs
@@ -22,7 +22,9 @@ use std::io::IoSlice;
 use std::io::SeekFrom;
 use std::io::Write;
 use std::ops::Range;
+use std::os::fd::AsFd;
 use std::os::fd::BorrowedFd;
+use std::os::fd::OwnedFd;
 use std::os::unix::io::AsRawFd;
 use std::path::Path;
 use std::ptr;
@@ -30,7 +32,7 @@ use std::ptr::NonNull;
 
 use bytes::Bytes;
 use rustix::fs::OFlags;
-use tokio::fs::File;
+use tokio::fs::File as AsyncFile;
 use tokio::io::AsyncSeekExt;
 
 use crate::runtime::spawn_blocking;
@@ -192,59 +194,16 @@ pub fn dma_buffer_to_bytes(buf: DmaBuffer) -> Bytes {
     Bytes::from(data)
 }
 
-/// A `DmaFile` is similar to a `File`, but it is opened with the `O_DIRECT` file in order to
+/// An `AsyncDmaFile` is similar to a `File`, but it is opened with the `O_DIRECT` flag in order to
 /// perform direct IO.
-struct DmaFile {
-    fd: File,
+pub struct DmaFile<F> {
+    fd: F,
     alignment: Alignment,
     buf: Option<DmaBuffer>,
+    offset: usize,
 }
 
-impl DmaFile {
-    async fn open_raw(path: impl AsRef<Path>, #[allow(unused)] dio: bool) -> io::Result<File> {
-        #[allow(unused_mut)]
-        let mut flags = 0;
-        #[cfg(target_os = "linux")]
-        if dio {
-            flags = OFlags::DIRECT.bits() as i32
-        }
-
-        File::options()
-            .read(true)
-            .custom_flags(flags)
-            .open(path)
-            .await
-    }
-
-    async fn create_raw(path: impl AsRef<Path>, #[allow(unused)] dio: bool) -> io::Result<File> {
-        #[allow(unused_mut)]
-        let mut flags = OFlags::EXCL;
-        #[cfg(target_os = "linux")]
-        if dio {
-            flags |= OFlags::DIRECT;
-        }
-
-        File::options()
-            .write(true)
-            .create(true)
-            .truncate(true)
-            .custom_flags(flags.bits() as i32)
-            .open(path)
-            .await
-    }
-
-    /// Attempts to open a file in read-only mode.
-    async fn open(path: impl AsRef<Path>, dio: bool) -> io::Result<DmaFile> {
-        let file = DmaFile::open_raw(path, dio).await?;
-        open_dma(file).await
-    }
-
-    /// Opens a file in write-only mode.
-    async fn create(path: impl AsRef<Path>, dio: bool) -> io::Result<DmaFile> {
-        let file = DmaFile::create_raw(path, dio).await?;
-        open_dma(file).await
-    }
-
+impl<F: AsFd> DmaFile<F> {
     fn set_buffer(&mut self, buf: DmaBuffer) {
         self.buf = Some(buf)
     }
@@ -261,7 +220,6 @@ impl DmaFile {
 
     /// Return the alignment requirement for this file. The returned alignment value can be used
    /// to allocate a buffer to use with this file:
-    #[expect(dead_code)]
     pub fn alignment(&self) -> Alignment {
         self.alignment
     }
@@ -275,17 +233,38 @@
     }
 
     fn write_direct(&mut self) -> io::Result<usize> {
-        let buf = self.buffer();
-        match rustix::io::write(&self.fd, buf) {
-            Ok(n) => {
-                if n != buf.len() {
-                    return Err(io::Error::other("short write"));
+        let buf = self.buf.as_ref().unwrap().as_slice();
+        let mut written = 0;
+        let offset = self.align_down(self.offset);
+
+        while written < buf.len() {
+            match rustix::io::pwrite(&self.fd, &buf[written..], (offset + written) as _) {
+                Ok(0) => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::WriteZero,
+                        "write returned zero bytes",
+                    ));
+                }
+                Ok(n) => {
+                    written += n;
+                    self.offset = offset + written
+                }
+                Err(err) => {
+                    if err.kind() == io::ErrorKind::Interrupted {
+                        continue;
+                    }
+                    return Err(err.into());
                 }
-                self.mut_buffer().clear();
-                Ok(n)
             }
-            Err(e) => Err(e.into()),
         }
+        self.mut_buffer().clear();
+        Ok(written)
+    }
+
+    fn inc_offset(&mut self, n: usize) {
+        debug_assert!(n >= self.alignment.as_usize());
+        debug_assert_eq!(n, self.alignment.align_down(n));
+        self.offset = self.align_down(self.offset) + n;
     }
 
     fn read_direct(&mut self, n: usize) -> io::Result<usize> {
@@ -305,33 +284,187 @@
         }
     }
 
-    fn truncate(&self, length: usize) -> io::Result<()> {
-        rustix::fs::ftruncate(&self.fd, length as u64).map_err(|e| e.into())
+    fn truncate(&mut self, length: usize) -> io::Result<()> {
+        rustix::fs::ftruncate(&self.fd, length as u64).map_err(io::Error::from)
+    }
+
+    pub fn fsync(&mut self) -> io::Result<()> {
+        rustix::fs::fsync(&self.fd).map_err(io::Error::from)
+    }
+
+    pub fn size(&self) -> io::Result<usize> {
+        Ok(rustix::fs::fstat(&self.fd)?.st_size as _)
+    }
+
+    pub fn length(&self) -> usize {
+        self.offset
+    }
+}
+
+pub type AsyncDmaFile = DmaFile<AsyncFile>;
+
+impl AsyncDmaFile {
+    async fn open_fd(path: impl AsRef<Path>, dio: bool) -> io::Result<AsyncFile> {
+        let flags = if cfg!(target_os = "linux") && dio {
+            OFlags::DIRECT.bits() as i32
+        } else {
+            0
+        };
+
+        AsyncFile::options()
+            .read(true)
+            .custom_flags(flags)
+            .open(path)
+            .await
+    }
+
+    async fn create_fd(path: impl AsRef<Path>, dio: bool) -> io::Result<AsyncFile> {
+        let flags = if cfg!(target_os = "linux") && dio {
+            OFlags::EXCL | OFlags::DIRECT
+        } else {
+            OFlags::EXCL
+        };
+
+        AsyncFile::options()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .custom_flags(flags.bits() as i32)
+            .open(path)
+            .await
+    }
+
+    /// Attempts to open a file in read-only mode.
+    pub async fn open(
+        path: impl AsRef<Path>,
+        dio: bool,
+        align: Option<Alignment>,
+    ) -> io::Result<AsyncDmaFile> {
+        let file = AsyncDmaFile::open_fd(path, dio).await?;
+        Self::new(file, align).await
+    }
+
+    /// Opens a file in write-only mode.
+    async fn create(path: impl AsRef<Path>, dio: bool) -> io::Result<AsyncDmaFile> {
+        let file = AsyncDmaFile::create_fd(path, dio).await?;
+        Self::new(file, None).await
+    }
+
+    async fn new(file: AsyncFile, align: Option<Alignment>) -> io::Result<AsyncDmaFile> {
+        let alignment = match align {
+            Some(align) => align,
+            None => {
+                let fd = file.as_raw_fd();
+                let stat = asyncify(move || {
+                    rustix::fs::fstatvfs(unsafe { BorrowedFd::borrow_raw(fd) })
+                        .map_err(io::Error::from)
+                })
+                .await?;
+                Alignment::new(stat.f_bsize.max(512) as usize).unwrap()
+            }
+        };
+
+        Ok(AsyncDmaFile {
+            fd: file,
+            alignment,
+            buf: None,
+            offset: 0,
+        })
     }
 
     async fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
         self.fd.seek(pos).await
     }
-}
 
-async fn open_dma(file: File) -> io::Result<DmaFile> {
-    let stat = fstatvfs(&file).await?;
-    let alignment = Alignment::new(stat.f_bsize.max(512) as usize).unwrap();
+    pub async fn read_range(&mut self, range: Range<u64>) -> io::Result<(DmaBuffer, Range<usize>)> {
+        let align_start = self.align_down(range.start as usize);
+        let align_end = self.align_up(range.end as usize);
+
+        let buf = Vec::with_capacity_in(align_end - align_start, DmaAllocator::new(self.alignment));
+        self.set_buffer(buf);
 
-    Ok(DmaFile {
-        fd: file,
-        alignment,
-        buf: None,
-    })
+        if align_start != 0 {
+            let offset = self.seek(SeekFrom::Start(align_start as u64)).await?;
+            if offset as usize != align_start {
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "range out of range",
+                ));
+            }
+        }
+
+        let fd = self.fd.as_raw_fd();
+        let mut buf = self.buf.take().unwrap();
+        let alignment = self.alignment;
+        let mut n;
+        loop {
+            (buf, n) = asyncify(move || {
+                let remain = buf.capacity() - buf.len();
+                let mut file = DmaFile {
+                    fd: unsafe { BorrowedFd::borrow_raw(fd) },
+                    alignment,
+                    buf: Some(buf),
+                    offset: 0,
+                };
+                file.read_direct(remain).map(|n| (file.buf.unwrap(), n))
+            })
+            .await?;
+            if align_start + buf.len() >= range.end as usize {
+                break;
+            }
+            if n == 0 {
+                return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ""));
+            }
+        }
+        let rt_range = range.start as usize - align_start..range.end as usize - align_start;
+        Ok((buf, rt_range))
+    }
 }
 
-async fn fstatvfs(file: &File) -> io::Result<rustix::fs::StatVfs> {
-    let fd = file.as_raw_fd();
-    asyncify(move || {
-        let fd = unsafe { BorrowedFd::borrow_raw(fd) };
-        rustix::fs::fstatvfs(fd).map_err(|e| e.into())
-    })
-    .await
+pub type SyncDmaFile = DmaFile<OwnedFd>;
+
+impl SyncDmaFile {
+    fn open_fd(path: impl rustix::path::Arg, dio: bool) -> io::Result<OwnedFd> {
+        let flags = if cfg!(target_os = "linux") && dio {
+            OFlags::RDONLY | OFlags::DIRECT
+        } else {
+            OFlags::RDONLY
+        };
+        rustix::fs::open(path, flags, rustix::fs::Mode::empty()).map_err(io::Error::from)
+    }
+
+    fn create_fd(path: impl rustix::path::Arg, dio: bool) -> io::Result<OwnedFd> {
+        let flags = if cfg!(target_os = "linux") && dio {
+            OFlags::EXCL | OFlags::CREATE | OFlags::TRUNC | OFlags::WRONLY | OFlags::DIRECT
+        } else {
+            OFlags::EXCL | OFlags::CREATE | OFlags::TRUNC | OFlags::WRONLY
+        };
+
+        rustix::fs::open(path, flags, rustix::fs::Mode::from_raw_mode(0o666))
+            .map_err(io::Error::from)
+    }
+
+    fn open_dma(fd: OwnedFd) -> io::Result<DmaFile<OwnedFd>> {
+        let stat = rustix::fs::fstatvfs(&fd)?;
+        let alignment = Alignment::new(stat.f_bsize.max(512) as usize).unwrap();
+
+        Ok(Self {
+            fd,
+            alignment,
+            buf: None,
+            offset: 0,
+        })
+    }
+
+    pub fn open(path: impl AsRef<Path>, dio: bool) -> io::Result<Self> {
+        let fd = Self::open_fd(path.as_ref(), dio)?;
+        Self::open_dma(fd)
+    }
+
+    pub fn create(path: impl AsRef<Path>, dio: bool) -> io::Result<Self> {
+        let fd = Self::create_fd(path.as_ref(), dio)?;
+        Self::open_dma(fd)
+    }
+}
 
 async fn asyncify<F, T>(f: F) -> io::Result<T>
@@ -347,7 +480,7 @@ where
 
 pub struct DmaWriteBuf {
     allocator: DmaAllocator,
-    data: Vec<Vec<u8, DmaAllocator>>,
+    data: Vec<DmaBuffer>,
     chunk: usize,
 }
 
@@ -360,6 +493,10 @@ impl DmaWriteBuf {
         }
     }
 
+    pub fn chunk(&self) -> usize {
+        self.chunk
+    }
+
     pub fn size(&self) -> usize {
         if self.data.is_empty() {
             return 0;
@@ -369,10 +506,11 @@ impl DmaWriteBuf {
     }
 
     pub async fn into_file(mut self, path: impl AsRef<Path>, dio: bool) -> io::Result<usize> {
-        let mut file = DmaFile {
-            fd: DmaFile::create_raw(path, dio).await?,
+        let mut file = AsyncDmaFile {
+            fd: AsyncDmaFile::create_fd(path, dio).await?,
             alignment: self.allocator.0,
             buf: None,
+            offset: 0,
         };
 
         let file_length = self.size();
@@ -407,9 +545,170 @@ impl DmaWriteBuf {
     pub fn into_data(self) -> Vec<DmaBuffer> {
         self.data
     }
+
+    fn copy<'a>(src: &'a [u8], dst: &mut DmaBuffer, remain: usize) -> &'a [u8] {
+        if src.len() <= remain {
+            dst.extend_from_slice(src);
+            &src[src.len()..]
+        } else {
+            let (left, right) = src.split_at(remain);
+            dst.extend_from_slice(left);
+            right
+        }
+    }
+
+    fn is_last_full(&self) -> bool {
+        self.data
+            .last()
+            .map(|dst| dst.len() == dst.capacity())
+            .unwrap_or(true)
+    }
+
+    pub fn fast_write(&mut self, buf: &[u8]) -> bool {
+        let Some(dst) = self.data.last_mut() else {
+            return false;
+        };
+
+        if buf.len() > dst.capacity() - dst.len() {
+            return false;
+        }
+        dst.extend_from_slice(buf);
+        true
+    }
+
+    pub fn alloc_buffer(&mut self) {
+        debug_assert!(self.data.iter().all(|buf| buf.len() == self.chunk));
+        self.data
+            .push(Vec::with_capacity_in(self.chunk, self.allocator));
+    }
+
+    pub fn flush_if_full(&mut self, file: &mut SyncDmaFile) -> io::Result<usize> {
+        debug_assert_eq!(self.allocator.0, file.alignment);
+
+        if self.size() < self.chunk {
+            return Ok(0);
+        }
+
+        let data = if self.is_last_full() {
+            &self.data
+        } else {
+            &self.data[..self.data.len() - 1]
+        };
+
+        let len = data.len() * self.chunk;
+
+        let mut io_slices: Vec<_> = data.iter().map(|buf| IoSlice::new(buf)).collect();
+        let written = writev_all(&file.fd, &mut io_slices, file.align_down(file.offset))?;
+
+        let last = self.data.pop();
+        self.data.clear();
+        match last {
+            Some(last) if last.len() != last.capacity() => {
+                self.data.push(last);
+            }
+            _ => (),
+        }
+
+        file.inc_offset(written);
+
+        if written != len {
+            Err(io::Error::other("short write"))
+        } else {
+            Ok(written)
+        }
+    }
+
+    fn flush_inner(&mut self, file: &mut SyncDmaFile, close: bool) -> io::Result<()> {
+        debug_assert_eq!(self.allocator.0, file.alignment);
+
+        if self.data.is_empty() {
+            return Ok(());
+        }
+
+        let last = self
+            .data
+            .pop_if(|last| file.align_up(last.len()) > last.len());
+
+        let last = if let Some(mut last) = last {
+            if self.data.is_empty() {
+                use std::cmp::Ordering::*;
+                match (file.offset - file.align_down(file.offset)).cmp(&last.len()) {
+                    Equal => return Ok(()),
+                    Greater => unreachable!(),
+                    Less => {}
+                }
+            }
+            let len = last.len();
+            let align_up = file.align_up(len);
+            let pad = align_up - len;
+            debug_assert!(pad != 0);
+            unsafe { last.set_len(align_up) };
+            Some((last, len, pad))
+        } else {
+            None
+        };
+
+        let mut slices: Vec<_> = self
+            .data
+            .iter()
+            .map(|buf| IoSlice::new(buf))
+            .chain(last.as_ref().map(|last| IoSlice::new(&last.0)))
+            .collect();
+        let written = writev_all(&file.fd, &mut slices[..], file.align_down(file.offset))?;
+        self.data.clear();
+
+        file.inc_offset(written);
+
+        if let Some((last, len, pad)) = last {
+            file.offset -= pad;
+            file.truncate(file.offset)?;
+
+            if !close {
+                self.write_all(&last[file.align_down(len)..(file.align_up(len) - pad)])?;
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn flush_and_close(&mut self, mut file: SyncDmaFile) -> io::Result<usize> {
+        self.flush_inner(&mut file, true)?;
+        Ok(file.length())
+    }
+
+    pub fn flush(&mut self, file: &mut SyncDmaFile) -> io::Result<()> {
+        self.flush_inner(file, false)
+    }
+}
+
+fn writev_all(fd: impl AsFd, mut slices: &mut [IoSlice<'_>], offset: usize) -> io::Result<usize> {
+    let mut written = 0;
+
+    while !slices.is_empty() {
+        match rustix::io::pwritev(fd.as_fd(), slices, (offset + written) as _) {
+            Ok(0) => {
+                return Err(io::Error::new(
+                    io::ErrorKind::WriteZero,
+                    "writev returned zero bytes",
+                ));
+            }
+            Ok(n) => {
+                written += n;
+                IoSlice::advance_slices(&mut slices, n);
+            }
+            Err(err) => {
+                if err.kind() == io::ErrorKind::Interrupted {
+                    continue;
+                }
+                return Err(err.into());
+            }
+        };
+    }
+
+    Ok(written)
 }
 
-impl Write for DmaWriteBuf {
+impl io::Write for DmaWriteBuf {
     fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
         let n = buf.len();
         while !buf.is_empty() {
@@ -419,20 +718,12 @@
                     (dst, remain)
                 }
                 _ => {
-                    self.data
-                        .push(Vec::with_capacity_in(self.chunk, self.allocator));
+                    self.alloc_buffer();
                     (self.data.last_mut().unwrap(), self.chunk)
                 }
             };
 
-            if buf.len() <= remain {
-                dst.extend_from_slice(buf);
-                buf = &buf[buf.len()..]
-            } else {
-                let (left, right) = buf.split_at(remain);
-                dst.extend_from_slice(left);
-                buf = right
-            }
+            buf = Self::copy(buf, dst, remain);
         }
         Ok(n)
     }
@@ -446,7 +737,7 @@ pub async fn dma_write_file_vectored<'a>(
     path: impl AsRef<Path>,
     bufs: &'a [IoSlice<'a>],
 ) -> io::Result<usize> {
-    let mut file = DmaFile::create(path.as_ref(), true).await?;
+    let mut file = AsyncDmaFile::create(path.as_ref(), true).await?;
 
     let file_length = bufs.iter().map(|buf| buf.len()).sum();
 
     if file_length == 0 {
@@ -504,7 +795,7 @@ pub async fn dma_read_file(
     mut writer: impl io::Write,
 ) -> io::Result<usize> {
     const BUFFER_SIZE: usize = 1024 * 1024;
-    let mut file = DmaFile::open(path.as_ref(), true).await?;
+    let mut file = AsyncDmaFile::open(path.as_ref(), true, None).await?;
     let buf = Vec::with_capacity_in(
         file.align_up(BUFFER_SIZE),
         DmaAllocator::new(file.alignment),
@@ -539,46 +830,15 @@ pub async fn dma_read_file_range(
     path: impl AsRef<Path>,
     range: Range<u64>,
 ) -> io::Result<(DmaBuffer, Range<usize>)> {
-    let mut file = DmaFile::open(path.as_ref(), true).await?;
-
-    let align_start = file.align_down(range.start as usize);
-    let align_end = file.align_up(range.end as usize);
-
-    let buf = Vec::with_capacity_in(align_end - align_start, DmaAllocator::new(file.alignment));
-    file.set_buffer(buf);
-
-    if align_start != 0 {
-        let offset = file.seek(SeekFrom::Start(align_start as u64)).await?;
-        if offset as usize != align_start {
-            return Err(io::Error::new(
-                io::ErrorKind::InvalidInput,
-                "range out of range",
-            ));
-        }
-    }
-
-    let mut n;
-    loop {
-        (file, n) = asyncify(move || {
-            let buf = file.buffer();
-            let remain = buf.capacity() - buf.len();
-            file.read_direct(remain).map(|n| (file, n))
-        })
-        .await?;
-        if align_start + file.buffer().len() >= range.end as usize {
-            break;
-        }
-        if n == 0 {
-            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ""));
-        }
-    }
-
-    let rt_range = range.start as usize - align_start..range.end as usize - align_start;
-    Ok((file.buf.unwrap(), rt_range))
+    let mut file = AsyncDmaFile::open(path.as_ref(), true, None).await?;
+    file.read_range(range).await
 }
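As a worked illustration of the `O_DIRECT` contract the code above maintains (a minimal sketch, not part of the patch; `alignment` stands in for `DmaFile::alignment`): every offset and length handed to the kernel must be a multiple of the file alignment, so an unaligned request is widened and the caller receives a relative sub-range back, exactly as `read_range` returns `(DmaBuffer, Range<usize>)`.

fn aligned_window(range: std::ops::Range<usize>, alignment: usize) -> (usize, usize) {
    debug_assert!(alignment.is_power_of_two());
    let start = range.start & !(alignment - 1); // align_down: round start down
    let end = (range.end + alignment - 1) & !(alignment - 1); // align_up: round end up
    (start, end)
}

// With a 512-byte alignment, a request for bytes 700..900 reads 512..1024
// from the device and yields the relative sub-range 188..388.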
 
 #[cfg(test)]
 mod tests {
+    use std::io::Read;
+    use std::io::Write;
+
     use super::*;
 
     #[test]
@@ -629,7 +889,7 @@ mod tests {
         assert_eq!(length, want.len());
         assert_eq!(got, want);
 
-        let file = DmaFile::open(filename, dio).await?;
+        let file = AsyncDmaFile::open(filename, dio, None).await?;
         let align = file.alignment;
         drop(file);
 
@@ -699,7 +959,7 @@ mod tests {
         let bufs = vec![IoSlice::new(&want)];
         dma_write_file_vectored(filename, &bufs).await.unwrap();
 
-        let mut file = DmaFile::open(filename, true).await.unwrap();
+        let mut file = AsyncDmaFile::open(filename, true, None).await.unwrap();
 
         let buf = Vec::with_capacity_in(file_size, DmaAllocator::new(file.alignment));
         file.set_buffer(buf);
@@ -737,4 +997,71 @@
         let buf = got.to_vec();
         println!("{:?} {}", buf.as_ptr(), buf.capacity());
     }
+
+    #[test]
+    fn test_write() -> io::Result<()> {
+        let filename = "test_write_file";
+        let _ = std::fs::remove_file(filename);
+        let mut file = SyncDmaFile::create(filename, true)?;
+
+        let mut buf = DmaWriteBuf::new(file.alignment, file.alignment.as_usize() * 2);
+
+        {
+            buf.write_all(b"1")?;
+            buf.flush(&mut file)?;
+            file.fsync()?;
+
+            assert_eq!(file.offset, 1);
+
+            let mut got = Vec::new();
+            let mut read = std::fs::File::open(filename)?;
+            let n = read.read_to_end(&mut got)?;
+            assert_eq!(n, 1);
+
+            assert_eq!(b"1".as_slice(), got.as_slice());
+        }
+
+        {
+            buf.write_all(b"2")?;
+            buf.write_all(b"3")?;
+            buf.flush(&mut file)?;
+            file.fsync()?;
+
+            assert_eq!(file.offset, 3);
+
+            let mut got = Vec::new();
+            let mut read = std::fs::File::open(filename)?;
+            let n = read.read_to_end(&mut got)?;
+            assert_eq!(n, 3);
+
+            assert_eq!(b"123".as_slice(), got.as_slice());
+        }
+
+        {
+            let data: Vec<_> = b"123"
+                .iter()
+                .copied()
+                .cycle()
+                .take(file.alignment.as_usize() * 3)
+                .collect();
+
+            buf.write_all(&data)?;
+            buf.flush(&mut file)?;
+            file.fsync()?;
+
+            assert_eq!(file.offset, 3 + data.len());
+
+            let mut got = Vec::new();
+            let mut read = std::fs::File::open(filename)?;
+            let n = read.read_to_end(&mut got)?;
+            assert_eq!(n, 3 + data.len());
+
+            let want: Vec<_> = [&b"123"[..], &data].concat();
+            assert_eq!(want.as_slice(), got.as_slice());
+        }
+
+        let _ = std::fs::remove_file(filename);
+
+        Ok(())
+    }
 }
diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs
index 4f98ac033fa04..f97911ead9c00 100644
--- a/src/common/base/src/base/mod.rs
+++ b/src/common/base/src/base/mod.rs
@@ -31,13 +31,7 @@ mod uniq_id;
 mod watch_notify;
 
 pub use build_info::*;
-pub use dma::dma_buffer_to_bytes;
-pub use dma::dma_read_file;
-pub use dma::dma_read_file_range;
-pub use dma::dma_write_file_vectored;
-pub use dma::Alignment;
-pub use dma::DmaAllocator;
-pub use dma::DmaWriteBuf;
+pub use dma::*;
 pub use drop_callback::DropCallback;
 pub use net::get_free_tcp_port;
 pub use net::get_free_udp_port;
diff --git a/src/query/expression/src/types/array.rs b/src/query/expression/src/types/array.rs
index 6586103755b14..cdf459dee9897 100755
--- a/src/query/expression/src/types/array.rs
+++ b/src/query/expression/src/types/array.rs
@@ -324,7 +324,7 @@ impl<T: AccessType> ArrayColumn<T> {
 
 impl<T: ValueType> ArrayColumn<T> {
     pub fn upcast(self, data_type: &DataType) -> ArrayColumn<AnyType> {
-        let values_type = data_type.as_array().unwrap();
+        let values_type = data_type.as_array().expect("must be array type");
         ArrayColumn {
             values: T::upcast_column_with_type(self.values, values_type),
             offsets: self.offsets,
diff --git a/src/query/pipeline/transforms/src/processors/traits/spill.rs b/src/query/pipeline/transforms/src/processors/traits/spill.rs
index 5ce4444528e54..407605cf3adf8 100644
--- a/src/query/pipeline/transforms/src/processors/traits/spill.rs
+++ b/src/query/pipeline/transforms/src/processors/traits/spill.rs
@@ -22,6 +22,16 @@ pub enum Location {
     Local(TempPath),
 }
 
+impl Location {
+    pub fn is_local(&self) -> bool {
+        matches!(self, Location::Local(_))
+    }
+
+    pub fn is_remote(&self) -> bool {
+        matches!(self, Location::Remote(_))
+    }
+}
+
 #[async_trait::async_trait]
 pub trait DataBlockSpill: Clone + Send + Sync + 'static {
     async fn spill(&self, data_block: DataBlock) -> Result<Location> {
diff --git a/src/query/service/src/physical_plans/physical_recluster.rs b/src/query/service/src/physical_plans/physical_recluster.rs
index 1cd58dc8364f3..b95a3031c25d6 100644
--- a/src/query/service/src/physical_plans/physical_recluster.rs
+++ b/src/query/service/src/physical_plans/physical_recluster.rs
@@ -350,6 +350,7 @@ impl IPhysicalPlan for HilbertPartition {
                     self.num_partitions,
                     window_spill_settings.clone(),
                     disk_spill.clone(),
+                    false,
                     CompactStrategy::new(self.rows_per_block, max_bytes_per_block),
                 )?,
             )))
diff --git a/src/query/service/src/physical_plans/physical_window_partition.rs b/src/query/service/src/physical_plans/physical_window_partition.rs
index 9239994c9df02..0e00ed3ec2af4 100644
--- a/src/query/service/src/physical_plans/physical_window_partition.rs
+++ b/src/query/service/src/physical_plans/physical_window_partition.rs
@@ -161,6 +161,7 @@ impl IPhysicalPlan for WindowPartition {
             _ => unimplemented!(),
         };
         let window_spill_settings = MemorySettings::from_window_settings(&builder.ctx)?;
+        let enable_backpressure_spiller = settings.get_enable_backpressure_spiller()?;
 
         let processor_id = AtomicUsize::new(0);
         builder.main_pipeline.add_transform(|input, output| {
@@ -181,6 +182,7 @@ impl IPhysicalPlan for WindowPartition {
                     num_partitions,
                     window_spill_settings.clone(),
                     disk_spill.clone(),
+                    enable_backpressure_spiller,
                     strategy,
                 )?,
             )))
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs
index 37dc06bde0233..13cfa1ed55a53 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs
@@ -270,7 +270,7 @@ impl HashJoinSpiller {
             .add_data_block(partition_id, data_block);
         if let Some(data_blocks) = self
             .partition_buffer
-            .fetch_data_blocks(partition_id, &fetch_option)?
+            .fetch_data_blocks(partition_id, &fetch_option)
         {
             self.spiller
                 .spill_with_partition(partition_id, data_blocks)
@@ -341,8 +341,9 @@ impl HashJoinSpiller {
             PartitionBufferFetchOption::ReadPartition
         };
 
-        self.partition_buffer
-            .fetch_data_blocks(partition_id, &option)
+        Ok(self
+            .partition_buffer
+            .fetch_data_blocks(partition_id, &option))
     }
 
     fn partition_data_block(
diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs
index 5aa4562c98865..692511aa01a94 100644
--- a/src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs
@@ -16,6 +16,7 @@ mod data_processor_strategy;
 mod hilbert_partition_exchange;
 mod transform_window_partition_collect;
 mod window_partition_buffer;
+mod window_partition_buffer_v2;
 mod window_partition_exchange;
 mod window_partition_meta;
 mod window_partition_partial_top_n_exchange;
diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs
index 3051a2f0f018c..d525f44f435f9 100644
--- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs
+++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs
@@ -20,6 +20,7 @@ use std::any::Any;
 use std::collections::VecDeque;
 use std::sync::Arc;
 
+use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
@@ -30,31 +31,105 @@ use databend_common_pipeline_core::processors::Processor;
 use databend_common_pipeline_transforms::MemorySettings;
 use databend_common_settings::Settings;
 use databend_common_storage::DataOperator;
+use either::Either;
 
+use super::window_partition_buffer_v2::WindowPartitionBufferV2;
 use super::WindowPartitionBuffer;
 use super::WindowPartitionMeta;
 use crate::pipelines::processors::transforms::DataProcessorStrategy;
 use crate::sessions::QueryContext;
+use crate::spillers::BackpressureSpiller;
+use crate::spillers::BufferPool;
 use crate::spillers::Spiller;
 use crate::spillers::SpillerConfig;
 use crate::spillers::SpillerDiskConfig;
 use crate::spillers::SpillerType;
 
+enum WindowBuffer {
+    V1(WindowPartitionBuffer),
+    V2(WindowPartitionBufferV2),
+}
+
+impl WindowBuffer {
+    fn new(
+        spiller: Either<Spiller, BackpressureSpiller>,
+        num_partitions: usize,
+        sort_block_size: usize,
+        memory_settings: MemorySettings,
+    ) -> Result<Self> {
+        match spiller {
+            Either::Left(spiller) => {
+                let inner = WindowPartitionBuffer::new(
+                    spiller,
+                    num_partitions,
+                    sort_block_size,
+                    memory_settings,
+                )?;
+                Ok(Self::V1(inner))
+            }
+            Either::Right(spiller) => {
+                let inner = WindowPartitionBufferV2::new(
+                    spiller,
+                    num_partitions,
+                    sort_block_size,
+                    memory_settings,
+                )?;
+                Ok(Self::V2(inner))
+            }
+        }
+    }
+
+    fn need_spill(&mut self) -> bool {
+        match self {
+            WindowBuffer::V1(inner) => inner.need_spill(),
+            WindowBuffer::V2(inner) => inner.need_spill(),
+        }
+    }
+
+    fn is_empty(&self) -> bool {
+        match self {
+            WindowBuffer::V1(inner) => inner.is_empty(),
+            WindowBuffer::V2(inner) => inner.is_empty(),
+        }
+    }
+
+    fn add_data_block(&mut self, partition_id: usize, data_block: DataBlock) {
+        match self {
+            WindowBuffer::V1(inner) => inner.add_data_block(partition_id, data_block),
+            WindowBuffer::V2(inner) => inner.add_data_block(partition_id, data_block),
+        }
+    }
+
+    async fn spill(&mut self) -> Result<()> {
+        match self {
+            WindowBuffer::V1(inner) => inner.spill().await,
+            WindowBuffer::V2(inner) => inner.spill().await,
+        }
+    }
+
+    async fn restore(&mut self) -> Result<Vec<DataBlock>> {
+        match self {
+            WindowBuffer::V1(inner) => inner.restore().await,
+            WindowBuffer::V2(inner) => inner.restore().await,
+        }
+    }
+}
+
 #[derive(Debug, Clone, Copy)]
-pub enum Step {
+enum Step {
     Sync(SyncStep),
     Async(AsyncStep),
     Finish,
 }
 
 #[derive(Debug, Clone, Copy)]
-pub enum SyncStep {
+enum SyncStep {
     Collect,
     Process,
 }
 
 #[derive(Debug, Clone, Copy)]
-pub enum AsyncStep {
+enum AsyncStep {
     Spill,
     Restore,
 }
@@ -69,7 +144,7 @@ pub struct TransformWindowPartitionCollect<S: DataProcessorStrategy> {
     // The partition id is used to map the partition id to the new partition id.
     partition_id: Vec<usize>,
     // The buffer is used to control the memory usage of the window operator.
-    buffer: WindowPartitionBuffer,
+    buffer: WindowBuffer,
 
     strategy: S,
 
@@ -79,6 +154,7 @@ pub struct TransformWindowPartitionCollect<S: DataProcessorStrategy> {
 }
 
 impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
+    #[expect(clippy::too_many_arguments)]
     pub fn new(
         ctx: Arc<QueryContext>,
         input: Arc<InputPort>,
@@ -89,6 +165,7 @@
         num_partitions: usize,
         memory_settings: MemorySettings,
         disk_spill: Option<SpillerDiskConfig>,
+        enable_backpressure_spiller: bool,
         strategy: S,
     ) -> Result<Self> {
         // Calculate the partition ids collected by the processor.
@@ -110,18 +187,26 @@
             use_parquet: settings.get_spilling_file_format()?.is_parquet(),
         };
 
-        // Create an inner `Spiller` to spill data.
+        // Create the spiller for the window operator.
         let operator = DataOperator::instance().spill_operator();
-        let spiller = Spiller::create(ctx, operator, spill_config)?;
+        let spiller = if !enable_backpressure_spiller {
+            Either::Left(Spiller::create(ctx, operator, spill_config)?)
+        } else {
+            let runtime = GlobalIORuntime::instance();
+            let buffer_pool = BufferPool::create(runtime, 128 * 1024 * 1024, 3);
+            Either::Right(BackpressureSpiller::create(
+                ctx,
+                operator,
+                spill_config,
+                buffer_pool,
+                8 * 1024 * 1024,
+            )?)
+        };
 
         // Create the window partition buffer.
        let sort_block_size = settings.get_window_partition_sort_block_size()? as usize;
-        let buffer = WindowPartitionBuffer::new(
-            spiller,
-            partitions.len(),
-            sort_block_size,
-            memory_settings,
-        )?;
+        let buffer =
+            WindowBuffer::new(spiller, partitions.len(), sort_block_size, memory_settings)?;
 
         Ok(Self {
             input,
@@ -275,7 +360,7 @@
     fn collect_data_block(
         data_block: DataBlock,
         partition_ids: &[usize],
-        buffer: &mut WindowPartitionBuffer,
+        buffer: &mut WindowBuffer,
     ) {
         if let Some(meta) = data_block
             .get_owned_meta()
diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs
index bf01acedc586c..f5ee18c7567ac 100644
--- a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs
@@ -63,7 +63,7 @@ impl WindowPartitionBuffer {
         self.can_spill && self.memory_settings.check_spill()
     }
 
-    pub fn out_of_memory_limit(&mut self) -> bool {
+    fn out_of_memory_limit(&mut self) -> bool {
         self.memory_settings.check_spill()
     }
 
@@ -93,7 +93,7 @@
         {
             if let Some(data_blocks) = self
                 .partition_buffer
-                .fetch_data_blocks(partition_id, &option)?
+                .fetch_data_blocks(partition_id, &option)
             {
                 return self
                     .spiller
@@ -112,7 +112,7 @@
                 self.partition_buffer.partition_memory_size(partition_id);
             if let Some(data_blocks) = self
                 .partition_buffer
-                .fetch_data_blocks(partition_id, &option)?
+                .fetch_data_blocks(partition_id, &option)
             {
                 partitions_to_spill.push((partition_id, data_blocks));
                 accumulated_bytes += partition_memory_size;
@@ -190,9 +190,9 @@
             let option = PartitionBufferFetchOption::PickPartitionWithThreshold(0);
             if let Some(data_blocks) = self
                 .partition_buffer
-                .fetch_data_blocks(partition_id, &option)?
+                .fetch_data_blocks(partition_id, &option)
             {
-                result.extend(self.concat_data_blocks(data_blocks)?);
+                result.extend(concat_data_blocks(data_blocks, self.sort_block_size)?);
             }
         }
 
@@ -203,9 +203,9 @@
             let option = PartitionBufferFetchOption::PickPartitionWithThreshold(0);
             if let Some(data_blocks) = self
                 .restored_partition_buffer
-                .fetch_data_blocks(partition_id, &option)?
+                .fetch_data_blocks(partition_id, &option)
             {
-                result.extend(self.concat_data_blocks(data_blocks)?);
+                result.extend(concat_data_blocks(data_blocks, self.sort_block_size)?);
             }
         }
 
@@ -215,26 +215,29 @@
         }
         Ok(vec![])
     }
+}
 
-    fn concat_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
-        let mut num_rows = 0;
-        let mut result = Vec::new();
-        let mut current_blocks = Vec::new();
-
-        for data_block in data_blocks.into_iter() {
-            num_rows += data_block.num_rows();
-            current_blocks.push(data_block);
-            if num_rows >= self.sort_block_size {
-                result.push(DataBlock::concat(&current_blocks)?);
-                num_rows = 0;
-                current_blocks.clear();
-            }
-        }
-
-        if !current_blocks.is_empty() {
+pub(super) fn concat_data_blocks(
+    data_blocks: Vec<DataBlock>,
+    target_size: usize,
+) -> Result<Vec<DataBlock>> {
+    let mut num_rows = 0;
+    let mut result = Vec::new();
+    let mut current_blocks = Vec::new();
+
+    for data_block in data_blocks.into_iter() {
+        num_rows += data_block.num_rows();
+        current_blocks.push(data_block);
+        if num_rows >= target_size {
             result.push(DataBlock::concat(&current_blocks)?);
+            num_rows = 0;
+            current_blocks.clear();
         }
+    }
 
-        Ok(result)
+    if !current_blocks.is_empty() {
+        result.push(DataBlock::concat(&current_blocks)?);
     }
+
+    Ok(result)
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer_v2.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer_v2.rs
new file mode 100644
index 0000000000000..51c2bcb89893b
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer_v2.rs
@@ -0,0 +1,305 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+use databend_common_expression::DataSchema;
+use databend_common_pipeline_transforms::MemorySettings;
+
+use super::concat_data_blocks;
+use crate::spillers::BackpressureSpiller;
+use crate::spillers::SpillReader;
+use crate::spillers::SpillWriter;
+
+#[async_trait::async_trait]
+pub trait Reader: Send {
+    async fn restore(&mut self, ordinals: Vec<usize>) -> Result<Vec<DataBlock>>;
+}
+
+#[async_trait::async_trait]
+pub trait Writer: Send {
+    type Reader: Reader;
+
+    async fn spill(&mut self, blocks: Vec<DataBlock>) -> Result<usize>;
+
+    async fn close(self) -> Result<Self::Reader>;
+}
+
+#[async_trait::async_trait]
+pub trait Builder: Send + Sync {
+    type Writer: Writer;
+
+    async fn create(&self, schema: Arc<DataSchema>) -> Result<Self::Writer>;
+}
+
+#[async_trait::async_trait]
+impl Builder for BackpressureSpiller {
+    type Writer = SpillWriter;
+
+    async fn create(&self, schema: Arc<DataSchema>) -> Result<Self::Writer> {
+        self.new_spill_writer(schema)
+    }
+}
+
+#[async_trait::async_trait]
+impl Writer for SpillWriter {
+    type Reader = SpillReader;
+
+    async fn spill(&mut self, blocks: Vec<DataBlock>) -> Result<usize> {
+        if !self.is_opened() {
+            self.open().await?;
+        }
+        self.add_row_group(blocks)
+    }
+
+    async fn close(self) -> Result<Self::Reader> {
+        self.close()
+    }
+}
+
+#[async_trait::async_trait]
+impl Reader for SpillReader {
+    async fn restore(&mut self, ordinals: Vec<usize>) -> Result<Vec<DataBlock>> {
+        self.restore(ordinals).await
+    }
+}
+
+#[derive(Default)]
+enum PartitionSpillState<W>
+where
+    W: Writer,
+    W::Reader: Reader,
+{
+    #[default]
+    Empty,
+    Writing(W),
+    Reading(W::Reader),
+}
+
+struct PartitionSlot<W>
+where
+    W: Writer,
+    W::Reader: Reader,
+{
+    state: PartitionSpillState<W>,
+    spilled_ordinals: Vec<usize>,
+    buffered_blocks: Vec<DataBlock>,
+    buffered_size: usize,
+}
+
+impl<W> Default for PartitionSlot<W>
+where
+    W: Writer,
+    W::Reader: Reader,
+{
+    fn default() -> Self {
+        Self {
+            state: Default::default(),
+            spilled_ordinals: Default::default(),
+            buffered_blocks: Default::default(),
+            buffered_size: Default::default(),
+        }
+    }
+}
+
+impl<W> PartitionSlot<W>
+where
+    W: Writer,
+    W::Reader: Reader,
+{
+    fn add_block(&mut self, block: DataBlock) {
+        self.buffered_size += block.memory_size();
+        self.buffered_blocks.push(block);
+    }
+
+    fn memory_size(&self) -> usize {
+        self.buffered_size
+    }
+
+    fn is_empty(&self) -> bool {
+        self.buffered_blocks.is_empty()
+    }
+
+    fn fetch_blocks(&mut self, threshold: Option<usize>) -> Option<Vec<DataBlock>> {
+        if self.buffered_size >= threshold.unwrap_or_default() {
+            self.buffered_size = 0;
+            Some(std::mem::take(&mut self.buffered_blocks))
+        } else {
+            None
+        }
+    }
+
+    async fn writer_mut<'a, B>(&'a mut self, builder: &B, block: &DataBlock) -> Result<&'a mut W>
+    where B: Builder<Writer = W> {
+        match &mut self.state {
+            state @ PartitionSpillState::Empty => {
+                let writer = builder.create(block.infer_schema().into()).await?;
+                let _ = std::mem::replace(state, PartitionSpillState::Writing(writer));
+                let PartitionSpillState::Writing(writer) = state else {
+                    unreachable!()
+                };
+                Ok(writer)
+            }
+            PartitionSpillState::Writing(writer) => Ok(writer),
+            PartitionSpillState::Reading(_) => unreachable!("partition already closed"),
+        }
+    }
+
+    async fn close_writer(&mut self) -> Result<&mut W::Reader> {
+        let PartitionSpillState::Writing(writer) = std::mem::take(&mut self.state) else {
+            unreachable!()
+        };
+        self.state = PartitionSpillState::Reading(writer.close().await?);
+        let PartitionSpillState::Reading(reader) = &mut self.state else {
+            unreachable!()
+        };
+        Ok(reader)
+    }
+}
+
+pub(super) type WindowPartitionBufferV2 = PartitionBuffer<BackpressureSpiller>;
+
+pub(super) struct PartitionBuffer<B>
+where B: Builder
+{
+    spiller: B,
+    partitions: Vec<PartitionSlot<B::Writer>>,
+    memory_settings: MemorySettings,
+    min_row_group_size: usize,
+    num_partitions: usize,
+    sort_block_size: usize,
+    can_spill: bool,
+    next_to_restore_partition_id: isize,
+}
+
+impl<B> PartitionBuffer<B>
+where B: Builder
+{
+    pub fn new(
+        spiller: B,
+        num_partitions: usize,
+        sort_block_size: usize,
+        memory_settings: MemorySettings,
+    ) -> Result<Self> {
+        let partitions = (0..num_partitions)
+            .map(|_| PartitionSlot::default())
+            .collect();
+        Ok(Self {
+            spiller,
+            partitions,
+            memory_settings,
+            min_row_group_size: 10 * 1024 * 1024,
+            num_partitions,
+            sort_block_size,
+            can_spill: false,
+            next_to_restore_partition_id: -1,
+        })
+    }
+}
+
+impl<B> PartitionBuffer<B>
+where B: Builder
+{
+    pub fn need_spill(&mut self) -> bool {
+        self.can_spill && self.memory_settings.check_spill()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.next_to_restore_partition_id + 1 >= self.num_partitions as isize
+    }
+
+    pub fn add_data_block(&mut self, partition_id: usize, data_block: DataBlock) {
+        if data_block.is_empty() {
+            return;
+        }
+        let partition = &mut self.partitions[partition_id];
+        partition.add_block(data_block);
+        if !self.can_spill && partition.memory_size() >= self.min_row_group_size {
+            self.can_spill = true;
+        }
+    }
+
+    pub async fn spill(&mut self) -> Result<()> {
+        let spill_unit_size = self.memory_settings.spill_unit_size;
+        let next_to_restore_partition_id = (self.next_to_restore_partition_id + 1) as usize;
+
+        let mut preferred_partition: Option<(usize, usize)> = None;
+        for partition_id in (next_to_restore_partition_id..self.num_partitions).rev() {
+            let partition = &mut self.partitions[partition_id];
+            if partition.is_empty() {
+                continue;
+            }
+            if let Some(blocks) = partition.fetch_blocks(Some(spill_unit_size)) {
+                let ordinal = {
+                    let writer = partition.writer_mut(&self.spiller, &blocks[0]).await?;
+                    writer.spill(blocks).await?
+                };
+                partition.spilled_ordinals.push(ordinal);
+                return Ok(());
+            }
+
+            let partition_size = partition.memory_size();
+            if preferred_partition
+                .as_ref()
+                .map(|(_, size)| partition_size > *size)
+                .unwrap_or(true)
+            {
+                preferred_partition = Some((partition_id, partition_size));
+            }
+        }
+
+        if let Some((partition_id, size)) = preferred_partition
+            && size >= self.min_row_group_size
+        {
+            let partition = &mut self.partitions[partition_id];
+            let blocks = partition.fetch_blocks(None).unwrap();
+            let ordinal = {
+                let writer = partition.writer_mut(&self.spiller, &blocks[0]).await?;
+                writer.spill(blocks).await?
+            };
+            partition.spilled_ordinals.push(ordinal);
+        } else {
+            self.can_spill = false;
+        }
+        Ok(())
+    }
+
+    pub async fn restore(&mut self) -> Result<Vec<DataBlock>> {
+        while self.next_to_restore_partition_id + 1 < self.num_partitions as isize {
+            self.next_to_restore_partition_id += 1;
+            let partition_id = self.next_to_restore_partition_id as usize;
+            let partition = &mut self.partitions[partition_id];
+
+            let ordinals = std::mem::take(&mut partition.spilled_ordinals);
+            let mut result = if ordinals.is_empty() {
+                Vec::new()
+            } else {
+                let reader = partition.close_writer().await?;
+                reader.restore(ordinals).await?
+            };
+
+            if let Some(blocks) = partition.fetch_blocks(None) {
+                result.extend(concat_data_blocks(blocks, self.sort_block_size)?);
+            }
+
+            if !result.is_empty() {
+                return Ok(result);
+            }
+        }
+
+        Ok(vec![])
+    }
+}
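The `Builder`/`Writer`/`Reader` triple above keeps `PartitionBuffer` independent of the concrete spiller, so it can be driven by an in-memory double. A hypothetical sketch, not part of the patch (none of these `Mem*` types exist in the codebase):

struct MemBuilder;
struct MemWriter(Vec<Vec<DataBlock>>);
struct MemReader(Vec<Vec<DataBlock>>);

#[async_trait::async_trait]
impl Builder for MemBuilder {
    type Writer = MemWriter;
    async fn create(&self, _schema: Arc<DataSchema>) -> Result<MemWriter> {
        Ok(MemWriter(Vec::new()))
    }
}

#[async_trait::async_trait]
impl Writer for MemWriter {
    type Reader = MemReader;
    // Each spilled batch is assigned the next ordinal, mirroring parquet row groups.
    async fn spill(&mut self, blocks: Vec<DataBlock>) -> Result<usize> {
        self.0.push(blocks);
        Ok(self.0.len() - 1)
    }
    async fn close(self) -> Result<MemReader> {
        Ok(MemReader(self.0))
    }
}

#[async_trait::async_trait]
impl Reader for MemReader {
    async fn restore(&mut self, ordinals: Vec<usize>) -> Result<Vec<DataBlock>> {
        Ok(ordinals
            .into_iter()
            .flat_map(|ordinal| std::mem::take(&mut self.0[ordinal]))
            .collect())
    }
}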
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index c35e12f03021b..7cbfe7a062cf2 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -411,26 +411,20 @@ impl QueryContext {
         self.shared.clear_tables_cache()
     }
 
-    pub fn add_spill_file(
-        &self,
-        location: spillers::Location,
-        layout: spillers::Layout,
-        data_size: usize,
-    ) {
-        if matches!(location, spillers::Location::Remote(_)) {
-            let current_id = self.get_cluster().local_id();
-            let mut w = self.shared.cluster_spill_progress.write();
-            let p = SpillProgress::new(1, data_size);
-            w.entry(current_id)
-                .and_modify(|stats| {
-                    stats.incr(&p);
-                })
-                .or_insert(p);
-        }
-        {
-            let mut w = self.shared.spilled_files.write();
-            w.insert(location, layout);
-        }
+    pub fn incr_spill_progress(&self, file_nums: usize, data_size: usize) {
+        let current_id = self.get_cluster().local_id();
+        let mut w = self.shared.cluster_spill_progress.write();
+        let p = SpillProgress::new(file_nums, data_size);
+        w.entry(current_id)
+            .and_modify(|stats| {
+                stats.incr(&p);
+            })
+            .or_insert(p);
+    }
+
+    pub fn add_spill_file(&self, location: spillers::Location, layout: spillers::Layout) {
+        let mut w = self.shared.spilled_files.write();
+        w.insert(location, layout);
     }
 
     pub fn set_cluster_spill_progress(&self, source_target: &str, stats: SpillProgress) {
diff --git a/src/query/service/src/spillers/adapter.rs b/src/query/service/src/spillers/adapter.rs
index e70043766abef..5bdf2c22011f7 100644
--- a/src/query/service/src/spillers/adapter.rs
+++ b/src/query/service/src/spillers/adapter.rs
@@ -25,20 +25,27 @@ use databend_common_base::base::dma_buffer_to_bytes;
 use databend_common_base::base::dma_read_file_range;
 use databend_common_base::base::ProgressValues;
 use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
+use databend_common_expression::DataSchema;
 use databend_common_pipeline_transforms::traits::DataBlockSpill;
+use databend_storages_common_cache::ParquetMetaData;
 use databend_storages_common_cache::TempPath;
 use opendal::Buffer;
 use opendal::Operator;
+use parquet::file::metadata::RowGroupMetaDataPtr;
 
+use super::async_buffer::BufferPool;
+use super::block_reader::BlocksReader;
+use super::block_writer::BlocksWriter;
 use super::inner::*;
 use super::serialize::*;
+use super::union_file::*;
 use super::Location;
 use crate::sessions::QueryContext;
-use crate::spillers::block_reader::BlocksReader;
-use crate::spillers::block_writer::BlocksWriter;
 
+#[derive(Clone)]
 pub struct PartitionAdapter {
     ctx: Arc<QueryContext>,
     // Stores the spilled files that controlled by current spiller
@@ -53,7 +60,11 @@ impl SpillAdapter for PartitionAdapter {
             .write()
             .unwrap()
             .insert(location.clone(), layout.clone());
-        self.ctx.as_ref().add_spill_file(location, layout, size);
+
+        if location.is_remote() {
+            self.ctx.as_ref().incr_spill_progress(1, size);
+        }
+        self.ctx.as_ref().add_spill_file(location, layout);
     }
 
     fn get_spill_layout(&self, location: &Location) -> Option<Layout> {
@@ -95,8 +106,8 @@ impl Spiller {
 
     #[async_backtrace::framed]
     /// Read spilled data with partition id
-    pub async fn read_spilled_partition(&mut self, procedure_id: &usize) -> Result<Vec<DataBlock>> {
-        if let Some(locs) = self.adapter.partition_location.get(procedure_id) {
+    pub async fn read_spilled_partition(&mut self, partition_id: &usize) -> Result<Vec<DataBlock>> {
+        if let Some(locs) = self.adapter.partition_location.get(partition_id) {
             let mut spilled_data = Vec::with_capacity(locs.len());
             for (loc, _data_size, _blocks_num) in locs.iter() {
                 let block = self.read_spilled_file(loc).await?;
@@ -221,7 +232,7 @@ impl Spiller {
         let instant = Instant::now();
         let location = self.write_encodes(write_bytes, buf).await?;
         // Record statistics.
-        record_write_profile(&location, &instant, write_bytes);
+        record_write_profile(location.is_local(), &instant, write_bytes);
 
         self.adapter
             .add_spill_file(location.clone(), layout, write_bytes);
@@ -265,7 +276,7 @@ impl Spiller {
         };
 
         // Record statistics.
-        record_read_profile(location, &instant, data.len());
+        record_read_profile(location.is_local(), &instant, data.len());
 
         // Deserialize partitioned data block.
         let mut partitioned_data = Vec::with_capacity(partitions.len());
@@ -299,7 +310,7 @@ impl Spiller {
             Location::Remote(loc) => self.operator.read_with(loc).range(data_range).await?,
         };
 
-        record_read_profile(location, &instant, data.len());
+        record_read_profile(location.is_local(), &instant, data.len());
 
         deserialize_block(layout, data)
     }
@@ -344,6 +355,55 @@ impl Spiller {
     }
 }
 
+#[derive(Clone)]
+pub struct BackpressureAdapter {
+    ctx: Arc<QueryContext>,
+    buffer_pool: Arc<BufferPool>,
+    chunk_size: usize,
+}
+
+impl BackpressureAdapter {
+    fn add_spill_file(&self, location: Location, layout: Layout, size: usize) {
+        if location.is_remote() {
+            self.ctx.as_ref().incr_spill_progress(1, size);
+            self.ctx
+                .as_ref()
+                .add_spill_file(location.clone(), layout.clone());
+        }
+    }
+}
+
+pub type BackpressureSpiller = SpillerInner<BackpressureAdapter>;
+
+impl BackpressureSpiller {
+    pub fn create(
+        ctx: Arc<QueryContext>,
+        operator: Operator,
+        config: SpillerConfig,
+        buffer_pool: Arc<BufferPool>,
+        chunk_size: usize,
+    ) -> Result<Self> {
+        Self::new(
+            BackpressureAdapter {
+                ctx,
+                buffer_pool,
+                chunk_size,
+            },
+            operator,
+            config,
+        )
+    }
+
+    pub fn new_spill_writer(&self, schema: Arc<DataSchema>) -> Result<SpillWriter> {
+        Ok(SpillWriter {
+            spiller: self.clone(),
+            chunk_size: self.adapter.chunk_size,
+            schema,
+            file_writer: None,
+        })
+    }
+}
+
 pub struct MergedPartition {
     pub location: Location,
     pub partitions: Vec<(usize, Chunk)>,
@@ -354,9 +414,129 @@ pub struct Chunk {
     pub layout: Layout,
 }
 
+pub struct SpillWriter {
+    spiller: BackpressureSpiller,
+    chunk_size: usize,
+    schema: Arc<DataSchema>,
+    file_writer: Option>,
+}
+
+impl SpillWriter {
+    pub async fn open(&mut self) -> Result<()> {
+        if self.file_writer.is_some() {
+            return Err(ErrorCode::Internal("SpillWriter already opened"));
+        }
+
+        let writer = self
+            .spiller
+            .new_file_writer(
+                &self.schema,
+                &self.spiller.adapter.buffer_pool,
+                self.chunk_size,
+            )
+            .await?;
+        self.file_writer = Some(writer);
+        Ok(())
+    }
+
+    pub fn is_opened(&self) -> bool {
+        self.file_writer.is_some()
+    }
+
+    pub fn add_row_group(&mut self, blocks: Vec<DataBlock>) -> Result<usize> {
+        let Some(file_writer) = self.file_writer.as_mut() else {
+            return Err(ErrorCode::Internal("SpillWriter should open first"));
+        };
+
+        let is_local = file_writer.has_opening_local();
+        let start = std::time::Instant::now();
+
+        let row_group_meta = file_writer.spill(blocks)?;
+
+        record_write_profile(is_local, &start, row_group_meta.compressed_size() as _);
+
+        let ordinal = row_group_meta.ordinal().unwrap();
+        Ok(ordinal as _)
+    }
+
+    pub fn new_row_group_encoder(&self) -> Option<RowGroupEncoder> {
+        self.file_writer.as_ref().map(|w| w.new_row_group())
+    }
+
+    pub fn add_row_group_encoded(
+        &mut self,
+        row_group: RowGroupEncoder,
+    ) -> Result<RowGroupMetaDataPtr> {
+        let Some(file_writer) = self.file_writer.as_mut() else {
+            return Err(ErrorCode::Internal("SpillWriter should open first"));
+        };
+        Ok(file_writer.flush_row_group(row_group)?)
+    }
+
+    pub fn close(self) -> Result<SpillReader> {
+        let Some(file_writer) = self.file_writer else {
+            return Err(ErrorCode::Internal(
+                "attempted to close window spill writer without data".to_string(),
+            ));
+        };
+
+        let (metadata, union_file) = file_writer.finish()?;
+
+        if let Some(path) = &union_file.local_path {
+            self.spiller.adapter.add_spill_file(
+                Location::Local(path.clone()),
+                Layout::Parquet,
+                path.size(),
+            );
+        }
+
+        self.spiller.adapter.add_spill_file(
+            Location::Remote(union_file.remote_path.clone()),
+            Layout::Parquet,
+            union_file
+                .remote_size
+                .saturating_sub(union_file.remote_offset.unwrap_or_default()) as _,
+        );
+
+        Ok(SpillReader {
+            spiller: self.spiller,
+            schema: self.schema,
+            parquet_metadata: Arc::new(metadata),
+            union_file,
+        })
+    }
+}
+
+pub struct SpillReader {
+    spiller: BackpressureSpiller,
+    schema: Arc<DataSchema>,
+    parquet_metadata: Arc<ParquetMetaData>,
+    union_file: UnionFile,
+}
+
+impl SpillReader {
+    pub async fn restore(&mut self, ordinals: Vec<usize>) -> Result<Vec<DataBlock>> {
+        if ordinals.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        self.spiller
+            .load_row_groups(
+                self.union_file.clone(),
+                self.parquet_metadata.clone(),
+                &self.schema,
+                ordinals,
+            )
+            .await
+    }
+}
+
 impl SpillAdapter for Arc<QueryContext> {
     fn add_spill_file(&self, location: Location, layout: Layout, size: usize) {
-        self.as_ref().add_spill_file(location, layout, size);
+        if matches!(location, Location::Remote(_)) {
+            self.incr_spill_progress(1, size);
+        }
+        self.as_ref().add_spill_file(location, layout);
     }
 
     fn get_spill_layout(&self, location: &Location) -> Option<Layout> {
@@ -372,7 +552,10 @@ pub struct SortAdapter {
 impl SpillAdapter for SortAdapter {
     fn add_spill_file(&self, location: Location, layout: Layout, size: usize) {
         match location {
-            Location::Remote(_) => self.ctx.as_ref().add_spill_file(location, layout, size),
+            Location::Remote(_) => {
+                self.ctx.as_ref().incr_spill_progress(1, size);
+                self.ctx.as_ref().add_spill_file(location, layout);
+            }
             Location::Local(temp_path) => {
                 self.local_files.write().unwrap().insert(temp_path, layout);
             }
@@ -455,9 +638,7 @@ impl LiteSpiller {
                 Location::Local(_) => None,
             })
            .collect();
-        let op = self.0.local_operator.as_ref().unwrap_or(&self.0.operator);
-
-        op.delete_iter(files).await?;
+        self.0.operator.delete_iter(files).await?;
         Ok(())
     }
 }
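Taken together, `SpillWriter` and `SpillReader` give the backpressure spiller a row-group-granular lifecycle. A minimal usage sketch, not part of the patch (it assumes a `spiller: BackpressureSpiller` and blocks that share one schema; error handling elided):

async fn spill_and_restore(
    spiller: &BackpressureSpiller,
    schema: Arc<DataSchema>,
    blocks: Vec<DataBlock>,
) -> Result<Vec<DataBlock>> {
    let mut writer = spiller.new_spill_writer(schema)?;
    writer.open().await?; // lazily allocates the underlying file writer
    let ordinal = writer.add_row_group(blocks)?; // one parquet row group per spill
    let mut reader = writer.close()?; // seals the file and yields a reader
    reader.restore(vec![ordinal]).await // fetches the row group back
}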
diff --git a/src/query/service/src/spillers/async_buffer.rs b/src/query/service/src/spillers/async_buffer.rs
index dc8f8524f33c3..e5036c1e4ca31 100644
--- a/src/query/service/src/spillers/async_buffer.rs
+++ b/src/query/service/src/spillers/async_buffer.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::collections::VecDeque;
+use std::io;
 use std::io::Write;
 use std::sync::Arc;
 use std::sync::Condvar;
@@ -86,7 +87,6 @@ pub struct BufferPool {
 }
 
 impl BufferPool {
-    #[allow(dead_code)]
     pub fn create(executor: Arc<Runtime>, memory: usize, workers: usize) -> Arc<Self> {
         let (working_tx, working_rx) = async_channel::unbounded();
         let (buffers_tx, buffers_rx) = async_channel::unbounded();
@@ -119,6 +119,7 @@
             available_write_buffers_tx: buffers_tx,
         })
     }
+
     pub fn try_alloc_buffer(&self) -> Option {
         self.available_write_buffers.try_recv().ok()
     }
@@ -153,7 +154,6 @@
         }
     }
 
-    #[allow(dead_code)]
     pub fn buffer_write(self: &Arc<Self>, writer: Writer) -> BufferWriter {
         BufferWriter::new(writer, self.clone())
     }
@@ -231,7 +231,6 @@
         Ok(())
     }
 
-    #[allow(dead_code)]
     pub fn close(mut self) -> std::io::Result {
         self.flush()?;
 
@@ -264,8 +263,8 @@
     }
 }
 
-impl std::io::Write for BufferWriter {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+impl io::Write for BufferWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
         if buf.is_empty() {
             return Ok(0);
         }
@@ -315,7 +314,7 @@
         Ok(written)
     }
 
-    fn flush(&mut self) -> std::io::Result<()> {
+    fn flush(&mut self) -> io::Result<()> {
         if matches!(&self.current_bytes, Some(current_bytes) if !current_bytes.is_empty()) {
             if let Some(current_bytes) = self.current_bytes.take() {
                 self.pending_buffers.push_back(current_bytes.freeze());
diff --git a/src/query/service/src/spillers/inner.rs b/src/query/service/src/spillers/inner.rs
index 2edd9e967b912..4fd22b727be09 100644
--- a/src/query/service/src/spillers/inner.rs
+++ b/src/query/service/src/spillers/inner.rs
@@ -100,18 +100,18 @@ pub trait SpillAdapter: Send + Sync + 'static {
 /// 3. Serialization and deserialization input data
 /// 4. Interact with the underlying storage engine to write and read spilled data
 #[derive(Clone)]
-pub struct SpillerInner<A: SpillAdapter> {
+pub struct SpillerInner<A> {
     pub(super) adapter: A,
     pub(super) operator: Operator,
     location_prefix: String,
-    temp_dir: Option<Arc<TempDir>>,
+    pub(super) temp_dir: Option<Arc<TempDir>>,
     // for dio disabled
     pub(super) local_operator: Option<Operator>,
-    use_parquet: bool,
+    pub(super) use_parquet: bool,
     _spiller_type: SpillerType,
 }
 
-impl<A: SpillAdapter> SpillerInner<A> {
+impl<A> SpillerInner<A> {
     pub fn new(adapter: A, operator: Operator, config: SpillerConfig) -> Result<Self> {
         let SpillerConfig {
             location_prefix,
@@ -139,17 +139,6 @@
         })
     }
 
-    /// Spill some [`DataBlock`] to storage. These blocks will be concat into one.
-    #[fastrace::trace(name = "Spiller::spill")]
-    pub async fn spill(&self, data_block: Vec<DataBlock>) -> Result<Location> {
-        let (location, layout, data_size) = self.spill_unmanage(data_block).await?;
-
-        // Record columns layout for spilled data.
-        self.adapter
-            .add_spill_file(location.clone(), layout, data_size);
-        Ok(location)
-    }
-
     async fn spill_unmanage(
         &self,
         data_block: Vec<DataBlock>,
@@ -170,7 +159,7 @@
         let location = self.write_encodes(data_size, buf).await?;
 
         // Record statistics.
-        record_write_profile(&location, &instant, data_size);
+        record_write_profile(location.is_local(), &instant, data_size);
 
         let layout = columns_layout.pop().unwrap();
         Ok((location, layout, data_size))
     }
@@ -179,13 +168,6 @@
         format!("{}/{}", self.location_prefix, GlobalUniqName::unique())
     }
 
-    /// Read a certain file to a [`DataBlock`].
-    #[fastrace::trace(name = "Spiller::read_spilled_file")]
-    pub async fn read_spilled_file(&self, location: &Location) -> Result<DataBlock> {
-        let layout = self.adapter.get_spill_layout(location).unwrap();
-        self.read_unmanage_spilled_file(location, &layout).await
-    }
-
     async fn read_unmanage_spilled_file(
         &self,
         location: &Location,
@@ -219,17 +201,22 @@
             Location::Remote(loc) => self.operator.read(loc).await?,
         };
 
-        record_read_profile(location, &instant, data.len());
+        record_read_profile(location.is_local(), &instant, data.len());
 
         deserialize_block(columns_layout, data)
     }
 
-    pub(super) async fn write_encodes(&self, size: usize, buf: DmaWriteBuf) -> Result<Location> {
+    pub(super) fn new_location(&self, size: usize) -> Result<Location> {
         let location = match &self.temp_dir {
             None => None,
             Some(disk) => disk.new_file_with_size(size)?.map(Location::Local),
         }
         .unwrap_or(Location::Remote(self.create_unique_location()));
+        Ok(location)
+    }
+
+    pub(super) async fn write_encodes(&self, size: usize, buf: DmaWriteBuf) -> Result<Location> {
+        let location = self.new_location(size)?;
 
         let mut writer = match (&location, &self.local_operator) {
             (Location::Local(path), None) => {
@@ -268,47 +255,58 @@
     }
 }
 
-pub(super) fn record_write_profile(location: &Location, start: &Instant, write_bytes: usize) {
-    match location {
-        Location::Remote(_) => {
-            Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1);
-            Profile::record_usize_profile(
-                ProfileStatisticsName::RemoteSpillWriteBytes,
-                write_bytes,
-            );
-            Profile::record_usize_profile(
-                ProfileStatisticsName::RemoteSpillWriteTime,
-                start.elapsed().as_millis() as usize,
-            );
-        }
-        Location::Local(_) => {
-            Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteCount, 1);
-            Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteBytes, write_bytes);
-            Profile::record_usize_profile(
-                ProfileStatisticsName::LocalSpillWriteTime,
-                start.elapsed().as_millis() as usize,
-            );
-        }
+impl<A: SpillAdapter> SpillerInner<A> {
+    /// Spill some [`DataBlock`] to storage. These blocks will be concat into one.
+    #[fastrace::trace(name = "Spiller::spill")]
+    pub async fn spill(&self, data_block: Vec<DataBlock>) -> Result<Location> {
+        let (location, layout, data_size) = self.spill_unmanage(data_block).await?;
+
+        // Record columns layout for spilled data.
+        self.adapter
+            .add_spill_file(location.clone(), layout, data_size);
+        Ok(location)
+    }
+
+    /// Read a certain file to a [`DataBlock`].
+    #[fastrace::trace(name = "Spiller::read_spilled_file")]
+    pub async fn read_spilled_file(&self, location: &Location) -> Result<DataBlock> {
+        let layout = self.adapter.get_spill_layout(location).unwrap();
+        self.read_unmanage_spilled_file(location, &layout).await
     }
 }
 
-pub(super) fn record_read_profile(location: &Location, start: &Instant, read_bytes: usize) {
-    match location {
-        Location::Remote(_) => {
-            Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1);
-            Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes);
-            Profile::record_usize_profile(
-                ProfileStatisticsName::RemoteSpillReadTime,
-                start.elapsed().as_millis() as usize,
-            );
-        }
-        Location::Local(_) => {
-            Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadCount, 1);
-            Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadBytes, read_bytes);
-            Profile::record_usize_profile(
-                ProfileStatisticsName::LocalSpillReadTime,
-                start.elapsed().as_millis() as usize,
-            );
-        }
+pub(super) fn record_write_profile(is_local: bool, start: &Instant, write_bytes: usize) {
+    if !is_local {
+        Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1);
+        Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteBytes, write_bytes);
+        Profile::record_usize_profile(
+            ProfileStatisticsName::RemoteSpillWriteTime,
+            start.elapsed().as_millis() as usize,
+        );
+    } else {
+        Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteCount, 1);
+        Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteBytes, write_bytes);
+        Profile::record_usize_profile(
+            ProfileStatisticsName::LocalSpillWriteTime,
+            start.elapsed().as_millis() as usize,
+        );
+    }
+}
+
+pub(super) fn record_read_profile(is_local: bool, start: &Instant, read_bytes: usize) {
+    if !is_local {
+        Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1);
+        Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes);
+        Profile::record_usize_profile(
+            ProfileStatisticsName::RemoteSpillReadTime,
+            start.elapsed().as_millis() as usize,
+        );
+    } else {
+        Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadCount, 1);
+        Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadBytes, read_bytes);
+        Profile::record_usize_profile(
+            ProfileStatisticsName::LocalSpillReadTime,
+            start.elapsed().as_millis() as usize,
+        );
     }
 }
diff --git a/src/query/service/src/spillers/mod.rs b/src/query/service/src/spillers/mod.rs
index 5f624bd9b54d0..f538c3a25eb44 100644
--- a/src/query/service/src/spillers/mod.rs
+++ b/src/query/service/src/spillers/mod.rs
@@ -21,8 +21,10 @@ mod partition_buffer;
 mod serialize;
 #[cfg(test)]
 mod test_memory;
+mod union_file;
 
 pub use adapter::*;
+pub use async_buffer::BufferPool;
 pub use block_writer::*;
 pub use databend_common_pipeline_transforms::traits::Location;
 pub use inner::*;
diff --git a/src/query/service/src/spillers/partition_buffer.rs b/src/query/service/src/spillers/partition_buffer.rs
index 2ff44869d64c9..d3c0bc7705339 100644
--- a/src/query/service/src/spillers/partition_buffer.rs
+++ b/src/query/service/src/spillers/partition_buffer.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
 
-use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 
 pub enum PartitionBufferFetchOption {
@@ -52,8 +51,8 @@ impl PartitionBuffer {
         &mut self,
         partition_id: usize,
         option: &PartitionBufferFetchOption,
-    ) -> Result<Option<Vec<DataBlock>>> {
-        let data_blocks = match option {
+    ) -> Option<Vec<DataBlock>> {
+        match option {
             PartitionBufferFetchOption::ReadPartition => {
                 if !self.partition_data[partition_id].is_empty() {
                     Some(self.partition_data[partition_id].clone())
@@ -71,8 +70,7 @@ impl PartitionBuffer {
                     None
                 }
             }
-        };
-        Ok(data_blocks)
+        }
     }
 
     pub fn memory_size(&self) -> usize {
diff --git a/src/query/service/src/spillers/union_file.rs b/src/query/service/src/spillers/union_file.rs
new file mode 100644
index 0000000000000..546a1a679ee7f
--- /dev/null
+++ b/src/query/service/src/spillers/union_file.rs
@@ -0,0 +1,729 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::future;
+use std::io;
+use std::io::Cursor;
+use std::io::Write;
+use std::sync::Arc;
+
+use arrow_schema::Schema;
+use databend_common_base::base::dma_buffer_to_bytes;
+use databend_common_base::base::AsyncDmaFile;
+use databend_common_base::base::DmaWriteBuf;
+use databend_common_base::base::SyncDmaFile;
+use databend_common_exception::Result;
+use databend_common_expression::BlockEntry;
+use databend_common_expression::DataBlock;
+use databend_common_expression::DataSchema;
+use databend_common_expression::Value;
+use databend_storages_common_cache::ParquetMetaData;
+use databend_storages_common_cache::TempDir;
+use databend_storages_common_cache::TempPath;
+use futures::future::BoxFuture;
+use futures::future::FutureExt;
+use opendal::Reader;
+use parquet::arrow::arrow_reader::ArrowReaderBuilder;
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
+use parquet::arrow::arrow_writer::compute_leaves;
+use parquet::arrow::arrow_writer::get_column_writers;
+use parquet::arrow::arrow_writer::ArrowColumnWriter;
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::async_reader::ParquetRecordBatchStream;
+use parquet::arrow::ArrowSchemaConverter;
+use parquet::errors;
+use parquet::file::metadata::RowGroupMetaDataPtr;
+use parquet::file::properties::WriterProperties;
+use parquet::file::properties::WriterPropertiesPtr;
+use parquet::file::writer::SerializedFileWriter;
+use parquet::file::writer::SerializedRowGroupWriter;
+use parquet::schema::types::SchemaDescriptor;
+
+use super::async_buffer::BufferPool;
+use super::async_buffer::BufferWriter;
+use super::SpillerInner;
+
+pub struct RowGroupEncoder {
+    schema: Arc<Schema>,
+    props: WriterPropertiesPtr,
+    writers: Vec<ArrowColumnWriter>,
+}
+
+impl RowGroupEncoder {
+    fn new(props: &WriterPropertiesPtr, schema: Arc<Schema>, parquet: &SchemaDescriptor) -> Self {
+        let writers = get_column_writers(parquet, props, &schema).unwrap();
+        Self {
+            schema,
+            props: props.clone(),
+            writers,
+        }
+    }
+
+    pub fn add(&mut self, block: DataBlock) -> errors::Result<()> {
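+        // A nested Arrow field (list/struct) expands into several parquet leaf
+        // columns, so `compute_leaves` can yield more than one array per entry;
+        // each one is fed to the next column writer in declaration order.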
+        let columns = block.take_columns();
+        let mut writer_iter = self.writers.iter_mut();
+        for (field, entry) in self.schema.fields().iter().zip(columns) {
+            let array = (&entry.to_column()).into();
+            for col in compute_leaves(field, &array).unwrap() {
+                writer_iter.next().unwrap().write(&col)?;
+            }
+        }
+        Ok(())
+    }
+
+    fn close<W: Write + Send>(
+        self,
+        writer: &mut SerializedRowGroupWriter<'_, W>,
+    ) -> errors::Result<()> {
+        for w in self.writers {
+            w.close()?.append_to_row_group(writer)?
+        }
+        Ok(())
+    }
+
+    pub fn memory_size(&self) -> usize {
+        self.writers.iter().map(|w| w.memory_size()).sum()
+    }
+
+    pub fn into_block(self) -> Result<DataBlock> {
+        let RowGroupEncoder {
+            schema,
+            props,
+            writers,
+        } = self;
+
+        let data_schema = DataSchema::try_from(schema.as_ref())?;
+        let parquet_schema = ArrowSchemaConverter::new()
+            .with_coerce_types(props.coerce_types())
+            .convert(&schema)?;
+
+        let mut file_writer = SerializedFileWriter::new(
+            // todo: find a nocopy way
+            Cursor::new(Vec::new()),
+            parquet_schema.root_schema_ptr(),
+            props.clone(),
+        )?;
+
+        let mut row_group_writer = file_writer.next_row_group()?;
+        for writer in writers {
+            writer.close()?.append_to_row_group(&mut row_group_writer)?;
+        }
+        row_group_writer.close()?;
+
+        let cursor = file_writer.into_inner()?;
+        let parquet_bytes = bytes::Bytes::from(cursor.into_inner());
+
+        let reader = ParquetRecordBatchReader::try_new(parquet_bytes, usize::MAX)?;
+        let blocks = reader
+            .map(|batch| Ok(DataBlock::from_record_batch(&data_schema, &batch?)?.0))
+            .collect::<Result<Vec<_>>>()?;
+
+        if blocks.is_empty() {
+            return Ok(DataBlock::empty_with_schema(Arc::new(data_schema)));
+        }
+
+        let block = if blocks.len() == 1 {
+            blocks.into_iter().next().unwrap()
+        } else {
+            DataBlock::concat(&blocks)?
+        };
+
+        Ok(block)
+    }
+}
+
+pub struct FileWriter<W: Write + Send> {
+    schema: Arc<Schema>,
+    row_groups: Vec<RowGroupMetaDataPtr>,
+    writer: SerializedFileWriter<W>,
+}
+
+impl<W: Write + Send> FileWriter<W> {
+    fn new(props: Arc<WriterProperties>, data_schema: &DataSchema, w: W) -> errors::Result<Self> {
+        let schema = Arc::new(Schema::from(data_schema));
+
+        let parquet = ArrowSchemaConverter::new()
+            .with_coerce_types(props.coerce_types())
+            .convert(&schema)?;
+
+        let writer = SerializedFileWriter::new(w, parquet.root_schema_ptr(), props.clone())?;
+        Ok(Self {
+            schema,
+            writer,
+            row_groups: vec![],
+        })
+    }
+
+    pub(super) fn new_row_group(&self) -> RowGroupEncoder {
+        RowGroupEncoder::new(
+            self.writer.properties(),
+            self.schema.clone(),
+            self.writer.schema_descr(),
+        )
+    }
+
+    pub(super) fn flush_row_group(
+        &mut self,
+        row_group: RowGroupEncoder,
+    ) -> errors::Result<RowGroupMetaDataPtr> {
+        let mut row_group_writer = self.writer.next_row_group()?;
+        row_group.close(&mut row_group_writer)?;
+        let meta = row_group_writer.close()?;
+        self.row_groups.push(meta.clone());
+        Ok(meta)
+    }
+
+    pub fn spill(&mut self, blocks: Vec<DataBlock>) -> Result<RowGroupMetaDataPtr> {
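+        // Encode all given blocks into a single row group and flush it to the
+        // underlying writer immediately; only the row-group metadata is kept.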
+        let mut row_group = self.new_row_group();
+        for block in blocks {
+            row_group.add(block)?;
+        }
+
+        Ok(self.flush_row_group(row_group)?)
+    }
+}
+
+impl FileWriter<UnionFileWriter> {
+    pub(super) fn finish(mut self) -> errors::Result<(ParquetMetaData, UnionFile)> {
+        let file_metadata = self.writer.finish()?;
+        let tp = self.writer.schema_descr().root_schema_ptr();
+        let schema_descr = Arc::new(SchemaDescriptor::new(tp));
+
+        let metadata = parquet::file::metadata::FileMetaData::new(
+            file_metadata.version,
+            file_metadata.num_rows,
+            file_metadata.created_by.clone(),
+            file_metadata.key_value_metadata.clone(),
+            schema_descr,
+            None,
+        );
+        let file = self.writer.inner_mut().finish()?;
+        let row_groups = std::mem::take(&mut self.row_groups);
+        drop(self);
+        let row_groups = row_groups.into_iter().map(Arc::unwrap_or_clone).collect();
+        Ok((ParquetMetaData::new(metadata, row_groups), file))
+    }
+
+    pub fn has_opening_local(&self) -> bool {
+        self.writer.inner().has_opening_local()
+    }
+}
+
+struct LocalDst {
+    dir: Arc<TempDir>,
+    path: TempPath,
+    file: Option<SyncDmaFile>,
+    buf: Option<DmaWriteBuf>,
+}
+
+impl LocalDst {
+    fn close(&mut self) -> io::Result<usize> {
+        let file = self.file.take().unwrap();
+        let mut dma = self.buf.take().unwrap();
+        let file_size = dma.flush_and_close(file)?;
+
+        self.path.set_size(file_size).unwrap();
+        Ok(file_size)
+    }
+}
+
+pub struct UnionFileWriter {
+    local: Option<LocalDst>,
+    remote: String,
+    remote_writer: Option<BufferWriter>,
+    remote_offset: u64,
+}
+
+impl UnionFileWriter {
+    fn new(
+        dir: Arc<TempDir>,
+        path: TempPath,
+        file: SyncDmaFile,
+        buf: DmaWriteBuf,
+        remote: String,
+        remote_writer: BufferWriter,
+    ) -> Self {
+        UnionFileWriter {
+            local: Some(LocalDst {
+                dir,
+                path,
+                file: Some(file),
+                buf: Some(buf),
+            }),
+            remote,
+            remote_writer: Some(remote_writer),
+            remote_offset: 0,
+        }
+    }
+
+    fn remote_only(remote: String, remote_writer: BufferWriter) -> Self {
+        UnionFileWriter {
+            local: None,
+            remote,
+            remote_writer: Some(remote_writer),
+            remote_offset: 0,
+        }
+    }
+
+    fn finish(&mut self) -> io::Result<UnionFile> {
+        let remote_size = self.remote_writer.take().unwrap().close()?.content_length();
+        match self.local.take() {
+            Some(
+                mut local @ LocalDst {
+                    file: Some(_),
+                    buf: Some(_),
+                    ..
+                },
+            ) => {
+                let dma = local.buf.as_mut().unwrap();
+
+                let file = local.file.take().unwrap();
+                let file_size = dma.flush_and_close(file)?;
+                local.path.set_size(file_size).unwrap();
+
+                Ok(UnionFile {
+                    local_path: Some(local.path),
+                    remote_path: std::mem::take(&mut self.remote),
+                    remote_offset: None,
+                    remote_size,
+                })
+            }
+            Some(LocalDst { path, .. }) => Ok(UnionFile {
+                local_path: Some(path),
+                remote_path: std::mem::take(&mut self.remote),
+                remote_offset: Some(self.remote_offset),
+                remote_size,
+            }),
+            None => Ok(UnionFile {
+                local_path: None,
+                remote_path: std::mem::take(&mut self.remote),
+                remote_offset: Some(0),
+                remote_size,
+            }),
+        }
+    }
+
+    pub fn has_opening_local(&self) -> bool {
+        self.local
+            .as_ref()
+            .map(|local| local.file.is_some())
+            .unwrap_or(false)
+    }
+}
+
+impl io::Write for UnionFileWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        if let Some(
+            local @ LocalDst {
+                file: Some(_),
+                buf: Some(_),
+                ..
+            },
+        ) = &mut self.local
+        {
+            let n = buf.len();
+            let dma = local.buf.as_mut().unwrap();
+            if dma.fast_write(buf) {
+                return Ok(n);
+            }
+
+            if local.dir.grow_size(&mut local.path, buf.len(), false)? {
+                dma.write(buf)?;
+                let file = local.file.as_mut().unwrap();
+                dma.flush_if_full(file)?;
+                local.path.set_size(file.length()).unwrap();
+                return Ok(n);
+            }
+
+            self.remote_offset = local.close()? as _;
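+            // The local budget is exhausted: seal the local file and fall
+            // through to the remote writer. `remote_offset` marks where the
+            // remote tail begins within the logical file.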
+        };
+
+        self.remote_writer.as_mut().unwrap().write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        if let Some(LocalDst {
+            file: Some(file),
+            buf: Some(dma),
+            ..
+        }) = &mut self.local
+        {
+            return dma.flush(file);
+        }
+
+        self.remote_writer.as_mut().unwrap().flush()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct UnionFile {
+    pub local_path: Option<TempPath>,
+    pub remote_path: String,
+    pub remote_offset: Option<u64>,
+    pub remote_size: u64,
+}
+
+pub(super) struct FileReader {
+    meta: Arc<ParquetMetaData>,
+    local: Option<(TempPath, AsyncDmaFile)>,
+    remote_reader: Reader,
+    remote_offset: Option<u64>,
+}
+
+impl AsyncFileReader for FileReader {
+    fn get_bytes(
+        &mut self,
+        range: std::ops::Range<u64>,
+    ) -> BoxFuture<'_, errors::Result<bytes::Bytes>> {
+        async move {
+            let local_bytes = if let Some((_, file)) = &mut self.local {
+                let local_range = self
+                    .remote_offset
+                    .map(|offset| {
+                        if range.end <= offset {
+                            return range.clone();
+                        }
+                        if range.start < offset {
+                            range.start..offset
+                        } else {
+                            offset..offset
+                        }
+                    })
+                    .unwrap_or(range.clone());
+
+                let (dma_buf, rt_range) = file.read_range(local_range.clone()).await?;
+                let bytes = dma_buffer_to_bytes(dma_buf).slice(rt_range);
+                if local_range == range {
+                    return Ok(bytes);
+                }
+                Some(bytes)
+            } else {
+                None
+            };
+
+            let remote_range = self
+                .remote_offset
+                .map(|offset| (range.start - offset)..(range.end - offset))
+                .unwrap_or(range);
+
+            let remote_bytes = self
+                .remote_reader
+                .read(remote_range)
+                .await
+                .map_err(|err| errors::ParquetError::External(Box::new(err)))?;
+
+            if local_bytes.is_some() {
+                Ok(
+                    opendal::Buffer::from_iter(local_bytes.into_iter().chain(remote_bytes))
+                        .to_bytes(),
+                )
+            } else {
+                Ok(remote_bytes.to_bytes())
+            }
+        }
+        .boxed()
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        _options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, errors::Result<Arc<ParquetMetaData>>> {
+        future::ready(Ok(self.meta.clone())).boxed()
+    }
+}
+
+impl SpillerInner {
+    pub(super) async fn new_file_writer(
+        &self,
+        schema: &DataSchema,
+        pool: &Arc<BufferPool>,
+        chunk: usize,
+    ) -> Result<FileWriter<UnionFileWriter>> {
+        let op = &self.operator;
+
+        let remote_location = self.create_unique_location();
+        let remote_writer = op.writer(&remote_location).await?;
+        let remote = pool.buffer_write(remote_writer);
+
+        let union = if let Some(disk) = &self.temp_dir {
+            if let Some(path) = disk.new_file_with_size(0)? {
+                let file = SyncDmaFile::create(&path, true)?;
+                let align = disk.block_alignment();
+                let buf = DmaWriteBuf::new(align, chunk);
+                UnionFileWriter::new(disk.clone(), path, file, buf, remote_location, remote)
+            } else {
+                UnionFileWriter::remote_only(remote_location, remote)
+            }
+        } else {
+            UnionFileWriter::remote_only(remote_location, remote)
+        };
+
+        let props = WriterProperties::default().into();
+        Ok(FileWriter::new(props, schema, union)?)
+    }
+
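+    /// Reads the selected row groups of a finished [`UnionFile`] back into
+    /// [`DataBlock`]s, stitching the local prefix and remote tail back together.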
+    pub(super) async fn load_row_groups(
+        &self,
+        UnionFile {
+            local_path,
+            remote_path,
+            remote_offset,
+            ..
+        }: UnionFile,
+        meta: Arc<ParquetMetaData>,
+        schema: &DataSchema,
+        row_groups: Vec<usize>,
+    ) -> Result<Vec<DataBlock>> {
+        let op = &self.operator;
+
+        let input = FileReader {
+            meta,
+            local: if let Some(path) = local_path {
+                let alignment = Some(self.temp_dir.as_ref().unwrap().block_alignment());
+                let file = AsyncDmaFile::open(&path, true, alignment).await?;
+                Some((path, file))
+            } else {
+                None
+            },
+            remote_offset,
+            remote_reader: op.reader(&remote_path).await?,
+        };
+
+        let builder = ArrowReaderBuilder::new(input).await?;
+        let stream = builder
+            .with_row_groups(row_groups)
+            .with_batch_size(usize::MAX)
+            .build()?;
+
+        load_blocks_from_stream(schema, stream).await
+    }
+}
+
+async fn load_blocks_from_stream<T>(
+    schema: &DataSchema,
+    mut stream: ParquetRecordBatchStream<T>,
+) -> Result<Vec<DataBlock>>
+where
+    T: AsyncFileReader + Unpin + Send + 'static,
+{
+    let mut blocks = Vec::new();
+    while let Some(reader) = stream.next_row_group().await? {
+        for record in reader {
+            let record = record?;
+            let num_rows = record.num_rows();
+            let mut columns = Vec::with_capacity(record.num_columns());
+            for (array, field) in record.columns().iter().zip(schema.fields()) {
+                let data_type = field.data_type();
+                columns.push(BlockEntry::new(
+                    Value::from_arrow_rs(array.clone(), data_type)?,
+                    || (data_type.clone(), num_rows),
+                ))
+            }
+            let block = DataBlock::new(columns, num_rows);
+            blocks.push(block);
+        }
+    }
+
+    Ok(blocks)
+}
+
+#[cfg(test)]
+mod tests {
+    use databend_common_base::base::GlobalUniqName;
+    use databend_common_base::runtime::GlobalIORuntime;
+    use databend_common_catalog::table_context::TableContext;
+    use databend_common_config::SpillConfig;
+    use databend_common_exception::Result;
+    use databend_common_expression::types::array::ArrayColumnBuilder;
+    use databend_common_expression::types::number::Int32Type;
+    use databend_common_expression::types::ArgType;
+    use databend_common_expression::types::DataType;
+    use databend_common_expression::types::StringType;
+    use databend_common_expression::Column;
+    use databend_common_expression::FromData;
+    use databend_common_storage::DataOperator;
+    use databend_storages_common_cache::TempDirManager;
+    use parquet::file::properties::WriterProperties;
+    use parquet::file::properties::WriterPropertiesPtr;
+    use tempfile::TempDir;
+
+    use super::*;
+    use crate::spillers::async_buffer::BufferPool;
+    use crate::test_kits::ConfigBuilder;
+    use crate::test_kits::TestFixture;
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_union_file_writer_without_local() -> Result<()> {
+        let spill_dir = TempDir::new().expect("create spill temp dir");
+        let mut config = ConfigBuilder::create().off_log().build();
+        config.spill = SpillConfig::new_for_test(
+            spill_dir.path().to_string_lossy().into_owned(),
+            0.01,
+            1 << 30,
+        );
+
+        let fixture = TestFixture::setup_with_config(&config).await?;
+        let ctx = fixture.new_query_ctx().await?;
+
+        let executor = GlobalIORuntime::instance();
+        let memory = 1024 * 1024 * 100;
+
+        let pool = BufferPool::create(executor, memory, 3);
+        let op = DataOperator::instance().operator();
+
+        let remote_path = format!(
+            "{}/{}",
+            ctx.query_id_spill_prefix(),
+            GlobalUniqName::unique()
+        );
+        let writer = op.writer(&remote_path).await?;
+        let remote = pool.buffer_write(writer);
+
+        let mut writer = UnionFileWriter::remote_only(remote_path.clone(), remote);
+        let mut expected = b"hello union writer".to_vec();
+        writer.write_all(&expected)?;
+        let extra = b" write bytes";
+        writer.write_all(extra)?;
+        expected.extend_from_slice(extra);
+        writer.flush()?;
+
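+        // finish() seals the buffered remote writer; the returned metadata's
+        // content length is what ends up in `remote_size`.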
+        let file = writer.finish()?;
+        assert!(file.local_path.is_none());
+        assert_eq!(file.remote_offset, Some(0));
+        assert_eq!(file.remote_size, expected.len() as u64);
+
+        let reader = op.reader(&file.remote_path).await?;
+        let buffer = reader.read(0..file.remote_size).await?;
+        assert_eq!(buffer.to_vec(), expected);
+
+        Ok(())
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_union_file_writer_with_local() -> Result<()> {
+        let spill_dir = TempDir::new().expect("create spill temp dir");
+        let mut config = ConfigBuilder::create().off_log().build();
+        config.spill = SpillConfig::new_for_test(
+            spill_dir.path().to_string_lossy().into_owned(),
+            0.01,
+            1 << 30,
+        );
+
+        let fixture = TestFixture::setup_with_config(&config).await?;
+        let ctx = fixture.new_query_ctx().await?;
+
+        let executor = GlobalIORuntime::instance();
+        let memory = 1024 * 1024 * 100;
+
+        let pool = BufferPool::create(executor, memory, 3);
+        let op = DataOperator::instance().operator();
+
+        let remote_path = format!(
+            "{}/{}",
+            ctx.query_id_spill_prefix(),
+            GlobalUniqName::unique()
+        );
+        let writer = op.writer(&remote_path).await?;
+        let remote = pool.buffer_write(writer);
+
+        let query_id = ctx.get_id();
+        let temp_dir = TempDirManager::instance()
+            .get_disk_spill_dir(memory, &query_id)
+            .expect("local spill directory should be available");
+        let temp_path = temp_dir
+            .new_file_with_size(0)?
+            .expect("spill temp file should be allocated");
+
+        let dio = false;
+        let file = SyncDmaFile::create(&temp_path, dio)?;
+        let buf = DmaWriteBuf::new(temp_dir.block_alignment(), 4 * 1024 * 1024);
+
+        let mut union_writer = UnionFileWriter::new(
+            temp_dir.clone(),
+            temp_path,
+            file,
+            buf,
+            remote_path.clone(),
+            remote,
+        );
+
+        assert!(union_writer.has_opening_local());
+
+        let mut expected = b"bytes on disk".to_vec();
+        union_writer.write_all(&expected)?;
+        let extra = b" via union writer";
+        union_writer.write_all(extra)?;
+        expected.extend_from_slice(extra);
+        union_writer.flush()?;
+
+        let file = union_writer.finish()?;
+
+        let local_path = file.local_path.clone().expect("local path should exist");
+        assert!(file.remote_offset.is_none());
+        assert_eq!(file.remote_size, 0);
+
+        let local_bytes = std::fs::read(local_path.as_ref())?;
+        assert_eq!(local_bytes, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_row_group_writer_restores() -> Result<()> {
+        let mut array_builder = ArrayColumnBuilder::<Int32Type>::with_capacity(3, 3, &[]);
+        {
+            let mut arrays = array_builder.as_mut();
+            arrays.put_item(1);
+            arrays.put_item(2);
+            arrays.commit_row();
+
+            arrays.put_item(3);
+            arrays.commit_row();
+
+            arrays.push_default();
+        }
+        let array_column = Column::Array(Box::new(
+            array_builder
+                .build()
+                .upcast(&DataType::Array(Int32Type::data_type().into())),
+        ));
+
+        let block = DataBlock::new_from_columns(vec![
+            StringType::from_data(vec!["alpha", "beta", "gamma"]),
+            array_column,
+            StringType::from_opt_data(vec![Some("nullable"), None, Some("value")]),
+        ]);
+
+        let data_schema = block.infer_schema();
+
+        let props: WriterPropertiesPtr = WriterProperties::default().into();
+        let file_writer = FileWriter::new(props.clone(), &data_schema, Vec::<u8>::new())?;
+        let mut row_group = file_writer.new_row_group();
+
+        row_group.add(block.clone())?;
+        row_group.add(block.clone())?;
+        let restored = row_group.into_block()?;
+
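+        // `into_block` decodes the buffered row group through an in-memory
+        // parquet reader, so this comparison exercises the full encode/decode
+        // round trip column by column.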
+        for (a, b) in DataBlock::concat(&[block.clone(), block])?
+            .columns()
+            .iter()
+            .zip(restored.columns())
+        {
+            assert_eq!(a, b);
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index 1fba150428237..749749ce80ac3 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -228,6 +228,13 @@ impl DefaultSettings {
             scope: SettingScope::Both,
             range: Some(SettingRange::Numeric(0..=500)),
         }),
+        ("enable_backpressure_spiller", DefaultSettingValue {
+            value: UserSettingValue::UInt64(1),
+            desc: "Use the new backpressure spiller.",
+            mode: SettingMode::Both,
+            scope: SettingScope::Both,
+            range: Some(SettingRange::Numeric(0..=1)),
+        }),
         ("max_spill_io_requests", DefaultSettingValue {
             value: UserSettingValue::UInt64(default_max_spill_io_requests),
             desc: "Sets the maximum number of concurrent spill I/O requests.",
diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index 3ba9d9b0fa2d0..087b24b7196f4 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -897,6 +897,10 @@ impl Settings {
         self.try_get_u64("dynamic_sample_time_budget_ms")
     }
 
+    pub fn get_enable_backpressure_spiller(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_backpressure_spiller")? != 0)
+    }
+
     pub fn get_max_spill_io_requests(&self) -> Result<u64> {
         self.try_get_u64("max_spill_io_requests")
     }
diff --git a/src/query/storages/common/cache/src/lib.rs b/src/query/storages/common/cache/src/lib.rs
index 979751e1f6229..d978777843232 100644
--- a/src/query/storages/common/cache/src/lib.rs
+++ b/src/query/storages/common/cache/src/lib.rs
@@ -15,6 +15,7 @@
 #![feature(write_all_vectored)]
 #![feature(associated_type_defaults)]
 #![feature(assert_matches)]
+#![feature(io_const_error)]
 
 mod cache;
 mod cache_items;
diff --git a/src/query/storages/common/cache/src/temp_dir.rs b/src/query/storages/common/cache/src/temp_dir.rs
index c345595e099ba..d69dbcbfa5c07 100644
--- a/src/query/storages/common/cache/src/temp_dir.rs
+++ b/src/query/storages/common/cache/src/temp_dir.rs
@@ -17,6 +17,7 @@ use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fs;
 use std::hash::Hash;
+use std::io;
 use std::io::ErrorKind;
 use std::ops::Deref;
 use std::ops::Drop;
@@ -182,12 +183,11 @@ impl TempDirManager {
         self.alignment
     }
 
-    fn insufficient_disk(&self, size: u64) -> Result<bool> {
-        let stat = statvfs(self.root.as_ref().unwrap().as_ref())
-            .map_err(|e| ErrorCode::Internal(e.to_string()))?;
+    fn insufficient_disk(&self, grow: u64) -> io::Result<bool> {
+        let stat = statvfs(self.root.as_ref().unwrap().as_ref())?;
 
         debug_assert_eq!(stat.f_frsize, self.alignment.as_usize() as u64);
-        let n = self.alignment.align_up_count(size as usize) as u64;
+        let n = self.alignment.align_up_count(grow as usize) as u64;
         Ok(stat.f_bavail < self.reserved + n)
     }
 }
@@ -215,9 +215,11 @@ impl TempDir {
     pub fn new_file_with_size(&self, size: usize) -> Result<Option<TempPath>> {
         let path = self.path.join(GlobalUniqName::unique()).into_boxed_path();
 
-        if self.dir_info.limit < *self.dir_info.size.lock().unwrap() + size
-            || self.manager.global_limit < self.manager.group.lock().unwrap().size() + size
-            || self.manager.insufficient_disk(size as u64)?
+        if self.manager.global_limit < self.manager.group.lock().unwrap().size() + size
+            || self
+                .manager
+                .insufficient_disk(size as u64)
+                .map_err(|e| ErrorCode::Internal(format!("insufficient_disk fail {e}")))?
         {
             return Ok(None);
         }
@@ -242,6 +244,38 @@ impl TempDir {
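+    /// Tries to reserve `grow` additional bytes for `path`, returning
+    /// `Ok(false)` when the global limit, the per-directory limit, or
+    /// (optionally) the free disk space would be exceeded.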
+    pub fn grow_size(
+        &self,
+        path: &mut TempPath,
+        grow: usize,
+        check_disk: bool,
+    ) -> io::Result<bool> {
+        let Some(path) = Arc::get_mut(&mut path.0) else {
+            return Err(io::const_error!(
+                io::ErrorKind::InvalidInput,
+                "can't set size after share"
+            ));
+        };
+
+        if self.manager.global_limit < self.manager.group.lock().unwrap().size() + grow {
+            return Ok(false);
+        }
+
+        if check_disk && self.manager.insufficient_disk(grow as u64)? {
+            return Ok(false);
+        }
+
+        let mut dir_size = self.dir_info.size.lock().unwrap();
+        if self.dir_info.limit < *dir_size + grow {
+            return Ok(false);
+        }
+
+        *dir_size += grow;
+        path.size += grow;
+
+        Ok(true)
+    }
+
     fn init_dir(&self) -> Result<()> {
         let mut rt = Ok(());
         self.dir_info.inited.call_once(|| {
@@ -261,6 +295,10 @@ impl TempDir {
     pub fn path(&self) -> &Path {
         &self.path
     }
+
+    pub fn insufficient_disk(&self, grow: usize) -> io::Result<bool> {
+        self.manager.insufficient_disk(grow as _)
+    }
 }
 
 struct DirInfo {