Commit f18c098

chore: porting pipeline fix to main (openobserve#6986)

Authored by taimingl and Subhra264
Co-authored-by: Subhra264 <[email protected]>
1 parent: 5ca894a
File tree: 9 files changed (+124, -58 lines)

src/config/src/meta/function.rs

Lines changed: 15 additions & 0 deletions
@@ -13,6 +13,8 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+use once_cell::sync::Lazy;
+use regex::Regex;
 use serde::{Deserialize, Serialize};
 use utoipa::ToSchema;
 use vrl::{
@@ -22,6 +24,10 @@ use vrl::{
 
 use crate::{meta::stream::StreamType, utils::json};
 
+// Checks for #ResultArray#
+pub static RESULT_ARRAY: Lazy<Regex> =
+    Lazy::new(|| Regex::new(r"^#[ \s]*Result[ \s]*Array[ \s]*#").unwrap());
+
 #[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
 #[serde(rename_all = "camelCase")]
 pub struct Transform {
@@ -39,6 +45,15 @@ pub struct Transform {
     pub streams: Option<Vec<StreamOrder>>,
 }
 
+impl Transform {
+    pub fn is_vrl(&self) -> bool {
+        self.trans_type == Some(0)
+    }
+    pub fn is_result_array_vrl(&self) -> bool {
+        self.is_vrl() && RESULT_ARRAY.is_match(&self.function)
+    }
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
 pub struct TestVRLRequest {
     pub function: String, // VRL function as a string
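
For context on the marker syntax: `[ \s]` in the relocated pattern is a character class (a literal space or any whitespace), so spacing inside the marker is tolerated, but nothing may precede it since the pattern is anchored with `^`. A minimal standalone sketch of what `is_result_array_vrl` would flag, using the same regex literal as the diff (the sample function bodies are hypothetical):

use once_cell::sync::Lazy;
use regex::Regex;

// Same pattern as the relocated RESULT_ARRAY static above.
static RESULT_ARRAY: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^#[ \s]*Result[ \s]*Array[ \s]*#").unwrap());

fn main() {
    // Marker at the very start of the function body: flagged.
    assert!(RESULT_ARRAY.is_match("#ResultArray#\n. = parse_json!(.raw)"));
    assert!(RESULT_ARRAY.is_match("# Result Array # del(.tmp)"));
    // Marker anywhere else: not flagged, since the pattern is anchored.
    assert!(!RESULT_ARRAY.is_match("del(.tmp) #ResultArray#"));
}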

src/handler/http/request/search/multi_streams.rs

Lines changed: 3 additions & 5 deletions
@@ -20,7 +20,7 @@ use chrono::Utc;
 use config::{
     TIMESTAMP_COL_NAME, get_config,
     meta::{
-        function::VRLResultResolver,
+        function::{RESULT_ARRAY, VRLResultResolver},
         search::{self, PARTIAL_ERROR_RESPONSE_MESSAGE},
         self_reporting::usage::{RequestStats, UsageType},
         sql::resolve_stream_names,
@@ -464,11 +464,9 @@ pub async fn search_multi(
     // compile vrl function & apply the same before returning the response
     let mut input_fn = query_fn.unwrap().trim().to_string();
 
-    let apply_over_hits = SearchService::RESULT_ARRAY.is_match(&input_fn);
+    let apply_over_hits = RESULT_ARRAY.is_match(&input_fn);
     if apply_over_hits {
-        input_fn = SearchService::RESULT_ARRAY
-            .replace(&input_fn, "")
-            .to_string();
+        input_fn = RESULT_ARRAY.replace(&input_fn, "").to_string();
     }
     let mut runtime = crate::common::utils::functions::init_vrl_runtime();
     let program = match crate::service::ingestion::compile_vrl_function(&input_fn, &org_id) {
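
The flow above in miniature: detect the marker, strip it, and only then compile the remaining VRL. A self-contained sketch with a hypothetical query function (the real handler passes `input_fn` on to `compile_vrl_function` afterwards):

use once_cell::sync::Lazy;
use regex::Regex;

static RESULT_ARRAY: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^#[ \s]*Result[ \s]*Array[ \s]*#").unwrap());

fn main() {
    // Hypothetical user-supplied query function.
    let mut input_fn = "#ResultArray# . = map_values(array!(.)) -> |v| { v }".to_string();

    // apply_over_hits decides whether the VRL program runs once over the
    // whole hits array rather than once per hit.
    let apply_over_hits = RESULT_ARRAY.is_match(&input_fn);
    if apply_over_hits {
        // Strip the marker so only valid VRL reaches the compiler.
        input_fn = RESULT_ARRAY.replace(&input_fn, "").to_string();
    }
    assert!(apply_over_hits);
    assert_eq!(input_fn, " . = map_values(array!(.)) -> |v| { v }");
}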

src/service/functions.rs

Lines changed: 21 additions & 19 deletions
@@ -21,10 +21,12 @@ use actix_web::{
 };
 use config::{
     meta::{
-        function::{FunctionList, TestVRLResponse, Transform, VRLResult, VRLResultResolver},
+        function::{
+            FunctionList, RESULT_ARRAY, TestVRLResponse, Transform, VRLResult, VRLResultResolver,
+        },
         pipeline::{PipelineDependencyItem, PipelineDependencyResponse},
     },
-    utils::json,
+    utils::json::Value,
 };
 
 use crate::{
@@ -36,7 +38,7 @@ use crate::{
     handler::http::{
         request::search::error_utils::map_error_to_http_response, router::ERROR_HEADER,
     },
-    service::{db, ingestion::compile_vrl_function, search::RESULT_ARRAY},
+    service::{db, ingestion::compile_vrl_function},
 };
 
 const FN_SUCCESS: &str = "Function saved successfully";
@@ -80,7 +82,7 @@ pub async fn save_function(org_id: String, mut func: Transform) -> Result<HttpRe
 pub async fn test_run_function(
     org_id: &str,
     mut function: String,
-    events: Vec<json::Value>,
+    events: Vec<Value>,
 ) -> Result<HttpResponse, anyhow::Error> {
     // Append a dot at the end of the function if it doesn't exist
     if !function.ends_with('.') {
@@ -119,7 +121,7 @@
             program: program.clone(),
             fields: fields.clone(),
         },
-        json::Value::Array(events),
+        Value::Array(events),
         org_id,
         &[String::new()],
     );
@@ -135,20 +137,20 @@
             .as_array()
             .unwrap()
             .iter()
-            .filter_map(|v| {
-                let flattened_array = v
-                    .as_array()
-                    .unwrap_or(&vec![])
-                    .iter()
-                    .map(|item| config::utils::flatten::flatten(item.clone()).unwrap())
-                    .collect::<Vec<_>>();
-                if flattened_array.is_empty() {
-                    return None;
-                }
-                Some(serde_json::Value::Array(flattened_array))
-            })
-            .for_each(|transform| {
-                transformed_events.push(VRLResult::new("", transform));
+            .for_each(|record| match record {
+                Value::Object(hit) => transformed_events.push(VRLResult::new(
+                    "",
+                    config::utils::flatten::flatten(Value::Object(hit.clone())).unwrap(),
+                )),
+                Value::Array(hits) => hits.iter().for_each(|hit| {
+                    if let Value::Object(hit) = hit {
+                        transformed_events.push(VRLResult::new(
+                            "",
+                            config::utils::flatten::flatten(Value::Object(hit.clone())).unwrap(),
+                        ))
+                    }
+                }),
+                _ => {}
             });
     } else {
         events.into_iter().for_each(|event| {
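
The rewritten loop swaps the old `filter_map` over nested arrays for an explicit match on each record's shape: a bare object becomes one event, an array contributes one event per object element, and anything else is dropped. A condensed, runnable sketch of that handling, with `serde_json` standing in for `config::utils::json` and a no-op `flatten` stand-in:

use serde_json::{json, Value};

// No-op stand-in for config::utils::flatten::flatten, which would collapse
// nested keys into dotted top-level keys.
fn flatten(v: Value) -> Value {
    v
}

fn main() {
    // Hypothetical VRL output: a flat object, a nested batch, and a scalar.
    let output = json!([{"a": 1}, [{"b": 2}, {"c": 3}], "ignored"]);
    let mut transformed_events: Vec<Value> = Vec::new();
    output.as_array().unwrap().iter().for_each(|record| match record {
        // A bare object becomes a single event.
        Value::Object(hit) => transformed_events.push(flatten(Value::Object(hit.clone()))),
        // An array contributes one event per object element.
        Value::Array(hits) => hits.iter().for_each(|hit| {
            if let Value::Object(hit) = hit {
                transformed_events.push(flatten(Value::Object(hit.clone())));
            }
        }),
        // Scalars and other shapes are dropped.
        _ => {}
    });
    assert_eq!(transformed_events.len(), 3);
}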

src/service/logs/bulk.rs

Lines changed: 4 additions & 3 deletions
@@ -441,9 +441,10 @@ pub async fn ingest(
         }
 
         // add `_original` and '_record_id` if required by StreamSettings
-        if streams_need_original_map
-            .get(&destination_stream)
-            .is_some_and(|v| *v)
+        if idx != usize::MAX
+            && streams_need_original_map
+                .get(&destination_stream)
+                .is_some_and(|v| *v)
             && originals[idx].is_some()
         {
             local_val.insert(

src/service/logs/ingest.rs

Lines changed: 6 additions & 3 deletions
@@ -368,10 +368,13 @@ pub async fn ingest(
             local_val = crate::service::logs::refactor_map(local_val, fields);
         }
 
+        // usize::MAX used as a flag when pipeline is applied with ResultArray vrl
+        // - invalid original_data
         // add `_original` and '_record_id` if required by StreamSettings
-        if streams_need_original_map
-            .get(&destination_stream)
-            .is_some_and(|v| *v)
+        if idx != usize::MAX
+            && streams_need_original_map
+                .get(&destination_stream)
+                .is_some_and(|v| *v)
             && original_options[idx].is_some()
         {
             local_val.insert(
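
Why the sentinel check must come first (here and in the matching bulk, otlp, and syslog guards): `idx` indexes into the per-record originals vector, and records fanned out by a ResultArray function carry `idx == usize::MAX`, which has no matching entry. A small sketch with hypothetical data showing the short-circuit:

fn main() {
    // Hypothetical per-record originals captured before the pipeline ran.
    let original_options: Vec<Option<String>> = vec![Some("{\"raw\":1}".to_string())];
    let needs_original = true;

    // idx == usize::MAX marks records fanned out by a ResultArray function:
    // one input can become many outputs, so no single original maps to them.
    for idx in [0usize, usize::MAX] {
        // The sentinel check short-circuits before original_options[idx]
        // could index out of bounds and panic.
        if idx != usize::MAX && needs_original && original_options[idx].is_some() {
            println!("record {idx}: attach _original");
        } else {
            println!("record {idx}: skip _original");
        }
    }
}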

src/service/logs/otlp.rs

Lines changed: 4 additions & 3 deletions
@@ -364,9 +364,10 @@ pub async fn handle_request(
         }
 
         // add `_original` and '_record_id` if required by StreamSettings
-        if streams_need_original_map
-            .get(&destination_stream)
-            .is_some_and(|v| *v)
+        if idx != usize::MAX
+            && streams_need_original_map
+                .get(&destination_stream)
+                .is_some_and(|v| *v)
             && original_options[idx].is_some()
         {
             local_val.insert(

src/service/logs/syslog.rs

Lines changed: 4 additions & 3 deletions
@@ -323,9 +323,10 @@ pub async fn ingest(msg: &str, addr: SocketAddr) -> Result<HttpResponse> {
         }
 
         // add `_original` and '_record_id` if required by StreamSettings
-        if streams_need_original_map
-            .get(&destination_stream)
-            .is_some_and(|v| *v)
+        if idx != usize::MAX
+            && streams_need_original_map
+                .get(&destination_stream)
+                .is_some_and(|v| *v)
             && original_options[idx].is_some()
         {
             local_val.insert(

src/service/pipeline/batch_execution.rs

Lines changed: 66 additions & 17 deletions
@@ -27,7 +27,7 @@ use config::{
     },
     utils::{
         flatten,
-        json::{Value, get_string_value},
+        json::{self, Value, get_string_value},
     },
 };
 use futures::future::try_join_all;
@@ -51,12 +51,12 @@ static DYNAMIC_STREAM_NAME_PATTERN: Lazy<regex::Regex> =
 pub trait PipelineExt: Sync + Send + 'static {
     /// Registers the function of all the FunctionNode of this pipeline once for execution.
     /// Returns a map of node_id -> VRLResultResolver for quick lookup
-    async fn register_functions(&self) -> Result<HashMap<String, VRLResultResolver>>;
+    async fn register_functions(&self) -> Result<HashMap<String, (VRLResultResolver, bool)>>;
 }
 
 #[async_trait]
 impl PipelineExt for Pipeline {
-    async fn register_functions(&self) -> Result<HashMap<String, VRLResultResolver>> {
+    async fn register_functions(&self) -> Result<HashMap<String, (VRLResultResolver, bool)>> {
         let mut vrl_map = HashMap::new();
         for node in &self.nodes {
             if let NodeData::Function(func_params) = &node.data {
@@ -69,10 +69,13 @@ impl PipelineExt for Pipeline {
                 registry.finish_load();
                 vrl_map.insert(
                     node.get_node_id(),
-                    VRLResultResolver {
-                        program: vrl_runtime_config.program,
-                        fields: vrl_runtime_config.fields,
-                    },
+                    (
+                        VRLResultResolver {
+                            program: vrl_runtime_config.program,
+                            fields: vrl_runtime_config.fields,
+                        },
+                        transform.is_result_array_vrl(),
+                    ),
                 );
             }
         }
@@ -86,7 +89,7 @@ pub struct ExecutablePipeline {
     name: String,
     source_node_id: String,
     sorted_nodes: Vec<String>,
-    vrl_map: HashMap<String, VRLResultResolver>,
+    vrl_map: HashMap<String, (VRLResultResolver, bool)>,
     node_map: HashMap<String, ExecutableNode>,
 }
 
@@ -495,7 +498,7 @@ async fn process_node(
     node: ExecutableNode,
     mut receiver: Receiver<(usize, Value, bool)>,
     mut child_senders: Vec<Sender<(usize, Value, bool)>>,
-    vrl_runtime: Option<VRLResultResolver>,
+    vrl_runtime: Option<(VRLResultResolver, bool)>,
     result_sender: Option<Sender<(usize, StreamParams, Value)>>,
     error_sender: Sender<(String, String, String)>,
     pipeline_name: String,
@@ -646,8 +649,9 @@
             log::debug!("[Pipeline]: func node {node_idx} starts processing");
             let mut runtime = crate::service::ingestion::init_functions_runtime();
             let stream_name = stream_name.unwrap_or("pipeline".to_string());
+            let mut result_array_records = Vec::new();
             while let Some((idx, mut record, mut flattened)) = receiver.recv().await {
-                if let Some(vrl_runtime) = &vrl_runtime {
+                if let Some((vrl_runtime, is_result_array_vrl)) = &vrl_runtime {
                     if func_params.after_flatten && !flattened {
                         record = match flatten::flatten_with_level(
                             record,
@@ -670,10 +674,49 @@
                             }
                         };
                     }
-                    record = match apply_vrl_fn(
+                    if !is_result_array_vrl {
+                        record = match apply_vrl_fn(
+                            &mut runtime,
+                            vrl_runtime,
+                            record,
+                            &org_id,
+                            &[stream_name.clone()],
+                        ) {
+                            (res, None) => res,
+                            (res, Some(error)) => {
+                                let err_msg = format!("FunctionNode error: {}", error);
+                                if let Err(send_err) = error_sender
+                                    .send((node.id.to_string(), node.node_type(), err_msg))
+                                    .await
+                                {
+                                    log::error!(
+                                        "[Pipeline] {} : FunctionNode failed sending errors for collection caused by: {send_err}",
+                                        pipeline_name
+                                    );
+                                    break;
+                                }
+                                res
+                            }
+                        };
+                        flattened = false; // since apply_vrl_fn can produce unflattened data
+                        send_to_children(
+                            &mut child_senders,
+                            (idx, record, flattened),
+                            "FunctionNode",
+                        )
+                        .await;
+                    } else {
+                        result_array_records.push(record);
+                    }
+                }
+                count += 1;
+            }
+            if !result_array_records.is_empty() {
+                if let Some((vrl_runtime, true)) = &vrl_runtime {
+                    let result = match apply_vrl_fn(
                         &mut runtime,
                         vrl_runtime,
-                        record,
+                        json::Value::Array(result_array_records),
                         &org_id,
                         &[stream_name.clone()],
                     ) {
@@ -688,16 +731,22 @@
                                 "[Pipeline] {} : FunctionNode failed sending errors for collection caused by: {send_err}",
                                 pipeline_name
                             );
-                            break;
+                            return Ok(());
                         }
                         res
                     }
                 };
-                flattened = false; // since apply_vrl_fn can produce unflattened data
+                // since apply_vrl_fn can produce unflattened data
+                for record in result.as_array().unwrap().iter() {
+                    // use usize::MAX as a flag to disregard original_value
+                    send_to_children(
+                        &mut child_senders,
+                        (usize::MAX, record.clone(), false),
+                        "FunctionNode",
+                    )
+                    .await;
+                }
             }
-            send_to_children(&mut child_senders, (idx, record, flattened), "FunctionNode")
-                .await;
-            count += 1;
         }
         log::debug!("[Pipeline]: func node {node_idx} done processing {count} records");
     }
src/service/search/mod.rs

Lines changed: 1 addition & 5 deletions
@@ -25,6 +25,7 @@ use config::{
     get_config, ider,
     meta::{
         cluster::RoleGroup,
+        function::RESULT_ARRAY,
         search,
         self_reporting::usage::{RequestStats, UsageType},
         sql::{OrderBy, SqlOperator, TableReferenceExt, resolve_stream_names},
@@ -48,7 +49,6 @@ use infra::{
 use once_cell::sync::Lazy;
 use opentelemetry::trace::TraceContextExt;
 use proto::cluster_rpc::{self, SearchQuery};
-use regex::Regex;
 use sql::Sql;
 use tokio::runtime::Runtime;
 use tracing::Instrument;
@@ -84,10 +84,6 @@ pub(crate) mod super_cluster;
 pub(crate) mod tantivy;
 pub(crate) mod utils;
 
-// Checks for #ResultArray#
-pub static RESULT_ARRAY: Lazy<Regex> =
-    Lazy::new(|| Regex::new(r"^#[ \s]*Result[ \s]*Array[ \s]*#").unwrap());
-
 /// The result of search in cluster
 /// data, scan_stats, wait_in_queue, is_partial, partial_err
 type SearchResult = (Vec<RecordBatch>, search::ScanStats, usize, bool, String);
