Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
# -- kafka bootstrap servers
"bootstrapServers": "localhost:9092"

# -- How frequently to commit our progress back to kafka. By increasing this value,
# -- we decrease the number of requests made to the kafka broker
debounceCommitOffsets: "10 seconds"

# -- Any valid Kafka consumer config options
consumerConf: {
"group.id": "snowplow-bigquery-loader"
Expand Down
4 changes: 4 additions & 0 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
"minBackoff": "100 millis"
"maxBackoff": "1 second"
}

## -- How frequently to checkpoint our progress to the DynamoDB table. By increasing this value,
## -- we can decrease the write-throughput requirements of the DynamoDB table
debounceCheckpoints: "10 seconds"
}

"output": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ class KafkaConfigSpec extends Specification with CatsEffect {
object KafkaConfigSpec {
private val minimalConfig = Config[KafkaSourceConfig, KafkaSinkConfig](
input = KafkaSourceConfig(
topicName = "sp-dev-enriched",
bootstrapServers = "localhost:9092",
topicName = "sp-dev-enriched",
bootstrapServers = "localhost:9092",
debounceCommitOffsets = 10.seconds,
consumerConf = Map(
"group.id" -> "snowplow-bigquery-loader",
"allow.auto.create.topics" -> "false",
Expand Down Expand Up @@ -131,8 +132,9 @@ object KafkaConfigSpec {

private val extendedConfig = Config[KafkaSourceConfig, KafkaSinkConfig](
input = KafkaSourceConfig(
topicName = "sp-dev-enriched",
bootstrapServers = "localhost:9092",
topicName = "sp-dev-enriched",
bootstrapServers = "localhost:9092",
debounceCommitOffsets = 10.seconds,
consumerConf = Map(
"group.id" -> "snowplow-bigquery-loader",
"enable.auto.commit" -> "false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ object KinesisConfigSpec {
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal("2.0"),
checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)
checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second),
debounceCheckpoints = 10.seconds
),
output = Config.Output(
good = Config.BigQuery(
Expand Down Expand Up @@ -142,7 +143,8 @@ object KinesisConfigSpec {
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal("2.0"),
checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)
checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second),
debounceCheckpoints = 10.seconds
),
output = Config.Output(
good = Config.BigQuery(
Expand Down
10 changes: 5 additions & 5 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ object Dependencies {

// java
val slf4j = "2.0.12"
val azureSdk = "1.11.4"
val azureSdk = "1.15.1"
val sentry = "6.25.2"
val awsSdk2 = "2.29.0"
val bigqueryStorage = "2.47.0"
val bigquery = "2.34.2"
val awsSdk2 = "2.30.17"
val bigqueryStorage = "3.11.2"
val bigquery = "2.47.0"

// Snowplow
val streams = "0.10.0"
val streams = "0.11.0"
val igluClient = "4.0.0"

// tests
Expand Down
Loading