Skip to content

Commit 71ea516

Browse files
committed
DEVX-533: Implement LoadBalancer Strategy based on annotation
1 parent 1753946 commit 71ea516

File tree

4 files changed

+86
-2
lines changed

4 files changed

+86
-2
lines changed

core/controller/src/main/resources/reference.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ whisk {
2020
use-cluster-bootstrap: false
2121
}
2222
loadbalancer {
23+
strategy {
24+
"default" = org.apache.openwhisk.core.loadBalancer.LeanBalancer
25+
"multitenantrouting" = org.apache.openwhisk.core.loadBalancer.LeanBalancer
26+
}
2327
managed-fraction: 90%
2428
blackbox-fraction: 10%
2529
# factor to increase the timeout for forced active acks
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package org.apache.openwhisk.core.loadBalancer
2+
3+
import akka.actor.{ActorRef, ActorSystem, Props}
4+
import akka.stream.ActorMaterializer
5+
import org.apache.openwhisk.common.{Logging, TransactionId}
6+
import org.apache.openwhisk.core.WhiskConfig
7+
import org.apache.openwhisk.core.WhiskConfig._
8+
import org.apache.openwhisk.core.connector.{ActivationMessage, MessagingProvider}
9+
import org.apache.openwhisk.core.entity._
10+
import org.apache.openwhisk.spi.SpiLoader
11+
import spray.json._
12+
13+
import scala.concurrent.Future
14+
15+
class KindBasedLoadBalancer(config: WhiskConfig,
16+
feedFactory: FeedFactory,
17+
controllerInstance: ControllerInstanceId,
18+
implicit val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider])(
19+
implicit actorSystem: ActorSystem,
20+
logging: Logging,
21+
materializer: ActorMaterializer)
22+
extends CommonLoadBalancer(config, feedFactory, controllerInstance) {
23+
24+
private val balancers = lbConfig.strategy.foldLeft(Map.empty[String, LoadBalancer]) {
25+
case (result, (name, lbClass)) => result + (name -> getClass(lbClass))
26+
}
27+
28+
def getClass[A](name: String): A = {
29+
logging.info(this, "'" + name + "'$")
30+
val clazz = Class.forName(name + "$")
31+
clazz.getField("MODULE$").get(clazz).asInstanceOf[A]
32+
}
33+
34+
override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(IndexedSeq.empty[InvokerHealth])
35+
override protected def releaseInvoker(invoker: InvokerInstanceId, entry: ActivationEntry) = {
36+
// Currently do nothing
37+
}
38+
override protected val invokerPool: ActorRef = actorSystem.actorOf(Props.empty)
39+
40+
/** 1. Publish a message to the loadbalancer */
41+
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
42+
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
43+
logging.info(this, "We are here")
44+
action.annotations.get("activationStrategy") match {
45+
case None =>
46+
balancers("default").publish(action, msg)
47+
case Some(JsString(value)) => {
48+
if (balancers.contains(value)) {
49+
balancers(value).publish(action, msg)
50+
} else {
51+
balancers("default").publish(action, msg)
52+
}
53+
}
54+
case (Some(_)) => balancers("default").publish(action, msg)
55+
}
56+
57+
// if (action.annotations.get("activationStrategy").isDefined && balancers.contains(action.annotations.get("activationStrategy").get.toString.)
58+
// {
59+
// balancers().publish(action, msg)
60+
// } else {
61+
// balancers("default").publish(action, msg)
62+
// }
63+
}
64+
}
65+
66+
object KindBasedLoadBalancer extends LoadBalancerProvider {
67+
68+
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
69+
implicit actorSystem: ActorSystem,
70+
logging: Logging,
71+
materializer: ActorMaterializer): LoadBalancer = {
72+
73+
new KindBasedLoadBalancer(whiskConfig, createFeedFactory(whiskConfig, instance), instance)
74+
}
75+
76+
def requiredProperties =
77+
ExecManifest.requiredProperties ++
78+
wskApiHost
79+
}

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,8 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
598598
* @param timeoutFactor factor to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon)
599599
* @param timeoutAddon extra time to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon)
600600
*/
601-
case class ShardingContainerPoolBalancerConfig(managedFraction: Double,
601+
case class ShardingContainerPoolBalancerConfig(strategy: Map[String, String],
602+
managedFraction: Double,
602603
blackboxFraction: Double,
603604
timeoutFactor: Int,
604605
timeoutAddon: FiniteDuration)

core/standalone/src/main/resources/standalone.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ whisk {
3636
spi {
3737
ArtifactStoreProvider = "org.apache.openwhisk.core.database.memory.MemoryArtifactStoreProvider"
3838
MessagingProvider = "org.apache.openwhisk.connector.lean.LeanMessagingProvider"
39-
LoadBalancerProvider = "org.apache.openwhisk.core.loadBalancer.LeanBalancer"
39+
LoadBalancerProvider = "org.apache.openwhisk.core.loadBalancer.KindBasedLoadBalancer"
4040
# Use cli based log store for all setups as its more stable to use
4141
# and does not require root user access
4242
LogStoreProvider = "org.apache.openwhisk.core.containerpool.docker.DockerCliLogStoreProvider"

0 commit comments

Comments
 (0)