Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions lib/enrichment/src/get_enrichment_table_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ fn get_enrichment_table_record(
})
.transpose()?;

let data = enrichment_tables.find_table_row(
table,
case_sensitive,
condition,
select.as_ref().map(|select| select.as_ref()),
wildcard.as_ref(),
index,
)?;
let data = enrichment_tables
.find_table_row(
table,
case_sensitive,
condition,
select.as_ref().map(|select| select.as_ref()),
wildcard.as_ref(),
index,
)
.map_err(Into::<ExpressionError>::into)?;

Ok(Value::Object(data))
}
Expand Down
42 changes: 39 additions & 3 deletions lib/enrichment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,42 @@ pub enum Case {
Insensitive,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Error {
NoRowsFound(String),
MoreThanOneRowFound(String),
InvalidInput(String),
TableError(String),
}

impl Error {
pub fn message(&self) -> String {
match self {
Error::NoRowsFound(message) => message.clone(),
Error::MoreThanOneRowFound(message) => message.clone(),
Error::InvalidInput(message) => message.clone(),
Error::TableError(message) => message.clone(),
}
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message())
}
}

impl std::error::Error for Error {}

impl From<Error> for vrl::prelude::ExpressionError {
fn from(error: Error) -> Self {
vrl::prelude::ExpressionError::Error {
message: error.message(),
labels: vec![],
notes: vec![],
}
}
}

/// Enrichment tables represent additional data sources that can be used to enrich the event data
/// passing through Vector.
pub trait Table: DynClone {
Expand All @@ -61,7 +97,7 @@ pub trait Table: DynClone {
select: Option<&[String]>,
wildcard: Option<&Value>,
index: Option<IndexHandle>,
) -> Result<ObjectMap, String>;
) -> Result<ObjectMap, Error>;

/// Search the enrichment table data with the given condition.
/// All conditions must match (AND).
Expand All @@ -73,14 +109,14 @@ pub trait Table: DynClone {
select: Option<&[String]>,
wildcard: Option<&Value>,
index: Option<IndexHandle>,
) -> Result<Vec<ObjectMap>, String>;
) -> Result<Vec<ObjectMap>, Error>;

/// Hints to the enrichment table what data is going to be searched to allow it to index the
/// data in advance.
///
/// # Errors
/// Errors if the fields are not in the table.
fn add_index(&mut self, case: Case, fields: &[&str]) -> Result<IndexHandle, String>;
fn add_index(&mut self, case: Case, fields: &[&str]) -> Result<IndexHandle, Error>;

