Commit 142f597

cj-zhukov (Sergey Zhukov) and alamb authored
Store example data directly inside the datafusion-examples (#19141) (#19319)
## Which issue does this PR close?

- Closes #19141.

---------

Co-authored-by: Sergey Zhukov <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 3a0ca4e commit 142f597

File tree: 31 files changed (+1176 −419 lines)

datafusion-examples/Cargo.toml

Lines changed: 8 additions & 7 deletions
```diff
@@ -35,18 +35,21 @@ rust-version = { workspace = true }
 [lints]
 workspace = true
 
-[dev-dependencies]
+[dependencies]
 arrow = { workspace = true }
-# arrow_schema is required for record_batch! macro :sad:
-arrow-flight = { workspace = true }
 arrow-schema = { workspace = true }
+datafusion = { workspace = true, default-features = true, features = ["parquet_encryption"] }
+datafusion-common = { workspace = true }
+tempfile = { workspace = true }
+tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
+
+[dev-dependencies]
+arrow-flight = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
 dashmap = { workspace = true }
 # note only use main datafusion crate for examples
 base64 = "0.22.1"
-datafusion = { workspace = true, default-features = true, features = ["parquet_encryption"] }
-datafusion-common = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-physical-expr-adapter = { workspace = true }
 datafusion-proto = { workspace = true }
@@ -62,9 +65,7 @@ rand = { workspace = true }
 serde_json = { workspace = true }
 strum = { workspace = true }
 strum_macros = { workspace = true }
-tempfile = { workspace = true }
 test-utils = { path = "../test-utils" }
-tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
 tonic = "0.14"
 tracing = { version = "0.1" }
 tracing-subscriber = { version = "0.3" }
```

datafusion-examples/README.md

Lines changed: 5 additions & 4 deletions
```diff
@@ -106,10 +106,11 @@ cargo run --example dataframe -- dataframe
 
 #### Category: Single Process
 
-| Subcommand | File Path | Description |
-| --------------------- | ----------------------------------------------------------------------------------- | ------------------------------------------------------ |
-| dataframe | [`dataframe/dataframe.rs`](examples/dataframe/dataframe.rs) | Query DataFrames from various sources and write output |
-| deserialize_to_struct | [`dataframe/deserialize_to_struct.rs`](examples/dataframe/deserialize_to_struct.rs) | Convert Arrow arrays into Rust structs |
+| Subcommand | File Path | Description |
+| --------------------- | ----------------------------------------------------------------------------------- | ------------------------------------------------------- |
+| cache_factory | [`dataframe/cache_factory.rs`](examples/dataframe/cache_factory.rs) | Custom lazy caching for DataFrames using `CacheFactory` |
+| dataframe | [`dataframe/dataframe.rs`](examples/dataframe/dataframe.rs) | Query DataFrames from various sources and write output |
+| deserialize_to_struct | [`dataframe/deserialize_to_struct.rs`](examples/dataframe/deserialize_to_struct.rs) | Convert Arrow arrays into Rust structs |
 
 ## Execution Monitoring Examples
```
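Given the runner invocation shown in the hunk header, the new subcommand would presumably be run as `cargo run --example dataframe -- cache_factory`.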

datafusion-examples/data/README.md

Lines changed: 25 additions & 0 deletions
```diff
@@ -0,0 +1,25 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Example datasets
+
+| Filename | Path | Description |
+| ----------- | --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| `cars.csv` | [`data/csv/cars.csv`](./csv/cars.csv) | Time-series–like dataset containing car identifiers, speed values, and timestamps. Used in window function and time-based query examples (e.g. ordering, window frames). |
+| `regex.csv` | [`data/csv/regex.csv`](./csv/regex.csv) | Dataset for regular expression examples. Contains input values, regex patterns, replacement strings, and optional flags. Covers ASCII, Unicode, and locale-specific text processing. |
```
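Since the table describes `cars.csv` as window-function and time-based-query fodder, a minimal sketch of the kind of query it supports may help. The relative path and the `cars` table name below are illustrative assumptions, not part of the commit:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

/// Sketch: a per-car moving average over the bundled cars.csv
/// (path assumed relative to the repository root).
pub async fn cars_window_demo() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv(
        "cars",
        "datafusion-examples/data/csv/cars.csv",
        CsvReadOptions::new(),
    )
    .await?;
    // Smooth each car's speed with a two-row moving average ordered by time
    ctx.sql(
        "SELECT car, speed, time, \
                AVG(speed) OVER (PARTITION BY car ORDER BY time \
                                 ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS smoothed \
         FROM cars",
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```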
datafusion-examples/data/csv/cars.csv

Lines changed: 26 additions & 0 deletions

```diff
@@ -0,0 +1,26 @@
+car,speed,time
+red,20.0,1996-04-12T12:05:03.000000000
+red,20.3,1996-04-12T12:05:04.000000000
+red,21.4,1996-04-12T12:05:05.000000000
+red,21.5,1996-04-12T12:05:06.000000000
+red,19.0,1996-04-12T12:05:07.000000000
+red,18.0,1996-04-12T12:05:08.000000000
+red,17.0,1996-04-12T12:05:09.000000000
+red,7.0,1996-04-12T12:05:10.000000000
+red,7.1,1996-04-12T12:05:11.000000000
+red,7.2,1996-04-12T12:05:12.000000000
+red,3.0,1996-04-12T12:05:13.000000000
+red,1.0,1996-04-12T12:05:14.000000000
+red,0.0,1996-04-12T12:05:15.000000000
+green,10.0,1996-04-12T12:05:03.000000000
+green,10.3,1996-04-12T12:05:04.000000000
+green,10.4,1996-04-12T12:05:05.000000000
+green,10.5,1996-04-12T12:05:06.000000000
+green,11.0,1996-04-12T12:05:07.000000000
+green,12.0,1996-04-12T12:05:08.000000000
+green,14.0,1996-04-12T12:05:09.000000000
+green,15.0,1996-04-12T12:05:10.000000000
+green,15.1,1996-04-12T12:05:11.000000000
+green,15.2,1996-04-12T12:05:12.000000000
+green,8.0,1996-04-12T12:05:13.000000000
+green,2.0,1996-04-12T12:05:14.000000000
```
datafusion-examples/data/csv/regex.csv

Lines changed: 12 additions & 0 deletions

```diff
@@ -0,0 +1,12 @@
+values,patterns,replacement,flags
+abc,^(a),bb\1bb,i
+ABC,^(A).*,B,i
+aBc,(b|d),e,i
+AbC,(B|D),e,
+aBC,^(b|c),d,
+4000,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
+4010,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
+Düsseldorf,[\p{Letter}-]+,München,
+Москва,[\p{L}-]+,Moscow,
+Köln,[a-zA-Z]ö[a-zA-Z]{2},Koln,
+اليوم,^\p{Arabic}+$,Today,
```
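Each row of `regex.csv` carries its own pattern, replacement, and flags, which is exactly the shape the row-wise `regexp_*` functions expect. A hedged sketch of how the columns line up (the table name and path are assumptions):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

/// Sketch: apply per-row patterns from regex.csv with the regexp_* functions.
pub async fn regex_demo() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv(
        "examples",
        "datafusion-examples/data/csv/regex.csv",
        CsvReadOptions::new(),
    )
    .await?;
    // Every column is a string, so patterns and flags can vary per row
    ctx.sql(
        "SELECT values, \
                regexp_like(values, patterns, flags) AS matches, \
                regexp_replace(values, patterns, replacement, flags) AS replaced \
         FROM examples",
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```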

datafusion-examples/examples/builtin_functions/regexp.rs

Lines changed: 4 additions & 27 deletions
```diff
@@ -1,5 +1,4 @@
 // Licensed to the Apache Software Foundation (ASF) under one
-// Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements. See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership. The ASF licenses this file
@@ -18,12 +17,10 @@
 
 //! See `main.rs` for how to run it.
 
-use std::{fs::File, io::Write};
-
 use datafusion::common::{assert_batches_eq, assert_contains};
 use datafusion::error::Result;
 use datafusion::prelude::*;
-use tempfile::tempdir;
+use datafusion_examples::utils::datasets::ExampleDataset;
 
 /// This example demonstrates how to use the regexp_* functions
 ///
@@ -35,29 +32,9 @@ use tempfile::tempdir;
 /// https://docs.rs/regex/latest/regex/#grouping-and-flags
 pub async fn regexp() -> Result<()> {
     let ctx = SessionContext::new();
-    // content from file 'datafusion/physical-expr/tests/data/regex.csv'
-    let csv_data = r#"values,patterns,replacement,flags
-abc,^(a),bb\1bb,i
-ABC,^(A).*,B,i
-aBc,(b|d),e,i
-AbC,(B|D),e,
-aBC,^(b|c),d,
-4000,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
-4010,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
-Düsseldorf,[\p{Letter}-]+,München,
-Москва,[\p{L}-]+,Moscow,
-Köln,[a-zA-Z]ö[a-zA-Z]{2},Koln,
-اليوم,^\p{Arabic}+$,Today,"#;
-    let dir = tempdir()?;
-    let file_path = dir.path().join("regex.csv");
-    {
-        let mut file = File::create(&file_path)?;
-        // write CSV data
-        file.write_all(csv_data.as_bytes())?;
-    } // scope closes the file
-    let file_path = file_path.to_str().unwrap();
-
-    ctx.register_csv("examples", file_path, CsvReadOptions::new())
+    let dataset = ExampleDataset::Regex;
+
+    ctx.register_csv("examples", dataset.path_str()?, CsvReadOptions::new())
         .await?;
 
     //
```
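The `ExampleDataset` helper itself is outside this excerpt. Judging only from its call sites here and below (`ExampleDataset::Regex`, `ExampleDataset::Cars`, `path()`, `path_str()`, `schema()`), a plausible reconstruction looks roughly like the following; treat every detail as an assumption rather than the crate's actual definition:

```rust
use std::path::PathBuf;
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion::common::{DataFusionError, Result};

/// Hypothetical shape of the shared dataset helper used by the examples.
pub enum ExampleDataset {
    Cars,
    Regex,
}

impl ExampleDataset {
    /// Path of the bundled CSV, anchored at the crate directory.
    pub fn path(&self) -> PathBuf {
        let file = match self {
            ExampleDataset::Cars => "csv/cars.csv",
            ExampleDataset::Regex => "csv/regex.csv",
        };
        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("data")
            .join(file)
    }

    /// The same path as a String, erroring on non-UTF-8 paths.
    pub fn path_str(&self) -> Result<String> {
        self.path().to_str().map(str::to_string).ok_or_else(|| {
            DataFusionError::Execution("dataset path is not valid UTF-8".into())
        })
    }

    /// Arrow schema matching the CSV headers shown above.
    pub fn schema(&self) -> SchemaRef {
        match self {
            ExampleDataset::Cars => Arc::new(Schema::new(vec![
                Field::new("car", DataType::Utf8, false),
                Field::new("speed", DataType::Float64, false),
                Field::new(
                    "time",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    false,
                ),
            ])),
            ExampleDataset::Regex => Arc::new(Schema::new(vec![
                Field::new("values", DataType::Utf8, true),
                Field::new("patterns", DataType::Utf8, true),
                Field::new("replacement", DataType::Utf8, true),
                Field::new("flags", DataType::Utf8, true),
            ])),
        }
    }
}
```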

datafusion-examples/examples/custom_data_source/csv_json_opener.rs

Lines changed: 15 additions & 17 deletions
```diff
@@ -31,10 +31,10 @@ use datafusion::{
     },
     error::Result,
     physical_plan::metrics::ExecutionPlanMetricsSet,
-    test_util::aggr_test_schema,
 };
 
 use datafusion::datasource::physical_plan::FileScanConfigBuilder;
+use datafusion_examples::utils::datasets::ExampleDataset;
 use futures::StreamExt;
 use object_store::{ObjectStore, local::LocalFileSystem, memory::InMemory};
 
@@ -50,12 +50,10 @@ pub async fn csv_json_opener() -> Result<()> {
 
 async fn csv_opener() -> Result<()> {
     let object_store = Arc::new(LocalFileSystem::new());
-    let schema = aggr_test_schema();
 
-    let testdata = datafusion::test_util::arrow_test_data();
-    let path = format!("{testdata}/csv/aggregate_test_100.csv");
-
-    let path = std::path::Path::new(&path).canonicalize()?;
+    let dataset = ExampleDataset::Cars;
+    let csv_path = dataset.path();
+    let schema = dataset.schema();
 
     let options = CsvOptions {
         has_header: Some(true),
@@ -71,9 +69,9 @@ async fn csv_opener() -> Result<()> {
 
     let scan_config =
         FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
-            .with_projection_indices(Some(vec![12, 0]))?
+            .with_projection_indices(Some(vec![0, 1]))?
             .with_limit(Some(5))
-            .with_file(PartitionedFile::new(path.display().to_string(), 10))
+            .with_file(PartitionedFile::new(csv_path.display().to_string(), 10))
            .build();
 
     let opener =
@@ -89,15 +87,15 @@ async fn csv_opener() -> Result<()> {
     }
     assert_batches_eq!(
         &[
-            "+--------------------------------+----+",
-            "| c13                            | c1 |",
-            "+--------------------------------+----+",
-            "| 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW | c  |",
-            "| C2GT5KVyOPZpgKVl110TyZO0NcJ434 | d  |",
-            "| AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz | b  |",
-            "| 0keZ5G8BffGwgF2RwQD59TFzMStxCB | a  |",
-            "| Ig1QcuKsjHXkproePdERo2w0mYzIqd | b  |",
-            "+--------------------------------+----+",
+            "+-----+-------+",
+            "| car | speed |",
+            "+-----+-------+",
+            "| red | 20.0  |",
+            "| red | 20.3  |",
+            "| red | 21.4  |",
+            "| red | 21.5  |",
+            "| red | 19.0  |",
+            "+-----+-------+",
         ],
         &result
     );
```
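The projection change simply tracks the new file: indices are zero-based positions in the file schema, so `[0, 1]` selects `car` and `speed` from cars.csv just as `[12, 0]` had selected `c13` and `c1` from aggregate_test_100.csv. For comparison, a sketch of the same result via the higher-level DataFrame API (path assumed):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

/// Sketch: the DataFrame-API equivalent of projecting columns [0, 1]
/// with a limit of 5 in the scan config above.
pub async fn cars_projection_demo() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_csv(
            "datafusion-examples/data/csv/cars.csv",
            CsvReadOptions::new(),
        )
        .await?;
    // Columns 0 and 1 of the file schema are `car` and `speed`;
    // the first five rows of the bundled file are all `red`.
    df.select_columns(&["car", "speed"])?
        .limit(0, Some(5))?
        .show()
        .await?;
    Ok(())
}
```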

datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs

Lines changed: 10 additions & 9 deletions
```diff
@@ -17,43 +17,44 @@
 
 //! See `main.rs` for how to run it.
 
-use datafusion::common::test_util::datafusion_test_data;
 use datafusion::error::Result;
 use datafusion::prelude::*;
+use datafusion_examples::utils::datasets::ExampleDataset;
 
 /// This example demonstrates executing a simple query against an Arrow data source (CSV) and
 /// fetching results with streaming aggregation and streaming window
 pub async fn csv_sql_streaming() -> Result<()> {
     // create local execution context
     let ctx = SessionContext::new();
 
-    let testdata = datafusion_test_data();
+    let dataset = ExampleDataset::Cars;
+    let csv_path = dataset.path();
 
-    // Register a table source and tell DataFusion the file is ordered by `ts ASC`.
+    // Register a table source and tell DataFusion the file is ordered by `car ASC`.
     // Note it is the responsibility of the user to make sure
     // that file indeed satisfies this condition or else incorrect answers may be produced.
     let asc = true;
     let nulls_first = true;
-    let sort_expr = vec![col("ts").sort(asc, nulls_first)];
+    let sort_expr = vec![col("car").sort(asc, nulls_first)];
     // register csv file with the execution context
     ctx.register_csv(
         "ordered_table",
-        &format!("{testdata}/window_1.csv"),
+        csv_path.to_str().unwrap(),
         CsvReadOptions::new().file_sort_order(vec![sort_expr]),
     )
     .await?;
 
     // execute the query
-    // Following query can be executed with unbounded sources because group by expressions (e.g ts) is
+    // Following query can be executed with unbounded sources because group by expressions (e.g car) is
     // already ordered at the source.
     //
     // Unbounded sources means that if the input came from a "never ending" source (such as a FIFO
     // file on unix) the query could produce results incrementally as data was read.
     let df = ctx
         .sql(
-            "SELECT ts, MIN(inc_col), MAX(inc_col) \
+            "SELECT car, MIN(speed), MAX(speed) \
             FROM ordered_table \
-            GROUP BY ts",
+            GROUP BY car",
         )
         .await?;
 
@@ -64,7 +65,7 @@ pub async fn csv_sql_streaming() -> Result<()> {
     // its result in streaming fashion, because its required ordering is already satisfied at the source.
     let df = ctx
         .sql(
-            "SELECT ts, SUM(inc_col) OVER(ORDER BY ts ASC) \
+            "SELECT car, SUM(speed) OVER(ORDER BY car ASC) \
             FROM ordered_table",
         )
         .await?;
```
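The comment above puts the burden of the `car ASC` claim on the user, so a quick sanity check can be worth its keep before declaring `file_sort_order`. A minimal sketch; the helper name and approach are my own, not part of the commit:

```rust
use datafusion::arrow::array::StringArray;
use datafusion::error::Result;
use datafusion::prelude::*;

/// Sketch: verify that the `car` column really is non-decreasing before
/// declaring the sort order, since DataFusion trusts the declaration and
/// may otherwise return incorrect results.
pub async fn assert_sorted_by_car(path: &str) -> Result<()> {
    let ctx = SessionContext::new();
    let batches = ctx
        .read_csv(path, CsvReadOptions::new())
        .await?
        .select_columns(&["car"])?
        .collect()
        .await?;
    let mut prev: Option<String> = None;
    for batch in batches {
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("car should be read as a string column");
        for i in 0..col.len() {
            let cur = col.value(i).to_string();
            if let Some(p) = &prev {
                assert!(p <= &cur, "file is not sorted by car ASC");
            }
            prev = Some(cur);
        }
    }
    Ok(())
}
```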

datafusion-examples/examples/data_io/parquet_encrypted.rs

Lines changed: 18 additions & 14 deletions
```diff
@@ -17,28 +17,30 @@
 
 //! See `main.rs` for how to run it.
 
+use std::sync::Arc;
+
 use datafusion::common::DataFusionError;
 use datafusion::config::{ConfigFileEncryptionProperties, TableParquetOptions};
 use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
 use datafusion::logical_expr::{col, lit};
 use datafusion::parquet::encryption::decrypt::FileDecryptionProperties;
 use datafusion::parquet::encryption::encrypt::FileEncryptionProperties;
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
-use std::sync::Arc;
+use datafusion_examples::utils::{datasets::ExampleDataset, write_csv_to_parquet};
 use tempfile::TempDir;
 
 /// Read and write encrypted Parquet files using DataFusion
 pub async fn parquet_encrypted() -> datafusion::common::Result<()> {
     // The SessionContext is the main high level API for interacting with DataFusion
     let ctx = SessionContext::new();
 
-    // Find the local path of "alltypes_plain.parquet"
-    let testdata = datafusion::test_util::parquet_test_data();
-    let filename = &format!("{testdata}/alltypes_plain.parquet");
+    // Convert the CSV input into a temporary Parquet directory for querying
+    let dataset = ExampleDataset::Cars;
+    let parquet_temp = write_csv_to_parquet(&ctx, &dataset.path()).await?;
 
     // Read the sample parquet file
     let parquet_df = ctx
-        .read_parquet(filename, ParquetReadOptions::default())
+        .read_parquet(parquet_temp.path_str()?, ParquetReadOptions::default())
         .await?;
 
     // Show information from the dataframe
@@ -52,27 +54,28 @@ pub async fn parquet_encrypted() -> datafusion::common::Result<()> {
     let (encrypt, decrypt) = setup_encryption(&parquet_df)?;
 
     // Create a temporary file location for the encrypted parquet file
-    let tmp_dir = TempDir::new()?;
-    let tempfile = tmp_dir.path().join("alltypes_plain-encrypted.parquet");
-    let tempfile_str = tempfile.into_os_string().into_string().unwrap();
+    let tmp_source = TempDir::new()?;
+    let tempfile = tmp_source.path().join("cars_encrypted");
 
     // Write encrypted parquet
     let mut options = TableParquetOptions::default();
     options.crypto.file_encryption = Some(ConfigFileEncryptionProperties::from(&encrypt));
     parquet_df
         .write_parquet(
-            tempfile_str.as_str(),
+            tempfile.to_str().unwrap(),
             DataFrameWriteOptions::new().with_single_file_output(true),
             Some(options),
         )
         .await?;
 
-    // Read encrypted parquet
+    // Read encrypted parquet back as a DataFrame using matching decryption config
     let ctx: SessionContext = SessionContext::new();
     let read_options =
         ParquetReadOptions::default().file_decryption_properties((&decrypt).into());
 
-    let encrypted_parquet_df = ctx.read_parquet(tempfile_str, read_options).await?;
+    let encrypted_parquet_df = ctx
+        .read_parquet(tempfile.to_str().unwrap(), read_options)
+        .await?;
 
     // Show information from the dataframe
     println!(
@@ -91,11 +94,12 @@ async fn query_dataframe(df: &DataFrame) -> Result<(), DataFusionError> {
     df.clone().describe().await?.show().await?;
 
     // Select three columns and filter the results
-    // so that only rows where id > 1 are returned
+    // so that only rows where speed > 5 are returned
+    // select car, speed, time from t where speed > 5
     println!("\nSelected rows and columns:");
     df.clone()
-        .select_columns(&["id", "bool_col", "timestamp_col"])?
-        .filter(col("id").gt(lit(5)))?
+        .select_columns(&["car", "speed", "time"])?
+        .filter(col("speed").gt(lit(5)))?
         .show()
         .await?;
 
```
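`write_csv_to_parquet` is another utils helper that this excerpt only shows at its call site: it takes the context plus a CSV path and returns a temp handle exposing `path_str()`. A hypothetical reconstruction, assuming the handle simply owns a `TempDir`:

```rust
use std::path::Path;

use datafusion::common::{DataFusionError, Result};
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;
use tempfile::TempDir;

/// Hypothetical handle: keeps the TempDir alive so the Parquet files
/// are not deleted while the example still reads them.
pub struct TempParquet {
    dir: TempDir,
}

impl TempParquet {
    pub fn path_str(&self) -> Result<&str> {
        self.dir.path().to_str().ok_or_else(|| {
            DataFusionError::Execution("temp path is not valid UTF-8".into())
        })
    }
}

/// Sketch: round-trip the CSV into a temporary Parquet directory.
pub async fn write_csv_to_parquet(
    ctx: &SessionContext,
    csv: &Path,
) -> Result<TempParquet> {
    let dir = TempDir::new()?;
    let df = ctx
        .read_csv(csv.to_str().unwrap(), CsvReadOptions::new())
        .await?;
    df.write_parquet(
        dir.path().to_str().unwrap(),
        DataFrameWriteOptions::new(),
        None,
    )
    .await?;
    Ok(TempParquet { dir })
}
```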