Skip to content

Commit 142f76d

Browse files
authored
[query-engine] Add select expression (open-telemetry#1363)
Relates to open-telemetry#1362 ## Changes * Adds an expression for selecting data from a scalar result using a path potentially not known until execution time ## Details Working towards KQL `extract_json` expression support.
1 parent 71bc65a commit 142f76d

File tree

14 files changed

+1143
-83
lines changed

14 files changed

+1143
-83
lines changed

rust/experimental/query_engine/engine-recordset-otlp-bridge/src/attached_records.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,16 @@ impl MapValue for Resource {
4949
key == "Attributes"
5050
}
5151

52-
fn get(&self, key: &str) -> Option<&(dyn AsStaticValue + 'static)> {
52+
fn get(&self, key: &str) -> Option<&dyn AsValue> {
53+
self.get_static(key).unwrap().map(|v| v as &dyn AsValue)
54+
}
55+
56+
fn get_static(&self, key: &str) -> Result<Option<&(dyn AsStaticValue + 'static)>, String> {
5357
if key == "Attributes" {
54-
return Some(&self.attributes);
58+
return Ok(Some(&self.attributes));
5559
}
5660

57-
None
61+
Ok(None)
5862
}
5963

6064
fn get_items(&self, item_callback: &mut dyn KeyValueCallback) -> bool {
@@ -81,13 +85,17 @@ impl MapValue for InstrumentationScope {
8185
matches!(key, "Attributes" | "Name" | "Version")
8286
}
8387

84-
fn get(&self, key: &str) -> Option<&(dyn AsStaticValue + 'static)> {
85-
match key {
88+
fn get(&self, key: &str) -> Option<&dyn AsValue> {
89+
self.get_static(key).unwrap().map(|v| v as &dyn AsValue)
90+
}
91+
92+
fn get_static(&self, key: &str) -> Result<Option<&(dyn AsStaticValue + 'static)>, String> {
93+
Ok(match key {
8694
"Attributes" => Some(&self.attributes),
8795
"Name" => self.name.as_ref().map(|v| v as &dyn AsStaticValue),
8896
"Version" => self.version.as_ref().map(|v| v as &dyn AsStaticValue),
8997
_ => None,
90-
}
98+
})
9199
}
92100

93101
fn get_items(&self, item_callback: &mut dyn KeyValueCallback) -> bool {

rust/experimental/query_engine/engine-recordset-otlp-bridge/src/logs.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,12 @@ impl MapValue for LogRecord {
7979
}
8080
}
8181

82-
fn get(&self, key: &str) -> Option<&(dyn AsStaticValue + 'static)> {
83-
match key {
82+
fn get(&self, key: &str) -> Option<&dyn AsValue> {
83+
self.get_static(key).unwrap().map(|v| v as &dyn AsValue)
84+
}
85+
86+
fn get_static(&self, key: &str) -> Result<Option<&(dyn AsStaticValue + 'static)>, String> {
87+
Ok(match key {
8488
"Attributes" => Some(&self.attributes),
8589
"Timestamp" => self.timestamp.as_ref().map(|v| v as &dyn AsStaticValue),
8690
"ObservedTimestamp" => self
@@ -98,7 +102,7 @@ impl MapValue for LogRecord {
98102
"TraceFlags" => self.flags.as_ref().map(|v| v as &dyn AsStaticValue),
99103
"EventName" => self.event_name.as_ref().map(|v| v as &dyn AsStaticValue),
100104
_ => None,
101-
}
105+
})
102106
}
103107

104108
fn get_items(&self, item_callback: &mut dyn KeyValueCallback) -> bool {

rust/experimental/query_engine/engine-recordset/src/execution_context.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,20 @@ impl<'a> ExecutionContextVariables<'a> {
125125
) -> Option<Ref<'_, dyn AsStaticValue + 'static>> {
126126
let vars = self.local_variables.borrow();
127127

128-
let var = Ref::filter_map(vars, |v| v.get(name));
128+
let var = Ref::filter_map(vars, |v| {
129+
v.get_static(name)
130+
.expect("Static access not supported by underlying map")
131+
});
129132

130133
if let Ok(v) = var {
131134
return Some(v);
132135
}
133136

134-
Ref::filter_map(self.global_variables.borrow(), |v| v.get(name)).ok()
137+
Ref::filter_map(self.global_variables.borrow(), |v| {
138+
v.get_static(name)
139+
.expect("Static access not supported by underlying map")
140+
})
141+
.ok()
135142
}
136143

137144
#[cfg(test)]

rust/experimental/query_engine/engine-recordset/src/primitives/owned_value.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,19 @@ impl From<&dyn ArrayValue> for ArrayValueStorage<OwnedValue> {
144144
}
145145
}
146146

147+
impl From<&dyn MapValue> for MapValueStorage<OwnedValue> {
148+
fn from(value: &dyn MapValue) -> Self {
149+
let mut values: HashMap<Box<str>, _> = HashMap::new();
150+
151+
value.get_items(&mut KeyValueClosureCallback::new(|k, v| {
152+
values.insert(k.into(), v.into());
153+
true
154+
}));
155+
156+
MapValueStorage::new(values)
157+
}
158+
}
159+
147160
#[cfg(test)]
148161
mod tests {
149162
use super::*;

rust/experimental/query_engine/engine-recordset/src/primitives/resolved_value.rs

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,48 @@ impl<'a> ResolvedValue<'a> {
194194
ResolvedValue::Sequence(s) => Ok(ResolvedArrayValue::Sequence(s)),
195195
}
196196
}
197+
198+
pub fn try_resolve_map(self) -> Result<ResolvedMapValue<'a>, Self> {
199+
if self.get_value_type() != ValueType::Map {
200+
return Err(self);
201+
}
202+
203+
match self {
204+
ResolvedValue::Value(v) => {
205+
if let Value::Map(s) = v {
206+
Ok(ResolvedMapValue::Value(s))
207+
} else {
208+
panic!()
209+
}
210+
}
211+
ResolvedValue::Borrowed(s, b) => {
212+
match Ref::filter_map(Ref::clone(&b), |v| {
213+
if let StaticValue::Map(s) = v.to_static_value() {
214+
Some(s)
215+
} else {
216+
None
217+
}
218+
}) {
219+
Ok(v) => Ok(ResolvedMapValue::Borrowed(BorrowedMapValue {
220+
source: s,
221+
_orig: b,
222+
value: v,
223+
})),
224+
Err(_) => panic!(),
225+
}
226+
}
227+
ResolvedValue::Computed(o) => {
228+
if let OwnedValue::Map(s) = o {
229+
Ok(ResolvedMapValue::Computed(s))
230+
} else {
231+
panic!()
232+
}
233+
}
234+
ResolvedValue::Slice(_) => panic!(),
235+
ResolvedValue::List(_) => panic!(),
236+
ResolvedValue::Sequence(_) => panic!(),
237+
}
238+
}
197239
}
198240

199241
impl From<ResolvedValue<'_>> for OwnedValue {
@@ -896,6 +938,159 @@ impl Display for ResolvedArrayValue<'_> {
896938
}
897939
}
898940

941+
#[derive(Debug)]
942+
pub enum ResolvedMapValue<'a> {
943+
/// A value resolved from the expression tree or an attached record
944+
Value(&'a dyn MapValue),
945+
946+
/// A value borrowed from the record being modified by the engine
947+
Borrowed(BorrowedMapValue<'a>),
948+
949+
/// A value computed by the engine as the result of a dynamic expression
950+
Computed(MapValueStorage<OwnedValue>),
951+
}
952+
953+
#[derive(Debug)]
954+
pub struct BorrowedMapValue<'a> {
955+
source: BorrowSource,
956+
// Note: orig is not currently used but left in the code to support future
957+
// needing to return a resolved map as a static value. See usage on
958+
// BorrowedArrayValue for example
959+
_orig: Ref<'a, dyn AsStaticValue + 'static>,
960+
value: Ref<'a, dyn MapValue + 'static>,
961+
}
962+
963+
impl<'a> ResolvedMapValue<'a> {
964+
pub fn copy_if_borrowed_from_target(&mut self, target: &MutableValueExpression) -> bool {
965+
let v = match self {
966+
ResolvedMapValue::Borrowed(b) => Some((&b.source, &b.value)),
967+
_ => None,
968+
};
969+
970+
if let Some((s, v)) = v {
971+
let writing_while_holding_borrow = match target {
972+
MutableValueExpression::Source(_) => {
973+
matches!(s, BorrowSource::Source)
974+
}
975+
MutableValueExpression::Variable(_) => {
976+
matches!(s, BorrowSource::Variable)
977+
}
978+
};
979+
980+
if writing_while_holding_borrow {
981+
*self = ResolvedMapValue::Computed((&**v).into());
982+
return true;
983+
}
984+
}
985+
986+
false
987+
}
988+
989+
pub fn take<FConvert, FTake, R>(
990+
self,
991+
keys: &[&str],
992+
convert: FConvert,
993+
mut take: FTake,
994+
) -> Result<(), ExpressionError>
995+
where
996+
FConvert: Fn(&str, ResolvedValue<'a>) -> Result<R, ExpressionError>,
997+
FTake: FnMut(R),
998+
{
999+
match self {
1000+
ResolvedMapValue::Value(m) => {
1001+
for key in keys {
1002+
let v = (convert)(
1003+
key,
1004+
ResolvedValue::Value(m.get(key).expect("Map key was not found").to_value()),
1005+
)?;
1006+
(take)(v);
1007+
}
1008+
Ok(())
1009+
}
1010+
ResolvedMapValue::Borrowed(b) => {
1011+
let m = &b.value;
1012+
for key in keys {
1013+
match Ref::filter_map(Ref::clone(m), |m| {
1014+
m.get_static(key)
1015+
.expect("Borrowed map does not implement get_static")
1016+
}) {
1017+
Ok(v) => {
1018+
let v = (convert)(key, ResolvedValue::Borrowed(b.source.clone(), v))?;
1019+
(take)(v);
1020+
}
1021+
Err(_) => panic!("Map key was not found"),
1022+
}
1023+
}
1024+
Ok(())
1025+
}
1026+
ResolvedMapValue::Computed(mut m) => {
1027+
let values = m.get_values_mut();
1028+
1029+
for key in keys {
1030+
match values.remove(*key) {
1031+
Some(v) => {
1032+
let v = (convert)(key, ResolvedValue::Computed(v))?;
1033+
(take)(v);
1034+
}
1035+
None => panic!("Map key was not found"),
1036+
}
1037+
}
1038+
Ok(())
1039+
}
1040+
}
1041+
}
1042+
1043+
fn get_map(&self) -> &dyn MapValue {
1044+
match self {
1045+
ResolvedMapValue::Value(v) => *v,
1046+
ResolvedMapValue::Borrowed(b) => &*b.value,
1047+
ResolvedMapValue::Computed(c) => c,
1048+
}
1049+
}
1050+
}
1051+
1052+
impl MapValue for ResolvedMapValue<'_> {
1053+
fn is_empty(&self) -> bool {
1054+
self.get_map().is_empty()
1055+
}
1056+
1057+
fn len(&self) -> usize {
1058+
self.get_map().len()
1059+
}
1060+
1061+
fn contains_key(&self, key: &str) -> bool {
1062+
self.get_map().contains_key(key)
1063+
}
1064+
1065+
fn get(&self, key: &str) -> Option<&dyn AsValue> {
1066+
self.get_map().get(key)
1067+
}
1068+
1069+
fn get_static(&self, key: &str) -> Result<Option<&(dyn AsStaticValue + 'static)>, String> {
1070+
self.get_map().get_static(key)
1071+
}
1072+
1073+
fn get_items(&self, item_callback: &mut dyn KeyValueCallback) -> bool {
1074+
self.get_map().get_items(item_callback)
1075+
}
1076+
}
1077+
1078+
impl AsValue for ResolvedMapValue<'_> {
1079+
fn get_value_type(&self) -> ValueType {
1080+
ValueType::Map
1081+
}
1082+
1083+
fn to_value(&self) -> Value<'_> {
1084+
Value::Map(self.get_map())
1085+
}
1086+
}
1087+
1088+
impl Display for ResolvedMapValue<'_> {
1089+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1090+
self.to_value().fmt(f)
1091+
}
1092+
}
1093+
8991094
#[cfg(test)]
9001095
mod tests {
9011096
use super::*;

rust/experimental/query_engine/engine-recordset/src/primitives/value_storage.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -450,8 +450,12 @@ impl<T: EnumerableValueSource<T>> MapValue for MapValueStorage<T> {
450450
self.values.contains_key(key)
451451
}
452452

453-
fn get(&self, key: &str) -> Option<&(dyn AsStaticValue + 'static)> {
454-
self.values.get(key).map(|v| v as &dyn AsStaticValue)
453+
fn get(&self, key: &str) -> Option<&(dyn AsValue + 'static)> {
454+
self.values.get(key).map(|v| v as &dyn AsValue)
455+
}
456+
457+
fn get_static(&self, key: &str) -> Result<Option<&(dyn AsStaticValue + 'static)>, String> {
458+
Ok(self.values.get(key).map(|v| v as &dyn AsStaticValue))
455459
}
456460

457461
fn get_items(&self, item_callback: &mut dyn KeyValueCallback) -> bool {

0 commit comments

Comments
 (0)