diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 5c98c4ddc01..b76665c71a1 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -41,6 +41,26 @@ akka.http { preview.enable-http2 = on parsing.illegal-header-warnings = off } + + cluster { + use-dispatcher = "dispatchers.heartbeat-dispatcher" + failure-detector { + # How often keep-alive heartbeat messages should be sent to each connection. + heartbeat-interval = 1s + + # Number of potentially lost/delayed heartbeats that will be + # accepted before considering it to be an anomaly. + # This margin is important to be able to survive sudden, occasional, + # pauses in heartbeat arrivals, due to for example garbage collect or + # network drop. + acceptable-heartbeat-pause = 5s + + # After the heartbeat request has been sent the first failure detection + # will start after this period, even though no heartbeat message has + # been received. + expected-response-after = 1s + } + } } #kamon related configuration @@ -72,7 +92,7 @@ kamon { service = "openwhisk-statsd" } metric { - tick-interval = 1 second + tick-interval = 10 second } statsd { diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf index cdf692e4c3e..29c6d5b3a39 100644 --- a/common/scala/src/main/resources/reference.conf +++ b/common/scala/src/main/resources/reference.conf @@ -86,4 +86,17 @@ dispatchers { type = PinnedDispatcher executor = "thread-pool-executor" } + + # This is for akka-cluster heartbeat. 
Since heartbeat is a periodic light-weight message, + # fork-join executor should be enough + heartbeat-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 2.0 + parallelism-max = 10 + } + throughput = 100 + } } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Scheduler.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Scheduler.scala index 48e6a07847b..39e5905ddf5 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/common/Scheduler.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Scheduler.scala @@ -17,16 +17,11 @@ package org.apache.openwhisk.common +import akka.actor.{Actor, ActorSystem, Cancellable, Props} + import scala.concurrent.Future import scala.concurrent.duration._ -import scala.util.Failure -import scala.util.Success -import scala.util.Try - -import akka.actor.Actor -import akka.actor.ActorSystem -import akka.actor.Cancellable -import akka.actor.Props +import scala.util.{Failure, Success, Try} /** * Scheduler utility functions to execute tasks in a repetitive way with controllable behavior @@ -122,4 +117,25 @@ object Scheduler { require(interval > Duration.Zero) system.actorOf(Props(new Worker(initialDelay, interval, true, name, f))) } + + /** + * Schedules a closure to run repeatedly on the given dispatcher, with at least the given interval in between runs. + * This waits until the Future of the closure has finished, ignores its result and then waits for the + * given interval. 
+ * + * @param interval the time to wait between two runs of the closure + * @param initialDelay optionally delay the first scheduled iteration by the given duration + * @param dispatcher the configuration path of the dispatcher that runs this scheduled work + * @param f the function to run + */ + def scheduleWaitAtLeastWith(interval: FiniteDuration, + initialDelay: FiniteDuration = Duration.Zero, + name: String = "Scheduler", + dispatcher: String)(f: () => Future[Any])( + implicit system: ActorSystem, + logging: Logging, + transid: TransactionId = TransactionId.unknown) = { + require(interval > Duration.Zero) + system.actorOf(Props(new Worker(initialDelay, interval, true, name, f)).withDispatcher(dispatcher)) + } } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala index 0889cf2f53e..dee82c35525 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala @@ -65,7 +65,7 @@ class FPCPoolBalancer(config: WhiskConfig, private implicit val executionContext: ExecutionContext = actorSystem.dispatcher // This value is given according to the total waiting time at QueueManager for a new queue to be created. - private implicit val requestTimeout: Timeout = Timeout(8.seconds) + private implicit val requestTimeout: Timeout = Timeout(1.seconds) private val entityStore = WhiskEntityStore.datastore() diff --git a/core/scheduler/src/main/resources/reference.conf b/core/scheduler/src/main/resources/reference.conf new file mode 100644 index 00000000000..c5b7992af76 --- /dev/null +++ b/core/scheduler/src/main/resources/reference.conf @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +dispatchers { + # A custom dispatcher for the queue manager + queue-manager-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 1 + parallelism-factor = 1 + parallelism-max = 15 + } + throughput = 5 + } + + # A custom dispatcher for memory queues. + memory-queue-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 10 + parallelism-factor = 2 + parallelism-max = 60 + } + throughput = 5 + } + + # A custom dispatcher for monitoring actors of memory queues. 
+ monitoring-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 10 + parallelism-factor = 2 + parallelism-max = 60 + } + throughput = 5 + } +} diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala index 701b392b286..17a94e9ebab 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala @@ -47,7 +47,6 @@ import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.grpc.ActivationServiceHandler import org.apache.openwhisk.http.BasicHttpService import org.apache.openwhisk.spi.SpiLoader -import org.apache.openwhisk.utils.ExecutionContextFactory import pureconfig.generic.auto._ import pureconfig.loadConfigOrThrow import spray.json.{DefaultJsonProtocol, _} @@ -287,9 +286,8 @@ object Scheduler { } def main(args: Array[String]): Unit = { - implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext() - implicit val actorSystem: ActorSystem = - ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec)) + implicit val actorSystem: ActorSystem = ActorSystem("scheduler-actor-system") + implicit val ec = actorSystem.dispatcher implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this)) diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala index 21c77719488..b6731de4521 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala @@ -148,7 +148,8 @@ class MemoryQueue(private val etcdClient: EtcdClient, extends 
FSM[MemoryQueueState, MemoryQueueData] with Stash { - private implicit val ec: ExecutionContextExecutor = context.dispatcher + private implicit val ec: ExecutionContextExecutor = + context.system.dispatchers.lookup("dispatchers.memory-queue-dispatcher") private implicit val actorSystem: ActorSystem = context.system private implicit val timeout = Timeout(5.seconds) private implicit val order: Ordering[BufferedRequest] = Ordering.by(_.containerId) @@ -181,7 +182,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, private[queue] var limit: Option[Int] = None private[queue] var initialized = false - private val logScheduler: Cancellable = context.system.scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) { () => + private val logScheduler: Cancellable = context.system.scheduler.scheduleWithFixedDelay(0.seconds, 10.seconds) { () => MetricEmitter.emitGaugeMetric( LoggingMarkers .SCHEDULER_QUEUE_WAITING_ACTIVATION(invocationNamespace, action.asString, action.toStringWithoutVersion), @@ -926,7 +927,9 @@ class MemoryQueue(private val etcdClient: EtcdClient, // since there is no initial delay, it will try to create a container at initialization time // these schedulers will run forever and stop when the memory queue stops private def startMonitoring(): (ActorRef, ActorRef) = { - val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () => + val droppingScheduler = Scheduler.scheduleWaitAtLeastWith( + schedulingConfig.dropInterval, + dispatcher = "dispatchers.monitoring-dispatcher") { () => checkToDropStaleActivation( clock, queue, @@ -939,7 +942,9 @@ class MemoryQueue(private val etcdClient: EtcdClient, Future.successful(()) } - val monitoringScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.checkInterval) { () => + val monitoringScheduler = Scheduler.scheduleWaitAtLeastWith( + schedulingConfig.checkInterval, + dispatcher = "dispatchers.monitoring-dispatcher") { () => // the average duration is updated every checkInterval if 
(averageDurationBuffer.nonEmpty) { averageDuration = Some(averageDurationBuffer.average) diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala index 6d0eed87566..f2695e101cb 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala @@ -91,7 +91,7 @@ class QueueManager( private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]() private implicit val askTimeout = Timeout(5.seconds) - private implicit val ec = context.dispatcher + private implicit val ec = context.system.dispatchers.lookup("dispatchers.queue-manager-dispatcher") private implicit val system = context.system private val watcherName = "queue-manager"