Skip to content

Commit 9bf73e6

Browse files
authored
Cross compile for Scala 3
2 parents 4a02843 + f826169 commit 9bf73e6

File tree

20 files changed

+98
-47
lines changed

20 files changed

+98
-47
lines changed

.github/workflows/ci.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ jobs:
1010
strategy:
1111
matrix:
1212
scala:
13-
- 2.13.14
13+
- 2.13.16
14+
- 3.3.5
1415

1516
steps:
1617
- uses: actions/checkout@v4
@@ -32,13 +33,13 @@ jobs:
3233

3334
- name: Run tests ${{ matrix.scala }}
3435
if: success()
35-
run: sbt ++${{ matrix.scala }} clean coverage test docs/mdoc versionPolicyCheck
36+
run: sbt clean coverage "++${{ matrix.scala }} test" docs/mdoc "++${{ matrix.scala }} versionPolicyCheck"
3637

3738
- name: Report test coverage
3839
if: success() && github.repository == 'evolution-gaming/kafka-flow'
3940
env:
4041
COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
41-
run: sbt ++${{ matrix.scala }} coverageReport coverageAggregate coveralls
42+
run: sbt "++${{ matrix.scala }} coverageReport" coverageAggregate coveralls
4243

4344
- name: Publish documentation / Setup Node
4445
if: success()

build.sbt

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@ ThisBuild / versionScheme := Some("early-semver")
44
ThisBuild / evictionErrorLevel := Level.Warn
55
ThisBuild / versionPolicyIntention := Compatibility.BinaryCompatible
66

7+
lazy val Scala3Version = "3.3.5"
8+
lazy val Scala2Version = "2.13.16"
9+
710
lazy val commonSettings = Seq(
811
organization := "com.evolutiongaming",
9-
homepage := Some(new URL("https://github.com/evolution-gaming/kafka-flow")),
12+
homepage := Some(url("https://github.com/evolution-gaming/kafka-flow")),
1013
startYear := Some(2019),
1114
organizationName := "Evolution Gaming",
1215
organizationHomepage := Some(url("https://evolution.com/")),
1316
publishTo := Some(Resolver.evolutionReleases),
14-
scalaVersion := "2.13.16",
17+
crossScalaVersions := Seq(Scala2Version, Scala3Version),
18+
scalaVersion := crossScalaVersions.value.head,
1519
licenses := Seq(("MIT", url("https://opensource.org/licenses/MIT"))),
1620
testFrameworks += new TestFramework("munit.Framework"),
1721
testOptions += Tests.Argument(new TestFramework("munit.Framework"), "+l"),
@@ -20,10 +24,25 @@ lazy val commonSettings = Seq(
2024
libraryDependencySchemes ++= Seq(
2125
"org.scala-lang.modules" %% "scala-java8-compat" % "always"
2226
),
23-
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.13.3" cross CrossVersion.full),
24-
scalacOptions ++= Seq("-Xsource:3"),
27+
libraryDependencies ++= crossSettings(
28+
scalaVersion.value,
29+
if3 = Nil,
30+
if2 = List(compilerPlugin("org.typelevel" %% "kind-projector" % "0.13.3" cross CrossVersion.full)),
31+
),
32+
scalacOptions ++= crossSettings(
33+
scalaVersion.value,
34+
if3 = List("-Ykind-projector", "-language:implicitConversions", "-explain", "-deprecation"),
35+
if2 = List("-Xsource:3")
36+
),
2537
)
2638

39+
def crossSettings[T](scalaVersion: String, if3: List[T], if2: List[T]) =
40+
CrossVersion.partialVersion(scalaVersion) match {
41+
case Some((3, _)) => if3
42+
case Some((2, 12 | 13)) => if2
43+
case _ => Nil
44+
}
45+
2746
lazy val root = (project in file("."))
2847
.aggregate(
2948
core,
@@ -38,7 +57,9 @@ lazy val root = (project in file("."))
3857
.settings(commonSettings)
3958
.settings(
4059
name := "kafka-flow",
41-
publish / skip := true
60+
publish / skip := true,
61+
crossScalaVersions := Nil,
62+
scalaVersion := Scala2Version,
4263
)
4364

4465
lazy val core = (project in file("core"))
@@ -58,10 +79,14 @@ lazy val core = (project in file("core"))
5879
sstream,
5980
random,
6081
retry,
61-
Scodec.core,
6282
Scodec.bits,
6383
Testing.munit % Test,
6484
),
85+
libraryDependencies ++= crossSettings(
86+
scalaVersion.value,
87+
if3 = List(Scodec.coreScala3),
88+
if2 = List(Scodec.coreScala213)
89+
),
6590
)
6691

