-
Notifications
You must be signed in to change notification settings - Fork 54
Expand file tree
/
Copy pathmain.rs
More file actions
125 lines (96 loc) · 3.94 KB
/
main.rs
File metadata and controls
125 lines (96 loc) · 3.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use anyhow::Result;
use aws_config::{BehaviorVersion, Region};
use aws_credential_types::Credentials;
use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder};
use clap::{Parser, ValueEnum};
use rdkafka::consumer::Consumer;
use std::net::SocketAddr;
use tips_audit_lib::{
KafkaAuditArchiver, KafkaAuditLogReader, S3EventReaderWriter, create_kafka_consumer,
};
mod logger;
mod metrics;
use logger::init_logger_with_format;
use metrics::init_prometheus_exporter;
use tracing::info;
#[derive(Debug, Clone, ValueEnum)]
enum S3ConfigType {
Aws,
Manual,
}
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(long, env = "TIPS_AUDIT_KAFKA_PROPERTIES_FILE")]
kafka_properties_file: String,
#[arg(long, env = "TIPS_AUDIT_KAFKA_TOPIC")]
kafka_topic: String,
#[arg(long, env = "TIPS_AUDIT_S3_BUCKET")]
s3_bucket: String,
#[arg(long, env = "TIPS_AUDIT_LOG_LEVEL", default_value = "info")]
log_level: String,
#[arg(long, env = "TIPS_AUDIT_LOG_FORMAT", default_value = "pretty")]
log_format: logger::LogFormat,
#[arg(long, env = "TIPS_AUDIT_S3_CONFIG_TYPE", default_value = "aws")]
s3_config_type: S3ConfigType,
#[arg(long, env = "TIPS_AUDIT_S3_ENDPOINT")]
s3_endpoint: Option<String>,
#[arg(long, env = "TIPS_AUDIT_S3_REGION", default_value = "us-east-1")]
s3_region: String,
#[arg(long, env = "TIPS_AUDIT_S3_ACCESS_KEY_ID")]
s3_access_key_id: Option<String>,
#[arg(long, env = "TIPS_AUDIT_S3_SECRET_ACCESS_KEY")]
s3_secret_access_key: Option<String>,
#[arg(long, env = "TIPS_AUDIT_METRICS_ADDR", default_value = "0.0.0.0:9002")]
metrics_addr: SocketAddr,
}
#[tokio::main]
async fn main() -> Result<()> {
dotenvy::dotenv().ok();
let args = Args::parse();
init_logger_with_format(&args.log_level, args.log_format);
init_prometheus_exporter(args.metrics_addr).expect("Failed to install Prometheus exporter");
info!(
kafka_properties_file = %args.kafka_properties_file,
kafka_topic = %args.kafka_topic,
s3_bucket = %args.s3_bucket,
metrics_addr = %args.metrics_addr,
"Starting audit archiver"
);
let consumer = create_kafka_consumer(&args.kafka_properties_file)?;
consumer.subscribe(&[&args.kafka_topic])?;
let reader = KafkaAuditLogReader::new(consumer, args.kafka_topic.clone())?;
let s3_client = create_s3_client(&args).await?;
let s3_bucket = args.s3_bucket.clone();
let writer = S3EventReaderWriter::new(s3_client, s3_bucket);
let mut archiver = KafkaAuditArchiver::new(reader, writer);
info!("Audit archiver initialized, starting main loop");
archiver.run().await
}
async fn create_s3_client(args: &Args) -> Result<S3Client> {
match args.s3_config_type {
S3ConfigType::Manual => {
let region = args.s3_region.clone();
let mut config_builder =
aws_config::defaults(BehaviorVersion::latest()).region(Region::new(region));
if let Some(endpoint) = &args.s3_endpoint {
config_builder = config_builder.endpoint_url(endpoint);
}
if let (Some(access_key), Some(secret_key)) =
(&args.s3_access_key_id, &args.s3_secret_access_key)
{
let credentials = Credentials::new(access_key, secret_key, None, None, "manual");
config_builder = config_builder.credentials_provider(credentials);
}
let config = config_builder.load().await;
let s3_config_builder = S3ConfigBuilder::from(&config).force_path_style(true);
info!(message = "manually configuring s3 client");
Ok(S3Client::from_conf(s3_config_builder.build()))
}
S3ConfigType::Aws => {
info!(message = "using aws s3 client");
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
Ok(S3Client::new(&config))
}
}
}