Skip to content

Commit 1f45d42

Browse files
authored
Merge pull request #41 from criteo/upgrade_deps
Upgrade dependencies
2 parents 9a3be80 + b2adf08 commit 1f45d42

File tree

5 files changed

+93
-72
lines changed

5 files changed

+93
-72
lines changed

build.sbt

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ import scala.sys.process.Process
33
lazy val commonSettings = Seq(
44
organization := "com.criteo",
55
version := "0.4.8",
6-
scalaVersion := "2.12.2",
7-
crossScalaVersions := Seq("2.11.8", "2.12.2"),
6+
scalaVersion := "2.12.4",
7+
crossScalaVersions := Seq("2.11.8", "2.12.4"),
88
credentials += Credentials(
99
"Sonatype Nexus Repository Manager",
1010
"oss.sonatype.org",
@@ -92,14 +92,16 @@ lazy val root = (project in file("."))
9292
.settings(
9393
name := "slab",
9494
libraryDependencies ++= Seq(
95-
"org.json4s" %% "json4s-native" % "3.4.2",
96-
"org.scalatest" %% "scalatest" % "3.0.1" % Test,
97-
"org.mockito" % "mockito-core" % "2.7.0" % Test,
9895
"org.slf4j" % "slf4j-api" % "1.7.25",
99-
"com.criteo.lolhttp" %% "lolhttp" % "0.6.1",
96+
"org.json4s" %% "json4s-native" % "3.4.2",
97+
"com.criteo.lolhttp" %% "lolhttp" % "0.9.1",
10098
"com.github.cb372" %% "scalacache-core" % "0.9.4",
10199
"com.github.cb372" %% "scalacache-caffeine" % "0.9.4",
102-
"com.chuusai" %% "shapeless" % "2.3.2"
100+
"com.chuusai" %% "shapeless" % "2.3.2",
101+
"org.typelevel" %% "cats-core" % "1.0.1",
102+
"org.typelevel" %% "cats-effect" % "0.7",
103+
"org.scalatest" %% "scalatest" % "3.0.1" % Test,
104+
"org.mockito" % "mockito-core" % "2.7.0" % Test
103105
)
104106
)
105107

example/src/main/scala/com/criteo/slab/example/GraphiteLauncher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ object GraphiteLauncher {
1919
host <- sys.env.get("GRAPHITE_HOST")
2020
port <- sys.env.get("GRAPHITE_PORT").map(_.toInt)
2121
webHost <- sys.env.get("GRAPHITE_WEB_HOST")
22-
} yield new GraphiteStore(host, port, webHost, Duration.ofSeconds(60), Some("slab.example"))
22+
} yield new GraphiteStore(host, port, webHost, Duration.ofSeconds(60), Some("slab.example"), Some("slab.example.slo"))
2323
implicit val store = maybeStore match {
2424
case Some(s) =>
2525
logger.info("[Slab Example] using Graphite store")

example/src/main/scala/com/criteo/slab/example/Launcher.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package com.criteo.slab.example
55

66
import java.net.URLDecoder
77

8+
import cats.effect.IO
89
import com.criteo.slab.app.StateService.NotFoundError
910
import com.criteo.slab.app.WebServer
1011
import com.criteo.slab.lib.InMemoryStore
@@ -32,14 +33,16 @@ object Launcher {
3233
.withRoutes(stateService => {
3334
case GET at "/api/heartbeat" => Ok("ok")
3435
case GET at url"/api/boards/$board/status" =>
35-
stateService
36-
.current(URLDecoder.decode(board, "UTF-8")).map(view => Ok(view.status.name))
37-
.recover {
38-
case NotFoundError(message) => NotFound(message)
39-
case e =>
40-
logger.error(e.getMessage, e)
41-
InternalServerError()
42-
}
36+
IO.fromFuture(IO(
37+
stateService
38+
.current(URLDecoder.decode(board, "UTF-8")).map(view => Ok(view.status.name))
39+
.recover {
40+
case NotFoundError(message) => NotFound(message)
41+
case e =>
42+
logger.error(e.getMessage, e)
43+
InternalServerError()
44+
}
45+
))
4346
})
4447
// Attach a board to the server
4548
.attach(board)

src/main/scala/com/criteo/slab/app/WebServer.scala

Lines changed: 56 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import java.net.URLDecoder
44
import java.text.DecimalFormat
55
import java.time.{Duration, Instant}
66

7+
import cats.Eval
8+
import cats.effect.IO
79
import com.criteo.slab.app.StateService.NotFoundError
810
import com.criteo.slab.core.Executor.{FetchBoardHistory, FetchBoardHourlySlo, RunBoard}
911
import com.criteo.slab.core._
@@ -16,7 +18,6 @@ import shapeless.HList
1618
import shapeless.poly.Case3
1719

1820
import scala.concurrent.{ExecutionContext, Future}
19-
import scala.util.Try
2021
import scala.util.control.NonFatal
2122

2223
/** Slab Web server
@@ -30,7 +31,7 @@ import scala.util.control.NonFatal
3031
case class WebServer(
3132
val pollingInterval: Int = 60,
3233
val statsDays: Int = 730,
33-
private val routeGenerator: StateService => PartialFunction[Request, Future[Response]] = _ => PartialFunction.empty,
34+
private val routeGenerator: StateService => PartialFunction[Request, IO[Response]] = _ => PartialFunction.empty,
3435
private val executors: List[Executor[_]] = List.empty
3536
)(implicit ec: ExecutionContext) {
3637
/**
@@ -71,7 +72,7 @@ case class WebServer(
7172
* @param generator A function that takes StateService and returns routes
7273
* @return Web server with the created routes
7374
*/
74-
def withRoutes(generator: StateService => PartialFunction[Request, Future[Response]]) = this.copy(routeGenerator = generator)
75+
def withRoutes(generator: StateService => PartialFunction[Request, IO[Response]]) = this.copy(routeGenerator = generator)
7576

7677
private val logger = LoggerFactory.getLogger(this.getClass)
7778

@@ -89,68 +90,80 @@ case class WebServer(
8990

9091
private lazy val boards = executors.map(_.board)
9192

92-
private val routes: PartialFunction[Request, Future[Response]] = {
93+
private val routes: PartialFunction[Request, IO[Response]] = {
9394
// Configs of boards
9495
case GET at url"/api/boards" => {
9596
Ok(boards.map { board => BoardConfig(board.title, board.layout, board.links, board.slo) }.toJSON).map(jsonContentType)
9697
}
9798
// Current board view
9899
case GET at url"/api/boards/$board" => {
99100
val boardName = URLDecoder.decode(board, "UTF-8")
100-
stateService
101-
.current(boardName)
102-
.map((_: ReadableView).toJSON)
103-
.map(Ok(_))
104-
.map(jsonContentType)
105-
.recoverWith(errorHandler)
101+
IO.fromFuture(IO(
102+
stateService
103+
.current(boardName)
104+
.map((_: ReadableView).toJSON)
105+
.map(Ok(_))
106+
.map(jsonContentType)
107+
.recover(errorHandler)
108+
))
106109
}
107110
// Snapshot of the given time point
108111
case GET at url"/api/boards/$board/snapshot/$timestamp" => {
109112
val boardName = URLDecoder.decode(board, "UTF-8")
110-
executors.find(_.board.title == boardName).fold(Future.successful(NotFound(s"Board $boardName does not exist"))) { executor =>
111-
Try(timestamp.toLong).map(Instant.ofEpochMilli).toOption.fold(
112-
Future.successful(BadRequest("invalid timestamp"))
113-
) { dateTime =>
114-
executor.apply(Some(Context(dateTime)))
115-
.map((_: ReadableView).toJSON)
116-
.map(Ok(_))
117-
.map(jsonContentType)
113+
executors.find(_.board.title == boardName).fold(IO(NotFound(s"Board $boardName does not exist"))) { executor =>
114+
IO(Instant.ofEpochMilli(timestamp.toLong)).attempt.flatMap {
115+
case Left(_) => IO(BadRequest("invalid timestamp"))
116+
case Right(dateTime) =>
117+
IO.fromFuture(IO(
118+
executor.apply(Some(Context(dateTime)))
119+
.map((_: ReadableView).toJSON)
120+
.map(Ok(_))
121+
.map(jsonContentType)
122+
.recover(errorHandler)
123+
))
118124
}
119-
}.recoverWith(errorHandler)
125+
}
120126
}
121127
// History of last 24 hours
122128
case GET at url"/api/boards/$board/history?last" => {
123129
val boardName = URLDecoder.decode(board, "UTF-8")
124-
stateService
125-
.history(boardName)
126-
.map(h => Ok(h.toJSON))
127-
.map(jsonContentType)
128-
.recoverWith(errorHandler)
130+
IO.fromFuture(IO(
131+
stateService
132+
.history(boardName)
133+
.map(h => Ok(h.toJSON))
134+
.map(jsonContentType)
135+
.recover(errorHandler)
136+
))
129137
}
130138
// History of the given range
131139
case GET at url"/api/boards/$board/history?from=$fromTS&until=$untilTS" => {
132140
val boardName = URLDecoder.decode(board, "UTF-8")
133-
executors.find(_.board.title == boardName).fold(Future.successful(NotFound(s"Board $boardName does not exist"))) { executor =>
134-
val range = for {
135-
from <- Try(fromTS.toLong).map(Instant.ofEpochMilli).toOption
136-
until <- Try(untilTS.toLong).map(Instant.ofEpochMilli).toOption
137-
} yield (from, until)
138-
range.fold(Future.successful(BadRequest("Invalid timestamp"))) { case (from, until) =>
139-
executor.fetchHistory(from, until)
140-
.map(_.toMap.mapValues(_.status.name).toJSON)
141-
.map(Ok(_))
142-
.map(jsonContentType)
141+
executors.find(_.board.title == boardName).fold(IO(NotFound(s"Board $boardName does not exist"))) { executor =>
142+
IO(
143+
Instant.ofEpochMilli(fromTS.toLong) -> Instant.ofEpochMilli(untilTS.toLong)
144+
).attempt.flatMap {
145+
case Left(_) => IO(BadRequest("Invalid timestamp"))
146+
case Right((from, until)) =>
147+
IO.fromFuture(IO(
148+
executor.fetchHistory(from, until)
149+
.map(_.toMap.mapValues(_.status.name).toJSON)
150+
.map(Ok(_))
151+
.map(jsonContentType)
152+
.recover(errorHandler)
153+
))
143154
}
144-
}.recoverWith(errorHandler)
155+
}
145156
}
146157
// Stats of the board
147158
case GET at url"/api/boards/$board/stats" => {
148159
val boardName = URLDecoder.decode(board, "UTF-8")
149-
stateService.stats(boardName)
150-
.map(_.mapValues(decimalFormat.format)).map(_.toJSON)
151-
.map(Ok(_))
152-
.map(jsonContentType)
153-
.recoverWith(errorHandler)
160+
IO.fromFuture(IO(
161+
stateService.stats(boardName)
162+
.map(_.mapValues(decimalFormat.format)).map(_.toJSON)
163+
.map(Ok(_))
164+
.map(jsonContentType)
165+
.recover(errorHandler)
166+
))
154167
}
155168
// Static resources
156169
case GET at url"/$file.$ext" => {
@@ -161,14 +174,14 @@ case class WebServer(
161174
}
162175
}
163176

164-
private def notFound: PartialFunction[Request, Future[Response]] = {
177+
private def notFound: PartialFunction[Request, IO[Response]] = {
165178
case anyReq => {
166179
logger.info(s"${anyReq.method.toString} ${anyReq.url} not found")
167180
Response(404)
168181
}
169182
}
170183

171-
private def errorHandler: PartialFunction[Throwable, Future[Response]] = {
184+
private def errorHandler: PartialFunction[Throwable, Response] = {
172185
case f: NotFoundError =>
173186
NotFound(f.message)
174187
case NonFatal(e) =>
@@ -178,7 +191,7 @@ case class WebServer(
178191

179192
private def jsonContentType(res: Response) = res.addHeaders(HttpString("content-type") -> HttpString("application/json"))
180193

181-
private def routeLogger(router: Request => Future[Response]) = (request: Request) => {
194+
private def routeLogger(router: Request => IO[Response]) = (request: Request) => {
182195
val start = Instant.now()
183196
router(request) map { res =>
184197
val duration = Duration.between(start, Instant.now)

src/main/scala/com/criteo/slab/utils/HttpUtils.scala

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import java.net.{URL, URLEncoder}
44
import java.time.Instant
55
import java.util.concurrent.TimeUnit.SECONDS
66

7-
import lol.http.{Client, ContentDecoder, HttpString, Get, Response}
7+
import cats.effect.IO
8+
import lol.http.{Client, ContentDecoder, Get, HttpString, Response}
89
import org.slf4j.LoggerFactory
910

1011
import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -39,12 +40,12 @@ object HttpUtils {
3940
val fullURL = s"${url.getProtocol}://${url.getHost}:${port}$path"
4041
logger.info(s"Requesting $fullURL")
4142
val start = Instant.now
42-
Client(url.getHost, port, url.getProtocol) runAndStop { client =>
43+
Client(url.getHost, port, url.getProtocol).runAndStop { client =>
4344
client.run(request, timeout = timeout) { res =>
4445
logger.info(s"Response from $fullURL, status: ${res.status}, ${Instant.now.toEpochMilli - start.toEpochMilli}ms")
4546
handleResponse(res, fullURL)
46-
} recoverWith handleError(fullURL)
47-
}
47+
}
48+
}.unsafeToFuture().recoverWith(handleError(fullURL))
4849
}
4950

5051
/** Make a HTTP Get client of which the connection is kept open
@@ -73,19 +74,21 @@ object HttpUtils {
7374

7475
private def encodeURI(in: String) = URLEncoder.encode(in, "UTF-8").replace("+", "%20")
7576

76-
private def handleResponse[A: ContentDecoder](res: Response, url: String)(implicit ec: ExecutionContext): Future[A] = {
77+
private def handleResponse[A: ContentDecoder](res: Response, url: String)(implicit ec: ExecutionContext): IO[A] = {
7778
if (res.status < 400)
7879
res.readAs[A]
7980
else
80-
res
81-
.readAs[String]
82-
.recoverWith { case e =>
83-
logger.error(e.getMessage, e)
84-
Future.successful("Unable to get the message")
81+
res.readAs[String].attempt
82+
.map {
83+
case Left(e) =>
84+
logger.error(e.getMessage, e)
85+
"Unable to get the message"
86+
case Right(message) =>
87+
message
8588
}
8689
.flatMap { message =>
8790
logger.info(s"Request to $url has failed, status: ${res.status}, message: $message")
88-
Future.failed(FailedRequestException(res))
91+
IO.raiseError(FailedRequestException(res))
8992
}
9093
}
9194

@@ -103,7 +106,7 @@ object HttpUtils {
103106
case class FailedRequestException(response: Response) extends Exception
104107

105108
/**
106-
* It be used to send GET requests using the same client
109+
* Send GET requests using the same HTTP client
107110
*
108111
* @param client The client
109112
* @param defaultHeaders Default request headers
@@ -122,7 +125,7 @@ object HttpUtils {
122125
client.run(request, timeout = timeout) { res: Response =>
123126
logger.info(s"Response from $fullURL, status: ${res.status}, ${Instant.now.toEpochMilli - start.toEpochMilli}ms")
124127
handleResponse(res, fullURL)
125-
} recoverWith handleError(fullURL)
128+
}.unsafeToFuture().recoverWith(handleError(fullURL))
126129
}
127130
}
128131

0 commit comments

Comments
 (0)