Skip to content

Commit 123a317

Browse files
decode and flatten kinesis log before ingesting to parseable (#613)
add logic for decoding and flattening AWS Kinesis Firehose events --------- Signed-off-by: Nikhil Sinha <[email protected]> Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 8a9288f commit 123a317

File tree

5 files changed

+119
-9
lines changed

5 files changed

+119
-9
lines changed

server/src/handlers.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,14 @@ const PREFIX_META: &str = "x-p-meta-";
2424
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
2525
const AUTHORIZATION_KEY: &str = "authorization";
2626
const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";
27+
const LOG_SOURCE_KEY: &str = "x-p-log-source";
2728
const SEPARATOR: char = '^';
2829

2930
const OIDC_SCOPE: &str = "openid profile email";
3031
const COOKIE_AGE_DAYS: usize = 7;
3132
const SESSION_COOKIE_NAME: &str = "session";
3233
const USER_COOKIE_NAME: &str = "username";
34+
35+
// constants for Log Source values for known sources
36+
const LOG_SOURCE_KINESIS: &str = "kinesis";
37+
const LOG_SOURCE_OTEL: &str = "otel";

server/src/handlers/http.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use self::middleware::{DisAllowRootUser, RouteExt};
4040
mod about;
4141
mod health_check;
4242
mod ingest;
43+
mod kinesis;
4344
mod llm;
4445
mod logstream;
4546
mod middleware;

server/src/handlers/http/ingest.rs

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,25 @@
1616
*
1717
*/
1818

19-
use std::collections::HashMap;
20-
use std::sync::Arc;
21-
22-
use actix_web::http::header::ContentType;
23-
use actix_web::{HttpRequest, HttpResponse};
19+
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
2420
use arrow_schema::Field;
2521
use bytes::Bytes;
2622
use http::StatusCode;
2723
use serde_json::Value;
24+
use std::collections::{BTreeMap, HashMap};
25+
use std::sync::Arc;
2826

2927
use crate::event::error::EventError;
3028
use crate::event::format::EventFormat;
3129
use crate::event::{self, format};
32-
use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY};
30+
use crate::handlers::{
31+
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
32+
STREAM_NAME_HEADER_KEY,
33+
};
3334
use crate::metadata::STREAM_INFO;
3435
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
3536

37+
use super::kinesis;
3638
use super::logstream::error::CreateStreamError;
3739

3840
// Handler for POST /api/v1/ingest
@@ -46,19 +48,48 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
4648
{
4749
let stream_name = stream_name.to_str().unwrap().to_owned();
4850
create_stream_if_not_exists(&stream_name).await?;
49-
push_logs(stream_name, req, body).await?;
51+
52+
flatten_and_push_logs(req, body, stream_name).await?;
5053
Ok(HttpResponse::Ok().finish())
5154
} else {
5255
Err(PostError::Header(ParseHeaderError::MissingStreamName))
5356
}
5457
}
5558

59+
async fn flatten_and_push_logs(
60+
req: HttpRequest,
61+
body: Bytes,
62+
stream_name: String,
63+
) -> Result<(), PostError> {
64+
//flatten logs
65+
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
66+
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
67+
let log_source: String = log_source.to_str().unwrap().to_owned();
68+
match log_source.as_str() {
69+
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
70+
LOG_SOURCE_OTEL => {}
71+
_ => {
72+
log::warn!("Unknown log source: {}", log_source);
73+
push_logs(stream_name.to_string(), req.clone(), body).await?;
74+
}
75+
}
76+
for record in json.iter_mut() {
77+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
78+
push_logs(stream_name.to_string(), req.clone(), body).await?;
79+
}
80+
} else {
81+
push_logs(stream_name.to_string(), req, body).await?;
82+
}
83+
Ok(())
84+
}
85+
5686
// Handler for POST /api/v1/logstream/{logstream}
5787
// only ingests events into the specified logstream
5888
// fails if the logstream does not exist
5989
pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
6090
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
61-
push_logs(stream_name, req, body).await?;
91+
92+
flatten_and_push_logs(req, body, stream_name).await?;
6293
Ok(HttpResponse::Ok().finish())
6394
}
6495

server/src/handlers/http/kinesis.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use base64::{engine::general_purpose::STANDARD, Engine as _};
20+
use bytes::Bytes;
21+
use serde::{Deserialize, Serialize};
22+
use serde_json::Value;
23+
use std::collections::BTreeMap;
24+
use std::str;
25+
26+
#[derive(Serialize, Deserialize, Debug)]
27+
struct Message {
28+
#[serde(rename = "records")]
29+
records: Vec<Data>,
30+
#[serde(rename = "requestId")]
31+
request_id: String,
32+
timestamp: u64,
33+
}
34+
#[derive(Serialize, Deserialize, Debug)]
35+
struct Data {
36+
#[serde(rename = "data")]
37+
data: String,
38+
}
39+
40+
pub fn flatten_kinesis_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
41+
let body_str = std::str::from_utf8(body).unwrap();
42+
let message: Message = serde_json::from_str(body_str).unwrap();
43+
let mut vec_kinesis_json: Vec<BTreeMap<String, Value>> = Vec::new();
44+
45+
for record in message.records.iter() {
46+
let bytes = STANDARD.decode(record.data.clone()).unwrap();
47+
let json_string: String = String::from_utf8(bytes).unwrap();
48+
let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
49+
let mut kinesis_json: BTreeMap<String, Value> = match serde_json::from_value(json) {
50+
Ok(value) => value,
51+
Err(error) => panic!("Failed to deserialize JSON: {}", error),
52+
};
53+
54+
kinesis_json.insert(
55+
"requestId".to_owned(),
56+
Value::String(message.request_id.clone()),
57+
);
58+
kinesis_json.insert(
59+
"timestamp".to_owned(),
60+
Value::String(message.timestamp.to_string()),
61+
);
62+
63+
vec_kinesis_json.push(kinesis_json);
64+
}
65+
vec_kinesis_json
66+
}

server/src/handlers/http/middleware.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ use actix_web::{
2727
};
2828
use futures_util::future::LocalBoxFuture;
2929

30-
use crate::handlers::{AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, STREAM_NAME_HEADER_KEY};
30+
use crate::handlers::{
31+
AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS,
32+
STREAM_NAME_HEADER_KEY,
33+
};
3134
use crate::{
3235
option::CONFIG,
3336
rbac::Users,
@@ -149,6 +152,10 @@ where
149152
header::HeaderValue::from_str(&message.common_attributes.x_p_stream.clone())
150153
.unwrap(),
151154
);
155+
req.headers_mut().insert(
156+
HeaderName::from_static(LOG_SOURCE_KEY),
157+
header::HeaderValue::from_static(LOG_SOURCE_KINESIS),
158+
);
152159
}
153160

154161
/* ## Section end */

0 commit comments

Comments
 (0)