1717
1818use chrono:: { DateTime , Utc } ;
1919use datafusion:: common:: utils:: get_available_parallelism;
20- use datafusion:: error:: DataFusionError ;
2120use datafusion:: { error:: Result , DATAFUSION_VERSION } ;
2221use serde:: { Deserialize , Deserializer , Serialize , Serializer } ;
23- use serde_json:: Value ;
24- use std:: error:: Error ;
2522use std:: {
26- collections:: HashMap ,
2723 path:: Path ,
2824 time:: { Duration , SystemTime } ,
2925} ;
@@ -69,6 +65,10 @@ pub struct RunContext {
6965 pub datafusion_version : String ,
7066 /// Number of CPU cores
7167 pub num_cpus : usize ,
68+ /// Number of workers involved in a distributed query
69+ pub workers : usize ,
70+ /// Number of physical threads used per worker
71+ pub threads : usize ,
7272 /// Start time
7373 #[ serde(
7474 serialize_with = "serialize_start_time" ,
@@ -79,18 +79,14 @@ pub struct RunContext {
7979 pub arguments : Vec < String > ,
8080}
8181
82- impl Default for RunContext {
83- fn default ( ) -> Self {
84- Self :: new ( )
85- }
86- }
87-
8882impl RunContext {
89- pub fn new ( ) -> Self {
83+ pub fn new ( workers : usize , threads : usize ) -> Self {
9084 Self {
9185 benchmark_version : env ! ( "CARGO_PKG_VERSION" ) . to_owned ( ) ,
9286 datafusion_version : DATAFUSION_VERSION . to_owned ( ) ,
9387 num_cpus : get_available_parallelism ( ) ,
88+ workers,
89+ threads,
9490 start_time : SystemTime :: now ( ) ,
9591 arguments : std:: env:: args ( ) . skip ( 1 ) . collect :: < Vec < String > > ( ) ,
9692 }
@@ -125,23 +121,18 @@ pub struct QueryResult {
125121 pub row_count : usize ,
126122}
127123/// collects benchmark run data and then serializes it at the end
124+ #[ derive( Debug , Serialize , Deserialize ) ]
128125pub struct BenchmarkRun {
129126 context : RunContext ,
130127 queries : Vec < BenchQuery > ,
131128 current_case : Option < usize > ,
132129}
133130
134- impl Default for BenchmarkRun {
135- fn default ( ) -> Self {
136- Self :: new ( )
137- }
138- }
139-
140131impl BenchmarkRun {
141132 // create new
142- pub fn new ( ) -> Self {
133+ pub fn new ( workers : usize , threads : usize ) -> Self {
143134 Self {
144- context : RunContext :: new ( ) ,
135+ context : RunContext :: new ( workers , threads ) ,
145136 queries : vec ! [ ] ,
146137 current_case : None ,
147138 }
@@ -195,10 +186,7 @@ impl BenchmarkRun {
195186
196187 /// Stringify data into formatted json
197188 pub fn to_json ( & self ) -> String {
198- let mut output = HashMap :: < & str , Value > :: new ( ) ;
199- output. insert ( "context" , serde_json:: to_value ( & self . context ) . unwrap ( ) ) ;
200- output. insert ( "queries" , serde_json:: to_value ( & self . queries ) . unwrap ( ) ) ;
201- serde_json:: to_string_pretty ( & output) . unwrap ( )
189+ serde_json:: to_string_pretty ( & self ) . unwrap ( )
202190 }
203191
204192 /// Write data as json into output path if it exists.
@@ -217,15 +205,14 @@ impl BenchmarkRun {
217205 return Ok ( ( ) ) ;
218206 } ;
219207
220- let mut prev_output: HashMap < & str , Value > =
221- serde_json:: from_slice ( & prev) . map_err ( external) ?;
222-
223- let prev_queries: Vec < BenchQuery > =
224- serde_json:: from_value ( prev_output. remove ( "queries" ) . unwrap ( ) ) . map_err ( external) ?;
208+ let Ok ( prev_output) = serde_json:: from_slice :: < Self > ( & prev) else {
209+ return Ok ( ( ) ) ;
210+ } ;
225211
226212 let mut header_printed = false ;
227213 for query in self . queries . iter ( ) {
228- let Some ( prev_query) = prev_queries. iter ( ) . find ( |v| v. query == query. query ) else {
214+ let Some ( prev_query) = prev_output. queries . iter ( ) . find ( |v| v. query == query. query )
215+ else {
229216 continue ;
230217 } ;
231218 if prev_query. iterations . is_empty ( ) {
@@ -248,10 +235,24 @@ impl BenchmarkRun {
248235 if !header_printed {
249236 header_printed = true ;
250237 let datetime: DateTime < Utc > = prev_query. start_time . into ( ) ;
251- println ! (
238+ let header = format ! (
252239 "==== Comparison with the previous benchmark from {} ====" ,
253240 datetime. format( "%Y-%m-%d %H:%M:%S UTC" )
254241 ) ;
242+ println ! ( "{header}" ) ;
243+ // Print machine information
244+ println ! ( "os: {}" , std:: env:: consts:: OS ) ;
245+ println ! ( "arch: {}" , std:: env:: consts:: ARCH ) ;
246+ println ! ( "cpu cores: {}" , get_available_parallelism( ) ) ;
247+ println ! (
248+ "threads: {} -> {}" ,
249+ prev_output. context. threads, self . context. threads
250+ ) ;
251+ println ! (
252+ "workers: {} -> {}" ,
253+ prev_output. context. workers, self . context. workers
254+ ) ;
255+ println ! ( "{}" , "=" . repeat( header. len( ) ) )
255256 }
256257 println ! (
257258 "{:>8}: prev={avg_prev:>4} ms, new={avg:>4} ms, diff={f:.2} {tag} {emoji}" ,
@@ -272,7 +273,3 @@ impl BenchQuery {
272273 / self . iterations . len ( ) as u128
273274 }
274275}
275-
276- fn external ( err : impl Error + Send + Sync + ' static ) -> DataFusionError {
277- DataFusionError :: External ( Box :: new ( err) )
278- }
0 commit comments