Skip to content

Commit edcb2c1

Browse files
committed
DEVX-533: Implement LoadBalancer Strategy based on annotation
- addressed code review comments
1 parent 71ea516 commit edcb2c1

File tree

3 files changed

+32
-22
lines changed

3 files changed

+32
-22
lines changed
Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,12 @@ import spray.json._
1212

1313
import scala.concurrent.Future
1414

15-
class KindBasedLoadBalancer(config: WhiskConfig,
16-
feedFactory: FeedFactory,
17-
controllerInstance: ControllerInstanceId,
18-
implicit val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider])(
15+
class MuxBalancer(config: WhiskConfig,
16+
feedFactory: FeedFactory,
17+
controllerInstance: ControllerInstanceId,
18+
implicit val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider])(
1919
implicit actorSystem: ActorSystem,
20-
logging: Logging,
21-
materializer: ActorMaterializer)
20+
logging: Logging)
2221
extends CommonLoadBalancer(config, feedFactory, controllerInstance) {
2322

2423
private val balancers = lbConfig.strategy.foldLeft(Map.empty[String, LoadBalancer]) {
@@ -37,10 +36,13 @@ class KindBasedLoadBalancer(config: WhiskConfig,
3736
}
3837
override protected val invokerPool: ActorRef = actorSystem.actorOf(Props.empty)
3938

40-
/** 1. Publish a message to the loadbalancer */
39+
/**
40+
* Publish a message to the loadbalancer
41+
*
42+
* Select the LoadBalancer based on the annotation, if available, otherwise use the default one
43+
**/
4144
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
4245
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
43-
logging.info(this, "We are here")
4446
action.annotations.get("activationStrategy") match {
4547
case None =>
4648
balancers("default").publish(action, msg)
@@ -51,26 +53,18 @@ class KindBasedLoadBalancer(config: WhiskConfig,
5153
balancers("default").publish(action, msg)
5254
}
5355
}
54-
case (Some(_)) => balancers("default").publish(action, msg)
56+
case Some(_) => balancers("default").publish(action, msg)
5557
}
56-
57-
// if (action.annotations.get("activationStrategy").isDefined && balancers.contains(action.annotations.get("activationStrategy").get.toString.)
58-
// {
59-
// balancers().publish(action, msg)
60-
// } else {
61-
// balancers("default").publish(action, msg)
62-
// }
6358
}
6459
}
6560

66-
object KindBasedLoadBalancer extends LoadBalancerProvider {
61+
object MuxBalancer extends LoadBalancerProvider {
6762

6863
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
6964
implicit actorSystem: ActorSystem,
70-
logging: Logging,
71-
materializer: ActorMaterializer): LoadBalancer = {
65+
logging: Logging): LoadBalancer = {
7266

73-
new KindBasedLoadBalancer(whiskConfig, createFeedFactory(whiskConfig, instance), instance)
67+
new MuxBalancer(whiskConfig, createFeedFactory(whiskConfig, instance), instance)
7468
}
7569

7670
def requiredProperties =

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,12 +598,28 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
598598
* @param timeoutFactor factor to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon)
599599
* @param timeoutAddon extra time to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon)
600600
*/
601-
case class ShardingContainerPoolBalancerConfig(strategy: Map[String, String],
601+
case class ShardingContainerPoolBalancerConfig(strategy: ActivationStrategy,
602602
managedFraction: Double,
603603
blackboxFraction: Double,
604604
timeoutFactor: Int,
605605
timeoutAddon: FiniteDuration)
606606

607+
/**
608+
* Configuration for the annotation-based load balancer multiplexer
609+
*
610+
* @param default the default strategy to be used if nothing is configured for the given annotation
611+
* @param custom the Map of the strategy name to strategy configuration
612+
*/
613+
case class ActivationStrategy(default: String,
614+
custom: Map[String, StrategyConfig])
615+
616+
/**
617+
* Configuration for the strategy
618+
*
619+
* @param className indicates the class which will handle this strategy name
620+
*/
621+
case class StrategyConfig(className: String)
622+
607623
/**
608624
* State kept for each activation slot until completion.
609625
*

core/standalone/src/main/resources/standalone.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ whisk {
3636
spi {
3737
ArtifactStoreProvider = "org.apache.openwhisk.core.database.memory.MemoryArtifactStoreProvider"
3838
MessagingProvider = "org.apache.openwhisk.connector.lean.LeanMessagingProvider"
39-
LoadBalancerProvider = "org.apache.openwhisk.core.loadBalancer.KindBasedLoadBalancer"
39+
LoadBalancerProvider = "org.apache.openwhisk.core.loadBalancer.MuxBalancer"
4040
# Use cli based log store for all setups as its more stable to use
4141
# and does not require root user access
4242
LogStoreProvider = "org.apache.openwhisk.core.containerpool.docker.DockerCliLogStoreProvider"

0 commit comments

Comments
 (0)