Skip to content

Commit c48c163

Browse files
authored
Collect label and metadata from header (#79)
This PR allows for metadata and tags to be fetched from headers instead of body of the event. This gives some flexibility in schema and multiple sources with different schema for available metadata can send logs to the same stream. Client needs to use prefix x-p-meta- and x-p-tags- in headers to send key value pairs in a server post event. These headers are parsed and validated before being merged into body with p_metadata and p_tags field respectively Changes: - introduced header_parsing::collect_labelled which parses headers key value pairs and gives a string formatted as "k1=v1,k2=v2" - New Error type ParseHeaderError which implements ResponseError. - Use above types in post event handler Fixes #75
1 parent e960ceb commit c48c163

File tree

2 files changed

+103
-48
lines changed

2 files changed

+103
-48
lines changed

server/src/handlers/event.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
*
1717
*/
1818

19+
use std::collections::HashMap;
20+
1921
use actix_web::http::StatusCode;
20-
use actix_web::{web, HttpRequest, HttpResponse};
22+
use actix_web::{web, HttpRequest, HttpResponse, ResponseError};
2123
use serde_json::Value;
2224

2325
use crate::event;
@@ -26,7 +28,12 @@ use crate::query::Query;
2628
use crate::response::{self, EventResponse};
2729
use crate::s3::S3;
2830
use crate::storage::ObjectStorage;
29-
use crate::utils;
31+
use crate::utils::header_parsing::collect_labelled_headers;
32+
use crate::utils::{self, merge};
33+
34+
const PREFIX_TAGS: &str = "x-p-tags-";
35+
const PREFIX_META: &str = "x-p-meta-";
36+
const SEPARATOR: char = ',';
3037

3138
pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> HttpResponse {
3239
let json = json.into_inner();
@@ -74,7 +81,16 @@ pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> HttpResponse {
7481

7582
pub async fn post_event(req: HttpRequest, body: web::Json<serde_json::Value>) -> HttpResponse {
7683
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
77-
let labels = utils::collect_labels(&req);
84+
85+
let tags = match collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR) {
86+
Ok(tags) => HashMap::from([("p_tags".to_string(), tags)]),
87+
Err(e) => return e.error_response(),
88+
};
89+
90+
let metadata = match collect_labelled_headers(&req, PREFIX_META, SEPARATOR) {
91+
Ok(metadata) => HashMap::from([("p_metadata".to_string(), metadata)]),
92+
Err(e) => return e.error_response(),
93+
};
7894

7995
if let Err(e) = metadata::STREAM_INFO.schema(&stream_name) {
8096
// if stream doesn't exist, fail to post data
@@ -94,7 +110,10 @@ pub async fn post_event(req: HttpRequest, body: web::Json<serde_json::Value>) ->
94110
let mut i = 0;
95111

96112
for body in array {
97-
let body = utils::flatten_json_body(web::Json(body.clone()), labels.clone()).unwrap();
113+
let body = merge(body.clone(), metadata.clone());
114+
let body = merge(body, tags.clone());
115+
let body = utils::flatten_json_body(web::Json(body)).unwrap();
116+
98117
let e = event::Event {
99118
body,
100119
stream_name: stream_name.clone(),
@@ -118,8 +137,11 @@ pub async fn post_event(req: HttpRequest, body: web::Json<serde_json::Value>) ->
118137
.to_http();
119138
}
120139

140+
let body = merge(body.clone(), metadata);
141+
let body = merge(body, tags);
142+
121143
let event = event::Event {
122-
body: utils::flatten_json_body(body, labels).unwrap(),
144+
body: utils::flatten_json_body(web::Json(body)).unwrap(),
123145
stream_name,
124146
};
125147

server/src/utils.rs

Lines changed: 76 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,52 +17,104 @@
1717
*/
1818

1919
use actix_web::web;
20-
use actix_web::HttpRequest;
2120
use chrono::{Date, DateTime, Timelike, Utc};
2221
use serde_json::{json, Value};
2322
use std::collections::HashMap;
2423

2524
use crate::Error;
2625

27-
const META_LABEL: &str = "x-p-meta";
28-
29-
pub fn flatten_json_body(
30-
body: web::Json<serde_json::Value>,
31-
labels: Option<String>,
32-
) -> Result<String, Error> {
33-
let mut collector_labels = HashMap::new();
34-
35-
collector_labels.insert("labels".to_string(), labels.unwrap());
36-
26+
pub fn flatten_json_body(body: web::Json<serde_json::Value>) -> Result<String, Error> {
3727
let mut flat_value: Value = json!({});
38-
let new_body = merge(&body, &collector_labels);
39-
flatten_json::flatten(&new_body, &mut flat_value, None, true, Some("_")).unwrap();
28+
flatten_json::flatten(&body, &mut flat_value, None, true, Some("_")).unwrap();
4029
let flattened = serde_json::to_string(&flat_value)?;
4130

4231
Ok(flattened)
4332
}
4433

45-
fn merge(v: &Value, fields: &HashMap<String, String>) -> Value {
46-
match v {
47-
Value::Object(m) => {
48-
let mut m = m.clone();
34+
pub fn merge(value: Value, fields: HashMap<String, String>) -> Value {
35+
match value {
36+
Value::Object(mut m) => {
4937
for (k, v) in fields {
50-
match m.get(k) {
51-
Some(curr_val) => {
38+
match m.get_mut(&k) {
39+
Some(val) => {
5240
let mut final_val = String::new();
53-
final_val.push_str(curr_val.as_str().unwrap());
41+
final_val.push_str(val.as_str().unwrap());
5442
final_val.push(',');
55-
final_val.push_str(v);
56-
m.insert(k.clone(), Value::String(final_val));
43+
final_val.push_str(&v);
44+
*val = Value::String(final_val);
5745
}
5846
None => {
59-
m.insert(k.clone(), Value::String(v.to_string()));
47+
m.insert(k, Value::String(v));
6048
}
6149
}
6250
}
6351
Value::Object(m)
6452
}
65-
v => v.clone(),
53+
value => value,
54+
}
55+
}
56+
57+
pub mod header_parsing {
58+
const MAX_HEADERS_ALLOWED: usize = 5;
59+
use actix_web::{HttpRequest, HttpResponse, ResponseError};
60+
61+
pub fn collect_labelled_headers(
62+
req: &HttpRequest,
63+
prefix: &str,
64+
kv_separator: char,
65+
) -> Result<String, ParseHeaderError> {
66+
// filter out headers which has right prefix label and convert them into str;
67+
let headers = req.headers().iter().filter_map(|(key, value)| {
68+
let key = key.as_str().strip_prefix(prefix)?;
69+
Some((key, value))
70+
});
71+
72+
let mut labels: Vec<String> = Vec::new();
73+
74+
for (key, value) in headers {
75+
let value = value.to_str().map_err(|_| ParseHeaderError::InvalidValue)?;
76+
if key.is_empty() {
77+
return Err(ParseHeaderError::Emptykey);
78+
}
79+
if key.contains(kv_separator) {
80+
return Err(ParseHeaderError::SeperatorInKey(kv_separator));
81+
}
82+
if value.contains(kv_separator) {
83+
return Err(ParseHeaderError::SeperatorInValue(kv_separator));
84+
}
85+
86+
labels.push(format!("{}={}", key, value));
87+
}
88+
89+
if labels.len() > MAX_HEADERS_ALLOWED {
90+
return Err(ParseHeaderError::MaxHeadersLimitExceeded);
91+
}
92+
93+
Ok(labels.join(","))
94+
}
95+
96+
#[derive(Debug, thiserror::Error)]
97+
pub enum ParseHeaderError {
98+
#[error("Too many headers received. Limit is of 5 headers")]
99+
MaxHeadersLimitExceeded,
100+
#[error("A value passed in header is not formattable to plain visible ASCII")]
101+
InvalidValue,
102+
#[error("Invalid Key was passed which terminated just after the end of prefix")]
103+
Emptykey,
104+
#[error("A key passed in header contains reserved char {0}")]
105+
SeperatorInKey(char),
106+
#[error("A value passed in header contains reserved char {0}")]
107+
SeperatorInValue(char),
108+
}
109+
110+
impl ResponseError for ParseHeaderError {
111+
fn status_code(&self) -> http::StatusCode {
112+
http::StatusCode::BAD_REQUEST
113+
}
114+
115+
fn error_response(&self) -> HttpResponse {
116+
HttpResponse::build(self.status_code()).body(self.to_string())
117+
}
66118
}
67119
}
68120

@@ -110,25 +162,6 @@ pub fn minute_to_prefix(minute: u32, data_granularity: u32) -> Option<String> {
110162
))
111163
}
112164

113-
/// collect labels passed from http headers
114-
/// format: labels = "app=k8s, cloud=gcp"
115-
pub fn collect_labels(req: &HttpRequest) -> Option<String> {
116-
let keys = req.headers().keys().cloned().collect::<Vec<_>>();
117-
118-
let mut labels_vec = Vec::with_capacity(100);
119-
for key in keys {
120-
if key.to_string().to_lowercase().starts_with(META_LABEL) {
121-
let value = req.headers().get(&key)?.to_str().ok();
122-
let remove_meta_char = format!("{}-", META_LABEL);
123-
let kv =
124-
format! {"{}={}", key.to_string().replace(&remove_meta_char, ""), value.unwrap()};
125-
labels_vec.push(kv);
126-
}
127-
}
128-
129-
Some(labels_vec.join(","))
130-
}
131-
132165
pub struct TimePeriod {
133166
start: DateTime<Utc>,
134167
end: DateTime<Utc>,

0 commit comments

Comments
 (0)