Skip to content

Commit 6d178c9

Browse files
authored
Pull out message cache from layout reader (#1773)
* More closely aligns with the `poll_read` pattern, where the thing being read is passed in.
* Working towards moving to `MessageId`s instead of layout paths, so this is a temporary intermediate state.
1 parent 80b9684 commit 6d178c9

File tree

14 files changed

+286
-283
lines changed

14 files changed

+286
-283
lines changed

vortex-datafusion/src/persistent/format.rs

Lines changed: 6 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
use std::any::Any;
2-
use std::sync::{Arc, RwLock};
2+
use std::sync::Arc;
33

44
use arrow_schema::{Schema, SchemaRef};
55
use async_trait::async_trait;
@@ -24,8 +24,7 @@ use vortex_array::Context;
2424
use vortex_error::VortexResult;
2525
use vortex_file::metadata::fetch_metadata;
2626
use vortex_file::{
27-
LayoutContext, LayoutDeserializer, LayoutMessageCache, RelativeLayoutCache, Scan,
28-
VORTEX_FILE_EXTENSION,
27+
LayoutContext, LayoutDeserializer, LayoutMessageCache, LayoutPath, Scan, VORTEX_FILE_EXTENSION,
2928
};
3029
use vortex_io::{IoDispatcher, ObjectStoreReadAt};
3130

@@ -145,23 +144,23 @@ impl FileFormat for VortexFormat {
145144

146145
let layout_deserializer =
147146
LayoutDeserializer::new(self.context.clone(), LayoutContext::default().into());
148-
let layout_message_cache = Arc::new(RwLock::new(LayoutMessageCache::new()));
149-
let relative_message_cache = RelativeLayoutCache::new(layout_message_cache.clone());
150147

151148
let root_layout = layout_deserializer.read_layout(
149+
LayoutPath::default(),
152150
initial_read.fb_layout(),
153151
Scan::empty(),
154152
initial_read.lazy_dtype().into(),
155-
relative_message_cache,
156153
)?;
157154

158155
let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
159156
let io = IoDispatcher::default();
160157
let mut stats = Statistics::new_unknown(&table_schema);
161158
stats.num_rows = Precision::Exact(row_count as usize);
162159

160+
let msgs = LayoutMessageCache::default();
161+
163162
if let Some(metadata_table) =
164-
fetch_metadata(os_read_at, io.into(), root_layout, layout_message_cache).await?
163+
fetch_metadata(os_read_at, io.into(), root_layout, msgs).await?
165164
{
166165
let mut column_statistics = Vec::with_capacity(table_schema.fields().len());
167166
let mut total_size = 0_u64;

vortex-file/src/read/buffered.rs

Lines changed: 28 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -2,17 +2,19 @@ use std::collections::VecDeque;
22
use std::io;
33
use std::io::ErrorKind;
44
use std::pin::Pin;
5-
use std::sync::{Arc, RwLock};
5+
use std::sync::Arc;
66
use std::task::{Context, Poll, Waker};
77

88
use futures::Stream;
99
use futures_util::future::BoxFuture;
1010
use futures_util::{FutureExt, StreamExt};
1111
use vortex_array::ArrayData;
12-
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
12+
use vortex_error::{vortex_err, VortexExpect, VortexResult};
1313
use vortex_io::{Dispatch, IoDispatcher, VortexReadAt, VortexReadRanges};
1414

15-
use crate::{LayoutMessageCache, LayoutReader, Message, MessageLocator, PollRead, RowMask};
15+
use crate::{
16+
LayoutMessageCache, LayoutReader, Message, MessageCache, MessageLocator, PollRead, RowMask,
17+
};
1618

1719
const NUM_TO_COALESCE: usize = 8;
1820

@@ -21,7 +23,11 @@ pub(crate) trait ReadMasked {
2123

2224
/// Read a Layout into a `V`, applying the given bitmask. Only entries corresponding to positions
2325
/// where mask is `true` will be included in the output.
24-
fn read_masked(&self, mask: &RowMask) -> VortexResult<Option<PollRead<Self::Value>>>;
26+
fn read_masked(
27+
&self,
28+
mask: &RowMask,
29+
msgs: &dyn MessageCache,
30+
) -> VortexResult<Option<PollRead<Self::Value>>>;
2531
}
2632

2733
/// Read an array with a [`RowMask`].
@@ -39,8 +45,12 @@ impl ReadMasked for ReadArray {
3945
type Value = ArrayData;
4046

4147
/// Read given mask out of the reader
42-
fn read_masked(&self, mask: &RowMask) -> VortexResult<Option<PollRead<ArrayData>>> {
43-
self.layout.poll_read(mask)
48+
fn read_masked(
49+
&self,
50+
mask: &RowMask,
51+
msgs: &dyn MessageCache,
52+
) -> VortexResult<Option<PollRead<ArrayData>>> {
53+
self.layout.poll_read(mask, msgs)
4454
}
4555
}
4656

@@ -58,7 +68,7 @@ pub struct BufferedLayoutReader<R, S, V, RM> {
5868
queued: VecDeque<RowMaskState<V>>,
5969
io_read: VortexReadRanges<R>,
6070
dispatcher: Arc<IoDispatcher>,
61-
cache: Arc<RwLock<LayoutMessageCache>>,
71+
msgs: LayoutMessageCache,
6272
}
6373

6474
impl<R, S, V, RM> BufferedLayoutReader<R, S, V, RM>
@@ -72,7 +82,7 @@ where
7282
dispatcher: Arc<IoDispatcher>,
7383
read_masks: S,
7484
row_mask_reader: RM,
75-
cache: Arc<RwLock<LayoutMessageCache>>,
85+
msgs: LayoutMessageCache,
7686
) -> Self {
7787
Self {
7888
read_masks,
@@ -81,18 +91,13 @@ where
8191
queued: VecDeque::new(),
8292
io_read: VortexReadRanges::new(read, dispatcher.clone(), 1 << 20),
8393
dispatcher,
84-
cache,
94+
msgs,
8595
}
8696
}
8797

88-
fn store_messages(&self, messages: Vec<Message>) {
89-
let mut write_cache_guard = self
90-
.cache
91-
.write()
92-
.unwrap_or_else(|poison| vortex_panic!("Failed to write to message cache: {poison}"));
93-
for Message(message_id, buf) in messages {
94-
write_cache_guard.set(message_id, buf);
95-
}
98+
fn store_messages(&mut self, messages: Vec<Message>) {
99+
self.msgs
100+
.set_many(messages.into_iter().map(|msg| (msg.0, msg.1)))
96101
}
97102

98103
fn gather_read_messages(
@@ -106,7 +111,9 @@ where
106111
for queued_res in self.queued.iter_mut() {
107112
match queued_res {
108113
RowMaskState::Pending(pending_mask) => {
109-
if let Some(pending_read) = self.row_mask_reader.read_masked(pending_mask)? {
114+
if let Some(pending_read) =
115+
self.row_mask_reader.read_masked(pending_mask, &self.msgs)?
116+
{
110117
match pending_read {
111118
PollRead::ReadMore(m) => {
112119
to_read.extend(m);
@@ -129,7 +136,9 @@ where
129136
while read_more_count < NUM_TO_COALESCE {
130137
match self.read_masks.poll_next_unpin(cx) {
131138
Poll::Ready(Some(Ok(next_mask))) => {
132-
if let Some(read_result) = self.row_mask_reader.read_masked(&next_mask)? {
139+
if let Some(read_result) =
140+
self.row_mask_reader.read_masked(&next_mask, &self.msgs)?
141+
{
133142
match read_result {
134143
PollRead::ReadMore(m) => {
135144
self.queued.push_back(RowMaskState::Pending(next_mask));

vortex-file/src/read/builder/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
use std::sync::{Arc, RwLock};
1+
use std::sync::Arc;
22

33
use initial_read::read_initial_bytes;
44
use vortex_array::{ArrayDType, ArrayData};
@@ -8,11 +8,12 @@ use vortex_io::{IoDispatcher, VortexReadAt};
88

99
use super::handle::VortexReadHandle;
1010
use super::InitialRead;
11-
use crate::read::cache::{LayoutMessageCache, RelativeLayoutCache};
11+
use crate::read::cache::LayoutMessageCache;
1212
use crate::read::context::LayoutDeserializer;
1313
use crate::read::filtering::RowFilter;
1414
use crate::read::projection::Projection;
1515
use crate::read::{RowMask, Scan};
16+
use crate::LayoutPath;
1617

1718
pub(crate) mod initial_read;
1819

@@ -147,25 +148,25 @@ impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {
147148
Projection::Flat(ref fields) => lazy_dtype.project(fields)?,
148149
};
149150

150-
let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
151+
let message_cache = LayoutMessageCache::default();
151152
let layout_reader = self.layout_serde.read_layout(
153+
LayoutPath::default(),
152154
initial_read.fb_layout(),
153155
match self.projection {
154156
Projection::All => Scan::empty(),
155157
Projection::Flat(p) => Scan::new(Arc::new(Select::include(p))),
156158
},
157159
lazy_dtype.clone(),
158-
RelativeLayoutCache::new(message_cache.clone()),
159160
)?;
160161

161162
let filter_reader = self
162163
.row_filter
163164
.map(|row_filter| {
164165
self.layout_serde.read_layout(
166+
LayoutPath::default(),
165167
initial_read.fb_layout(),
166168
Scan::new(Arc::new(row_filter)),
167169
lazy_dtype,
168-
RelativeLayoutCache::new(message_cache.clone()),
169170
)
170171
})
171172
.transpose()?;

vortex-file/src/read/cache.rs

Lines changed: 38 additions & 71 deletions
Original file line number | Diff line number | Diff line change
@@ -9,34 +9,59 @@ use vortex_buffer::ByteBuffer;
99
use vortex_dtype::field::Field;
1010
use vortex_dtype::flatbuffers::{extract_field, project_and_deserialize, resolve_field};
1111
use vortex_dtype::{DType, FieldNames};
12-
use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexResult};
12+
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
1313
use vortex_flatbuffers::dtype as fbd;
1414

1515
use crate::read::projection::Projection;
1616
use crate::read::{LayoutPartId, MessageId};
1717

18-
#[derive(Default, Debug)]
18+
/// A read-only cache of messages.
19+
pub trait MessageCache {
20+
fn get(&self, path: &[LayoutPartId]) -> Option<Bytes>;
21+
}
22+
23+
#[derive(Default, Debug, Clone)]
1924
pub struct LayoutMessageCache {
20-
cache: HashMap<MessageId, Bytes>,
25+
cache: Arc<RwLock<HashMap<MessageId, Bytes>>>,
2126
}
2227

2328
impl LayoutMessageCache {
24-
pub fn new() -> Self {
25-
Self {
26-
cache: HashMap::new(),
27-
}
29+
pub fn remove(&mut self, path: &[LayoutPartId]) -> Option<Bytes> {
30+
self.cache
31+
.write()
32+
.map_err(|_| vortex_err!("Poisoned cache"))
33+
.vortex_expect("poisoned")
34+
.remove(path)
2835
}
2936

30-
pub fn get(&self, path: &[LayoutPartId]) -> Option<Bytes> {
31-
self.cache.get(path).cloned()
37+
pub fn set(&mut self, path: MessageId, value: Bytes) {
38+
self.cache
39+
.write()
40+
.map_err(|_| vortex_err!("Poisoned cache"))
41+
.vortex_expect("poisoned")
42+
.insert(path, value);
3243
}
3344

34-
pub fn remove(&mut self, path: &[LayoutPartId]) -> Option<Bytes> {
35-
self.cache.remove(path)
45+
pub fn set_many<I: IntoIterator<Item = (MessageId, Bytes)>>(&mut self, iter: I) {
46+
let mut guard = self
47+
.cache
48+
.write()
49+
.map_err(|_| vortex_err!("Poisoned cache"))
50+
.vortex_expect("poisoned");
51+
for (id, bytes) in iter.into_iter() {
52+
guard.insert(id, bytes);
53+
}
3654
}
55+
}
3756

38-
pub fn set(&mut self, path: MessageId, value: Bytes) {
39-
self.cache.insert(path, value);
57+
impl MessageCache for LayoutMessageCache {
58+
fn get(&self, path: &[LayoutPartId]) -> Option<Bytes> {
59+
self.cache
60+
.read()
61+
.map_err(|_| vortex_err!("Poisoned cache"))
62+
.vortex_expect("poisoned")
63+
.get(path)
64+
.cloned()
4065
}
4166
}
4267

@@ -258,61 +283,3 @@ fn fb_struct(bytes: &[u8]) -> VortexResult<fbd::Struct_> {
258283
fn fb_dtype(bytes: &[u8]) -> fbd::DType {
259284
unsafe { root_unchecked::<fbd::DType>(bytes) }
260285
}
261-
262-
#[derive(Debug, Clone)]
263-
pub struct RelativeLayoutCache {
264-
root: Arc<RwLock<LayoutMessageCache>>,
265-
path: MessageId,
266-
}
267-
268-
impl RelativeLayoutCache {
269-
pub fn new(root: Arc<RwLock<LayoutMessageCache>>) -> Self {
270-
Self {
271-
root,
272-
path: Vec::new(),
273-
}
274-
}
275-
276-
pub fn relative(&self, id: LayoutPartId) -> Self {
277-
let mut new_path = Vec::with_capacity(self.path.len() + 1);
278-
new_path.clone_from(&self.path);
279-
new_path.push(id);
280-
Self {
281-
root: self.root.clone(),
282-
path: new_path,
283-
}
284-
}
285-
286-
pub fn get(&self, path: &[LayoutPartId]) -> Option<Bytes> {
287-
self.root
288-
.read()
289-
.unwrap_or_else(|poison| {
290-
vortex_panic!(
291-
"Failed to read from layout cache at path {:?} with error {}",
292-
path,
293-
poison
294-
);
295-
})
296-
.get(&self.absolute_id(path))
297-
}
298-
299-
pub fn remove(&mut self, path: &[LayoutPartId]) -> Option<Bytes> {
300-
self.root
301-
.write()
302-
.unwrap_or_else(|poison| {
303-
vortex_panic!(
304-
"Failed to write to layout cache at path {:?} with error {}",
305-
path,
306-
poison
307-
)
308-
})
309-
.remove(&self.absolute_id(path))
310-
}
311-
312-
pub fn absolute_id(&self, path: &[LayoutPartId]) -> MessageId {
313-
let mut lookup_key = Vec::with_capacity(self.path.len() + path.len());
314-
lookup_key.clone_from(&self.path);
315-
lookup_key.extend_from_slice(path);
316-
lookup_key
317-
}
318-
}

vortex-file/src/read/context.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@ use vortex_error::{vortex_err, VortexResult};
77
use vortex_flatbuffers::footer as fb;
88

99
use crate::layouts::{ChunkedLayout, ColumnarLayout, FlatLayout};
10-
use crate::{LayoutReader, LazyDType, RelativeLayoutCache, Scan};
10+
use crate::{LayoutPath, LayoutReader, LazyDType, Scan};
1111

1212
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1313
pub struct LayoutId(pub u16);
@@ -23,11 +23,11 @@ pub trait Layout: Debug + Send + Sync {
2323

2424
fn reader(
2525
&self,
26+
path: LayoutPath,
2627
layout: fb::Layout,
2728
dtype: Arc<LazyDType>,
2829
scan: Scan,
2930
layout_serde: LayoutDeserializer,
30-
message_cache: RelativeLayoutCache,
3131
) -> VortexResult<Arc<dyn LayoutReader>>;
3232
}
3333

@@ -72,16 +72,16 @@ impl LayoutDeserializer {
7272

7373
pub fn read_layout(
7474
&self,
75+
path: LayoutPath,
7576
layout: fb::Layout,
7677
scan: Scan,
7778
dtype: Arc<LazyDType>,
78-
message_cache: RelativeLayoutCache,
7979
) -> VortexResult<Arc<dyn LayoutReader>> {
8080
let layout_id = LayoutId(layout.encoding());
8181
self.layout_ctx
8282
.lookup_layout(&layout_id)
8383
.ok_or_else(|| vortex_err!("Unknown layout definition {layout_id}"))?
84-
.reader(layout, dtype, scan, self.clone(), message_cache)
84+
.reader(path, layout, dtype, scan, self.clone())
8585
}
8686

8787
pub(crate) fn ctx(&self) -> Arc<Context> {

0 commit comments

Comments
 (0)