|
1 | | -import { Handler, KinesisStreamEvent, KinesisStreamRecord } from "aws-lambda"; |
2 | | -import { MI } from "@internal/datastore"; |
| 1 | +import { |
| 2 | + DynamoDBRecord, |
| 3 | + Handler, |
| 4 | + KinesisStreamEvent, |
| 5 | + KinesisStreamRecord, |
| 6 | +} from "aws-lambda"; |
| 7 | +import { MI, MISchema } from "@internal/datastore"; |
3 | 8 | import { |
4 | 9 | PublishBatchCommand, |
5 | 10 | PublishBatchRequestEntry, |
@@ -28,18 +33,28 @@ function buildMessage( |
28 | 33 | return message; |
29 | 34 | } |
30 | 35 |
|
31 | | -function extractPayload(record: KinesisStreamRecord, deps: Deps): MI { |
| 36 | +function extractPayload( |
| 37 | + record: KinesisStreamRecord, |
| 38 | + deps: Deps, |
| 39 | +): DynamoDBRecord { |
32 | 40 | const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8"); |
33 | 41 | deps.logger.info({ description: "Extracted payload", payload }); |
34 | 42 | return JSON.parse(payload); |
35 | 43 | } |
36 | 44 |
|
| 45 | +function extractMIData(record: DynamoDBRecord): MI { |
| 46 | + const newImage = record.dynamodb?.NewImage!; |
| 47 | + return MISchema.parse(unmarshall(newImage as any)); |
| 48 | +} |
| 49 | + |
37 | 50 | export function createHandler(deps: Deps): Handler<KinesisStreamEvent> { |
38 | 51 | return async (streamEvent: KinesisStreamEvent) => { |
39 | 52 | deps.logger.info({ description: "Received event", streamEvent }); |
40 | 53 |
|
41 | | - const cloudEvents: MISubmittedEvent[] = streamEvent.Records |
42 | | - .map((record) => extractPayload(record, deps)) |
| 54 | + const cloudEvents: MISubmittedEvent[] = streamEvent.Records.map((record) => |
| 55 | + extractPayload(record, deps), |
| 56 | + ) |
| 57 | + .map((element) => extractMIData(element)) |
43 | 58 | .map((payload) => mapMIToCloudEvent(payload, deps)); |
44 | 59 |
|
45 | 60 | for (const batch of generateBatches(cloudEvents)) { |
|
0 commit comments