Skip to content

Commit e3c7a13

Browse files
authored
Move the ack implementations to the common package. (#4837)
The ack logic that responds to kafka messages to the controller can be used by other components in the downstream. So move ack logic that is only available in invoker package to common package.
1 parent fcdca86 commit e3c7a13

File tree

8 files changed

+128
-83
lines changed

8 files changed

+128
-83
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.ack
19+
import org.apache.openwhisk.common.{TransactionId, UserEvents}
20+
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, EventMessage, MessageProducer}
21+
import org.apache.openwhisk.core.entity.{ControllerInstanceId, UUID, WhiskActivation}
22+
23+
import scala.concurrent.Future
24+
25+
/**
26+
* A method for sending Active Acknowledgements (aka "active ack") messages to the load balancer. These messages
27+
* are either completion messages for an activation to indicate a resource slot is free, or result-forwarding
28+
* messages for continuations (e.g., sequences and conductor actions).
29+
*
30+
* The activation result is always provided because some acknowledegment messages may not carry the result of
31+
* the activation and this is needed for sending user events.
32+
*
33+
* @param tid the transaction id for the activation
34+
* @param activationResult is the activation result
35+
* @param blockingInvoke is true iff the activation was a blocking request
36+
* @param controllerInstance the originating controller/loadbalancer id
37+
* @param userId is the UUID for the namespace owning the activation
38+
* @param acknowledegment the acknowledgement message to send
39+
*/
40+
trait ActiveAck {
41+
def apply(tid: TransactionId,
42+
activationResult: WhiskActivation,
43+
blockingInvoke: Boolean,
44+
controllerInstance: ControllerInstanceId,
45+
userId: UUID,
46+
acknowledegment: AcknowledegmentMessage): Future[Any]
47+
}
48+
49+
trait EventSender {
50+
def send(msg: => EventMessage): Unit
51+
}
52+
53+
class UserEventSender(producer: MessageProducer) extends EventSender {
54+
override def send(msg: => EventMessage): Unit = UserEvents.send(producer, msg)
55+
}
Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,20 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.openwhisk.core.invoker
18+
package org.apache.openwhisk.core.ack
1919

2020
import org.apache.kafka.common.errors.RecordTooLargeException
21-
import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents}
21+
import org.apache.openwhisk.common.{Logging, TransactionId}
2222
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, EventMessage, MessageProducer}
23-
import org.apache.openwhisk.core.entity.{ControllerInstanceId, InvokerInstanceId, UUID, WhiskActivation}
24-
import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck
23+
import org.apache.openwhisk.core.entity._
2524

2625
import scala.concurrent.{ExecutionContext, Future}
2726
import scala.util.{Failure, Success}
2827

