Skip to content

Commit de9f463

Browse files
feat[buffer]: use BufferHandle in more places (#5567)
Use BufferHandle in segment source.

Signed-off-by: Joe Isaacs <[email protected]>
1 parent 771e2f5 commit de9f463

File tree

8 files changed: +97 / -13 lines changed (97 additions, 13 deletions).

vortex-array/src/serde.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,59 @@ impl ArrayParts {
415415
this.flatbuffer_loc = root._tab.loc();
416416
this
417417
}
418+
419+
/// Create an [`ArrayParts`] from a pre-existing flatbuffer (ArrayNode) and a segment containing
420+
/// only the data buffers (without the flatbuffer suffix).
421+
///
422+
/// This is used when the flatbuffer is stored separately in layout metadata (e.g., when
423+
/// `FLAT_LAYOUT_INLINE_ARRAY_NODE` is enabled).
424+
pub fn from_flatbuffer_and_segment(
425+
array_tree: ByteBuffer,
426+
segment: BufferHandle,
427+
) -> VortexResult<Self> {
428+
// TODO: this can also work with device buffers.
429+
let segment = segment.try_to_bytes()?;
430+
// We align each buffer individually, so we remove alignment requirements on the buffer.
431+
let segment = segment.aligned(Alignment::none());
432+
433+
let fb_buffer = FlatBuffer::align_from(array_tree);
434+
435+
// Parse the flatbuffer to extract buffer descriptors and root location.
436+
let (flatbuffer_loc, buffers) = {
437+
let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
438+
let fb_root = fb_array.root().vortex_expect("Array must have a root node");
439+
let flatbuffer_loc = fb_root._tab.loc();
440+
441+
let mut offset = 0;
442+
let buffers: Arc<[_]> = fb_array
443+
.buffers()
444+
.unwrap_or_default()
445+
.iter()
446+
.map(|fb_buf| {
447+
// Skip padding
448+
offset += fb_buf.padding() as usize;
449+
450+
let buffer_len = fb_buf.length() as usize;
451+
452+
// Extract a buffer and ensure it's aligned, copying if necessary
453+
let buffer = segment
454+
.slice(offset..(offset + buffer_len))
455+
.aligned(Alignment::from_exponent(fb_buf.alignment_exponent()));
456+
457+
offset += buffer_len;
458+
BufferHandle::Buffer(buffer)
459+
})
460+
.collect();
461+
462+
(flatbuffer_loc, buffers)
463+
};
464+
465+
Ok(ArrayParts {
466+
flatbuffer: fb_buffer,
467+
flatbuffer_loc,
468+
buffers,
469+
})
470+
}
418471
}
419472

420473
struct ArrayPartsChildren<'a> {
@@ -484,3 +537,11 @@ impl TryFrom<ByteBuffer> for ArrayParts {
484537
})
485538
}
486539
}
540+
541+
impl TryFrom<BufferHandle> for ArrayParts {
542+
type Error = VortexError;
543+
544+
fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
545+
Self::try_from(value.try_to_bytes()?)
546+
}
547+
}

vortex-file/src/segments/source.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::sync::Arc;
55

66
use futures::FutureExt;
77
use futures::TryFutureExt;
8+
use vortex_buffer::BufferHandle;
89
use vortex_error::VortexError;
910
use vortex_error::vortex_err;
1011
use vortex_io::VortexReadAt;
@@ -40,6 +41,7 @@ impl SegmentSource for FileSegmentSource {
4041
maybe_fut
4142
.ok_or_else(|| vortex_err!("Missing segment: {}", id))?
4243
.await
44+
.map(BufferHandle::Buffer)
4345
}
4446
.boxed()
4547
}

vortex-layout/src/layouts/flat/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ impl FlatLayout {
179179
pub fn array_ctx(&self) -> &ArrayContext {
180180
&self.ctx
181181
}
182+
183+
#[inline]
184+
pub fn array_tree(&self) -> Option<&ByteBuffer> {
185+
self.array_tree.as_ref()
186+
}
182187
}
183188

184189
#[derive(prost::Message)]

