Skip to content

Commit e77c63f

Browse files
authored
feat(jni): Cache ObjectStore (#2872)
We opt for the same caching strategy that DataFusion uses for `register_store`: object stores are cached and reused, keyed by the URL's scheme and host/port. This yields substantial improvements on the Iceberg tests.
1 parent a057db1 commit e77c63f

File tree

1 file changed

+47
-15
lines changed

1 file changed

+47
-15
lines changed

vortex-jni/src/file.rs

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::str::FromStr;
2-
use std::sync::Arc;
2+
use std::sync::{Arc, LazyLock, Mutex};
33

44
use jni::JNIEnv;
55
use jni::objects::{JByteArray, JClass, JObject, JString};
@@ -13,7 +13,7 @@ use prost::Message;
1313
use url::Url;
1414
use vortex::aliases::hash_map::HashMap;
1515
use vortex::dtype::DType;
16-
use vortex::error::{VortexError, VortexResult, vortex_bail};
16+
use vortex::error::{VortexError, VortexExpect, VortexResult, vortex_bail};
1717
use vortex::expr::{Identity, deserialize_expr, select};
1818
use vortex::file::{GenericVortexFile, VortexFile, VortexOpenOptions};
1919
use vortex::io::ObjectStoreReadAt;
@@ -73,15 +73,18 @@ pub extern "system" fn Java_dev_vortex_jni_NativeFileMethods_open(
7373
let opts = env.get_map(&options)?;
7474
let mut iterator = opts.iter(env)?;
7575
while let Some((key, val)) = iterator.next(env)? {
76-
let key_str: JString = key.into();
77-
let val_str: JString = val.into();
78-
let key_str = env.get_string(&key_str)?;
79-
let val_str = env.get_string(&val_str)?;
76+
let key = env.auto_local(key);
77+
let val = env.auto_local(val);
78+
let key_str = env.get_string(key.as_ref().into())?;
79+
let val_str = env.get_string(val.as_ref().into())?;
8080
properties.insert(key_str.into(), val_str.into());
8181
}
8282
}
8383

84+
let start = std::time::Instant::now();
8485
let (store, scheme) = make_object_store(&url, &properties)?;
86+
let duration = std::time::Instant::now().duration_since(start);
87+
log::debug!("make_object_store latency = {duration:?}");
8588
let reader = ObjectStoreReadAt::new(store.clone(), url.path().into(), Some(scheme));
8689
let open_file = block_on(
8790
"VortexOpenOptions.open()",
@@ -132,7 +135,9 @@ pub extern "system" fn Java_dev_vortex_jni_NativeFileMethods_scan(
132135
let mut projection: Vec<Arc<str>> = Vec::new();
133136
let mut iterator = proj.iter(env)?;
134137
while let Some(field) = iterator.next(env)? {
135-
let field_name: String = env.get_string(&JString::from(field))?.into();
138+
let field = env.auto_local(field);
139+
140+
let field_name: String = env.get_string(field.as_ref().into())?.into();
136141
projection.push(field_name.into());
137142
}
138143
let project_expr = select(projection, Identity::new_expr());
@@ -159,14 +164,26 @@ fn make_object_store(
159164
url: &Url,
160165
properties: &HashMap<String, String>,
161166
) -> VortexResult<(Arc<dyn ObjectStore>, ObjectStoreScheme)> {
167+
static OBJECT_STORES: LazyLock<Mutex<HashMap<String, Arc<dyn ObjectStore>>>> =
168+
LazyLock::new(|| Mutex::new(HashMap::new()));
169+
162170
let (scheme, _) =
163171
ObjectStoreScheme::parse(url).map_err(|error| VortexError::ObjectStore(error.into()))?;
164172

173+
let cache_key = url_cache_key(url);
174+
175+
{
176+
if let Some(cached) = OBJECT_STORES.lock().vortex_expect("poison").get(&cache_key) {
177+
return Ok((cached.clone(), scheme));
178+
}
179+
// guard dropped at close of scope
180+
}
181+
165182
// Configure extra properties on that scheme instead.
166-
match scheme {
183+
let store: Arc<dyn ObjectStore> = match scheme {
167184
ObjectStoreScheme::Local => {
168185
log::trace!("using LocalFileSystem object store");
169-
Ok((Arc::new(LocalFileSystem::default()), scheme))
186+
Arc::new(LocalFileSystem::default())
170187
}
171188
ObjectStoreScheme::AmazonS3 => {
172189
log::trace!("using AmazonS3 object store");
@@ -179,8 +196,7 @@ fn make_object_store(
179196
}
180197
}
181198

182-
let store = Arc::new(builder.build()?);
183-
Ok((store, scheme))
199+
Arc::new(builder.build()?)
184200
}
185201
ObjectStoreScheme::MicrosoftAzure => {
186202
log::trace!("using MicrosoftAzure object store");
@@ -194,8 +210,7 @@ fn make_object_store(
194210
}
195211
}
196212

197-
let store = Arc::new(builder.build()?);
198-
Ok((store, scheme))
213+
Arc::new(builder.build()?)
199214
}
200215
ObjectStoreScheme::GoogleCloudStorage => {
201216
log::trace!("using GoogleCloudStorage object store");
@@ -209,11 +224,28 @@ fn make_object_store(
209224
}
210225
}
211226

212-
let store = Arc::new(builder.build()?);
213-
Ok((store, scheme))
227+
Arc::new(builder.build()?)
214228
}
215229
store => {
216230
vortex_bail!("Unsupported store scheme: {store:?}");
217231
}
232+
};
233+
234+
{
235+
OBJECT_STORES
236+
.lock()
237+
.vortex_expect("poison")
238+
.insert(cache_key, store.clone());
239+
// Guard dropped at close of scope.
218240
}
241+
242+
Ok((store, scheme))
243+
}
244+
245+
/// Builds the key under which an object store is cached for `url`.
///
/// The key is the URL scheme, the literal `://`, and the slice of the URL from
/// `Position::BeforeHost` to `Position::AfterPort` (i.e. the host and port portion).
/// The path, query and fragment come after that slice and do not affect the key.
///
/// NOTE(review): the key is derived from the URL alone. `make_object_store` looks up
/// and inserts stores using only this key, so the `properties` map passed alongside
/// the URL does not influence which cached store is returned — confirm this is
/// intended when the same host is opened with different properties.
fn url_cache_key(url: &Url) -> String {
246+
format!(
247+
"{}://{}",
248+
url.scheme(),
249+
&url[url::Position::BeforeHost..url::Position::AfterPort],
250+
)
219251
}

0 commit comments

Comments
 (0)