
Commit 97c9f7a

Add RestartSource to restart failing Kafka consumer for user-events service (#4887)

* Add RestartSource for user-events service
* Add missing configuration

1 parent a4122fd · commit 97c9f7a

File tree: 6 files changed, +76 −19 lines

core/monitoring/user-events/src/main/resources/application.conf

Lines changed: 6 additions & 0 deletions
@@ -15,6 +15,12 @@
 # limitations under the License.
 #

+akka.kafka.committer {
+
+  max-batch = 20
+
+}
+
 akka.kafka.consumer {
   # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
   # can be defined in this configuration section.
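For context, a minimal sketch (standalone setup assumed, not part of the commit): moving max-batch into the akka.kafka.committer section lets the plain CommitterSettings(system) constructor used later in EventConsumer.scala pick the value up from configuration, replacing the previous hard-coded .withMaxBatch(20).

import akka.actor.ActorSystem
import akka.kafka.CommitterSettings

object CommitterSettingsSketch extends App {
  val system = ActorSystem("user-events")
  // CommitterSettings(system) reads the akka.kafka.committer section of the loaded config,
  // so with the application.conf above on the classpath maxBatch should come out as 20.
  val committerSettings = CommitterSettings(system)
  println(committerSettings.maxBatch) // expected: 20
  system.terminate()
}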

core/monitoring/user-events/src/main/resources/reference.conf

Lines changed: 16 additions & 0 deletions
@@ -31,5 +31,21 @@ whisk {
       # rename/relabel prometheus metrics tags
       # "namespace" = "ow_namespae"
     }
+
+    retry {
+      # minimum (initial) duration until the Kafka consumer is started again if it is terminated
+      min-backoff = 3 secs
+
+      # the exponential back-off is capped to this duration
+      max-backoff = 30 secs
+
+      # after calculation of the exponential back-off an additional
+      # random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay
+      random-factor = 0.2
+
+      # the amount of restarts is capped to this amount within a time frame of minBackoff
+      max-restarts = 10
+    }
+
   }
 }
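As rough orientation (not part of the commit), these defaults mean the consumer is restarted after roughly 3s, 6s, 12s, 24s and then every 30s, each plus up to 20% random jitter, and restarts are capped at 10 as described in the comment above. A tiny Scala sketch of the assumed doubling schedule, ignoring jitter:

import scala.concurrent.duration._

// Approximate restart delays for min-backoff = 3 secs, max-backoff = 30 secs:
// 3s, 6s, 12s, 24s, then capped at 30s for the remaining attempts.
val delays = (0 until 10).map(i => (3.seconds * math.pow(2, i).toLong) min 30.seconds)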

core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala

Lines changed: 27 additions & 18 deletions
@@ -18,14 +18,14 @@
 package org.apache.openwhisk.core.monitoring.metrics

 import java.lang.management.ManagementFactory
+import java.util.concurrent.atomic.AtomicReference

 import akka.Done
 import akka.actor.ActorSystem
 import akka.kafka.scaladsl.{Committer, Consumer}
-import akka.kafka.scaladsl.Consumer.DrainingControl
 import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
 import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.Keep
+import akka.stream.scaladsl.{RestartSource, Sink}
 import javax.management.ObjectName
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import kamon.Kamon
@@ -77,25 +77,34 @@ case class EventConsumer(settings: ConsumerSettings[String, String],

   def shutdown(): Future[Done] = {
     lagRecorder.cancel()
-    control.drainAndShutdown()(system.dispatcher)
+    control.get().drainAndShutdown(result)(system.dispatcher)
   }

-  def isRunning: Boolean = !control.isShutdown.isCompleted
-
-  override def metrics(): Future[Map[MetricName, common.Metric]] = control.metrics
-
-  private val committerSettings = CommitterSettings(system).withMaxBatch(20)
-
-  //TODO Use RestartSource
-  private val control: DrainingControl[Done] = Consumer
-    .committableSource(updatedSettings, Subscriptions.topics(userEventTopic))
-    .map { msg =>
-      processEvent(msg.record.value())
-      msg.committableOffset
+  def isRunning: Boolean = !control.get().isShutdown.isCompleted
+
+  override def metrics(): Future[Map[MetricName, common.Metric]] = control.get().metrics
+
+  private val committerSettings = CommitterSettings(system)
+  private val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)
+
+  private val result = RestartSource
+    .onFailuresWithBackoff(
+      minBackoff = metricConfig.retry.minBackoff,
+      maxBackoff = metricConfig.retry.maxBackoff,
+      randomFactor = metricConfig.retry.randomFactor,
+      maxRestarts = metricConfig.retry.maxRestarts) { () =>
+      Consumer
+        .committableSource(updatedSettings, Subscriptions.topics(userEventTopic))
+        // this is to access to the Consumer.Control
+        // instances of the latest Kafka Consumer source
+        .mapMaterializedValue(c => control.set(c))
+        .map { msg =>
+          processEvent(msg.record.value())
+          msg.committableOffset
+        }
+        .via(Committer.flow(committerSettings))
     }
-    .toMat(Committer.sink(committerSettings))(Keep.both)
-    .mapMaterializedValue(DrainingControl.apply)
-    .run()
+    .runWith(Sink.ignore)

   private val lagRecorder =
     system.scheduler.schedule(10.seconds, 10.seconds)(lagGauge.update(consumerLag))
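For readers who want to try the pattern outside OpenWhisk, a minimal self-contained sketch of the same RestartSource + AtomicReference approach follows. The topic name, bootstrap servers, group id and the processEvent body are placeholders, not taken from this commit.

import java.util.concurrent.atomic.AtomicReference

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.duration._

object RestartingConsumerSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Placeholder connection details; the real service derives these from its own config.
  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("sketch-group")
  val committerSettings = CommitterSettings(system)

  // Holds the Control of the most recently materialized Kafka consumer source.
  val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

  def processEvent(value: String): Unit = println(value) // stand-in for the real handler

  val done = RestartSource
    .onFailuresWithBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2, maxRestarts = 10) {
      () =>
        Consumer
          .committableSource(consumerSettings, Subscriptions.topics("events"))
          .mapMaterializedValue(c => control.set(c)) // remember the latest Consumer.Control
          .map { msg =>
            processEvent(msg.record.value())
            msg.committableOffset
          }
          .via(Committer.flow(committerSettings))
    }
    .runWith(Sink.ignore)

  // Graceful stop: drain in-flight commits of the live consumer, then wait for stream completion.
  sys.addShutdownHook(control.get().drainAndShutdown(done)(system.dispatcher))
}

Because onFailuresWithBackoff rebuilds the inner source on every failure, the AtomicReference is what lets the surrounding class still reach the currently running consumer's Control for isRunning, metrics and drainAndShutdown; that is why the commit replaces the single DrainingControl with this indirection.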

core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala

Lines changed: 5 additions & 1 deletion
@@ -29,14 +29,18 @@ import org.apache.kafka.common.serialization.StringDeserializer
 import pureconfig._
 import pureconfig.generic.auto._

+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{ExecutionContext, Future}

 object OpenWhiskEvents extends SLF4JLogging {

   case class MetricConfig(port: Int,
                           enableKamon: Boolean,
                           ignoredNamespaces: Set[String],
-                          renameTags: Map[String, String])
+                          renameTags: Map[String, String],
+                          retry: RetryConfig)
+
+  case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)

   def start(config: Config)(implicit system: ActorSystem,
                             materializer: ActorMaterializer): Future[Http.ServerBinding] = {
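The retry values reach RetryConfig through pureconfig's auto derivation, which maps the kebab-case keys (min-backoff, max-backoff, random-factor, max-restarts) onto the camelCase fields. A hedged sketch of that wiring; the "whisk.user-events" namespace and the standalone setup are assumptions based on the reference.conf layout above, not code from this commit.

import com.typesafe.config.ConfigFactory
import pureconfig._
import pureconfig.generic.auto._

import scala.concurrent.duration.FiniteDuration

case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)
case class MetricConfig(port: Int,
                        enableKamon: Boolean,
                        ignoredNamespaces: Set[String],
                        renameTags: Map[String, String],
                        retry: RetryConfig)

object ConfigSketch extends App {
  // Loads the whisk.user-events section, including the nested retry block.
  val metricConfig = loadConfigOrThrow[MetricConfig](ConfigFactory.load(), "whisk.user-events")
  println(metricConfig.retry.minBackoff) // 3 seconds with the reference.conf shown above
}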

core/monitoring/user-events/src/test/resources/application.conf

Lines changed: 16 additions & 0 deletions
@@ -29,4 +29,20 @@ user-events {
   rename-tags {
     #namespace = "ow_namespace"
   }
+
+  retry {
+    # minimum (initial) duration until the Kafka consumer is started again if it is terminated
+    min-backoff = 3 secs
+
+    # the exponential back-off is capped to this duration
+    max-backoff = 30 secs
+
+    # after calculation of the exponential back-off an additional
+    # random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay
+    random-factor = 0.2
+
+    # the amount of restarts is capped to this amount within a time frame of minBackoff
+    max-restarts = 10
+  }
+
 }

core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala

Lines changed: 6 additions & 0 deletions
@@ -99,6 +99,12 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
        | rename-tags {
        |   namespace = "ow_namespace"
        | }
+       | retry {
+       |   min-backoff = 3 secs
+       |   max-backoff = 30 secs
+       |   random-factor = 0.2
+       |   max-restarts = 10
+       | }
        | }
        | }
      """.stripMargin)
