44 * @group unit/parser/schema/
55 */
66
7+ import { gunzipSync } from 'node:zlib' ;
78import {
89 KinesisDataStreamRecord ,
910 KinesisDataStreamSchema ,
1011 KinesisFirehoseRecordSchema ,
1112 KinesisFirehoseSchema ,
1213 KinesisFirehoseSqsRecordSchema ,
1314 KinesisFirehoseSqsSchema ,
15+ SqsRecordSchema ,
1416} from '../../../src/schemas/' ;
1517import type {
1618 KinesisDataStreamEvent ,
@@ -25,61 +27,131 @@ import { TestEvents } from './utils.js';
2527
2628describe ( 'Kinesis ' , ( ) => {
2729 it ( 'should parse kinesis event' , ( ) => {
28- const kinesisStreamEvent = TestEvents . kinesisStreamEvent ;
30+ const kinesisStreamEvent =
31+ TestEvents . kinesisStreamEvent as KinesisDataStreamEvent ;
2932 const parsed = KinesisDataStreamSchema . parse ( kinesisStreamEvent ) ;
3033
31- expect ( parsed . Records [ 0 ] . kinesis . data ) . toEqual ( 'Hello, this is a test.' ) ;
34+ const transformedInput = {
35+ Records : kinesisStreamEvent . Records . map ( ( record , index ) => {
36+ return {
37+ ...record ,
38+ kinesis : {
39+ ...record . kinesis ,
40+ data : Buffer . from ( record . kinesis . data , 'base64' ) . toString ( ) ,
41+ } ,
42+ } ;
43+ } ) ,
44+ } ;
45+
46+ expect ( parsed ) . toStrictEqual ( transformedInput ) ;
3247 } ) ;
3348 it ( 'should parse single kinesis record' , ( ) => {
34- const kinesisStreamEventOneRecord = TestEvents . kinesisStreamEventOneRecord ;
49+ const kinesisStreamEventOneRecord =
50+ TestEvents . kinesisStreamEventOneRecord as KinesisDataStreamEvent ;
3551 const parsed = KinesisDataStreamSchema . parse ( kinesisStreamEventOneRecord ) ;
3652
37- expect ( parsed . Records [ 0 ] . kinesis . data ) . toEqual ( {
38- message : 'test message' ,
39- username : 'test' ,
40- } ) ;
53+ const transformedInput = {
54+ Records : kinesisStreamEventOneRecord . Records . map ( ( record , index ) => {
55+ return {
56+ ...record ,
57+ kinesis : {
58+ ...record . kinesis ,
59+ data : JSON . parse (
60+ Buffer . from ( record . kinesis . data , 'base64' ) . toString ( )
61+ ) ,
62+ } ,
63+ } ;
64+ } ) ,
65+ } ;
66+
67+ expect ( parsed ) . toStrictEqual ( transformedInput ) ;
4168 } ) ;
4269 it ( 'should parse Firehose event' , ( ) => {
43- const kinesisFirehoseKinesisEvent = TestEvents . kinesisFirehoseKinesisEvent ;
70+ const kinesisFirehoseKinesisEvent =
71+ TestEvents . kinesisFirehoseKinesisEvent as KinesisFireHoseEvent ;
4472 const parsed = KinesisFirehoseSchema . parse ( kinesisFirehoseKinesisEvent ) ;
45- expect ( parsed . records [ 0 ] . data ) . toEqual ( 'Hello World' ) ;
73+
74+ const transformedInput = {
75+ ...kinesisFirehoseKinesisEvent ,
76+ records : kinesisFirehoseKinesisEvent . records . map ( ( record ) => {
77+ return {
78+ ...record ,
79+ data : Buffer . from ( record . data , 'base64' ) . toString ( ) ,
80+ kinesisRecordMetadata : record . kinesisRecordMetadata ,
81+ } ;
82+ } ) ,
83+ } ;
84+ expect ( parsed ) . toStrictEqual ( transformedInput ) ;
4685 } ) ;
4786 it ( 'should parse Kinesis Firehose PutEvents event' , ( ) => {
48- const kinesisFirehosePutEvent = TestEvents . kinesisFirehosePutEvent ;
87+ const kinesisFirehosePutEvent =
88+ TestEvents . kinesisFirehosePutEvent as KinesisFireHoseEvent ;
4989 const parsed = KinesisFirehoseSchema . parse ( kinesisFirehosePutEvent ) ;
50- expect ( JSON . parse ( parsed . records [ 1 ] . data ) ) . toEqual ( {
51- Hello : 'World' ,
52- } ) ;
90+
91+ const transformedInput = {
92+ ...kinesisFirehosePutEvent ,
93+ records : kinesisFirehosePutEvent . records . map ( ( record ) => {
94+ return {
95+ ...record ,
96+ data : Buffer . from ( record . data , 'base64' ) . toString ( ) ,
97+ } ;
98+ } ) ,
99+ } ;
100+
101+ expect ( parsed ) . toStrictEqual ( transformedInput ) ;
53102 } ) ;
54103 it ( 'should parse Firehose event with SQS event' , ( ) => {
55- const kinesisFirehoseSQSEvent = TestEvents . kinesisFirehoseSQSEvent ;
104+ const kinesisFirehoseSQSEvent =
105+ TestEvents . kinesisFirehoseSQSEvent as KinesisFireHoseSqsEvent ;
56106 const parsed = KinesisFirehoseSqsSchema . parse ( kinesisFirehoseSQSEvent ) ;
57- expect ( parsed . records [ 0 ] . data ) . toMatchObject ( {
58- messageId : '5ab807d4-5644-4c55-97a3-47396635ac74' ,
59- body : 'Test message.' ,
60- } ) ;
107+
108+ const transformedInput = {
109+ ...kinesisFirehoseSQSEvent ,
110+ records : kinesisFirehoseSQSEvent . records . map ( ( record ) => {
111+ return {
112+ ...record ,
113+ data : JSON . parse (
114+ Buffer . from ( record . data as string , 'base64' ) . toString ( )
115+ ) ,
116+ } ;
117+ } ) ,
118+ } ;
119+
120+ expect ( parsed ) . toStrictEqual ( transformedInput ) ;
61121 } ) ;
62122 it ( 'should parse Kinesis event with CloudWatch event' , ( ) => {
63123 const kinesisStreamCloudWatchLogsEvent =
64- TestEvents . kinesisStreamCloudWatchLogsEvent ;
124+ TestEvents . kinesisStreamCloudWatchLogsEvent as KinesisDataStreamEvent ;
65125 const parsed = KinesisDataStreamSchema . parse (
66126 kinesisStreamCloudWatchLogsEvent
67127 ) ;
68128
69- expect ( parsed . Records [ 0 ] . kinesis . data ) . toMatchObject ( {
70- messageType : 'DATA_MESSAGE' ,
71- owner : '231436140809' ,
72- logGroup : '/aws/lambda/pt-1488-DummyLogDataFunction-gnWXPvL6jJyG' ,
73- logStream : '2022/11/10/[$LATEST]26b6a45d574f442ea28438923cbf7bf7' ,
74- } ) ;
129+ const transformedInput = {
130+ Records : kinesisStreamCloudWatchLogsEvent . Records . map ( ( record , index ) => {
131+ return {
132+ ...record ,
133+ kinesis : {
134+ ...record . kinesis ,
135+ data : JSON . parse (
136+ gunzipSync ( Buffer . from ( record . kinesis . data , 'base64' ) ) . toString (
137+ 'utf8'
138+ )
139+ ) ,
140+ } ,
141+ } ;
142+ } ) ,
143+ } ;
144+
145+ expect ( parsed ) . toStrictEqual ( transformedInput ) ;
75146 } ) ;
76147 it ( 'should return original value if cannot parse KinesisFirehoseSqsRecord' , ( ) => {
77148 const kinesisFirehoseSQSEvent = TestEvents . kinesisFirehoseSQSEvent as {
78149 records : { data : string } [ ] ;
79150 } ;
80151 kinesisFirehoseSQSEvent . records [ 0 ] . data = 'not a valid json' ;
81152 const parsed = KinesisFirehoseSqsSchema . parse ( kinesisFirehoseSQSEvent ) ;
82- expect ( parsed . records [ 0 ] . data ) . toEqual ( 'not a valid json' ) ;
153+
154+ expect ( parsed ) . toStrictEqual ( kinesisFirehoseSQSEvent ) ;
83155 } ) ;
84156 it ( 'should parse a kinesis record from a kinesis event' , ( ) => {
85157 const kinesisStreamEvent : KinesisDataStreamEvent =
@@ -88,6 +160,7 @@ describe('Kinesis ', () => {
88160 kinesisStreamEvent . Records [ 0 ]
89161 ) ;
90162
163+ expect ( parsedRecord . eventSource ) . toEqual ( 'aws:kinesis' ) ;
91164 expect ( parsedRecord . eventName ) . toEqual ( 'aws:kinesis:record' ) ;
92165 } ) ;
93166
0 commit comments