Skip to content

Commit 41e8e0d

Browse files
committed
Kinesis source: make idleTimeBetweenReadsInMillis configurable for polling
1 parent 71748db commit 41e8e0d

File tree

5 files changed

+14
-9
lines changed

5 files changed

+14
-9
lines changed

modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ object Utils {
8888
streamName,
8989
UUID.randomUUID.toString,
9090
KinesisSourceConfig.InitialPosition.TrimHorizon,
91-
KinesisSourceConfig.Retrieval.Polling(1),
91+
KinesisSourceConfig.Retrieval.Polling(1, 1500.millis),
9292
Some(endpoint),
9393
Some(endpoint),
9494
Some(endpoint),

modules/kinesis/src/main/resources/reference.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ snowplow.defaults: {
88
retrievalMode: {
99
type: "Polling"
1010
maxRecords: 1000
11+
idleTimeBetweenReads: "1500 millis"
1112
}
1213
leaseDuration: "10 seconds"
1314
maxLeasesToStealAtOneTimeFactor: 2.0

modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/streams/kinesis/KinesisSourceConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ object KinesisSourceConfig {
7070
sealed trait Retrieval
7171

7272
object Retrieval {
73-
case class Polling(maxRecords: Int) extends Retrieval
73+
case class Polling(maxRecords: Int, idleTimeBetweenReads: FiniteDuration) extends Retrieval
7474
case object FanOut extends Retrieval
7575

7676
private[KinesisSourceConfig] def decoder(implicit c: Configuration) = deriveConfiguredDecoder[Retrieval]

modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/streams/kinesis/source/KCLScheduler.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,10 @@ private[source] object KCLScheduler {
7979
kinesisConfig.retrievalMode match {
8080
case KinesisSourceConfig.Retrieval.FanOut =>
8181
new FanOutConfig(kinesisClient).streamName(kinesisConfig.streamName).applicationName(kinesisConfig.appName)
82-
case KinesisSourceConfig.Retrieval.Polling(maxRecords) =>
83-
val c = new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords)
82+
case KinesisSourceConfig.Retrieval.Polling(maxRecords, idleTimeBetweenReads) =>
83+
val c = new PollingConfig(kinesisConfig.streamName, kinesisClient)
84+
.maxRecords(maxRecords)
85+
.idleTimeBetweenReadsInMillis(idleTimeBetweenReads.toMillis)
8486
c.recordsFetcherFactory.maxPendingProcessRecordsInput(1)
8587
c
8688
}

modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/streams/kinesis/KinesisSourceConfigSpec.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ class KinesisSourceConfigSpec extends Specification {
3434
"workerIdentifier": "my-identifier",
3535
"retrievalMode": {
3636
"type": "Polling",
37-
"maxRecords": 42
37+
"maxRecords": 42,
38+
"idleTimeBetweenReads": "1500 millis"
3839
},
3940
"initialPosition": {
4041
"type": "TrimHorizon"
@@ -56,7 +57,7 @@ class KinesisSourceConfigSpec extends Specification {
5657
c.streamName must beEqualTo("my-stream"),
5758
c.workerIdentifier must beEqualTo("my-identifier"),
5859
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
59-
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
60+
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42, 1500.millis)),
6061
c.leaseDuration must beEqualTo(20.seconds),
6162
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)),
6263
c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)),
@@ -73,7 +74,8 @@ class KinesisSourceConfigSpec extends Specification {
7374
"workerIdentifier": "my-identifier",
7475
"retrievalMode": {
7576
"type": "POLLING",
76-
"maxRecords": 42
77+
"maxRecords": 42,
78+
"idleTimeBetweenReads": "1500 millis"
7779
},
7880
"initialPosition": {
7981
"type": "TRIM_HORIZON"
@@ -95,7 +97,7 @@ class KinesisSourceConfigSpec extends Specification {
9597
c.streamName must beEqualTo("my-stream"),
9698
c.workerIdentifier must beEqualTo("my-identifier"),
9799
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
98-
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
100+
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42, 1500.millis)),
99101
c.leaseDuration must beEqualTo(20.seconds),
100102
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)),
101103
c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)),
@@ -122,7 +124,7 @@ class KinesisSourceConfigSpec extends Specification {
122124
streamName = "my-stream",
123125
workerIdentifier = System.getenv("HOSTNAME"),
124126
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
125-
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
127+
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000, 1500.millis),
126128
customEndpoint = None,
127129
dynamodbCustomEndpoint = None,
128130
cloudwatchCustomEndpoint = None,

0 commit comments

Comments
 (0)