diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9b23d25..f242022 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - scala: [3.4.2] + scala: [3.6.4] java: [temurin@21] runs-on: ${{ matrix.os }} steps: @@ -65,7 +65,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - scala: [3.4.2] + scala: [3.6.4] java: [temurin@21] runs-on: ${{ matrix.os }} steps: @@ -85,12 +85,12 @@ jobs: - name: Setup sbt uses: sbt/setup-sbt@v1 - - name: Download target directories (3.4.2) + - name: Download target directories (3.6.4) uses: actions/download-artifact@v4 with: - name: target-${{ matrix.os }}-3.4.2-${{ matrix.java }} + name: target-${{ matrix.os }}-3.6.4-${{ matrix.java }} - - name: Inflate target directories (3.4.2) + - name: Inflate target directories (3.6.4) run: | tar xf targets.tar rm targets.tar diff --git a/build.sbt b/build.sbt index 6d9d563..7ac43fd 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ -lazy val circeVersion = "0.14.9" +lazy val circeVersion = "0.14.12" lazy val pekkoVersion = "1.0.3" -lazy val pekkoHttpVersion = "1.0.1" +lazy val pekkoHttpVersion = "1.1.0" ThisBuild / organization := "com.clever-cloud" ThisBuild / homepage := Some(url("https://github.com/clevercloud/warp10-scala-client")) @@ -14,7 +14,7 @@ ThisBuild / developers := List( ) ) ThisBuild / version := "2.1.0" -ThisBuild / scalaVersion := "3.4.2" +ThisBuild / scalaVersion := "3.6.4" ThisBuild / versionScheme := Some("early-semver") ThisBuild / scmInfo := Some( ScmInfo( @@ -23,17 +23,17 @@ ThisBuild / scmInfo := Some( ) ) ThisBuild / libraryDependencies ++= Seq( - "org.apache.commons" % "commons-text" % "1.10.0", + "org.apache.commons" % "commons-text" % "1.13.0", "org.apache.pekko" %% "pekko-actor" % pekkoVersion, "org.apache.pekko" %% "pekko-stream" % pekkoVersion, "org.apache.pekko" %% "pekko-http" % pekkoHttpVersion, "io.circe" %% "circe-core" % circeVersion, "io.circe" %% "circe-generic" % circeVersion, "io.circe" %% "circe-parser" % circeVersion, - "ch.qos.logback" % "logback-classic" % "1.5.2", + "ch.qos.logback" % "logback-classic" % "1.5.18", "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", - "org.specs2" %% "specs2-core" % "4.20.2" % Test, - "com.clever-cloud" %% "testcontainers-scala-warp10" % "2.1.0" % Test + "org.specs2" %% "specs2-core" % "4.21.0" % Test, + "com.clever-cloud" %% "testcontainers-scala-warp10" % "2.1.2" % Test ) ThisBuild / scalacOptions ++= Seq( "-deprecation", diff --git a/src/main/scala/com/clevercloud/warp10client/Fetcher.scala b/src/main/scala/com/clevercloud/warp10client/Fetcher.scala index eb2e294..1086539 100644 --- a/src/main/scala/com/clevercloud/warp10client/Fetcher.scala +++ b/src/main/scala/com/clevercloud/warp10client/Fetcher.scala @@ -16,13 +16,9 @@ import com.clevercloud.warp10client.models._ import com.clevercloud.warp10client.models.gts_module.GTS object Fetcher { - val log = Logger(LoggerFactory.getLogger("Fetcher")) + val log: Logger = Logger(LoggerFactory.getLogger("Fetcher")) - def fetch( - readToken: String - )(implicit - warpClientContext: WarpClientContext - ): Flow[Query[FetchRange], Future[Either[WarpException, Seq[GTS]]], NotUsed] = { + def fetch(readToken: String)(using warpClientContext: WarpClientContext): Flow[Query[FetchRange], Future[Either[WarpException, Seq[GTS]]], NotUsed] = { val uuid = UUID.randomUUID Flow[Query[FetchRange]] .map(query => fetchRequest(readToken, query)) @@ -39,7 +35,7 @@ object Fetcher { def fetchRequest( readToken: String, query: Query[FetchRange] - )(implicit + )(using warpClientContext: WarpClientContext ) = { log.debug(s"[FETCHER] sending ${warpClientContext.configuration.fetchUrl}?${query.serialize}") @@ -52,7 +48,7 @@ object Fetcher { def processResponse( httpResponse: HttpResponse - )(implicit + )(using warpClientContext: WarpClientContext ): Future[Either[WarpException, List[GTS]]] = { import warpClientContext._ @@ -81,22 +77,4 @@ object Fetcher { } } } - - def stringToGTSSeq: Flow[String, Seq[GTS], NotUsed] = { - Flow[String].map { data => - if (data.size > 0) { - log.debug(s"Data provided, let's parse them") - GTS.parse(data) match { - case Left(e) => { - log.error(s"Can't parse GTS due to: ${e.toString()}") - throw WarpException(s"Can't parse GTS due to: $e") - } - case Right(gtsList) => gtsList - } - } else { - log.debug(s"Empty data provided, let's return empty Seq()") - Seq() - } - } - } } diff --git a/src/main/scala/com/clevercloud/warp10client/Pusher.scala b/src/main/scala/com/clevercloud/warp10client/Pusher.scala index 03a3123..369aefc 100644 --- a/src/main/scala/com/clevercloud/warp10client/Pusher.scala +++ b/src/main/scala/com/clevercloud/warp10client/Pusher.scala @@ -1,30 +1,26 @@ package com.clevercloud.warp10client import java.util.UUID - import scala.concurrent.Future import scala.util.{ Failure, Success } - import org.apache.pekko import pekko.NotUsed import pekko.http.scaladsl.model._ import pekko.stream.scaladsl.Flow - -import org.apache.commons.lang3.StringEscapeUtils - +import org.apache.commons.text.StringEscapeUtils import com.clevercloud.warp10client.models.gts_module.GTS object Pusher { def push( writeToken: String - )(implicit + )(using warpClientContext: WarpClientContext ): Flow[GTS, Future[Either[WarpException, Unit]], NotUsed] = { val uuid = UUID.randomUUID Flow[GTS] - .map(gts => pushRequest(gts, writeToken)) - .map(request => (request -> uuid)) // cf. https://doc.pekko.io/docs/pekko-http/current/client-side/host-level.html + .map { gts => pushRequest(gts, writeToken) } + .map{ request => (request -> uuid) } // cf. https://doc.pekko.io/docs/pekko-http/current/client-side/host-level.html .via(warpClientContext.poolClientFlow) .filter({ case (_, key) => key == uuid }) .map({ case (responseTry, _) => responseTry }) @@ -34,15 +30,11 @@ object Pusher { } } - def pushSeq( - writeToken: String - )(implicit - warpClientContext: WarpClientContext - ): Flow[Seq[GTS], Future[Either[WarpException, Unit]], NotUsed] = { + def pushSeq(writeToken: String)(using warpClientContext: WarpClientContext): Flow[Seq[GTS], Future[Either[WarpException, Unit]], NotUsed] = { val uuid = UUID.randomUUID Flow[Seq[GTS]] - .map(gtsSeq => pushSeqRequest(gtsSeq, writeToken)) - .map(request => (request -> uuid)) // cf. https://doc.pekko.io/docs/pekko-http/current/client-side/host-level.html + .map { gtsSeq => pushSeqRequest(gtsSeq, writeToken) } + .map {request => (request -> uuid) } // cf. https://doc.pekko.io/docs/pekko-http/current/client-side/host-level.html .via(warpClientContext.poolClientFlow) .filter({ case (_, key) => key == uuid }) .map({ case (responseTry, _) => responseTry }) @@ -52,39 +44,21 @@ object Pusher { } } - def pushSeqRequest( - gtsSeq: Seq[GTS], - writeToken: String - )(implicit - warpClientContext: WarpClientContext - ) = { - HttpRequest( + def pushSeqRequest(gtsSeq: Seq[GTS], writeToken: String)(using warpClientContext: WarpClientContext) = HttpRequest( method = HttpMethods.POST, uri = warpClientContext.configuration.pushUrl, headers = List(`X-Warp10-Token`(writeToken)), entity = HttpEntity(gtsSeq.map(_.serialize).mkString("\n")) ) - } - def pushRequest( - gts: GTS, - writeToken: String - )(implicit - warpClientContext: WarpClientContext - ) = { - HttpRequest( + def pushRequest(gts: GTS, writeToken: String)(using warpClientContext: WarpClientContext) = HttpRequest( method = HttpMethods.POST, uri = warpClientContext.configuration.pushUrl, headers = List(`X-Warp10-Token`(writeToken)), entity = HttpEntity(gts.serialize) ) - } - def processResponse( - httpResponse: HttpResponse - )(implicit - warpClientContext: WarpClientContext - ): Future[Either[WarpException, Unit]] = { + def processResponse(httpResponse: HttpResponse)(using warpClientContext: WarpClientContext): Future[Either[WarpException, Unit]] = { import warpClientContext._ if (httpResponse.status == StatusCodes.OK) { diff --git a/src/main/scala/com/clevercloud/warp10client/Runner.scala b/src/main/scala/com/clevercloud/warp10client/Runner.scala index ca8d33c..65ce8c0 100644 --- a/src/main/scala/com/clevercloud/warp10client/Runner.scala +++ b/src/main/scala/com/clevercloud/warp10client/Runner.scala @@ -18,10 +18,7 @@ import scala.concurrent.Future object Runner { type WarpScript = String - def exec( - )(implicit - warpClientContext: WarpClientContext - ): Flow[WarpScript, String, NotUsed] = { + def exec()(implicit warpClientContext: WarpClientContext): Flow[WarpScript, String, NotUsed] = { val uuid = UUID.randomUUID Flow[WarpScript] .map(script => execRequest(script)) @@ -32,22 +29,13 @@ object Runner { .via(processResponseTry) } - private def execRequest( - script: WarpScript - )(implicit - warpClientContext: WarpClientContext - ) = { - HttpRequest( + private def execRequest(script: WarpScript)(using warpClientContext: WarpClientContext) = HttpRequest( method = HttpMethods.POST, uri = warpClientContext.configuration.execUrl, entity = HttpEntity(script) ) - } - def processResponseTry( - implicit - warpClientContext: WarpClientContext - ): Flow[Try[HttpResponse], String, NotUsed] = { + def processResponseTry(using warpClientContext: WarpClientContext): Flow[Try[HttpResponse], String, NotUsed] = { import warpClientContext._ Flow[Try[HttpResponse]].flatMapConcat { @@ -79,17 +67,14 @@ object Runner { } } - private def parseJson: Flow[String, Json, NotUsed] = { - Flow[String].map { s => + private def parseJson: Flow[String, Json, NotUsed] = Flow[String].map { s => parse(s) match { case Right(json) => json case Left(e) => throw WarpException(s"Error on parsing: $e") } } - } - def jsonToGTSSeq(): Flow[String, Seq[GTS], NotUsed] = { - Flow[String] + def jsonToGTSSeq(): Flow[String, Seq[GTS], NotUsed] = Flow[String] .via(parseJson) .map { json => json.hcursor.downArray } // warp response contains [[]] or [{}] so we drop an array level .map { array => // global array with all matching script @@ -104,61 +89,58 @@ object Runner { GTS( classname = `class`, labels = labels, - points = (series \\ "v").map { seriesContentArrays => // [[point_1], [point_2], ...] + points = (series \\ "v").flatMap { seriesContentArrays => // [[point_1], [point_2], ...] seriesContentArrays.asArray.get.map { point => // [point_i] - point.asArray.get match { // [timestamp, lat, lon, elev, value] is point's content - case Vector(timestamp: Json, value: Json) => { + point.asArray.getOrElse(Vector(0, 0, 0, 0, 0)) match { // [timestamp, lat, lon, elev, value] is point's content + case Vector(timestamp: Json, value: Json) => GTSPoint( timestamp.asNumber.get.toLong, None, None, GTSValue.parse(value) match { case Right(gtsPoint) => gtsPoint - case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.") + case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.") } ) - } - case Vector(timestamp: Json, elevation: Json, value: Json) => { + case Vector(timestamp: Json, elevation: Json, value: Json) => GTSPoint( timestamp.asNumber.get.toLong, None, elevation.asNumber.get.toLong, GTSValue.parse(value) match { case Right(gtsPoint) => gtsPoint - case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.") + case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.") } ) - } - case Vector(timestamp: Json, latitude: Json, longitude: Json, value: Json) => { + case Vector(timestamp: Json, latitude: Json, longitude: Json, value: Json) => GTSPoint( timestamp.asNumber.get.toLong, Some(Coordinates(latitude.asNumber.get.toDouble, longitude.asNumber.get.toDouble)), None, GTSValue.parse(value) match { case Right(gtsPoint) => gtsPoint - case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.") + case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.") } ) - } - case Vector(timestamp: Json, latitude: Json, longitude: Json, elevation: Json, value: Json) => { + case Vector(timestamp: Json, latitude: Json, longitude: Json, elevation: Json, value: Json) => GTSPoint( timestamp.asNumber.get.toLong, Some(Coordinates(latitude.asNumber.get.toDouble, longitude.asNumber.get.toDouble)), elevation.asNumber.get.toLong, GTSValue.parse(value) match { case Right(gtsPoint) => gtsPoint - case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.") + case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.") } ) - } + + case Vector(_*) => ??? } } - }.toSeq.flatten + } ) } .toSeq } - } def jsonToStack(): Flow[String, Warp10Stack, NotUsed] = Flow[String].via(parseJson).map(Warp10Stack.apply) } diff --git a/src/main/scala/com/clevercloud/warp10client/Warp10Client.scala b/src/main/scala/com/clevercloud/warp10client/Warp10Client.scala index 17ef603..6a4e3aa 100644 --- a/src/main/scala/com/clevercloud/warp10client/Warp10Client.scala +++ b/src/main/scala/com/clevercloud/warp10client/Warp10Client.scala @@ -20,49 +20,40 @@ object WarpClient { host: String, port: Int, scheme: String = "http" - )(implicit + )(using warpConfiguration: WarpConfiguration, actorSystem: ActorSystem, actorMaterializer: Materializer - ): Warp10Client = { - if (scheme.equals("http")) { + ): Warp10Client = if (scheme.equals("http")) { WarpClient(Http().cachedHostConnectionPool[UUID](host, port)) } else { WarpClient(Http().cachedHostConnectionPoolHttps[UUID](host, port)) } - } def apply( poolClientFlow: PoolClientFlow - )(implicit + )(using warpConfiguration: WarpConfiguration, actorMaterializer: Materializer - ): Warp10Client = { - new Warp10Client( + ): Warp10Client = new Warp10Client( WarpClientContext( warpConfiguration, poolClientFlow, actorMaterializer ) ) - } - def closePool( - )(implicit - actorSystem: ActorSystem - ): Future[Unit] = { - Http().shutdownAllConnectionPools() - } + def closePool()(using actorSystem: ActorSystem): Future[Unit] = Http().shutdownAllConnectionPools() } class Warp10Client(warpContext: WarpClientContext) { import warpContext._ + given WarpClientContext = warpContext def fetch(readToken: String): Flow[Query[FetchRange], Future[Either[WarpException, Seq[GTS]]], NotUsed] = - Fetcher.fetch(readToken)(warpContext) + Fetcher.fetch(readToken) - def fetch(readToken: String, query: Query[FetchRange]): Future[Either[WarpException, Seq[GTS]]] = { - Source + def fetch(readToken: String, query: Query[FetchRange]): Future[Either[WarpException, Seq[GTS]]] = Source .single(query) .via(fetch(readToken)) .runWith( @@ -71,16 +62,14 @@ class Warp10Client(warpContext: WarpClientContext) { )((a, b) => a.flatMap(_ => b)) ) .flatten - } def pushSeq(writeToken: String): Flow[Seq[GTS], Future[Either[WarpException, Unit]], NotUsed] = - Pusher.pushSeq(writeToken)(warpContext) + Pusher.pushSeq(writeToken) def push(writeToken: String): Flow[GTS, Future[Either[WarpException, Unit]], NotUsed] = - Pusher.push(writeToken)(warpContext) + Pusher.push(writeToken) - def push(gts: GTS, writeToken: String): Future[Either[WarpException, Unit]] = { - Source + def push(gts: GTS, writeToken: String): Future[Either[WarpException, Unit]] = Source .single(gts) .via(push(writeToken)) .runWith( @@ -89,19 +78,16 @@ class Warp10Client(warpContext: WarpClientContext) { )((a, b) => a.flatMap(_ => b)) ) .flatten - } - def push(gtsSeq: Seq[GTS], writeToken: String, batchSize: Int = 100): Future[Either[WarpException, Unit]] = { - Source + def push(gtsSeq: Seq[GTS], writeToken: String, batchSize: Int = 100): Future[Either[WarpException, Unit]] = Source .fromIterator(() => gtsSeq.grouped(batchSize)) - .via(Pusher.pushSeq(writeToken)(warpContext)) + .via(Pusher.pushSeq(writeToken)) .runWith( Sink.fold[Future[Either[WarpException, Unit]], Future[Either[WarpException, Unit]]]( Future.successful(Right(())) )((a, b) => a.flatMap(_ => b)) ) .flatten - } def exec: Flow[WarpScript, Seq[GTS], NotUsed] = Runner.exec()(warpContext).via(jsonToGTSSeq()) def execStack: Flow[WarpScript, Warp10Stack, NotUsed] = Runner.exec()(warpContext).via(jsonToStack()) diff --git a/src/main/scala/com/clevercloud/warp10client/WarpClientUtils.scala b/src/main/scala/com/clevercloud/warp10client/WarpClientUtils.scala index d04c27d..c9c0b93 100644 --- a/src/main/scala/com/clevercloud/warp10client/WarpClientUtils.scala +++ b/src/main/scala/com/clevercloud/warp10client/WarpClientUtils.scala @@ -19,18 +19,14 @@ import scala.util.Try object WarpClientUtils { type PoolClientFlow = Flow[(HttpRequest, UUID), (Try[HttpResponse], UUID), ?] - def readAllDataBytes( - dataBytesSource: Source[ByteString, ?] - )(implicit + def readAllDataBytes(dataBytesSource: Source[ByteString, ?])(using actorMaterializer: Materializer ): Future[String] = { given executionContext: ExecutionContext = actorMaterializer.system.dispatcher dataBytesSource.runFold(ByteString.empty)({ case (seq, item) => seq ++ item }).map(_.decodeString("UTF-8")) } - def decode: Flow[ByteString, String, NotUsed] = { - Flow[ByteString].map(_.decodeString("UTF-8")) - } + def decode: Flow[ByteString, String, NotUsed] = Flow[ByteString].map(_.decodeString("UTF-8")) } case class WarpClientContext( @@ -47,11 +43,8 @@ case class WarpClientContext( case class WarpException(error: String, line: Option[Int] = None) extends Exception(error) object `X-Warp10-Token` { - - def apply(value: String): HttpHeader = { - HttpHeader.parse("X-Warp10-Token", value) match { + def apply(value: String): HttpHeader = HttpHeader.parse("X-Warp10-Token", value) match { case ParsingResult.Ok(httpHeader, _) => httpHeader case ParsingResult.Error(error) => throw WarpException(s"${error.summary}: ${error.detail}.") } - } } diff --git a/src/main/scala/com/clevercloud/warp10client/models/Warp10Stack.scala b/src/main/scala/com/clevercloud/warp10client/models/Warp10Stack.scala index d42f9f4..665bf56 100644 --- a/src/main/scala/com/clevercloud/warp10client/models/Warp10Stack.scala +++ b/src/main/scala/com/clevercloud/warp10client/models/Warp10Stack.scala @@ -4,18 +4,12 @@ import io.circe.{ Decoder, Json } case class Warp10Stack(json: Json) { - def head[T]( - implicit - d: Decoder[T] - ): Either[String, T] = get(0) + def head[T](using d: Decoder[T]): Either[String, T] = get(0) def size: Int = json.asArray.knownSize - def get[T]( - index: Int - )(implicit - d: Decoder[T] - ): Either[String, T] = json.asArray + def get[T](index: Int)(using d: Decoder[T]): Either[String, T] = json + .asArray .map(Right(_)) .getOrElse(Left("invalid stack")) .map { array => array.apply(index) } diff --git a/src/test/scala/GTSSpec.scala b/src/test/scala/com/clevercloud/warp10client/GTSSpec.scala similarity index 58% rename from src/test/scala/GTSSpec.scala rename to src/test/scala/com/clevercloud/warp10client/GTSSpec.scala index bba35c1..546f147 100644 --- a/src/test/scala/GTSSpec.scala +++ b/src/test/scala/com/clevercloud/warp10client/GTSSpec.scala @@ -1,5 +1,8 @@ -import com.clevercloud.warp10client.models.gts_module._ -import org.specs2._ +package com.clevercloud.warp10client + +import com.clevercloud.warp10client.models.gts_module.* +import org.specs2.* +import org.specs2.matcher.MatchResult import scala.collection.immutable.ListMap @@ -13,7 +16,7 @@ class GTSSpec extends Specification { Parse GTSMacroValue $g2 """ - val gts = GTSMacroValue( + val gts: GTSMacroValue = GTSMacroValue( "m", "macro", ListMap( @@ -28,8 +31,8 @@ class GTSSpec extends Specification { ) val serialized = ":m:macro:{'s' '12.12.12.12' 'i1' 10 'l' 2 'i2' 20 'd' 3.2 'b1' false 'b2' true}" - def g1 = gts.serialize must beEqualTo(serialized) + def g1: MatchResult[String] = gts.serialize must beEqualTo(serialized) // not implemented - def g2 = GTSValue.parse(serialized) must beAnInstanceOf[Left[_, _]] + def g2: MatchResult[Either[gts_errors.InvalidGTSPointFormat, GTSValue]] = GTSValue.parse(serialized) must beAnInstanceOf[Left[?, ?]] } diff --git a/src/test/scala/MockServer.scala b/src/test/scala/com/clevercloud/warp10client/MockServer.scala similarity index 79% rename from src/test/scala/MockServer.scala rename to src/test/scala/com/clevercloud/warp10client/MockServer.scala index 69568b1..0f3e8dd 100644 --- a/src/test/scala/MockServer.scala +++ b/src/test/scala/com/clevercloud/warp10client/MockServer.scala @@ -1,3 +1,5 @@ +package com.clevercloud.warp10client + import org.apache.pekko import org.apache.pekko.actor.ActorSystem import org.apache.pekko.http.scaladsl.Http @@ -14,22 +16,15 @@ object MockServer { val interface = "localhost" val port = 8888 - def handleRequest( - method: HttpMethod, - uri: Uri, - response: HttpResponse - // httpRequest: HttpRequest - )(implicit + def handleRequest(method: HttpMethod, uri: Uri, response: HttpResponse)(using system: ActorSystem ): Future[Http.ServerBinding] = { val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] = Http().newServerAt(interface, port).connectionSource() - // val requestPath = httpRequest.uri.path.toString() - val requestHandler: HttpRequest => HttpResponse = { - case HttpRequest(method, uri, _, _, _) => // Uri.Path(`requestPath`) + case HttpRequest(method, uri, _, _, _) => response case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!") diff --git a/src/test/scala/Warp10ClientSpec.scala b/src/test/scala/com/clevercloud/warp10client/Warp10ClientSpec.scala similarity index 91% rename from src/test/scala/Warp10ClientSpec.scala rename to src/test/scala/com/clevercloud/warp10client/Warp10ClientSpec.scala index f9d2fc9..938fddd 100644 --- a/src/test/scala/Warp10ClientSpec.scala +++ b/src/test/scala/com/clevercloud/warp10client/Warp10ClientSpec.scala @@ -1,3 +1,5 @@ +package com.clevercloud.warp10client + import java.time.* import java.util.UUID import scala.concurrent.{Await, ExecutionContext, Future} @@ -57,11 +59,10 @@ class Warp10ClientSpec extends Specification with Warp10TestContainer { )(implicit actorMaterializer: Materializer, executionContext: ExecutionContext - ) = { - WarpClientContext( + ) = WarpClientContext( poolClientFlow = { Flow[(HttpRequest, UUID)].mapAsync(1) { - case (httpRequest, requestKey) => { + case (httpRequest, requestKey) => WarpClientUtils .readAllDataBytes(httpRequest.entity.dataBytes) .map { @@ -73,19 +74,17 @@ class Warp10ClientSpec extends Specification with Warp10TestContainer { case _ => Success(HttpResponse(StatusCodes.NotImplemented)) } .map(httpResponse => (httpResponse, requestKey)) - } } }, actorMaterializer = actorMaterializer, configuration = warpConfiguration ) - } val wPushClient = new Warp10Client(pushContext()) val gtsPointSeq1: Seq[GTSPoint] = Seq(GTSPoint(Some(1.toLong), None, None, GTSLongValue(73346576))) val validSend_f: Future[Either[WarpException, Unit]] = wPushClient.push(GTS("testClass", Map.empty[String, String], gtsPointSeq1), writeToken) - def p1 = Await.result(validSend_f, Period(1000, MILLISECONDS)) must beAnInstanceOf[Right[?, ?]] + def p1: MatchResult[Either[WarpException, Unit]] = Await.result(validSend_f, Period(1000, MILLISECONDS)) must beAnInstanceOf[Right[?, ?]] val gtsPointSeq2: Seq[GTSPoint] = Seq(GTSPoint(None, None, None, GTSLongValue(7))) @@ -93,25 +92,20 @@ class Warp10ClientSpec extends Specification with Warp10TestContainer { GTS("testFailClass{}", Map("label1" -> "dsfF3", "label2" -> "dsfg"), gtsPointSeq2), "invalid_write_token" ) - def p2 = Await.result(invalidTokenSend_f, Period(1000, MILLISECONDS)) must beAnInstanceOf[Left[?, ?]] + def p2: MatchResult[Either[WarpException, Unit]] = Await.result(invalidTokenSend_f, Period(1000, MILLISECONDS)) must beAnInstanceOf[Left[?, ?]] val gtsPointSeq4: Seq[GTSPoint] = Seq( - GTSPoint(Some(4.toLong), Some(Coordinates(1.0.toDouble, -0.1.toDouble)), Some(1.toLong), GTSStringValue("string")) + GTSPoint(Some(4.toLong), Some(Coordinates(1.0, -0.1)), Some(1.toLong), GTSStringValue("string")) ) val fullDataFieldSend_f: Future[Either[WarpException, Unit]] = wPushClient.push(GTS("testFullDataField", Map("lbl1" -> "test", "lbl2" -> "test"), gtsPointSeq4), writeToken) - def p4 = Await.result(fullDataFieldSend_f, Period(1000, MILLISECONDS)) must beAnInstanceOf[Right[?, ?]] + def p4: MatchResult[Either[WarpException, Unit]] = Await.result(fullDataFieldSend_f, Period(1000, MILLISECONDS)) must beAnInstanceOf[Right[?, ?]] // FETCH TESTS - private def fetchContext( - )(implicit - actorMaterializer: Materializer - ) = { - WarpClientContext( + private def fetchContext()(implicit actorMaterializer: Materializer) = WarpClientContext( poolClientFlow = Flow[(HttpRequest, UUID)].map { - case (httpRequest, requestKey) => { - ( + case (httpRequest, requestKey) => ( httpRequest.uri.rawQueryString match { // case Some(x) => println(x.toString) ; Success(HttpResponse(StatusCodes.OK)) // use it to debug case Some(query) if query.contains("selector=test") => { @@ -136,19 +130,18 @@ class Warp10ClientSpec extends Specification with Warp10TestContainer { }, requestKey ) - } }, actorMaterializer = actorMaterializer, configuration = warpConfiguration ) - } + val wFetchClient = new Warp10Client(fetchContext()) val gtsPointForSeq: Seq[GTSPoint] = Seq( - GTSPoint(Some(1434590504.toLong), None, None, GTSDoubleValue(-0.6133061918698982.toDouble)), - GTSPoint(Some(1434590288.toLong), None, None, GTSDoubleValue(0.9228427144511169.toDouble)), - GTSPoint(Some(1434590072.toLong), None, None, GTSDoubleValue(-0.1301889411087915.toDouble)) + GTSPoint(Some(1434590504.toLong), None, None, GTSDoubleValue(-0.6133061918698982)), + GTSPoint(Some(1434590288.toLong), None, None, GTSDoubleValue(0.9228427144511169)), + GTSPoint(Some(1434590072.toLong), None, None, GTSDoubleValue(-0.1301889411087915)) ) val realSeq: Seq[GTS] = Seq( diff --git a/src/test/scala/Warp10StackTest.scala b/src/test/scala/com/clevercloud/warp10client/Warp10StackTest.scala similarity index 100% rename from src/test/scala/Warp10StackTest.scala rename to src/test/scala/com/clevercloud/warp10client/Warp10StackTest.scala diff --git a/src/test/scala/Warp10TestContainer.scala b/src/test/scala/com/clevercloud/warp10client/Warp10TestContainer.scala similarity index 94% rename from src/test/scala/Warp10TestContainer.scala rename to src/test/scala/com/clevercloud/warp10client/Warp10TestContainer.scala index 182bd21..a684827 100644 --- a/src/test/scala/Warp10TestContainer.scala +++ b/src/test/scala/com/clevercloud/warp10client/Warp10TestContainer.scala @@ -1,3 +1,5 @@ +package com.clevercloud.warp10client + import com.clevercloud.testcontainers.scala.Warp10Container import org.specs2.specification.BeforeAfterAll