Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: CI

on:
workflow_dispatch:
push:
branches:
- main
Expand All @@ -24,6 +25,42 @@ jobs:
- name: Check Scala formatting
run: sbt scalafmtSbtCheck scalafmtCheckAll

- name: Cache StatsD image
uses: actions/cache@v4
id: statsd-cache
with:
path: /tmp/docker-images/statsd.tar
key: docker-${{ runner.os }}-statsd-v0.10.1

- name: Load cached StatsD image
if: steps.statsd-cache.outputs.cache-hit == 'true'
run: docker load -i /tmp/docker-images/statsd.tar

- name: Pull and cache StatsD image
if: steps.statsd-cache.outputs.cache-hit != 'true'
run: |
mkdir -p /tmp/docker-images
docker pull dblworks/statsd:v0.10.1
docker save dblworks/statsd:v0.10.1 -o /tmp/docker-images/statsd.tar

- name: Cache LocalStack image
uses: actions/cache@v4
id: localstack-cache
with:
path: /tmp/docker-images/localstack.tar
key: docker-${{ runner.os }}-localstack-2.3.2

- name: Load cached LocalStack image
if: steps.localstack-cache.outputs.cache-hit == 'true'
run: docker load -i /tmp/docker-images/localstack.tar

- name: Pull and cache LocalStack image
if: steps.localstack-cache.outputs.cache-hit != 'true'
run: |
mkdir -p /tmp/docker-images
docker pull localstack/localstack:2.3.2
docker save localstack/localstack:2.3.2 -o /tmp/docker-images/localstack.tar