/// Returns a list of the field names that are in each index
fn index_fields(&self) -> Vec<(Case, Vec<String>)>;
Expand Down
24 changes: 12 additions & 12 deletions lib/enrichment/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::{
use arc_swap::ArcSwap;
use vrl::value::{ObjectMap, Value};

use super::{Condition, IndexHandle, Table};
use super::{Condition, Error, IndexHandle, Table};
use crate::Case;

/// A hashmap of name => implementation of an enrichment table.
Expand Down Expand Up @@ -151,13 +151,13 @@ impl TableRegistry {
table: &str,
case: Case,
fields: &[&str],
) -> Result<IndexHandle, String> {
) -> Result<IndexHandle, Error> {
let mut locked = self.loading.lock().unwrap();

match *locked {
None => Err("finish_load has been called".to_string()),
None => Err(Error::TableError("finish_load has been called".into())),
Some(ref mut tables) => match tables.get_mut(table) {
None => Err(format!("table '{table}' not loaded")),
None => Err(Error::TableError(format!("table '{table}' not loaded"))),
Some(table) => table.add_index(case, fields),
},
}
Expand Down Expand Up @@ -218,15 +218,15 @@ impl TableSearch {
select: Option<&[String]>,
wildcard: Option<&Value>,
index: Option<IndexHandle>,
) -> Result<ObjectMap, String> {
) -> Result<ObjectMap, Error> {
let tables = self.0.load();
if let Some(ref tables) = **tables {
match tables.get(table) {
None => Err(format!("table {table} not loaded")),
None => Err(Error::TableError(format!("table {table} not loaded"))),
Some(table) => table.find_table_row(case, condition, select, wildcard, index),
}
} else {
Err("finish_load not called".to_string())
Err(Error::TableError("finish_load not called".into()))
}
}

Expand All @@ -241,15 +241,15 @@ impl TableSearch {
select: Option<&[String]>,
wildcard: Option<&Value>,
index: Option<IndexHandle>,
) -> Result<Vec<ObjectMap>, String> {
) -> Result<Vec<ObjectMap>, Error> {
let tables = self.0.load();
if let Some(ref tables) = **tables {
match tables.get(table) {
None => Err(format!("table {table} not loaded")),
None => Err(Error::TableError(format!("table {table} not loaded"))),
Some(table) => table.find_table_rows(case, condition, select, wildcard, index),
}
} else {
Err("finish_load not called".to_string())
Err(Error::TableError("finish_load not called".into()))
}
}
}
Expand Down Expand Up @@ -331,7 +331,7 @@ mod tests {
let tables = registry.as_readonly();

assert_eq!(
Err("finish_load not called".to_string()),
Err(Error::TableError("finish_load not called".to_string())),
tables.find_table_row(
"dummy1",
Case::Sensitive,
Expand All @@ -355,7 +355,7 @@ mod tests {
registry.load(tables);
registry.finish_load();
assert_eq!(
Err("finish_load has been called".to_string()),
Err(Error::TableError("finish_load has been called".into())),
registry.add_index("dummy1", Case::Sensitive, &["erk"])
);
}
Expand Down
8 changes: 4 additions & 4 deletions lib/enrichment/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use vrl::value::{ObjectMap, Value};

use crate::{Case, Condition, IndexHandle, Table, TableRegistry};
use crate::{Case, Condition, Error, IndexHandle, Table, TableRegistry};

#[derive(Debug, Clone)]
pub(crate) struct DummyEnrichmentTable {
Expand Down Expand Up @@ -41,7 +41,7 @@ impl Table for DummyEnrichmentTable {
_select: Option<&[String]>,
_wildcard: Option<&Value>,
_index: Option<IndexHandle>,
) -> Result<ObjectMap, String> {
) -> Result<ObjectMap, Error> {
Ok(self.data.clone())
}

Expand All @@ -52,11 +52,11 @@ impl Table for DummyEnrichmentTable {
_select: Option<&[String]>,
_wildcard: Option<&Value>,
_index: Option<IndexHandle>,
) -> Result<Vec<ObjectMap>, String> {
) -> Result<Vec<ObjectMap>, Error> {
Ok(vec![self.data.clone()])
}

fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, String> {
fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, Error> {
let mut indexes = self.indexes.lock().unwrap();
indexes.push(fields.iter().map(|s| (*s).to_string()).collect());
Ok(IndexHandle(indexes.len() - 1))
Expand Down
6 changes: 3 additions & 3 deletions lib/vector-vrl/tests/src/test_enrichment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl enrichment::Table for TestEnrichmentTable {
_select: Option<&[String]>,
_wildcard: Option<&Value>,
_index: Option<enrichment::IndexHandle>,
) -> Result<ObjectMap, String> {
) -> Result<ObjectMap, enrichment::Error> {
let mut result = ObjectMap::new();
result.insert("id".into(), Value::from(1));
result.insert("firstname".into(), Value::from("Bob"));
Expand All @@ -29,7 +29,7 @@ impl enrichment::Table for TestEnrichmentTable {
_select: Option<&[String]>,
_wildcard: Option<&Value>,
_index: Option<enrichment::IndexHandle>,
) -> Result<Vec<ObjectMap>, String> {
) -> Result<Vec<ObjectMap>, enrichment::Error> {
let mut result1 = ObjectMap::new();
result1.insert("id".into(), Value::from(1));
result1.insert("firstname".into(), Value::from("Bob"));
Expand All @@ -47,7 +47,7 @@ impl enrichment::Table for TestEnrichmentTable {
&mut self,
_case: enrichment::Case,
_fields: &[&str],
) -> Result<enrichment::IndexHandle, String> {
) -> Result<enrichment::IndexHandle, enrichment::Error> {
Ok(enrichment::IndexHandle(1))
}

Expand Down
Loading
Loading