6792
lazy val `core-it-tests` = (project in file("core-it-tests"))
@@ -86,7 +111,7 @@ lazy val metrics = (project in file("metrics"))
86111
libraryDependencies ++= Seq(
87112
smetrics,
88113
Testing.munit % Test,
89-
)
114+
),
90115
)
91116

92117
lazy val `persistence-cassandra` = (project in file("persistence-cassandra"))
@@ -98,6 +123,11 @@ lazy val `persistence-cassandra` = (project in file("persistence-cassandra"))
98123
scassandra,
99124
cassandraSync,
100125
),
126+
libraryDependencies ++= crossSettings(
127+
scalaVersion.value,
128+
if3 = List(PureConfig.GenericScala3),
129+
if2 = Nil,
130+
),
101131
)
102132

103133
lazy val `persistence-cassandra-it-tests` = (project in file("persistence-cassandra-it-tests"))
@@ -146,13 +176,16 @@ lazy val journal = (project in file("kafka-journal"))
146176
KafkaJournal.journal,
147177
KafkaJournal.persistence,
148178
Testing.munit % Test,
149-
)
179+
),
150180
)
181+
.settings(crossScalaVersions -= Scala3Version)
151182

152183
lazy val docs = (project in file("kafka-flow-docs"))
153184
.dependsOn(core, `persistence-cassandra`, `persistence-kafka`, metrics)
154185
.settings(commonSettings)
155186
.enablePlugins(MdocPlugin, DocusaurusPlugin)
156-
.settings(scalacOptions -= "-Xfatal-warnings")
187+
.settings(
188+
scalacOptions -= "-Xfatal-warnings",
189+
)
157190

158191
addCommandAlias("check", "versionPolicyCheck")

core-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/ForAllKafkaSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ abstract class ForAllKafkaSuite extends FunSuite with TestContainersFixtures {
2626
override def beforeAll(): Unit = {
2727
val config =
2828
ConsumerConfig(common = CommonConfig(bootstrapServers = NonEmptyList.one(kafka.container.bootstrapServers)))
29-
implicit val logOf = LogOf.slf4j[IO].unsafeRunSync()
29+
implicit val logOf: LogOf[IO] = LogOf.slf4j[IO].unsafeRunSync()
3030
val result = KafkaModule.of[IO]("KafkaSuite", config, CollectorRegistry.empty[IO]).allocated.unsafeRunSync()
3131
moduleRef.set(result)
3232
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.evolutiongaming.kafka.flow.persistence.compression
2+
3+
import scodec.Codec
4+
import scodec.Codec.inlineImplementations
5+
6+
// format: off
7+
extension[A, B](codecA: Codec[A])
8+
/** Combines this Codec with another one, added for compatibility with Scala 2.13 version of scodec-core.
9+
* @param codecB
10+
* Codec for B
11+
* @return
12+
* Codec for tuple of A and B
13+
*/
14+
def ~(codecB: Codec[B]): Codec[(A, B)] =
15+
codecA :: codecB
16+
// format: on

core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlow.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ object PartitionFlow {
234234
case (_, Right(value)) => value.context.holding
235235
case (key, Left(_)) =>
236236
log.error(s"trying to compute offset to commit but value for key $key is not ready").as(none[Offset])
237-
}(pickMinOffset))
237+
}(using pickMinOffset))
238238
}
239239

