Skip to content

Commit 238b9ae

Browse files
feat: add Kinesis integration & support (#603)
AWS Kinesis doesn't allow out of the box Auth header support, instead all the custom (user provided) headers are sent under the header `x-amz-firehose-common-attributes`, with value as {"commonAttributes":{"Authorization":"Basic <<base64 encoded credentials","X-P-Stream":"<>","Content-Type":"application/json"}} This PR adds a mechanism to look for the kinesis custom header in the actix middleware and update the contained headers to proper top level headers fixes #602
1 parent ce58c6c commit 238b9ae

File tree

2 files changed

+47
-0
lines changed

2 files changed

+47
-0
lines changed

server/src/handlers.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ pub mod livetail;
2222
const PREFIX_TAGS: &str = "x-p-tag-";
2323
const PREFIX_META: &str = "x-p-meta-";
2424
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
25+
const AUTHORIZATION_KEY: &str = "authorization";
26+
const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";
2527
const SEPARATOR: char = '^';
2628

2729
const OIDC_SCOPE: &str = "openid profile email";

server/src/handlers/http/middleware.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,35 @@ use std::future::{ready, Ready};
2222
use actix_web::{
2323
dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
2424
error::{ErrorBadRequest, ErrorForbidden, ErrorUnauthorized},
25+
http::header::{self, HeaderName},
2526
Error, Route,
2627
};
2728
use futures_util::future::LocalBoxFuture;
2829

30+
use crate::handlers::{AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, STREAM_NAME_HEADER_KEY};
2931
use crate::{
3032
option::CONFIG,
3133
rbac::Users,
3234
rbac::{self, role::Action},
3335
utils::actix::extract_session_key,
3436
};
3537

38+
use serde::{Deserialize, Serialize};
39+
40+
#[derive(Serialize, Deserialize, Debug)]
41+
struct Message {
42+
#[serde(rename = "commonAttributes")]
43+
common_attributes: CommonAttributes,
44+
}
45+
46+
#[derive(Serialize, Deserialize, Debug)]
47+
struct CommonAttributes {
48+
#[serde(rename = "Authorization")]
49+
authorization: String,
50+
#[serde(rename = "X-P-Stream")]
51+
x_p_stream: String,
52+
}
53+
3654
pub trait RouteExt {
3755
fn authorize(self, action: Action) -> Self;
3856
fn authorize_for_stream(self, action: Action) -> Self;
@@ -108,6 +126,33 @@ where
108126
forward_ready!(service);
109127

110128
fn call(&self, mut req: ServiceRequest) -> Self::Future {
129+
/*Below section is added to extract the Authorization and X-P-Stream headers from x-amz-firehose-common-attributes custom header
130+
when request is made from Kinesis Firehose.
131+
For requests made from other clients, no change.
132+
133+
## Section start */
134+
if let Some((_, kinesis_common_attributes)) = req
135+
.request()
136+
.headers()
137+
.iter()
138+
.find(|&(key, _)| key == KINESIS_COMMON_ATTRIBUTES_KEY)
139+
{
140+
let attribute_value: &str = kinesis_common_attributes.to_str().unwrap();
141+
let message: Message = serde_json::from_str(attribute_value).unwrap();
142+
req.headers_mut().insert(
143+
HeaderName::from_static(AUTHORIZATION_KEY),
144+
header::HeaderValue::from_str(&message.common_attributes.authorization.clone())
145+
.unwrap(),
146+
);
147+
req.headers_mut().insert(
148+
HeaderName::from_static(STREAM_NAME_HEADER_KEY),
149+
header::HeaderValue::from_str(&message.common_attributes.x_p_stream.clone())
150+
.unwrap(),
151+
);
152+
}
153+
154+
/* ## Section end */
155+
111156
let auth_result: Result<_, Error> = (self.auth_method)(&mut req, self.action);
112157
let fut = self.service.call(req);
113158
Box::pin(async move {

0 commit comments

Comments
 (0)