Skip to content

Commit 2302b0d

Browse files
pfcoperezc-solo
andauthored
Make timeouts actually cancel the underlying processes (#133)
* Introduce `PerishableFuture` as a `Future` factory which offers a timeout for completion * Introduce `timeout` parameters for `DockerCommandExecutor` operations. Use that timeout in the implementers of the interface to timeout docker operations when the passed timeout is `FiniteDuration`. * Propagate timeout parameters in `DockerContainerState`. * Propagate `PullImagesTimeout`, `StopContainersTimeout` and `StartContainersTimeout` down `DockerContainerManager` and, therefore, down to `DockerCommandExecutor` operations which will use `PerishableFuture` to make ops actually cancel when they timeout. * Use infinite duration timeouts (no timeout) in Specs which invoke `DockerCommandExecutor` methods. Co-authored-by: C.Solovev <[email protected]>
1 parent dcc5744 commit 2302b0d

File tree

13 files changed

+138
-77
lines changed

13 files changed

+138
-77
lines changed

core/src/main/scala/com/whisk/docker/DockerCommandExecutor.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.whisk.docker
22

3+
import scala.concurrent.duration.Duration
34
import scala.concurrent.{ExecutionContext, Future}
45

56
object PortProtocol extends Enumeration {
@@ -29,28 +30,28 @@ trait DockerCommandExecutor {
2930

3031
def host: String
3132

32-
def createContainer(spec: DockerContainer)(implicit ec: ExecutionContext): Future[String]
33+
def createContainer(spec: DockerContainer)(implicit ec: ExecutionContext, timeout: Duration): Future[String]
3334

34-
def startContainer(id: String)(implicit ec: ExecutionContext): Future[Unit]
35+
def startContainer(id: String)(implicit ec: ExecutionContext, timeout: Duration): Future[Unit]
3536

3637
def inspectContainer(id: String)(
37-
implicit ec: ExecutionContext): Future[Option[InspectContainerResult]]
38+
implicit ec: ExecutionContext, timeout: Duration): Future[Option[InspectContainerResult]]
3839

3940
def withLogStreamLines(id: String, withErr: Boolean)(f: String => Unit)(
4041
implicit docker: DockerCommandExecutor,
41-
ec: ExecutionContext
42+
ec: ExecutionContext, timeout: Duration
4243
): Unit
4344

4445
def withLogStreamLinesRequirement(id: String, withErr: Boolean)(f: String => Boolean)(
4546
implicit docker: DockerCommandExecutor,
46-
ec: ExecutionContext): Future[Unit]
47+
ec: ExecutionContext, timeout: Duration): Future[Unit]
4748

48-
def listImages()(implicit ec: ExecutionContext): Future[Set[String]]
49+
def listImages()(implicit ec: ExecutionContext, timeout: Duration): Future[Set[String]]
4950

50-
def pullImage(image: String)(implicit ec: ExecutionContext): Future[Unit]
51+
def pullImage(image: String)(implicit ec: ExecutionContext, timeout: Duration): Future[Unit]
5152

5253
def remove(id: String, force: Boolean = true, removeVolumes: Boolean = true)(
53-
implicit ec: ExecutionContext): Future[Unit]
54+
implicit ec: ExecutionContext, timeout: Duration): Future[Unit]
5455

5556
def close(): Unit
5657
}

core/src/main/scala/com/whisk/docker/DockerContainerManager.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ class DockerContainerManager(containers: Seq[DockerContainer], executor: DockerC
2626
dockerStatesMap(container).isReady()
2727
}
2828

