Skip to content

Commit 48d70ac

Browse files
authored
[query-engine] Simply lifetimes on RecordSetEngine ExecutionContext struct (open-telemetry#1476)
## Changes * Removes `'c` lifetime from `ExecutionContext` struct ## Details `ExecutionContext` has `'a` lifetime for tracking the `PipelineExpression` and `'c` lifetime for tracking diagnostics associated with expressions in that pipeline. Really `'a` and `'c` are the same lifetime so what this PR does is just remove `'c` in favor of `'a`. Net gain is just simpler code.
1 parent 2d49406 commit 48d70ac

18 files changed

+101
-131
lines changed

rust/experimental/query_engine/engine-recordset-otlp-bridge/tests/common/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,12 @@
44
use data_engine_expressions::*;
55
use data_engine_recordset::*;
66

7-
pub(crate) fn process_records<'a, 'b, 'c, TRecords, TRecord>(
7+
pub(crate) fn process_records<'a, TRecords, TRecord>(
88
pipeline: &'a PipelineExpression,
9-
engine: &'b RecordSetEngine,
9+
engine: &RecordSetEngine,
1010
records: &mut TRecords,
11-
) -> RecordSetEngineResults<'a, 'c, TRecord>
11+
) -> RecordSetEngineResults<'a, TRecord>
1212
where
13-
'a: 'c,
14-
'b: 'c,
1513
TRecords: RecordSet<TRecord>,
1614
TRecord: Record + 'static,
1715
{

rust/experimental/query_engine/engine-recordset/src/engine.rs

Lines changed: 53 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -78,38 +78,30 @@ impl RecordSetEngine {
7878
}
7979
}
8080

81-
pub fn begin_batch<'a, 'b, 'c, TRecord: Record + 'static>(
81+
pub fn begin_batch<'a, 'b, TRecord: Record + 'static>(
8282
&'b self,
8383
pipeline: &'a PipelineExpression,
84-
) -> Result<RecordSetEngineBatch<'a, 'b, 'c, TRecord>, ExpressionError>
85-
where
86-
'a: 'b,
87-
'b: 'c,
88-
{
84+
) -> Result<RecordSetEngineBatch<'a, 'b, TRecord>, ExpressionError> {
8985
let mut batch = RecordSetEngineBatch::new(pipeline, self);
9086
batch.initialize()?;
9187
Ok(batch)
9288
}
9389
}
9490

95-
pub struct RecordSetEngineBatch<'a, 'b, 'c, TRecord: Record> {
91+
pub struct RecordSetEngineBatch<'a, 'b, TRecord: Record> {
9692
engine: &'b RecordSetEngine,
9793
pipeline: &'a PipelineExpression,
98-
diagnostics: Vec<RecordSetEngineDiagnostic<'b>>,
94+
diagnostics: Vec<RecordSetEngineDiagnostic<'a>>,
9995
global_variables: RefCell<MapValueStorage<OwnedValue>>,
10096
summaries: Summaries<'a>,
101-
included_records: Vec<RecordSetEngineRecord<'a, 'c, TRecord>>,
97+
included_records: Vec<RecordSetEngineRecord<'a, TRecord>>,
10298
}
10399

