Skip to content

Commit e80f241

Browse files
fix: otel logs flattening (#851)
Ingested logs missed few columns due to incorrect flattening. This PR fixes the multi-level hierarchical structure of the OTEL log flattening issue.
1 parent 16bf9b3 commit e80f241

File tree

1 file changed

+164
-141
lines changed

1 file changed

+164
-141
lines changed

server/src/handlers/http/otel.rs

Lines changed: 164 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818

1919
use bytes::Bytes;
20+
use proto::common::v1::KeyValue;
21+
use proto::logs::v1::LogRecord;
2022
use serde_json::Value;
2123
mod proto;
2224
use crate::handlers::http::otel::proto::logs::v1::LogRecordFlags;
@@ -127,28 +129,136 @@ fn value_to_string(value: serde_json::Value) -> String {
127129
}
128130
}
129131

132+
pub fn flatten_attributes(
133+
attributes: &Vec<KeyValue>,
134+
attribute_source_key: String,
135+
) -> BTreeMap<String, Value> {
136+
let mut attributes_json: BTreeMap<String, Value> = BTreeMap::new();
137+
for attribute in attributes {
138+
let key = &attribute.key;
139+
let value = &attribute.value;
140+
let value_json =
141+
collect_json_from_values(value, &format!("{}_{}", attribute_source_key, key));
142+
for key in value_json.keys() {
143+
attributes_json.insert(key.to_owned(), value_json[key].to_owned());
144+
}
145+
}
146+
attributes_json
147+
}
148+
149+
pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap<String, Value> {
150+
let mut log_record_json: BTreeMap<String, Value> = BTreeMap::new();
151+
if log_record.time_unix_nano.is_some() {
152+
log_record_json.insert(
153+
"time_unix_nano".to_string(),
154+
Value::String(log_record.time_unix_nano.as_ref().unwrap().to_string()),
155+
);
156+
}
157+
if log_record.observed_time_unix_nano.is_some() {
158+
log_record_json.insert(
159+
"observed_time_unix_nano".to_string(),
160+
Value::String(
161+
log_record
162+
.observed_time_unix_nano
163+
.as_ref()
164+
.unwrap()
165+
.to_string(),
166+
),
167+
);
168+
}
169+
if log_record.severity_number.is_some() {
170+
let severity_number: i32 = log_record.severity_number.unwrap();
171+
log_record_json.insert(
172+
"severity_number".to_string(),
173+
Value::Number(serde_json::Number::from(severity_number)),
174+
);
175+
if log_record.severity_text.is_none() {
176+
log_record_json.insert(
177+
"severity_text".to_string(),
178+
Value::String(SeverityNumber::as_str_name(severity_number).to_string()),
179+
);
180+
}
181+
}
182+
if log_record.severity_text.is_some() {
183+
log_record_json.insert(
184+
"severity_text".to_string(),
185+
Value::String(log_record.severity_text.as_ref().unwrap().to_string()),
186+
);
187+
}
188+
189+
if log_record.body.is_some() {
190+
let body = &log_record.body;
191+
let body_json = collect_json_from_values(body, &"body".to_string());
192+
for key in body_json.keys() {
193+
log_record_json.insert(key.to_owned(), body_json[key].to_owned());
194+
}
195+
}
196+
197+
if let Some(attributes) = log_record.attributes.as_ref() {
198+
let attributes_json = flatten_attributes(attributes, "log_record".to_string());
199+
for key in attributes_json.keys() {
200+
log_record_json.insert(key.to_owned(), attributes_json[key].to_owned());
201+
}
202+
}
203+
204+
if log_record.dropped_attributes_count.is_some() {
205+
log_record_json.insert(
206+
"log_record_dropped_attributes_count".to_string(),
207+
Value::Number(serde_json::Number::from(
208+
log_record.dropped_attributes_count.unwrap(),
209+
)),
210+
);
211+
}
212+
213+
if log_record.flags.is_some() {
214+
let flags: u32 = log_record.flags.unwrap();
215+
log_record_json.insert(
216+
"flags_number".to_string(),
217+
Value::Number(serde_json::Number::from(flags)),
218+
);
219+
log_record_json.insert(
220+
"flags_string".to_string(),
221+
Value::String(LogRecordFlags::as_str_name(flags).to_string()),
222+
);
223+
}
224+
225+
if log_record.span_id.is_some() {
226+
log_record_json.insert(
227+
"span_id".to_string(),
228+
Value::String(log_record.span_id.as_ref().unwrap().to_string()),
229+
);
230+
}
231+
232+
if log_record.trace_id.is_some() {
233+
log_record_json.insert(
234+
"trace_id".to_string(),
235+
Value::String(log_record.trace_id.as_ref().unwrap().to_string()),
236+
);
237+
}
238+
239+
log_record_json
240+
}
241+
130242
pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
131243
let mut vec_otel_json: Vec<BTreeMap<String, Value>> = Vec::new();
132244
let body_str = std::str::from_utf8(body).unwrap();
133245
let message: LogsData = serde_json::from_str(body_str).unwrap();
134-
for records in message.resource_logs.iter() {
246+
247+
if let Some(records) = message.resource_logs.as_ref() {
248+
let mut vec_resource_logs_json: Vec<BTreeMap<String, Value>> = Vec::new();
135249
for record in records.iter() {
136-
let mut otel_json: BTreeMap<String, Value> = BTreeMap::new();
137-
for resource in record.resource.iter() {
138-
let attributes = &resource.attributes;
139-
for attributes in attributes.iter() {
140-
for attribute in attributes {
141-
let key = &attribute.key;
142-
let value = &attribute.value;
143-
let value_json =
144-
collect_json_from_values(value, &format!("resource_{}", key));
145-
for key in value_json.keys() {
146-
otel_json.insert(key.to_owned(), value_json[key].to_owned());
147-
}
250+
let mut resource_log_json: BTreeMap<String, Value> = BTreeMap::new();
251+
252+
if let Some(resource) = record.resource.as_ref() {
253+
if let Some(attributes) = resource.attributes.as_ref() {
254+
let attributes_json = flatten_attributes(attributes, "resource".to_string());
255+
for key in attributes_json.keys() {
256+
resource_log_json.insert(key.to_owned(), attributes_json[key].to_owned());
148257
}
149258
}
259+
150260
if resource.dropped_attributes_count.is_some() {
151-
otel_json.insert(
261+
resource_log_json.insert(
152262
"resource_dropped_attributes_count".to_string(),
153263
Value::Number(serde_json::Number::from(
154264
resource.dropped_attributes_count.unwrap(),
@@ -157,170 +267,83 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
157267
}
158268
}
159269

160-
for scope_logs in record.scope_logs.iter() {
270+
if let Some(scope_logs) = record.scope_logs.as_ref() {
271+
let mut vec_scope_log_json: Vec<BTreeMap<String, Value>> = Vec::new();
161272
for scope_log in scope_logs.iter() {
162-
for instrumentation_scope in scope_log.scope.iter() {
273+
let mut scope_log_json: BTreeMap<String, Value> = BTreeMap::new();
274+
if scope_log.scope.is_some() {
275+
let instrumentation_scope = scope_log.scope.as_ref().unwrap();
163276
if instrumentation_scope.name.is_some() {
164-
otel_json.insert(
277+
scope_log_json.insert(
165278
"instrumentation_scope_name".to_string(),
166279
Value::String(
167280
instrumentation_scope.name.as_ref().unwrap().to_string(),
168281
),
169282
);
170283
}
171284
if instrumentation_scope.version.is_some() {
172-
otel_json.insert(
285+
scope_log_json.insert(
173286
"instrumentation_scope_version".to_string(),
174287
Value::String(
175288
instrumentation_scope.version.as_ref().unwrap().to_string(),
176289
),
177290
);
178291
}
179-
let attributes = &instrumentation_scope.attributes;
180-
for attributes in attributes.iter() {
181-
for attribute in attributes {
182-
let key = &attribute.key;
183-
let value = &attribute.value;
184-
let value_json = collect_json_from_values(
185-
value,
186-
&format!("instrumentation_scope_{}", key),
187-
);
188-
for key in value_json.keys() {
189-
otel_json.insert(key.to_owned(), value_json[key].to_owned());
190-
}
292+
293+
if let Some(attributes) = instrumentation_scope.attributes.as_ref() {
294+
let attributes_json =
295+
flatten_attributes(attributes, "instrumentation_scope".to_string());
296+
for key in attributes_json.keys() {
297+
scope_log_json
298+
.insert(key.to_owned(), attributes_json[key].to_owned());
191299
}
192300
}
301+
193302
if instrumentation_scope.dropped_attributes_count.is_some() {
194-
otel_json.insert(
303+
scope_log_json.insert(
195304
"instrumentation_scope_dropped_attributes_count".to_string(),
196305
Value::Number(serde_json::Number::from(
197306
instrumentation_scope.dropped_attributes_count.unwrap(),
198307
)),
199308
);
200309
}
201310
}
311+
if scope_log.schema_url.is_some() {
312+
scope_log_json.insert(
313+
"scope_log_schema_url".to_string(),
314+
Value::String(scope_log.schema_url.as_ref().unwrap().to_string()),
315+
);
316+
}
202317

203318
for log_record in scope_log.log_records.iter() {
204-
let mut log_record_json: BTreeMap<String, Value> = BTreeMap::new();
205-
if log_record.time_unix_nano.is_some() {
206-
log_record_json.insert(
207-
"time_unix_nano".to_string(),
208-
Value::String(
209-
log_record.time_unix_nano.as_ref().unwrap().to_string(),
210-
),
211-
);
212-
}
213-
if log_record.observed_time_unix_nano.is_some() {
214-
log_record_json.insert(
215-
"observed_time_unix_nano".to_string(),
216-
Value::String(
217-
log_record
218-
.observed_time_unix_nano
219-
.as_ref()
220-
.unwrap()
221-
.to_string(),
222-
),
223-
);
224-
}
225-
if log_record.severity_number.is_some() {
226-
let severity_number: i32 = log_record.severity_number.unwrap();
227-
log_record_json.insert(
228-
"severity_number".to_string(),
229-
Value::Number(serde_json::Number::from(severity_number)),
230-
);
231-
if log_record.severity_text.is_none() {
232-
log_record_json.insert(
233-
"severity_text".to_string(),
234-
Value::String(
235-
SeverityNumber::as_str_name(severity_number).to_string(),
236-
),
237-
);
238-
}
239-
}
240-
if log_record.severity_text.is_some() {
241-
log_record_json.insert(
242-
"severity_text".to_string(),
243-
Value::String(
244-
log_record.severity_text.as_ref().unwrap().to_string(),
245-
),
246-
);
247-
}
248-
249-
if log_record.body.is_some() {
250-
let body = &log_record.body;
251-
let body_json = collect_json_from_values(body, &"body".to_string());
252-
for key in body_json.keys() {
253-
log_record_json.insert(key.to_owned(), body_json[key].to_owned());
254-
}
255-
}
319+
let log_record_json = flatten_log_record(log_record);
256320

257-
for attributes in log_record.attributes.iter() {
258-
for attribute in attributes {
259-
let key = &attribute.key;
260-
let value = &attribute.value;
261-
let value_json =
262-
collect_json_from_values(value, &format!("log_record_{}", key));
263-
for key in value_json.keys() {
264-
log_record_json
265-
.insert(key.to_owned(), value_json[key].to_owned());
266-
}
267-
}
268-
}
269-
270-
if log_record.dropped_attributes_count.is_some() {
271-
log_record_json.insert(
272-
"log_record_dropped_attributes_count".to_string(),
273-
Value::Number(serde_json::Number::from(
274-
log_record.dropped_attributes_count.unwrap(),
275-
)),
276-
);
277-
}
278-
279-
if log_record.flags.is_some() {
280-
let flags: u32 = log_record.flags.unwrap();
281-
log_record_json.insert(
282-
"flags_number".to_string(),
283-
Value::Number(serde_json::Number::from(flags)),
284-
);
285-
log_record_json.insert(
286-
"flags_string".to_string(),
287-
Value::String(LogRecordFlags::as_str_name(flags).to_string()),
288-
);
289-
}
290-
291-
if log_record.span_id.is_some() {
292-
log_record_json.insert(
293-
"span_id".to_string(),
294-
Value::String(log_record.span_id.as_ref().unwrap().to_string()),
295-
);
296-
}
297-
298-
if log_record.trace_id.is_some() {
299-
log_record_json.insert(
300-
"trace_id".to_string(),
301-
Value::String(log_record.trace_id.as_ref().unwrap().to_string()),
302-
);
303-
}
304321
for key in log_record_json.keys() {
305-
otel_json.insert(key.to_owned(), log_record_json[key].to_owned());
322+
scope_log_json.insert(key.to_owned(), log_record_json[key].to_owned());
306323
}
307-
vec_otel_json.push(otel_json.clone());
308-
}
309-
310-
if scope_log.schema_url.is_some() {
311-
otel_json.insert(
312-
"scope_log_schema_url".to_string(),
313-
Value::String(scope_log.schema_url.as_ref().unwrap().to_string()),
314-
);
324+
vec_scope_log_json.push(scope_log_json.clone());
315325
}
316326
}
327+
for scope_log_json in vec_scope_log_json.iter() {
328+
vec_resource_logs_json.push(scope_log_json.clone());
329+
}
317330
}
318331
if record.schema_url.is_some() {
319-
otel_json.insert(
320-
"resource_schema_url".to_string(),
332+
resource_log_json.insert(
333+
"schema_url".to_string(),
321334
Value::String(record.schema_url.as_ref().unwrap().to_string()),
322335
);
323336
}
337+
338+
for resource_logs_json in vec_resource_logs_json.iter_mut() {
339+
for key in resource_log_json.keys() {
340+
resource_logs_json.insert(key.to_owned(), resource_log_json[key].to_owned());
341+
}
342+
}
343+
}
344+
345+
for resource_logs_json in vec_resource_logs_json.iter() {
346+
vec_otel_json.push(resource_logs_json.clone());
324347
}
325348
}
326349
vec_otel_json

0 commit comments

Comments
 (0)