Skip to content

Commit 99ab87c

Browse files
authored
Implement CatalogProviderList in FFI (#18657)
## Which issue does this PR close? None ## Rationale for this change This is continuation of work in the FFI crate to expose useful traits. We currently have FFI catalog, schema, and table providers. The next layer up in the heirarchy is the catalog provider list. ## What changes are included in this PR? - Implement FFI_CatalogProviderList - Add unit tests and integration tests - Minor rearrangement of integration test for catalog ## Are these changes tested? Yes, included in PR. ## Are there any user-facing changes? This is only new addition to the FFI crate. No existing code is modified except making one data member pub(crate)
1 parent 356616e commit 99ab87c

File tree

7 files changed

+428
-30
lines changed

7 files changed

+428
-30
lines changed

datafusion/ffi/src/catalog_provider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ impl FFI_CatalogProvider {
204204
/// defined on this struct must only use the stable functions provided in
205205
/// FFI_CatalogProvider to interact with the foreign table provider.
206206
#[derive(Debug)]
207-
pub struct ForeignCatalogProvider(FFI_CatalogProvider);
207+
pub struct ForeignCatalogProvider(pub(crate) FFI_CatalogProvider);
208208

209209
unsafe impl Send for ForeignCatalogProvider {}
210210
unsafe impl Sync for ForeignCatalogProvider {}
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::{any::Any, ffi::c_void, sync::Arc};
19+
20+
use abi_stable::{
21+
std_types::{ROption, RString, RVec},
22+
StableAbi,
23+
};
24+
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
25+
use tokio::runtime::Handle;
26+
27+
use crate::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvider};
28+
29+
/// A stable struct for sharing [`CatalogProviderList`] across FFI boundaries.
30+
#[repr(C)]
31+
#[derive(Debug, StableAbi)]
32+
#[allow(non_camel_case_types)]
33+
pub struct FFI_CatalogProviderList {
34+
/// Register a catalog
35+
pub register_catalog: unsafe extern "C" fn(
36+
&Self,
37+
name: RString,
38+
catalog: &FFI_CatalogProvider,
39+
) -> ROption<FFI_CatalogProvider>,
40+
41+
/// List of existing catalogs
42+
pub catalog_names: unsafe extern "C" fn(&Self) -> RVec<RString>,
43+
44+
/// Access a catalog
45+
pub catalog:
46+
unsafe extern "C" fn(&Self, name: RString) -> ROption<FFI_CatalogProvider>,
47+
48+
/// Used to create a clone on the provider. This should only need to be called
49+
/// by the receiver of the plan.
50+
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
51+
52+
/// Release the memory of the private data when it is no longer being used.
53+
pub release: unsafe extern "C" fn(arg: &mut Self),
54+
55+
/// Return the major DataFusion version number of this provider.
56+
pub version: unsafe extern "C" fn() -> u64,
57+
58+
/// Internal data. This is only to be accessed by the provider of the plan.
59+
/// A [`ForeignCatalogProviderList`] should never attempt to access this data.
60+
pub private_data: *mut c_void,
61+
}
62+
63+
unsafe impl Send for FFI_CatalogProviderList {}
64+
unsafe impl Sync for FFI_CatalogProviderList {}
65+
66+
struct ProviderPrivateData {
67+
provider: Arc<dyn CatalogProviderList + Send>,
68+
runtime: Option<Handle>,
69+
}
70+
71+
impl FFI_CatalogProviderList {
72+
unsafe fn inner(&self) -> &Arc<dyn CatalogProviderList + Send> {
73+
let private_data = self.private_data as *const ProviderPrivateData;
74+
&(*private_data).provider
75+
}
76+
77+
unsafe fn runtime(&self) -> Option<Handle> {
78+
let private_data = self.private_data as *const ProviderPrivateData;
79+
(*private_data).runtime.clone()
80+
}
81+
}
82+
83+
unsafe extern "C" fn catalog_names_fn_wrapper(
84+
provider: &FFI_CatalogProviderList,
85+
) -> RVec<RString> {
86+
let names = provider.inner().catalog_names();
87+
names.into_iter().map(|s| s.into()).collect()
88+
}
89+
90+
unsafe extern "C" fn register_catalog_fn_wrapper(
91+
provider: &FFI_CatalogProviderList,
92+
name: RString,
93+
catalog: &FFI_CatalogProvider,
94+
) -> ROption<FFI_CatalogProvider> {
95+
let runtime = provider.runtime();
96+
let provider = provider.inner();
97+
let catalog = Arc::new(ForeignCatalogProvider::from(catalog));
98+
99+
provider
100+
.register_catalog(name.into(), catalog)
101+
.map(|catalog| FFI_CatalogProvider::new(catalog, runtime))
102+
.into()
103+
}
104+
105+
unsafe extern "C" fn catalog_fn_wrapper(
106+
provider: &FFI_CatalogProviderList,
107+
name: RString,
108+
) -> ROption<FFI_CatalogProvider> {
109+
let runtime = provider.runtime();
110+
let provider = provider.inner();
111+
provider
112+
.catalog(name.as_str())
113+
.map(|catalog| FFI_CatalogProvider::new(catalog, runtime))
114+
.into()
115+
}
116+
117+
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProviderList) {
118+
let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData);
119+
drop(private_data);
120+
}
121+
122+
unsafe extern "C" fn clone_fn_wrapper(
123+
provider: &FFI_CatalogProviderList,
124+
) -> FFI_CatalogProviderList {
125+
let old_private_data = provider.private_data as *const ProviderPrivateData;
126+
let runtime = (*old_private_data).runtime.clone();
127+
128+
let private_data = Box::into_raw(Box::new(ProviderPrivateData {
129+
provider: Arc::clone(&(*old_private_data).provider),
130+
runtime,
131+
})) as *mut c_void;
132+
133+
FFI_CatalogProviderList {
134+
register_catalog: register_catalog_fn_wrapper,
135+
catalog_names: catalog_names_fn_wrapper,
136+
catalog: catalog_fn_wrapper,
137+
clone: clone_fn_wrapper,
138+
release: release_fn_wrapper,
139+
version: super::version,
140+
private_data,
141+
}
142+
}
143+
144+
impl Drop for FFI_CatalogProviderList {
145+
fn drop(&mut self) {
146+
unsafe { (self.release)(self) }
147+
}
148+
}
149+
150+
impl FFI_CatalogProviderList {
151+
/// Creates a new [`FFI_CatalogProviderList`].
152+
pub fn new(
153+
provider: Arc<dyn CatalogProviderList + Send>,
154+
runtime: Option<Handle>,
155+
) -> Self {
156+
let private_data = Box::new(ProviderPrivateData { provider, runtime });
157+
158+
Self {
159+
register_catalog: register_catalog_fn_wrapper,
160+
catalog_names: catalog_names_fn_wrapper,
161+
catalog: catalog_fn_wrapper,
162+
clone: clone_fn_wrapper,
163+
release: release_fn_wrapper,
164+
version: super::version,
165+
private_data: Box::into_raw(private_data) as *mut c_void,
166+
}
167+
}
168+
}
169+
170+
/// This wrapper struct exists on the receiver side of the FFI interface, so it has
171+
/// no guarantees about being able to access the data in `private_data`. Any functions
172+
/// defined on this struct must only use the stable functions provided in
173+
/// FFI_CatalogProviderList to interact with the foreign catalog provider list.
174+
#[derive(Debug)]
175+
pub struct ForeignCatalogProviderList(FFI_CatalogProviderList);
176+
177+
unsafe impl Send for ForeignCatalogProviderList {}
178+
unsafe impl Sync for ForeignCatalogProviderList {}
179+
180+
impl From<&FFI_CatalogProviderList> for ForeignCatalogProviderList {
181+
fn from(provider: &FFI_CatalogProviderList) -> Self {
182+
Self(provider.clone())
183+
}
184+
}
185+
186+
impl Clone for FFI_CatalogProviderList {
187+
fn clone(&self) -> Self {
188+
unsafe { (self.clone)(self) }
189+
}
190+
}
191+
192+
impl CatalogProviderList for ForeignCatalogProviderList {
193+
fn as_any(&self) -> &dyn Any {
194+
self
195+
}
196+
197+
fn register_catalog(
198+
&self,
199+
name: String,
200+
catalog: Arc<dyn CatalogProvider>,
201+
) -> Option<Arc<dyn CatalogProvider>> {
202+
unsafe {
203+
let catalog = match catalog.as_any().downcast_ref::<ForeignCatalogProvider>()
204+
{
205+
Some(s) => &s.0,
206+
None => &FFI_CatalogProvider::new(catalog, None),
207+
};
208+
209+
(self.0.register_catalog)(&self.0, name.into(), catalog)
210+
.map(|s| Arc::new(ForeignCatalogProvider(s)) as Arc<dyn CatalogProvider>)
211+
.into()
212+
}
213+
}
214+
215+
fn catalog_names(&self) -> Vec<String> {
216+
unsafe {
217+
(self.0.catalog_names)(&self.0)
218+
.into_iter()
219+
.map(Into::into)
220+
.collect()
221+
}
222+
}
223+
224+
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
225+
unsafe {
226+
(self.0.catalog)(&self.0, name.into())
227+
.map(|catalog| {
228+
Arc::new(ForeignCatalogProvider(catalog)) as Arc<dyn CatalogProvider>
229+
})
230+
.into()
231+
}
232+
}
233+
}
234+
235+
#[cfg(test)]
236+
mod tests {
237+
use datafusion::catalog::{MemoryCatalogProvider, MemoryCatalogProviderList};
238+
239+
use super::*;
240+
241+
#[test]
242+
fn test_round_trip_ffi_catalog_provider_list() {
243+
let prior_catalog = Arc::new(MemoryCatalogProvider::new());
244+
245+
let catalog_list = Arc::new(MemoryCatalogProviderList::new());
246+
assert!(catalog_list
247+
.as_ref()
248+
.register_catalog("prior_catalog".to_owned(), prior_catalog)
249+
.is_none());
250+
251+
let ffi_catalog_list = FFI_CatalogProviderList::new(catalog_list, None);
252+
253+
let foreign_catalog_list: ForeignCatalogProviderList = (&ffi_catalog_list).into();
254+
255+
let prior_catalog_names = foreign_catalog_list.catalog_names();
256+
assert_eq!(prior_catalog_names.len(), 1);
257+
assert_eq!(prior_catalog_names[0], "prior_catalog");
258+
259+
// Replace an existing catalog with one of the same name
260+
let returned_catalog = foreign_catalog_list.register_catalog(
261+
"prior_catalog".to_owned(),
262+
Arc::new(MemoryCatalogProvider::new()),
263+
);
264+
assert!(returned_catalog.is_some());
265+
assert_eq!(foreign_catalog_list.catalog_names().len(), 1);
266+
267+
// Add a new catalog
268+
let returned_catalog = foreign_catalog_list.register_catalog(
269+
"second_catalog".to_owned(),
270+
Arc::new(MemoryCatalogProvider::new()),
271+
);
272+
assert!(returned_catalog.is_none());
273+
assert_eq!(foreign_catalog_list.catalog_names().len(), 2);
274+
275+
// Retrieve non-existent catalog
276+
let returned_catalog = foreign_catalog_list.catalog("non_existent_catalog");
277+
assert!(returned_catalog.is_none());
278+
279+
// Retrieve valid catalog
280+
let returned_catalog = foreign_catalog_list.catalog("second_catalog");
281+
assert!(returned_catalog.is_some());
282+
}
283+
}

