diff --git a/build.gradle b/build.gradle index 5228625916d..2fc65378c96 100644 --- a/build.gradle +++ b/build.gradle @@ -52,7 +52,7 @@ subprojects { 'akka-discovery', 'akka-distributed-data', 'akka-protobuf', 'akka-remote', 'akka-slf4j', 'akka-stream', 'akka-stream-testkit', 'akka-testkit'] def akkaHttp = ['akka-http', 'akka-http-core', 'akka-http-spray-json', 'akka-http-testkit', 'akka-http-xml', - 'akka-parsing', 'akka-http2-support'] + 'akka-http2-support', 'akka-parsing'] akka.forEach { cons.add('compile', "com.typesafe.akka:${it}_${gradle.scala.depVersion}:${gradle.akka.version}") diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf index 70d2fae517f..8e4e15b8bc6 100644 --- a/core/controller/src/main/resources/reference.conf +++ b/core/controller/src/main/resources/reference.conf @@ -20,6 +20,10 @@ whisk { use-cluster-bootstrap: false } loadbalancer { + strategy { + default = "" + custom = {} + } managed-fraction: 90% blackbox-fraction: 10% # factor to increase the timeout for forced active acks diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/MuxBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/MuxBalancer.scala new file mode 100644 index 00000000000..31a9592ab3a --- /dev/null +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/MuxBalancer.scala @@ -0,0 +1,94 @@ +package org.apache.openwhisk.core.loadBalancer + +import akka.actor.{ActorRef, ActorSystem, Props} +import akka.stream.ActorMaterializer +import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} +import org.apache.openwhisk.core.WhiskConfig._ +import org.apache.openwhisk.core.connector.{ActivationMessage, MessagingProvider} +import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.spi.SpiLoader +import pureconfig.loadConfigOrThrow +import spray.json._ +import pureconfig._ +import pureconfig.generic.auto._ + +import scala.concurrent.Future + +class MuxBalancer(config: WhiskConfig, + feedFactory: FeedFactory, + controllerInstance: ControllerInstanceId, + implicit val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider], + override val lbConfig: ShardingContainerPoolBalancerConfig = + loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer))( + implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer) + extends CommonLoadBalancer(config, feedFactory, controllerInstance) { + + private val defaultLoadBalancer = + getClass[LoadBalancerProvider](lbConfig.strategy.default).instance(config, controllerInstance) + private val customLoadBalancerMap: Map[String, LoadBalancer] = + lbConfig.strategy.custom.foldLeft(Map.empty[String, LoadBalancer]) { + case (result, (name, strategyConfig)) => + result + (name -> getClass[LoadBalancerProvider](strategyConfig.className).instance(config, controllerInstance)) + } + + /** + * Instantiates an object of the given type. + * + * Similar to SpiLoader.get, with the difference that the constructed class does not need to be declared as Spi. + * Thus there could be multiple classes implementing same interface constructed at the same time + * + * @param name the name of the class + * @tparam A expected type to return + * @return instance of the class + */ + private def getClass[A](name: String): A = { + val clazz = Class.forName(name + "$") + val classInst = clazz.getField("MODULE$").get(clazz).asInstanceOf[A] + classInst + } + + override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(IndexedSeq.empty[InvokerHealth]) + override protected def releaseInvoker(invoker: InvokerInstanceId, entry: ActivationEntry) = { + // Currently do nothing + } + override protected val invokerPool: ActorRef = actorSystem.actorOf(Props.empty) + + /** + * Publish a message to the loadbalancer + * + * Select the LoadBalancer based on the annotation, if available, otherwise use the default one + **/ + override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( + implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = { + action.annotations.get("activationStrategy") match { + case None => + defaultLoadBalancer.publish(action, msg) + case Some(JsString(value)) => { + if (customLoadBalancerMap.contains(value)) { + customLoadBalancerMap(value).publish(action, msg) + } else { + defaultLoadBalancer.publish(action, msg) + } + } + case Some(_) => defaultLoadBalancer.publish(action, msg) + } + } +} + +object MuxBalancer extends LoadBalancerProvider { + + override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)( + implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): LoadBalancer = { + + new MuxBalancer(whiskConfig, createFeedFactory(whiskConfig, instance), instance) + } + + def requiredProperties = + ExecManifest.requiredProperties ++ + wskApiHost +} diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index d4432438911..528a27de7b8 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -599,11 +599,28 @@ case class ClusterConfig(useClusterBootstrap: Boolean) * @param timeoutFactor factor to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon) * @param timeoutAddon extra time to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon) */ -case class ShardingContainerPoolBalancerConfig(managedFraction: Double, +case class ShardingContainerPoolBalancerConfig(strategy: ActivationStrategy, + managedFraction: Double, blackboxFraction: Double, timeoutFactor: Int, timeoutAddon: FiniteDuration) +/** + * Configuration for the annotation-based load balancer multiplexer + * + * @param default the default strategy to be used if nothing is configured for the given annotation + * @param custom the Map of the strategy name to strategy configuration + */ +case class ActivationStrategy(default: String, + custom: Map[String, StrategyConfig]) + +/** + * Configuration for the strategy + * + * @param className indicates the class which will handle this strategy name + */ +case class StrategyConfig(className: String) + /** * State kept for each activation slot until completion. * diff --git a/core/standalone/src/main/resources/standalone.conf b/core/standalone/src/main/resources/standalone.conf index 0817680007a..0aaf320617e 100644 --- a/core/standalone/src/main/resources/standalone.conf +++ b/core/standalone/src/main/resources/standalone.conf @@ -36,7 +36,7 @@ whisk { spi { ArtifactStoreProvider = "org.apache.openwhisk.core.database.memory.MemoryArtifactStoreProvider" MessagingProvider = "org.apache.openwhisk.connector.lean.LeanMessagingProvider" - LoadBalancerProvider = "org.apache.openwhisk.core.loadBalancer.LeanBalancer" + LoadBalancerProvider = "org.apache.openwhisk.core.loadBalancer.MuxBalancer" # Use cli based log store for all setups as its more stable to use # and does not require root user access LogStoreProvider = "org.apache.openwhisk.core.containerpool.docker.DockerCliLogStoreProvider" @@ -56,6 +56,12 @@ whisk { limits-actions-invokes-concurrent = 30 } + loadbalancer { + strategy { + default = "org.apache.openwhisk.core.loadBalancer.LeanBalancer" + } + } + controller { protocol = http diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala index 2c6790868e5..4143486f456 100644 --- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala +++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala @@ -27,7 +27,7 @@ import org.apache.openwhisk.common.{Logging, TransactionId} import org.apache.openwhisk.core.WhiskConfig import org.apache.openwhisk.core.WhiskConfig.kafkaHosts import org.apache.openwhisk.core.entity.ControllerInstanceId -import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, LoadBalancerProvider} +import org.apache.openwhisk.core.loadBalancer.{LoadBalancer, LoadBalancerProvider, MuxBalancer} import org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, containerName, createRunCmd} import scala.concurrent.{ExecutionContext, Future} @@ -109,11 +109,12 @@ class KafkaLauncher( } object KafkaAwareLeanBalancer extends LoadBalancerProvider { - override def requiredProperties: Map[String, String] = LeanBalancer.requiredProperties ++ kafkaHosts + override def requiredProperties: Map[String, String] = MuxBalancer.requiredProperties ++ kafkaHosts - override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(implicit actorSystem: ActorSystem, - logging: Logging): LoadBalancer = - LeanBalancer.instance(whiskConfig, instance) + override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)( + implicit actorSystem: ActorSystem, + logging: Logging): LoadBalancer = + MuxBalancer.instance(whiskConfig, instance) } object KafkaLauncher { diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/MockLoadBalancer.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/MockLoadBalancer.scala new file mode 100644 index 00000000000..81c6a6b26fe --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/MockLoadBalancer.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.loadBalancer.test + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.core.WhiskConfig +import org.apache.openwhisk.core.connector.ActivationMessage +import org.apache.openwhisk.core.entity.{ + ActivationId, + ControllerInstanceId, + ExecManifest, + ExecutableWhiskActionMetaData, + UUID, + WhiskActivation +} +import org.apache.openwhisk.core.loadBalancer.{InvokerHealth, LoadBalancer, LoadBalancerProvider} +import org.apache.openwhisk.core.WhiskConfig._ + +import scala.concurrent.Future + +class MockLoadBalancer(prefix: String) extends LoadBalancer { + override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(IndexedSeq.empty[InvokerHealth]) + override def clusterSize: Int = 1 + override def totalActiveActivations: Future[Int] = Future.successful(1) + override def activeActivationsFor(namespace: UUID): Future[Int] = + Future.successful(0) + override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( + implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = { + Future.successful(Future.successful(Left(ActivationId(prefix + "-mockLoadBalancerId0")))) + } +} + +object MockLoadBalancerCustom extends LoadBalancerProvider { + override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)( + implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): LoadBalancer = { + + new MockLoadBalancer("custom") + } + + def requiredProperties = + ExecManifest.requiredProperties ++ + wskApiHost +} + +object MockLoadBalancerDefault extends LoadBalancerProvider { + override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)( + implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): LoadBalancer = { + + new MockLoadBalancer("default") + } + + def requiredProperties = + ExecManifest.requiredProperties ++ + wskApiHost +} \ No newline at end of file diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/MuxBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/MuxBalancerTests.scala new file mode 100644 index 00000000000..7819650f2fc --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/MuxBalancerTests.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.loadBalancer.test + +import akka.actor.{ActorRef, ActorRefFactory, ActorSystem} +import akka.stream.ActorMaterializer +import akka.testkit.TestProbe +import common.StreamLogging +import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.TopicPartition +import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.core.WhiskConfig +import org.apache.openwhisk.core.connector._ +import org.apache.openwhisk.core.entity.size._ +import org.apache.openwhisk.core.entity.test.ExecHelpers +import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.core.loadBalancer._ +import org.junit.runner.RunWith +import org.scalamock.scalatest.MockFactory +import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.junit.JUnitRunner + +import scala.collection.immutable.Map +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ + +/** + * Unit tests for the MuxBalancer object. + * + */ +@RunWith(classOf[JUnitRunner]) +class MuxBalancerTests extends FlatSpec with Matchers with StreamLogging with ExecHelpers with MockFactory { + behavior of "Mux Balancer" + + def lbConfig(activationStrategy: ActivationStrategy) = + ShardingContainerPoolBalancerConfig(activationStrategy, 1.0 - 0.5, 0.5, 1, 1.minute) + + implicit val transId: TransactionId = TransactionId.testing + + val feedProbe = new FeedFactory { + def createFeed(f: ActorRefFactory, m: MessagingProvider, p: (Array[Byte]) => Future[Unit]) = + TestProbe().testActor + + } + val invokerPoolProbe = new InvokerPoolFactory { + override def createInvokerPool( + actorRefFactory: ActorRefFactory, + messagingProvider: MessagingProvider, + messagingProducer: MessageProducer, + sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata], + monitor: Option[ActorRef]): ActorRef = + TestProbe().testActor + } + + def mockMessaging(): MessagingProvider = { + val messaging = stub[MessagingProvider] + val producer = stub[MessageProducer] + val consumer = stub[MessageConsumer] + (messaging + .getProducer(_: WhiskConfig, _: Option[ByteSize])(_: Logging, _: ActorSystem)) + .when(*, *, *, *) + .returns(producer) + (messaging + .getConsumer(_: WhiskConfig, _: String, _: String, _: Int, _: FiniteDuration)(_: Logging, _: ActorSystem)) + .when(*, *, *, *, *, *, *) + .returns(consumer) + (producer + .send(_: String, _: Message, _: Int)) + .when(*, *, *) + .returns(Future.successful(new RecordMetadata(new TopicPartition("fake", 0), 0, 0, 0l, 0l, 0, 0))) + + messaging + } + + it should "execute correct LoadBalancer for the default activation strategy" in { + behave like asssertActivation( + ActivationStrategy("org.apache.openwhisk.core.loadBalancer.test.MockLoadBalancerDefault", Map()), + Parameters(), + Left(ActivationId("default-mockLoadBalancerId0"))) + } + + it should "execute correct LoadBalancer for the custom activation strategy" in { + behave like asssertActivation( + ActivationStrategy( + "org.apache.openwhisk.core.loadBalancer.test.MockLoadBalancerDefault", + Map( + "customLBStrategy01" -> StrategyConfig( + "org.apache.openwhisk.core.loadBalancer.test.MockLoadBalancerCustom"))), + Parameters("activationStrategy", "customLBStrategy01"), + Left(ActivationId("custom-mockLoadBalancerId0"))) + } + + def asssertActivation(activationStrategy: ActivationStrategy, + annotations: Parameters, + expected: Either[ActivationId, WhiskActivation]) = { + val slots = 10 + val memoryPerSlot = MemoryLimit.MIN_MEMORY + val memory = memoryPerSlot * slots + val config = new WhiskConfig(ExecManifest.requiredProperties) + implicit val materializer: ActorMaterializer = ActorMaterializer() + + val balancer: LoadBalancer = + new MuxBalancer(config, feedProbe, ControllerInstanceId("0"), mockMessaging, lbConfig(activationStrategy)) + val namespace = EntityPath("testspace") + val name = EntityName("testname") + val invocationNamespace = EntityName("invocationSpace") + val concurrency = 5 + val actionMem = 256.MB + val uuid = UUID() + val aid = ActivationId.generate() + val actionMetaData = + WhiskActionMetaData( + namespace, + name, + js10MetaData(Some("jsMain"), false), + limits = actionLimits(actionMem, concurrency), + annotations = annotations) + + val msg = ActivationMessage( + TransactionId.testing, + actionMetaData.fullyQualifiedName(true), + actionMetaData.rev, + Identity(Subject(), Namespace(invocationNamespace, uuid), BasicAuthenticationAuthKey(uuid, Secret())), + aid, + ControllerInstanceId("0"), + blocking = false, + content = None, + initArgs = Set.empty, + lockedArgs = Map.empty) + + val activation = balancer.publish(actionMetaData.toExecutableWhiskAction.get, msg) + Await.ready(activation, 10.seconds) + activation.onComplete(result => { + result.get.onComplete(activation => { + activation.get shouldBe expected + }) + }) + } +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala index 907264c825f..edfe68e6678 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala @@ -94,6 +94,7 @@ class ShardingContainerPoolBalancerTests def lbConfig(blackboxFraction: Double, managedFraction: Option[Double] = None) = ShardingContainerPoolBalancerConfig( + ActivationStrategy("org.apache.openwhisk.core.loadBalancer.LeanBalancer", Map()), managedFraction.getOrElse(1.0 - blackboxFraction), blackboxFraction, 1,