Skip to content

Commit ce330ec

Browse files
authored
refactor: replace OnceLock with LazyLock (round 2) (#13674)
* refactor: replace `OnceLock` with `LazyLock` * Update UDWF macros * Fix comment format
1 parent 2464703 commit ce330ec

File tree

17 files changed

+161
-214
lines changed

17 files changed

+161
-214
lines changed

benchmarks/src/bin/external_aggr.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use std::collections::HashMap;
2121
use std::path::PathBuf;
2222
use std::sync::Arc;
23-
use std::sync::OnceLock;
23+
use std::sync::LazyLock;
2424
use structopt::StructOpt;
2525

2626
use arrow::record_batch::RecordBatch;
@@ -91,7 +91,13 @@ struct QueryResult {
9191
/// Memory limits to run: 64MiB, 32MiB, 16MiB
9292
/// Q2 requires 250MiB for aggregation
9393
/// Memory limits to run: 512MiB, 256MiB, 128MiB, 64MiB, 32MiB
94-
static QUERY_MEMORY_LIMITS: OnceLock<HashMap<usize, Vec<u64>>> = OnceLock::new();
94+
static QUERY_MEMORY_LIMITS: LazyLock<HashMap<usize, Vec<u64>>> = LazyLock::new(|| {
95+
use units::*;
96+
let mut map = HashMap::new();
97+
map.insert(1, vec![64 * MB, 32 * MB, 16 * MB]);
98+
map.insert(2, vec![512 * MB, 256 * MB, 128 * MB, 64 * MB, 32 * MB]);
99+
map
100+
});
95101

96102
impl ExternalAggrConfig {
97103
const AGGR_TABLES: [&'static str; 1] = ["lineitem"];
@@ -114,16 +120,6 @@ impl ExternalAggrConfig {
114120
"#,
115121
];
116122

117-
fn init_query_memory_limits() -> &'static HashMap<usize, Vec<u64>> {
118-
use units::*;
119-
QUERY_MEMORY_LIMITS.get_or_init(|| {
120-
let mut map = HashMap::new();
121-
map.insert(1, vec![64 * MB, 32 * MB, 16 * MB]);
122-
map.insert(2, vec![512 * MB, 256 * MB, 128 * MB, 64 * MB, 32 * MB]);
123-
map
124-
})
125-
}
126-
127123
/// If `--query` and `--memory-limit` is not specified, run all queries
128124
/// with pre-configured memory limits
129125
/// If only `--query` is specified, run the query with all memory limits
@@ -161,8 +157,7 @@ impl ExternalAggrConfig {
161157
query_executions.push((query_id, limit));
162158
}
163159
None => {
164-
let memory_limits_table = Self::init_query_memory_limits();
165-
let memory_limits = memory_limits_table.get(&query_id).unwrap();
160+
let memory_limits = QUERY_MEMORY_LIMITS.get(&query_id).unwrap();
166161
for limit in memory_limits {
167162
query_executions.push((query_id, *limit));
168163
}

datafusion-cli/src/main.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::collections::HashMap;
1919
use std::env;
2020
use std::path::Path;
2121
use std::process::ExitCode;
22-
use std::sync::{Arc, OnceLock};
22+
use std::sync::{Arc, LazyLock};
2323

2424
use datafusion::error::{DataFusionError, Result};
2525
use datafusion::execution::context::SessionConfig;
@@ -279,9 +279,8 @@ impl ByteUnit {
279279
}
280280

281281
fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
282-
fn byte_suffixes() -> &'static HashMap<&'static str, ByteUnit> {
283-
static BYTE_SUFFIXES: OnceLock<HashMap<&'static str, ByteUnit>> = OnceLock::new();
284-
BYTE_SUFFIXES.get_or_init(|| {
282+
static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> =
283+
LazyLock::new(|| {
285284
let mut m = HashMap::new();
286285
m.insert("b", ByteUnit::Byte);
287286
m.insert("k", ByteUnit::KiB);
@@ -293,23 +292,20 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
293292
m.insert("t", ByteUnit::TiB);
294293
m.insert("tb", ByteUnit::TiB);
295294
m
296-
})
297-
}
295+
});
298296

299-
fn suffix_re() -> &'static regex::Regex {
300-
static SUFFIX_REGEX: OnceLock<regex::Regex> = OnceLock::new();
301-
SUFFIX_REGEX.get_or_init(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap())
302-
}
297+
static SUFFIX_REGEX: LazyLock<regex::Regex> =
298+
LazyLock::new(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap());
303299

304300
let lower = size.to_lowercase();
305-
if let Some(caps) = suffix_re().captures(&lower) {
301+
if let Some(caps) = SUFFIX_REGEX.captures(&lower) {
306302
let num_str = caps.get(1).unwrap().as_str();
307303
let num = num_str.parse::<usize>().map_err(|_| {
308304
format!("Invalid numeric value in memory pool size '{}'", size)
309305
})?;
310306

311307
let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b");
312-
let unit = byte_suffixes()
308+
let unit = &BYTE_SUFFIXES
313309
.get(suffix)
314310
.ok_or_else(|| format!("Invalid memory pool size '{}'", size))?;
315311
let memory_pool_size = usize::try_from(unit.multiplier())

datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ mod test {
345345
use parquet::basic::LogicalType;
346346
use parquet::file::metadata::ColumnChunkMetaData;
347347
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
348-
use std::sync::{Arc, OnceLock};
348+
use std::sync::{Arc, LazyLock};
349349

350350
#[test]
351351
fn test_only_scans() {
@@ -358,7 +358,7 @@ mod test {
358358

359359
let row_group_indexes = access_plan.row_group_indexes();
360360
let row_selection = access_plan
361-
.into_overall_row_selection(row_group_metadata())
361+
.into_overall_row_selection(&ROW_GROUP_METADATA)
362362
.unwrap();
363363

364364
// scan all row groups, no selection
@@ -377,7 +377,7 @@ mod test {
377377

378378
let row_group_indexes = access_plan.row_group_indexes();
379379
let row_selection = access_plan
380-
.into_overall_row_selection(row_group_metadata())
380+
.into_overall_row_selection(&ROW_GROUP_METADATA)
381381
.unwrap();
382382

383383
// skip all row groups, no selection
@@ -403,7 +403,7 @@ mod test {
403403

404404
let row_group_indexes = access_plan.row_group_indexes();
405405
let row_selection = access_plan
406-
.into_overall_row_selection(row_group_metadata())
406+
.into_overall_row_selection(&ROW_GROUP_METADATA)
407407
.unwrap();
408408

409409
assert_eq!(row_group_indexes, vec![0, 1]);
@@ -442,7 +442,7 @@ mod test {
442442

443443
let row_group_indexes = access_plan.row_group_indexes();
444444
let row_selection = access_plan
445-
.into_overall_row_selection(row_group_metadata())
445+
.into_overall_row_selection(&ROW_GROUP_METADATA)
446446
.unwrap();
447447

448448
assert_eq!(row_group_indexes, vec![1, 2, 3]);
@@ -478,7 +478,7 @@ mod test {
478478

479479
let row_group_indexes = access_plan.row_group_indexes();
480480
let err = access_plan
481-
.into_overall_row_selection(row_group_metadata())
481+
.into_overall_row_selection(&ROW_GROUP_METADATA)
482482
.unwrap_err()
483483
.to_string();
484484
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
@@ -504,39 +504,35 @@ mod test {
504504

505505
let row_group_indexes = access_plan.row_group_indexes();
506506
let err = access_plan
507-
.into_overall_row_selection(row_group_metadata())
507+
.into_overall_row_selection(&ROW_GROUP_METADATA)
508508
.unwrap_err()
509509
.to_string();
510510
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
511511
assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
512512
}
513513

514-
static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new();
515-
516514
/// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
517515
/// respectively
518-
fn row_group_metadata() -> &'static [RowGroupMetaData] {
519-
ROW_GROUP_METADATA.get_or_init(|| {
520-
let schema_descr = get_test_schema_descr();
521-
let row_counts = [10, 20, 30, 40];
522-
523-
row_counts
524-
.into_iter()
525-
.map(|num_rows| {
526-
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
527-
.set_num_values(num_rows)
528-
.build()
529-
.unwrap();
530-
531-
RowGroupMetaData::builder(schema_descr.clone())
532-
.set_num_rows(num_rows)
533-
.set_column_metadata(vec![column])
534-
.build()
535-
.unwrap()
536-
})
537-
.collect()
538-
})
539-
}
516+
static ROW_GROUP_METADATA: LazyLock<Vec<RowGroupMetaData>> = LazyLock::new(|| {
517+
let schema_descr = get_test_schema_descr();
518+
let row_counts = [10, 20, 30, 40];
519+
520+
row_counts
521+
.into_iter()
522+
.map(|num_rows| {
523+
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
524+
.set_num_values(num_rows)
525+
.build()
526+
.unwrap();
527+
528+
RowGroupMetaData::builder(schema_descr.clone())
529+
.set_num_rows(num_rows)
530+
.set_column_metadata(vec![column])
531+
.build()
532+
.unwrap()
533+
})
534+
.collect()
535+
});
540536

541537
/// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String`
542538
fn get_test_schema_descr() -> SchemaDescPtr {

datafusion/core/tests/expr_api/mod.rs

Lines changed: 34 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use datafusion_functions_aggregate::sum::sum_udaf;
2828
use datafusion_functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
2929
use sqlparser::ast::NullTreatment;
3030
/// Tests of using and evaluating `Expr`s outside the context of a LogicalPlan
31-
use std::sync::{Arc, OnceLock};
31+
use std::sync::{Arc, LazyLock};
3232

3333
mod parse_sql_expr;
3434
mod simplification;
@@ -305,13 +305,11 @@ async fn test_aggregate_ext_null_treatment() {
305305
/// Evaluates the specified expr as an aggregate and compares the result to the
306306
/// expected result.
307307
async fn evaluate_agg_test(expr: Expr, expected_lines: Vec<&str>) {
308-
let batch = test_batch();
309-
310308
let ctx = SessionContext::new();
311309
let group_expr = vec![];
312310
let agg_expr = vec![expr];
313311
let result = ctx
314-
.read_batch(batch)
312+
.read_batch(TEST_BATCH.clone())
315313
.unwrap()
316314
.aggregate(group_expr, agg_expr)
317315
.unwrap()
@@ -332,13 +330,13 @@ async fn evaluate_agg_test(expr: Expr, expected_lines: Vec<&str>) {
332330
/// Converts the `Expr` to a `PhysicalExpr`, evaluates it against the provided
333331
/// `RecordBatch` and compares the result to the expected result.
334332
fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) {
335-
let batch = test_batch();
333+
let batch = &TEST_BATCH;
336334
let df_schema = DFSchema::try_from(batch.schema()).unwrap();
337335
let physical_expr = SessionContext::new()
338336
.create_physical_expr(expr, &df_schema)
339337
.unwrap();
340338

341-
let result = physical_expr.evaluate(&batch).unwrap();
339+
let result = physical_expr.evaluate(batch).unwrap();
342340
let array = result.into_array(1).unwrap();
343341
let result = pretty_format_columns("expr", &[array]).unwrap().to_string();
344342
let actual_lines = result.lines().collect::<Vec<_>>();
@@ -350,39 +348,33 @@ fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) {
350348
);
351349
}
352350

353-
static TEST_BATCH: OnceLock<RecordBatch> = OnceLock::new();
354-
355-
fn test_batch() -> RecordBatch {
356-
TEST_BATCH
357-
.get_or_init(|| {
358-
let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
359-
let int_array: ArrayRef =
360-
Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));
361-
362-
// { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
363-
let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
364-
Arc::new(Field::new("a", DataType::Utf8, false)),
365-
Arc::new(StringArray::from(vec![
366-
"2021-02-01",
367-
"2021-02-02",
368-
"2021-02-03",
369-
])) as _,
370-
)]));
371-
372-
// ["one"] ["two", "three", "four"] ["five"]
373-
let mut builder = ListBuilder::new(StringBuilder::new());
374-
builder.append_value([Some("one")]);
375-
builder.append_value([Some("two"), Some("three"), Some("four")]);
376-
builder.append_value([Some("five")]);
377-
let list_array: ArrayRef = Arc::new(builder.finish());
378-
379-
RecordBatch::try_from_iter(vec![
380-
("id", string_array),
381-
("i", int_array),
382-
("props", struct_array),
383-
("list", list_array),
384-
])
385-
.unwrap()
386-
})
387-
.clone()
388-
}
351+
static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
352+
let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
353+
let int_array: ArrayRef =
354+
Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));
355+
356+
// { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
357+
let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
358+
Arc::new(Field::new("a", DataType::Utf8, false)),
359+
Arc::new(StringArray::from(vec![
360+
"2021-02-01",
361+
"2021-02-02",
362+
"2021-02-03",
363+
])) as _,
364+
)]));
365+
366+
// ["one"] ["two", "three", "four"] ["five"]
367+
let mut builder = ListBuilder::new(StringBuilder::new());
368+
builder.append_value([Some("one")]);
369+
builder.append_value([Some("two"), Some("three"), Some("four")]);
370+
builder.append_value([Some("five")]);
371+
let list_array: ArrayRef = Arc::new(builder.finish());
372+
373+
RecordBatch::try_from_iter(vec![
374+
("id", string_array),
375+
("i", int_array),
376+
("props", struct_array),
377+
("list", list_array),
378+
])
379+
.unwrap()
380+
});

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use datafusion_physical_plan::spill::get_record_batch_memory_size;
3535
use futures::StreamExt;
3636
use std::any::Any;
3737
use std::num::NonZeroUsize;
38-
use std::sync::{Arc, OnceLock};
38+
use std::sync::{Arc, LazyLock};
3939
use tokio::fs::File;
4040

