| 
 | 1 | +/* eslint-disable no-console */  | 
 | 2 | +import {  | 
 | 3 | +  FirehoseClient,  | 
 | 4 | +  PutRecordBatchCommand,  | 
 | 5 | +} from "@aws-sdk/client-firehose";  | 
 | 6 | +import { unmarshall } from "@aws-sdk/util-dynamodb";  | 
 | 7 | +import type { DynamoDBStreamEvent, Context } from "aws-lambda";  | 
 | 8 | +import { AttributeValue } from "@aws-sdk/client-dynamodb";  | 
 | 9 | + | 
 | 10 | +const firehoseClient = new FirehoseClient({});  | 
 | 11 | + | 
 | 12 | +const FIREHOSE_STREAM_NAME = process.env.FIREHOSE_STREAM_NAME;  | 
 | 13 | + | 
 | 14 | +if (!FIREHOSE_STREAM_NAME) {  | 
 | 15 | +  console.error("The 'FIREHOSE_STREAM_NAME' environment variable is not set.");  | 
 | 16 | +  throw new Error("'FIREHOSE_STREAM_NAME' is not set.");  | 
 | 17 | +}  | 
 | 18 | + | 
 | 19 | +const toUtcIsoStringWithoutMillis = (date: Date): string => {  | 
 | 20 | +  return `${date.toISOString().slice(0, 19)}Z`;  | 
 | 21 | +};  | 
 | 22 | + | 
 | 23 | +/**  | 
 | 24 | + * Defines a map where keys are DynamoDB table names and values are functions  | 
 | 25 | + * that extract a meaningful timestamp from a record. The function should  | 
 | 26 | + * return a value parseable by the `Date` constructor (e.g., ISO 8601 string or epoch milliseconds).  | 
 | 27 | + */  | 
 | 28 | +const ARCHIVE_TIMESTAMP_MAPPER: Record<  | 
 | 29 | +  string,  | 
 | 30 | +  (record: Record<string, any>) => string | number  | 
 | 31 | +> = {  | 
 | 32 | +  "infra-core-api-room-requests-status": (record) =>  | 
 | 33 | +    record["createdAt#status"].split("#")[0],  | 
 | 34 | +  "infra-core-api-events": (record) => record.createdAt,  | 
 | 35 | +  "infra-core-api-audit-log": (record) => record.createdAt * 1000, // Convert Unix seconds to milliseconds  | 
 | 36 | +};  | 
 | 37 | + | 
 | 38 | +export const handler = async (  | 
 | 39 | +  event: DynamoDBStreamEvent,  | 
 | 40 | +  _context: Context,  | 
 | 41 | +): Promise<any> => {  | 
 | 42 | +  const firehoseRecordsToSend: { Data: Buffer }[] = [];  | 
 | 43 | + | 
 | 44 | +  for (const record of event.Records) {  | 
 | 45 | +    // 1. **Filter for TTL Deletions**: We only care about `REMOVE` events initiated by DynamoDB's TTL service.  | 
 | 46 | +    if (  | 
 | 47 | +      record.eventName === "REMOVE" &&  | 
 | 48 | +      record.userIdentity?.principalId === "dynamodb.amazonaws.com"  | 
 | 49 | +    ) {  | 
 | 50 | +      // 2. **Extract Table Name**: The table name is parsed from the event source ARN.  | 
 | 51 | +      //    ARN format: arn:aws:dynamodb:region:account-id:table/TABLE_NAME/stream/...  | 
 | 52 | +      const tableName = record.eventSourceARN?.split("/")[1];  | 
 | 53 | +      if (!tableName) {  | 
 | 54 | +        console.warn(  | 
 | 55 | +          `Could not parse table name from ARN: ${record.eventSourceARN}`,  | 
 | 56 | +        );  | 
 | 57 | +        continue; // Skip this record if the ARN is malformed  | 
 | 58 | +      }  | 
 | 59 | + | 
 | 60 | +      // 3. **Get and Deserialize Data**: The content of the expired record is in 'OldImage'.  | 
 | 61 | +      const oldImage = record.dynamodb?.OldImage;  | 
 | 62 | +      if (!oldImage) {  | 
 | 63 | +        continue; // Skip if there's no data to archive  | 
 | 64 | +      }  | 
 | 65 | + | 
 | 66 | +      // The `unmarshall` utility converts the DynamoDB format to a standard JavaScript object.  | 
 | 67 | +      const deserializedData = unmarshall(  | 
 | 68 | +        oldImage as { [key: string]: AttributeValue },  | 
 | 69 | +      );  | 
 | 70 | + | 
 | 71 | +      // 4. **Construct the Payload**: Add metadata to the original record data.  | 
 | 72 | +      const payload: Record<string, any> = {  | 
 | 73 | +        ...deserializedData,  | 
 | 74 | +        __infra_archive_resource: tableName,  | 
 | 75 | +        __infra_archive_timestamp: toUtcIsoStringWithoutMillis(new Date()), // Default timestamp is 'now'  | 
 | 76 | +      };  | 
 | 77 | + | 
 | 78 | +      // 5. **Apply Custom Timestamp**: If a specific timestamp extractor is defined for this table, use it.  | 
 | 79 | +      if (tableName in ARCHIVE_TIMESTAMP_MAPPER) {  | 
 | 80 | +        try {  | 
 | 81 | +          const timestampSource =  | 
 | 82 | +            ARCHIVE_TIMESTAMP_MAPPER[tableName](deserializedData);  | 
 | 83 | +          payload.__infra_archive_timestamp = toUtcIsoStringWithoutMillis(  | 
 | 84 | +            new Date(timestampSource),  | 
 | 85 | +          );  | 
 | 86 | +        } catch (e) {  | 
 | 87 | +          const error = e instanceof Error ? e.message : String(e);  | 
 | 88 | +          console.error(  | 
 | 89 | +            `Failed to extract timestamp for record from ${tableName}: ${error}. Using 'now' as timestamp.`,  | 
 | 90 | +          );  | 
 | 91 | +        }  | 
 | 92 | +      }  | 
 | 93 | + | 
 | 94 | +      firehoseRecordsToSend.push({  | 
 | 95 | +        Data: Buffer.from(JSON.stringify(payload)),  | 
 | 96 | +      });  | 
 | 97 | +    }  | 
 | 98 | +  }  | 
 | 99 | + | 
 | 100 | +  // 6. **Send Records to Firehose**: If we found any TTL-expired records, send them.  | 
 | 101 | +  if (firehoseRecordsToSend.length > 0) {  | 
 | 102 | +    console.info(  | 
 | 103 | +      `Found ${firehoseRecordsToSend.length} TTL-expired records to archive.`,  | 
 | 104 | +    );  | 
 | 105 | + | 
 | 106 | +    // The PutRecordBatch API has a limit of 500 records per call. We loop  | 
 | 107 | +    // in chunks of 500 to handle large events gracefully.  | 
 | 108 | +    for (let i = 0; i < firehoseRecordsToSend.length; i += 500) {  | 
 | 109 | +      const batch = firehoseRecordsToSend.slice(i, i + 500);  | 
 | 110 | +      try {  | 
 | 111 | +        const command = new PutRecordBatchCommand({  | 
 | 112 | +          DeliveryStreamName: FIREHOSE_STREAM_NAME,  | 
 | 113 | +          Records: batch,  | 
 | 114 | +        });  | 
 | 115 | +        const response = await firehoseClient.send(command);  | 
 | 116 | + | 
 | 117 | +        // Log any records that Firehose failed to ingest for monitoring purposes.  | 
 | 118 | +        if (response.FailedPutCount && response.FailedPutCount > 0) {  | 
 | 119 | +          console.error(  | 
 | 120 | +            `Failed to put ${response.FailedPutCount} records to Firehose.`,  | 
 | 121 | +          );  | 
 | 122 | +          // For critical apps, you could inspect `response.RequestResponses` for details.  | 
 | 123 | +        }  | 
 | 124 | +      } catch (e) {  | 
 | 125 | +        const error = e instanceof Error ? e.message : String(e);  | 
 | 126 | +        console.error(`Error sending batch to Firehose: ${error}`);  | 
 | 127 | +        // Re-throwing the exception will cause Lambda to retry the entire event batch.  | 
 | 128 | +        throw e;  | 
 | 129 | +      }  | 
 | 130 | +    }  | 
 | 131 | +  } else {  | 
 | 132 | +    console.info("No TTL-expired records found in this event.");  | 
 | 133 | +  }  | 
 | 134 | + | 
 | 135 | +  return {  | 
 | 136 | +    statusCode: 200,  | 
 | 137 | +    body: JSON.stringify(  | 
 | 138 | +      `Successfully processed ${firehoseRecordsToSend.length} records.`,  | 
 | 139 | +    ),  | 
 | 140 | +  };  | 
 | 141 | +};  | 
0 commit comments