29-
trait EventSender {
30-
def send(msg: => EventMessage): Unit
31-
}
32-
33-
class UserEventSender(producer: MessageProducer) extends EventSender {
34-
override def send(msg: => EventMessage): Unit = UserEvents.send(producer, msg)
35-
}
36-
37-
class MessagingActiveAck(producer: MessageProducer, instance: InvokerInstanceId, eventSender: Option[EventSender])(
28+
class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventSender: Option[EventSender])(
3829
implicit logging: Logging,
3930
ec: ExecutionContext)
4031
extends ActiveAck {
41-
private val source = s"invoker${instance.instance}"
4232
override def apply(tid: TransactionId,
4333
activationResult: WhiskActivation,
4434
blockingInvoke: Boolean,
@@ -58,7 +48,7 @@ class MessagingActiveAck(producer: MessageProducer, instance: InvokerInstanceId,
5848
// UserMetrics are sent, when the slot is free again. This ensures, that all metrics are sent.
5949
if (acknowledegment.isSlotFree.nonEmpty) {
6050
eventSender.foreach { s =>
61-
EventMessage.from(activationResult, source, userId) match {
51+
EventMessage.from(activationResult, instance.source, userId) match {
6252
case Success(msg) => s.send(msg)
6353
case Failure(t) => logging.error(this, s"activation event was not sent: $t")
6454
}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
package org.apache.openwhisk.core.entity
1919

2020
import spray.json.DefaultJsonProtocol
21-
import org.apache.openwhisk.core.entity.ControllerInstanceId.LEGAL_CHARS
22-
import org.apache.openwhisk.core.entity.ControllerInstanceId.MAX_NAME_LENGTH
2321

2422
/**
2523
* An instance id representing an invoker
@@ -31,30 +29,51 @@ import org.apache.openwhisk.core.entity.ControllerInstanceId.MAX_NAME_LENGTH
3129
case class InvokerInstanceId(val instance: Int,
3230
uniqueName: Option[String] = None,
3331
displayedName: Option[String] = None,
34-
val userMemory: ByteSize) {
32+
val userMemory: ByteSize)
33+
extends InstanceId {
3534
def toInt: Int = instance
3635

37-
override def toString: String = (Seq("invoker" + instance) ++ uniqueName ++ displayedName).mkString("/")
36+
override val instanceType = "invoker"
37+
38+
override val source = s"$instanceType$instance"
39+
40+
override val toString: String = (Seq("invoker" + instance) ++ uniqueName ++ displayedName).mkString("/")
3841
}
3942

40-
case class ControllerInstanceId(val asString: String) {
41-
require(
42-
asString.length <= MAX_NAME_LENGTH && asString.matches(LEGAL_CHARS),
43-
"Controller instance id contains invalid characters")
43+
case class ControllerInstanceId(asString: String) extends InstanceId {
44+
validate(asString)
45+
override val instanceType = "controller"
46+
47+
override val source = s"$instanceType$asString"
48+
49+
override val toString: String = source
4450
}
4551

4652
object InvokerInstanceId extends DefaultJsonProtocol {
4753
import org.apache.openwhisk.core.entity.size.{serdes => xserds}
48-
implicit val serdes = jsonFormat4(InvokerInstanceId.apply)
54+
implicit val serdes = jsonFormat(InvokerInstanceId.apply, "instance", "uniqueName", "displayedName", "userMemory")
4955
}
5056

5157
object ControllerInstanceId extends DefaultJsonProtocol {
58+
implicit val serdes = jsonFormat(ControllerInstanceId.apply _, "asString")
59+
}
60+
61+
trait InstanceId {
62+
5263
// controller ids become part of a kafka topic, hence, hence allow only certain characters
5364
// see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29
5465
private val LEGAL_CHARS = "[a-zA-Z0-9._-]+"
5566

5667
// reserve some number of characters as the prefix to be added to topic names
5768
private val MAX_NAME_LENGTH = 249 - 121
5869

59-
implicit val serdes = jsonFormat1(ControllerInstanceId.apply)
70+
def validate(asString: String): Unit =
71+
require(
72+
asString.length <= MAX_NAME_LENGTH && asString.matches(LEGAL_CHARS),
73+
s"$instanceType instance id contains invalid characters")
74+
75+
val instanceType: String
76+
77+
val source: String
78+
6079
}

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import akka.actor.Actor
2121
import akka.actor.ActorRef
2222
import akka.actor.Cancellable
2323
import java.time.Instant
24+
2425
import akka.actor.Status.{Failure => FailureMessage}
2526
import akka.actor.{FSM, Props, Stash}
2627
import akka.event.Logging.InfoLevel
@@ -33,17 +34,19 @@ import akka.io.Tcp.Connected
3334
import akka.pattern.pipe
3435
import pureconfig._
3536
import pureconfig.generic.auto._
36-
3737
import akka.stream.ActorMaterializer
3838
import java.net.InetSocketAddress
3939
import java.net.SocketException
40+
4041
import org.apache.openwhisk.common.MetricEmitter
4142
import org.apache.openwhisk.common.TransactionId.systemPrefix
43+
4244
import scala.collection.immutable
4345
import spray.json.DefaultJsonProtocol._
4446
import spray.json._
4547
import org.apache.openwhisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId}
4648
import org.apache.openwhisk.core.ConfigKeys
49+
import org.apache.openwhisk.core.ack.ActiveAck
4750
import org.apache.openwhisk.core.connector.{
4851
ActivationMessage,
4952
CombinedCompletionAndResultMessage,
@@ -55,8 +58,9 @@ import org.apache.openwhisk.core.database.UserContext
5558
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
5659
import org.apache.openwhisk.core.entity._
5760
import org.apache.openwhisk.core.entity.size._
58-
import org.apache.openwhisk.core.invoker.InvokerReactive.{ActiveAck, LogsCollector}
61+
import org.apache.openwhisk.core.invoker.Invoker.LogsCollector
5962
import org.apache.openwhisk.http.Messages
63+
6064
import scala.concurrent.Future
6165
import scala.concurrent.duration._
6266
import scala.util.{Failure, Success}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import org.apache.openwhisk.common.Https.HttpsConfig
2626
import org.apache.openwhisk.common._
2727
import org.apache.openwhisk.core.WhiskConfig._
2828
import org.apache.openwhisk.core.connector.{MessageProducer, MessagingProvider}
29-
import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
30-
import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ConcurrencyLimitConfig, ExecManifest, InvokerInstanceId}
29+
import org.apache.openwhisk.core.containerpool.{Container, ContainerPoolConfig}
30+
import org.apache.openwhisk.core.entity._
3131
import org.apache.openwhisk.core.entity.size._
3232
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
3333
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
@@ -37,13 +37,36 @@ import pureconfig._
3737
import pureconfig.generic.auto._
3838

3939
import scala.concurrent.duration._
40-
import scala.concurrent.{Await, ExecutionContext}
40+
import scala.concurrent.{Await, ExecutionContext, Future}
4141
import scala.util.Try
4242

4343
case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
4444

4545
object Invoker {
4646

47+
/**
48+
* Collect logs after the activation has finished.
49+
*
50+
* This method is called after an activation has finished. The logs gathered here are stored along the activation
51+
* record in the database.
52+
*
53+
* @param transid transaction the activation ran in
54+
* @param user the user who ran the activation
55+
* @param activation the activation record
56+
* @param container container used by the activation
57+
* @param action action that was activated
58+
* @return logs for the given activation
59+
*/
60+
trait LogsCollector {
61+
def logsToBeCollected(action: ExecutableWhiskAction): Boolean = action.limits.logs.asMegaBytes != 0.MB
62+
63+
def apply(transid: TransactionId,
64+
user: Identity,
65+
activation: WhiskActivation,
66+
container: Container,
67+
action: ExecutableWhiskAction): Future[ActivationLogs]
68+
}
69+
4770
protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol")
4871

4972
/**

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala

Lines changed: 2 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ import akka.event.Logging.InfoLevel
2626
import akka.stream.ActorMaterializer
2727
import org.apache.openwhisk.common._
2828
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
29-
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, _}
29+
import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
30+
import org.apache.openwhisk.core.connector._
3031
import org.apache.openwhisk.core.containerpool._
3132
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
3233
import org.apache.openwhisk.core.database.{UserContext, _}
@@ -45,53 +46,6 @@ import scala.util.{Failure, Success}
4546

4647
object InvokerReactive extends InvokerProvider {
4748

48-
/**
49-
* A method for sending Active Acknowledgements (aka "active ack") messages to the load balancer. These messages
50-
* are either completion messages for an activation to indicate a resource slot is free, or result-forwarding
51-
* messages for continuations (e.g., sequences and conductor actions).
52-
*
53-
* The activation result is always provided because some acknowledegment messages may not carry the result of
54-
* the activation and this is needed for sending user events.
55-
*
56-
* @param tid the transaction id for the activation
57-
* @param activationResult is the activation result
58-
* @param blockingInvoke is true iff the activation was a blocking request
59-
* @param controllerInstance the originating controller/loadbalancer id
60-
* @param userId is the UUID for the namespace owning the activation
61-
* @param acknowledegment the acknowledgement message to send
62-
*/
63-
trait ActiveAck {
64-
def apply(tid: TransactionId,
65-
activationResult: WhiskActivation,
66-
blockingInvoke: Boolean,
67-
controllerInstance: ControllerInstanceId,
68-
userId: UUID,
69-
acknowledegment: AcknowledegmentMessage): Future[Any]
70-
}
71-
72-
/**
73-
* Collect logs after the activation has finished.
74-
*
75-
* This method is called after an activation has finished. The logs gathered here are stored along the activation
76-
* record in the database.
77-
*
78-
* @param transid transaction the activation ran in
79-
* @param user the user who ran the activation
80-
* @param activation the activation record
81-
* @param container container used by the activation
82-
* @param action action that was activated
83-
* @return logs for the given activation
84-
*/
85-
trait LogsCollector {
86-
def logsToBeCollected(action: ExecutableWhiskAction): Boolean = action.limits.logs.asMegaBytes != 0.MB
87-
88-
def apply(transid: TransactionId,
89-
user: Identity,
90-
activation: WhiskActivation,
91-
container: Container,
92-
action: ExecutableWhiskAction): Future[ActivationLogs]
93-
}
94-
9549
override def instance(
9650
config: WhiskConfig,
9751
instance: InvokerInstanceId,

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/LogStoreCollector.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.openwhisk.common.TransactionId
2121
import org.apache.openwhisk.core.containerpool.Container
2222
import org.apache.openwhisk.core.containerpool.logging.LogStore
2323
import org.apache.openwhisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
24-
import org.apache.openwhisk.core.invoker.InvokerReactive.LogsCollector
24+
import org.apache.openwhisk.core.invoker.Invoker.LogsCollector
2525

2626
import scala.concurrent.Future
2727

tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
3636
import spray.json.DefaultJsonProtocol._
3737
import spray.json._
3838
import org.apache.openwhisk.common.{Logging, TransactionId}
39+
import org.apache.openwhisk.core.ack.ActiveAck
3940
import org.apache.openwhisk.core.connector.{
4041
AcknowledegmentMessage,
4142
ActivationMessage,
@@ -51,7 +52,7 @@ import org.apache.openwhisk.core.entity._
5152
import org.apache.openwhisk.core.entity.size._
5253
import org.apache.openwhisk.http.Messages
5354
import org.apache.openwhisk.core.database.UserContext
54-
import org.apache.openwhisk.core.invoker.InvokerReactive
55+
import org.apache.openwhisk.core.invoker.Invoker
5556

5657
import scala.collection.mutable
5758
import scala.concurrent.Await
@@ -172,7 +173,7 @@ class ContainerProxyTests
172173
expectMsg(Transition(machine, Pausing, Paused))
173174
}
174175

175-
trait LoggedAcker extends InvokerReactive.ActiveAck {
176+
trait LoggedAcker extends ActiveAck {
176177
def calls =
177178
mutable.Buffer[(TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, AcknowledegmentMessage)]()
178179

@@ -239,8 +240,7 @@ class ContainerProxyTests
239240
response
240241
}
241242

242-
class LoggedCollector(response: Future[ActivationLogs], invokeCallback: () => Unit)
243-
extends InvokerReactive.LogsCollector {
243+
class LoggedCollector(response: Future[ActivationLogs], invokeCallback: () => Unit) extends Invoker.LogsCollector {
244244
val collector = LoggedFunction {
245245
(transid: TransactionId,
246246
user: Identity,

0 commit comments

Comments
 (0)