1- import { Handler , KinesisStreamEvent } from "aws-lambda" ;
1+ import { Handler , KinesisStreamEvent , KinesisStreamRecord } from "aws-lambda" ;
2+ import { MI } from "@internal/datastore" ;
23import {
34 PublishBatchCommand ,
45 PublishBatchRequestEntry ,
@@ -15,33 +16,37 @@ function* generateBatches(events: MISubmittedEvent[]) {
1516 }
1617}
1718
18- function buildMessage ( event : MISubmittedEvent ) : PublishBatchRequestEntry {
19+ function buildMessage (
20+ event : MISubmittedEvent ,
21+ deps : Deps ,
22+ ) : PublishBatchRequestEntry {
23+ deps . logger . info ( { description : "Building message" , event } ) ;
1924 return {
2025 Id : event . id ,
2126 Message : JSON . stringify ( event ) ,
2227 } ;
2328}
2429
30+ function extractPayload ( record : KinesisStreamRecord , deps : Deps ) : MI {
31+ const payload = Buffer . from ( record . kinesis . data , "base64" ) . toString ( "utf8" ) ;
32+ deps . logger . info ( { description : "Extracted payload" , payload } ) ;
33+ return JSON . parse ( payload ) ;
34+ }
35+
2536export function createHandler ( deps : Deps ) : Handler < KinesisStreamEvent > {
2637 return async ( streamEvent : KinesisStreamEvent ) => {
2738 deps . logger . info ( { description : "Received event" , streamEvent } ) ;
2839
29- const cloudEvents : MISubmittedEvent [ ] = streamEvent . Records . map (
30- ( record ) => {
31- // Kinesis data is base64 encoded
32- const payload = Buffer . from ( record . kinesis . data , "base64" ) . toString (
33- "utf8" ,
34- ) ;
35- return JSON . parse ( payload ) ;
36- } ,
40+ const cloudEvents : MISubmittedEvent [ ] = streamEvent . Records . map ( ( record ) =>
41+ extractPayload ( record , deps ) ,
3742 ) . map ( ( element ) => mapMIToCloudEvent ( element ) ) ;
3843
3944 for ( const batch of generateBatches ( cloudEvents ) ) {
4045 await deps . snsClient . send (
4146 new PublishBatchCommand ( {
4247 TopicArn : deps . env . EVENTPUB_SNS_TOPIC_ARN ,
4348 PublishBatchRequestEntries : batch . map ( ( element ) =>
44- buildMessage ( element ) ,
49+ buildMessage ( element , deps ) ,
4550 ) ,
4651 } ) ,
4752 ) ;
0 commit comments