104-
impl<'a, 'b, 'c, TRecord: Record + 'static> RecordSetEngineBatch<'a, 'b, 'c, TRecord>
105-
where
106-
'a: 'b,
107-
'b: 'c,
108-
{
100+
impl<'a, 'b, TRecord: Record + 'static> RecordSetEngineBatch<'a, 'b, TRecord> {
109101
pub(crate) fn new(
110102
pipeline: &'a PipelineExpression,
111103
engine: &'b RecordSetEngine,
112-
) -> RecordSetEngineBatch<'a, 'b, 'c, TRecord> {
104+
) -> RecordSetEngineBatch<'a, 'b, TRecord> {
113105
Self {
114106
engine,
115107
pipeline,
@@ -174,7 +166,7 @@ where
174166
pub fn push_records<TRecords: RecordSet<TRecord>>(
175167
&mut self,
176168
records: &mut TRecords,
177-
) -> Vec<RecordSetEngineRecord<'a, 'c, TRecord>> {
169+
) -> Vec<RecordSetEngineRecord<'a, TRecord>> {
178170
let mut dropped_records = Vec::new();
179171

180172
records.drain(&mut |attached_records, record| match self
@@ -187,7 +179,7 @@ where
187179
dropped_records
188180
}
189181

190-
pub fn flush(self) -> RecordSetEngineResults<'a, 'c, TRecord> {
182+
pub fn flush(self) -> RecordSetEngineResults<'a, TRecord> {
191183
RecordSetEngineResults::new(
192184
self.pipeline,
193185
self.diagnostics,
@@ -206,7 +198,7 @@ where
206198
&self,
207199
attached_records: Option<&'d dyn AttachedRecords>,
208200
record: TRecord,
209-
) -> RecordSetEngineResult<'a, 'c, TRecord> {
201+
) -> RecordSetEngineResult<'a, TRecord> {
210202
let diagnostic_level = record
211203
.get_diagnostic_level()
212204
.unwrap_or(self.engine.diagnostic_level.clone());
@@ -224,10 +216,10 @@ where
224216
}
225217
}
226218

227-
fn process_record<'a, 'c, TRecord: Record + 'static>(
228-
execution_context: ExecutionContext<'a, '_, 'c, TRecord>,
219+
fn process_record<'a, TRecord: Record + 'static>(
220+
execution_context: ExecutionContext<'a, '_, TRecord>,
229221
expressions: &'a [DataExpression],
230-
) -> RecordSetEngineResult<'a, 'c, TRecord> {
222+
) -> RecordSetEngineResult<'a, TRecord> {
231223
for expression in expressions {
232224
match expression {
233225
DataExpression::Discard(d) => {
@@ -302,15 +294,12 @@ fn process_record<'a, 'c, TRecord: Record + 'static>(
302294
RecordSetEngineResult::Include(execution_context.into())
303295
}
304296

305-
fn process_summaries<'a, 'b>(
297+
fn process_summaries<'a>(
306298
diagnostic_level: RecordSetEngineDiagnosticLevel,
307299
global_variables: &RefCell<MapValueStorage<OwnedValue>>,
308300
pipeline: &'a PipelineExpression,
309301
summaries: &Summaries<'a>,
310-
) -> RecordSetEngineSummaryResults<'a, 'b>
311-
where
312-
'a: 'b,
313-
{
302+
) -> RecordSetEngineSummaryResults<'a> {
314303
let mut summaries = summaries.values.take();
315304

316305
let mut included_summaries = Vec::with_capacity(summaries.len());
@@ -394,24 +383,24 @@ pub trait AttachedRecords {
394383
fn get_attached_record(&self, name: &str) -> Option<&(dyn MapValue + 'static)>;
395384
}
396385

397-
pub enum RecordSetEngineResult<'a, 'b, TRecord: Record> {
398-
Drop(RecordSetEngineRecord<'a, 'b, TRecord>),
399-
Include(RecordSetEngineRecord<'a, 'b, TRecord>),
386+
pub enum RecordSetEngineResult<'a, TRecord: Record> {
387+
Drop(RecordSetEngineRecord<'a, TRecord>),
388+
Include(RecordSetEngineRecord<'a, TRecord>),
400389
}
401390

402391
#[derive(Debug)]
403-
pub struct RecordSetEngineRecord<'a, 'b, TRecord: Record> {
392+
pub struct RecordSetEngineRecord<'a, TRecord: Record> {
404393
pipeline: &'a PipelineExpression,
405394
record: TRecord,
406-
diagnostics: Vec<RecordSetEngineDiagnostic<'b>>,
395+
diagnostics: Vec<RecordSetEngineDiagnostic<'a>>,
407396
}
408397

409-
impl<'a, 'b, TRecord: Record> RecordSetEngineRecord<'a, 'b, TRecord> {
398+
impl<'a, TRecord: Record> RecordSetEngineRecord<'a, TRecord> {
410399
pub(crate) fn new(
411400
pipeline: &'a PipelineExpression,
412401
record: TRecord,
413-
diagnostics: Vec<RecordSetEngineDiagnostic<'b>>,
414-
) -> RecordSetEngineRecord<'a, 'b, TRecord> {
402+
diagnostics: Vec<RecordSetEngineDiagnostic<'a>>,
403+
) -> RecordSetEngineRecord<'a, TRecord> {
415404
Self {
416405
pipeline,
417406
record,
@@ -423,7 +412,7 @@ impl<'a, 'b, TRecord: Record> RecordSetEngineRecord<'a, 'b, TRecord> {
423412
&self.record
424413
}
425414

426-
pub fn get_diagnostics(&self) -> &[RecordSetEngineDiagnostic<'b>] {
415+
pub fn get_diagnostics(&self) -> &[RecordSetEngineDiagnostic<'a>] {
427416
&self.diagnostics
428417
}
429418

@@ -432,15 +421,15 @@ impl<'a, 'b, TRecord: Record> RecordSetEngineRecord<'a, 'b, TRecord> {
432421
}
433422
}
434423

435-
impl<'a, 'b, 'c, TRecord: Record> From<ExecutionContext<'a, 'b, 'c, TRecord>>
436-
for RecordSetEngineRecord<'a, 'c, TRecord>
424+
impl<'a, 'b, TRecord: Record> From<ExecutionContext<'a, 'b, TRecord>>
425+
for RecordSetEngineRecord<'a, TRecord>
437426
{
438-
fn from(val: ExecutionContext<'a, 'b, 'c, TRecord>) -> Self {
427+
fn from(val: ExecutionContext<'a, 'b, TRecord>) -> Self {
439428
val.consume_into_record()
440429
}
441430
}
442431

443-
impl<TRecord: Record> Display for RecordSetEngineRecord<'_, '_, TRecord> {
432+
impl<TRecord: Record> Display for RecordSetEngineRecord<'_, TRecord> {
444433
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
445434
format_diagnostics(self.pipeline.get_query(), &self.diagnostics, f)
446435
}
@@ -536,22 +525,22 @@ fn format_diagnostics(
536525
}
537526

538527
#[derive(Debug)]
539-
pub struct RecordSetEngineResults<'a, 'b, TRecord: Record> {
528+
pub struct RecordSetEngineResults<'a, TRecord: Record> {
540529
pipeline: &'a PipelineExpression,
541-
pub diagnostics: Vec<RecordSetEngineDiagnostic<'b>>,
542-
pub summaries: RecordSetEngineSummaryResults<'a, 'b>,
543-
pub included_records: Vec<RecordSetEngineRecord<'a, 'b, TRecord>>,
544-
pub dropped_records: Vec<RecordSetEngineRecord<'a, 'b, TRecord>>,
530+
pub diagnostics: Vec<RecordSetEngineDiagnostic<'a>>,
531+
pub summaries: RecordSetEngineSummaryResults<'a>,
532+
pub included_records: Vec<RecordSetEngineRecord<'a, TRecord>>,
533+
pub dropped_records: Vec<RecordSetEngineRecord<'a, TRecord>>,
545534
}
546535

547-
impl<'a, 'b, TRecord: Record> RecordSetEngineResults<'a, 'b, TRecord> {
536+
impl<'a, TRecord: Record> RecordSetEngineResults<'a, TRecord> {
548537
pub(crate) fn new(
549538
pipeline: &'a PipelineExpression,
550-
diagnostics: Vec<RecordSetEngineDiagnostic<'b>>,
551-
summaries: RecordSetEngineSummaryResults<'a, 'b>,
552-
included_records: Vec<RecordSetEngineRecord<'a, 'b, TRecord>>,
553-
dropped_records: Vec<RecordSetEngineRecord<'a, 'b, TRecord>>,
554-
) -> RecordSetEngineResults<'a, 'b, TRecord> {
539+
diagnostics: Vec<RecordSetEngineDiagnostic<'a>>,
540+
summaries: RecordSetEngineSummaryResults<'a>,
541+
included_records: Vec<RecordSetEngineRecord<'a, TRecord>>,
542+
dropped_records: Vec<RecordSetEngineRecord<'a, TRecord>>,
543+
) -> RecordSetEngineResults<'a, TRecord> {
555544
Self {
556545
pipeline,
557546
diagnostics,
@@ -566,25 +555,25 @@ impl<'a, 'b, TRecord: Record> RecordSetEngineResults<'a, 'b, TRecord> {
566555
}
567556
}
568557

569-
impl<TRecord: Record> Display for RecordSetEngineResults<'_, '_, TRecord> {
558+
impl<TRecord: Record> Display for RecordSetEngineResults<'_, TRecord> {
570559
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
571560
format_diagnostics(self.pipeline.get_query(), &self.diagnostics, f)
572561
}
573562
}
574563

575564
#[derive(Debug)]
576-
pub struct RecordSetEngineSummaryResults<'a, 'b> {
577-
pub included_summaries: Vec<RecordSetEngineSummary<'a, 'b>>,
578-
pub dropped_summaries: Vec<RecordSetEngineSummary<'a, 'b>>,
565+
pub struct RecordSetEngineSummaryResults<'a> {
566+
pub included_summaries: Vec<RecordSetEngineSummary<'a>>,
567+
pub dropped_summaries: Vec<RecordSetEngineSummary<'a>>,
579568
// Note: Marker used to not allow manual construction of the struct
580569
marker: PhantomData<usize>,
581570
}
582571

583-
impl<'a, 'b> RecordSetEngineSummaryResults<'a, 'b> {
572+
impl<'a> RecordSetEngineSummaryResults<'a> {
584573
pub(crate) fn new(
585-
included_summaries: Vec<RecordSetEngineSummary<'a, 'b>>,
586-
dropped_summaries: Vec<RecordSetEngineSummary<'a, 'b>>,
587-
) -> RecordSetEngineSummaryResults<'a, 'b> {
574+
included_summaries: Vec<RecordSetEngineSummary<'a>>,
575+
dropped_summaries: Vec<RecordSetEngineSummary<'a>>,
576+
) -> RecordSetEngineSummaryResults<'a> {
588577
Self {
589578
included_summaries,
590579
dropped_summaries,
@@ -594,24 +583,24 @@ impl<'a, 'b> RecordSetEngineSummaryResults<'a, 'b> {
594583
}
595584

596585
#[derive(Debug)]
597-
pub struct RecordSetEngineSummary<'a, 'b> {
586+
pub struct RecordSetEngineSummary<'a> {
598587
pipeline: Option<&'a PipelineExpression>,
599-
pub diagnostics: Vec<RecordSetEngineDiagnostic<'b>>,
588+
pub diagnostics: Vec<RecordSetEngineDiagnostic<'a>>,
600589
pub summary_id: String,
601590
pub group_by_values: Vec<(Box<str>, OwnedValue)>,
602591
pub aggregation_values: HashMap<Box<str>, SummaryAggregation>,
603592
pub map: Option<MapValueStorage<OwnedValue>>,
604593
}
605594

606-
impl<'a, 'b> RecordSetEngineSummary<'a, 'b> {
595+
impl<'a> RecordSetEngineSummary<'a> {
607596
pub(crate) fn new(
608597
pipeline: Option<&'a PipelineExpression>,
609-
diagnostics: Vec<RecordSetEngineDiagnostic<'b>>,
598+
diagnostics: Vec<RecordSetEngineDiagnostic<'a>>,
610599
summary_id: String,
611600
group_by_values: Vec<(Box<str>, OwnedValue)>,
612601
aggregation_values: HashMap<Box<str>, SummaryAggregation>,
613602
map: Option<MapValueStorage<OwnedValue>>,
614-
) -> RecordSetEngineSummary<'a, 'b> {
603+
) -> RecordSetEngineSummary<'a> {
615604
Self {
616605
pipeline,
617606
diagnostics,
@@ -623,7 +612,7 @@ impl<'a, 'b> RecordSetEngineSummary<'a, 'b> {
623612
}
624613
}
625614

626-
impl Display for RecordSetEngineSummary<'_, '_> {
615+
impl Display for RecordSetEngineSummary<'_> {
627616
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
628617
if let Some(pipeline) = self.pipeline {
629618
format_diagnostics(pipeline.get_query(), &self.diagnostics, f)

rust/experimental/query_engine/engine-recordset/src/execution_context.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,28 @@ use data_engine_expressions::{AsStaticValue, Expression, MapValue, PipelineExpre
99
use crate::TestRecord;
1010
use crate::*;
1111

12-
pub(crate) struct ExecutionContext<'a, 'b, 'c, TRecord>
12+
pub(crate) struct ExecutionContext<'a, 'b, TRecord>
1313
where
14-
'a: 'c,
1514
TRecord: MapValue + 'static,
1615
{
1716
diagnostic_level: RecordSetEngineDiagnosticLevel,
18-
diagnostics: RefCell<Vec<RecordSetEngineDiagnostic<'c>>>,
17+
diagnostics: RefCell<Vec<RecordSetEngineDiagnostic<'a>>>,
1918
pipeline: &'a PipelineExpression,
2019
variables: ExecutionContextVariables<'b>,
2120
summaries: &'b Summaries<'a>,
2221
attached_records: Option<&'b dyn AttachedRecords>,
2322
record: Option<RefCell<TRecord>>,
2423
}
2524

26-
impl<'a, 'b, 'c, TRecord: Record + 'static> ExecutionContext<'a, 'b, 'c, TRecord> {
25+
impl<'a, 'b, TRecord: Record + 'static> ExecutionContext<'a, 'b, TRecord> {
2726
pub fn new(
2827
diagnostic_level: RecordSetEngineDiagnosticLevel,
2928
pipeline: &'a PipelineExpression,
3029
global_variables: &'b RefCell<MapValueStorage<OwnedValue>>,
3130
summaries: &'b Summaries<'a>,
3231
attached_records: Option<&'b dyn AttachedRecords>,
3332
record: Option<TRecord>,
34-
) -> ExecutionContext<'a, 'b, 'c, TRecord> {
33+
) -> ExecutionContext<'a, 'b, TRecord> {
3534
Self {
3635
diagnostic_level,
3736
diagnostics: RefCell::new(Vec::new()),
@@ -69,7 +68,7 @@ impl<'a, 'b, 'c, TRecord: Record + 'static> ExecutionContext<'a, 'b, 'c, TRecord
6968
}
7069
}
7170

72-
pub fn add_diagnostic(&self, diagnostic: RecordSetEngineDiagnostic<'c>) {
71+
pub fn add_diagnostic(&self, diagnostic: RecordSetEngineDiagnostic<'a>) {
7372
self.diagnostics.borrow_mut().push(diagnostic);
7473
}
7574

@@ -93,11 +92,11 @@ impl<'a, 'b, 'c, TRecord: Record + 'static> ExecutionContext<'a, 'b, 'c, TRecord
9392
self.summaries
9493
}
9594

96-
pub fn take_diagnostics(self) -> Vec<RecordSetEngineDiagnostic<'c>> {
95+
pub fn take_diagnostics(self) -> Vec<RecordSetEngineDiagnostic<'a>> {
9796
self.diagnostics.take()
9897
}
9998

100-
pub fn consume_into_record(self) -> RecordSetEngineRecord<'a, 'c, TRecord> {
99+
pub fn consume_into_record(self) -> RecordSetEngineRecord<'a, TRecord> {
101100
RecordSetEngineRecord::new(
102101
self.pipeline,
103102
self.record.expect("record wasn't set").into_inner(),
@@ -199,7 +198,7 @@ impl<'a> TestExecutionContext<'a> {
199198
self.global_variables.borrow_mut().set(name, value);
200199
}
201200

202-
pub fn create_execution_context(&mut self) -> ExecutionContext<'_, 'a, '_, TestRecord> {
201+
pub fn create_execution_context(&mut self) -> ExecutionContext<'_, 'a, TestRecord> {
203202
ExecutionContext::new(
204203
RecordSetEngineDiagnosticLevel::Verbose,
205204
&self.pipeline,

rust/experimental/query_engine/engine-recordset/src/logical_expressions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use data_engine_expressions::*;
66
use crate::{execution_context::*, scalars::*, *};
77

88
pub fn execute_logical_expression<'a, TRecord: Record>(
9-
execution_context: &ExecutionContext<'a, '_, '_, TRecord>,
9+
execution_context: &ExecutionContext<'a, '_, TRecord>,
1010
logical_expression: &'a LogicalExpression,
1111
) -> Result<bool, ExpressionError> {
1212
let value = match logical_expression {

rust/experimental/query_engine/engine-recordset/src/scalars/collection_scalar_expressions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use data_engine_expressions::*;
66
use crate::{execution_context::*, scalars::execute_scalar_expression, *};
77

88
pub fn execute_collection_scalar_expression<'a, 'b, 'c, TRecord: Record>(
9-
execution_context: &'b ExecutionContext<'a, '_, '_, TRecord>,
9+
execution_context: &'b ExecutionContext<'a, '_, TRecord>,
1010
collection_scalar_expression: &'a CollectionScalarExpression,
1111
) -> Result<ResolvedValue<'c>, ExpressionError>
1212
where

rust/experimental/query_engine/engine-recordset/src/scalars/convert_scalar_expressions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use data_engine_expressions::*;
66
use crate::{execution_context::*, scalars::execute_scalar_expression, *};
77

88
pub fn execute_convert_scalar_expression<'a, 'b, 'c, TRecord: Record>(
9-
execution_context: &'b ExecutionContext<'a, '_, '_, TRecord>,
9+
execution_context: &'b ExecutionContext<'a, '_, TRecord>,
1010
convert_scalar_expression: &'a ConvertScalarExpression,
1111
) -> Result<ResolvedValue<'c>, ExpressionError>
1212
where

0 commit comments

Comments
 (0)