Skip to content

Commit 67de72a

Browse files
committed
Genericise ioRetrying
1 parent 47bb2aa commit 67de72a

File tree

3 files changed

+23
-7
lines changed

3 files changed

+23
-7
lines changed
Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,30 @@
11
package base
22

3+
import cats.effect.Async
4+
import cats.effect.std.Dispatcher
35
import cats.effect.testing.scalatest.AsyncIOSpec
6+
import cats.syntax.all.*
7+
import org.scalactic.source.Position
48
import org.scalatest.OptionValues
59
import org.scalatest.concurrent.Eventually
10+
import org.scalatest.enablers.Retrying
611
import org.scalatest.matchers.should.Matchers
12+
import org.scalatest.time.Span
713
import org.scalatest.wordspec.AsyncWordSpec
814

915
import scala.concurrent.duration.*
1016

11-
trait AsyncIntSpec extends AsyncWordSpec with AsyncIOSpec with Matchers with OptionValues with Eventually {
17+
trait AsyncIntSpec[F[_]] extends AsyncWordSpec with AsyncIOSpec with Matchers with OptionValues with Eventually {
1218
override implicit val patienceConfig: PatienceConfig = PatienceConfig(30.seconds, 500.millis)
19+
20+
implicit def fRetrying[T](implicit F: Async[F]): Retrying[F[T]] = new Retrying[F[T]] {
21+
override def retry(timeout: Span, interval: Span, pos: Position)(fun: => F[T]): F[T] =
22+
Dispatcher.sequential[F].use { dispatcher =>
23+
F.fromFuture(
24+
F.executionContext.map(
25+
Retrying.retryingNatureOfFutureT[T](_).retry(timeout, interval, pos)(dispatcher.unsafeToFuture(fun))
26+
)
27+
)
28+
}
29+
}
1330
}

src/test/scala/base/KafkaSpecBase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ package base
22

33
import utils.{EmbeddedKafka, KafkaHelpers}
44

5-
abstract class KafkaSpecBase[F[_]] extends AsyncIntSpec with KafkaHelpers[F] with EmbeddedKafka[F]
5+
abstract class KafkaSpecBase[F[_]] extends AsyncIntSpec[F] with KafkaHelpers[F] with EmbeddedKafka[F]

src/test/scala/utils/KafkaHelpers.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,14 @@ import fs2.kafka.{AutoOffsetReset, ConsumerRecord, ConsumerSettings, KafkaConsum
1111
import io.github.embeddedkafka.EmbeddedKafkaConfig
1212
import org.apache.kafka.common.TopicPartition
1313
import org.scalatest.Assertion
14-
import org.scalatest.enablers.Retrying
1514
import org.typelevel.log4cats.LoggerFactory
1615
import org.typelevel.log4cats.slf4j.Slf4jFactory
1716
import uk.sky.fs2.kafka.topicloader.{LoadTopicStrategy, TopicLoader}
1817

1918
import scala.concurrent.duration.*
2019

2120
trait KafkaHelpers[F[_]] {
22-
self: AsyncIntSpec & EmbeddedKafka[F] =>
21+
self: AsyncIntSpec[F] & EmbeddedKafka[F] =>
2322

2423
val groupId = "test-consumer-group"
2524
val testTopic1 = "load-state-topic-1"
@@ -95,14 +94,14 @@ trait KafkaHelpers[F[_]] {
9594
def publishToKafkaAndWaitForCompaction(
9695
partitions: NonEmptySet[TopicPartition],
9796
messages: Seq[(String, String)]
98-
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F], retrying: Retrying[F[Assertion]]): F[Unit] = for {
97+
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] = for {
9998
_ <- publishToKafkaAndTriggerCompaction(partitions, messages)
10099
_ <- waitForCompaction(partitions)
101100
} yield ()
102101

103102
def waitForCompaction(
104103
partitions: NonEmptySet[TopicPartition]
105-
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F], retrying: Retrying[F[Assertion]]): F[Assertion] =
104+
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Assertion] =
106105
consumeEventually(partitions) { r =>
107106
for {
108107
records <- r
@@ -118,7 +117,7 @@ trait KafkaHelpers[F[_]] {
118117
groupId: String = UUID.randomUUID().toString
119118
)(
120119
f: F[List[(String, String)]] => F[Assertion]
121-
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F], retrying: Retrying[F[Assertion]]): F[Assertion] =
120+
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Assertion] =
122121
eventually {
123122
val records = withAssignedConsumer[F[List[ConsumerRecord[String, String]]]](
124123
autoCommit = false,

0 commit comments

Comments
 (0)