Skip to content

Commit fbbaaf3

Browse files
committed
Change prewarm container
- Add prewarm container - Delete prewarm container
1 parent 00fad95 commit fbbaaf3

File tree

8 files changed

+300
-4
lines changed

8 files changed

+300
-4
lines changed

ansible/group_vars/all

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ invoker:
181181
docker:
182182
become: "{{ invoker_docker_become | default(false) }}"
183183
loglevel: "{{ invoker_loglevel | default(whisk_loglevel) | default('INFO') }}"
184+
username: "{{ invoker_username | default('invoker.user') }}"
185+
password: "{{ invoker_password | default('invoker.pass') }}"
184186
jmxremote:
185187
jvmArgs: "{% if inventory_hostname in groups['invokers'] %}
186188
{{ jmx.jvmCommonArgs }} -Djava.rmi.server.hostname={{ invokerHostname }} -Dcom.sun.management.jmxremote.rmi.port={{ jmx.rmiBasePortInvoker + groups['invokers'].index(inventory_hostname) }} -Dcom.sun.management.jmxremote.port={{ jmx.basePortInvoker + groups['invokers'].index(inventory_hostname) }}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ protected[core] object ExecManifest {
5353
mf
5454
}
5555

56+
protected[core] def initializePrewarm(prewarmRuntime: String): Try[Runtimes] = {
57+
val rmc = loadConfigOrThrow[RuntimeManifestConfig](ConfigKeys.runtimes)
58+
val mf = Try(prewarmRuntime.parseJson.asJsObject).flatMap(runtimes(_, rmc))
59+
var manifest: Option[Runtimes] = None
60+
mf.foreach(m => manifest = Some(m))
61+
mf
62+
}
63+
5664
/**
5765
* Gets existing runtime manifests.
5866
*

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
2222
import org.apache.openwhisk.core.connector.MessageFeed
2323
import org.apache.openwhisk.core.entity._
2424
import org.apache.openwhisk.core.entity.size._
25+
import spray.json.{JsNumber, JsObject}
2526

2627
import scala.collection.immutable
2728
import scala.concurrent.duration._
@@ -32,6 +33,9 @@ case object Busy extends WorkerState
3233
case object Free extends WorkerState
3334

3435
case class WorkerData(data: ContainerData, state: WorkerState)
36+
case class AddPreWarmConfigList(list: List[PrewarmingConfig])
37+
case class DeletePreWarmConfigList(list: List[PrewarmingConfig])
38+
case class PreWarmConfig(kind: String, memory: ByteSize)
3539

3640
/**
3741
* A pool managing containers to run actions on.
@@ -93,6 +97,30 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
9397
}
9498

9599
def receive: Receive = {
100+
case prewarmConfigList: AddPreWarmConfigList =>
101+
prewarmConfigList.list foreach { config =>
102+
logging.info(this, s"add extra pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
103+
TransactionId.invokerWarmup)
104+
(1 to config.count).foreach { _ =>
105+
prewarmContainer(config.exec, config.memoryLimit)
106+
}
107+
}
108+
109+
case prewarmConfigList: DeletePreWarmConfigList =>
110+
prewarmConfigList.list foreach { config =>
111+
logging.info(this, s"delete pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
112+
TransactionId.invokerWarmup)
113+
(1 to config.count).foreach { _ =>
114+
deletePrewarmContainer(config.exec.kind, config.memoryLimit)
115+
}
116+
}
117+
118+
case prewarmConfig: PreWarmConfig =>
119+
val numberResponse = {
120+
JsObject("number" -> JsNumber(getPrewarmContainerNumber(prewarmConfig.kind, prewarmConfig.memory)))
121+
}
122+
sender() ! numberResponse
123+
96124
// A job to run on a container
97125
//
98126
// Run messages are received either via the feed or from child containers which cannot process
@@ -293,6 +321,38 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
293321
}
294322
}
295323

324+
/**
325+
* Delete the prewarm container
326+
* @param kind
327+
* @param memory
328+
* @return
329+
*/
330+
def deletePrewarmContainer(kind: String, memory: ByteSize) = {
331+
prewarmedPool
332+
.find {
333+
case (_, PreWarmedData(_, `kind`, `memory`, _)) => true
334+
case _ => false
335+
}
336+
.map {
337+
case (ref, data) =>
338+
ref ! Remove
339+
prewarmedPool = prewarmedPool - ref
340+
}
341+
}
342+
343+
/**
344+
* get the prewarm container number
345+
* @param kind
346+
* @param memory
347+
* @return
348+
*/
349+
def getPrewarmContainerNumber(kind: String, memory: ByteSize) = {
350+
prewarmedPool.filter {
351+
case (_, PreWarmedData(_, `kind`, `memory`, _)) => true
352+
case _ => false
353+
}.size
354+
}
355+
296356
/** Removes a container and updates state accordingly. */
297357
def removeContainer(toDelete: ActorRef) = {
298358
toDelete ! Remove

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
3232
import org.apache.openwhisk.core.entity.{ExecManifest, InvokerInstanceId}
3333
import org.apache.openwhisk.core.entity.ActivationEntityLimit
3434
import org.apache.openwhisk.core.entity.size._
35-
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
35+
import org.apache.openwhisk.http.BasicHttpService
3636
import org.apache.openwhisk.spi.SpiLoader
3737
import org.apache.openwhisk.utils.ExecutionContextFactory
3838

@@ -46,6 +46,8 @@ object Invoker {
4646

4747
protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol")
4848

49+
var invokerReactive: Option[InvokerReactive] = None
50+
4951
/**
5052
* An object which records the environment variables required for this component to run.
5153
*/
@@ -157,7 +159,7 @@ object Invoker {
157159
}
158160
val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
159161
val invoker = try {
160-
new InvokerReactive(config, invokerInstance, producer, poolConfig)
162+
invokerReactive = Some(new InvokerReactive(config, invokerInstance, producer, poolConfig))
161163
} catch {
162164
case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
163165
}
@@ -172,7 +174,7 @@ object Invoker {
172174
val httpsConfig =
173175
if (Invoker.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.invoker.https")) else None
174176

175-
BasicHttpService.startHttpService(new BasicRasService {}.route, port, httpsConfig)(
177+
BasicHttpService.startHttpService(new InvokerServer().route, port, httpsConfig)(
176178
actorSystem,
177179
ActorMaterializer.create(actorSystem))
178180
}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.openwhisk.core.invoker
2020
import java.nio.charset.StandardCharsets
2121
import java.time.Instant
2222

23-
import akka.actor.{ActorRefFactory, ActorSystem, Props}
23+
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
2424
import akka.event.Logging.InfoLevel
2525
import akka.stream.ActorMaterializer
2626
import org.apache.kafka.common.errors.RecordTooLargeException
@@ -195,6 +195,10 @@ class InvokerReactive(
195195
private val pool =
196196
actorSystem.actorOf(ContainerPool.props(childFactory, poolConfig, activationFeed, prewarmingConfigs))
197197

198+
def getContainerPoolInstance: ActorRef = {
199+
pool
200+
}
201+
198202
/** Is called when an ActivationMessage is read from Kafka */
199203
def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
200204
Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.invoker
19+
20+
import akka.actor.ActorSystem
21+
import akka.http.scaladsl.model.StatusCodes
22+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
23+
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
24+
import akka.http.scaladsl.server.Route
25+
import akka.pattern.ask
26+
import akka.util.Timeout
27+
28+
import org.apache.openwhisk.common.{Logging, TransactionId}
29+
import org.apache.openwhisk.core.containerpool._
30+
import org.apache.openwhisk.core.entity.{CodeExecAsString, ExecManifest}
31+
import org.apache.openwhisk.core.entity.size._
32+
import org.apache.openwhisk.http.BasicRasService
33+
34+
import spray.json.JsObject
35+
36+
import scala.concurrent.{ExecutionContext, Future}
37+
import scala.concurrent.duration._
38+
39+
/**
40+
* Implements web server to handle certain REST API calls.
41+
* Currently provides a health ping route, only.
42+
*/
43+
class InvokerServer(implicit val ec: ExecutionContext,
44+
implicit val actorSystem: ActorSystem,
45+
implicit val logger: Logging)
46+
extends BasicRasService {
47+
48+
val invokerUsername = {
49+
val source = scala.io.Source.fromFile("/conf/invokerauth.username");
50+
try source.mkString.replaceAll("\r|\n", "")
51+
finally source.close()
52+
}
53+
val invokerPassword = {
54+
val source = scala.io.Source.fromFile("/conf/invokerauth.password");
55+
try source.mkString.replaceAll("\r|\n", "")
56+
finally source.close()
57+
}
58+
59+
override def routes(implicit transid: TransactionId): Route = {
60+
super.routes ~ {
61+
(path("prewarmContainer") & post) {
62+
extractCredentials {
63+
case Some(BasicHttpCredentials(username, password)) =>
64+
if (username == invokerUsername && password == invokerPassword) {
65+
entity(as[String]) { prewarmRuntime =>
66+
logger.info(this, s"add prewarmRuntime:${prewarmRuntime}")
67+
val execManifest = ExecManifest.initializePrewarm(prewarmRuntime)
68+
if (execManifest.isFailure) {
69+
logger.error(this, s"Invalid prewarm runtimes manifest: ${execManifest.failed.get}")
70+
complete("Invalid pass prewarm runtimes manifest")
71+
} else {
72+
val prewarmingConfigs: List[PrewarmingConfig] = execManifest.get.stemcells.flatMap {
73+
case (mf, cells) =>
74+
cells.map { cell =>
75+
PrewarmingConfig(cell.count, new CodeExecAsString(mf, "", None), cell.memory)
76+
}
77+
}.toList
78+
Invoker.invokerReactive.get.getContainerPoolInstance ! AddPreWarmConfigList(prewarmingConfigs)
79+
complete("add prewarm container request is handling")
80+
}
81+
}
82+
} else {
83+
complete("username or password is wrong")
84+
}
85+
case _ => complete(StatusCodes.Unauthorized)
86+
}
87+
}
88+
} ~ {
89+
(path("prewarmContainer") & delete) {
90+
extractCredentials {
91+
case Some(BasicHttpCredentials(username, password)) =>
92+
if (username == invokerUsername && password == invokerPassword) {
93+
entity(as[String]) { prewarmRuntime =>
94+
logger.info(this, s"delete prewarmRuntime:${prewarmRuntime}")
95+
val execManifest = ExecManifest.initializePrewarm(prewarmRuntime)
96+
if (execManifest.isFailure) {
97+
logger.error(this, s"Invalid prewarm runtimes manifest: ${execManifest.failed.get}")
98+
complete("Invalid pass prewarm runtimes manifest")
99+
} else {
100+
val prewarmingConfigs: List[PrewarmingConfig] = execManifest.get.stemcells.flatMap {
101+
case (mf, cells) =>
102+
cells.map { cell =>
103+
PrewarmingConfig(cell.count, new CodeExecAsString(mf, "", None), cell.memory)
104+
}
105+
}.toList
106+
Invoker.invokerReactive.get.getContainerPoolInstance ! DeletePreWarmConfigList(prewarmingConfigs)
107+
complete("delete prewarm container request is handling")
108+
}
109+
}
110+
} else {
111+
complete("username or password is wrong")
112+
}
113+
case _ => complete(StatusCodes.Unauthorized)
114+
}
115+
}
116+
} ~ {
117+
(path("prewarmContainerNumber") & get) {
118+
parameter('kind.as[String], 'memory.as[Int]) { (kind, memory) =>
119+
implicit val timeout = Timeout(5.seconds)
120+
val numberFuture = Future {
121+
Invoker.invokerReactive.get.getContainerPoolInstance
122+
.ask(PreWarmConfig(kind, memory.MB))
123+
.mapTo[JsObject]
124+
}.flatten
125+
complete(numberFuture)
126+
}
127+
}
128+
}
129+
}
130+
}

docs/operation.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
<!--
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one or more
4+
# contributor license agreements. See the NOTICE file distributed with
5+
# this work for additional information regarding copyright ownership.
6+
# The ASF licenses this file to You under the Apache License, Version 2.0
7+
# (the "License"); you may not use this file except in compliance with
8+
# the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
-->
19+
20+
# Prewarm container operation
21+
## Add prewarm container on assigned invoker, e.g:
22+
```
23+
curl -u ${username}:${password} -X POST http://${invokerAddress}:${invokerPort}/prewarmContainer -d '
24+
{
25+
"runtimes": {
26+
"nodejs": [{
27+
"kind": "nodejs:6",
28+
"default": true,
29+
"image": {
30+
"prefix": "openwhisk",
31+
"name": "nodejs6action",
32+
"tag": "latest"
33+
},
34+
"deprecated": false,
35+
"attached": {
36+
"attachmentName": "codefile",
37+
"attachmentType": "text/plain"
38+
},
39+
"stemCells": [{
40+
"count": 2,
41+
"memory": "128 MB"
42+
}]
43+
}]
44+
}
45+
}
46+
'
47+
```
48+
## Delete prewarm container
49+
```
50+
curl -u ${username}:${password} -X DELETE http://${invokerAddress}:${invokerPort}/prewarmContainer -d '
51+
{
52+
"runtimes": {
53+
"nodejs": [{
54+
"kind": "nodejs:6",
55+
"default": true,
56+
"image": {
57+
"prefix": "openwhisk",
58+
"name": "nodejs6action",
59+
"tag": "latest"
60+
},
61+
"deprecated": false,
62+
"attached": {
63+
"attachmentName": "codefile",
64+
"attachmentType": "text/plain"
65+
},
66+
"stemCells": [{
67+
"count": 2,
68+
"memory": "128 MB"
69+
}]
70+
}]
71+
}
72+
}
73+
'
74+
```
75+
## Get prewarm container
76+
```
77+
curl -X GET 'http://${invokerAddress}:${invokerPort}/prewarmContainerNumber?kind=nodejs:6&memory=128'
78+
```

tests/src/test/scala/common/WhiskProperties.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,18 @@ public static String getBaseControllerAddress() {
263263
return getBaseControllerHost() + ":" + getControllerBasePort();
264264
}
265265

266+
public static String getBaseInvokerAddress(){
267+
return getInvokerHosts()[0] + ":" + whiskProperties.getProperty("invoker.hosts.basePort");
268+
}
269+
270+
public static String getInvokerUsername() {
271+
return whiskProperties.getProperty("invoker.username");
272+
}
273+
274+
public static String getInvokerPassword() {
275+
return whiskProperties.getProperty("invoker.password");
276+
}
277+
266278
public static int getMaxActionInvokesPerMinute() {
267279
String valStr = whiskProperties.getProperty("limits.actions.invokes.perMinute");
268280
return Integer.parseInt(valStr);

0 commit comments

Comments
 (0)