Skip to content

Commit e095587

Browse files
committed
Adjust user memory via api
1 parent 74c5418 commit e095587

File tree

15 files changed

+439
-93
lines changed

15 files changed

+439
-93
lines changed

ansible/group_vars/all

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ controller:
105105
authentication:
106106
spi: "{{ controller_authentication_spi | default('') }}"
107107
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
108+
username: "{{ controller_username | default('controller.user') }}"
109+
password: "{{ controller_password | default('controller.pass') }}"
108110
entitlement:
109111
spi: "{{ controller_entitlement_spi | default('') }}"
110112
protocol: "{{ controller_protocol | default('https') }}"

ansible/roles/controller/tasks/deploy.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,9 @@
203203
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
204204
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"
205205

206+
"CONFIG_whisk_credentials_controller_username": "{{ controller.username }}"
207+
"CONFIG_whisk_credentials_controller_password": "{{ controller.password }}"
208+
206209
"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
207210
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
208211
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.common
19+
20+
case class ControllerCredentials(username: String, password: String)

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,4 +275,6 @@ object ConfigKeys {
275275
val parameterStorage = "whisk.parameter-storage"
276276

277277
val azBlob = "whisk.azure-blob"
278+
279+
val controllerCredentials = "whisk.credentials.controller"
278280
}

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,3 +426,22 @@ object EventMessage extends DefaultJsonProtocol {
426426

427427
def parse(msg: String) = Try(format.read(msg.parseJson))
428428
}
429+
430+
case class ByteSizeMessage(userMemory: ByteSize) extends Message {
431+
override def serialize = ByteSizeMessage.serdes.write(this).compactPrint
432+
}
433+
434+
object ByteSizeMessage extends DefaultJsonProtocol {
435+
implicit val serdes = new RootJsonFormat[ByteSizeMessage] {
436+
override def write(message: ByteSizeMessage): JsValue = {
437+
JsObject("userMemory" -> JsString(message.userMemory.toString))
438+
}
439+
440+
override def read(json: JsValue): ByteSizeMessage = {
441+
val userMemory = fromField[String](json, "userMemory")
442+
new ByteSizeMessage(ByteSize.fromString(userMemory))
443+
}
444+
}
445+
446+
def parse(msg: String) = Try(serdes.read(msg.parseJson))
447+
}

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import akka.actor.{ActorSystem, CoordinatedShutdown}
2222
import akka.event.Logging.InfoLevel
2323
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
2424
import akka.http.scaladsl.model.StatusCodes._
25-
import akka.http.scaladsl.model.Uri
25+
import akka.http.scaladsl.model.{StatusCodes, Uri}
26+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
2627
import akka.http.scaladsl.server.Route
2728
import akka.stream.ActorMaterializer
2829
import kamon.Kamon
@@ -31,9 +32,16 @@ import pureconfig.generic.auto._
3132
import spray.json.DefaultJsonProtocol._
3233
import spray.json._
3334
import org.apache.openwhisk.common.Https.HttpsConfig
34-
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
35-
import org.apache.openwhisk.core.WhiskConfig
36-
import org.apache.openwhisk.core.connector.MessagingProvider
35+
import org.apache.openwhisk.common.{
36+
AkkaLogging,
37+
ConfigMXBean,
38+
ControllerCredentials,
39+
Logging,
40+
LoggingMarkers,
41+
TransactionId
42+
}
43+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
44+
import org.apache.openwhisk.core.connector.{ByteSizeMessage, MessagingProvider}
3745
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
3846
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
3947
import org.apache.openwhisk.core.entitlement._
@@ -97,7 +105,7 @@ class Controller(val instance: ControllerInstanceId,
97105
(pathEndOrSingleSlash & get) {
98106
complete(info)
99107
}
100-
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
108+
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configMemory
101109
}
102110

