Skip to content

Commit f3e0fa2

Browse files
authored
Consolidate Examples: memtable.rs and parquet_multiple_files.rs (#13913)
1 parent 0989649 commit f3e0fa2

File tree

3 files changed

+83
-88
lines changed

3 files changed

+83
-88
lines changed

datafusion-examples/README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,8 @@ cargo run --example dataframe
6464
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
6565
- [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
6666
- [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
67-
- [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es
6867
- [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
6968
- [`parquet_index.rs`](examples/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
70-
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
7169
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
7270
- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
7371
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
@@ -83,6 +81,7 @@ cargo run --example dataframe
8381
- [`sql_analysis.rs`](examples/sql_analysis.rs): Analyse SQL queries with DataFusion structures
8482
- [`sql_frontend.rs`](examples/sql_frontend.rs): Create LogicalPlans (only) from sql strings
8583
- [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser`
84+
- [`sql_query.rs`](examples/sql_query.rs): Query data using SQL (in memory `RecordBatch`es, local Parquet files)
8685
- [`to_char.rs`](examples/to_char.rs): Examples of using the to_char function
8786
- [`to_timestamp.rs`](examples/to_timestamp.rs): Examples of using to_timestamp functions
8887

datafusion-examples/examples/memtable.rs

Lines changed: 0 additions & 74 deletions
This file was deleted.

datafusion-examples/examples/parquet_sql_multiple_files.rs renamed to datafusion-examples/examples/sql_query.rs

Lines changed: 82 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,90 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use datafusion::arrow::array::{UInt64Array, UInt8Array};
19+
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
20+
use datafusion::arrow::record_batch::RecordBatch;
21+
use datafusion::datasource::file_format::parquet::ParquetFormat;
22+
use datafusion::datasource::listing::ListingOptions;
23+
use datafusion::datasource::MemTable;
24+
use datafusion::error::{DataFusionError, Result};
25+
use datafusion::prelude::SessionContext;
26+
use datafusion_common::exec_datafusion_err;
27+
use object_store::local::LocalFileSystem;
1828
use std::path::Path;
1929
use std::sync::Arc;
30+
use std::time::Duration;
31+
use tokio::time::timeout;
32+
33+
/// Examples of various ways to execute queries using SQL
34+
///
35+
/// [`query_memtable`]: a simple query against a [`MemTable`]
36+
/// [`query_parquet`]: a simple query against a directory with multiple Parquet files
37+
///
38+
#[tokio::main]
39+
async fn main() -> Result<()> {
40+
query_memtable().await?;
41+
query_parquet().await?;
42+
Ok(())
43+
}
2044

21-
use datafusion::datasource::file_format::parquet::ParquetFormat;
22-
use datafusion::datasource::listing::ListingOptions;
23-
use datafusion::prelude::*;
45+
/// Run a simple query against a [`MemTable`]
46+
pub async fn query_memtable() -> Result<()> {
47+
let mem_table = create_memtable()?;
2448

25-
use object_store::local::LocalFileSystem;
49+
// create local execution context
50+
let ctx = SessionContext::new();
2651

27-
/// This example demonstrates executing a simple query against an Arrow data source (a directory
28-
/// with multiple Parquet files) and fetching results. The query is run twice, once showing
29-
/// how to used `register_listing_table` with an absolute path, and once registering an
30-
/// ObjectStore to use a relative path.
31-
#[tokio::main]
32-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
52+
// Register the in-memory table containing the data
53+
ctx.register_table("users", Arc::new(mem_table))?;
54+
55+
let dataframe = ctx.sql("SELECT * FROM users;").await?;
56+
57+
timeout(Duration::from_secs(10), async move {
58+
let result = dataframe.collect().await.unwrap();
59+
let record_batch = result.first().unwrap();
60+
61+
assert_eq!(1, record_batch.column(0).len());
62+
dbg!(record_batch.columns());
63+
})
64+
.await
65+
.unwrap();
66+
67+
Ok(())
68+
}
69+
70+
fn create_memtable() -> Result<MemTable> {
71+
MemTable::try_new(get_schema(), vec![vec![create_record_batch()?]])
72+
}
73+
74+
fn create_record_batch() -> Result<RecordBatch> {
75+
let id_array = UInt8Array::from(vec![1]);
76+
let account_array = UInt64Array::from(vec![9000]);
77+
78+
Ok(RecordBatch::try_new(
79+
get_schema(),
80+
vec![Arc::new(id_array), Arc::new(account_array)],
81+
)
82+
.unwrap())
83+
}
84+
85+
fn get_schema() -> SchemaRef {
86+
SchemaRef::new(Schema::new(vec![
87+
Field::new("id", DataType::UInt8, false),
88+
Field::new("bank_account", DataType::UInt64, true),
89+
]))
90+
}
91+
92+
/// The simplest way to query parquet files is to use the
93+
/// [`SessionContext::read_parquet`] API
94+
///
95+
/// For more control, you can use the lower level [`ListingOptions`] and
96+
/// [`ListingTable`] APIs
97+
///
98+
/// This example shows how to use relative and absolute paths.
99+
///
100+
/// [`ListingTable`]: datafusion::datasource::listing::ListingTable
101+
async fn query_parquet() -> Result<()> {
33102
// create local execution context
34103
let ctx = SessionContext::new();
35104

@@ -73,13 +142,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
73142
let test_data_path = Path::new(&test_data);
74143
let test_data_path_parent = test_data_path
75144
.parent()
76-
.ok_or("test_data path needs a parent")?;
145+
.ok_or(exec_datafusion_err!("test_data path needs a parent"))?;
77146

78147
std::env::set_current_dir(test_data_path_parent)?;
79148

80149
let local_fs = Arc::new(LocalFileSystem::default());
81150

82-
let u = url::Url::parse("file://./")?;
151+
let u = url::Url::parse("file://./")
152+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
83153
ctx.register_object_store(&u, local_fs);
84154

85155
// Register a listing table - this will use all files in the directory as data sources

0 commit comments

Comments
 (0)