Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [3.4.2]
scala: [3.6.4]
java: [temurin@21]
runs-on: ${{ matrix.os }}
steps:
Expand Down Expand Up @@ -65,7 +65,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [3.4.2]
scala: [3.6.4]
java: [temurin@21]
runs-on: ${{ matrix.os }}
steps:
Expand All @@ -85,12 +85,12 @@ jobs:
- name: Setup sbt
uses: sbt/setup-sbt@v1

- name: Download target directories (3.4.2)
- name: Download target directories (3.6.4)
uses: actions/download-artifact@v4
with:
name: target-${{ matrix.os }}-3.4.2-${{ matrix.java }}
name: target-${{ matrix.os }}-3.6.4-${{ matrix.java }}

- name: Inflate target directories (3.4.2)
- name: Inflate target directories (3.6.4)
run: |
tar xf targets.tar
rm targets.tar
Expand Down
14 changes: 7 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
lazy val circeVersion = "0.14.9"
lazy val circeVersion = "0.14.12"
lazy val pekkoVersion = "1.0.3"
lazy val pekkoHttpVersion = "1.0.1"
lazy val pekkoHttpVersion = "1.1.0"

ThisBuild / organization := "com.clever-cloud"
ThisBuild / homepage := Some(url("https://github.com/clevercloud/warp10-scala-client"))
Expand All @@ -14,7 +14,7 @@ ThisBuild / developers := List(
)
)
ThisBuild / version := "2.1.0"
ThisBuild / scalaVersion := "3.4.2"
ThisBuild / scalaVersion := "3.6.4"
ThisBuild / versionScheme := Some("early-semver")
ThisBuild / scmInfo := Some(
ScmInfo(
Expand All @@ -23,17 +23,17 @@ ThisBuild / scmInfo := Some(
)
)
ThisBuild / libraryDependencies ++= Seq(
"org.apache.commons" % "commons-text" % "1.10.0",
"org.apache.commons" % "commons-text" % "1.13.0",
"org.apache.pekko" %% "pekko-actor" % pekkoVersion,
"org.apache.pekko" %% "pekko-stream" % pekkoVersion,
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion,
"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-generic" % circeVersion,
"io.circe" %% "circe-parser" % circeVersion,
"ch.qos.logback" % "logback-classic" % "1.5.2",
"ch.qos.logback" % "logback-classic" % "1.5.18",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"org.specs2" %% "specs2-core" % "4.20.2" % Test,
"com.clever-cloud" %% "testcontainers-scala-warp10" % "2.1.0" % Test
"org.specs2" %% "specs2-core" % "4.21.0" % Test,
"com.clever-cloud" %% "testcontainers-scala-warp10" % "2.1.2" % Test
)
ThisBuild / scalacOptions ++= Seq(
"-deprecation",
Expand Down
30 changes: 4 additions & 26 deletions src/main/scala/com/clevercloud/warp10client/Fetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ import com.clevercloud.warp10client.models._
import com.clevercloud.warp10client.models.gts_module.GTS

object Fetcher {
val log = Logger(LoggerFactory.getLogger("Fetcher"))
val log: Logger = Logger(LoggerFactory.getLogger("Fetcher"))

def fetch(
readToken: String
)(implicit
warpClientContext: WarpClientContext
): Flow[Query[FetchRange], Future[Either[WarpException, Seq[GTS]]], NotUsed] = {
def fetch(readToken: String)(using warpClientContext: WarpClientContext): Flow[Query[FetchRange], Future[Either[WarpException, Seq[GTS]]], NotUsed] = {
val uuid = UUID.randomUUID
Flow[Query[FetchRange]]
.map(query => fetchRequest(readToken, query))
Expand All @@ -39,7 +35,7 @@ object Fetcher {
def fetchRequest(
readToken: String,
query: Query[FetchRange]
)(implicit
)(using
warpClientContext: WarpClientContext
) = {
log.debug(s"[FETCHER] sending ${warpClientContext.configuration.fetchUrl}?${query.serialize}")
Expand All @@ -52,7 +48,7 @@ object Fetcher {

def processResponse(
httpResponse: HttpResponse
)(implicit
)(using
warpClientContext: WarpClientContext
): Future[Either[WarpException, List[GTS]]] = {
import warpClientContext._
Expand Down Expand Up @@ -81,22 +77,4 @@ object Fetcher {
}
}
}

def stringToGTSSeq: Flow[String, Seq[GTS], NotUsed] = {
Flow[String].map { data =>
if (data.size > 0) {
log.debug(s"Data provided, let's parse them")
GTS.parse(data) match {
case Left(e) => {
log.error(s"Can't parse GTS due to: ${e.toString()}")
throw WarpException(s"Can't parse GTS due to: $e")
}
case Right(gtsList) => gtsList
}
} else {
log.debug(s"Empty data provided, let's return empty Seq()")
Seq()
}
}
}
}
46 changes: 10 additions & 36 deletions src/main/scala/com/clevercloud/warp10client/Pusher.scala
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
package com.clevercloud.warp10client

import java.util.UUID

import scala.concurrent.Future
import scala.util.{ Failure, Success }

import org.apache.pekko
import pekko.NotUsed
import pekko.http.scaladsl.model._
import pekko.stream.scaladsl.Flow

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.commons.text.StringEscapeUtils
import com.clevercloud.warp10client.models.gts_module.GTS

object Pusher {

def push(
writeToken: String
)(implicit
)(using
warpClientContext: WarpClientContext
): Flow[GTS, Future[Either[WarpException, Unit]], NotUsed] = {
val uuid = UUID.randomUUID
Flow[GTS]
.map(gts => pushRequest(gts, writeToken))
.map(request => (request -> uuid)) // cf. https://doc.pekko.io/docs/pekko-http/current/client-side/host-level.html
.map { gts => pushRequest(gts, writeToken) }
.map{ request => (request -> uuid) } // cf. https://doc.pekko.io/docs/pekko-http/current/client-side/host-level.html
.via(warpClientContext.poolClientFlow)
.filter({ case (_, key) => key == uuid })
.map({ case (responseTry, _) => responseTry })
Expand All @@ -34,15 +30,11 @@ object Pusher {
}
}

def pushSeq(
writeToken: String
)(implicit
warpClientContext: WarpClientContext
): Flow[Seq[GTS], Future[Either[WarpException, Unit]], NotUsed] = {
def pushSeq(writeToken: String)(using warpClientContext: WarpClientContext): Flow[Seq[GTS], Future[Either[WarpException, Unit]], NotUsed] = {
val uuid = UUID.randomUUID
Flow[Seq[GTS]]
.map(gtsSeq => pushSeqRequest(gtsSeq, writeToken))
.map(request => (request -> uuid)) // cf. https://doc.pekko.io/docs/pekko-http/current/client-side/host-level.html
.map { gtsSeq => pushSeqRequest(gtsSeq, writeToken) }
.map {request => (request -> uuid) } // cf. https://doc.pekko.io/docs/pekko-http/current/client-side/host-level.html
.via(warpClientContext.poolClientFlow)
.filter({ case (_, key) => key == uuid })
.map({ case (responseTry, _) => responseTry })
Expand All @@ -52,39 +44,21 @@ object Pusher {
}
}

def pushSeqRequest(
gtsSeq: Seq[GTS],
writeToken: String
)(implicit
warpClientContext: WarpClientContext
) = {
HttpRequest(
def pushSeqRequest(gtsSeq: Seq[GTS], writeToken: String)(using warpClientContext: WarpClientContext) = HttpRequest(
method = HttpMethods.POST,
uri = warpClientContext.configuration.pushUrl,
headers = List(`X-Warp10-Token`(writeToken)),
entity = HttpEntity(gtsSeq.map(_.serialize).mkString("\n"))
)
}

def pushRequest(
gts: GTS,
writeToken: String
)(implicit
warpClientContext: WarpClientContext
) = {
HttpRequest(
def pushRequest(gts: GTS, writeToken: String)(using warpClientContext: WarpClientContext) = HttpRequest(
method = HttpMethods.POST,
uri = warpClientContext.configuration.pushUrl,
headers = List(`X-Warp10-Token`(writeToken)),
entity = HttpEntity(gts.serialize)
)
}

def processResponse(
httpResponse: HttpResponse
)(implicit
warpClientContext: WarpClientContext
): Future[Either[WarpException, Unit]] = {
def processResponse(httpResponse: HttpResponse)(using warpClientContext: WarpClientContext): Future[Either[WarpException, Unit]] = {
import warpClientContext._

if (httpResponse.status == StatusCodes.OK) {
Expand Down
54 changes: 18 additions & 36 deletions src/main/scala/com/clevercloud/warp10client/Runner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ import scala.concurrent.Future
object Runner {
type WarpScript = String

def exec(
)(implicit
warpClientContext: WarpClientContext
): Flow[WarpScript, String, NotUsed] = {
def exec()(implicit warpClientContext: WarpClientContext): Flow[WarpScript, String, NotUsed] = {
val uuid = UUID.randomUUID
Flow[WarpScript]
.map(script => execRequest(script))
Expand All @@ -32,22 +29,13 @@ object Runner {
.via(processResponseTry)
}

private def execRequest(
script: WarpScript
)(implicit
warpClientContext: WarpClientContext
) = {
HttpRequest(
private def execRequest(script: WarpScript)(using warpClientContext: WarpClientContext) = HttpRequest(
method = HttpMethods.POST,
uri = warpClientContext.configuration.execUrl,
entity = HttpEntity(script)
)
}

def processResponseTry(
implicit
warpClientContext: WarpClientContext
): Flow[Try[HttpResponse], String, NotUsed] = {
def processResponseTry(using warpClientContext: WarpClientContext): Flow[Try[HttpResponse], String, NotUsed] = {
import warpClientContext._

Flow[Try[HttpResponse]].flatMapConcat {
Expand Down Expand Up @@ -79,17 +67,14 @@ object Runner {
}
}

private def parseJson: Flow[String, Json, NotUsed] = {
Flow[String].map { s =>
private def parseJson: Flow[String, Json, NotUsed] = Flow[String].map { s =>
parse(s) match {
case Right(json) => json
case Left(e) => throw WarpException(s"Error on parsing: $e")
}
}
}

def jsonToGTSSeq(): Flow[String, Seq[GTS], NotUsed] = {
Flow[String]
def jsonToGTSSeq(): Flow[String, Seq[GTS], NotUsed] = Flow[String]
.via(parseJson)
.map { json => json.hcursor.downArray } // warp response contains [[]] or [{}] so we drop an array level
.map { array => // global array with all matching script
Expand All @@ -104,61 +89,58 @@ object Runner {
GTS(
classname = `class`,
labels = labels,
points = (series \\ "v").map { seriesContentArrays => // [[point_1], [point_2], ...]
points = (series \\ "v").flatMap { seriesContentArrays => // [[point_1], [point_2], ...]
seriesContentArrays.asArray.get.map { point => // [point_i]
point.asArray.get match { // [timestamp, lat, lon, elev, value] is point's content
case Vector(timestamp: Json, value: Json) => {
point.asArray.getOrElse(Vector(0, 0, 0, 0, 0)) match { // [timestamp, lat, lon, elev, value] is point's content
case Vector(timestamp: Json, value: Json) =>
GTSPoint(
timestamp.asNumber.get.toLong,
None,
None,
GTSValue.parse(value) match {
case Right(gtsPoint) => gtsPoint
case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.")
case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.")
}
)
}
case Vector(timestamp: Json, elevation: Json, value: Json) => {
case Vector(timestamp: Json, elevation: Json, value: Json) =>
GTSPoint(
timestamp.asNumber.get.toLong,
None,
elevation.asNumber.get.toLong,
GTSValue.parse(value) match {
case Right(gtsPoint) => gtsPoint
case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.")
case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.")
}
)
}
case Vector(timestamp: Json, latitude: Json, longitude: Json, value: Json) => {
case Vector(timestamp: Json, latitude: Json, longitude: Json, value: Json) =>
GTSPoint(
timestamp.asNumber.get.toLong,
Some(Coordinates(latitude.asNumber.get.toDouble, longitude.asNumber.get.toDouble)),
None,
GTSValue.parse(value) match {
case Right(gtsPoint) => gtsPoint
case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.")
case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.")
}
)
}
case Vector(timestamp: Json, latitude: Json, longitude: Json, elevation: Json, value: Json) => {
case Vector(timestamp: Json, latitude: Json, longitude: Json, elevation: Json, value: Json) =>
GTSPoint(
timestamp.asNumber.get.toLong,
Some(Coordinates(latitude.asNumber.get.toDouble, longitude.asNumber.get.toDouble)),
elevation.asNumber.get.toLong,
GTSValue.parse(value) match {
case Right(gtsPoint) => gtsPoint
case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.")
case Left(e) => GTSStringValue(s"${e.toString}: ${value.toString}.")
}
)
}

case Vector(_*) => ???
}
}
}.toSeq.flatten
}
)
}
.toSeq
}
}

def jsonToStack(): Flow[String, Warp10Stack, NotUsed] = Flow[String].via(parseJson).map(Warp10Stack.apply)
}
Loading