@@ -230,7 +230,6 @@ object ProducerSpec extends ZIOSpecDefault {
230230 results <- ZIO .scoped {
231231 serverResource *> {
232232 for {
233- _ <- withFastClock.fork
234233 _ <- Utils .createQueue(queueName)
235234 queueUrl <- Utils .getQueueUrl(queueName)
236235 producer = Producer .make(queueUrl, Serializer .serializeString, settings)
@@ -254,7 +253,6 @@ object ProducerSpec extends ZIOSpecDefault {
254253 val client = failUnrecoverableClient
255254
256255 for {
257- _ <- withFastClock.fork
258256 errOrResult <- ZIO .scoped {
259257 val producer = Producer .make(queueUrl, Serializer .serializeString, settings)
260258 producer.flatMap(p => p.sendStream(ZStream (events : _* )).runDrain.either)
@@ -274,7 +272,6 @@ object ProducerSpec extends ZIOSpecDefault {
274272 results <- ZIO .scoped {
275273 serverResource *> {
276274 for {
277- _ <- withFastClock.fork
278275 _ <- Utils .createQueue(queueName)
279276 queueUrl <- Utils .getQueueUrl(queueName)
280277 producer = Producer .make(queueUrl, Serializer .serializeString, settings)
@@ -294,12 +291,11 @@ object ProducerSpec extends ZIOSpecDefault {
294291 val client = failUnrecoverableClient
295292
296293 for {
297- _ <- withFastClock.fork
298294 errOrResults <- ZIO .scoped {
299295 val producer = Producer .make(queueUrl, Serializer .serializeString, settings)
300296 producer.flatMap(p => ZIO .foreachPar(events)(event => p.produce(event))).either
301297 }.provide(client)
302- } yield assert (errOrResults.isLeft)(isTrue )
298+ } yield assertTrue (errOrResults.isLeft)
303299 },
304300 test(" events can be published using produceBatch and return the results" ) {
305301 val queueName = " produceBatch-" + UUID .randomUUID().toString
@@ -312,15 +308,13 @@ object ProducerSpec extends ZIOSpecDefault {
312308 .run(ZSink .head[Chunk [ProducerEvent [String ]]])
313309 .someOrFailException
314310 results <- ZIO .scoped {
315- serverResource *> {
316- for {
317- _ <- withFastClock.fork
311+ serverResource *>
312+ (for {
318313 _ <- Utils .createQueue(queueName)
319314 queueUrl <- Utils .getQueueUrl(queueName)
320- producer <- ZIO .succeed(Producer .make(queueUrl, Serializer .serializeString, settings))
321- results <- ZIO .scoped(producer.flatMap(p => p.produceBatchE(events)))
322- } yield results
323- }
315+ producer <- Producer .make(queueUrl, Serializer .serializeString, settings)
316+ results <- producer.produceBatchE(events)
317+ } yield results)
324318 }
325319 } yield assert(results.size)(equalTo(events.size)) &&
326320 assert(results.forall(_.isRight))(isTrue)
@@ -333,7 +327,6 @@ object ProducerSpec extends ZIOSpecDefault {
333327 val client = failUnrecoverableClient
334328
335329 for {
336- _ <- withFastClock.fork
337330 errOrResults <- ZIO .scoped {
338331 val producer = Producer .make(queueUrl, Serializer .serializeString, settings)
339332 producer.flatMap(p => p.produceBatch(events)).either
@@ -351,15 +344,13 @@ object ProducerSpec extends ZIOSpecDefault {
351344 .run(ZSink .head[Chunk [ProducerEvent [String ]]])
352345 .someOrFailException
353346 results <- ZIO .scoped {
354- serverResource *> {
355- for {
356- _ <- withFastClock.fork
347+ serverResource *>
348+ (for {
357349 _ <- Utils .createQueue(queueName)
358350 queueUrl <- Utils .getQueueUrl(queueName)
359351 producer = Producer .make(queueUrl, Serializer .serializeString, settings)
360352 results <- ZIO .scoped(producer.flatMap(p => ZStream .succeed(events).run(p.sendSink)))
361- } yield results
362- }
353+ } yield results)
363354 }
364355 } yield assert(results)(equalTo(()))
365356 },
@@ -379,27 +370,25 @@ object ProducerSpec extends ZIOSpecDefault {
379370 }
380371
381372 for {
382- _ <- withFastClock.fork
383373 errOrResults <- ZIO .scoped {
384374 val producer = Producer .make(queueUrl, Serializer .serializeString, settings)
385375 producer.flatMap(p => ZStream .succeed(events).run(p.sendSink)).either
386376 }.provide(client)
387- } yield assert (errOrResults.isLeft)(isTrue )
377+ } yield assertTrue (errOrResults.isLeft)
388378 },
389379 test(" events that published using sendSink and return an unrecoverable error should fail the sink on error" ) {
390380 val queueName = " sendSink-" + UUID .randomUUID().toString
391381 val queueUrl = s " sqs:// $queueName"
392- val settings : ProducerSettings = ProducerSettings ()
382+ val settings : ProducerSettings = ProducerSettings ().copy(retryMaxCount = 0 )
393383 val events = List (" A" ).map(ProducerEvent (_))
394384 val client = failUnrecoverableClient
395385
396386 for {
397- _ <- withFastClock.fork
398387 errOrResults <- ZIO .scoped {
399388 val producer = Producer .make(queueUrl, Serializer .serializeString, settings)
400389 producer.flatMap(p => ZStream .succeed(events).run(p.sendSink)).either
401390 }.provide(client)
402- } yield assert (errOrResults.isLeft)(isTrue )
391+ } yield assertTrue (errOrResults.isLeft)
403392 },
404393 test(" submitted events can succeed and fail if there are unrecoverable errors" ) {
405394 val queueName = " success-and-unrecoverable-failures-" + UUID .randomUUID().toString
@@ -429,7 +418,6 @@ object ProducerSpec extends ZIOSpecDefault {
429418 }
430419
431420 for {
432- _ <- withFastClock.fork
433421 results <- ZIO .scoped {
434422 val producer = Producer .make(queueUrl, Serializer .serializeString, settings)
435423 producer.flatMap(p => p.produceBatchE(events))
@@ -478,15 +466,15 @@ object ProducerSpec extends ZIOSpecDefault {
478466 }
479467
480468 for {
481- _ <- withFastClock.fork
482- results <- ZIO .scoped {
483- val producer = Producer .make(queueUrl, Serializer .serializeString, settings)
484- producer.flatMap(p => p.produceBatchE(events))
485- }.provide(client)
469+ results <- ZIO
470+ .scoped(
471+ Producer
472+ .make(queueUrl, Serializer .serializeString, settings)
473+ .flatMap(_.produceBatchE(events))
474+ )
475+ .provide(client)
486476 } yield {
487- val successes = results.filter(_.isRight).collect {
488- case Right (x) => x.data
489- }
477+ val successes = results.collect { case Right (x) => x.data }
490478
491479 assert(results.size)(equalTo(events.size)) &&
492480 assert(successes)(hasSameElements(List (" A" , " B" , " C" ))) &&
@@ -521,15 +509,13 @@ object ProducerSpec extends ZIOSpecDefault {
521509 }
522510
523511 for {
524- _ <- withFastClock.fork
525512 results <- ZIO .scoped {
526- val producer = Producer .make(queueUrl, Serializer .serializeString, settings)
527- producer.flatMap(p => p.produceBatchE(events))
513+ Producer
514+ .make(queueUrl, Serializer .serializeString, settings)
515+ .flatMap(_.produceBatchE(events))
528516 }.provide(client)
529517 } yield {
530- val failures = results.filter(_.isLeft).collect {
531- case Left (x) => x.event.data
532- }
518+ val failures = results.collect { case Left (x) => x.event.data }
533519
534520 assert(results.size)(equalTo(events.size)) &&
535521 assert(failures)(hasSameElements(List (" A" , " B" , " C" ))) &&
@@ -552,7 +538,6 @@ object ProducerSpec extends ZIOSpecDefault {
552538 }
553539
554540 for {
555- _ <- withFastClock.fork
556541 errOrResults <- ZIO .scoped {
557542 val producer = Producer .make(queueUrl, Serializer .serializeString, settings)
558543 producer.flatMap(p => p.produceBatchE(events)).either
@@ -581,7 +566,6 @@ object ProducerSpec extends ZIOSpecDefault {
581566 }
582567
583568 for {
584- _ <- withFastClock.fork
585569 results <- ZIO .scoped {
586570 val producer = Producer .make(queueUrl, Serializer .serializeString, settings)
587571 producer.flatMap(p => ZIO .foreach(events)(e => ZIO .sleep(100 .milliseconds) *> p.produce(e).either))
@@ -600,13 +584,11 @@ object ProducerSpec extends ZIOSpecDefault {
600584 val client = failUnrecoverableClient
601585
602586 for {
603- _ <- withFastClock.fork
604587 scope <- Scope .make
605588 producerPromise <- Producer
606589 .make[String ](queueUrl, Serializer .serializeString, settings)
607590 .provide(client, ZLayer .succeed(scope))
608591 .fork
609-
610592 producer <- producerPromise.await.flatten
611593
612594 errOrResults <- producer.produceBatch(events).either
@@ -615,7 +597,7 @@ object ProducerSpec extends ZIOSpecDefault {
615597 ).provideSomeLayerShared[TestEnvironment ]((zio.aws.netty.NettyHttpClient .default >>> zio.aws.core.config.AwsConfig .default >>> clientResource).orDie)
616598
617599 override def aspects : Chunk [TestAspect [Nothing , TestEnvironment , Nothing , Any ]] =
618- Chunk (TestAspect .executionStrategy(ExecutionStrategy .Sequential ))
600+ Chunk (TestAspect .executionStrategy(ExecutionStrategy .Sequential ), TestAspect .timeout( 60 .seconds), TestAspect .withLiveEnvironment )
619601
620602 def queueResource (capacity : Int ): ZIO [Scope , Throwable , Queue [SqsRequestEntry [String ]]] =
621603 ZIO .acquireRelease(Queue .bounded[SqsRequestEntry [String ]](capacity))(_.shutdown)
0 commit comments