diff --git a/main.rs b/main.rs index e18a402..3b35678 100644 --- a/main.rs +++ b/main.rs @@ -1,15 +1,12 @@ -use lambda_runtime::{handler_fn, Error}; use aws_lambda_events::s3::S3Event; -use serde_json::Value; -use flate2::read::{GzDecoder}; +use flate2::bufread::GzDecoder; use flate2::write::GzEncoder; -use flate2::GzBuilder; use flate2::Compression; -use std::io::Read; +use flate2::GzBuilder; +use lambda_runtime::{handler_fn, Error}; +use serde_json::Value; use std::io::prelude::*; -use std::io::BufReader; -use std::fs::File; - +use std::io::{BufRead, BufReader}; #[tokio::main] async fn main() -> Result<(), lambda_runtime::Error> { @@ -19,56 +16,58 @@ async fn main() -> Result<(), lambda_runtime::Error> { } async fn handler(req: Value, _ctx: lambda_runtime::Context) -> Result<(), Box> { - let events: S3Event = serde_json::from_value(req).unwrap();; + let events: S3Event = serde_json::from_value(req).unwrap(); for e in events.records { let bucket_name: String = e.s3.bucket.name.expect("Unable to get s3 bucket name."); let key: String = e.s3.object.key.expect("unable to get s3 file key"); - + let config: aws_config::SdkConfig = aws_config::load_from_env().await; let s3_client: aws_sdk_s3::Client = aws_sdk_s3::Client::new(&config); let data: aws_lambda_events::bytes::Bytes = s3_client - .get_object() - .bucket(&bucket_name) - .key(&key) - .send() - .await.unwrap().body - .collect().await.unwrap().into_bytes(); - - let mut d: GzDecoder<&[u8]> = GzDecoder::new(&data[..]); - let mut csv_data: String = String::new(); - d.read_to_string(&mut csv_data).unwrap(); + .get_object() + .bucket(&bucket_name) + .key(&key) + .send() + .await + .unwrap() + .body + .collect() + .await + .unwrap() + .into_bytes(); + + let decoder: BufReader> = BufReader::new(GzDecoder::new(&data[..])); - let split: std::str::Lines<'_> = csv_data.lines(); - let result_vector: Vec<&str> = split.collect(); + let output = Vec::new(); + let mut encoder: GzEncoder> = GzBuilder::new() + .filename("tab_converted.txt") + .write(output, Compression::default()); - let mut tab_converted: String = String::new(); - for line in result_vector.iter().skip(1) { + for line in decoder.lines().skip(1) { + let line = line.unwrap(); let date: &&str = &line[0..14].trim(); let serial_number: &&str = &line[15..35].trim(); let model: &&str = &line[36..78].trim(); let capacity_bytes: &&str = &line[79..97].trim(); let failure: &&str = &line[98..108].trim(); - let tab_line: String = format!( "{}\t{}\t{}\t{}\t{}\n", date, serial_number, model, capacity_bytes, failure); - tab_converted.push_str(&tab_line); - + writeln!( + &mut encoder, + "{}\t{}\t{}\t{}\t{}", + date, serial_number, model, capacity_bytes, failure + ) + .unwrap(); } - let f: File = File::create("/tmp/file.gz").expect("failed to create file"); - let mut gz: GzEncoder = GzBuilder::new() - .filename("tab_converted.txt") - .write(f, Compression::default()); - gz.write_all(tab_converted.as_bytes()).expect("failed to write bytes to file"); - gz.finish().expect("failed to flush bytes to file"); - - let file: File = File::open("/tmp/file.gz").expect("problem reading file"); - let mut reader: BufReader = BufReader::new(file); - let mut buffer: Vec = Vec::new(); - - reader.read_to_end(&mut buffer).expect("error"); + let output = encoder.finish().expect("failed to flush bytes to file"); let remote_uri: &String = &key.replace("fixed_width_raw/", "tab_converted/"); - s3_client.put_object().bucket(&bucket_name).key(remote_uri).body(buffer.into()).send().await.unwrap(); - + s3_client + .put_object() + .bucket(&bucket_name) + .key(remote_uri) + .body(output.into()) + .send() + .await + .unwrap(); } Ok(()) } -