Skip to content

Commit 7a5a005

Browse files
author
jmast
committed
Cross compile for Scala 3
1 parent 4a02843 commit 7a5a005

File tree

20 files changed

+97
-49
lines changed

20 files changed

+97
-49
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ jobs:
1010
strategy:
1111
matrix:
1212
scala:
13-
- 2.13.14
13+
- 2.13.16
14+
- 3.3.5
1415

1516
steps:
1617
- uses: actions/checkout@v4

build.sbt

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@ ThisBuild / versionScheme := Some("early-semver")
44
ThisBuild / evictionErrorLevel := Level.Warn
55
ThisBuild / versionPolicyIntention := Compatibility.BinaryCompatible
66

7+
lazy val Scala3Version = "3.3.5"
8+
lazy val Scala2Version = "2.13.16"
9+
710
lazy val commonSettings = Seq(
811
organization := "com.evolutiongaming",
9-
homepage := Some(new URL("https://github.com/evolution-gaming/kafka-flow")),
12+
homepage := Some(url("https://github.com/evolution-gaming/kafka-flow")),
1013
startYear := Some(2019),
1114
organizationName := "Evolution Gaming",
1215
organizationHomepage := Some(url("https://evolution.com/")),
1316
publishTo := Some(Resolver.evolutionReleases),
14-
scalaVersion := "2.13.16",
17+
scalaVersion := Scala2Version,
1518
licenses := Seq(("MIT", url("https://opensource.org/licenses/MIT"))),
1619
testFrameworks += new TestFramework("munit.Framework"),
1720
testOptions += Tests.Argument(new TestFramework("munit.Framework"), "+l"),
@@ -20,10 +23,26 @@ lazy val commonSettings = Seq(
2023
libraryDependencySchemes ++= Seq(
2124
"org.scala-lang.modules" %% "scala-java8-compat" % "always"
2225
),
23-
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.13.3" cross CrossVersion.full),
24-
scalacOptions ++= Seq("-Xsource:3"),
26+
libraryDependencies ++= crossSettings(
27+
scalaVersion.value,
28+
if3 = Nil,
29+
if2 = List(compilerPlugin("org.typelevel" %% "kind-projector" % "0.13.3" cross CrossVersion.full)),
30+
),
31+
scalacOptions ++= crossSettings(
32+
scalaVersion.value,
33+
if3 = List("-Ykind-projector", "-language:implicitConversions", "-explain", "-deprecation"),
34+
if2 = List("-Xsource:3")
35+
),
36+
crossScalaVersions := Seq(Scala2Version, Scala3Version)
2537
)
2638

39+
def crossSettings[T](scalaVersion: String, if3: List[T], if2: List[T]) =
40+
CrossVersion.partialVersion(scalaVersion) match {
41+
case Some((3, _)) => if3
42+
case Some((2, 12 | 13)) => if2
43+
case _ => Nil
44+
}
45+
2746
lazy val root = (project in file("."))
2847
.aggregate(
2948
core,
@@ -38,7 +57,8 @@ lazy val root = (project in file("."))
3857
.settings(commonSettings)
3958
.settings(
4059
name := "kafka-flow",
41-
publish / skip := true
60+
publish / skip := true,
61+
crossScalaVersions := Nil,
4262
)
4363

4464
lazy val core = (project in file("core"))
@@ -58,10 +78,14 @@ lazy val core = (project in file("core"))
5878
sstream,
5979
random,
6080
retry,
61-
Scodec.core,
6281
Scodec.bits,
6382
Testing.munit % Test,
6483
),
84+
libraryDependencies ++= crossSettings(
85+
scalaVersion.value,
86+
if3 = List(Scodec.coreScala3),
87+
if2 = List(Scodec.coreScala213)
88+
)
6589
)
6690

6791
lazy val `core-it-tests` = (project in file("core-it-tests"))
@@ -98,6 +122,11 @@ lazy val `persistence-cassandra` = (project in file("persistence-cassandra"))
98122
scassandra,
99123
cassandraSync,
100124
),
125+
libraryDependencies ++= crossSettings(
126+
scalaVersion.value,
127+
if3 = List(PureConfig.GenericScala3),
128+
if2 = Nil,
129+
)
101130
)
102131

