
Commit c118348

Array Operator API (#5022)
# Open-ish Questions

These aren't really open questions; they mostly document our reasoning and the trade-offs behind the current API. We may well revisit some of these assumptions.

## 1. Mutable output vectors

These are a little annoying to use, and in many places we end up allocating zero-capacity vectors to pass in that ultimately never get used. This is because we allow implementations to return their own data zero-copy if they can. This API allows us to execute chunks of a ChunkedArray directly into contiguous output memory. I cannot think of another use-case besides this. The question is: how often do we hit this, and how much do we care? Our scanning logic currently chooses splits every time a column is partitioned, meaning we only ever see chunked arrays with at most one chunk (and hence we don't benefit from the optimization).

If we were to remove the output argument, we could also make `BatchKernelRef == BoxFuture<VortexResult<Vector>>`. Currently it is an async_trait, meaning we allocate to create the `Box<dyn KernelRef>`, then we allocate again to create the boxed future returned by the async trait. So we could save an allocation and a pointer dereference.

### Decision

* It's not worth the complexity or runtime cost of heap-allocating mutable output vectors just for them to potentially go unused.
* This also allows batch kernels to be a pure `BoxFuture`, saving another heap allocation compared to the former async_trait.

## 2. To Arrow or not to Arrow

Instead of:

```rust
/// Trait for batch execution kernels.
#[async_trait]
pub trait BatchKernel: Send {
    /// Execute the batch kernel and produce a canonicalized vector.
    ///
    /// If the kernel can return data zero-copy from its own state, then it should prefer to do so.
    /// If the kernel is producing _new_ data, it should write this data into the provided `out`
    /// vector.
    async fn execute(self: Box<Self>, out: VectorMut) -> VortexResult<Vector>;
}
```

we would have:

```rust
/// Trait for batch execution kernels.
#[async_trait]
pub trait BatchKernel: Send {
    async fn execute(self: Box<Self>) -> VortexResult<ArrowArrayRef>;
}
```

Note that we also lose the mutable output argument. That's because Arrow doesn't currently have mutable arrays. We could PR them if we really care about the ChunkedArray optimization, but as noted above, perhaps it doesn't matter.

This Arrow-focussed API would let us delegate a lot of array implementations almost directly to arrow-rs. Admittedly, the Rust crate has some bugs we've been running into, and fixing them upstream takes time, particularly if we want to stay in sync with DataFusion's arrow version. There are also many missing kernels, e.g. most compute functions for ListView arrays.

That said, it would make writing our plugin infrastructure significantly easier and more efficient if we could simply use Arrow for the vector part of our ABI. We would also see community benefits as we upstream more compute into arrow-rs.

This would also help us migrate (as we believe we inevitably will) to physical types in Vortex. This would mean we are not bound by the Vortex canonical representation, and arrays could emit VarBin _or_ VarBinView depending on the desired output type (indicated using a PhysicalCastArray) and the actual input type found serialized on disk. It's easy to grok what Vortex is in this world: simply scalar expressions over Arrow arrays.

### Decision - not to Arrow

For now, we will continue to use Vortex Vectors, and they will remain Arrow-compatible. Since vectors represent physical data, they carry less metadata than arrow-rs arrays; for example, structs do not carry field names. This also makes them much cheaper to convert directly to/from the Arrow C ABI if we don't go via arrow-rs, as we can avoid constructing the Arc'd tree of arrow arrays.

We may one day decide we are so close to Arrow that we should unwind this and unify with vectors. But for now, we benefit from having independent APIs.
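Putting decisions 1 and 2 together, the kernel shape that falls out might look something like the sketch below. This is purely illustrative: `Vector`, `VortexError`, and `VortexResult` here are hypothetical stand-ins rather than the real vortex-array definitions, and it assumes the `futures` crate for `BoxFuture`.

```rust
use futures::future::BoxFuture;

// Hypothetical stand-ins for the real Vortex types; not the crate definitions.
pub struct Vector;
pub struct VortexError;
pub type VortexResult<T> = Result<T, VortexError>;

/// A batch kernel is now just a boxed future that resolves to a canonical vector:
/// no mutable output argument, and no extra `Box<dyn KernelRef>` allocation on top
/// of the boxed future that #[async_trait] would have produced.
pub type BatchKernelRef = BoxFuture<'static, VortexResult<Vector>>;

/// Zero-copy case: a kernel that already owns canonical data simply returns it.
pub fn zero_copy_kernel(data: Vector) -> BatchKernelRef {
    Box::pin(async move { Ok::<_, VortexError>(data) })
}
```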
## 3. To async or not to async

This PR shows an async API for `BatchKernel`. This allows arrays to perform I/O during execution. Note that we will have an alternate API for pipelined compute over 1k elements at a time that is not async; most scalar, performance-sensitive kernels would implement that API in the future.

In a non-async world, Vortex would struggle to represent expressions such as "map these strings through an LLM". We may have to introduce interesting shortcuts to evaluate these expressions within a scan (which _is_ an async API). Having native support for async expressions allows them to be naturally expressed and executed as part of a Vortex array evaluation, rather than delegating the evaluation to part of the scan, or to the query engine that is ultimately calling into Vortex.

Async also allows us to implement short-circuiting of I/O, for example only loading the VarBin buffers that are referenced by a filtered VarBin array, instead of eagerly loading all of them.

One major downside is that our plugin ABI becomes quite a bit more complex. Foreign implementations would need to return an enum similar to the Rust Future API, and we'd need to poll across an FFI boundary (see the sketch at the end of this note). We could wrap up synchronous arrays as plugins, but there's no way to short-circuit, so we would need to evaluate all children eagerly up front.

### Decision - to async

The batch execution mode for a Vortex array will be async, allowing us to perform I/O and spill-to-disk buffer management. However, we also support "pipelined" CPU execution, which will be a push-based, blocking DAG that most high-performance arrays should prefer to implement. Eventually, arrays will have the choice to implement either of these.
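To make the FFI polling concern concrete, here is a purely illustrative sketch of the kind of Future-style poll result a foreign kernel might hand back across the boundary. The names (`FfiPoll`, `FfiKernel`) are hypothetical and are not part of this commit or of the actual Vortex plugin ABI.

```rust
use core::ffi::c_void;

/// FFI-safe analogue of Rust's `std::task::Poll` for a plugin batch kernel.
#[repr(C)]
pub enum FfiPoll {
    /// Execution finished; the payload is an opaque handle to the produced vector.
    Ready(*mut c_void),
    /// Execution is blocked on I/O; the host should poll again once the
    /// waker it registered fires.
    Pending,
}

/// What a foreign kernel implementation could export to the host.
#[repr(C)]
pub struct FfiKernel {
    /// Opaque pointer to the plugin's own kernel state.
    pub state: *mut c_void,
    /// Poll the kernel once; `waker` is an opaque, host-provided wake handle.
    pub poll: unsafe extern "C" fn(state: *mut c_void, waker: *mut c_void) -> FfiPoll,
}
```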
---------

Signed-off-by: Nicholas Gates <[email protected]>

1 parent bc47986 commit c118348

File tree

71 files changed: +827 -142 lines


Cargo.lock

Lines changed: 24 additions & 0 deletions

encodings/alp/src/alp/array.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -32,7 +32,7 @@ impl VTable for ALPVTable {
     type ComputeVTable = NotSupported;
     type EncodeVTable = Self;
     type SerdeVTable = Self;
-    type PipelineVTable = NotSupported;
+    type OperatorVTable = NotSupported;

     fn id(_encoding: &Self::Encoding) -> EncodingId {
         EncodingId::new_ref("vortex.alp")
```

encodings/alp/src/alp_rd/array.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -35,7 +35,7 @@ impl VTable for ALPRDVTable {
     type ComputeVTable = NotSupported;
     type EncodeVTable = Self;
     type SerdeVTable = Self;
-    type PipelineVTable = NotSupported;
+    type OperatorVTable = NotSupported;

     fn id(_encoding: &Self::Encoding) -> EncodingId {
         EncodingId::new_ref("vortex.alprd")
```

encodings/bytebool/src/array.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -34,7 +34,7 @@ impl VTable for ByteBoolVTable {
     type ComputeVTable = NotSupported;
     type EncodeVTable = NotSupported;
     type SerdeVTable = Self;
-    type PipelineVTable = NotSupported;
+    type OperatorVTable = NotSupported;

     fn id(_encoding: &Self::Encoding) -> EncodingId {
         EncodingId::new_ref("vortex.bytebool")
```

encodings/datetime-parts/src/array.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -28,7 +28,7 @@ impl VTable for DateTimePartsVTable {
     type ComputeVTable = NotSupported;
     type EncodeVTable = Self;
     type SerdeVTable = Self;
-    type PipelineVTable = NotSupported;
+    type OperatorVTable = NotSupported;

     fn id(_encoding: &Self::Encoding) -> EncodingId {
         EncodingId::new_ref("vortex.datetimeparts")
```

encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -35,7 +35,7 @@ impl VTable for DecimalBytePartsVTable {
     type ComputeVTable = NotSupported;
     type EncodeVTable = NotSupported;
     type SerdeVTable = Self;
-    type PipelineVTable = NotSupported;
+    type OperatorVTable = NotSupported;

     fn id(_encoding: &Self::Encoding) -> EncodingId {
         EncodingId::new_ref("vortex.decimal_byte_parts")
```

encodings/dict/src/array.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -28,7 +28,7 @@ impl VTable for DictVTable {
     type ComputeVTable = NotSupported;
     type EncodeVTable = Self;
     type SerdeVTable = Self;
-    type PipelineVTable = NotSupported;
+    type OperatorVTable = NotSupported;

     fn id(_encoding: &Self::Encoding) -> EncodingId {
         EncodingId::new_ref("vortex.dict")
```

encodings/fastlanes/src/bitpacking/mod.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -45,7 +45,7 @@ impl VTable for BitPackedVTable {
     type ComputeVTable = NotSupported;
     type EncodeVTable = Self;
     type SerdeVTable = Self;
-    type PipelineVTable = Self;
+    type OperatorVTable = Self;

     fn id(_encoding: &Self::Encoding) -> EncodingId {
         EncodingId::new_ref("fastlanes.bitpacked")
```

encodings/fastlanes/src/bitpacking/operator/mod.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -13,15 +13,15 @@ use vortex_array::operator::{
     LengthBounds, Operator, OperatorEq, OperatorHash, OperatorId, OperatorRef,
 };
 use vortex_array::pipeline::{BindContext, Kernel, PipelinedOperator, RowSelection};
-use vortex_array::vtable::PipelineVTable;
+use vortex_array::vtable::OperatorVTable;
 use vortex_buffer::Buffer;
 use vortex_dtype::{DType, PhysicalPType, match_each_integer_ptype};
 use vortex_error::VortexResult;

 use crate::operator::aligned_kernel::BitPackedKernel;
 use crate::{BitPackedArray, BitPackedVTable};

-impl PipelineVTable<BitPackedVTable> for BitPackedVTable {
+impl OperatorVTable<BitPackedVTable> for BitPackedVTable {
     fn to_operator(array: &BitPackedArray) -> VortexResult<Option<OperatorRef>> {
         if array.dtype.is_nullable() {
             log::trace!("BitPackedVTable does not support nullable arrays");
```

encodings/fastlanes/src/delta/mod.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -40,7 +40,7 @@ impl VTable for DeltaVTable {
     type ComputeVTable = NotSupported;
     type EncodeVTable = NotSupported;
     type SerdeVTable = Self;
-    type PipelineVTable = NotSupported;
+    type OperatorVTable = NotSupported;

     fn id(_encoding: &Self::Encoding) -> EncodingId {
         EncodingId::new_ref("fastlanes.delta")
```
