1515package software .amazon .lambda .powertools ;
1616
1717import static org .assertj .core .api .Assertions .assertThat ;
18- import static software .amazon .lambda .powertools .testutils .Infrastructure .FUNCTION_NAME_OUTPUT ;
1918
20- import com .fasterxml .jackson .core .JsonProcessingException ;
21- import com .fasterxml .jackson .databind .ObjectMapper ;
2219import java .util .Arrays ;
2320import java .util .HashMap ;
2421import java .util .List ;
2522import java .util .Map ;
2623import java .util .UUID ;
2724import java .util .concurrent .TimeUnit ;
2825import java .util .stream .Collectors ;
26+
2927import org .junit .jupiter .api .AfterAll ;
3028import org .junit .jupiter .api .AfterEach ;
3129import org .junit .jupiter .api .BeforeAll ;
3230import org .junit .jupiter .api .Test ;
3331import org .junit .jupiter .api .Timeout ;
32+
33+ import com .fasterxml .jackson .core .JsonProcessingException ;
34+ import com .fasterxml .jackson .databind .ObjectMapper ;
35+
3436import software .amazon .awssdk .core .SdkBytes ;
3537import software .amazon .awssdk .http .SdkHttpClient ;
3638import software .amazon .awssdk .http .urlconnection .UrlConnectionHttpClient ;
4446import software .amazon .awssdk .services .kinesis .KinesisClient ;
4547import software .amazon .awssdk .services .kinesis .model .PutRecordsRequest ;
4648import software .amazon .awssdk .services .kinesis .model .PutRecordsRequestEntry ;
47- import software .amazon .awssdk .services .kinesis .model .PutRecordsResponse ;
4849import software .amazon .awssdk .services .sqs .SqsClient ;
4950import software .amazon .awssdk .services .sqs .model .SendMessageBatchRequest ;
5051import software .amazon .awssdk .services .sqs .model .SendMessageBatchRequestEntry ;
52+ import software .amazon .lambda .powertools .testutils .DataNotReadyException ;
5153import software .amazon .lambda .powertools .testutils .Infrastructure ;
54+ import software .amazon .lambda .powertools .testutils .RetryUtils ;
5255import software .amazon .lambda .powertools .utilities .JsonConfig ;
5356
54- public class BatchE2ET {
57+ class BatchE2ET {
5558 private static final SdkHttpClient httpClient = UrlConnectionHttpClient .builder ().build ();
5659 private static final Region region = Region .of (System .getProperty ("AWS_DEFAULT_REGION" , "eu-west-1" ));
5760 private static Infrastructure infrastructure ;
58- private static String functionName ;
5961 private static String queueUrl ;
6062 private static String kinesisStreamName ;
6163
@@ -71,13 +73,12 @@ public BatchE2ET() {
7173 testProducts = Arrays .asList (
7274 new Product (1 , "product1" , 1.23 ),
7375 new Product (2 , "product2" , 4.56 ),
74- new Product (3 , "product3" , 6.78 )
75- );
76+ new Product (3 , "product3" , 6.78 ));
7677 }
7778
7879 @ BeforeAll
7980 @ Timeout (value = 5 , unit = TimeUnit .MINUTES )
80- public static void setup () {
81+ static void setup () {
8182 String random = UUID .randomUUID ().toString ().substring (0 , 6 );
8283 String queueName = "batchqueue" + random ;
8384 kinesisStreamName = "batchstream" + random ;
@@ -94,7 +95,6 @@ public static void setup() {
9495 .build ();
9596
9697 Map <String , String > outputs = infrastructure .deploy ();
97- functionName = outputs .get (FUNCTION_NAME_OUTPUT );
9898 queueUrl = outputs .get ("QueueURL" );
9999 kinesisStreamName = outputs .get ("KinesisStreamName" );
100100 outputTable = outputs .get ("TableNameForAsyncTests" );
@@ -117,14 +117,14 @@ public static void setup() {
117117 }
118118
119119 @ AfterAll
120- public static void tearDown () {
120+ static void tearDown () {
121121 if (infrastructure != null ) {
122122 infrastructure .destroy ();
123123 }
124124 }
125125
126126 @ AfterEach
127- public void cleanUpTest () {
127+ void cleanUpTest () {
128128 // Delete everything in the output table
129129 ScanResponse items = ddbClient .scan (ScanRequest .builder ()
130130 .tableName (outputTable )
@@ -150,7 +150,7 @@ public void cleanUpTest() {
150150 }
151151
152152 @ Test
153- public void sqsBatchProcessingSucceeds () throws InterruptedException {
153+ void sqsBatchProcessingSucceeds () {
154154 List <SendMessageBatchRequestEntry > entries = testProducts .stream ()
155155 .map (p -> {
156156 try {
@@ -169,17 +169,25 @@ public void sqsBatchProcessingSucceeds() throws InterruptedException {
169169 .entries (entries )
170170 .queueUrl (queueUrl )
171171 .build ());
172- Thread .sleep (30000 ); // wait for function to be executed
173172
174173 // THEN
175- ScanResponse items = ddbClient .scan (ScanRequest .builder ()
176- .tableName (outputTable )
177- .build ());
178- validateAllItemsHandled (items );
174+ // When the retry loop finishes it means we found all products in the result
175+ RetryUtils .withRetry (() -> {
176+ ScanResponse items = ddbClient .scan (ScanRequest .builder ()
177+ .tableName (outputTable )
178+ .build ());
179+ if (!areAllTestProductsPresent (items )) {
180+ throw new DataNotReadyException ("sqs-batch-processing not complete yet" );
181+ }
182+ return null ;
183+ }, "sqs-batch-processing" , DataNotReadyException .class ).get ();
184+
185+ ScanResponse finalItems = ddbClient .scan (ScanRequest .builder ().tableName (outputTable ).build ());
186+ assertThat (areAllTestProductsPresent (finalItems )).isTrue ();
179187 }
180188
181189 @ Test
182- public void kinesisBatchProcessingSucceeds () throws InterruptedException {
190+ void kinesisBatchProcessingSucceeds () {
183191 List <PutRecordsRequestEntry > entries = testProducts .stream ()
184192 .map (p -> {
185193 try {
@@ -194,21 +202,29 @@ public void kinesisBatchProcessingSucceeds() throws InterruptedException {
194202 .collect (Collectors .toList ());
195203
196204 // WHEN
197- PutRecordsResponse result = kinesisClient .putRecords (PutRecordsRequest .builder ()
205+ kinesisClient .putRecords (PutRecordsRequest .builder ()
198206 .streamName (kinesisStreamName )
199207 .records (entries )
200208 .build ());
201- Thread .sleep (30000 ); // wait for function to be executed
202209
203210 // THEN
204- ScanResponse items = ddbClient .scan (ScanRequest .builder ()
205- .tableName (outputTable )
206- .build ());
207- validateAllItemsHandled (items );
211+ // When the retry loop finishes it means we found all products in the result
212+ RetryUtils .withRetry (() -> {
213+ ScanResponse items = ddbClient .scan (ScanRequest .builder ()
214+ .tableName (outputTable )
215+ .build ());
216+ if (!areAllTestProductsPresent (items )) {
217+ throw new DataNotReadyException ("kinesis-batch-processing not complete yet" );
218+ }
219+ return null ;
220+ }, "kinesis-batch-processing" , DataNotReadyException .class ).get ();
221+
222+ ScanResponse finalItems = ddbClient .scan (ScanRequest .builder ().tableName (outputTable ).build ());
223+ assertThat (areAllTestProductsPresent (finalItems )).isTrue ();
208224 }
209225
210226 @ Test
211- public void ddbStreamsBatchProcessingSucceeds () throws InterruptedException {
227+ void ddbStreamsBatchProcessingSucceeds () {
212228 // GIVEN
213229 String theId = "my-test-id" ;
214230
@@ -223,27 +239,38 @@ public void ddbStreamsBatchProcessingSucceeds() throws InterruptedException {
223239 }
224240 })
225241 .build ());
226- Thread .sleep (90000 ); // wait for function to be executed
227242
228243 // THEN
229- ScanResponse items = ddbClient .scan (ScanRequest .builder ()
230- .tableName (outputTable )
231- .build ());
244+ RetryUtils .withRetry (() -> {
245+ ScanResponse items = ddbClient .scan (ScanRequest .builder ()
246+ .tableName (outputTable )
247+ .build ());
232248
233- assertThat (items .count ()).isEqualTo (1 );
234- assertThat (items .items ().get (0 ).get ("id" ).s ()).isEqualTo (theId );
249+ if (items .count () != 1 ) {
250+ throw new DataNotReadyException ("DDB streams processing not complete yet" );
251+ }
252+ return null ;
253+ }, "ddb-streams-batch-processing" , DataNotReadyException .class ).get ();
254+
255+ ScanResponse finalItems = ddbClient .scan (ScanRequest .builder ().tableName (outputTable ).build ());
256+ assertThat (finalItems .count ()).isEqualTo (1 );
257+ assertThat (finalItems .items ().get (0 ).get ("id" ).s ()).isEqualTo (theId );
235258 }
236259
237- private void validateAllItemsHandled (ScanResponse items ) {
260+ private boolean areAllTestProductsPresent (ScanResponse items ) {
238261 for (Product p : testProducts ) {
239262 boolean foundIt = false ;
240263 for (Map <String , AttributeValue > a : items .items ()) {
241264 if (a .get ("id" ).s ().equals (Long .toString (p .id ))) {
242265 foundIt = true ;
266+ break ;
243267 }
244268 }
245- assertThat (foundIt ).isTrue ();
269+ if (!foundIt ) {
270+ return false ;
271+ }
246272 }
273+ return true ;
247274 }
248275
249276 class Product {
0 commit comments