Skip to content

Commit 7a5e578

Browse files
authored
[query-engine] Add data record and batch definitions to recordset engine (open-telemetry#585)
## Changes * Defines data record and data record batch in the recordset query engine
1 parent acf0438 commit 7a5e578

File tree

5 files changed

+594
-0
lines changed

5 files changed

+594
-0
lines changed
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
use core::fmt::Debug;
2+
use std::{any::Any, time::SystemTime};
3+
4+
use crate::{primitives::any_value::AnyValue, value_path::ValuePath};
5+
6+
use super::data_record_resolver::*;
7+
8+
pub trait DataRecord: Any + Debug {
9+
fn get_timestamp(&self) -> Option<SystemTime>;
10+
11+
fn get_observed_timestamp(&self) -> Option<SystemTime>;
12+
13+
fn get_any_value_resolver_for_path(path: &ValuePath) -> DataRecordAnyValueResolver<Self>
14+
where
15+
Self: Sized;
16+
17+
fn get_summary_id(&self) -> Option<&str>;
18+
19+
fn set_summary_id(&mut self, summary_id: &str);
20+
21+
fn clear(&mut self);
22+
}
23+
24+
pub trait AttachedDataRecords: Debug {
25+
fn get_attached_data_record(&self, name: &str) -> Option<&dyn DataRecord>;
26+
}
27+
28+
pub fn create_string_value_resolver<T: DataRecord, R, M, S>(
29+
path: &ValuePath,
30+
read_action: &'static R,
31+
read_mut_action: &'static M,
32+
set_action: &'static S,
33+
) -> DataRecordAnyValueResolver<T>
34+
where
35+
R: Fn(&T) -> Option<&AnyValue>,
36+
M: Fn(&mut T) -> Option<&mut AnyValue>,
37+
S: Fn(&mut T, Option<AnyValue>) -> Option<AnyValue>,
38+
{
39+
if !path.is_value_selector() {
40+
return DataRecordAnyValueResolver::new_no_op();
41+
}
42+
43+
return DataRecordAnyValueResolver::new(
44+
path.clone(),
45+
|_, data_record| {
46+
let root = read_action(data_record);
47+
match root {
48+
Some(v) => DataRecordReadAnyValueResult::Found(v),
49+
None => DataRecordReadAnyValueResult::NotFound,
50+
}
51+
},
52+
|_, data_record| {
53+
let root = read_mut_action(data_record);
54+
match root {
55+
Some(v) => DataRecordReadMutAnyValueResult::Found(v),
56+
None => DataRecordReadMutAnyValueResult::NotFound,
57+
}
58+
},
59+
move |_, data_record, v| {
60+
if let AnyValue::StringValue(_) = &v {
61+
let old_value = set_action(data_record, Some(v));
62+
63+
if old_value.is_none() {
64+
return DataRecordSetAnyValueResult::Created;
65+
}
66+
67+
return DataRecordSetAnyValueResult::Updated(old_value.unwrap());
68+
} else if let AnyValue::NullValue = &v {
69+
let old_value = set_action(data_record, None);
70+
71+
if old_value.is_none() {
72+
return DataRecordSetAnyValueResult::NotFound;
73+
}
74+
75+
return DataRecordSetAnyValueResult::Updated(old_value.unwrap());
76+
}
77+
78+
return DataRecordSetAnyValueResult::NotSupported("Value was not a String");
79+
},
80+
move |_, data_record| {
81+
let old_value = set_action(data_record, None);
82+
83+
if old_value.is_none() {
84+
return DataRecordRemoveAnyValueResult::NotFound;
85+
}
86+
87+
return DataRecordRemoveAnyValueResult::Removed(old_value.unwrap());
88+
},
89+
);
90+
}
91+
92+
pub fn create_map_value_resolver<T: DataRecord, R, M, S>(
93+
path: &ValuePath,
94+
read_action: &'static R,
95+
read_mut_action: &'static M,
96+
set_action: &'static S,
97+
) -> DataRecordAnyValueResolver<T>
98+
where
99+
R: Fn(&T) -> Option<&AnyValue>,
100+
M: Fn(&mut T) -> Option<&mut AnyValue>,
101+
S: Fn(&mut T, Option<AnyValue>) -> Option<AnyValue>,
102+
{
103+
return DataRecordAnyValueResolver::new(
104+
path.clone(),
105+
|path, data_record| {
106+
let root = read_action(data_record);
107+
match root {
108+
Some(v) => path.read(v),
109+
None => DataRecordReadAnyValueResult::NotFound,
110+
}
111+
},
112+
|path, data_record| {
113+
let root = read_mut_action(data_record);
114+
match root {
115+
Some(v) => path.read_mut(v),
116+
None => DataRecordReadMutAnyValueResult::NotFound,
117+
}
118+
},
119+
move |path, data_record, v| {
120+
if path.is_value_selector() {
121+
if let AnyValue::MapValue(_) = &v {
122+
let old_value = set_action(data_record, Some(v));
123+
124+
if old_value.is_none() {
125+
return DataRecordSetAnyValueResult::Created;
126+
}
127+
128+
return DataRecordSetAnyValueResult::Updated(old_value.unwrap());
129+
} else if let AnyValue::NullValue = &v {
130+
let old_value = set_action(data_record, None);
131+
132+
if old_value.is_none() {
133+
return DataRecordSetAnyValueResult::NotFound;
134+
}
135+
136+
return DataRecordSetAnyValueResult::Updated(old_value.unwrap());
137+
}
138+
139+
return DataRecordSetAnyValueResult::NotSupported("Value was not a Map");
140+
} else {
141+
let root = read_mut_action(data_record);
142+
match root {
143+
Some(r) => path.set(r, v),
144+
None => DataRecordSetAnyValueResult::NotFound,
145+
}
146+
}
147+
},
148+
move |path, data_record| {
149+
if path.is_value_selector() {
150+
let old_value = set_action(data_record, None);
151+
152+
if old_value.is_none() {
153+
return DataRecordRemoveAnyValueResult::NotFound;
154+
}
155+
156+
return DataRecordRemoveAnyValueResult::Removed(old_value.unwrap());
157+
} else {
158+
let root = read_mut_action(data_record);
159+
match root {
160+
Some(r) => path.remove(r),
161+
None => DataRecordRemoveAnyValueResult::NotFound,
162+
}
163+
}
164+
},
165+
);
166+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
use core::fmt::Debug;
2+
3+
use crate::{data_engine::DataEngineState, error::Error};
4+
5+
pub trait DataRecordBatch<T>: Debug {
6+
fn drain<S: DataEngineState, F>(&mut self, state: &mut S, action: F) -> Result<(), Error>
7+
where
8+
F: Fn(&mut S, T) -> Result<(), Error>;
9+
}
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
use std::{any::Any, cell::RefCell, mem::replace};
2+
3+
use crate::{Error, ValuePath, execution_context::ExecutionContext, primitives::AnyValue};
4+
5+
use super::DataRecord;
6+
7+
pub(crate) trait DynamicDataRecordAnyValueResolver: Any {
8+
fn read_value(
9+
&self,
10+
expression_id: usize,
11+
execution_context: &dyn ExecutionContext,
12+
path: &ValuePath,
13+
data_record: &dyn DataRecord,
14+
action: &mut dyn DataRecordAnyValueReadCallback,
15+
) -> Result<(), Error>;
16+
}
17+
18+
pub struct DataRecordAnyValueResolver<T: DataRecord> {
19+
path: ValuePath,
20+
read_value_fn:
21+
Box<dyn for<'a, 'b> Fn(&'a ValuePath, &'b T) -> DataRecordReadAnyValueResult<'b>>,
22+
read_value_mut_fn:
23+
Box<dyn for<'a, 'b> Fn(&'a ValuePath, &'b mut T) -> DataRecordReadMutAnyValueResult<'b>>,
24+
set_value_fn: Box<dyn Fn(&ValuePath, &mut T, AnyValue) -> DataRecordSetAnyValueResult>,
25+
remove_value_fn: Box<dyn Fn(&ValuePath, &mut T) -> DataRecordRemoveAnyValueResult>,
26+
}
27+
28+
impl<T: DataRecord> DataRecordAnyValueResolver<T> {
29+
pub fn new(
30+
path: ValuePath,
31+
read_value: impl for<'a, 'b> Fn(&'a ValuePath, &'b T) -> DataRecordReadAnyValueResult<'b>
32+
+ 'static,
33+
read_value_mut: impl for<'a, 'b> Fn(
34+
&'a ValuePath,
35+
&'b mut T,
36+
) -> DataRecordReadMutAnyValueResult<'b>
37+
+ 'static,
38+
set_value: impl Fn(&ValuePath, &mut T, AnyValue) -> DataRecordSetAnyValueResult + 'static,
39+
remove_value: impl Fn(&ValuePath, &mut T) -> DataRecordRemoveAnyValueResult + 'static,
40+
) -> DataRecordAnyValueResolver<T> {
41+
Self {
42+
path,
43+
read_value_fn: Box::new(read_value),
44+
read_value_mut_fn: Box::new(read_value_mut),
45+
set_value_fn: Box::new(set_value),
46+
remove_value_fn: Box::new(remove_value),
47+
}
48+
}
49+
50+
pub fn new_no_op() -> DataRecordAnyValueResolver<T> {
51+
DataRecordAnyValueResolver::new(
52+
ValuePath::new("").unwrap(),
53+
|_, _| DataRecordReadAnyValueResult::NotFound,
54+
|_, _| DataRecordReadMutAnyValueResult::NotFound,
55+
|_, _, _| DataRecordSetAnyValueResult::NotFound,
56+
|_, _| DataRecordRemoveAnyValueResult::NotFound,
57+
)
58+
}
59+
60+
pub(crate) fn read_value<F>(&self, data_record: &RefCell<T>, action: F)
61+
where
62+
F: FnOnce(DataRecordReadAnyValueResult),
63+
{
64+
let borrow = data_record.borrow();
65+
66+
let result = (self.read_value_fn)(&self.path, &borrow);
67+
68+
action(result);
69+
}
70+
71+
pub(crate) fn read_value_direct<F>(&self, data_record: &T, action: F)
72+
where
73+
F: FnOnce(DataRecordReadAnyValueResult),
74+
{
75+
let result = (self.read_value_fn)(&self.path, data_record);
76+
77+
action(result);
78+
}
79+
80+
pub(crate) fn read_value_mut<F>(&self, data_record: &RefCell<T>, action: F)
81+
where
82+
F: FnOnce(DataRecordReadMutAnyValueResult),
83+
{
84+
let mut borrow = data_record.borrow_mut();
85+
86+
let result = (self.read_value_mut_fn)(&self.path, &mut borrow);
87+
88+
action(result);
89+
}
90+
91+
pub(crate) fn set_value(
92+
&self,
93+
data_record: &RefCell<T>,
94+
value: AnyValue,
95+
) -> DataRecordSetAnyValueResult {
96+
let mut borrow = data_record.borrow_mut();
97+
98+
return (self.set_value_fn)(&self.path, &mut borrow, value);
99+
}
100+
101+
pub(crate) fn remove_value(&self, data_record: &RefCell<T>) -> DataRecordRemoveAnyValueResult {
102+
let mut borrow = data_record.borrow_mut();
103+
104+
return (self.remove_value_fn)(&self.path, &mut borrow);
105+
}
106+
}
107+
108+
#[derive(Debug)]
109+
pub enum DataRecordReadAnyValueResult<'a> {
110+
NotFound,
111+
Found(&'a AnyValue),
112+
}
113+
114+
#[derive(Debug)]
115+
pub enum DataRecordReadMutAnyValueResult<'a> {
116+
NotFound,
117+
NotSupported(&'static str),
118+
Found(&'a mut AnyValue),
119+
}
120+
121+
#[derive(Debug)]
122+
pub enum DataRecordSetAnyValueResult {
123+
NotFound,
124+
NotSupported(&'static str),
125+
Created,
126+
Updated(AnyValue),
127+
}
128+
129+
#[derive(Debug)]
130+
pub enum DataRecordRemoveAnyValueResult {
131+
NotFound,
132+
NotSupported(&'static str),
133+
Removed(AnyValue),
134+
}
135+
136+
pub(crate) trait DataRecordAnyValueReadCallback {
137+
fn invoke_once(&mut self, result: DataRecordReadAnyValueResult);
138+
}
139+
140+
pub(crate) struct DataRecordAnyValueReadClosureCallback<F>
141+
where
142+
F: FnOnce(DataRecordReadAnyValueResult),
143+
{
144+
callback: Option<F>,
145+
}
146+
147+
impl<F> DataRecordAnyValueReadClosureCallback<F>
148+
where
149+
F: FnOnce(DataRecordReadAnyValueResult),
150+
{
151+
pub fn new(callback: F) -> DataRecordAnyValueReadClosureCallback<F> {
152+
Self {
153+
callback: Some(callback),
154+
}
155+
}
156+
}
157+
158+
impl<F> DataRecordAnyValueReadCallback for DataRecordAnyValueReadClosureCallback<F>
159+
where
160+
F: FnOnce(DataRecordReadAnyValueResult),
161+
{
162+
fn invoke_once(&mut self, result: DataRecordReadAnyValueResult) {
163+
let callback = replace(&mut self.callback, None);
164+
if !callback.is_none() {
165+
(callback.unwrap())(result);
166+
}
167+
}
168+
}
169+
170+
pub(crate) trait DataRecordAnyValueReadMutCallback {
171+
fn invoke_once(&mut self, result: DataRecordReadMutAnyValueResult);
172+
}
173+
174+
pub(crate) struct DataRecordAnyValueReadMutClosureCallback<F>
175+
where
176+
F: FnOnce(DataRecordReadMutAnyValueResult),
177+
{
178+
callback: Option<F>,
179+
}
180+
181+
impl<F> DataRecordAnyValueReadMutClosureCallback<F>
182+
where
183+
F: FnOnce(DataRecordReadMutAnyValueResult),
184+
{
185+
pub fn new(callback: F) -> DataRecordAnyValueReadMutClosureCallback<F> {
186+
Self {
187+
callback: Some(callback),
188+
}
189+
}
190+
}
191+
192+
impl<F> DataRecordAnyValueReadMutCallback for DataRecordAnyValueReadMutClosureCallback<F>
193+
where
194+
F: FnOnce(DataRecordReadMutAnyValueResult),
195+
{
196+
fn invoke_once(&mut self, result: DataRecordReadMutAnyValueResult) {
197+
let callback = replace(&mut self.callback, None);
198+
if !callback.is_none() {
199+
(callback.unwrap())(result);
200+
}
201+
}
202+
}

0 commit comments

Comments
 (0)