@@ -20,6 +20,7 @@ const BATCH_SIZE = 10;
2020export default function createHandler ( deps : Deps ) : Handler < KinesisStreamEvent > {
2121 return async ( streamEvent : KinesisStreamEvent ) => {
2222 deps . logger . info ( { description : "Received event" , streamEvent } ) ;
23+ deps . logger . info ( { description : "Number of records" , count : streamEvent . Records ?. length || 0 } ) ;
2324
2425 // Ensure logging by extracting all records first
2526 const ddbRecords : DynamoDBRecord [ ] = streamEvent . Records . map ( ( record ) =>
@@ -76,11 +77,20 @@ function extractPayload(
7677 record : KinesisStreamRecord ,
7778 deps : Deps ,
7879) : DynamoDBRecord {
79- // Kinesis data is base64 encoded
80- const payload = Buffer . from ( record . kinesis . data , "base64" ) . toString ( "utf8" ) ;
81- const jsonParsed = JSON . parse ( payload ) ;
82- deps . logger . info ( { description : "Extracted dynamoDBRecord" , jsonParsed } ) ;
83- return jsonParsed
80+ try {
81+ deps . logger . info ( { description : "Processing Kinesis record" , recordId : record . kinesis . sequenceNumber } ) ;
82+
83+ // Kinesis data is base64 encoded
84+ const payload = Buffer . from ( record . kinesis . data , "base64" ) . toString ( "utf8" ) ;
85+ deps . logger . info ( { description : "Decoded payload" , payload } ) ;
86+
87+ const jsonParsed = JSON . parse ( payload ) ;
88+ deps . logger . info ( { description : "Extracted dynamoDBRecord" , jsonParsed } ) ;
89+ return jsonParsed ;
90+ } catch ( error ) {
91+ deps . logger . error ( { description : "Error extracting payload" , error, record } ) ;
92+ throw error ;
93+ }
8494}
8595
8696function isChanged ( record : DynamoDBRecord , property : string ) : boolean {
0 commit comments