datafusion/ffi/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
pub mod arrow_wrappers;
3131
pub mod catalog_provider;
32+
pub mod catalog_provider_list;
3233
pub mod execution_plan;
3334
pub mod insert_op;
3435
pub mod plan_properties;

datafusion/ffi/src/tests/catalog.rs

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@
2828
use std::{any::Any, fmt::Debug, sync::Arc};
2929

3030
use crate::catalog_provider::FFI_CatalogProvider;
31+
use crate::catalog_provider_list::FFI_CatalogProviderList;
3132
use arrow::datatypes::Schema;
3233
use async_trait::async_trait;
3334
use datafusion::{
3435
catalog::{
35-
CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
36-
TableProvider,
36+
CatalogProvider, CatalogProviderList, MemoryCatalogProvider,
37+
MemoryCatalogProviderList, MemorySchemaProvider, SchemaProvider, TableProvider,
3738
},
3839
common::exec_err,
3940
datasource::MemTable,
@@ -181,3 +182,55 @@ pub(crate) extern "C" fn create_catalog_provider() -> FFI_CatalogProvider {
181182
let catalog_provider = Arc::new(FixedCatalogProvider::default());
182183
FFI_CatalogProvider::new(catalog_provider, None)
183184
}
185+
186+
/// This catalog provider list is intended only for unit tests. It prepopulates with one
187+
/// catalog and only allows for catalogs named after four colors.
188+
#[derive(Debug)]
189+
pub struct FixedCatalogProviderList {
190+
inner: MemoryCatalogProviderList,
191+
}
192+
193+
impl Default for FixedCatalogProviderList {
194+
fn default() -> Self {
195+
let inner = MemoryCatalogProviderList::new();
196+
197+
let _ = inner.register_catalog(
198+
"blue".to_owned(),
199+
Arc::new(FixedCatalogProvider::default()),
200+
);
201+
202+
Self { inner }
203+
}
204+
}
205+
206+
impl CatalogProviderList for FixedCatalogProviderList {
207+
fn as_any(&self) -> &dyn Any {
208+
self
209+
}
210+
211+
fn catalog_names(&self) -> Vec<String> {
212+
self.inner.catalog_names()
213+
}
214+
215+
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
216+
self.inner.catalog(name)
217+
}
218+
219+
fn register_catalog(
220+
&self,
221+
name: String,
222+
catalog: Arc<dyn CatalogProvider>,
223+
) -> Option<Arc<dyn CatalogProvider>> {
224+
if !["blue", "red", "green", "yellow"].contains(&name.as_str()) {
225+
log::warn!("FixedCatalogProviderList only provides four catalogs: blue, red, green, yellow");
226+
return None;
227+
}
228+
229+
self.inner.register_catalog(name, catalog)
230+
}
231+
}
232+
233+
pub(crate) extern "C" fn create_catalog_provider_list() -> FFI_CatalogProviderList {
234+
let catalog_provider_list = Arc::new(FixedCatalogProviderList::default());
235+
FFI_CatalogProviderList::new(catalog_provider_list, None)
236+
}

