Skip to content

Commit 3f76def

Browse files
feat[ffi]: add a session used for caching (#3358)
For now this is a footer cache but we can cache more if we want too Signed-off-by: Joe Isaacs <[email protected]> --------- Signed-off-by: Joe Isaacs <[email protected]>
1 parent 3df7cf5 commit 3f76def

File tree

9 files changed

+189
-18
lines changed

9 files changed

+189
-18
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

duckdb-vortex/src/include/vortex_common.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include "vortex.hpp"
88
#include "vortex_error.hpp"
9+
#include "vortex_session.hpp"
910

1011
namespace vortex {
1112

@@ -55,8 +56,8 @@ struct FileReader {
5556
vx_file_reader_free(file);
5657
}
5758

58-
static duckdb::unique_ptr<FileReader> Open(const vx_file_open_options *options) {
59-
auto file = Try([&](auto err) { return vx_file_open_reader(options, err); });
59+
static duckdb::unique_ptr<FileReader> Open(const vx_file_open_options *options, VortexSession &session) {
60+
auto file = Try([&](auto err) { return vx_file_open_reader(options, session.session, err); });
6061
return duckdb::make_uniq<FileReader>(file);
6162
}
6263

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#pragma once
2+
3+
#include "vortex.hpp"
4+
#include "duckdb/storage/object_cache.hpp"
5+
6+
namespace vortex {
7+
8+
class VortexSession : public duckdb::ObjectCacheEntry {
9+
public:
10+
VortexSession() : session(vx_session_create()) {
11+
}
12+
13+
~VortexSession() override {
14+
vx_session_free(session);
15+
}
16+
17+
vx_session *session;
18+
19+
static std::string ObjectType() {
20+
return "vortex_session_cache_metadata";
21+
}
22+
23+
std::string GetObjectType() override {
24+
return ObjectType();
25+
}
26+
};
27+
28+
} // namespace vortex

duckdb-vortex/src/vortex_scan.cpp

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "duckdb/function/table_function.hpp"
1414
#include "duckdb/main/extension_util.hpp"
1515
#include "duckdb/common/file_system.hpp"
16+
#include "duckdb/storage/object_cache.hpp"
1617

1718
#include "concurrentqueue.h"
1819

@@ -22,6 +23,7 @@
2223
#include "vortex_scan.hpp"
2324
#include "vortex_common.hpp"
2425
#include "vortex_expr.hpp"
26+
#include "vortex_session.hpp"
2527

2628
using namespace duckdb;
2729

@@ -31,6 +33,9 @@ namespace vortex {
3133
/// file and its schema. This data is populated during the bind phase, which
3234
/// happens during the query planning phase.
3335
struct BindData : public TableFunctionData {
36+
// Session used to caching
37+
shared_ptr<VortexSession> session;
38+
3439
shared_ptr<MultiFileList> file_list;
3540
vector<LogicalType> columns_types;
3641
vector<string> column_names;
@@ -182,12 +187,12 @@ std::string EnsureFileProtocol(FileSystem &fs, const std::string &path) {
182187
return prefix + absolute_path;
183188
}
184189

185-
static unique_ptr<FileReader> OpenFile(const std::string &filename, vector<LogicalType> &column_types,
186-
vector<string> &column_names) {
190+
static unique_ptr<FileReader> OpenFile(const std::string &filename, VortexSession &session,
191+
vector<LogicalType> &column_types, vector<string> &column_names) {
187192
vx_file_open_options options {
188193
.uri = filename.c_str(), .property_keys = nullptr, .property_vals = nullptr, .property_len = 0};
189194

190-
auto file = FileReader::Open(&options);
195+
auto file = FileReader::Open(&options, session);
191196
if (!file) {
192197
throw IOException("Failed to open Vortex file: " + filename);
193198
}
@@ -224,15 +229,15 @@ static void VerifyNewFile(const BindData &bind_data, vector<LogicalType> &column
224229
}
225230
}
226231

227-
static unique_ptr<FileReader> OpenFileAndVerify(FileSystem &fs, const std::string &filename,
232+
static unique_ptr<FileReader> OpenFileAndVerify(FileSystem &fs, VortexSession &session, const std::string &filename,
228233
const BindData &bind_data) {
229234
auto new_column_names = vector<string>();
230235
new_column_names.reserve(bind_data.column_names.size());
231236

232237
auto new_column_types = vector<LogicalType>();
233238
new_column_types.reserve(bind_data.columns_types.size());
234239

235-
auto file = OpenFile(EnsureFileProtocol(fs, filename), new_column_types, new_column_names);
240+
auto file = OpenFile(EnsureFileProtocol(fs, filename), session, new_column_types, new_column_names);
236241
VerifyNewFile(bind_data, new_column_types, new_column_names);
237242
return file;
238243
}
@@ -383,7 +388,8 @@ static void VortexScanFunction(ClientContext &context, TableFunctionInput &data,
383388
if (auto file_idx = global_state.next_file_idx.fetch_add(1);
384389
file_idx < global_state.expanded_files.size()) {
385390
auto file_name = global_state.expanded_files[file_idx];
386-
auto vortex_file = OpenFileAndVerify(FileSystem::GetFileSystem(context), file_name, bind_data);
391+
auto vortex_file =
392+
OpenFileAndVerify(FileSystem::GetFileSystem(context), *bind_data.session, file_name, bind_data);
387393
global_state.layout_readers[file_idx] = LayoutReader::CreateFromFile(vortex_file.get());
388394
CreateScanPartitions(context, bind_data, global_state, local_state, file_idx, vortex_file);
389395
}
@@ -412,12 +418,21 @@ static unique_ptr<FunctionData> VortexBind(ClientContext &context, TableFunction
412418
auto result = make_uniq<BindData>();
413419
result->arena = make_uniq<google::protobuf::Arena>();
414420

421+
const static string VortexExtensionKey = std::string("vortex_extension:vortex_session");
422+
auto session = ObjectCache::GetObjectCache(context).Get<VortexSession>(VortexExtensionKey);
423+
if (session == nullptr) {
424+
ObjectCache::GetObjectCache(context).Put(VortexExtensionKey, make_shared_ptr<VortexSession>());
425+
session = ObjectCache::GetObjectCache(context).Get<VortexSession>(VortexExtensionKey);
426+
}
427+
428+
result->session = session;
429+
415430
auto file_glob = duckdb::vector<string> {input.inputs[0].GetValue<string>()};
416431
result->file_list = make_shared_ptr<GlobMultiFileList>(context, file_glob, FileGlobOptions::DISALLOW_EMPTY);
417432

418433
// Open the first file to extract the schema.
419434
auto filename = EnsureFileProtocol(FileSystem::GetFileSystem(context), result->file_list->GetFirstFile());
420-
result->initial_file = OpenFile(filename, column_types, column_names);
435+
result->initial_file = OpenFile(filename, *result->session, column_types, column_names);
421436

422437
result->column_names = column_names;
423438
result->columns_types = column_types;

vortex-ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ futures = { workspace = true }
2222
itertools = { workspace = true }
2323
log = { workspace = true }
2424
mimalloc = { workspace = true, optional = true }
25+
moka = { workspace = true, features = ["sync"] }
2526
object_store = { workspace = true, features = ["aws", "azure", "gcp"] }
2627
paste = { workspace = true }
2728
prost = { workspace = true }

vortex-ffi/cinclude/vortex.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ typedef struct vx_file_reader vx_file_reader;
124124
*/
125125
typedef struct vx_layout_reader vx_layout_reader;
126126

127+
/**
128+
* An object that stores registries and caches.
129+
* This should if possible be reused between queries in ann interactive session.
130+
*/
131+
typedef struct vx_session vx_session;
132+
127133
/**
128134
* Options supplied for opening a file.
129135
*/
@@ -381,6 +387,7 @@ void vx_error_free(struct vx_error *error);
381387
* Open a file at the given path on the file system.
382388
*/
383389
struct vx_file_reader *vx_file_open_reader(const struct vx_file_open_options *options,
390+
struct vx_session *session,
384391
struct vx_error **error);
385392

386393
void vx_file_write_array(const char *path, struct vx_array *ffi_array, struct vx_error **error);
@@ -430,6 +437,16 @@ void vx_file_reader_free(struct vx_file_reader *file);
430437
*/
431438
void vx_init_logging(enum vx_log_level level);
432439

440+
/**
441+
* Create a session to be used for the lifetime of an interactive session.
442+
*/
443+
struct vx_session *vx_session_create(void);
444+
445+
/**
446+
* Free a session
447+
*/
448+
void vx_session_free(struct vx_session *session);
449+
433450
/**
434451
* Opens a writable array stream, where sink is used to push values into the stream.
435452
* To close the stream close the sink with `vx_array_sink_close`.

vortex-ffi/src/file.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
//! FFI interface for Vortex File I/O.
22
3-
#![allow(non_camel_case_types)]
4-
53
use std::ffi::{CStr, c_char, c_int, c_uint, c_ulong};
64
use std::str::FromStr;
75
use std::sync::Arc;
@@ -26,14 +24,17 @@ use vortex::proto::expr::Expr;
2624

2725
use crate::array::{vx_array, vx_array_iterator};
2826
use crate::error::{try_or, vx_error};
27+
use crate::session::{FileKey, vx_session};
2928
use crate::{RUNTIME, to_string, to_string_vec};
3029

3130
/// A file reader that can be used to read from a file.
31+
#[allow(non_camel_case_types)]
3232
pub struct vx_file_reader {
3333
pub inner: VortexFile,
3434
}
3535

3636
/// A Vortex layout reader.
37+
#[allow(non_camel_case_types)]
3738
pub struct vx_layout_reader {
3839
pub inner: Arc<dyn LayoutReader>,
3940
}
@@ -121,6 +122,7 @@ impl vx_file_scan_options {
121122
#[unsafe(no_mangle)]
122123
pub unsafe extern "C-unwind" fn vx_file_open_reader(
123124
options: *const vx_file_open_options,
125+
session: *mut vx_session,
124126
error: *mut *mut vx_error,
125127
) -> *mut vx_file_reader {
126128
try_or(error, ptr::null_mut(), || {
@@ -133,19 +135,39 @@ pub unsafe extern "C-unwind" fn vx_file_open_reader(
133135
if options.uri.is_null() {
134136
vortex_bail!("null uri")
135137
}
136-
let uri = unsafe { CStr::from_ptr(options.uri) }.to_string_lossy();
137-
let uri: Url = uri.parse().vortex_expect("File_open: parse uri");
138+
let uri_str = unsafe { CStr::from_ptr(options.uri) }.to_string_lossy();
139+
let uri: Url = uri_str.parse().vortex_expect("File_open: parse uri");
138140

139141
let prop_keys = unsafe { to_string_vec(options.property_keys, options.property_len) };
140142
let prop_vals = unsafe { to_string_vec(options.property_vals, options.property_len) };
141143

142144
let object_store = make_object_store(&uri, &prop_keys, &prop_vals)?;
143145

144-
let inner = RUNTIME.block_on(async move {
145-
VortexOpenOptions::file()
146-
.open_object_store(&object_store, uri.path())
147-
.await
148-
})?;
146+
let file = VortexOpenOptions::file();
147+
let (file, cache_hit) = if let Some(footer) = unsafe { session.as_ref() }.and_then(|s| {
148+
s.inner.get_footer(&FileKey {
149+
location: uri_str.to_string(),
150+
})
151+
}) {
152+
(file.with_footer(footer), true)
153+
} else {
154+
(file, false)
155+
};
156+
157+
let inner = RUNTIME
158+
.block_on(async move { file.open_object_store(&object_store, uri.path()).await })?;
159+
160+
if !cache_hit {
161+
let _ = unsafe { session.as_ref() }.is_some_and(|s| {
162+
s.inner.put_footer(
163+
FileKey {
164+
location: uri_str.to_string(),
165+
},
166+
inner.footer().clone(),
167+
);
168+
true
169+
});
170+
}
149171

150172
Ok(Box::into_raw(Box::new(vx_file_reader { inner })))
151173
})

vortex-ffi/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod duckdb;
99
mod error;
1010
mod file;
1111
mod log;
12+
mod session;
1213
mod sink;
1314

1415
use std::ffi::{CStr, c_char, c_int};

vortex-ffi/src/session.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
use std::sync::Arc;
2+
3+
use moka::sync::Cache;
4+
use vortex::aliases::DefaultHashBuilder;
5+
use vortex::dtype::DType;
6+
use vortex::file::{Footer, SegmentSpec};
7+
use vortex::layout::segments::SegmentId;
8+
use vortex::scalar::ScalarValue;
9+
use vortex::stats::{Precision, Stat};
10+
11+
/// An object that stores registries and caches.
12+
/// This should if possible be reused between queries in ann interactive session.
13+
#[allow(non_camel_case_types)]
14+
pub struct vx_session {
15+
pub inner: Arc<VortexSession>,
16+
}
17+
18+
/// Create a session to be used for the lifetime of an interactive session.
19+
#[unsafe(no_mangle)]
20+
pub unsafe extern "C-unwind" fn vx_session_create() -> *mut vx_session {
21+
Box::into_raw(Box::new(vx_session {
22+
inner: Arc::new(VortexSession::new()),
23+
}))
24+
}
25+
26+
/// Free a session
27+
#[unsafe(no_mangle)]
28+
pub unsafe extern "C-unwind" fn vx_session_free(session: *mut vx_session) {
29+
drop(unsafe { Box::from_raw(session) })
30+
}
31+
32+
pub struct VortexSession {
33+
file_cache: Cache<FileKey, Footer, DefaultHashBuilder>,
34+
}
35+
36+
/// Cache key for a [`VortexFile`].
37+
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
38+
pub struct FileKey {
39+
// TODO: support last modified ts.
40+
pub location: String,
41+
}
42+
43+
impl VortexSession {
44+
pub fn new() -> Self {
45+
let file_cache = Cache::builder()
46+
.max_capacity(64u64 * (1 << 20))
47+
.eviction_listener(|k: Arc<FileKey>, _v: Footer, cause| {
48+
log::trace!("Removed {:?} due to {:?}", k, cause);
49+
})
50+
.weigher(|_k, footer| u32::try_from(estimate_layout_size(footer)).unwrap_or(u32::MAX))
51+
.build_with_hasher(DefaultHashBuilder::default());
52+
53+
Self { file_cache }
54+
}
55+
56+
pub fn get_footer(&self, file_key: &FileKey) -> Option<Footer> {
57+
self.file_cache.get(file_key)
58+
}
59+
60+
pub fn put_footer(&self, file_key: FileKey, footer: Footer) {
61+
self.file_cache.insert(file_key, footer)
62+
}
63+
}
64+
65+
// TODO(joe): unify with the df impl
66+
/// Approximate the in-memory size of a layout
67+
fn estimate_layout_size(footer: &Footer) -> usize {
68+
let segments_size = footer.segment_map().len() * size_of::<SegmentSpec>();
69+
let stats_size = footer
70+
.statistics()
71+
.iter()
72+
.map(|v| {
73+
v.iter()
74+
.map(|_| size_of::<Stat>() + size_of::<Precision<ScalarValue>>())
75+
.sum::<usize>()
76+
})
77+
.sum::<usize>();
78+
79+
let root_layout = footer.layout();
80+
let layout_size = size_of::<DType>()
81+
+ root_layout.metadata().len()
82+
+ root_layout.segment_ids().len() * size_of::<SegmentId>();
83+
84+
segments_size + stats_size + layout_size
85+
}

0 commit comments

Comments
 (0)