vortex-layout/src/layouts/flat/reader.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,17 @@ impl FlatReader {
6464

6565
let ctx = self.layout.array_ctx().clone();
6666
let dtype = self.layout.dtype().clone();
67+
let array_tree = self.layout.array_tree().cloned();
6768
async move {
6869
let segment = segment_fut.await?;
69-
ArrayParts::try_from(segment)?
70-
.decode(&ctx, &dtype, row_count)
71-
.map_err(Arc::new)
70+
let parts = if let Some(array_tree) = array_tree {
71+
// Use the pre-stored flatbuffer from layout metadata combined with segment buffers.
72+
ArrayParts::from_flatbuffer_and_segment(array_tree, segment)?
73+
} else {
74+
// Parse the flatbuffer from the segment itself.
75+
ArrayParts::try_from(segment)?
76+
};
77+
parts.decode(&ctx, &dtype, row_count).map_err(Arc::new)
7278
}
7379
.boxed()
7480
.shared()

vortex-layout/src/segments/cache.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use moka::future::Cache;
99
use moka::future::CacheBuilder;
1010
use moka::policy::EvictionPolicy;
1111
use rustc_hash::FxBuildHasher;
12+
use vortex_buffer::BufferHandle;
1213
use vortex_buffer::ByteBuffer;
1314
use vortex_error::VortexExpect;
1415
use vortex_error::VortexResult;
@@ -129,10 +130,13 @@ impl SegmentSource for SegmentCacheSourceAdapter {
129130
async move {
130131
if let Ok(Some(segment)) = cache.get(id).await {
131132
log::debug!("Resolved segment {} from cache", id);
132-
return Ok(segment);
133+
return Ok(BufferHandle::Buffer(segment));
133134
}
134135
let result = delegate.await?;
135-
if let Err(e) = cache.put(id, result.clone()).await {
136+
// Cache only CPU buffers; device buffers are not cached.
137+
if let BufferHandle::Buffer(ref buffer) = result
138+
&& let Err(e) = cache.put(id, buffer.clone()).await
139+
{
136140
log::warn!("Failed to store segment {} in cache: {}", id, e);
137141
}
138142
Ok(result)

vortex-layout/src/segments/shared.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use futures::FutureExt;
77
use futures::TryFutureExt;
88
use futures::future::BoxFuture;
99
use futures::future::WeakShared;
10-
use vortex_buffer::ByteBuffer;
10+
use vortex_buffer::BufferHandle;
1111
use vortex_error::SharedVortexResult;
1212
use vortex_error::VortexError;
1313
use vortex_error::VortexExpect;
@@ -25,7 +25,7 @@ pub struct SharedSegmentSource<S> {
2525
in_flight: DashMap<SegmentId, WeakShared<SharedSegmentFuture>>,
2626
}
2727

28-
type SharedSegmentFuture = BoxFuture<'static, SharedVortexResult<ByteBuffer>>;
28+
type SharedSegmentFuture = BoxFuture<'static, SharedVortexResult<BufferHandle>>;
2929

3030
impl<S: SegmentSource> SharedSegmentSource<S> {
3131
/// Create a new `SharedSegmentSource` wrapping the provided inner source.
@@ -111,8 +111,8 @@ mod tests {
111111

112112
// Both futures should resolve to the same data
113113
let (result1, result2) = futures::join!(future1, future2);
114-
assert_eq!(result1.unwrap(), data);
115-
assert_eq!(result2.unwrap(), data);
114+
assert_eq!(*result1.unwrap().bytes(), data);
115+
assert_eq!(*result2.unwrap().bytes(), data);
116116

117117
// The inner source should have been called only once
118118
assert_eq!(source.request_count.load(Ordering::Relaxed), 1);
@@ -142,7 +142,7 @@ mod tests {
142142

143143
// A new request should still work correctly
144144
let result = shared_source.request(id).await;
145-
assert_eq!(result.unwrap(), data);
145+
assert_eq!(*result.unwrap().bytes(), data);
146146

147147
// Should have made 2 requests since the first was dropped before completion
148148
assert_eq!(source.request_count.load(Ordering::Relaxed), 2);

vortex-layout/src/segments/source.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use futures::future::BoxFuture;
5-
use vortex_buffer::ByteBuffer;
5+
use vortex_buffer::BufferHandle;
66
use vortex_error::VortexResult;
77

88
use crate::segments::SegmentId;
99
/// Static future resolving to a segment byte buffer.
10-
pub type SegmentFuture = BoxFuture<'static, VortexResult<ByteBuffer>>;
10+
pub type SegmentFuture = BoxFuture<'static, VortexResult<BufferHandle>>;
1111

1212
/// A trait for providing segment data to a [`crate::LayoutReader`].
1313
pub trait SegmentSource: 'static + Send + Sync {

vortex-layout/src/segments/test.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::Arc;
66
use async_trait::async_trait;
77
use futures::FutureExt;
88
use parking_lot::Mutex;
9+
use vortex_buffer::BufferHandle;
910
use vortex_buffer::ByteBuffer;
1011
use vortex_buffer::ByteBufferMut;
1112
use vortex_error::VortexExpect;
@@ -27,7 +28,12 @@ pub struct TestSegments {
2728
impl SegmentSource for TestSegments {
2829
fn request(&self, id: SegmentId) -> SegmentFuture {
2930
let buffer = self.segments.lock().get(*id as usize).cloned();
30-
async move { buffer.ok_or_else(|| vortex_err!("Segment not found")) }.boxed()
31+
async move {
32+
buffer
33+
.map(BufferHandle::Buffer)
34+
.ok_or_else(|| vortex_err!("Segment not found"))
35+
}
36+
.boxed()
3137
}
3238
}
3339

0 commit comments

Comments
 (0)