103132
lazy val `persistence-cassandra-it-tests` = (project in file("persistence-cassandra-it-tests"))
@@ -146,7 +175,8 @@ lazy val journal = (project in file("kafka-journal"))
146175
KafkaJournal.journal,
147176
KafkaJournal.persistence,
148177
Testing.munit % Test,
149-
)
178+
),
179+
crossScalaVersions := Seq(Scala2Version),
150180
)
151181

152182
lazy val docs = (project in file("kafka-flow-docs"))

core-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/ForAllKafkaSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ abstract class ForAllKafkaSuite extends FunSuite with TestContainersFixtures {
2626
override def beforeAll(): Unit = {
2727
val config =
2828
ConsumerConfig(common = CommonConfig(bootstrapServers = NonEmptyList.one(kafka.container.bootstrapServers)))
29-
implicit val logOf = LogOf.slf4j[IO].unsafeRunSync()
29+
implicit val logOf: LogOf[IO] = LogOf.slf4j[IO].unsafeRunSync()
3030
val result = KafkaModule.of[IO]("KafkaSuite", config, CollectorRegistry.empty[IO]).allocated.unsafeRunSync()
3131
moduleRef.set(result)
3232
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.evolutiongaming.kafka.flow.persistence.compression
2+
3+
import scodec.Codec
4+
import scodec.Codec.inlineImplementations
5+
6+
extension[A, B](codecA: Codec[A])
7+
8+
/** Combines this Codec with another one, added for compatibility with Scala 2.13 version of scodec-core.
9+
* @param codecB
10+
* Codec for B
11+
* @return
12+
* Codec for tuple of A and B
13+
*/
14+
def ~(codecB: Codec[B]): Codec[(A, B)] =
15+
codecA :: codecB

core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlow.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ object PartitionFlow {
234234
case (_, Right(value)) => value.context.holding
235235
case (key, Left(_)) =>
236236
log.error(s"trying to compute offset to commit but value for key $key is not ready").as(none[Offset])
237-
}(pickMinOffset))
237+
})
238238
}
239239

240240
def offsetToCommit(getMinOffset: F[Option[Offset]]): F[Option[Offset]] = for {
@@ -338,13 +338,14 @@ object PartitionFlow {
338338

339339
}
340340

341-
private[this] val pickMinOffset = CommutativeMonoid.instance[Option[Offset]](
342-
none[Offset],
343-
(x, y) =>
344-
(x, y) match {
345-
case (Some(x), Some(y)) => Some(x min y)
346-
case (x @ Some(_), _) => x
347-
case (_, y) => y
348-
}
349-
)
341+
private[this] implicit val pickMinOffset: CommutativeMonoid[Option[Offset]] =
342+
CommutativeMonoid.instance[Option[Offset]](
343+
none[Offset],
344+
(x, y) =>
345+
(x, y) match {
346+
case (Some(x), Some(y)) => Some(x min y)
347+
case (x @ Some(_), _) => x
348+
case (_, y) => y
349+
}
350+
)
350351
}

core/src/main/scala/com/evolutiongaming/kafka/flow/TopicFlow.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import com.evolutiongaming.skafka.consumer.ConsumerRecords
1515
import scodec.bits.ByteVector
1616

1717
import scala.collection.immutable.SortedSet
18-
import scala.annotation.nowarn
1918

