@@ -656,9 +656,10 @@ describe('Class: AsyncBatchProcessor', () => {
656
656
const customSchema = z . object ( {
657
657
Message : z . string ( ) ,
658
658
} ) ;
659
- const firstRecord = dynamodbRecordFactory ( 'failure' ) ;
659
+
660
+ //@ts -expect-error Passing a number
661
+ const firstRecord = dynamodbRecordFactory ( 1 ) ;
660
662
const secondRecord = dynamodbRecordFactory ( 'success' ) ;
661
- const thirdRecord = dynamodbRecordFactory ( 'fail' ) ;
662
663
const records = [ firstRecord , secondRecord ] ;
663
664
const processor = new BatchProcessor ( EventType . DynamoDBStreams , {
664
665
schema : customSchema ,
@@ -669,127 +670,60 @@ describe('Class: AsyncBatchProcessor', () => {
669
670
const processedMessages = await processor . process ( ) ;
670
671
671
672
// Assess
672
- expect ( processedMessages [ 1 ] ) . toStrictEqual ( [ 'success' , '' , secondRecord ] ) ;
673
- expect ( processor . failureMessages . length ) . toBe ( 2 ) ;
673
+ expect ( processedMessages [ 1 ] ) . toStrictEqual ( [
674
+ 'success' ,
675
+ 'success' ,
676
+ secondRecord ,
677
+ ] ) ;
678
+ expect ( processor . failureMessages . length ) . toBe ( 1 ) ;
674
679
expect ( processor . response ( ) ) . toStrictEqual ( {
675
680
batchItemFailures : [
676
681
{ itemIdentifier : firstRecord . dynamodb ?. SequenceNumber } ,
677
- { itemIdentifier : thirdRecord . dynamodb ?. SequenceNumber } ,
678
682
] ,
679
683
} ) ;
680
684
} ) ;
681
685
682
686
it ( 'completes the processing with no failures and parses the payload before passing to the record handler' , async ( ) => {
683
687
// Prepare
684
688
const customSchema = z . object ( {
685
- name : z . string ( ) ,
686
- age : z . number ( ) ,
689
+ Message : z . string ( ) ,
687
690
} ) ;
688
- const customObject1 = {
689
- name : 'test-1' ,
690
- age : 20 ,
691
- } ;
692
- const customObject2 = {
693
- name : 'test-2' ,
694
- age : 30 ,
695
- } ;
696
- const firstRecord = sqsRecordFactory ( JSON . stringify ( customObject1 ) ) ;
697
- const secondRecord = sqsRecordFactory ( JSON . stringify ( customObject2 ) ) ;
691
+
692
+ const firstRecord = dynamodbRecordFactory ( 'success' ) ;
693
+ const secondRecord = dynamodbRecordFactory ( 'success' ) ;
698
694
const records = [ firstRecord , secondRecord ] ;
699
- const processor = new BatchProcessor ( EventType . SQS , {
695
+ const processor = new BatchProcessor ( EventType . DynamoDBStreams , {
700
696
schema : customSchema ,
701
697
} ) ;
702
698
703
699
// Act
704
- processor . register (
705
- records ,
706
- async (
707
- customObject : SQSRecord & { body : z . infer < typeof customSchema > }
708
- ) => {
709
- return customObject . body ;
710
- } ,
711
- options
712
- ) ;
700
+ processor . register ( records , asyncDynamodbRecordHandler , options ) ;
713
701
const processedMessages = await processor . process ( ) ;
714
702
715
703
// Assess
716
704
expect ( processedMessages ) . toStrictEqual ( [
717
- [ 'success' , customObject1 , firstRecord ] ,
718
- [ 'success' , customObject2 , secondRecord ] ,
705
+ [ 'success' , 'success' , firstRecord ] ,
706
+ [ 'success' , 'success' , secondRecord ] ,
719
707
] ) ;
720
708
} ) ;
721
709
722
710
it ( 'completes processing with all failures if all the payload does not match the passed schema' , async ( ) => {
723
711
// Prepare
724
712
const customSchema = z . object ( {
725
- name : z . string ( ) ,
726
- age : z . number ( ) ,
727
- } ) ;
728
- const customObject1 = {
729
- name : 'test-1' ,
730
- age : 'invalid-age' ,
731
- } ;
732
- const customObject2 = {
733
- name : 20 ,
734
- age : 30 ,
735
- } ;
736
- const firstRecord = sqsRecordFactory ( JSON . stringify ( customObject1 ) ) ;
737
- const secondRecord = sqsRecordFactory ( JSON . stringify ( customObject2 ) ) ;
738
-
739
- const records = [ firstRecord , secondRecord ] ;
740
- const processor = new BatchProcessor ( EventType . SQS , {
741
- schema : customSchema ,
713
+ Message : z . string ( ) ,
742
714
} ) ;
743
715
744
- // Act
745
- processor . register (
746
- records ,
747
- async (
748
- customObject : SQSRecord & { body : z . infer < typeof customSchema > }
749
- ) => {
750
- return customObject . body ;
751
- } ,
752
- options
753
- ) ;
754
-
755
- // Assess
756
- await expect ( processor . process ( ) ) . rejects . toThrowError (
757
- FullBatchFailureError
758
- ) ;
759
- } ) ;
760
-
761
- it ( 'completes processing with failures if an unsupported event type is used for parsing' , async ( ) => {
762
- // Prepare
763
- const customSchema = z . object ( {
764
- name : z . string ( ) ,
765
- age : z . number ( ) ,
766
- } ) ;
767
- const customObject1 = {
768
- name : 'test-1' ,
769
- age : 20 ,
770
- } ;
771
- const customObject2 = {
772
- name : 'test-2' ,
773
- age : 'invalid-age' ,
774
- } ;
775
- const firstRecord = sqsRecordFactory ( JSON . stringify ( customObject1 ) ) ;
776
- const secondRecord = sqsRecordFactory ( JSON . stringify ( customObject2 ) ) ;
716
+ //@ts -expect-error Passing a number
717
+ const firstRecord = dynamodbRecordFactory ( 1 ) ;
718
+ //@ts -expect-error Passing a number
719
+ const secondRecord = dynamodbRecordFactory ( 2 ) ;
777
720
const records = [ firstRecord , secondRecord ] ;
778
- //@ts -expect-error
779
- const processor = new BatchProcessor ( 'invalid-event-type' , {
721
+ const processor = new BatchProcessor ( EventType . DynamoDBStreams , {
780
722
schema : customSchema ,
781
723
} ) ;
782
724
783
725
// Act
784
- processor . register (
785
- records ,
786
- async (
787
- customObject : SQSRecord & { body : z . infer < typeof customSchema > }
788
- ) => {
789
- return customObject . body ;
790
- } ,
791
- options
792
- ) ;
726
+ processor . register ( records , asyncDynamodbRecordHandler , options ) ;
793
727
794
728
// Assess
795
729
await expect ( processor . process ( ) ) . rejects . toThrowError (
@@ -800,32 +734,18 @@ describe('Class: AsyncBatchProcessor', () => {
800
734
it ( 'completes processing with failures if an unsupported schema type is used for parsing' , async ( ) => {
801
735
// Prepare
802
736
const customSchema = v . object ( {
803
- name : v . string ( ) ,
804
- age : v . number ( ) ,
737
+ Message : v . string ( ) ,
805
738
} ) ;
806
- const customObject1 = {
807
- name : 'test-1' ,
808
- age : 20 ,
809
- } ;
810
- const customObject2 = {
811
- name : 'test-2' ,
812
- age : 'invalid-age' ,
813
- } ;
814
- const firstRecord = sqsRecordFactory ( JSON . stringify ( customObject1 ) ) ;
815
- const secondRecord = sqsRecordFactory ( JSON . stringify ( customObject2 ) ) ;
739
+
740
+ const firstRecord = dynamodbRecordFactory ( 'success' ) ;
741
+ const secondRecord = dynamodbRecordFactory ( 'success' ) ;
816
742
const records = [ firstRecord , secondRecord ] ;
817
- const processor = new BatchProcessor ( EventType . SQS , {
743
+ const processor = new BatchProcessor ( EventType . DynamoDBStreams , {
818
744
schema : customSchema ,
819
745
} ) ;
820
746
821
747
// Act
822
- processor . register (
823
- records ,
824
- async ( customObject : SQSRecord ) => {
825
- return customObject . body ;
826
- } ,
827
- options
828
- ) ;
748
+ processor . register ( records , asyncDynamodbRecordHandler , options ) ;
829
749
830
750
// Assess
831
751
await expect ( processor . process ( ) ) . rejects . toThrowError (
@@ -835,7 +755,7 @@ describe('Class: AsyncBatchProcessor', () => {
835
755
} ) ;
836
756
837
757
describe ( 'Batch processing with Parser Integration: Passing Extended DynamoDB Record Schema' , ( ) => {
838
- it . only ( 'completes the processing with failures if some of the payload does not match the passed schema' , async ( ) => {
758
+ it ( 'completes the processing with failures if some of the payload does not match the passed schema' , async ( ) => {
839
759
// Prepare
840
760
const customSchema = z . object ( {
841
761
Message : z . string ( ) ,
@@ -847,14 +767,12 @@ describe('Class: AsyncBatchProcessor', () => {
847
767
await import ( '@aws-lambda-powertools/parser/schemas/dynamodb' ) ;
848
768
const extendedSchema = DynamoDBStreamRecord . extend ( {
849
769
dynamodb : DynamoDBStreamChangeRecordBase . extend ( {
850
- OldImage : DynamoDBMarshalled ( customSchema ) . optional ( ) ,
770
+ NewImage : DynamoDBMarshalled ( customSchema ) . optional ( ) ,
851
771
} ) ,
852
772
} ) ;
853
773
//@ts -expect-error Passing a number
854
774
const firstRecord = dynamodbRecordFactory ( 1 ) ;
855
775
const secondRecord = dynamodbRecordFactory ( 'success' ) ;
856
- //@ts -expect-error Passing a number
857
- const thirdRecord = dynamodbRecordFactory ( 2 ) ;
858
776
const records = [ firstRecord , secondRecord ] ;
859
777
const processor = new BatchProcessor ( EventType . DynamoDBStreams , {
860
778
schema : extendedSchema ,
@@ -870,107 +788,73 @@ describe('Class: AsyncBatchProcessor', () => {
870
788
'success' ,
871
789
secondRecord ,
872
790
] ) ;
873
- expect ( processor . failureMessages . length ) . toBe ( 2 ) ;
791
+ expect ( processor . failureMessages . length ) . toBe ( 1 ) ;
874
792
expect ( processor . response ( ) ) . toStrictEqual ( {
875
793
batchItemFailures : [
876
794
{ itemIdentifier : firstRecord . dynamodb ?. SequenceNumber } ,
877
- { itemIdentifier : thirdRecord . dynamodb ?. SequenceNumber } ,
878
795
] ,
879
796
} ) ;
880
797
} ) ;
881
798
882
799
it ( 'completes the processing with no failures and parses the payload before passing to the record handler' , async ( ) => {
883
800
// Prepare
884
801
const customSchema = z . object ( {
885
- name : z . string ( ) ,
886
- age : z . number ( ) ,
802
+ Message : z . string ( ) ,
887
803
} ) ;
888
- const { JSONStringified } = await import (
889
- '@aws-lambda-powertools/parser/helpers'
890
- ) ;
891
- const { SqsRecordSchema } = await import (
892
- '@aws-lambda-powertools/parser/schemas/sqs'
804
+ const { DynamoDBMarshalled } = await import (
805
+ '@aws-lambda-powertools/parser/helpers/dynamodb'
893
806
) ;
894
- const extendedSchema = SqsRecordSchema . extend ( {
895
- // biome-ignore lint/suspicious/noExplicitAny: at least for now, we need to broaden the type because the JSONstringified helper method is not typed with StandardSchemaV1 but with ZodSchema
896
- body : JSONStringified ( customSchema as any ) ,
807
+ const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
808
+ await import ( '@aws-lambda-powertools/parser/schemas/dynamodb' ) ;
809
+ const extendedSchema = DynamoDBStreamRecord . extend ( {
810
+ dynamodb : DynamoDBStreamChangeRecordBase . extend ( {
811
+ NewImage : DynamoDBMarshalled ( customSchema ) . optional ( ) ,
812
+ } ) ,
897
813
} ) ;
898
- const customObject1 = {
899
- name : 'test-1' ,
900
- age : 20 ,
901
- } ;
902
- const customObject2 = {
903
- name : 'test-2' ,
904
- age : 30 ,
905
- } ;
906
- const firstRecord = sqsRecordFactory ( JSON . stringify ( customObject1 ) ) ;
907
- const secondRecord = sqsRecordFactory ( JSON . stringify ( customObject2 ) ) ;
814
+ const firstRecord = dynamodbRecordFactory ( 'success' ) ;
815
+ const secondRecord = dynamodbRecordFactory ( 'success' ) ;
908
816
const records = [ firstRecord , secondRecord ] ;
909
- const processor = new BatchProcessor ( EventType . SQS , {
817
+ const processor = new BatchProcessor ( EventType . DynamoDBStreams , {
910
818
schema : extendedSchema ,
911
819
} ) ;
912
820
913
821
// Act
914
- processor . register (
915
- records ,
916
- async (
917
- customObject : SQSRecord & { body : z . infer < typeof customSchema > }
918
- ) => {
919
- return customObject . body ;
920
- } ,
921
- options
922
- ) ;
822
+ processor . register ( records , asyncDynamodbRecordHandler , options ) ;
923
823
const processedMessages = await processor . process ( ) ;
924
824
925
825
// Assess
926
826
expect ( processedMessages ) . toStrictEqual ( [
927
- [ 'success' , customObject1 , firstRecord ] ,
928
- [ 'success' , customObject2 , secondRecord ] ,
827
+ [ 'success' , 'success' , firstRecord ] ,
828
+ [ 'success' , 'success' , secondRecord ] ,
929
829
] ) ;
930
830
} ) ;
931
831
932
832
it ( 'completes processing with all failures if all the payload does not match the passed schema' , async ( ) => {
933
833
// Prepare
934
834
const customSchema = z . object ( {
935
- name : z . string ( ) ,
936
- age : z . number ( ) ,
835
+ Message : z . string ( ) ,
937
836
} ) ;
938
- const { JSONStringified } = await import (
939
- '@aws-lambda-powertools/parser/helpers'
940
- ) ;
941
- const { SqsRecordSchema } = await import (
942
- '@aws-lambda-powertools/parser/schemas/sqs'
837
+ const { DynamoDBMarshalled } = await import (
838
+ '@aws-lambda-powertools/parser/helpers/dynamodb'
943
839
) ;
944
- const extendedSchema = SqsRecordSchema . extend ( {
945
- // biome-ignore lint/suspicious/noExplicitAny: at least for now, we need to broaden the type because the JSONstringified helper method is not typed with StandardSchemaV1 but with ZodSchema
946
- body : JSONStringified ( customSchema as any ) ,
840
+ const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
841
+ await import ( '@aws-lambda-powertools/parser/schemas/dynamodb' ) ;
842
+ const extendedSchema = DynamoDBStreamRecord . extend ( {
843
+ dynamodb : DynamoDBStreamChangeRecordBase . extend ( {
844
+ NewImage : DynamoDBMarshalled ( customSchema ) . optional ( ) ,
845
+ } ) ,
947
846
} ) ;
948
- const customObject1 = {
949
- name : 'test-1' ,
950
- age : 'invalid-age' ,
951
- } ;
952
- const customObject2 = {
953
- name : 20 ,
954
- age : 30 ,
955
- } ;
956
- const firstRecord = sqsRecordFactory ( JSON . stringify ( customObject1 ) ) ;
957
- const secondRecord = sqsRecordFactory ( JSON . stringify ( customObject2 ) ) ;
958
-
847
+ //@ts -expect-error Passing a number
848
+ const firstRecord = dynamodbRecordFactory ( 1 ) ;
849
+ //@ts -expect-error Passing a number
850
+ const secondRecord = dynamodbRecordFactory ( 2 ) ;
959
851
const records = [ firstRecord , secondRecord ] ;
960
- const processor = new BatchProcessor ( EventType . SQS , {
852
+ const processor = new BatchProcessor ( EventType . DynamoDBStreams , {
961
853
schema : extendedSchema ,
962
854
} ) ;
963
855
964
856
// Act
965
- processor . register (
966
- records ,
967
- async (
968
- customObject : SQSRecord & { body : z . infer < typeof customSchema > }
969
- ) => {
970
- return customObject . body ;
971
- } ,
972
- options
973
- ) ;
857
+ processor . register ( records , asyncDynamodbRecordHandler , options ) ;
974
858
975
859
// Assess
976
860
await expect ( processor . process ( ) ) . rejects . toThrowError (
0 commit comments