240240
def offsetToCommit(getMinOffset: F[Option[Offset]]): F[Option[Offset]] = for {
@@ -249,7 +249,7 @@ object PartitionFlow {
249249

250250
allowedOffset = minimumOffset getOrElse maximumOffset
251251

252-
// we move forward if minimum offset became larger or it is empty,
252+
// we move forward if minimum offset became larger, or it is empty,
253253
// i.e. if we dealt with all the states, and there is nothing holding
254254
// us from moving forward
255255
committedOffsetValue <- committedOffset.get

core/src/main/scala/com/evolutiongaming/kafka/flow/TopicFlow.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import com.evolutiongaming.skafka.consumer.ConsumerRecords
1515
import scodec.bits.ByteVector
1616

1717
import scala.collection.immutable.SortedSet
18-
import scala.annotation.nowarn
1918

2019
trait TopicFlow[F[_]] {
2120

@@ -82,7 +81,6 @@ object TopicFlow {
8281

8382
val acquire = new TopicFlow[F] {
8483

85-
@nowarn("cat=unused") // for some reason compiler thinks that `flow` and `partitionRecords` are unused
8684
def apply(records: ConsumerRecords[String, ByteVector]) = {
8785

8886
for {

core/src/main/scala/com/evolutiongaming/kafka/flow/kafka/KafkaModule.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,17 @@ object KafkaModule {
3535
config: ConsumerConfig,
3636
registry: CollectorRegistry[F]
3737
): Resource[F, KafkaModule[F]] = {
38-
implicit val measureDuration = MeasureDuration.fromClock[F](Clock[F])
38+
implicit val measureDuration: MeasureDuration[F] = MeasureDuration.fromClock[F](Clock[F])
3939
for {
4040
producerMetrics <- ProducerMetrics.of(registry)
4141
consumerMetrics <- ConsumerMetrics.of(registry)
4242
_producerOf = RawProducerOf.apply1[F](producerMetrics(applicationId).some)
4343
_consumerOf = RawConsumerOf.apply1[F](consumerMetrics(applicationId).some)
4444

4545
_healthCheck <- {
46-
implicit val randomIdOf = RandomIdOf.uuid[F]
47-
implicit val consumerOf = _consumerOf
48-
implicit val producerOf = _producerOf
46+
implicit val randomIdOf: RandomIdOf[F] = RandomIdOf.uuid[F]
47+
implicit val consumerOf: RawConsumerOf[F] = _consumerOf
48+
implicit val producerOf: RawProducerOf[F] = _producerOf
4949

5050
val commonConfig = config.common.copy(clientId = config.common.clientId.map(id => s"$id-HealthCheck"))
5151

core/src/main/scala/com/evolutiongaming/kafka/flow/persistence/compression/HeaderAndPayload.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ private[compression] object HeaderAndPayload {
1111
private val codec = variableSizeBytes(int32, bytes) ~ bytes
1212

1313
def toBytes[F[_]: MonadThrow](header: ByteVector, payload: ByteVector): F[ByteVector] = {
14-
codec.encode(header ~ payload) match {
14+
codec.encode((header, payload)) match {
1515
case Attempt.Successful(value) => value.bytes.pure[F]
1616
case Attempt.Failure(e) => CompressionError(e.message).raiseError
1717
}

core/src/test/scala/com/evolutiongaming/kafka/flow/ConsumerFlowSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import munit.FunSuite
1515
import scodec.bits.ByteVector
1616

1717
import scala.concurrent.duration.*
18-
import scala.util.{Success, Try}
18+
import scala.util.Try
1919

2020
class ConsumerFlowSpec extends FunSuite {
2121

@@ -29,7 +29,7 @@ class ConsumerFlowSpec extends FunSuite {
2929
Command.Records()
3030
)
3131

32-
val Success(result) = ConstFixture.app(topic).runS(Context(commands = commands))
32+
val result = ConstFixture.app(topic).runS(Context(commands = commands)).get
3333

3434
assertEquals(
3535
result.actions.reverse,
@@ -64,7 +64,7 @@ class ConsumerFlowSpec extends FunSuite {
6464
Command.Records()
6565
)
6666

67-
val Success(result) = ConstFixture.app(topic1, topic2).runS(Context(commands = commands))
67+
val result = ConstFixture.app(topic1, topic2).runS(Context(commands = commands)).get
6868

6969
assertEquals(
7070
result.actions.reverse,

core/src/test/scala/com/evolutiongaming/kafka/flow/KeyFlowSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class KeyFlowSpec extends FunSuite {
8484
Offset.unsafe(1) -> 7
8585
}
8686
val timerFlowOf = TimerFlowOf.unloadOrphaned[SyncIO]()
87-
implicit val context = new KeyContext[SyncIO] {
87+
implicit val context: KeyContext[SyncIO] = new KeyContext[SyncIO] {
8888
def holding = none[Offset].pure[SyncIO]
8989
def hold(offset: Offset) = SyncIO.unit
9090
def remove = removeCalled.set(true)
@@ -134,7 +134,7 @@ class KeyFlowSpec extends FunSuite {
134134
val fold = FoldOption.empty[SyncIO, State, ConsumerRecord[String, ByteVector]]
135135
val timerFlowOf = TimerFlowOf.unloadOrphaned[SyncIO]()
136136

137-
implicit val context = new KeyContext[SyncIO] {
137+
implicit val context: KeyContext[SyncIO] = new KeyContext[SyncIO] {
138138
def holding = none[Offset].pure[SyncIO]
139139
def hold(offset: Offset) = SyncIO.unit
140140
def remove = removeCalled.set(true)

0 commit comments

Comments
 (0)