|
18 | 18 | //! [`MemoryCatalogProvider`], [`MemoryCatalogProviderList`]: In-memory |
19 | 19 | //! implementations of [`CatalogProviderList`] and [`CatalogProvider`]. |
20 | 20 |
|
21 | | -use crate::catalog::{ |
22 | | - CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider, |
23 | | -}; |
| 21 | +use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; |
24 | 22 | use async_trait::async_trait; |
25 | 23 | use dashmap::DashMap; |
26 | 24 | use datafusion_common::{exec_err, DataFusionError}; |
@@ -200,156 +198,3 @@ impl SchemaProvider for MemorySchemaProvider { |
200 | 198 | self.tables.contains_key(name) |
201 | 199 | } |
202 | 200 | } |
203 | | - |
204 | | -#[cfg(test)] |
205 | | -mod test { |
206 | | - use super::*; |
207 | | - use crate::catalog::CatalogProvider; |
208 | | - use crate::catalog_common::memory::MemorySchemaProvider; |
209 | | - use crate::datasource::empty::EmptyTable; |
210 | | - use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; |
211 | | - use crate::prelude::SessionContext; |
212 | | - use arrow_schema::Schema; |
213 | | - use datafusion_common::assert_batches_eq; |
214 | | - use std::any::Any; |
215 | | - use std::sync::Arc; |
216 | | - |
217 | | - #[test] |
218 | | - fn memory_catalog_dereg_nonempty_schema() { |
219 | | - let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>; |
220 | | - |
221 | | - let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>; |
222 | | - let test_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty()))) |
223 | | - as Arc<dyn TableProvider>; |
224 | | - schema.register_table("t".into(), test_table).unwrap(); |
225 | | - |
226 | | - cat.register_schema("foo", schema.clone()).unwrap(); |
227 | | - |
228 | | - assert!( |
229 | | - cat.deregister_schema("foo", false).is_err(), |
230 | | - "dropping empty schema without cascade should error" |
231 | | - ); |
232 | | - assert!(cat.deregister_schema("foo", true).unwrap().is_some()); |
233 | | - } |
234 | | - |
235 | | - #[test] |
236 | | - fn memory_catalog_dereg_empty_schema() { |
237 | | - let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>; |
238 | | - |
239 | | - let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>; |
240 | | - cat.register_schema("foo", schema).unwrap(); |
241 | | - |
242 | | - assert!(cat.deregister_schema("foo", false).unwrap().is_some()); |
243 | | - } |
244 | | - |
245 | | - #[test] |
246 | | - fn memory_catalog_dereg_missing() { |
247 | | - let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>; |
248 | | - assert!(cat.deregister_schema("foo", false).unwrap().is_none()); |
249 | | - } |
250 | | - |
251 | | - #[test] |
252 | | - fn default_register_schema_not_supported() { |
253 | | - // mimic a new CatalogProvider and ensure it does not support registering schemas |
254 | | - #[derive(Debug)] |
255 | | - struct TestProvider {} |
256 | | - impl CatalogProvider for TestProvider { |
257 | | - fn as_any(&self) -> &dyn Any { |
258 | | - self |
259 | | - } |
260 | | - |
261 | | - fn schema_names(&self) -> Vec<String> { |
262 | | - unimplemented!() |
263 | | - } |
264 | | - |
265 | | - fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> { |
266 | | - unimplemented!() |
267 | | - } |
268 | | - } |
269 | | - |
270 | | - let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>; |
271 | | - let catalog = Arc::new(TestProvider {}); |
272 | | - |
273 | | - match catalog.register_schema("foo", schema) { |
274 | | - Ok(_) => panic!("unexpected OK"), |
275 | | - Err(e) => assert_eq!(e.strip_backtrace(), "This feature is not implemented: Registering new schemas is not supported"), |
276 | | - }; |
277 | | - } |
278 | | - |
279 | | - #[tokio::test] |
280 | | - async fn test_mem_provider() { |
281 | | - let provider = MemorySchemaProvider::new(); |
282 | | - let table_name = "test_table_exist"; |
283 | | - assert!(!provider.table_exist(table_name)); |
284 | | - assert!(provider.deregister_table(table_name).unwrap().is_none()); |
285 | | - let test_table = EmptyTable::new(Arc::new(Schema::empty())); |
286 | | - // register table successfully |
287 | | - assert!(provider |
288 | | - .register_table(table_name.to_string(), Arc::new(test_table)) |
289 | | - .unwrap() |
290 | | - .is_none()); |
291 | | - assert!(provider.table_exist(table_name)); |
292 | | - let other_table = EmptyTable::new(Arc::new(Schema::empty())); |
293 | | - let result = |
294 | | - provider.register_table(table_name.to_string(), Arc::new(other_table)); |
295 | | - assert!(result.is_err()); |
296 | | - } |
297 | | - |
298 | | - #[tokio::test] |
299 | | - async fn test_schema_register_listing_table() { |
300 | | - let testdata = crate::test_util::parquet_test_data(); |
301 | | - let testdir = if testdata.starts_with('/') { |
302 | | - format!("file://{testdata}") |
303 | | - } else { |
304 | | - format!("file:///{testdata}") |
305 | | - }; |
306 | | - let filename = if testdir.ends_with('/') { |
307 | | - format!("{}{}", testdir, "alltypes_plain.parquet") |
308 | | - } else { |
309 | | - format!("{}/{}", testdir, "alltypes_plain.parquet") |
310 | | - }; |
311 | | - |
312 | | - let table_path = ListingTableUrl::parse(filename).unwrap(); |
313 | | - |
314 | | - let catalog = MemoryCatalogProvider::new(); |
315 | | - let schema = MemorySchemaProvider::new(); |
316 | | - |
317 | | - let ctx = SessionContext::new(); |
318 | | - |
319 | | - let config = ListingTableConfig::new(table_path) |
320 | | - .infer(&ctx.state()) |
321 | | - .await |
322 | | - .unwrap(); |
323 | | - let table = ListingTable::try_new(config).unwrap(); |
324 | | - |
325 | | - schema |
326 | | - .register_table("alltypes_plain".to_string(), Arc::new(table)) |
327 | | - .unwrap(); |
328 | | - |
329 | | - catalog.register_schema("active", Arc::new(schema)).unwrap(); |
330 | | - ctx.register_catalog("cat", Arc::new(catalog)); |
331 | | - |
332 | | - let df = ctx |
333 | | - .sql("SELECT id, bool_col FROM cat.active.alltypes_plain") |
334 | | - .await |
335 | | - .unwrap(); |
336 | | - |
337 | | - let actual = df.collect().await.unwrap(); |
338 | | - |
339 | | - let expected = [ |
340 | | - "+----+----------+", |
341 | | - "| id | bool_col |", |
342 | | - "+----+----------+", |
343 | | - "| 4 | true |", |
344 | | - "| 5 | false |", |
345 | | - "| 6 | true |", |
346 | | - "| 7 | false |", |
347 | | - "| 2 | true |", |
348 | | - "| 3 | false |", |
349 | | - "| 0 | true |", |
350 | | - "| 1 | false |", |
351 | | - "+----+----------+", |
352 | | - ]; |
353 | | - assert_batches_eq!(expected, &actual); |
354 | | - } |
355 | | -} |
0 commit comments