|
1 |
| -use std::{collections::HashMap, fmt::Debug, sync::Arc}; |
| 1 | +/* |
| 2 | + * Parseable Server (C) 2022 - 2024 Parseable, Inc. |
| 3 | + * |
| 4 | + * This program is free software: you can redistribute it and/or modify |
| 5 | + * it under the terms of the GNU Affero General Public License as |
| 6 | + * published by the Free Software Foundation, either version 3 of the |
| 7 | + * License, or (at your option) any later version. |
| 8 | + * |
| 9 | + * This program is distributed in the hope that it will be useful, |
| 10 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 12 | + * GNU Affero General Public License for more details. |
| 13 | + * |
| 14 | + * You should have received a copy of the GNU Affero General Public License |
| 15 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 16 | + * |
| 17 | + */ |
| 18 | + |
| 19 | +use std::{ |
| 20 | + collections::HashMap, |
| 21 | + fmt::{Debug, Display}, |
| 22 | + sync::Arc, |
| 23 | +}; |
2 | 24 |
|
3 | 25 | use crate::about::current;
|
4 | 26 | use crate::handlers::http::modal::utils::rbac_utils::get_metadata;
|
5 | 27 |
|
6 | 28 | use super::option::CONFIG;
|
7 | 29 | use chrono::{DateTime, Utc};
|
| 30 | +use once_cell::sync::Lazy; |
8 | 31 | use reqwest::Client;
|
9 | 32 | use serde::Serialize;
|
10 |
| -use serde_json::{json, Map, Value}; |
11 |
| -use tokio::runtime::Handle; |
12 |
| -use tracing::info; |
13 |
| -use tracing::{ |
14 |
| - error, |
15 |
| - field::{Field, Visit}, |
16 |
| - Event, Metadata, Subscriber, |
17 |
| -}; |
18 |
| -use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; |
| 33 | +use serde_json::{json, Value}; |
| 34 | +use tracing::error; |
| 35 | + |
19 | 36 | use ulid::Ulid;
|
20 | 37 | use url::Url;
|
21 | 38 |
|
22 |
| -pub struct AuditLayer { |
| 39 | +static AUDIT_LOGGER: Lazy<Option<AuditLogger>> = Lazy::new(AuditLogger::new); |
| 40 | + |
| 41 | +pub struct AuditLogger { |
23 | 42 | client: Arc<Client>,
|
24 | 43 | log_endpoint: Url,
|
25 | 44 | username: Option<String>,
|
26 | 45 | password: Option<String>,
|
27 |
| - runtime_handle: Handle, |
28 | 46 | }
|
29 | 47 |
|
30 |
| -impl AuditLayer { |
31 |
| - /// Create an audit layer that works with the tracing system to capture |
32 |
| - /// and push audit logs to the appropriate logger over HTTP |
33 |
| - pub fn new(runtime_handle: Handle) -> Option<Self> { |
34 |
| - let audit_logger = CONFIG.parseable.audit_logger.as_ref()?; |
35 |
| - let client = Arc::new(reqwest::Client::new()); |
36 |
| - let log_endpoint = match audit_logger.join("/api/v1/ingest") { |
| 48 | +impl AuditLogger { |
| 49 | + /// Create an audit logger that can be used to capture |
| 50 | + /// and push audit logs to the appropriate logging system over HTTP |
| 51 | + pub fn new() -> Option<AuditLogger> { |
| 52 | + let log_endpoint = match CONFIG |
| 53 | + .parseable |
| 54 | + .audit_logger |
| 55 | + .as_ref()? |
| 56 | + .join("/api/v1/ingest") |
| 57 | + { |
37 | 58 | Ok(url) => url,
|
38 | 59 | Err(err) => {
|
39 |
| - error!("Couldn't setup audit logger: {err}"); |
| 60 | + eprintln!("Couldn't setup audit logger: {err}"); |
40 | 61 | return None;
|
41 | 62 | }
|
42 | 63 | };
|
43 | 64 |
|
| 65 | + let client = Arc::new(reqwest::Client::new()); |
| 66 | + |
44 | 67 | let username = CONFIG.parseable.audit_username.clone();
|
45 | 68 | let password = CONFIG.parseable.audit_password.clone();
|
46 | 69 |
|
47 |
| - Some(Self { |
| 70 | + Some(AuditLogger { |
48 | 71 | client,
|
49 | 72 | log_endpoint,
|
50 | 73 | username,
|
51 | 74 | password,
|
52 |
| - runtime_handle, |
53 | 75 | })
|
54 | 76 | }
|
55 |
| -} |
56 |
| - |
57 |
| -impl<S> Layer<S> for AuditLayer |
58 |
| -where |
59 |
| - S: Subscriber + for<'a> LookupSpan<'a>, |
60 |
| -{ |
61 |
| - fn enabled(&self, _: &Metadata<'_>, _: Context<'_, S>) -> bool { |
62 |
| - true // log everything if it is auditable |
63 |
| - } |
64 |
| - |
65 |
| - fn on_event(&self, event: &Event<'_>, _: Context<'_, S>) { |
66 |
| - let mut visitor = AuditVisitor::default(); |
67 |
| - event.record(&mut visitor); |
68 |
| - |
69 |
| - // if the log line contains `audit` string with serialized json object, construct an HTTP request and push to configured audit endpoint |
70 |
| - // NOTE: We only support the ingest API of parseable for audit logging parseable |
71 |
| - if visitor.audit { |
72 |
| - let mut req = self |
73 |
| - .client |
74 |
| - .post(self.log_endpoint.as_str()) |
75 |
| - .json(&visitor.json) |
76 |
| - .header("x-p-stream", "audit_log"); |
77 |
| - if let Some(username) = self.username.as_ref() { |
78 |
| - req = req.basic_auth(username, self.password.as_ref()) |
79 |
| - } |
80 | 77 |
|
81 |
| - self.runtime_handle.spawn(async move { |
82 |
| - match req.send().await { |
83 |
| - Ok(r) => { |
84 |
| - if let Err(e) = r.error_for_status() { |
85 |
| - println!("{e}") |
86 |
| - } |
87 |
| - } |
88 |
| - Err(e) => eprintln!("Failed to send audit event: {}", e), |
89 |
| - } |
90 |
| - }); |
| 78 | + async fn send_log(&self, json: Value) { |
| 79 | + let mut req = self |
| 80 | + .client |
| 81 | + .post(self.log_endpoint.as_str()) |
| 82 | + .json(&json) |
| 83 | + .header("x-p-stream", "audit_log"); |
| 84 | + if let Some(username) = self.username.as_ref() { |
| 85 | + req = req.basic_auth(username, self.password.as_ref()) |
91 | 86 | }
|
92 |
| - } |
93 |
| -} |
94 | 87 |
|
95 |
| -#[derive(Debug, Default)] |
96 |
| -struct AuditVisitor { |
97 |
| - json: Map<String, Value>, |
98 |
| - audit: bool, |
99 |
| -} |
100 |
| - |
101 |
| -impl Visit for AuditVisitor { |
102 |
| - fn record_str(&mut self, field: &Field, value: &str) { |
103 |
| - if field.name() == "audit" { |
104 |
| - if let Ok(Value::Object(json)) = serde_json::from_str(value) { |
105 |
| - self.audit = true; |
106 |
| - self.json = json; |
| 88 | + match req.send().await { |
| 89 | + Ok(r) => { |
| 90 | + if let Err(e) = r.error_for_status() { |
| 91 | + error!("{e}") |
| 92 | + } |
107 | 93 | }
|
| 94 | + Err(e) => error!("Failed to send audit event: {}", e), |
108 | 95 | }
|
109 | 96 | }
|
110 |
| - |
111 |
| - fn record_debug(&mut self, _: &Field, _: &dyn Debug) {} |
112 | 97 | }
|
113 | 98 |
|
114 | 99 | #[non_exhaustive]
|
115 | 100 | #[repr(u8)]
|
116 |
| -#[derive(Debug, Clone, Copy)] |
| 101 | +#[derive(Debug, Clone, Copy, Serialize)] |
117 | 102 | pub enum AuditLogVersion {
|
118 | 103 | V1 = 1,
|
119 | 104 | }
|
@@ -152,60 +137,119 @@ impl Default for ResponseLog {
|
152 | 137 | }
|
153 | 138 | }
|
154 | 139 |
|
155 |
| -pub struct AuditLogBuilder { |
156 |
| - version: AuditLogVersion, |
157 |
| - deployment_id: Ulid, |
158 |
| - audit_id: Ulid, |
159 |
| - start_time: DateTime<Utc>, |
160 |
| - stream: String, |
| 140 | +#[derive(Serialize)] |
| 141 | +#[serde(rename_all = "camelCase")] |
| 142 | +pub struct AuditLog { |
| 143 | + pub version: AuditLogVersion, |
| 144 | + pub parseable_version: String, |
| 145 | + pub deployment_id: Ulid, |
| 146 | + pub audit_id: Ulid, |
| 147 | + pub start_time: DateTime<Utc>, |
| 148 | + pub end_time: DateTime<Utc>, |
| 149 | + pub stream: String, |
161 | 150 | pub actor: ActorLog,
|
162 | 151 | pub request: RequestLog,
|
163 | 152 | pub response: ResponseLog,
|
164 | 153 | }
|
165 | 154 |
|
| 155 | +pub struct AuditLogBuilder { |
| 156 | + start_time: DateTime<Utc>, |
| 157 | + stream: String, |
| 158 | + pub actor: Option<ActorLog>, |
| 159 | + pub request: Option<RequestLog>, |
| 160 | + pub response: Option<ResponseLog>, |
| 161 | +} |
| 162 | + |
166 | 163 | impl Default for AuditLogBuilder {
|
167 | 164 | fn default() -> Self {
|
168 | 165 | AuditLogBuilder {
|
169 |
| - version: AuditLogVersion::V1, |
170 |
| - deployment_id: Ulid::nil(), |
171 |
| - audit_id: Ulid::new(), |
172 | 166 | start_time: Utc::now(),
|
173 | 167 | stream: String::default(),
|
174 |
| - actor: ActorLog::default(), |
175 |
| - request: RequestLog::default(), |
176 |
| - response: ResponseLog::default(), |
| 168 | + actor: None, |
| 169 | + request: None, |
| 170 | + response: None, |
177 | 171 | }
|
178 | 172 | }
|
179 | 173 | }
|
180 | 174 |
|
181 | 175 | impl AuditLogBuilder {
|
182 |
| - pub async fn set_deployment_id(&mut self) { |
183 |
| - self.deployment_id = get_metadata().await.unwrap().deployment_id; |
| 176 | + pub fn set_stream_name(&mut self, stream: impl Into<String>) { |
| 177 | + if AUDIT_LOGGER.is_none() { |
| 178 | + return; |
| 179 | + } |
| 180 | + self.stream = stream.into(); |
184 | 181 | }
|
185 | 182 |
|
186 |
| - pub fn set_response_error(&mut self, err: String) { |
187 |
| - self.response.error = Some(err); |
| 183 | + pub fn set_actor( |
| 184 | + &mut self, |
| 185 | + host: impl Into<String>, |
| 186 | + username: impl Into<String>, |
| 187 | + user_agent: impl Into<String>, |
| 188 | + auth_method: impl Into<String>, |
| 189 | + ) { |
| 190 | + if AUDIT_LOGGER.is_none() { |
| 191 | + return; |
| 192 | + } |
| 193 | + self.actor = Some(ActorLog { |
| 194 | + remote_host: host.into(), |
| 195 | + user_agent: user_agent.into(), |
| 196 | + username: username.into(), |
| 197 | + authorization_method: auth_method.into(), |
| 198 | + }); |
188 | 199 | }
|
189 | 200 |
|
190 |
| - pub fn set_stream_name(&mut self, stream: String) { |
191 |
| - self.stream = stream; |
| 201 | + pub fn set_request( |
| 202 | + &mut self, |
| 203 | + method: impl Into<String>, |
| 204 | + path: impl Into<String>, |
| 205 | + protocol: impl Into<String>, |
| 206 | + headers: impl IntoIterator<Item = (String, String)>, |
| 207 | + ) { |
| 208 | + if AUDIT_LOGGER.is_none() { |
| 209 | + return; |
| 210 | + } |
| 211 | + self.request = Some(RequestLog { |
| 212 | + method: method.into(), |
| 213 | + path: path.into(), |
| 214 | + protocol: protocol.into(), |
| 215 | + headers: headers.into_iter().collect(), |
| 216 | + }); |
192 | 217 | }
|
193 |
| -} |
194 | 218 |
|
195 |
| -impl Drop for AuditLogBuilder { |
196 |
| - fn drop(&mut self) { |
197 |
| - let audit_json = json!({ |
198 |
| - "version": self.version as u8, |
199 |
| - "parseableVersion": current().released_version.to_string(), |
200 |
| - "deploymentId" : self.deployment_id, |
201 |
| - "auditId" : self.audit_id, |
202 |
| - "startTime" : self.start_time.to_rfc3339(), |
203 |
| - "endTime" : Utc::now().to_rfc3339(), |
204 |
| - "stream" : self.stream, |
205 |
| - "actor" : self.actor, |
206 |
| - "request" : self.request, |
207 |
| - "response" : self.response, |
208 |
| - }); |
209 |
| - info!(audit = audit_json.to_string()) |
| 219 | + pub fn set_response(&mut self, status_code: u16, err: impl Display) { |
| 220 | + if AUDIT_LOGGER.is_none() { |
| 221 | + return; |
| 222 | + } |
| 223 | + let error = err.to_string(); |
| 224 | + let error = error.is_empty().then(|| error); |
| 225 | + self.response = Some(ResponseLog { status_code, error }); |
| 226 | + } |
| 227 | + |
| 228 | + // NOTE: Ensure that the logger has been constructed by Default |
| 229 | + pub async fn send(self) { |
| 230 | + let AuditLogBuilder { |
| 231 | + start_time, |
| 232 | + stream, |
| 233 | + actor, |
| 234 | + request, |
| 235 | + response, |
| 236 | + } = self; |
| 237 | + let Some(logger) = AUDIT_LOGGER.as_ref() else { |
| 238 | + return; |
| 239 | + }; |
| 240 | + let audit_log = AuditLog { |
| 241 | + version: AuditLogVersion::V1, |
| 242 | + parseable_version: current().released_version.to_string(), |
| 243 | + deployment_id: get_metadata().await.unwrap().deployment_id, |
| 244 | + audit_id: Ulid::new(), |
| 245 | + start_time, |
| 246 | + end_time: Utc::now(), |
| 247 | + stream, |
| 248 | + actor: actor.unwrap_or_default(), |
| 249 | + request: request.unwrap_or_default(), |
| 250 | + response: response.unwrap_or_default(), |
| 251 | + }; |
| 252 | + |
| 253 | + logger.send_log(json!(audit_log)).await |
210 | 254 | }
|
211 | 255 | }
|
0 commit comments