File tree Expand file tree Collapse file tree 1 file changed +16
-1
lines changed
services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis Expand file tree Collapse file tree 1 file changed +16
-1
lines changed Original file line number Diff line number Diff line change 4343import software .amazon .awssdk .services .kinesis .model .ConsumerStatus ;
4444import software .amazon .awssdk .services .kinesis .model .PutRecordRequest ;
4545import software .amazon .awssdk .services .kinesis .model .Record ;
46+ import software .amazon .awssdk .services .kinesis .model .ResourceInUseException ;
4647import software .amazon .awssdk .services .kinesis .model .ShardIteratorType ;
4748import software .amazon .awssdk .services .kinesis .model .StreamStatus ;
4849import software .amazon .awssdk .services .kinesis .model .SubscribeToShardEvent ;
@@ -290,8 +291,22 @@ private static void waitForStreamToBeActive() {
290291 Waiter .run (() -> asyncClient .describeStream (r -> r .streamName (streamName )).join ())
291292 .until (b -> b .streamDescription ().streamStatus ().equals (StreamStatus .ACTIVE ))
292293 .orFailAfter (Duration .ofMinutes (5 ));
293- }
294294
295+ // Additional verification to ensure stream is fully operational
296+ Waiter .run (() -> {
297+ try {
298+ asyncClient .listShards (r -> r .streamName (streamName )).join ();
299+ return true ;
300+ } catch (Exception e ) {
301+ if (e .getCause () instanceof ResourceInUseException ) {
302+ return false ;
303+ }
304+ throw e ;
305+ }
306+ })
307+ .until (Boolean ::booleanValue )
308+ .orFailAfter (Duration .ofMinutes (1 ));
309+ }
295310
296311 /**
297312 * Puts a random record to the stream.
You can’t perform that action at this time.
0 commit comments