103111
// initialize datastores
@@ -176,6 +184,63 @@ class Controller(val instance: ControllerInstanceId,
176184
LogLimit.config,
177185
runtimes,
178186
List(apiV1.basepath()))
187+
188+
private val controllerCredentials = loadConfigOrThrow[ControllerCredentials](ConfigKeys.controllerCredentials)
189+
190+
/**
191+
* config user memory of ContainerPool
192+
*/
193+
private val configMemory = {
194+
implicit val executionContext = actorSystem.dispatcher
195+
(path("config" / "memory") & post) {
196+
extractCredentials {
197+
case Some(BasicHttpCredentials(username, password)) =>
198+
if (username == controllerCredentials.username && password == controllerCredentials.password) {
199+
entity(as[String]) { memory =>
200+
try {
201+
val userMemoryMessage = ByteSizeMessage(ByteSize.fromString(memory))
202+
if (userMemoryMessage.userMemory.size == 0) {
203+
complete(StatusCodes.BadRequest, "user memory must be positive")
204+
} else {
205+
parameter('limit.?) { limit =>
206+
limit match {
207+
case Some(targetValue) =>
208+
val pattern = """\d+:\d"""
209+
if (targetValue.matches(pattern)) {
210+
val invokerArray = targetValue.split(":")
211+
val beginIndex = invokerArray(0).toInt
212+
val finishIndex = invokerArray(1).toInt
213+
if (finishIndex < beginIndex) {
214+
complete(StatusCodes.BadRequest, "finishIndex can't be less than beginIndex")
215+
} else {
216+
val targetInvokers = (beginIndex to finishIndex).toList
217+
loadBalancer.sendUserMemoryToInvokers(userMemoryMessage, Some(targetInvokers))
218+
logging.info(this, "config user memory request is already sent to target invokers")
219+
complete(StatusCodes.Accepted)
220+
}
221+
} else {
222+
complete(StatusCodes.BadRequest, "limit value can't match [beginIndex:finishIndex]")
223+
}
224+
case None =>
225+
loadBalancer.sendUserMemoryToInvokers(userMemoryMessage, None)
226+
logging.info(this, "config user memory request is already sent to all invokers")
227+
complete(StatusCodes.Accepted)
228+
}
229+
}
230+
}
231+
} catch {
232+
case ex: IllegalArgumentException =>
233+
logging.info(this, s"error message: ${ex.getMessage}")
234+
complete(StatusCodes.BadRequest, ex.getMessage)
235+
}
236+
}
237+
} else {
238+
complete(StatusCodes.Unauthorized, "username or password is wrong")
239+
}
240+
case _ => complete(StatusCodes.Unauthorized)
241+
}
242+
}
243+
}
179244
}
180245

