use aws_config;
use aws_lambda_events::event::sqs::SqsEvent;
use aws_lambda_events::s3::S3EventRecord;
use aws_sdk_s3;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use oxbow_lambda_shared::*;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::record::RowAccessor;
use tracing::log::*;

mod mysql_value;
mod s3_upload;
use mysql_value::ToMysqlValue;

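// Change type markers found in the `_change_type` column of a Delta Lake
// Change Data Feed. Rows with any other change type (such as `update_preimage`)
// are skipped by the row loop in `process_parquet_file`.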
const CHANGE_TYPE_COLUMN: &str = "_change_type";
const INSERT: &str = "insert";
const UPDATE_POSTIMAGE: &str = "update_postimage";
const DELETE: &str = "delete";

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    info!("Starting cdf-to-csv");

    run(service_fn(function_handler)).await
}

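/// Lambda handler: receives S3 event notifications delivered via SQS
/// (optionally wrapped in an SNS envelope) and converts each referenced
/// `.parquet` object into CSV output, ignoring objects with other extensions.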
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
    debug!("Receiving event: {:?}", event);

    let config = aws_config::load_defaults(aws_config::BehaviorVersion::v2025_01_17()).await;
    let client = aws_sdk_s3::Client::new(&config);

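    // With UNWRAP_SNS_ENVELOPE set, the SQS messages are expected to carry an
    // SNS notification that wraps the S3 event rather than the S3 event itself.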
    let records = match std::env::var("UNWRAP_SNS_ENVELOPE") {
        Ok(_) => s3_from_sns(event.payload)?,
        Err(_) => s3_from_sqs(event.payload)?,
    };

    let records: Vec<S3EventRecord> = records_with_url_decoded_keys(&records);
    debug!("processing records: {records:?}");

    for record in records {
        match suffix_from_record(&record) {
            RecordType::Parquet => {
                process_parquet_file(
                    &client,
                    record
                        .s3
                        .bucket
                        .name
                        .as_ref()
                        .expect("Failed to retrieve a bucket name"),
                    record
                        .s3
                        .object
                        .url_decoded_key
                        .as_ref()
                        .expect("Failed to get URL decoded key"),
                )
                .await
                .map_err(|e| {
                    format!(
                        "error while processing s3://{}/{}: {:?}",
                        &record.s3.bucket.name.unwrap(),
                        &record.s3.object.url_decoded_key.unwrap(),
                        e
                    )
                })?;
            }
            RecordType::Unknown => {
                info!("cdf-to-csv was invoked for a file with an unknown extension! Ignoring: {record:?}");
            }
        }
    }
    Ok(())
}

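/// Downloads a single change data feed parquet file from S3 and rewrites it as
/// CSV: `insert` and `update_postimage` rows become full records in the update
/// output, while `delete` rows (when a primary key is configured) are written
/// to the delete output as a single primary key column.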
async fn process_parquet_file(
    client: &aws_sdk_s3::Client,
    bucket: &str,
    key: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Processing s3://{}/{}", bucket, key);

    let body = client
        .get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await
        .map_err(|e| format!("S3 get object: {:?}", e))?
        .body
        .collect()
        .await
        .map(|data| data.into_bytes())
        .map_err(|e| format!("S3 reading body: {:?}", e))?;

    let reader =
        SerializedFileReader::new(body).map_err(|e| format!("can't read parquet {:?}", e))?;

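    // Output behavior is driven by environment variables: CSV_MAX_LINES_PER_FILE
    // caps the rows per uploaded CSV object, CSV_OUTPUT_BUCKET and
    // CSV_OUTPUT_PREFIX locate the update output, the optional CSV_DELETE_PREFIX
    // enables a separate delete output, and DELETE_PRIMARY_KEY names the column
    // emitted for deletes.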
    let max_rows_per_file = std::env::var("CSV_MAX_LINES_PER_FILE")
        .unwrap_or_else(|_| "1000000".to_string())
        .parse::<usize>()
        .expect("CSV_MAX_LINES_PER_FILE must be a number");

    let bucket = std::env::var("CSV_OUTPUT_BUCKET").expect("CSV_OUTPUT_BUCKET not set");
    let update_prefix = std::env::var("CSV_OUTPUT_PREFIX").expect("CSV_OUTPUT_PREFIX not set");
    let delete_prefix = std::env::var("CSV_DELETE_PREFIX").ok();
    let basename = basename_from_key(key);

    let change_type_index =
        column_index(&reader, CHANGE_TYPE_COLUMN).ok_or("_change_type column not found")?;
    let primary_key_column = std::env::var("DELETE_PRIMARY_KEY")
        .ok()
        .and_then(|name| column_index(&reader, &name));

    let row_iter = reader
        .get_row_iter(None)
        .map_err(|e| format!("can't read row: {:?}", e))?;

    let mut updates = s3_upload::S3Upload::new(
        client,
        &bucket,
        &update_prefix,
        &basename,
        max_rows_per_file,
    );
    let mut deletes: Option<s3_upload::S3Upload> = delete_prefix.map(|prefix| {
        s3_upload::S3Upload::new(client, &bucket, &prefix, &basename, max_rows_per_file)
    });

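    // Fan rows out by change type: inserts and post-update images are written as
    // full CSV records (minus the _change_type column), deletes are reduced to
    // the configured primary key, and all other change types are skipped.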
    for row in row_iter {
        let row = row.map_err(|e| format!("failed reading row: {:?}", e))?;

        let change_type = row.get_string(change_type_index)?;
        if change_type == INSERT || change_type == UPDATE_POSTIMAGE {
            let fields = row
                .get_column_iter()
                .filter_map(|(name, field)| {
                    if name != CHANGE_TYPE_COLUMN {
                        Some(
                            field
                                .to_mysql_value()
                                .map_err(|e| format!("error encoding {}: {:?}", name, e)),
                        )
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<_>, _>>()?;

            updates.write_record(fields).await?;
        } else if change_type == DELETE {
            if let (Some(primary_key_column), Some(deletes)) =
                (primary_key_column, deletes.as_mut())
            {
                let primary_key_value = row
                    .get_column_iter()
                    .nth(primary_key_column)
                    .ok_or("cannot get primary key, row does not have enough columns")?
                    .1
                    .to_mysql_value()
                    .map_err(|e| {
                        format!("cannot convert primary key to mysql value: {:?}", e)
                    })?;
                deletes.write_record(vec![primary_key_value]).await?;
            }
        }
    }

    updates.close().await?;
    if let Some(deletes) = deletes {
        deletes.close().await?;
    }

    Ok(())
}

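/// Returns the index of the named column in the parquet file's schema, or
/// `None` if the column is not present.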
fn column_index(parquet: &dyn FileReader, column_name: &str) -> Option<usize> {
    parquet
        .metadata()
        .file_metadata()
        .schema_descr()
        .columns()
        .iter()
        .enumerate()
        .find(|(_idx, column)| column.name() == column_name)
        .map(|(idx, _)| idx)
}

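/// Returns the file name portion of an S3 key with everything from the first
/// `.` onward removed, e.g. `some/prefix/fileA.parquet` becomes `fileA`.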
fn basename_from_key(key: &str) -> String {
    key.split('/')
        .last()
        .unwrap()
        .split('.')
        .next()
        .unwrap()
        .to_string()
}

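/// The kind of object referenced by an S3 event record, determined from the
/// key suffix.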
#[derive(Debug, PartialEq)]
enum RecordType {
    Parquet,
    Unknown,
}

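/// Classifies a record as `Parquet` when either the raw or the URL-decoded
/// object key ends in `.parquet`.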
fn suffix_from_record(record: &S3EventRecord) -> RecordType {
    if let Some(key) = record.s3.object.key.as_ref() {
        if key.ends_with(".parquet") {
            return RecordType::Parquet;
        }
    }
    if let Some(key) = record.s3.object.url_decoded_key.as_ref() {
        if key.ends_with(".parquet") {
            return RecordType::Parquet;
        }
    }
    RecordType::Unknown
}

#[cfg(test)]
mod tests {
    use super::*;
    use aws_lambda_events::s3::{S3Entity, S3EventRecord, S3Object};

    #[test]
    fn test_suffix_from_record() {
        let d = S3EventRecord::default();
        assert_eq!(suffix_from_record(&d), RecordType::Unknown);

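        // Only the url_decoded_key carries the .parquet suffix here, which
        // exercises the fallback check in suffix_from_record.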
        let jsonl = S3EventRecord {
            s3: S3Entity {
                object: S3Object {
                    key: Some("some/prefix/fileA.jsonl".into()),
                    url_decoded_key: Some("some/prefix/fileA.parquet".into()),
                    ..Default::default()
                },
                ..Default::default()
            },
            ..Default::default()
        };
        assert_eq!(suffix_from_record(&jsonl), RecordType::Parquet);
    }
}