Skip to content

Commit 30149a0

Browse files
committed
Config runtime
Sometimes, admin may want to reinitalize the runtime config depend on the real requirements, e.g. increase some prewarm containers
1 parent e3c7a13 commit 30149a0

File tree

16 files changed

+485
-90
lines changed

16 files changed

+485
-90
lines changed

ansible/group_vars/all

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ controller:
105105
authentication:
106106
spi: "{{ controller_authentication_spi | default('') }}"
107107
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
108+
username: "{{ controller_username | default('controller.user') }}"
109+
password: "{{ controller_password | default('controller.pass') }}"
108110
entitlement:
109111
spi: "{{ controller_entitlement_spi | default('') }}"
110112
protocol: "{{ controller_protocol | default('https') }}"
@@ -201,6 +203,8 @@ invoker:
201203
runcdir: "{{ invoker_runcdir | default('/run/docker/runtime-runc/moby') }}"
202204
volumes: "{{ invoker_docker_volumes | default([]) }}"
203205
loglevel: "{{ invoker_loglevel | default(whisk_loglevel) | default('INFO') }}"
206+
username: "{{ invoker_username | default('invoker.user') }}"
207+
password: "{{ invoker_password | default('invoker.pass') }}"
204208
jmxremote:
205209
jvmArgs: "{% if inventory_hostname in groups['invokers'] %}
206210
{{ jmx.jvmCommonArgs }} -Djava.rmi.server.hostname={{ invokerHostname }} -Dcom.sun.management.jmxremote.rmi.port={{ jmx.rmiBasePortInvoker + groups['invokers'].index(inventory_hostname) }} -Dcom.sun.management.jmxremote.port={{ jmx.basePortInvoker + groups['invokers'].index(inventory_hostname) }}

ansible/roles/controller/tasks/deploy.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,9 @@
203203
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
204204
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"
205205

206+
"CONFIG_whisk_credentials_controller_username": "{{ controller.username }}"
207+
"CONFIG_whisk_credentials_controller_password": "{{ controller.password }}"
208+
206209
"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
207210
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
208211
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"

ansible/roles/invoker/tasks/deploy.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,8 @@
266266
"CONFIG_whisk_timeLimit_min": "{{ limit_action_time_min | default() }}"
267267
"CONFIG_whisk_timeLimit_max": "{{ limit_action_time_max | default() }}"
268268
"CONFIG_whisk_timeLimit_std": "{{ limit_action_time_std | default() }}"
269+
"CONFIG_whisk_credentials_invoker_username": "{{ invoker.username }}"
270+
"CONFIG_whisk_credentials_invoker_password": "{{ invoker.password }}"
269271
"CONFIG_whisk_concurrencyLimit_min": "{{ limit_action_concurrency_min | default() }}"
270272
"CONFIG_whisk_concurrencyLimit_max": "{{ limit_action_concurrency_max | default() }}"
271273
"CONFIG_whisk_concurrencyLimit_std": "{{ limit_action_concurrency_std | default() }}"
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.common
19+
20+
case class ControllerCredentials(username: String, password: String)
21+
22+
case class InvokerCredentials(username: String, password: String)

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,4 +266,7 @@ object ConfigKeys {
266266
val swaggerUi = "whisk.swagger-ui"
267267

268268
val apacheClientConfig = "whisk.apache-client"
269+
270+
val controllerCredentials = "whisk.credentials.controller"
271+
val invokerCredentials = "whisk.credentials.invoker"
269272
}

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,3 +429,30 @@ object EventMessage extends DefaultJsonProtocol {
429429

430430
def parse(msg: String) = Try(format.read(msg.parseJson))
431431
}
432+
433+
case class RuntimeMessage(runtime: String) extends Message {
434+
override def serialize = RuntimeMessage.serdes.write(this).compactPrint
435+
}
436+
437+
object RuntimeMessage extends DefaultJsonProtocol {
438+
def parse(msg: String) = Try(serdes.read(msg.parseJson))
439+
implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime")
440+
}
441+
442+
case class PrewarmContainerData(kind: String, memory: Long, var number: Int) extends Message {
443+
override def serialize: String = PrewarmContainerData.serdes.write(this).compactPrint
444+
}
445+
446+
object PrewarmContainerData extends DefaultJsonProtocol {
447+
implicit val serdes = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
448+
}
449+
450+
case class PrewarmContainerDataList(items: List[PrewarmContainerData])
451+
452+
object PrewarmContainerDataProtocol extends DefaultJsonProtocol {
453+
implicit val prewarmContainerDataFormat = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
454+
implicit object prewarmContainerDataListJsonFormat extends RootJsonFormat[PrewarmContainerDataList] {
455+
def read(value: JsValue) = PrewarmContainerDataList(value.convertTo[List[PrewarmContainerData]])
456+
def write(f: PrewarmContainerDataList) = ???
457+
}
458+
}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,20 @@ protected[core] object ExecManifest {
5555
mf
5656
}
5757