4141
use datafusion::datasource::streaming::StreamingTable;
@@ -730,15 +730,14 @@ fn maybe_split_batches(
730730
.collect()
731731
}
732732

733-
static DICT_BATCHES: OnceLock<Vec<RecordBatch>> = OnceLock::new();
734-
735733
/// Returns 5 sorted string dictionary batches each with 50 rows with
736734
/// this schema.
737735
///
738736
/// a: Dictionary<Utf8, Int32>,
739737
/// b: Dictionary<Utf8, Int32>,
740738
fn dict_batches() -> Vec<RecordBatch> {
741-
DICT_BATCHES.get_or_init(make_dict_batches).clone()
739+
static DICT_BATCHES: LazyLock<Vec<RecordBatch>> = LazyLock::new(make_dict_batches);
740+
DICT_BATCHES.clone()
742741
}
743742

744743
fn make_dict_batches() -> Vec<RecordBatch> {

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::cmp::Ordering;
2121
use std::collections::{HashMap, HashSet};
2222
use std::fmt::{self, Debug, Display, Formatter};
2323
use std::hash::{Hash, Hasher};
24-
use std::sync::{Arc, OnceLock};
24+
use std::sync::{Arc, LazyLock};
2525

2626
use super::dml::CopyTo;
2727
use super::DdlStatement;
@@ -3067,12 +3067,12 @@ impl Aggregate {
30673067

30683068
/// Get the output expressions.
30693069
fn output_expressions(&self) -> Result<Vec<&Expr>> {
3070-
static INTERNAL_ID_EXPR: OnceLock<Expr> = OnceLock::new();
3070+
static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3071+
Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3072+
});
30713073
let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
30723074
if self.is_grouping_set() {
3073-
exprs.push(INTERNAL_ID_EXPR.get_or_init(|| {
3074-
Expr::Column(Column::from_name(Self::INTERNAL_GROUPING_ID))
3075-
}));
3075+
exprs.push(&INTERNAL_ID_EXPR);
30763076
}
30773077
exprs.extend(self.aggr_expr.iter());
30783078
debug_assert!(exprs.len() == self.schema.fields().len());

0 commit comments

Comments
 (0)