diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 555ad881044..d16aec01d89 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -107,6 +107,8 @@ controller: authentication: spi: "{{ controller_authentication_spi | default('') }}" loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}" + username: "{{ controller_username | default('controller.user') }}" + password: "{{ controller_password | default('controller.pass') }}" entitlement: spi: "{{ controller_entitlement_spi | default('') }}" protocol: "{{ controller_protocol | default('https') }}" @@ -209,6 +211,8 @@ invoker: {% endif %}" extraEnv: "{{ invoker_extraEnv | default({}) }}" protocol: "{{ invoker_protocol | default('https') }}" + username: "{{ invoker_username | default('invoker.user') }}" + password: "{{ invoker_password | default('invoker.pass') }}" ssl: cn: "openwhisk-invokers" keyPrefix: "{{ __invoker_ssl_keyPrefix }}" diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 64724c41d54..e79adeab631 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -203,6 +203,9 @@ "CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}" "CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}" + "CONFIG_whisk_controller_username": "{{ controller.username }}" + "CONFIG_whisk_controller_password": "{{ controller.password }}" + "LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}" "LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}" "LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}" diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index ea4ce48114b..f893f7bb4f7 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -279,6 +279,8 @@ "CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}" "CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}" "CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}" + "CONFIG_whisk_invoker_username": "{{ invoker.username }}" + "CONFIG_whisk_invoker_password": "{{ invoker.password }}" - name: extend invoker dns env set_fact: diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index ba15f5f2342..b5c4f5e8dfa 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -291,4 +291,9 @@ object ConfigKeys { val azBlob = "whisk.azure-blob" val whiskClusterName = "whisk.cluster.name" + + val whiskControllerUsername = "whisk.controller.username" + val whiskControllerPassword = "whisk.controller.password" + val whiskInvokerUsername = "whisk.invoker.username" + val whiskInvokerPassword = "whisk.invoker.password" } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala index 60522586842..b9ff43941be 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala @@ -457,3 +457,42 @@ object StatusData extends DefaultJsonProtocol { implicit val serdes = jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data") } + +case class UserMemoryMessage(userMemory: ByteSize) extends Message { + override def serialize = UserMemoryMessage.serdes.write(this).compactPrint +} + +object UserMemoryMessage extends DefaultJsonProtocol { + implicit val serdes = new RootJsonFormat[UserMemoryMessage] { + override def write(message: UserMemoryMessage): JsValue = { + JsObject("userMemory" -> JsString(message.userMemory.toString)) + } + + override def read(json: JsValue): UserMemoryMessage = { + val userMemory = fromField[String](json, "userMemory") + new UserMemoryMessage(ByteSize.fromString(userMemory)) + } + } + + def parse(msg: String) = Try(serdes.read(msg.parseJson)) +} + +case class InvokerConfiguration(invoker: Int, memory: ByteSize) + +object InvokerConfigurationProtocol extends DefaultJsonProtocol { + implicit val serdes = new RootJsonFormat[ByteSize] { + override def write(obj: ByteSize): JsValue = JsObject("memory" -> JsString(obj.toString)) + + override def read(json: JsValue): ByteSize = { + json match { + case JsString(memory) => ByteSize.fromString(memory) + case _ => throw new DeserializationException("Could not deserialize ByteSize") + } + } + } + implicit val invokerConfigurationFormat = jsonFormat2(InvokerConfiguration) + implicit val invokerConfigurationListJsonFormat = new RootJsonFormat[List[InvokerConfiguration]] { + def read(value: JsValue) = value.convertTo[List[InvokerConfiguration]] + def write(f: List[InvokerConfiguration]) = ??? + } +} diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf index 8358ced2109..7d3ef190d43 100644 --- a/core/controller/src/main/resources/application.conf +++ b/core/controller/src/main/resources/application.conf @@ -117,4 +117,8 @@ whisk{ file-system : true dir-path : "/swagger-ui/" } + controller { + username: "controller.user" + password: "controller.pass" + } } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala index 935219685ed..db57b7677f9 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala @@ -22,7 +22,8 @@ import akka.actor.{ActorSystem, CoordinatedShutdown} import akka.event.Logging.InfoLevel import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.model.Uri +import akka.http.scaladsl.model.{StatusCodes, Uri} +import akka.http.scaladsl.model.headers.BasicHttpCredentials import akka.http.scaladsl.server.Route import akka.stream.ActorMaterializer import kamon.Kamon @@ -32,8 +33,8 @@ import spray.json.DefaultJsonProtocol._ import spray.json._ import org.apache.openwhisk.common.Https.HttpsConfig import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId} -import org.apache.openwhisk.core.WhiskConfig -import org.apache.openwhisk.core.connector.MessagingProvider +import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} +import org.apache.openwhisk.core.connector.{InvokerConfiguration, MessagingProvider, UserMemoryMessage} import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation} import org.apache.openwhisk.core.entitlement._ @@ -97,7 +98,7 @@ class Controller(val instance: ControllerInstanceId, (pathEndOrSingleSlash & get) { complete(info) } - } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth + } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configMemory } // initialize datastores @@ -176,6 +177,41 @@ class Controller(val instance: ControllerInstanceId, LogLimit.config, runtimes, List(apiV1.basepath())) + + private val controllerUsername = loadConfigOrThrow[String](ConfigKeys.whiskControllerUsername) + private val controllerPassword = loadConfigOrThrow[String](ConfigKeys.whiskControllerPassword) + + /** + * config user memory of ContainerPool + */ + import org.apache.openwhisk.core.connector.InvokerConfigurationProtocol._ + private val configMemory = { + implicit val executionContext = actorSystem.dispatcher + (path("config" / "memory") & post) { + extractCredentials { + case Some(BasicHttpCredentials(username, password)) => + if (username == controllerUsername && password == controllerPassword) { + entity(as[String]) { memory => + val configMemoryList = memory.parseJson.convertTo[List[InvokerConfiguration]] + configMemoryList.find(config => MemoryLimit.MIN_MEMORY.compare(config.memory) > 0) match { + case Some(_) => + complete(StatusCodes.BadRequest, s"user memory can't be less than ${MemoryLimit.MIN_MEMORY}") + case None => + configMemoryList.foreach { config => + val invoker = config.invoker + val userMemoryMessage = UserMemoryMessage(config.memory) + loadBalancer.sendChangeRequestToInvoker(userMemoryMessage, invoker) + } + complete(StatusCodes.Accepted) + } + } + } else { + complete(StatusCodes.Unauthorized, "username or password is wrong") + } + case _ => complete(StatusCodes.Unauthorized) + } + } + } } /** diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala index 20225127c3c..80d4502d139 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala @@ -60,6 +60,14 @@ trait LoadBalancer { def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] + /** + * send user memory to invokers + * + * @param userMemory + * @param targetInvokers + */ + def sendChangeRequestToInvoker(userMemoryMessage: UserMemoryMessage, targetInvoker: Int): Unit = {} + /** * Returns a message indicating the health of the containers and/or container pool in general. * diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index f06cb04279b..262752e7931 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration +import scala.util.{Failure, Success} /** * A loadbalancer that schedules workload based on a hashing-algorithm. @@ -316,6 +317,21 @@ class ShardingContainerPoolBalancer( } } + /** send user memory to invokers */ + override def sendChangeRequestToInvoker(userMemoryMessage: UserMemoryMessage, targetInvoker: Int): Unit = { + schedulingState.invokers.filter { invoker => + invoker.id.instance == targetInvoker + } foreach { invokerHealth => + val topic = s"invoker${invokerHealth.id.toInt}" + messageProducer.send(topic, userMemoryMessage).andThen { + case Success(_) => + logging.info(this, s"successfully posted user memory configuration to topic $topic") + case Failure(_) => + logging.error(this, s"failed posted user memory configuration to topic $topic") + } + } + } + override val invokerPool = invokerPoolFactory.createInvokerPool( actorSystem, diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index be683338593..737b7294a9b 100644 --- a/core/invoker/src/main/resources/application.conf +++ b/core/invoker/src/main/resources/application.conf @@ -171,6 +171,8 @@ whisk { } invoker { + username: "invoker.user" + password: "invoker.pass" protocol: http } runtime.delete.timeout = "30 seconds" diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala index 724cd5971ed..c76f4f72ac8 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala @@ -19,7 +19,7 @@ package org.apache.openwhisk.core.containerpool import akka.actor.{Actor, ActorRef, ActorRefFactory, Props} import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId} -import org.apache.openwhisk.core.connector.MessageFeed +import org.apache.openwhisk.core.connector.{MessageFeed, UserMemoryMessage} import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ @@ -31,6 +31,8 @@ import scala.util.{Random, Try} case class ColdStartKey(kind: String, memory: ByteSize) +object UserMemoryQuery + case object EmitMetrics case object AdjustPrewarmedContainer @@ -68,6 +70,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, var busyPool = immutable.Map.empty[ActorRef, ContainerData] var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData] var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)] + var latestUserMemory = poolConfig.userMemory // If all memory slots are occupied and if there is currently no container to be removed, than the actions will be // buffered here to keep order of computation. // Otherwise actions with small memory-limits could block actions with large memory limits. @@ -209,7 +212,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, s"Rescheduling Run message, too many message in the pool, " + s"freePoolSize: ${freePool.size} containers and ${memoryConsumptionOf(freePool)} MB, " + s"busyPoolSize: ${busyPool.size} containers and ${memoryConsumptionOf(busyPool)} MB, " + - s"maxContainersMemory ${poolConfig.userMemory.toMB} MB, " + + s"maxContainersMemory ${latestUserMemory.toMB} MB, " + s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " + s"needed memory: ${r.action.limits.memory.megabytes} MB, " + s"waiting messages: ${runBuffer.size}")(r.msg.transid) @@ -297,6 +300,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, case RescheduleJob => freePool = freePool - sender() busyPool = busyPool - sender() + case userMemoryMessage: UserMemoryMessage => + logging.info( + this, + s"user memory is reconfigured from ${latestUserMemory.toString} to ${userMemoryMessage.userMemory.toString}") + latestUserMemory = userMemoryMessage.userMemory + case UserMemoryQuery => + sender() ! latestUserMemory.toString case EmitMetrics => emitMetrics() @@ -444,7 +454,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, def hasPoolSpaceFor[A](pool: Map[A, ContainerData], prewarmStartingPool: Map[A, (String, ByteSize)], memory: ByteSize): Boolean = { - memoryConsumptionOf(pool) + prewarmStartingPool.map(_._2._2.toMB).sum + memory.toMB <= poolConfig.userMemory.toMB + memoryConsumptionOf(pool) + prewarmStartingPool.map(_._2._2.toMB).sum + memory.toMB <= latestUserMemory.toMB } /** diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala new file mode 100644 index 00000000000..a639e7f22ef --- /dev/null +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.invoker + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.headers.BasicHttpCredentials +import akka.http.scaladsl.server.Route +import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.core.ConfigKeys +import org.apache.openwhisk.http.BasicRasService +import org.apache.openwhisk.http.ErrorResponse.terminate +import pureconfig._ +import spray.json.PrettyPrinter + +import scala.concurrent.ExecutionContext + +/** + * Implements web server to handle certain REST API calls. + */ +class DefaultInvokerServer(val invoker: InvokerCore)(implicit val ec: ExecutionContext, + val actorSystem: ActorSystem, + val logger: Logging) + extends BasicRasService { + + val invokerUsername = loadConfigOrThrow[String](ConfigKeys.whiskInvokerUsername) + val invokerPassword = loadConfigOrThrow[String](ConfigKeys.whiskInvokerPassword) + + override def routes(implicit transid: TransactionId): Route = { + super.routes ~ extractCredentials { + case Some(BasicHttpCredentials(username, password)) + if username == invokerUsername && password == invokerPassword => + (path("config" / "memory") & get) { + invoker.getUserMemory() + } + case _ => + implicit val jsonPrettyResponsePrinter = PrettyPrinter + terminate(StatusCodes.Unauthorized) + } + } +} + +object DefaultInvokerServer extends InvokerServerProvider { + override def instance( + invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService = + new DefaultInvokerServer(invoker) +} diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 1b0c8bf797e..52382809a5c 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker import akka.Done import akka.actor.{ActorSystem, CoordinatedShutdown} +import akka.http.scaladsl.server.Route import akka.stream.ActorMaterializer import com.typesafe.config.ConfigValueFactory import kamon.Kamon @@ -217,7 +218,9 @@ trait InvokerProvider extends Spi { } // this trait can be used to add common implementation -trait InvokerCore {} +trait InvokerCore { + def getUserMemory(): Route +} /** * An Spi for providing RestAPI implementation for invoker. @@ -227,9 +230,3 @@ trait InvokerServerProvider extends Spi { def instance( invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService } - -object DefaultInvokerServer extends InvokerServerProvider { - override def instance( - invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService = - new BasicRasService {} -} diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index 6aa088444e9..13b9287a069 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -23,7 +23,11 @@ import java.time.Instant import akka.Done import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props} import akka.event.Logging.InfoLevel +import akka.http.scaladsl.server.Route +import akka.http.scaladsl.server.Directives._ import akka.stream.ActorMaterializer +import akka.pattern.ask +import akka.util.Timeout import org.apache.openwhisk.common._ import org.apache.openwhisk.common.tracing.WhiskTracerProvider import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender} @@ -221,38 +225,46 @@ class InvokerReactive( /** Is called when an ActivationMessage is read from Kafka */ def processActivationMessage(bytes: Array[Byte]): Future[Unit] = { - Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8))) + Future( + ActivationMessage + .parse(new String(bytes, StandardCharsets.UTF_8)) + .orElse(UserMemoryMessage.parse(new String(bytes, StandardCharsets.UTF_8)))) .flatMap(Future.fromTry) - .flatMap { msg => - // The message has been parsed correctly, thus the following code needs to *always* produce at least an - // active-ack. - - implicit val transid: TransactionId = msg.transid - - //set trace context to continue tracing - WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext) - - if (!namespaceBlacklist.isBlacklisted(msg.user)) { - val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) - handleActivationMessage(msg) - } else { - // Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol - // Due to the protective nature of the blacklist, a database entry is not written. + .flatMap { + case msg: ActivationMessage => + // The message has been parsed correctly, thus the following code needs to *always* produce at least an + // active-ack. + + implicit val transid: TransactionId = msg.transid + + //set trace context to continue tracing + WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext) + + if (!namespaceBlacklist.isBlacklisted(msg.user)) { + val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) + handleActivationMessage(msg) + } else { + // Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol + // Due to the protective nature of the blacklist, a database entry is not written. + activationFeed ! MessageFeed.Processed + + val activation = + generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) + ack( + msg.transid, + activation, + false, + msg.rootControllerIndex, + msg.user.namespace.uuid, + CombinedCompletionAndResultMessage(transid, activation, instance)) + + logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.") + Future.successful(()) + } + case msg: UserMemoryMessage => + pool ! msg activationFeed ! MessageFeed.Processed - - val activation = - generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) - ack( - msg.transid, - activation, - false, - msg.rootControllerIndex, - msg.user.namespace.uuid, - CombinedCompletionAndResultMessage(transid, activation, instance)) - - logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.") Future.successful(()) - } } .recoverWith { case t => @@ -299,4 +311,11 @@ class InvokerReactive( } }) + override def getUserMemory(): Route = { + complete { + pool + .ask(UserMemoryQuery)(Timeout(5.seconds)) + .mapTo[String] + } + } } diff --git a/docs/operation.md b/docs/operation.md new file mode 100644 index 00000000000..93e3d617cd7 --- /dev/null +++ b/docs/operation.md @@ -0,0 +1,33 @@ + + +# User memory configuration of containerPool +## Change user memory to all invokers via controller. e.g. +``` +curl -u ${username}:${password} -X POST http://${controllerAddress}:${controllerPort}/config/memory -d '1024 MB' +``` +Note: you can add `?limit` to specify target invokers, e.g. specify invoker0 and invoker1 +``` +curl -u ${username}:${password} -X POST http://${controllerAddress}:${controllerPort}/config/memory?limit=0:1 -d '1024 MB' +``` +## Get user memory info on assigned invoker +``` +curl -u ${username}:${password} -X GET 'http://${invokerAddress}:${invokerPort}/config/memory' +1024 MB +``` diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2 index cdae2bddaa7..bd140a050e0 100644 --- a/tests/src/test/resources/application.conf.j2 +++ b/tests/src/test/resources/application.conf.j2 @@ -64,6 +64,8 @@ whisk { } controller { + username = {{ controller.username }} + password = {{ controller.password }} protocol = {{ controller.protocol }} https { keystore-flavor = "{{ controller.ssl.storeFlavor }}" @@ -73,6 +75,8 @@ whisk { } } invoker { + username = {{ invoker.username }} + password = {{ invoker.password }} protocol = {{ invoker.protocol }} https { keystore-flavor = "{{ invoker.ssl.storeFlavor }}" @@ -81,6 +85,7 @@ whisk { client-auth = "{{ invoker.ssl.clientAuth }}" } } + user-events { enabled = {{ user_events }} } diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java index 05d8b322638..05ce08b889b 100644 --- a/tests/src/test/scala/common/WhiskProperties.java +++ b/tests/src/test/scala/common/WhiskProperties.java @@ -262,6 +262,10 @@ public static String getBaseControllerHost() { return getControllerHosts().split(",")[0]; } + public static String getBaseInvokerAddress(){ + return getInvokerHosts()[0] + ":" + whiskProperties.getProperty("invoker.hosts.basePort"); + } + public static String getBaseDBHost() { return getDBHosts().split(",")[0]; } diff --git a/tests/src/test/scala/org/apache/openwhisk/operation/UserMemoryConfigurationTests.scala b/tests/src/test/scala/org/apache/openwhisk/operation/UserMemoryConfigurationTests.scala new file mode 100644 index 00000000000..dae68f2b14e --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/operation/UserMemoryConfigurationTests.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.operation + +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials} +import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest, StatusCodes} +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer +import common._ +import common.rest.HttpConnection +import org.apache.openwhisk.core.entity.ByteSize +import org.apache.openwhisk.core.entity.size.SizeInt +import org.junit.runner.RunWith +import org.scalatest.Matchers +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.junit.JUnitRunner +import pureconfig.loadConfigOrThrow +import system.rest.RestUtil + +@RunWith(classOf[JUnitRunner]) +class UserMemoryConfigurationTests + extends TestHelpers + with RestUtil + with Matchers + with ScalaFutures + with WskActorSystem + with StreamLogging { + + implicit val materializer = ActorMaterializer() + + val invokerProtocol = loadConfigOrThrow[String]("whisk.invoker.protocol") + val invokerAddress = WhiskProperties.getBaseInvokerAddress + + val controllerProtocol = loadConfigOrThrow[String]("whisk.controller.protocol") + val controllerAddress = WhiskProperties.getBaseControllerAddress + private val controllerUsername = loadConfigOrThrow[String]("whisk.controller.username") + private val controllerPassword = loadConfigOrThrow[String]("whisk.controller.password") + val controllerAuthHeader = Authorization(BasicHttpCredentials(controllerUsername, controllerPassword)) + private val invokerUsername = loadConfigOrThrow[String]("whisk.invoker.username") + private val invokerPassword = loadConfigOrThrow[String]("whisk.invoker.password") + val invokerAuthHeader = Authorization(BasicHttpCredentials(invokerUsername, invokerPassword)) + + val getUserMemoryUrl = s"${invokerProtocol}://${invokerAddress}/config/memory" + val controllerChangeUserMemoryUrl = s"${controllerProtocol}://${controllerAddress}/config/memory" + + it should "change invoker's user memory" in { + Http() + .singleRequest( + HttpRequest(method = HttpMethods.GET, uri = s"${getUserMemoryUrl}"), + connectionContext = HttpConnection.getContext(invokerProtocol)) + .map { response => + response.status shouldBe StatusCodes.OK + val orignalUserMemory = ByteSize.fromString(Unmarshal(response).to[String].futureValue) + val changedUserMemory = orignalUserMemory + 128.MB + Http() + .singleRequest( + HttpRequest( + method = HttpMethods.POST, + uri = s"${controllerChangeUserMemoryUrl}", + headers = List(controllerAuthHeader), + entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, changedUserMemory.toString)), + connectionContext = HttpConnection.getContext(controllerProtocol)) + .map { response => + response.status shouldBe StatusCodes.Accepted + + // Make sure the user memory's configuration is changed + Thread.sleep(2000) + + Http() + .singleRequest( + HttpRequest(method = HttpMethods.GET, uri = s"${getUserMemoryUrl}", headers = List(invokerAuthHeader)), + connectionContext = HttpConnection.getContext(invokerProtocol)) + .map { response => + response.status shouldBe StatusCodes.OK + ByteSize + .fromString(Unmarshal(response).to[String].futureValue) + .toBytes shouldBe changedUserMemory.toBytes + } + } + } + } +}