- name: Run tests
run: env TESTCONTAINERS_RYUK_DISABLED=true sbt clean +test

Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
Version 0.19.0 (2026-01-08)
--------------------------
Fix flaky tests (#153)
Streams core: Eager processing of inner streams (#151)
Kafka source recover from RebalanceInProgressException (#149)
Add configurable AWS SDK retry count for Kinesis source (#150)

Version 0.18.0 (2025-12-01)
--------------------------
Use hard-coded AWS user-agent (#146)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Licensed under the [Snowplow Community License](https://docs.snowplow.io/communi
[build-image]: https://github.com/snowplow-incubator/common-streams/workflows/CI/badge.svg
[build]: https://github.com/snowplow-incubator/common-streams/actions/workflows/ci.yml

[release-image]: https://img.shields.io/badge/release-0.18.0-blue.svg?style=flat
[release-image]: https://img.shields.io/badge/release-0.19.0-blue.svg?style=flat
[releases]: https://github.com/snowplow-incubator/common-streams/releases

[license]: https://docs.snowplow.io/docs/contributing/community-license-faq/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,32 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration}
import cats.effect.{IO, Ref, Resource}
import cats.effect.testing.specs2.CatsResource
import org.specs2.mutable.SpecificationLike
import org.specs2.specification.BeforeAll
import org.testcontainers.containers.GenericContainer
import com.snowplowanalytics.snowplow.it.DockerPull

import retry.syntax.all._
import retry.RetryPolicies

class MetricsSpec extends CatsResource[IO, (GenericContainer[_], StatsdAPI[IO])] with SpecificationLike with BeforeAll {
class MetricsSpec extends CatsResource[IO, StatsdAPI[IO]] with SpecificationLike {

override def beforeAll(): Unit = {
DockerPull.pull(Statsd.image, Statsd.tag)
super.beforeAll()
}
override protected val ResourceTimeout = 1.minute

override val resource: Resource[IO, (GenericContainer[_], StatsdAPI[IO])] =
override val resource: Resource[IO, StatsdAPI[IO]] =
for {
statsd <- Statsd.resource(TestMetrics.getClass.getSimpleName)
socket <- Resource.eval(IO.blocking(new Socket(statsd.getHost(), statsd.getMappedPort(8126))))
statsdApi <- StatsdAPI.resource[IO](socket)
} yield (statsd, statsdApi)
} yield statsdApi

override def is = s2"""
MetricsSpec should
deliver metrics to statsd $e1
"""

def e1 = withResource { case (statsd @ _, statsdApi) =>
def e1 = withResource { statsdApi =>
for {
t <- TestMetrics.impl
t <- TestMetrics.impl(300.millis)
_ <- t.count(100)
_ <- t.time(10.seconds)
f <- t.report.compile.drain.start
_ <- IO.sleep(150.millis)
_ <- IO.sleep(350.millis)
counters <- statsdApi
.get(Metrics.MetricType.Count)
.retryingOnFailures(
Expand Down Expand Up @@ -81,7 +74,7 @@ object TestMetrics {
def time(t: FiniteDuration) = ref.update(s => s.copy(timer = s.timer + t))
}

def impl = Ref[IO]
def impl(period: FiniteDuration) = Ref[IO]
.of(TestState.empty)
.map { ref =>
TestMetrics(
Expand All @@ -92,7 +85,7 @@ object TestMetrics {
"localhost",
8125,
Map.empty,
100.millis,
period,
""
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,22 @@ import com.github.dockerjava.api.model.Ports
object Statsd {
val image = "dblworks/statsd" // the official statsd/statsd size is monstrous
// Pinned tag so CI image caching (keyed on this version) stays valid; bump the
// CI cache key in .github/workflows/ci.yml whenever this changes.
val tag = "v0.10.1"
def resource(
loggerName: String
): Resource[IO, GenericContainer[_]] =
Resource.make {
val statsd: GenericContainer[_] = new GenericContainer(s"$image:$tag")
statsd.addExposedPort(8126)
statsd.setWaitStrategy(Wait.forLogMessage("""^(.*)server is up(.+)$""", 1))
statsd.withCreateContainerCmdModifier { cmd =>
val statsPort = 8125
cmd.withExposedPorts((cmd.getExposedPorts().toList :+ ExposedPort.udp(statsPort)).asJava)
val ports = cmd.getHostConfig().getPortBindings()
ports.bind(ExposedPort.udp(statsPort), Ports.Binding.bindPort(statsPort))
cmd.getHostConfig().withPortBindings(ports)
()
}
IO.blocking(start(statsd, loggerName))
}(ls => IO.blocking(ls.stop()))

private def start(statsd: GenericContainer[_], loggerName: String): GenericContainer[_] = {
/** Acquires a running StatsD test container as a managed Resource.
  *
  * @param loggerName name used for the SLF4J logger that receives the container's log output
  * @return a Resource that starts the container on acquire and stops it on release;
  *         both start and stop run on the blocking pool since Testcontainers calls block
  */
def resource(loggerName: String): Resource[IO, GenericContainer[_]] =
Resource.make(IO.blocking(startContainer(loggerName)))(c => IO.blocking(c.stop()))

private def startContainer(loggerName: String): GenericContainer[_] = {
val statsd: GenericContainer[_] = new GenericContainer(s"$image:$tag")
statsd.addExposedPort(8126)
statsd.setWaitStrategy(Wait.forLogMessage("""^(.*)server is up(.+)$""", 1))
statsd.withCreateContainerCmdModifier { cmd =>
val statsPort = 8125
cmd.withExposedPorts((cmd.getExposedPorts().toList :+ ExposedPort.udp(statsPort)).asJava)
val ports = cmd.getHostConfig().getPortBindings()
ports.bind(ExposedPort.udp(statsPort), Ports.Binding.bindPort(statsPort))
cmd.getHostConfig().withPortBindings(ports)
()
}
statsd.start()
val logger = LoggerFactory.getLogger(loggerName)
val logs = new Slf4jLogConsumer(logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ object Utils {
10.seconds,
BigDecimal(1.0),
BackoffPolicy(100.millis, 1.second),
10.seconds
10.seconds,
10
)

def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfigM[Id](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.concurrent.duration.{DurationLong, FiniteDuration}
import fs2.kafka._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.errors.RebalanceInProgressException

// snowplow
import com.snowplowanalytics.snowplow.streams.SourceAndAck
Expand Down Expand Up @@ -68,7 +69,9 @@ private[kafka] object KafkaSource {

val empty: KafkaCheckpoints = Map.empty
def ack(c: KafkaCheckpoints): F[Unit] =
kafkaConsumer.commitSync(c)
kafkaConsumer.commitSync(c).recoverWith { case _: RebalanceInProgressException =>
Logger[F].warn("Failed to commit offsets during rebalance, offsets will be lost and events may be reprocessed")
}
def nack(c: KafkaCheckpoints): F[Unit] = Applicative[F].unit
}

Expand Down Expand Up @@ -145,4 +148,5 @@ private[kafka] object KafkaSource {
.withProperties(config.consumerConf)
.withEnableAutoCommit(false)
.withCommitTimeout(config.commitTimeout)
.withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)
}
1 change: 1 addition & 0 deletions modules/kinesis/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ snowplow.defaults: {
maxBackoff: "1 second"
}
debounceCheckpoints: "10 seconds"
maxRetries: 10
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ case class KinesisSourceConfig(
leaseDuration: FiniteDuration,
maxLeasesToStealAtOneTimeFactor: BigDecimal,
checkpointThrottledBackoffPolicy: BackoffPolicy,
debounceCheckpoints: FiniteDuration
debounceCheckpoints: FiniteDuration,
maxRetries: Int
)

object KinesisSourceConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import cats.effect.implicits._
import cats.effect.{Async, Resource, Sync}
import cats.implicits._
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode
import software.amazon.awssdk.awscore.retry.AwsRetryStrategy
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.retries.api.RetryStrategy
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
Expand All @@ -36,14 +38,20 @@ private[source] object KCLScheduler {
config: KinesisSourceConfig,
queue: LinkedBlockingQueue[KCLAction],
client: SdkAsyncHttpClient
): Resource[F, Unit] =
): Resource[F, Unit] = {
val retryStrategy = AwsRetryStrategy
.standardRetryStrategy()
.toBuilder
.maxAttempts(config.maxRetries)
.build()
for {
kinesis <- mkKinesisClient[F](config.customEndpoint, client)
dynamo <- mkDynamoDbClient[F](config.dynamodbCustomEndpoint, client)
cloudWatch <- mkCloudWatchClient[F](config.cloudwatchCustomEndpoint, client)
kinesis <- mkKinesisClient[F](config.customEndpoint, retryStrategy, client)
dynamo <- mkDynamoDbClient[F](config.dynamodbCustomEndpoint, retryStrategy, client)
cloudWatch <- mkCloudWatchClient[F](config.cloudwatchCustomEndpoint, retryStrategy, client)
scheduler <- Resource.eval(mkScheduler(kinesis, dynamo, cloudWatch, config, queue))
_ <- runInBackground(scheduler)
} yield ()
}

private def mkScheduler[F[_]: Sync](
kinesisClient: KinesisAsyncClient,
Expand Down Expand Up @@ -124,6 +132,7 @@ private[source] object KCLScheduler {

private def mkKinesisClient[F[_]: Sync](
customEndpoint: Option[URI],
retryStrategy: RetryStrategy,
client: SdkAsyncHttpClient
): Resource[F, KinesisAsyncClient] =
Resource.fromAutoCloseable {
Expand All @@ -133,6 +142,7 @@ private[source] object KCLScheduler {
.defaultsMode(DefaultsMode.AUTO)
.httpClient(client)
.overrideConfiguration { c =>
c.retryStrategy(retryStrategy)
c.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWS_USER_AGENT)
()
}
Expand All @@ -143,6 +153,7 @@ private[source] object KCLScheduler {

private def mkDynamoDbClient[F[_]: Sync](
customEndpoint: Option[URI],
retryStrategy: RetryStrategy,
client: SdkAsyncHttpClient
): Resource[F, DynamoDbAsyncClient] =
Resource.fromAutoCloseable {
Expand All @@ -152,6 +163,7 @@ private[source] object KCLScheduler {
.defaultsMode(DefaultsMode.AUTO)
.httpClient(client)
.overrideConfiguration { c =>
c.retryStrategy(retryStrategy)
c.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWS_USER_AGENT)
()
}
Expand All @@ -162,6 +174,7 @@ private[source] object KCLScheduler {

private def mkCloudWatchClient[F[_]: Sync](
customEndpoint: Option[URI],
retryStrategy: RetryStrategy,
client: SdkAsyncHttpClient
): Resource[F, CloudWatchAsyncClient] =
Resource.fromAutoCloseable {
Expand All @@ -171,6 +184,7 @@ private[source] object KCLScheduler {
.defaultsMode(DefaultsMode.AUTO)
.httpClient(client)
.overrideConfiguration { c =>
c.retryStrategy(retryStrategy)
c.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWS_USER_AGENT)
()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class KinesisSourceConfigSpec extends Specification {
"minBackoff": "100 millis",
"maxBackoff": "1second"
},
"debounceCheckpoints": "42 seconds"
"debounceCheckpoints": "42 seconds",
"maxRetries": 10
}
"""

Expand Down Expand Up @@ -83,7 +84,8 @@ class KinesisSourceConfigSpec extends Specification {
"minBackoff": "100 millis",
"maxBackoff": "1second"
},
"debounceCheckpoints": "42 seconds"
"debounceCheckpoints": "42 seconds",
"maxRetries": 10
}
"""

Expand Down Expand Up @@ -127,7 +129,8 @@ class KinesisSourceConfigSpec extends Specification {
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0),
checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second),
debounceCheckpoints = 10.seconds
debounceCheckpoints = 10.seconds,
maxRetries = 10
)

result.as[Wrapper] must beRight.like { case w: Wrapper =>
Expand Down
Loading