29-
def pullImages(): Future[Seq[String]] = {
29+
def pullImages(timeout: Duration): Future[Seq[String]] = {
30+
implicit val tout = timeout
3031
executor.listImages().flatMap { images =>
3132
val imagesToPull: Seq[String] = containers.map(_.image).filterNot { image =>
3233
val cImage = if (image.contains(":")) image else image + ":latest"
@@ -40,10 +41,13 @@ class DockerContainerManager(containers: Seq[DockerContainer], executor: DockerC
4041
containerStartTimeout: Duration): Future[Seq[(DockerContainerState, Boolean)]] = {
4142
import DockerContainerManager._
4243

44+
implicit val timeout = containerStartTimeout
45+
4346
@tailrec
4447
def initGraph(graph: ContainerDependencyGraph,
4548
previousInits: Future[Seq[DockerContainerState]] = Future.successful(Seq.empty))
4649
: Future[Seq[DockerContainerState]] = {
50+
4751
val initializedContainers = previousInits.flatMap { prev =>
4852
Future.traverse(graph.containers.map(dockerStatesMap))(_.init()).map(prev ++ _)
4953
}
@@ -54,6 +58,7 @@ class DockerContainerManager(containers: Seq[DockerContainer], executor: DockerC
5458
val readyInits: Future[Seq[Future[Boolean]]] =
5559
initializedContainers.map(_.map(state => state.isReady()))
5660
val simplifiedReadyInits: Future[Seq[Boolean]] = readyInits.flatMap(Future.sequence(_))
61+
5762
Await.result(simplifiedReadyInits, containerStartTimeout)
5863
initGraph(dependants, initializedContainers)
5964
}
@@ -68,7 +73,8 @@ class DockerContainerManager(containers: Seq[DockerContainer], executor: DockerC
6873
})
6974
}
7075

71-
def stopRmAll(): Future[Unit] = {
76+
def stopRmAll(timeout: Duration): Future[Unit] = {
77+
implicit val tout = timeout
7278
val future = Future.traverse(states)(_.remove(force = true, removeVolumes = true)).map(_ => ())
7379
future.onComplete { _ =>
7480
executor.close()

core/src/main/scala/com/whisk/docker/DockerContainerState.scala

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import java.util.concurrent.atomic.AtomicBoolean
44

55
import org.slf4j.LoggerFactory
66

7+
import scala.concurrent.duration.Duration
78
import scala.concurrent.{ExecutionContext, Future, Promise}
89

910
class DockerContainerState(spec: DockerContainer) {
@@ -39,10 +40,11 @@ class DockerContainerState(spec: DockerContainer) {
3940

4041
def isReady(): Future[Boolean] = _isReady.future
4142

42-
def isRunning()(implicit docker: DockerCommandExecutor, ec: ExecutionContext): Future[Boolean] =
43+
def isRunning()(implicit docker: DockerCommandExecutor, ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
4344
getRunningContainer().map(_.isDefined)
45+
}
4446

45-
def init()(implicit docker: DockerCommandExecutor, ec: ExecutionContext): Future[this.type] = {
47+
def init()(implicit docker: DockerCommandExecutor, ec: ExecutionContext, timeout: Duration): Future[this.type] = {
4648
for {
4749
s <- _id.init(docker.createContainer(spec))
4850
_ <- docker.startContainer(s)
@@ -56,7 +58,8 @@ class DockerContainerState(spec: DockerContainer) {
5658
}
5759

5860
private def runReadyCheck()(implicit docker: DockerCommandExecutor,
59-
ec: ExecutionContext): Future[Boolean] =
61+
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
62+
6063
_isReady.init(
6164
(for {
6265
r <- isRunning() if r
@@ -72,28 +75,29 @@ class DockerContainerState(spec: DockerContainer) {
7275
Future.successful(false)
7376
}
7477
)
78+
}
7579

7680
protected def getRunningContainer()(
7781
implicit docker: DockerCommandExecutor,
78-
ec: ExecutionContext): Future[Option[InspectContainerResult]] =
82+
ec: ExecutionContext, timeout: Duration): Future[Option[InspectContainerResult]] =
7983
id.flatMap(docker.inspectContainer)
8084

81-
def getName()(implicit docker: DockerCommandExecutor, ec: ExecutionContext): Future[String] =
85+
def getName()(implicit docker: DockerCommandExecutor, ec: ExecutionContext, timeout: Duration): Future[String] =
8286
getRunningContainer().flatMap {
8387
case Some(res) => Future.successful(res.name)
8488
case None => Future.failed(new RuntimeException(s"Container ${spec.image} is not running"))
8589
}
8690

8791
def getIpAddresses()(implicit docker: DockerCommandExecutor,
88-
ec: ExecutionContext): Future[Seq[String]] = getRunningContainer().flatMap {
92+
ec: ExecutionContext, timeout: Duration): Future[Seq[String]] = getRunningContainer().flatMap {
8993
case Some(res) => Future.successful(res.ipAddresses)
9094
case None => Future.failed(new RuntimeException(s"Container ${spec.image} is not running"))
9195
}
9296

9397
private val _ports = SinglePromise[Map[Int, Int]]
9498

9599
def getPorts()(implicit docker: DockerCommandExecutor,
96-
ec: ExecutionContext): Future[Map[Int, Int]] = {
100+
ec: ExecutionContext, timeout: Duration): Future[Map[Int, Int]] = {
97101
def portsFuture: Future[Map[Int, Int]] = getRunningContainer().flatMap {
98102
case None => Future.failed(new RuntimeException(s"Container ${spec.image} is not running"))
99103
case Some(c) =>
@@ -108,6 +112,6 @@ class DockerContainerState(spec: DockerContainer) {
108112

109113
def remove(force: Boolean = true, removeVolumes: Boolean = true)(
110114
implicit docker: DockerCommandExecutor,
111-
ec: ExecutionContext): Future[Unit] =
115+
ec: ExecutionContext, timeout: Duration): Future[Unit] =
112116
id.flatMap(x => docker.remove(x, force, removeVolumes))
113117
}

core/src/main/scala/com/whisk/docker/DockerKit.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,28 @@ trait DockerKit {
4141
}
4242

4343
def startAllOrFail(): Unit = {
44-
Await.result(containerManager.pullImages(), PullImagesTimeout)
45-
val allRunning: Boolean = try {
44+
Await.result(containerManager.pullImages(PullImagesTimeout), PullImagesTimeout)
45+
val allRunning: Boolean = {
4646
val future: Future[Boolean] =
47-
containerManager.initReadyAll(StartContainersTimeout).map(_.map(_._2).forall(identity))
47+
containerManager.initReadyAll(StartContainersTimeout).map(_.map(_._2).forall(identity)).recover {
48+
case e: Exception =>
49+
log.error("Exception during container initialization", e)
50+
false
51+
}
4852
sys.addShutdownHook(
49-
Await.ready(containerManager.stopRmAll(), StopContainersTimeout)
53+
Await.ready(containerManager.stopRmAll(StopContainersTimeout), StopContainersTimeout)
5054
)
5155
Await.result(future, StartContainersTimeout)
52-
} catch {
53-
case e: Exception =>
54-
log.error("Exception during container initialization", e)
55-
false
5656
}
5757
if (!allRunning) {
58-
Await.ready(containerManager.stopRmAll(), StopContainersTimeout)
58+
Await.ready(containerManager.stopRmAll(StopContainersTimeout), StopContainersTimeout)
5959
throw new RuntimeException("Cannot run all required containers")
6060
}
6161
}
6262

6363
def stopAllQuietly(): Unit = {
6464
try {
65-
Await.ready(containerManager.stopRmAll(), StopContainersTimeout)
65+
Await.ready(containerManager.stopRmAll(StopContainersTimeout), StopContainersTimeout)
6666
} catch {
6767
case e: Throwable =>
6868
log.error(e.getMessage, e)

core/src/main/scala/com/whisk/docker/DockerReadyChecker.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ package com.whisk.docker
33
import java.net.{HttpURLConnection, URL}
44
import java.util.{Timer, TimerTask}
55

6-
import scala.concurrent.duration.FiniteDuration
6+
import scala.concurrent.duration.{Duration, FiniteDuration}
77
import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException}
88

99
trait DockerReadyChecker {
1010

1111
def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
12-
ec: ExecutionContext): Future[Boolean]
12+
ec: ExecutionContext, timeout: Duration): Future[Boolean]
1313

1414
@deprecated("this method will be removed. Use DockerReadyChecker.And(a, b)", "0.9.6")
1515
def and(other: DockerReadyChecker): DockerReadyChecker = {
@@ -81,7 +81,7 @@ object DockerReadyChecker {
8181

8282
case class And(r1: DockerReadyChecker, r2: DockerReadyChecker) extends DockerReadyChecker {
8383
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
84-
ec: ExecutionContext): Future[Boolean] = {
84+
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
8585
val aF = r1(container)
8686
val bF = r2(container)
8787
for {
@@ -93,7 +93,7 @@ object DockerReadyChecker {
9393

9494
case class Or(r1: DockerReadyChecker, r2: DockerReadyChecker) extends DockerReadyChecker {
9595
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
96-
ec: ExecutionContext): Future[Boolean] = {
96+
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
9797
val aF = r1(container)
9898
val bF = r2(container)
9999
val p = Promise[Boolean]()
@@ -111,7 +111,7 @@ object DockerReadyChecker {
111111

112112
object Always extends DockerReadyChecker {
113113
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
114-
ec: ExecutionContext): Future[Boolean] =
114+
ec: ExecutionContext, timeout: Duration): Future[Boolean] =
115115
Future.successful(true)
116116
}
117117

@@ -121,7 +121,7 @@ object DockerReadyChecker {
121121
code: Int = 200)
122122
extends DockerReadyChecker {
123123
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
124-
ec: ExecutionContext): Future[Boolean] = {
124+
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
125125
container.getPorts().map(_(port)).flatMap { p =>
126126
val url = new URL("http", host.getOrElse(docker.host), p, path)
127127
Future {
@@ -139,7 +139,7 @@ object DockerReadyChecker {
139139

140140
case class LogLineContains(str: String) extends DockerReadyChecker {
141141
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
142-
ec: ExecutionContext): Future[Boolean] = {
142+
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
143143
for {
144144
id <- container.id
145145
_ <- docker.withLogStreamLinesRequirement(id, withErr = true)(_.contains(str))
@@ -153,7 +153,7 @@ object DockerReadyChecker {
153153
extends DockerReadyChecker {
154154

155155
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
156-
ec: ExecutionContext): Future[Boolean] = {
156+
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
157157
RetryUtils.runWithin(underlying(container), duration).recover {
158158
case _: TimeoutException =>
159159
false
@@ -167,14 +167,14 @@ object DockerReadyChecker {
167167
extends DockerReadyChecker {
168168

169169
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
170-
ec: ExecutionContext): Future[Boolean] = {
170+
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
171171
RetryUtils.looped(underlying(container).filter(identity), attempts, delay)
172172
}
173173
}
174174

175175
case class F(f: DockerContainerState => Future[Boolean]) extends DockerReadyChecker {
176176
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
177-
ec: ExecutionContext): Future[Boolean] =
177+
ec: ExecutionContext, timeout: Duration): Future[Boolean] =
178178
f(container)
179179
}
180180

core/src/main/scala/com/whisk/docker/package.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
package com.whisk
22

3+
import java.util.TimerTask
4+
import java.util.concurrent.{Callable, CancellationException, FutureTask}
5+
6+
import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException}
7+
import scala.concurrent.duration.{Duration, FiniteDuration}
8+
import scala.util.{Failure, Try}
9+
310
/**
411
* General utility functions
512
*/
@@ -10,4 +17,39 @@ package object docker {
1017
case Some(x) => f(content, x)
1118
}
1219
}
20+
21+
private[docker] object PerishableFuture {
22+
23+
def apply[T](body: => T)(implicit ec: ExecutionContext, timeout: Duration): Future[T] = timeout match {
24+
case finiteTimeout: FiniteDuration =>
25+
val promise = Promise[T]
26+
27+
val futureTask = new FutureTask[T](new Callable[T] {
28+
override def call(): T = body
29+
}) {
30+
override def done(): Unit = promise.tryComplete {
31+
Try(get()).recoverWith {
32+
case _: CancellationException => Failure(new TimeoutException())
33+
}
34+
}
35+
}
36+
37+
val reaperTask = new TimerTask {
38+
override def run(): Unit = {
39+
futureTask.cancel(true)
40+
promise.tryFailure(new TimeoutException())
41+
}
42+
}
43+
44+
timer.schedule(reaperTask, finiteTimeout.toMillis)
45+
ec.execute(futureTask)
46+
47+
promise.future
48+
49+
case _ => Future.apply(body)
50+
}
51+
52+
private val timer = new java.util.Timer(true)
53+
54+
}
1355
}

0 commit comments

Comments
 (0)