58+
/**
59+
* Reads runtimes manifest from runtime string
60+
*
61+
* @param runtime
62+
* @return the manifest if initialized successfully, or an failure
63+
*/
64+
protected[core] def initialize(runtime: String): Try[Runtimes] = {
65+
val rmc = loadConfigOrThrow[RuntimeManifestConfig](ConfigKeys.runtimes)
66+
val mf = Try(runtime.parseJson.asJsObject).flatMap(runtimes(_, rmc))
67+
var manifest: Option[Runtimes] = None
68+
mf.foreach(m => manifest = Some(m))
69+
mf
70+
}
71+
5872
/**
5973
* Gets existing runtime manifests.
6074
*

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import akka.Done
2121
import akka.actor.{ActorSystem, CoordinatedShutdown}
2222
import akka.event.Logging.InfoLevel
2323
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
24-
import akka.http.scaladsl.model.Uri
24+
import akka.http.scaladsl.model.{StatusCodes, Uri}
25+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
2526
import akka.http.scaladsl.server.Route
2627
import akka.stream.ActorMaterializer
2728
import kamon.Kamon
@@ -30,8 +31,15 @@ import pureconfig.generic.auto._
3031
import spray.json.DefaultJsonProtocol._
3132
import spray.json._
3233
import org.apache.openwhisk.common.Https.HttpsConfig
33-
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
34-
import org.apache.openwhisk.core.WhiskConfig
34+
import org.apache.openwhisk.common.{
35+
AkkaLogging,
36+
ConfigMXBean,
37+
ControllerCredentials,
38+
Logging,
39+
LoggingMarkers,
40+
TransactionId
41+
}
42+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
3543
import org.apache.openwhisk.core.connector.MessagingProvider
3644
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
3745
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
@@ -96,7 +104,7 @@ class Controller(val instance: ControllerInstanceId,
96104
(pathEndOrSingleSlash & get) {
97105
complete(info)
98106
}
99-
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
107+
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configRuntime
100108
}
101109

102110
// initialize datastores
@@ -163,6 +171,56 @@ class Controller(val instance: ControllerInstanceId,
163171
LogLimit.config,
164172
runtimes,
165173
List(apiV1.basepath()))
174+
175+
private val controllerCredentials = loadConfigOrThrow[ControllerCredentials](ConfigKeys.controllerCredentials)
176+
177+
/**
178+
* config runtime
179+
*/
180+
private val configRuntime = {
181+
implicit val executionContext = actorSystem.dispatcher
182+
(path("config" / "runtime") & post) {
183+
extractCredentials {
184+
case Some(BasicHttpCredentials(username, password)) =>
185+
if (username == controllerCredentials.username && password == controllerCredentials.password) {
186+
entity(as[String]) { runtime =>
187+
val execManifest = ExecManifest.initialize(runtime)
188+
if (execManifest.isFailure) {
189+
logging.error(this, s"Received invalid runtimes manifest")
190+
complete(s"Received invalid runtimes manifest")
191+
} else {
192+
parameter('limit.?) { limit =>
193+
limit match {
194+
case Some(targetValue) =>
195+
val pattern = "\\d+:\\d"
196+
if (targetValue.matches(pattern)) {
197+
val invokerArray = targetValue.split(":")
198+
val beginIndex = invokerArray(0).toInt
199+
val finishIndex = invokerArray(1).toInt
200+
if (finishIndex < beginIndex) {
201+
complete(s"finishIndex can't be less than beginIndex")
202+
} else {
203+
val targetInvokers = (beginIndex to finishIndex).toList
204+
loadBalancer.sendRuntimeToInvokers(runtime, Some(targetInvokers))
205+
complete(s"config runtime request is already sent to target invokers")
206+
}
207+
} else {
208+
complete(s"limit value can't match [beginIndex:finishIndex]")
209+
}
210+
case None =>
211+
loadBalancer.sendRuntimeToInvokers(runtime, None)
212+
complete(s"config runtime request is already sent to all managed invokers")
213+
}
214+
}
215+
}
216+
}
217+
} else {
218+
complete("username or password is wrong")
219+
}
220+
case _ => complete(StatusCodes.Unauthorized)
221+
}
222+
}
223+
}
166224
}
167225

168226
/**

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ trait LoadBalancer {
6060
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
6161
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
6262

63+
/**
64+
* send runtime to invokers
65+
*
66+
* @param runtime
67+
* @param targetInvokers
68+
*/
69+
def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {}
70+
6371
/**
6472
* Returns a message indicating the health of the containers and/or container pool in general.
6573
*

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
4343
import scala.annotation.tailrec
4444
import scala.concurrent.Future
4545
import scala.concurrent.duration.FiniteDuration
46+
import scala.util.{Failure, Success}
4647

4748
/**
4849
* A loadbalancer that schedules workload based on a hashing-algorithm.
@@ -316,6 +317,22 @@ class ShardingContainerPoolBalancer(
316317
}
317318
}
318319

320+
/** send runtime to invokers*/
321+
override def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {
322+
val runtimeMessage = RuntimeMessage(runtime)
323+
schedulingState.managedInvokers.filter { manageInvoker =>
324+
targetInvokers.getOrElse(schedulingState.managedInvokers.map(_.id.instance)).contains(manageInvoker.id.instance)
325+
} foreach { invokerHealth =>
326+
val topic = s"invoker${invokerHealth.id.toInt}"
327+
messageProducer.send(topic, runtimeMessage).andThen {
328+
case Success(_) =>
329+
logging.info(this, s"Successfully posted runtime to topic $topic")
330+
case Failure(_) =>
331+
logging.error(this, s"Failed posted runtime to topic $topic")
332+
}
333+
}
334+
}
335+
319336
override val invokerPool =
320337
invokerPoolFactory.createInvokerPool(
321338
actorSystem,

0 commit comments

Comments
 (0)