Skip to content

Commit e6e26ce

Browse files
committed
feat: add support for memcached
1 parent b62e551 commit e6e26ce

File tree

7 files changed

+180
-34
lines changed

7 files changed

+180
-34
lines changed

build.sbt

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,15 @@ lazy val baseSettings = Seq(
4040
resolvers ++= Seq(
4141
Resolver.sonatypeRepo("snapshots"),
4242
Resolver.sonatypeRepo("releases"),
43-
"Seasar Repository" at "https://maven.seasar.org/maven2/"
43+
"Seasar Repository" at "https://maven.seasar.org/maven2/",
44+
"Spy Repository" at "https://files.couchbase.com/maven2/"
4445
),
4546
libraryDependencies ++= Seq(
4647
scalatest.scalatest % Test
4748
),
49+
dependencyOverrides ++= Seq(
50+
"com.fasterxml.jackson.core" % "jackson-databind" % "2.11.0"
51+
),
4852
ThisBuild / scalafixScalaBinaryVersion := CrossVersion.binaryScalaVersion(scalaVersion.value),
4953
semanticdbEnabled := true,
5054
semanticdbVersion := scalafixSemanticdb.revision,
@@ -193,6 +197,17 @@ val `docker-controller-scala-redis` = (project in file("docker-controller-scala-
193197
)
194198
).dependsOn(`docker-controller-scala-core`, `docker-controller-scala-scalatest` % Test)
195199

200+
val `docker-controller-scala-memcached` = (project in file("docker-controller-scala-memcached"))
201+
.settings(baseSettings)
202+
.settings(
203+
name := "docker-controller-scala-memcached",
204+
libraryDependencies ++= Seq(
205+
scalatest.scalatest % Test,
206+
logback.classic % Test,
207+
(twitter.finagleMemcached % Test).cross(CrossVersion.for3Use2_13)
208+
)
209+
).dependsOn(`docker-controller-scala-core`, `docker-controller-scala-scalatest` % Test)
210+
196211
val `docker-controller-scala-elasticmq` = (project in file("docker-controller-scala-elasticmq"))
197212
.settings(baseSettings)
198213
.settings(
@@ -233,11 +248,13 @@ val `docker-controller-scala-root` = (project in file("."))
233248
.aggregate(
234249
`docker-controller-scala-core`,
235250
`docker-controller-scala-scalatest`,
251+
`docker-controller-scala-flyway`,
236252
// for RDBMS
237253
`docker-controller-scala-mysql`,
238254
`docker-controller-scala-postgresql`,
239255
// for NoSQL
240256
`docker-controller-scala-redis`,
257+
`docker-controller-scala-memcached`,
241258
`docker-controller-scala-elasticsearch`,
242259
`docker-controller-scala-kafka`,
243260
`docker-controller-scala-zookeeper`,

docker-controller-scala-core/src/main/scala/com/github/j5ik2o/dockerController/DockerController.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ trait DockerController {
5858
def existsImage(p: Image => Boolean): Boolean
5959
def pullImageIfNotExists(f: PullImageCmd => PullImageCmd = identity): Unit
6060
def pullImage(f: PullImageCmd => PullImageCmd = identity): Unit
61-
def awaitCondition(duration: Duration)(predicate: Frame => Boolean): Unit
61+
def awaitCondition(duration: Duration)(predicate: Option[Frame] => Boolean): Unit
6262
}
6363

6464
object DockerController {
@@ -247,7 +247,7 @@ private[dockerController] class DockerControllerImpl(
247247
logger.debug("stopContainer --- finish")
248248
}
249249

250-
override def awaitCondition(duration: Duration)(predicate: Frame => Boolean): Unit = {
250+
override def awaitCondition(duration: Duration)(predicate: Option[Frame] => Boolean): Unit = {
251251
logger.debug("awaitCompletion --- start")
252252
val frameQueue: LinkedBlockingQueue[Frame] = new LinkedBlockingQueue[Frame]()
253253

@@ -266,11 +266,11 @@ private[dockerController] class DockerControllerImpl(
266266
def loop(): Unit = {
267267
if (
268268
!terminate && {
269-
val frame = frameQueue.poll(outputFrameInterval.toMillis, TimeUnit.MILLISECONDS)
270-
if (frame != null) {
269+
val frameOpt = Option(frameQueue.poll(outputFrameInterval.toMillis, TimeUnit.MILLISECONDS))
270+
frameOpt.foreach { frame =>
271271
logger.debug(frame.toString)
272-
!predicate(frame)
273-
} else true
272+
}
273+
!predicate(frameOpt)
274274
}
275275
) {
276276
loop()

docker-controller-scala-core/src/main/scala/com/github/j5ik2o/dockerController/WaitPredicates.scala

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,56 +10,73 @@ import scala.util.matching.Regex
1010

1111
object WaitPredicates {
1212

13-
type WaitPredicate = Frame => Boolean
13+
type WaitPredicate = Option[Frame] => Boolean
1414

1515
protected val logger: Logger = LoggerFactory.getLogger(getClass)
1616

17+
def forDebug(
18+
awaitDurationOpt: Option[FiniteDuration] = Some(500.milliseconds)
19+
): WaitPredicate = { frameOpt =>
20+
frameOpt.exists { frame =>
21+
val line = new String(frame.getPayload).stripLineEnd
22+
logger.debug(s"forDebug: line = $line")
23+
awaitDurationOpt.foreach { awaitDuration => Thread.sleep(awaitDuration.toMillis) }
24+
false
25+
}
26+
}
27+
1728
def forLogMessageExactly(
1829
text: String,
1930
awaitDurationOpt: Option[FiniteDuration] = Some(500.milliseconds)
20-
): WaitPredicate = { frame =>
21-
val line = new String(frame.getPayload).stripLineEnd
22-
val result = line == text
23-
if (result) {
24-
logger.debug(s"forLogMessageExactly: result = $result, line = $line")
25-
awaitDurationOpt.foreach { awaitDuration => Thread.sleep(awaitDuration.toMillis) }
31+
): WaitPredicate = { frameOpt =>
32+
frameOpt.exists { frame =>
33+
val line = new String(frame.getPayload).stripLineEnd
34+
val result = line == text
35+
if (result) {
36+
logger.debug(s"forLogMessageExactly: result = $result, line = $line")
37+
awaitDurationOpt.foreach { awaitDuration => Thread.sleep(awaitDuration.toMillis) }
38+
}
39+
result
2640
}
27-
result
2841
}
2942

3043
def forLogMessageContained(
3144
text: String,
3245
awaitDurationOpt: Option[FiniteDuration] = Some(500.milliseconds)
33-
): WaitPredicate = { frame =>
34-
val line = new String(frame.getPayload).stripLineEnd
35-
val result = line.contains(text)
36-
if (result) {
37-
logger.debug(s"forLogMessageContained: result = $result, line = $line")
38-
awaitDurationOpt.foreach { awaitDuration => Thread.sleep(awaitDuration.toMillis) }
46+
): WaitPredicate = { frameOpt =>
47+
frameOpt.exists { frame =>
48+
val line = new String(frame.getPayload).stripLineEnd
49+
val result = line.contains(text)
50+
if (result) {
51+
logger.debug(s"forLogMessageContained: result = $result, line = $line")
52+
awaitDurationOpt.foreach { awaitDuration => Thread.sleep(awaitDuration.toMillis) }
53+
}
54+
result
3955
}
40-
result
4156
}
4257

4358
def forLogMessageByRegex(
4459
regex: Regex,
4560
awaitDurationOpt: Option[FiniteDuration] = Some(500.milliseconds)
46-
): WaitPredicate = { frame =>
47-
val line = new String(frame.getPayload).stripLineEnd
48-
val result = regex.findFirstIn(line).isDefined
49-
if (result) {
50-
logger.debug(s"forLogMessageByRegex: result = $result, line = $line")
51-
awaitDurationOpt.foreach { awaitDuration => Thread.sleep(awaitDuration.toMillis) }
61+
): WaitPredicate = { frameOpt =>
62+
frameOpt.exists { frame =>
63+
val line = new String(frame.getPayload).stripLineEnd
64+
val result = regex.findFirstIn(line).isDefined
65+
if (result) {
66+
logger.debug(s"forLogMessageByRegex: result = $result, line = $line")
67+
awaitDurationOpt.foreach { awaitDuration => Thread.sleep(awaitDuration.toMillis) }
68+
}
69+
result
5270
}
53-
result
5471
}
5572

5673
def forListeningHostTcpPort(
5774
host: String,
5875
hostPort: Int,
5976
connectionTimeout: FiniteDuration = 500.milliseconds,
6077
awaitDurationOpt: Option[FiniteDuration] = Some(500.milliseconds)
61-
): WaitPredicate = { f: Frame =>
62-
val line = new String(f.getPayload)
78+
): WaitPredicate = { frameOpt =>
79+
val line = frameOpt.map(frame => new String(frame.getPayload)).getOrElse("")
6380
val s: Socket = new Socket()
6481
try {
6582
s.connect(new InetSocketAddress(host, hostPort), connectionTimeout.toMillis.toInt)
@@ -82,13 +99,13 @@ object WaitPredicates {
8299
host: String,
83100
hostPort: Int,
84101
awaitDurationOpt: Option[FiniteDuration] = Some(500.milliseconds)
85-
): WaitPredicate = { _: Frame => forListeningHttp(host, hostPort, awaitDurationOpt).isDefined }
102+
): WaitPredicate = { _ => forListeningHttp(host, hostPort, awaitDurationOpt).isDefined }
86103

87104
def forListeningHttpPortWithPredicate(
88105
host: String,
89106
hostPort: Int,
90107
awaitDurationOpt: Option[FiniteDuration] = Some(500.milliseconds)
91-
)(p: HttpURLConnection => Boolean): WaitPredicate = { _: Frame =>
108+
)(p: HttpURLConnection => Boolean): WaitPredicate = { _ =>
92109
forListeningHttp(host, hostPort, awaitDurationOpt).exists(p)
93110
}
94111

docker-controller-scala-kafka/src/main/scala/com/github/j5ik2o/dockerController/kafka/KafkaController.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class KafkaController(
104104
dockerClient.removeNetworkCmd(networkId).exec()
105105
}
106106

107-
override def awaitCondition(duration: Duration)(predicate: Frame => Boolean): Unit = {
107+
override def awaitCondition(duration: Duration)(predicate: Option[Frame] => Boolean): Unit = {
108108
zooKeeperController.awaitCondition(duration)(zooKeeperWaitPredicate)
109109
super.awaitCondition(duration)(predicate)
110110
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.github.j5ik2o.dockerController.memcached
2+
3+
import com.github.dockerjava.api.DockerClient
4+
import com.github.dockerjava.api.command.CreateContainerCmd
5+
import com.github.dockerjava.api.model.{ ExposedPort, Ports }
6+
import com.github.dockerjava.api.model.HostConfig.newHostConfig
7+
import com.github.j5ik2o.dockerController.DockerControllerImpl
8+
import com.github.j5ik2o.dockerController.memcached.MemcachedController._
9+
10+
import scala.concurrent.duration._
11+
12+
object MemcachedController {
13+
final val DefaultImageName: String = "memcached"
14+
final val DefaultImageTag: Option[String] = Some("1.6")
15+
final val DefaultContainerPort: Int = 11211
16+
17+
def apply(
18+
dockerClient: DockerClient,
19+
outputFrameInterval: FiniteDuration = 500.millis,
20+
imageName: String = DefaultImageName,
21+
imageTag: Option[String] = DefaultImageTag,
22+
envVars: Map[String, String] = Map.empty
23+
)(
24+
hostPort: Int,
25+
prometheusEnabled: Boolean = false
26+
): MemcachedController = new MemcachedController(dockerClient, outputFrameInterval, imageName, imageTag, envVars)(
27+
hostPort,
28+
prometheusEnabled
29+
)
30+
}
31+
32+
class MemcachedController(
33+
dockerClient: DockerClient,
34+
outputFrameInterval: FiniteDuration = 500.millis,
35+
imageName: String = DefaultImageName,
36+
imageTag: Option[String] = DefaultImageTag,
37+
envVars: Map[String, String] = Map.empty
38+
)(
39+
hostPort: Int,
40+
prometheusEnabled: Boolean = false
41+
) extends DockerControllerImpl(dockerClient, outputFrameInterval)(imageName, imageTag) {
42+
43+
private val environmentVariables = Map(
44+
"MEMCACHED_PROMETHEUS_ENABLED" -> prometheusEnabled.toString
45+
) ++
46+
envVars
47+
48+
override protected def newCreateContainerCmd(): CreateContainerCmd = {
49+
val containerPort = ExposedPort.tcp(DefaultContainerPort)
50+
val portBinding = new Ports
51+
portBinding.bind(containerPort, Ports.Binding.bindPort(hostPort))
52+
super
53+
.newCreateContainerCmd()
54+
.withEnv(environmentVariables.map { case (k, v) => s"$k=$v" }.toArray: _*)
55+
.withExposedPorts(containerPort)
56+
.withHostConfig(newHostConfig().withPortBindings(portBinding))
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.github.j5ik2o.dockerController.memcached
2+
3+
import com.github.j5ik2o.dockerController.{
4+
DockerController,
5+
DockerControllerSpecSupport,
6+
RandomPortUtil,
7+
WaitPredicates
8+
}
9+
import com.twitter.finagle.Memcached
10+
import com.twitter.io.Buf
11+
import org.scalatest.concurrent.ScalaFutures
12+
import org.scalatest.freespec.AnyFreeSpec
13+
14+
import scala.concurrent.duration._
15+
import scala.jdk.FutureConverters.CompletionStageOps
16+
17+
class MemcachedControllerSpec extends AnyFreeSpec with DockerControllerSpecSupport with ScalaFutures {
18+
val testTimeFactor: Int = sys.env.getOrElse("TEST_TIME_FACTOR", "1").toInt
19+
logger.debug(s"testTimeFactor = $testTimeFactor")
20+
21+
val hostPort: Int = RandomPortUtil.temporaryServerPort()
22+
val controller: MemcachedController = MemcachedController(dockerClient)(hostPort)
23+
24+
override protected val dockerControllers: Vector[DockerController] = Vector(controller)
25+
26+
override protected val waitPredicatesSettings: Map[DockerController, WaitPredicateSetting] = Map(
27+
controller -> WaitPredicateSetting(
28+
Duration.Inf,
29+
WaitPredicates.forListeningHostTcpPort(
30+
dockerHost,
31+
hostPort,
32+
(1 * testTimeFactor).seconds,
33+
Some((5 * testTimeFactor).seconds)
34+
)
35+
)
36+
)
37+
38+
"MemcachedController" - {
39+
"run" in {
40+
val client = Memcached.client.newRichClient(s"$dockerHost:$hostPort")
41+
val str = "a"
42+
val buf = Buf.Utf8(str)
43+
val result = for {
44+
_ <- client.set("1", buf)
45+
r <- client.get("1")
46+
} yield r
47+
assert(result.toCompletableFuture.asScala.futureValue.get == buf)
48+
}
49+
}
50+
}

project/Dependencies.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,8 @@ object Dependencies {
107107
val redisClient = "net.debasishg" %% "redisclient" % "3.30"
108108
}
109109

110+
object twitter {
111+
val finagleMemcached = "com.twitter" %% "finagle-memcached" % "21.6.0"
112+
}
113+
110114
}

0 commit comments

Comments
 (0)