1- import { Handler , KinesisStreamEvent } from "aws-lambda" ;
1+ import { Handler , KinesisStreamEvent , KinesisStreamRecord } from "aws-lambda" ;
2+ import { MI } from "@internal/datastore" ;
23import {
34 PublishBatchCommand ,
45 PublishBatchRequestEntry ,
@@ -15,33 +16,38 @@ function* generateBatches(events: MISubmittedEvent[]) {
1516 }
1617}
1718
18- function buildMessage ( event : MISubmittedEvent ) : PublishBatchRequestEntry {
19- return {
19+ function buildMessage (
20+ event : MISubmittedEvent ,
21+ deps : Deps ,
22+ ) : PublishBatchRequestEntry {
23+ const message = {
2024 Id : event . id ,
2125 Message : JSON . stringify ( event ) ,
2226 } ;
27+ deps . logger . info ( { description : "Built message" , message } ) ;
28+ return message ;
29+ }
30+
31+ function extractPayload ( record : KinesisStreamRecord , deps : Deps ) : MI {
32+ const payload = Buffer . from ( record . kinesis . data , "base64" ) . toString ( "utf8" ) ;
33+ deps . logger . info ( { description : "Extracted payload" , payload } ) ;
34+ return JSON . parse ( payload ) ;
2335}
2436
2537export function createHandler ( deps : Deps ) : Handler < KinesisStreamEvent > {
2638 return async ( streamEvent : KinesisStreamEvent ) => {
2739 deps . logger . info ( { description : "Received event" , streamEvent } ) ;
2840
29- const cloudEvents : MISubmittedEvent [ ] = streamEvent . Records . map (
30- ( record ) => {
31- // Kinesis data is base64 encoded
32- const payload = Buffer . from ( record . kinesis . data , "base64" ) . toString (
33- "utf8" ,
34- ) ;
35- return JSON . parse ( payload ) ;
36- } ,
41+ const cloudEvents : MISubmittedEvent [ ] = streamEvent . Records . map ( ( record ) =>
42+ extractPayload ( record , deps ) ,
3743 ) . map ( ( element ) => mapMIToCloudEvent ( element ) ) ;
3844
3945 for ( const batch of generateBatches ( cloudEvents ) ) {
4046 await deps . snsClient . send (
4147 new PublishBatchCommand ( {
4248 TopicArn : deps . env . EVENTPUB_SNS_TOPIC_ARN ,
4349 PublishBatchRequestEntries : batch . map ( ( element ) =>
44- buildMessage ( element ) ,
50+ buildMessage ( element , deps ) ,
4551 ) ,
4652 } ) ,
4753 ) ;
0 commit comments