Skip to content

Commit 5cf2883

Browse files
logging and refactor
1 parent a6b7a52 commit 5cf2883

File tree

1 file changed

+18
-12
lines changed

1 file changed

+18
-12
lines changed

lambdas/mi-updates-transformer/src/mi-updates-transformer.ts

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { Handler, KinesisStreamEvent } from "aws-lambda";
1+
import { Handler, KinesisStreamEvent, KinesisStreamRecord } from "aws-lambda";
2+
import { MI } from "@internal/datastore";
23
import {
34
PublishBatchCommand,
45
PublishBatchRequestEntry,
@@ -15,33 +16,38 @@ function* generateBatches(events: MISubmittedEvent[]) {
1516
}
1617
}
1718

18-
function buildMessage(event: MISubmittedEvent): PublishBatchRequestEntry {
19-
return {
19+
function buildMessage(
20+
event: MISubmittedEvent,
21+
deps: Deps,
22+
): PublishBatchRequestEntry {
23+
const message = {
2024
Id: event.id,
2125
Message: JSON.stringify(event),
2226
};
27+
deps.logger.info({ description: "Built message", message });
28+
return message;
29+
}
30+
31+
function extractPayload(record: KinesisStreamRecord, deps: Deps): MI {
32+
const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8");
33+
deps.logger.info({ description: "Extracted payload", payload });
34+
return JSON.parse(payload);
2335
}
2436

2537
export function createHandler(deps: Deps): Handler<KinesisStreamEvent> {
2638
return async (streamEvent: KinesisStreamEvent) => {
2739
deps.logger.info({ description: "Received event", streamEvent });
2840

29-
const cloudEvents: MISubmittedEvent[] = streamEvent.Records.map(
30-
(record) => {
31-
// Kinesis data is base64 encoded
32-
const payload = Buffer.from(record.kinesis.data, "base64").toString(
33-
"utf8",
34-
);
35-
return JSON.parse(payload);
36-
},
41+
const cloudEvents: MISubmittedEvent[] = streamEvent.Records.map((record) =>
42+
extractPayload(record, deps),
3743
).map((element) => mapMIToCloudEvent(element));
3844

3945
for (const batch of generateBatches(cloudEvents)) {
4046
await deps.snsClient.send(
4147
new PublishBatchCommand({
4248
TopicArn: deps.env.EVENTPUB_SNS_TOPIC_ARN,
4349
PublishBatchRequestEntries: batch.map((element) =>
44-
buildMessage(element),
50+
buildMessage(element, deps),
4551
),
4652
}),
4753
);

0 commit comments

Comments
 (0)