Skip to content

Commit f1fc6bd

Browse files
authored
[query-engine] Support binding functions to external implementations in RecordSet engine (open-telemetry#1629)
Relates to open-telemetry#1479 ## Changes * Support binding of functions to external implementations in RecordSet engine
1 parent 225df98 commit f1fc6bd

File tree

11 files changed

+383
-213
lines changed

11 files changed

+383
-213
lines changed

rust/experimental/query_engine/engine-recordset/src/engine.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use crate::{
2222
pub struct RecordSetEngineOptions {
2323
pub(crate) diagnostic_level: RecordSetEngineDiagnosticLevel,
2424
pub(crate) summary_cardinality_limit: usize,
25+
pub(crate) external_function_implementations:
26+
HashMap<Box<str>, Box<dyn RecordSetEngineFunctionCallback>>,
2527
}
2628

2729
impl Default for RecordSetEngineOptions {
@@ -35,6 +37,7 @@ impl RecordSetEngineOptions {
3537
Self {
3638
diagnostic_level: RecordSetEngineDiagnosticLevel::Warn,
3739
summary_cardinality_limit: 8192,
40+
external_function_implementations: HashMap::new(),
3841
}
3942
}
4043

@@ -53,11 +56,22 @@ impl RecordSetEngineOptions {
5356
self.summary_cardinality_limit = summary_cardinality_limit;
5457
self
5558
}
59+
60+
pub fn with_external_function_implementation<F: RecordSetEngineFunctionCallback + 'static>(
61+
mut self,
62+
name: &str,
63+
callback: F,
64+
) -> RecordSetEngineOptions {
65+
self.external_function_implementations
66+
.insert(name.into(), Box::new(callback));
67+
self
68+
}
5669
}
5770

5871
pub struct RecordSetEngine {
5972
diagnostic_level: RecordSetEngineDiagnosticLevel,
6073
summary_cardinality_limit: usize,
74+
external_function_implementations: HashMap<Box<str>, Box<dyn RecordSetEngineFunctionCallback>>,
6175
}
6276

6377
impl Default for RecordSetEngine {
@@ -75,6 +89,7 @@ impl RecordSetEngine {
7589
Self {
7690
diagnostic_level: options.diagnostic_level,
7791
summary_cardinality_limit: options.summary_cardinality_limit,
92+
external_function_implementations: options.external_function_implementations,
7893
}
7994
}
8095

@@ -121,6 +136,7 @@ impl<'a, 'b, TRecord: Record + 'static> RecordSetEngineBatch<'a, 'b, TRecord> {
121136

122137
let execution_context = ExecutionContext::<TRecord>::new(
123138
self.engine.diagnostic_level.clone(),
139+
&self.engine.external_function_implementations,
124140
self.pipeline,
125141
&self.global_variables,
126142
&self.summaries,
@@ -186,6 +202,7 @@ impl<'a, 'b, TRecord: Record + 'static> RecordSetEngineBatch<'a, 'b, TRecord> {
186202
self.diagnostics,
187203
process_summaries(
188204
self.engine.diagnostic_level.clone(),
205+
&self.engine.external_function_implementations,
189206
&self.global_variables,
190207
self.pipeline,
191208
&self.summaries,
@@ -206,6 +223,7 @@ impl<'a, 'b, TRecord: Record + 'static> RecordSetEngineBatch<'a, 'b, TRecord> {
206223

207224
let execution_context = ExecutionContext::new(
208225
diagnostic_level,
226+
&self.engine.external_function_implementations,
209227
self.pipeline,
210228
&self.global_variables,
211229
&self.summaries,
@@ -298,6 +316,7 @@ fn process_record<'a, TRecord: Record + 'static>(
298316

299317
fn process_summaries<'a>(
300318
diagnostic_level: RecordSetEngineDiagnosticLevel,
319+
external_function_implementations: &HashMap<Box<str>, Box<dyn RecordSetEngineFunctionCallback>>,
301320
global_variables: &RefCell<MapValueStorage<OwnedValue>>,
302321
pipeline: &'a PipelineExpression,
303322
summaries: &Summaries<'a>,
@@ -317,6 +336,7 @@ fn process_summaries<'a>(
317336

318337
let execution_context = ExecutionContext::new(
319338
diagnostic_level.clone(),
339+
external_function_implementations,
320340
pipeline,
321341
global_variables,
322342
&summaries,
@@ -350,6 +370,7 @@ fn process_summaries<'a>(
350370

351371
let results = process_summaries(
352372
diagnostic_level.clone(),
373+
external_function_implementations,
353374
global_variables,
354375
pipeline,
355376
&summaries,

0 commit comments

Comments
 (0)