Commit 98cb948

alamb and jonahgao authored
Improve deserialize_to_struct example (#13958)
* Cleanup deserialize_to_struct example

* prettier

* Apply suggestions from code review

Co-authored-by: Jonah Gao <[email protected]>
1 parent ab1de2c commit 98cb948

2 files changed: +117 −43 lines changed

datafusion-examples/README.md

Lines changed: 1 addition & 1 deletion

@@ -58,7 +58,7 @@ cargo run --example dataframe
 - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
 - [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
-- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
+- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
 - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
 - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
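
The core pattern behind the new README description, in miniature: a query result column arrives as a type-erased `ArrayRef`, which must be downcast to a concrete Arrow array before its values can be read as native Rust types. A minimal standalone sketch using only the arrow crate (the `col` variable here is illustrative, standing in for a column taken from a query's `RecordBatch`):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, AsArray, Int32Array};
    use arrow::datatypes::Int32Type;

    fn main() {
        // A column as it comes back from a query: a type-erased `ArrayRef`
        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

        // Downcast to the concrete array type (a zero-copy cast), then
        // read the values out as native Rust i32s
        let ints: Vec<i32> = col.as_primitive::<Int32Type>().values().to_vec();
        assert_eq!(ints, vec![1, 2, 3]);
    }

The full example below applies this same downcast-then-iterate pattern to each `RecordBatch` produced by a streaming query.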

datafusion-examples/examples/deserialize_to_struct.rs

Lines changed: 116 additions & 42 deletions

@@ -15,62 +15,136 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::AsArray;
+use arrow::array::{AsArray, PrimitiveArray};
 use arrow::datatypes::{Float64Type, Int32Type};
 use datafusion::error::Result;
 use datafusion::prelude::*;
+use datafusion_common::assert_batches_eq;
 use futures::StreamExt;
 
-/// This example shows that it is possible to convert query results into Rust structs.
+/// This example shows how to convert query results into Rust structs by using
+/// the Arrow APIs to convert the results into Rust native types.
+///
+/// This is a bit tricky initially as the results are returned as columns stored
+/// as [ArrayRef]
+///
+/// [ArrayRef]: arrow::array::ArrayRef
 #[tokio::main]
 async fn main() -> Result<()> {
-    let data_list = Data::new().await?;
-    println!("{data_list:#?}");
-    Ok(())
-}
+    // Run a query that returns two columns of data
+    let ctx = SessionContext::new();
+    let testdata = datafusion::test_util::parquet_test_data();
+    ctx.register_parquet(
+        "alltypes_plain",
+        &format!("{testdata}/alltypes_plain.parquet"),
+        ParquetReadOptions::default(),
+    )
+    .await?;
+    let df = ctx
+        .sql("SELECT int_col, double_col FROM alltypes_plain")
+        .await?;
 
-#[derive(Debug)]
-struct Data {
-    #[allow(dead_code)]
-    int_col: i32,
-    #[allow(dead_code)]
-    double_col: f64,
-}
+    // print out the results showing we have an int32 and a float64 column
+    let results = df.clone().collect().await?;
+    assert_batches_eq!(
+        [
+            "+---------+------------+",
+            "| int_col | double_col |",
+            "+---------+------------+",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "+---------+------------+",
+        ],
+        &results
+    );
 
-impl Data {
-    pub async fn new() -> Result<Vec<Self>> {
-        // this group is almost the same as the one you find in parquet_sql.rs
-        let ctx = SessionContext::new();
+    // We will now convert the query results into a Rust struct
+    let mut stream = df.execute_stream().await?;
+    let mut list = vec![];
 
-        let testdata = datafusion::test_util::parquet_test_data();
+    // DataFusion produces data in chunks called `RecordBatch`es which are
+    // typically 8000 rows each. This loop processes each `RecordBatch` as it is
+    // produced by the query plan and adds it to the list
+    while let Some(b) = stream.next().await.transpose()? {
+        // Each `RecordBatch` has one or more columns. Each column is stored as
+        // an `ArrayRef`. To interact with data using Rust native types we need to
+        // convert these `ArrayRef`s into concrete array types using APIs from
+        // the arrow crate.
 
-        ctx.register_parquet(
-            "alltypes_plain",
-            &format!("{testdata}/alltypes_plain.parquet"),
-            ParquetReadOptions::default(),
-        )
-        .await?;
+        // In this case, we know that each batch has two columns of the Arrow
+        // types Int32 and Float64, so first we cast the two columns to the
+        // appropriate Arrow PrimitiveArray (this is a fast / zero-copy cast):
+        let int_col: &PrimitiveArray<Int32Type> = b.column(0).as_primitive();
+        let float_col: &PrimitiveArray<Float64Type> = b.column(1).as_primitive();
 
-        let df = ctx
-            .sql("SELECT int_col, double_col FROM alltypes_plain")
-            .await?;
+        // With PrimitiveArrays, we can access the values as the native Rust
+        // types i32 and f64, forming the desired `Data` structs
+        for (i, f) in int_col.values().iter().zip(float_col.values()) {
+            list.push(Data {
+                int_col: *i,
+                double_col: *f,
+            })
+        }
+    }
 
-        df.clone().show().await?;
+    // Finally, we have the results in the list of Rust structs
+    let res = format!("{list:#?}");
+    assert_eq!(
+        res,
+        r#"[
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+]"#
+    );
 
-        let mut stream = df.execute_stream().await?;
-        let mut list = vec![];
-        while let Some(b) = stream.next().await.transpose()? {
-            let int_col = b.column(0).as_primitive::<Int32Type>();
-            let float_col = b.column(1).as_primitive::<Float64Type>();
+    // Use the fields in the struct to avoid clippy complaints
+    let int_sum = list.iter().fold(0, |acc, x| acc + x.int_col);
+    let double_sum = list.iter().fold(0.0, |acc, x| acc + x.double_col);
+    assert_eq!(int_sum, 4);
+    assert_eq!(double_sum, 40.4);
 
-            for (i, f) in int_col.values().iter().zip(float_col.values()) {
-                list.push(Data {
-                    int_col: *i,
-                    double_col: *f,
-                })
-            }
-        }
+    Ok(())
+}
 
-        Ok(list)
-    }
+/// This is the target struct into which we want the query results.
+#[derive(Debug)]
+struct Data {
+    int_col: i32,
+    double_col: f64,
 }
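
One caveat worth noting about the loop above: `values()` reads the raw value buffer and ignores the null (validity) bitmap, which is fine here since the query output shown above contains no NULLs. For nullable columns, iterating the array instead yields `Option` values. A short sketch of that variant, not part of this commit, using a hypothetical `NullableData` struct:

    use arrow::array::{Int32Array, PrimitiveArray};
    use arrow::datatypes::Int32Type;

    // Hypothetical target struct for a nullable column (not in the example)
    #[derive(Debug, PartialEq)]
    struct NullableData {
        int_col: Option<i32>,
    }

    fn collect_nullable(int_col: &PrimitiveArray<Int32Type>) -> Vec<NullableData> {
        // `iter()` consults the validity bitmap and yields Option<i32>,
        // unlike `values()`, which returns raw values even in null slots
        int_col.iter().map(|i| NullableData { int_col: i }).collect()
    }

    fn main() {
        let col = Int32Array::from(vec![Some(1), None, Some(3)]);
        let rows = collect_nullable(&col);
        assert_eq!(rows[1], NullableData { int_col: None });
    }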