2019
trait TopicFlow[F[_]] {
2120

@@ -82,7 +81,6 @@ object TopicFlow {
8281

8382
val acquire = new TopicFlow[F] {
8483

85-
@nowarn("cat=unused") // for some reason compiler thinks that `flow` and `partitionRecords` are unused
8684
def apply(records: ConsumerRecords[String, ByteVector]) = {
8785

8886
for {

core/src/main/scala/com/evolutiongaming/kafka/flow/kafka/KafkaModule.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,17 @@ object KafkaModule {
3535
config: ConsumerConfig,
3636
registry: CollectorRegistry[F]
3737
): Resource[F, KafkaModule[F]] = {
38-
implicit val measureDuration = MeasureDuration.fromClock[F](Clock[F])
38+
implicit val measureDuration: MeasureDuration[F] = MeasureDuration.fromClock[F](Clock[F])
3939
for {
4040
producerMetrics <- ProducerMetrics.of(registry)
4141
consumerMetrics <- ConsumerMetrics.of(registry)
4242
_producerOf = RawProducerOf.apply1[F](producerMetrics(applicationId).some)
4343
_consumerOf = RawConsumerOf.apply1[F](consumerMetrics(applicationId).some)
4444

4545
_healthCheck <- {
46-
implicit val randomIdOf = RandomIdOf.uuid[F]
47-
implicit val consumerOf = _consumerOf
48-
implicit val producerOf = _producerOf
46+
implicit val randomIdOf: RandomIdOf[F] = RandomIdOf.uuid[F]
47+
implicit val consumerOf: RawConsumerOf[F] = _consumerOf
48+
implicit val producerOf: RawProducerOf[F] = _producerOf
4949

5050
val commonConfig = config.common.copy(clientId = config.common.clientId.map(id => s"$id-HealthCheck"))
5151

core/src/main/scala/com/evolutiongaming/kafka/flow/persistence/compression/HeaderAndPayload.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ private[compression] object HeaderAndPayload {
1111
private val codec = variableSizeBytes(int32, bytes) ~ bytes
1212

1313
def toBytes[F[_]: MonadThrow](header: ByteVector, payload: ByteVector): F[ByteVector] = {
14-
codec.encode(header ~ payload) match {
14+
codec.encode((header, payload)) match {
1515
case Attempt.Successful(value) => value.bytes.pure[F]
1616
case Attempt.Failure(e) => CompressionError(e.message).raiseError
1717
}

core/src/test/scala/com/evolutiongaming/kafka/flow/ConsumerFlowSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import munit.FunSuite
1515
import scodec.bits.ByteVector
1616

1717
import scala.concurrent.duration.*
18-
import scala.util.{Success, Try}
18+
import scala.util.Try
1919

2020
class ConsumerFlowSpec extends FunSuite {
2121

@@ -29,7 +29,7 @@ class ConsumerFlowSpec extends FunSuite {
2929
Command.Records()
3030
)
3131

32-
val Success(result) = ConstFixture.app(topic).runS(Context(commands = commands))
32+
val result = ConstFixture.app(topic).runS(Context(commands = commands)).get
3333

3434
assertEquals(
3535
result.actions.reverse,
@@ -64,7 +64,7 @@ class ConsumerFlowSpec extends FunSuite {
6464
Command.Records()
6565
)
6666

67-
val Success(result) = ConstFixture.app(topic1, topic2).runS(Context(commands = commands))
67+
val result = ConstFixture.app(topic1, topic2).runS(Context(commands = commands)).get
6868

6969
assertEquals(
7070
result.actions.reverse,

core/src/test/scala/com/evolutiongaming/kafka/flow/KeyFlowSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class KeyFlowSpec extends FunSuite {
8484
Offset.unsafe(1) -> 7
8585
}
8686
val timerFlowOf = TimerFlowOf.unloadOrphaned[SyncIO]()
87-
implicit val context = new KeyContext[SyncIO] {
87+
implicit val context: KeyContext[SyncIO] = new KeyContext[SyncIO] {
8888
def holding = none[Offset].pure[SyncIO]
8989
def hold(offset: Offset) = SyncIO.unit
9090
def remove = removeCalled.set(true)
@@ -134,7 +134,7 @@ class KeyFlowSpec extends FunSuite {
134134
val fold = FoldOption.empty[SyncIO, State, ConsumerRecord[String, ByteVector]]
135135
val timerFlowOf = TimerFlowOf.unloadOrphaned[SyncIO]()
136136

137-
implicit val context = new KeyContext[SyncIO] {
137+
implicit val context: KeyContext[SyncIO] = new KeyContext[SyncIO] {
138138
def holding = none[Offset].pure[SyncIO]
139139
def hold(offset: Offset) = SyncIO.unit
140140
def remove = removeCalled.set(true)

0 commit comments

Comments
 (0)