Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ controller:
authentication:
spi: "{{ controller_authentication_spi | default('') }}"
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
username: "{{ controller_username | default('controller.user') }}"
password: "{{ controller_password | default('controller.pass') }}"
entitlement:
spi: "{{ controller_entitlement_spi | default('') }}"
protocol: "{{ controller_protocol | default('https') }}"
Expand Down Expand Up @@ -209,6 +211,8 @@ invoker:
{% endif %}"
extraEnv: "{{ invoker_extraEnv | default({}) }}"
protocol: "{{ invoker_protocol | default('https') }}"
username: "{{ invoker_username | default('invoker.user') }}"
password: "{{ invoker_password | default('invoker.pass') }}"
ssl:
cn: "openwhisk-invokers"
keyPrefix: "{{ __invoker_ssl_keyPrefix }}"
Expand Down
3 changes: 3 additions & 0 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"

"CONFIG_whisk_controller_username": "{{ controller.username }}"
"CONFIG_whisk_controller_password": "{{ controller.password }}"

"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"
Expand Down
2 changes: 2 additions & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@
"CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}"
"CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
"CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
"CONFIG_whisk_invoker_username": "{{ invoker.username }}"
"CONFIG_whisk_invoker_password": "{{ invoker.password }}"

- name: extend invoker dns env
set_fact:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,9 @@ object ConfigKeys {
val azBlob = "whisk.azure-blob"

val whiskClusterName = "whisk.cluster.name"

val whiskControllerUsername = "whisk.controller.username"
val whiskControllerPassword = "whisk.controller.password"
val whiskInvokerUsername = "whisk.invoker.username"
val whiskInvokerPassword = "whisk.invoker.password"
}
Original file line number Diff line number Diff line change
Expand Up @@ -457,3 +457,42 @@ object StatusData extends DefaultJsonProtocol {
implicit val serdes =
jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
}

case class UserMemoryMessage(userMemory: ByteSize) extends Message {
override def serialize = UserMemoryMessage.serdes.write(this).compactPrint
}

object UserMemoryMessage extends DefaultJsonProtocol {
implicit val serdes = new RootJsonFormat[UserMemoryMessage] {
override def write(message: UserMemoryMessage): JsValue = {
JsObject("userMemory" -> JsString(message.userMemory.toString))
}

override def read(json: JsValue): UserMemoryMessage = {
val userMemory = fromField[String](json, "userMemory")
new UserMemoryMessage(ByteSize.fromString(userMemory))
}
}

def parse(msg: String) = Try(serdes.read(msg.parseJson))
}

case class InvokerConfiguration(invoker: Int, memory: ByteSize)