181246
/**

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ trait LoadBalancer {
6060
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
6161
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
6262

63+
/**
64+
* send user memory to invokers
65+
*
66+
* @param userMemory
67+
* @param targetInvokers
68+
*/
69+
def sendUserMemoryToInvokers(userMemoryMessage: ByteSizeMessage, targetInvokers: Option[List[Int]]): Unit = {}
70+
6371
/**
6472
* Returns a message indicating the health of the containers and/or container pool in general.
6573
*

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
4343
import scala.annotation.tailrec
4444
import scala.concurrent.Future
4545
import scala.concurrent.duration.FiniteDuration
46+
import scala.util.{Failure, Success}
4647

4748
/**
4849
* A loadbalancer that schedules workload based on a hashing-algorithm.
@@ -316,6 +317,21 @@ class ShardingContainerPoolBalancer(
316317
}
317318
}
318319

320+
/** send user memory to invokers */
321+
override def sendUserMemoryToInvokers(userMemoryMessage: ByteSizeMessage, targetInvokers: Option[List[Int]]): Unit = {
322+
schedulingState.invokers.filter { invoker =>
323+
targetInvokers.getOrElse(schedulingState.invokers.map(_.id.instance)).contains(invoker.id.instance)
324+
} foreach { invokerHealth =>
325+
val topic = s"invoker${invokerHealth.id.toInt}"
326+
messageProducer.send(topic, userMemoryMessage).andThen {
327+
case Success(_) =>
328+
logging.info(this, s"successfully posted user memory configuration to topic $topic")
329+
case Failure(_) =>
330+
logging.error(this, s"failed posted user memory configuration to topic $topic")
331+
}
332+
}
333+
}
334+
319335
override val invokerPool =
320336
invokerPoolFactory.createInvokerPool(
321337
actorSystem,

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.openwhisk.core.containerpool
1919

2020
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
2121
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
22-
import org.apache.openwhisk.core.connector.MessageFeed
22+
import org.apache.openwhisk.core.connector.{ByteSizeMessage, MessageFeed}
2323
import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
2424
import org.apache.openwhisk.core.entity._
2525
import org.apache.openwhisk.core.entity.size._
@@ -37,6 +37,8 @@ case class ColdStartKey(kind: String, memory: ByteSize)
3737

3838
case class WorkerData(data: ContainerData, state: WorkerState)
3939

40+
object UserMemoryQuery
41+
4042
case object EmitMetrics
4143

4244
case object AdjustPrewarmedContainer
@@ -74,6 +76,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
7476
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
7577
var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData]
7678
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
79+
var latestUserMemory = poolConfig.userMemory
7780
// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
7881
// buffered here to keep order of computation.
7982
// Otherwise actions with small memory-limits could block actions with large memory limits.
@@ -219,7 +222,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
219222
s"Rescheduling Run message, too many message in the pool, " +
220223
s"freePoolSize: ${freePool.size} containers and ${memoryConsumptionOf(freePool)} MB, " +
221224
s"busyPoolSize: ${busyPool.size} containers and ${memoryConsumptionOf(busyPool)} MB, " +
222-
s"maxContainersMemory ${poolConfig.userMemory.toMB} MB, " +
225+
s"maxContainersMemory ${latestUserMemory.toMB} MB, " +
223226
s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
224227
s"needed memory: ${r.action.limits.memory.megabytes} MB, " +
225228
s"waiting messages: ${runBuffer.size}")(r.msg.transid)
@@ -305,6 +308,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
305308
case RescheduleJob =>
306309
freePool = freePool - sender()
307310
busyPool = busyPool - sender()
311+
case message: ByteSizeMessage =>
312+
logging.info(
313+
this,
314+
s"user memory is reconfigured from ${latestUserMemory.toString} to ${message.userMemory.toString}")
315+
latestUserMemory = message.userMemory
316+
case UserMemoryQuery =>
317+
sender() ! latestUserMemory.toString
308318
case EmitMetrics =>
309319
emitMetrics()
310320

@@ -444,7 +454,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
444454
* @return true, if there is enough space for the given amount of memory.
445455
*/
446456
def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = {
447-
memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
457+
memoryConsumptionOf(pool) + memory.toMB <= latestUserMemory.toMB
448458
}
449459

450460
/**
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.invoker
19+
20+
import akka.actor.ActorSystem
21+
import akka.http.scaladsl.server.Route
22+
import org.apache.openwhisk.common.{Logging, TransactionId}
23+
24+
import org.apache.openwhisk.http.BasicRasService
25+
26+
import scala.concurrent.ExecutionContext
27+
28+
/**
29+
* Implements web server to handle certain REST API calls.
30+
*/
31+
class DefaultInvokerServer(val invoker: InvokerCore)(implicit val ec: ExecutionContext,
32+
val actorSystem: ActorSystem,
33+
val logger: Logging)
34+
extends BasicRasService {
35+
36+
override def routes(implicit transid: TransactionId): Route = {
37+
super.routes ~ {
38+
(path("config" / "memory") & get) {
39+
invoker.getUserMemory()
40+
}
41+
}
42+
}
43+
}
44+
45+
object DefaultInvokerServer extends InvokerServerProvider {
46+
override def instance(
47+
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
48+
new DefaultInvokerServer(invoker)
49+
}

0 commit comments

Comments
 (0)