7 changes: 7 additions & 0 deletions datafusion-examples/Cargo.toml
@@ -35,6 +35,13 @@ rust-version = { workspace = true }
[lints]
workspace = true

[dependencies]
arrow = { workspace = true }
datafusion = { workspace = true, default-features = true, features = ["parquet_encryption"] }
datafusion-common = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }

[dev-dependencies]
arrow = { workspace = true }
# arrow_schema is required for record_batch! macro :sad:
9 changes: 5 additions & 4 deletions datafusion-examples/README.md
@@ -106,10 +106,11 @@ cargo run --example dataframe -- dataframe

#### Category: Single Process

| Subcommand | File Path | Description |
| --------------------- | ----------------------------------------------------------------------------------- | ------------------------------------------------------ |
| dataframe | [`dataframe/dataframe.rs`](examples/dataframe/dataframe.rs) | Query DataFrames from various sources and write output |
| deserialize_to_struct | [`dataframe/deserialize_to_struct.rs`](examples/dataframe/deserialize_to_struct.rs) | Convert Arrow arrays into Rust structs |
| Subcommand | File Path | Description |
| --------------------- | ----------------------------------------------------------------------------------- | ------------------------------------------------------- |
| cache_factory | [`dataframe/cache_factory.rs`](examples/dataframe/cache_factory.rs) | Custom lazy caching for DataFrames using `CacheFactory` |
| dataframe | [`dataframe/dataframe.rs`](examples/dataframe/dataframe.rs) | Query DataFrames from various sources and write output |
| deserialize_to_struct | [`dataframe/deserialize_to_struct.rs`](examples/dataframe/deserialize_to_struct.rs) | Convert Arrow arrays into Rust structs |

## Execution Monitoring Examples

25 changes: 25 additions & 0 deletions datafusion-examples/data/README.md
@@ -0,0 +1,25 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Example datasets

| Filename | Path | Description |
| ----------- | --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `cars.csv` | [`data/csv/cars.csv`](./csv/cars.csv) | Time-series–like dataset containing car identifiers, speed values, and timestamps. Used in window function and time-based query examples (e.g. ordering, window frames). |
| `regex.csv` | [`data/csv/regex.csv`](./csv/regex.csv) | Dataset for regular expression examples. Contains input values, regex patterns, replacement strings, and optional flags. Covers ASCII, Unicode, and locale-specific text processing. |
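
As a quick orientation (a sketch, not part of this PR's diff), this is the kind of window query `cars.csv` is designed for; the table name `cars` and the relative path are assumptions:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Sketch: compute each car's previous speed reading with a window function.
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv(
        "cars",
        "datafusion-examples/data/csv/cars.csv",
        CsvReadOptions::new(),
    )
    .await?;
    ctx.sql(
        r#"SELECT car, speed,
                  lag(speed) OVER (PARTITION BY car ORDER BY "time") AS prev_speed
           FROM cars"#,
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```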
26 changes: 26 additions & 0 deletions datafusion-examples/data/csv/cars.csv
@@ -0,0 +1,26 @@
car,speed,time
red,20.0,1996-04-12T12:05:03.000000000
red,20.3,1996-04-12T12:05:04.000000000
red,21.4,1996-04-12T12:05:05.000000000
red,21.5,1996-04-12T12:05:06.000000000
red,19.0,1996-04-12T12:05:07.000000000
red,18.0,1996-04-12T12:05:08.000000000
red,17.0,1996-04-12T12:05:09.000000000
red,7.0,1996-04-12T12:05:10.000000000
red,7.1,1996-04-12T12:05:11.000000000
red,7.2,1996-04-12T12:05:12.000000000
red,3.0,1996-04-12T12:05:13.000000000
red,1.0,1996-04-12T12:05:14.000000000
red,0.0,1996-04-12T12:05:15.000000000
green,10.0,1996-04-12T12:05:03.000000000
green,10.3,1996-04-12T12:05:04.000000000
green,10.4,1996-04-12T12:05:05.000000000
green,10.5,1996-04-12T12:05:06.000000000
green,11.0,1996-04-12T12:05:07.000000000
green,12.0,1996-04-12T12:05:08.000000000
green,14.0,1996-04-12T12:05:09.000000000
green,15.0,1996-04-12T12:05:10.000000000
green,15.1,1996-04-12T12:05:11.000000000
green,15.2,1996-04-12T12:05:12.000000000
green,8.0,1996-04-12T12:05:13.000000000
green,2.0,1996-04-12T12:05:14.000000000
12 changes: 12 additions & 0 deletions datafusion-examples/data/csv/regex.csv
@@ -0,0 +1,12 @@
values,patterns,replacement,flags
abc,^(a),bb\1bb,i
ABC,^(A).*,B,i
aBc,(b|d),e,i
AbC,(B|D),e,
aBC,^(b|c),d,
4000,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
4010,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
Düsseldorf,[\p{Letter}-]+,München,
Москва,[\p{L}-]+,Moscow,
Köln,[a-zA-Z]ö[a-zA-Z]{2},Koln,
اليوم,^\p{Arabic}+$,Today,
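
The `flags` column feeds the optional flags argument of the `regexp_*` functions. A hedged sketch (not from this PR) of how it might be exercised; the table name `regex_data` is an assumption:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Sketch: the `i` flag in the last column enables case-insensitive matching,
// so the pattern `^(a)` matches both "abc" and "ABC".
async fn regex_flags_demo(ctx: &SessionContext) -> Result<()> {
    ctx.register_csv(
        "regex_data",
        "datafusion-examples/data/csv/regex.csv",
        CsvReadOptions::new(),
    )
    .await?;
    ctx.sql("SELECT values, regexp_like(values, patterns, flags) AS matched FROM regex_data")
        .await?
        .show()
        .await?;
    Ok(())
}
```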
39 changes: 12 additions & 27 deletions datafusion-examples/examples/builtin_functions/regexp.rs
@@ -1,5 +1,4 @@
// Licensed to the Apache Software Foundation (ASF) under one
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
@@ -18,12 +17,11 @@

//! See `main.rs` for how to run it.

use std::{fs::File, io::Write};
use std::path::PathBuf;

use datafusion::common::{assert_batches_eq, assert_contains};
use datafusion::error::Result;
use datafusion::prelude::*;
use tempfile::tempdir;

/// This example demonstrates how to use the regexp_* functions
///
@@ -35,30 +33,17 @@ use tempfile::tempdir;
/// https://docs.rs/regex/latest/regex/#grouping-and-flags
pub async fn regexp() -> Result<()> {
let ctx = SessionContext::new();
// content from file 'datafusion/physical-expr/tests/data/regex.csv'
let csv_data = r#"values,patterns,replacement,flags
abc,^(a),bb\1bb,i
ABC,^(A).*,B,i
aBc,(b|d),e,i
AbC,(B|D),e,
aBC,^(b|c),d,
4000,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
4010,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
Düsseldorf,[\p{Letter}-]+,München,
Москва,[\p{L}-]+,Moscow,
Köln,[a-zA-Z]ö[a-zA-Z]{2},Koln,
اليوم,^\p{Arabic}+$,Today,"#;
let dir = tempdir()?;
let file_path = dir.path().join("regex.csv");
{
let mut file = File::create(&file_path)?;
// write CSV data
file.write_all(csv_data.as_bytes())?;
} // scope closes the file
let file_path = file_path.to_str().unwrap();

ctx.register_csv("examples", file_path, CsvReadOptions::new())
.await?;
let csv_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("data")
.join("csv")
.join("regex.csv");

ctx.register_csv(
"examples",
csv_path.to_str().unwrap(),
CsvReadOptions::new(),
)
.await?;

//
//
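For context, a sketch of the style of query the elided remainder of this example runs against the registered `examples` table, with each row supplying its own pattern, replacement, and flags; this is not copied from the diff:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Sketch: rewrite each value using the pattern/replacement/flags on its row.
async fn replace_per_row(ctx: &SessionContext) -> Result<()> {
    ctx.sql("SELECT regexp_replace(values, patterns, replacement, flags) FROM examples")
        .await?
        .show()
        .await?;
    Ok(())
}
```
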
35 changes: 18 additions & 17 deletions datafusion-examples/examples/custom_data_source/csv_json_opener.rs
@@ -17,6 +17,7 @@

//! See `main.rs` for how to run it.

use std::path::PathBuf;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
@@ -31,10 +32,10 @@ use datafusion::{
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};

use datafusion::datasource::physical_plan::FileScanConfigBuilder;
use datafusion_examples::utils::datasets::cars;
use futures::StreamExt;
use object_store::{ObjectStore, local::LocalFileSystem, memory::InMemory};

@@ -50,12 +51,12 @@ pub async fn csv_json_opener() -> Result<()> {

async fn csv_opener() -> Result<()> {
let object_store = Arc::new(LocalFileSystem::new());
let schema = aggr_test_schema();
let schema = cars::schema();

let testdata = datafusion::test_util::arrow_test_data();
let path = format!("{testdata}/csv/aggregate_test_100.csv");

let path = std::path::Path::new(&path).canonicalize()?;
let csv_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("data")
.join("csv")
.join("cars.csv");

let options = CsvOptions {
has_header: Some(true),
@@ -71,9 +72,9 @@

let scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
.with_projection_indices(Some(vec![12, 0]))?
.with_projection_indices(Some(vec![0, 1]))?
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10))
.with_file(PartitionedFile::new(csv_path.display().to_string(), 10))
.build();

let opener =
@@ -89,15 +90,15 @@
}
assert_batches_eq!(
&[
"+--------------------------------+----+",
"| c13 | c1 |",
"+--------------------------------+----+",
"| 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW | c |",
"| C2GT5KVyOPZpgKVl110TyZO0NcJ434 | d |",
"| AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz | b |",
"| 0keZ5G8BffGwgF2RwQD59TFzMStxCB | a |",
"| Ig1QcuKsjHXkproePdERo2w0mYzIqd | b |",
"+--------------------------------+----+",
"+-----+-------+",
"| car | speed |",
"+-----+-------+",
"| red | 20.0 |",
"| red | 20.3 |",
"| red | 21.4 |",
"| red | 21.5 |",
"| red | 19.0 |",
"+-----+-------+",
],
&result
);
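The `cars::schema()` helper from `datafusion_examples::utils::datasets` is referenced above but defined outside this diff. A plausible sketch, inferred from the columns of `cars.csv`; the PR's actual field types may differ:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};

// Sketch: schema matching cars.csv's `car,speed,time` columns.
pub fn schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("car", DataType::Utf8, false),
        Field::new("speed", DataType::Float64, false),
        Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
    ]))
}
```
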
@@ -17,7 +17,8 @@

//! See `main.rs` for how to run it.

use datafusion::common::test_util::datafusion_test_data;
use std::path::PathBuf;

use datafusion::error::Result;
use datafusion::prelude::*;

@@ -27,33 +28,36 @@ pub async fn csv_sql_streaming() -> Result<()> {
// create local execution context
let ctx = SessionContext::new();

let testdata = datafusion_test_data();
let csv_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("data")
.join("csv")
.join("cars.csv");

// Register a table source and tell DataFusion the file is ordered by `ts ASC`.
// Register a table source and tell DataFusion the file is ordered by `car ASC`.
// Note it is the responsibility of the user to make sure
// that file indeed satisfies this condition or else incorrect answers may be produced.
let asc = true;
let nulls_first = true;
let sort_expr = vec![col("ts").sort(asc, nulls_first)];
let sort_expr = vec![col("car").sort(asc, nulls_first)];
// register csv file with the execution context
ctx.register_csv(
"ordered_table",
&format!("{testdata}/window_1.csv"),
csv_path.to_str().unwrap(),
CsvReadOptions::new().file_sort_order(vec![sort_expr]),
)
.await?;

// execute the query
// Following query can be executed with unbounded sources because group by expressions (e.g ts) is
// The following query can be executed with unbounded sources because the group by expression (e.g. car) is
// already ordered at the source.
//
// Unbounded sources means that if the input came from a "never ending" source (such as a FIFO
// file on unix) the query could produce results incrementally as data was read.
let df = ctx
.sql(
"SELECT ts, MIN(inc_col), MAX(inc_col) \
"SELECT car, MIN(speed), MAX(speed) \
FROM ordered_table \
GROUP BY ts",
GROUP BY car",
)
.await?;

@@ -64,7 +68,7 @@ pub async fn csv_sql_streaming() -> Result<()> {
// its result in streaming fashion, because its required ordering is already satisfied at the source.
let df = ctx
.sql(
"SELECT ts, SUM(inc_col) OVER(ORDER BY ts ASC) \
"SELECT car, SUM(speed) OVER(ORDER BY car ASC) \
FROM ordered_table",
)
.await?;
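For comparison, the same ordering can also be declared through SQL DDL: `CREATE EXTERNAL TABLE` accepts a `WITH ORDER` clause that mirrors `file_sort_order`. A sketch only; the `'format.has_header'` option key is an assumption, since option syntax varies across DataFusion versions:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Sketch: register cars.csv with a declared `car ASC` ordering via DDL.
async fn register_ordered(ctx: &SessionContext) -> Result<()> {
    ctx.sql(
        "CREATE EXTERNAL TABLE ordered_table (car VARCHAR, speed DOUBLE, time TIMESTAMP) \
         STORED AS CSV \
         WITH ORDER (car ASC) \
         LOCATION 'datafusion-examples/data/csv/cars.csv' \
         OPTIONS ('format.has_header' 'true')",
    )
    .await?
    .collect()
    .await?;
    Ok(())
}
```
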
36 changes: 22 additions & 14 deletions datafusion-examples/examples/data_io/parquet_encrypted.rs
@@ -17,28 +17,34 @@

//! See `main.rs` for how to run it.

use std::path::PathBuf;
use std::sync::Arc;

use datafusion::common::DataFusionError;
use datafusion::config::{ConfigFileEncryptionProperties, TableParquetOptions};
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::logical_expr::{col, lit};
use datafusion::parquet::encryption::decrypt::FileDecryptionProperties;
use datafusion::parquet::encryption::encrypt::FileEncryptionProperties;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use std::sync::Arc;
use datafusion_examples::utils::write_csv_to_parquet;
use tempfile::TempDir;

/// Read and write encrypted Parquet files using DataFusion
pub async fn parquet_encrypted() -> datafusion::common::Result<()> {
// The SessionContext is the main high level API for interacting with DataFusion
let ctx = SessionContext::new();

// Find the local path of "alltypes_plain.parquet"
let testdata = datafusion::test_util::parquet_test_data();
let filename = &format!("{testdata}/alltypes_plain.parquet");
// Convert the CSV input into a temporary Parquet directory for querying
let csv_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("data")
.join("csv")
.join("cars.csv");
let parquet_temp = write_csv_to_parquet(&ctx, &csv_path).await?;

// Read the sample parquet file
let parquet_df = ctx
.read_parquet(filename, ParquetReadOptions::default())
.read_parquet(parquet_temp.path_str()?, ParquetReadOptions::default())
.await?;

// Show information from the dataframe
@@ -52,27 +58,28 @@ pub async fn parquet_encrypted() -> datafusion::common::Result<()> {
let (encrypt, decrypt) = setup_encryption(&parquet_df)?;

// Create a temporary file location for the encrypted parquet file
let tmp_dir = TempDir::new()?;
let tempfile = tmp_dir.path().join("alltypes_plain-encrypted.parquet");
let tempfile_str = tempfile.into_os_string().into_string().unwrap();
let tmp_source = TempDir::new()?;
let tempfile = tmp_source.path().join("cars_encrypted");

// Write encrypted parquet
let mut options = TableParquetOptions::default();
options.crypto.file_encryption = Some(ConfigFileEncryptionProperties::from(&encrypt));
parquet_df
.write_parquet(
tempfile_str.as_str(),
tempfile.to_str().unwrap(),
DataFrameWriteOptions::new().with_single_file_output(true),
Some(options),
)
.await?;

// Read encrypted parquet
// Read encrypted parquet back as a DataFrame using matching decryption config
let ctx: SessionContext = SessionContext::new();
let read_options =
ParquetReadOptions::default().file_decryption_properties((&decrypt).into());

let encrypted_parquet_df = ctx.read_parquet(tempfile_str, read_options).await?;
let encrypted_parquet_df = ctx
.read_parquet(tempfile.to_str().unwrap(), read_options)
.await?;

// Show information from the dataframe
println!(
@@ -91,11 +98,12 @@ async fn query_dataframe(df: &DataFrame) -> Result<(), DataFusionError> {
df.clone().describe().await?.show().await?;

// Select three columns and filter the results
// so that only rows where id > 1 are returned
// so that only rows where speed > 5 are returned
// select car, speed, time from t where speed > 5
println!("\nSelected rows and columns:");
df.clone()
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(5)))?
.select_columns(&["car", "speed", "time"])?
.filter(col("speed").gt(lit(5)))?
.show()
.await?;

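The `write_csv_to_parquet` utility (and the type behind `path_str()`) is added elsewhere in this PR and not shown here. A plausible sketch of its shape, assuming it keeps the `TempDir` alive so the Parquet data outlives registration:

```rust
use std::path::Path;

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::*;
use tempfile::TempDir;

// Sketch: holds the TempDir so the written Parquet file is not deleted
// while the caller is still querying it.
pub struct TempParquet {
    dir: TempDir,
}

impl TempParquet {
    // Assumed accessor matching the `path_str()?` call in the example.
    pub fn path_str(&self) -> Result<&str> {
        self.dir
            .path()
            .to_str()
            .ok_or_else(|| DataFusionError::Internal("non-UTF-8 temp path".into()))
    }
}

pub async fn write_csv_to_parquet(
    ctx: &SessionContext,
    csv: &Path,
) -> Result<TempParquet> {
    let dir = TempDir::new()?;
    let df = ctx
        .read_csv(csv.to_str().unwrap(), CsvReadOptions::new())
        .await?;
    df.write_parquet(
        dir.path().to_str().unwrap(),
        DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    )
    .await?;
    Ok(TempParquet { dir })
}
```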