object InvokerConfigurationProtocol extends DefaultJsonProtocol {
implicit val serdes = new RootJsonFormat[ByteSize] {
override def write(obj: ByteSize): JsValue = JsObject("memory" -> JsString(obj.toString))

override def read(json: JsValue): ByteSize = {
json match {
case JsString(memory) => ByteSize.fromString(memory)
case _ => throw new DeserializationException("Could not deserialize ByteSize")
}
}
}
implicit val invokerConfigurationFormat = jsonFormat2(InvokerConfiguration)
implicit val invokerConfigurationListJsonFormat = new RootJsonFormat[List[InvokerConfiguration]] {
def read(value: JsValue) = value.convertTo[List[InvokerConfiguration]]
def write(f: List[InvokerConfiguration]) = ???
}
}
4 changes: 4 additions & 0 deletions core/controller/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,8 @@ whisk{
file-system : true
dir-path : "/swagger-ui/"
}
controller {
username: "controller.user"
password: "controller.pass"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import kamon.Kamon
Expand All @@ -32,8 +33,8 @@ import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.{InvokerConfiguration, MessagingProvider, UserMemoryMessage}
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
import org.apache.openwhisk.core.entitlement._
Expand Down Expand Up @@ -97,7 +98,7 @@ class Controller(val instance: ControllerInstanceId,
(pathEndOrSingleSlash & get) {
complete(info)
}
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configMemory
}

// initialize datastores
Expand Down Expand Up @@ -176,6 +177,41 @@ class Controller(val instance: ControllerInstanceId,
LogLimit.config,
runtimes,
List(apiV1.basepath()))

private val controllerUsername = loadConfigOrThrow[String](ConfigKeys.whiskControllerUsername)
private val controllerPassword = loadConfigOrThrow[String](ConfigKeys.whiskControllerPassword)

/**
* config user memory of ContainerPool
*/
import org.apache.openwhisk.core.connector.InvokerConfigurationProtocol._
private val configMemory = {
implicit val executionContext = actorSystem.dispatcher
(path("config" / "memory") & post) {
extractCredentials {
case Some(BasicHttpCredentials(username, password)) =>
if (username == controllerUsername && password == controllerPassword) {
entity(as[String]) { memory =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we directly convert this to ConfigMemoryList?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, updated.

val configMemoryList = memory.parseJson.convertTo[List[InvokerConfiguration]]
configMemoryList.find(config => MemoryLimit.MIN_MEMORY.compare(config.memory) > 0) match {
case Some(_) =>
complete(StatusCodes.BadRequest, s"user memory can't be less than ${MemoryLimit.MIN_MEMORY}")
case None =>
configMemoryList.foreach { config =>
val invoker = config.invoker
val userMemoryMessage = UserMemoryMessage(config.memory)
loadBalancer.sendChangeRequestToInvoker(userMemoryMessage, invoker)
}
complete(StatusCodes.Accepted)
}
}
} else {
complete(StatusCodes.Unauthorized, "username or password is wrong")
}
case _ => complete(StatusCodes.Unauthorized)
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ trait LoadBalancer {
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]

/**
* send user memory to invokers
*
* @param userMemory
* @param targetInvokers
*/
def sendChangeRequestToInvoker(userMemoryMessage: UserMemoryMessage, targetInvoker: Int): Unit = {}

/**
* Returns a message indicating the health of the containers and/or container pool in general.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}

/**
* A loadbalancer that schedules workload based on a hashing-algorithm.
Expand Down Expand Up @@ -316,6 +317,21 @@ class ShardingContainerPoolBalancer(
}
}

/** send user memory to invokers */
override def sendChangeRequestToInvoker(userMemoryMessage: UserMemoryMessage, targetInvoker: Int): Unit = {
schedulingState.invokers.filter { invoker =>
invoker.id.instance == targetInvoker
} foreach { invokerHealth =>
val topic = s"invoker${invokerHealth.id.toInt}"
messageProducer.send(topic, userMemoryMessage).andThen {
case Success(_) =>
logging.info(this, s"successfully posted user memory configuration to topic $topic")
case Failure(_) =>
logging.error(this, s"failed posted user memory configuration to topic $topic")
}
}
}

override val invokerPool =
invokerPoolFactory.createInvokerPool(
actorSystem,
Expand Down
2 changes: 2 additions & 0 deletions core/invoker/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ whisk {
}

invoker {
username: "invoker.user"
password: "invoker.pass"
protocol: http
}
runtime.delete.timeout = "30 seconds"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.openwhisk.core.containerpool

import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.connector.{MessageFeed, UserMemoryMessage}
import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
Expand All @@ -31,6 +31,8 @@ import scala.util.{Random, Try}

case class ColdStartKey(kind: String, memory: ByteSize)

object UserMemoryQuery

case object EmitMetrics

case object AdjustPrewarmedContainer
Expand Down Expand Up @@ -68,6 +70,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData]
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
var latestUserMemory = poolConfig.userMemory
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope we can avoid using var if possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm.. seems the lastUserMemory need to support changable

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can take advantage of Option.

// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
// buffered here to keep order of computation.
// Otherwise actions with small memory-limits could block actions with large memory limits.
Expand Down Expand Up @@ -209,7 +212,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
s"Rescheduling Run message, too many message in the pool, " +
s"freePoolSize: ${freePool.size} containers and ${memoryConsumptionOf(freePool)} MB, " +
s"busyPoolSize: ${busyPool.size} containers and ${memoryConsumptionOf(busyPool)} MB, " +
s"maxContainersMemory ${poolConfig.userMemory.toMB} MB, " +
s"maxContainersMemory ${latestUserMemory.toMB} MB, " +
s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
s"needed memory: ${r.action.limits.memory.megabytes} MB, " +
s"waiting messages: ${runBuffer.size}")(r.msg.transid)
Expand Down Expand Up @@ -297,6 +300,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
case RescheduleJob =>
freePool = freePool - sender()
busyPool = busyPool - sender()
case userMemoryMessage: UserMemoryMessage =>
logging.info(
this,
s"user memory is reconfigured from ${latestUserMemory.toString} to ${userMemoryMessage.userMemory.toString}")
latestUserMemory = userMemoryMessage.userMemory
case UserMemoryQuery =>
sender() ! latestUserMemory.toString
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about sending the intact obejct directly and handle the type conversion in the InvokerServer layer?
I think this kind of type conversion is only related to the InvokerServer and the client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm..
When we want to change invoker's user memory, in this pr, need to send the request to controller, there has one benefit that can modfiy many invoker's user memory with one request only. e.g.

curl  -u admin:admin -X POST http://xxx.xxx.xxx.xxx:10001/config/memory  -d '
[
{"invoker":0,"memory": "20480 MB"},
{"invoker":1,"memory": "10240 MB"}
{"invoker":2,"memory": "51200 MB"}
]
'

if send change invoker user memory request to invokerServer, if want to modify many invokers's user memory, need to send http request many times.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok since this is just a ByteSize I think we can take this approach.

case EmitMetrics =>
emitMetrics()

Expand Down Expand Up @@ -444,7 +454,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
def hasPoolSpaceFor[A](pool: Map[A, ContainerData],
prewarmStartingPool: Map[A, (String, ByteSize)],
memory: ByteSize): Boolean = {
memoryConsumptionOf(pool) + prewarmStartingPool.map(_._2._2.toMB).sum + memory.toMB <= poolConfig.userMemory.toMB
memoryConsumptionOf(pool) + prewarmStartingPool.map(_._2._2.toMB).sum + memory.toMB <= latestUserMemory.toMB
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.invoker

import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.http.BasicRasService
import org.apache.openwhisk.http.ErrorResponse.terminate
import pureconfig._
import spray.json.PrettyPrinter

import scala.concurrent.ExecutionContext

/**
* Implements web server to handle certain REST API calls.
*/
class DefaultInvokerServer(val invoker: InvokerCore)(implicit val ec: ExecutionContext,
val actorSystem: ActorSystem,
val logger: Logging)
extends BasicRasService {

val invokerUsername = loadConfigOrThrow[String](ConfigKeys.whiskInvokerUsername)
val invokerPassword = loadConfigOrThrow[String](ConfigKeys.whiskInvokerPassword)

override def routes(implicit transid: TransactionId): Route = {
super.routes ~ extractCredentials {
case Some(BasicHttpCredentials(username, password))
if username == invokerUsername && password == invokerPassword =>
(path("config" / "memory") & get) {
invoker.getUserMemory()
}
case _ =>
implicit val jsonPrettyResponsePrinter = PrettyPrinter
terminate(StatusCodes.Unauthorized)
}
}
}

object DefaultInvokerServer extends InvokerServerProvider {
override def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
new DefaultInvokerServer(invoker)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker

import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
Expand Down Expand Up @@ -217,7 +218,9 @@ trait InvokerProvider extends Spi {
}

// this trait can be used to add common implementation
trait InvokerCore {}
trait InvokerCore {
def getUserMemory(): Route
}

/**
* An Spi for providing RestAPI implementation for invoker.
Expand All @@ -227,9 +230,3 @@ trait InvokerServerProvider extends Spi {
def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
}

object DefaultInvokerServer extends InvokerServerProvider {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved it to core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala

override def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
new BasicRasService {}
}
Loading