datafusion/ffi/src/tests/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ use crate::udaf::FFI_AggregateUDF;
3434
use crate::udwf::FFI_WindowUDF;
3535

3636
use super::{table_provider::FFI_TableProvider, udf::FFI_ScalarUDF};
37+
use crate::catalog_provider_list::FFI_CatalogProviderList;
38+
use crate::tests::catalog::create_catalog_provider_list;
3739
use arrow::array::RecordBatch;
3840
use async_provider::create_async_table_provider;
3941
use datafusion::{
@@ -62,6 +64,9 @@ pub struct ForeignLibraryModule {
6264
/// Construct an opinionated catalog provider
6365
pub create_catalog: extern "C" fn() -> FFI_CatalogProvider,
6466

67+
/// Construct an opinionated catalog provider list
68+
pub create_catalog_list: extern "C" fn() -> FFI_CatalogProviderList,
69+
6570
/// Constructs the table provider
6671
pub create_table: extern "C" fn(synchronous: bool) -> FFI_TableProvider,
6772

@@ -123,6 +128,7 @@ extern "C" fn construct_table_provider(synchronous: bool) -> FFI_TableProvider {
123128
pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
124129
ForeignLibraryModule {
125130
create_catalog: create_catalog_provider,
131+
create_catalog_list: create_catalog_provider_list,
126132
create_table: construct_table_provider,
127133
create_scalar_udf: create_ffi_abs_func,
128134
create_nullary_udf: create_ffi_random_func,

0 commit comments

Comments
 (0)