@@ -2,22 +2,24 @@ package utils
 
 import java.util.UUID
 
+import base.AsyncIntSpec
 import cats.data.{NonEmptyList, NonEmptySet}
 import cats.effect.{Async, Resource}
 import cats.syntax.all.*
 import fs2.Stream
 import fs2.kafka.{AutoOffsetReset, ConsumerRecord, ConsumerSettings, KafkaConsumer}
 import io.github.embeddedkafka.EmbeddedKafkaConfig
 import org.apache.kafka.common.TopicPartition
-import org.scalatest.exceptions.TestFailedException
+import org.scalatest.Assertion
+import org.scalatest.enablers.Retrying
 import org.typelevel.log4cats.LoggerFactory
 import org.typelevel.log4cats.slf4j.Slf4jFactory
 import uk.sky.fs2.kafka.topicloader.{LoadTopicStrategy, TopicLoader}
 
 import scala.concurrent.duration.*
 
 trait KafkaHelpers[F[_]] {
-  self: EmbeddedKafka[F] =>
+  self: AsyncIntSpec & EmbeddedKafka[F] =>
 
   val groupId = "test-consumer-group"
   val testTopic1 = "load-state-topic-1"
@@ -93,50 +95,37 @@ trait KafkaHelpers[F[_]] {
   def publishToKafkaAndWaitForCompaction(
       partitions: NonEmptySet[TopicPartition],
       messages: Seq[(String, String)]
-  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] = for {
+  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F], retrying: Retrying[F[Assertion]]): F[Unit] = for {
     _ <- publishToKafkaAndTriggerCompaction(partitions, messages)
     _ <- waitForCompaction(partitions)
   } yield ()
 
   def waitForCompaction(
       partitions: NonEmptySet[TopicPartition]
-  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] =
+  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F], retrying: Retrying[F[Assertion]]): F[Assertion] =
     consumeEventually(partitions) { r =>
       for {
         records <- r
         messageKeys = records.map { case (k, _) => k }
-        result <-
-          if (messageKeys.sorted == messageKeys.toSet.toList.sorted) F.unit
-          else F.raiseError(new TestFailedException("Topic has not compacted within timeout", 1))
-      } yield result
+      } yield messageKeys should contain theSameElementsAs messageKeys.toSet
     }
 
   def consumeEventually(
       partitions: NonEmptySet[TopicPartition],
       groupId: String = UUID.randomUUID().toString
   )(
-      f: F[List[(String, String)]] => F[Unit]
-  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] =
-    retry(
-      fa = {
-        val records = withAssignedConsumer[F[List[ConsumerRecord[String, String]]]](
-          autoCommit = false,
-          offsetReset = AutoOffsetReset.Earliest,
-          partitions,
-          groupId.some
-        )(
-          _.records
-            .map(_.record)
-            .interruptAfter(5.second)
-            .compile
-            .toList
-        )
-
-        f(records.map(_.map(r => r.key -> r.value)))
-      },
-      delay = 1.second,
-      max = 5
-    )
+      f: F[List[(String, String)]] => F[Assertion]
+  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F], retrying: Retrying[F[Assertion]]): F[Assertion] =
+    eventually {
+      val records = withAssignedConsumer[F[List[ConsumerRecord[String, String]]]](
+        autoCommit = false,
+        offsetReset = AutoOffsetReset.Earliest,
+        partitions,
+        groupId.some
+      )(_.records.map(_.record).interruptAfter(5.second).compile.toList)
+
+      f(records.map(_.map(r => r.key -> r.value)))
+    }
 
   def withAssignedConsumer[T](
       autoCommit: Boolean,
@@ -172,11 +161,4 @@ trait KafkaHelpers[F[_]] {
     val settings = groupId.fold(baseSettings)(baseSettings.withGroupId)
     KafkaConsumer[F].resource(settings)
   }
-
-  def retry[A](fa: F[A], delay: FiniteDuration = 1.second, max: Int = 10)(implicit F: Async[F]): F[A] =
-    if (max <= 1) fa
-    else
-      fa handleErrorWith { _ =>
-        F.sleep(delay) *> retry(fa, delay, max - 1)
-      }
 }
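
The change above replaces the hand-rolled `retry` combinator with ScalaTest's `eventually`, whose retry behaviour for an `F[Assertion]` result comes from the implicit `Retrying[F[Assertion]]`; the new `AsyncIntSpec` self-type is what brings that instance (and the matcher syntax now used in `waitForCompaction`) into scope. A rough usage sketch follows, assuming `AsyncIntSpec` is an `IO`-based async spec with Matchers (e.g. backed by cats-effect-testing-scalatest) and that an implicit `EmbeddedKafkaConfig` and a cats `Order[TopicPartition]` are available as in the rest of the suite; `CompactionSketchSpec` and its messages are hypothetical:

```scala
import base.AsyncIntSpec
import cats.data.NonEmptySet
import cats.effect.IO
import cats.syntax.all.*
import org.apache.kafka.common.TopicPartition
import utils.{EmbeddedKafka, KafkaHelpers}

// Hypothetical spec: class name, keys and values are illustrative, not from the PR.
class CompactionSketchSpec extends AsyncIntSpec with EmbeddedKafka[IO] with KafkaHelpers[IO] {

  "a compacted topic" should "eventually hold a single record per key" in {
    // assumes an Order[TopicPartition] is in scope for NonEmptySet, as elsewhere in the suite
    val partitions = NonEmptySet.one(new TopicPartition(testTopic1, 0))
    val messages   = List("k1" -> "stale", "k1" -> "fresh", "k2" -> "only")

    // publish, let the log cleaner run, then poll: consumeEventually re-evaluates the
    // consumer read until f's Assertion passes or the Retrying instance gives up
    publishToKafkaAndWaitForCompaction(partitions, messages) *>
      consumeEventually(partitions) { records =>
        records.map(_ should contain allOf ("k1" -> "fresh", "k2" -> "only"))
      }
  }
}
```

Compared with the old `retry(fa, delay, max)` loop, `eventually` takes its timeout and polling interval from the spec's implicit `PatienceConfig`, so patience is tuned once per suite rather than hard-coded at each call site.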