Skip to content

Commit 42f2744

Browse files
committed
PMM-337 coordinator support stub
1 parent 3c80f39 commit 42f2744

File tree

11 files changed

+62
-364
lines changed

11 files changed

+62
-364
lines changed

build.sbt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ lazy val core = project.in(file("core"))
152152
.settings(commonSettings)
153153
.settings(
154154
libraryDependencies ++= Library.scalaTest ++ Library.logging ++ Library.config ++ Library.cats
155-
++ Library.jol.map(_ % "test") ++ Library.arrowDeps
155+
++ Library.jol.map(_ % "test")
156156
)
157157

158158
lazy val config = project.in(file("config"))
@@ -176,7 +176,7 @@ lazy val http = project.in(file("http"))
176176
.settings(commonSettings)
177177
.settings(
178178
libraryDependencies ++= Library.scalaTest ++ Library.akka ++
179-
Library.akkaHttp ++ Library.logging ++ Library.swayDB
179+
Library.akkaHttp ++ Library.logging
180180
)
181181
.dependsOn(core, config, streaming, dsl)
182182

@@ -195,12 +195,12 @@ lazy val itValid = project.in(file("integration/correctness"))
195195
)
196196
.dependsOn(core, streaming, http, config)
197197

198-
lazy val itPerf = project.in(file("integration/performance"))
199-
.settings(commonSettings)
200-
.settings(
201-
libraryDependencies ++= Library.scalaTest ++ Library.dbDrivers ++ Library.testContainers ++ Library.logging
202-
)
203-
.dependsOn(itValid)
198+
//lazy val itPerf = project.in(file("integration/performance"))
199+
// .settings(commonSettings)
200+
// .settings(
201+
// libraryDependencies ++= Library.scalaTest ++ Library.dbDrivers ++ Library.testContainers ++ Library.logging
202+
// )
203+
// .dependsOn(itValid)
204204

205205

206206
/*** Other settings ***/

http/src/main/resources/application.conf

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,10 @@ http {
1010
port = 8080
1111
}
1212

13-
flink {
14-
max-parallelism = 3000
15-
16-
metrics {
17-
source = 0.numRecordsOut
18-
search = 0.Flat_Map.currentEventTs
19-
sink = 0.numRecordsOut
20-
}
21-
22-
monitoring {
23-
host = localhost
24-
port = 8081
25-
}
26-
27-
job-manager {
28-
host = localhost
29-
port = 8081
30-
}
31-
32-
checkpointing-interval = 10000
13+
coordinator {
14+
enabled = false
15+
host = localhost
16+
port = 8081
3317
}
3418

3519

http/src/main/scala/ru/itclover/tsp/http/HttpService.scala

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,19 @@ trait HttpService extends RoutesProtocols {
2828

2929
val blockingExecutorContext: ExecutionContextExecutor
3030

31-
val reporting: Option[JobReporting]
32-
3331
private val configs = ConfigFactory.load()
3432
val isDebug = true
3533
val isHideExceptions = configs.getBoolean("general.is-hide-exceptions")
36-
val flinkMonitoringHost: String = getEnvVarOrConfig("FLINK_MONITORING_HOST", "flink.monitoring.host")
37-
val flinkMonitoringPort: Int = getEnvVarOrConfig("FLINK_MONITORING_PORT", "flink.monitoring.port").toInt
38-
val monitoringUri: Uri = s"http://$flinkMonitoringHost:$flinkMonitoringPort"
3934

4035
private val log = Logger[HttpService]
4136

4237
def composeRoutes: Reader[ExecutionContextExecutor, Route] = {
4338
log.debug("composeRoutes started")
4439

4540
val res = for {
46-
jobs <- JobsRoutes.fromExecutionContext(monitoringUri, reporting, blockingExecutorContext)
47-
monitoring <- MonitoringRoutes.fromExecutionContext(monitoringUri)
41+
jobs <- JobsRoutes.fromExecutionContext(blockingExecutorContext)
4842
validation <- ValidationRoutes.fromExecutionContext()
49-
} yield jobs ~ monitoring ~ validation
43+
} yield jobs ~ validation
5044

5145
log.debug("composeRoutes finished")
5246
res

http/src/main/scala/ru/itclover/tsp/http/Launcher.scala

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ import java.net.URLDecoder
44
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}
55
import akka.actor.ActorSystem
66
import akka.http.scaladsl.Http
7+
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
78
import akka.stream.ActorMaterializer
89
import cats.implicits._
910
import ru.itclover.tsp.RowWithIdx
10-
import ru.itclover.tsp.http.routes.JobReporting
1111
import ru.itclover.tsp.http.services.queuing.QueueManagerService
12+
13+
import scala.concurrent.Future
14+
import scala.util.{Failure, Success}
1215
//import com.google.common.util.concurrent.ThreadFactoryBuilder
1316
import com.typesafe.config.ConfigFactory
1417
import com.typesafe.scalalogging.Logger
@@ -73,6 +76,25 @@ object Launcher extends App with HttpService {
7376
val bindingFuture = Http().bindAndHandle(route, host, port)
7477

7578
log.info(s"Service online at http://$host:$port/" + (if (isDebug) " in debug mode." else ""))
79+
val coordinator = getCoordinatorHostPort
80+
coordinator.map {
81+
case (enabled, host, port) => if (enabled) {
82+
val uri = s"http://$host:$port/register"
83+
log.warn(s"TSP coordinator connection enabled: connecting to $uri...")
84+
85+
val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = uri))
86+
87+
responseFuture
88+
.onComplete {
89+
case Success(res) => {
90+
if (res.status.isFailure) sys.error(s"Error: TSP coordinator returned ${res.status}")
91+
}
92+
case Failure(ex) => sys.error(s"Cannot connect to $uri: $ex")
93+
}
94+
} else {
95+
log.warn("TSP coordinator connection disabled.")
96+
}
97+
}
7698

7799
if (configs.getBoolean("general.is-follow-input")) {
78100
log.info("Press RETURN to stop...")
@@ -90,36 +112,23 @@ object Launcher extends App with HttpService {
90112
}
91113
}
92114

93-
def getClusterHostPort: Either[String, (String, Int)] = {
94-
val host = getEnvVarOrConfig("FLINK_JOBMGR_HOST", "flink.job-manager.host")
95-
val portStr = getEnvVarOrConfig("FLINK_JOBMGR_PORT", "flink.job-manager.port")
115+
def getCoordinatorHostPort: Either[String, (Boolean, String, Int)] = {
116+
val enabledStr = getEnvVarOrConfig("COORDINATOR_ENABLED", "coordinator.enabled")
117+
val host = getEnvVarOrConfig("COORDINATOR_HOST", "coordinator.host")
118+
val portStr = getEnvVarOrConfig("COORDINATOR_PORT", "coordinator.port")
96119
val port = Either.catchNonFatal(portStr.toInt).left.map { ex: Throwable =>
97-
s"Cannot parse FLINK_JOBMGR_PORT ($portStr): ${ex.getMessage}"
120+
s"Cannot parse COORDINATOR_PORT ($portStr): ${ex.getMessage}"
98121
}
99-
port.map(p => (host, p))
100-
}
101-
102-
override val reporting: Option[JobReporting] = {
103-
val reportingEnabledConfig = getEnvVarOrNone("JOB_REPORTING_ENABLED").getOrElse("0")
104-
val reportingEnabled = reportingEnabledConfig match {
105-
case "0" | "false" | "off" | "no" => false
106-
case "1" | "true" | "on" | "yes" => true
107-
case _ => sys.error(s"JOB_REPORTING_ENABLED not set or set to a unsupported value: $reportingEnabledConfig")
108-
}
109-
110-
if (reportingEnabled) {
111-
val reportingBroker = getEnvVarOrNone("JOB_REPORTING_BROKER")
112-
.getOrElse(sys.error("Job reporting enabled, but JOB_REPORTING_BROKER not set"))
113-
val reportingTopic = getEnvVarOrNone("JOB_REPORTING_TOPIC")
114-
.getOrElse(sys.error("Job reporting enabled, but JOB_REPORTING_TOPIC not set"))
115-
log.warn(s"Job reporting enabled, sending to topic $reportingTopic on $reportingBroker")
116-
Some(JobReporting(reportingBroker, reportingTopic))
117-
} else {
118-
log.warn("Job reporting disabled")
119-
None
122+
val enabled = Either.catchNonFatal(enabledStr.toBoolean) match {
123+
case Left(ex) => {
124+
log.warn(s"Cannot parse COORDINATOR_ENABLED ($enabledStr), defaulting to false: ${ex.getMessage}")
125+
false
126+
}
127+
case Right(value) => value
120128
}
129+
port.map(p => (enabled, host, p))
121130
}
122131

123-
implicit val rep = reporting
124-
val queueManager = QueueManagerService.getOrCreate(monitoringUri, blockingExecutorContext)
132+
133+
val queueManager = QueueManagerService.getOrCreate("mgr", blockingExecutorContext)
125134
}

http/src/main/scala/ru/itclover/tsp/http/routes/JobsRoutes.scala

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,29 +23,22 @@ import ru.itclover.tsp.http.domain.output.SuccessfulResponse.ExecInfo
2323
import ru.itclover.tsp.http.domain.output._
2424
import ru.itclover.tsp.http.protocols.RoutesProtocols
2525
import ru.itclover.tsp.http.services.queuing.QueueManagerService
26-
import ru.itclover.tsp.http.services.streaming.FlinkMonitoringService
2726
import ru.itclover.tsp.streaming.io.{InputConf, JDBCInputConf, KafkaInputConf}
2827
import ru.itclover.tsp.streaming.io.{JDBCOutputConf, KafkaOutputConf, OutputConf}
2928
import ru.itclover.tsp.streaming.mappers._
3029

3130
import scala.concurrent.{ExecutionContextExecutor, Future}
3231
import ru.itclover.tsp.streaming.utils.ErrorsADT.{ConfigErr, Err, GenericRuntimeErr, RuntimeErr}
3332

34-
case class JobReporting(brokers: String, topic: String)
35-
3633
trait JobsRoutes extends RoutesProtocols {
3734
implicit val executionContext: ExecutionContextExecutor
3835
val blockingExecutionContext: ExecutionContextExecutor
3936
implicit val actorSystem: ActorSystem
4037
implicit val materializer: ActorMaterializer
4138
implicit val decoders = AnyDecodersInstances
4239

43-
val monitoringUri: Uri
44-
lazy val monitoring = FlinkMonitoringService(monitoringUri)
4540
val queueManager: QueueManagerService
4641

47-
implicit val reporting: Option[JobReporting]
48-
4942
private val log = Logger[JobsRoutes]
5043

5144
val route: Route =
@@ -72,9 +65,7 @@ object JobsRoutes {
7265

7366
private val log = Logger[JobsRoutes]
7467

75-
def fromExecutionContext(monitoringUrl: Uri,
76-
rep: Option[JobReporting],
77-
blocking: ExecutionContextExecutor)(
68+
def fromExecutionContext(blocking: ExecutionContextExecutor)(
7869
implicit as: ActorSystem,
7970
am: ActorMaterializer
8071
): Reader[ExecutionContextExecutor, Route] = {
@@ -87,11 +78,9 @@ object JobsRoutes {
8778
implicit val executionContext: ExecutionContextExecutor = execContext
8879
implicit val actorSystem = as
8980
implicit val materializer = am
90-
override val monitoringUri = monitoringUrl
91-
override val queueManager = QueueManagerService.getOrCreate(monitoringUrl, blocking)(
92-
execContext, as, am, AnyDecodersInstances, rep
81+
override val queueManager = QueueManagerService.getOrCreate("mgr", blocking)(
82+
execContext, as, am, AnyDecodersInstances
9383
)
94-
override val reporting: Option[JobReporting] = rep
9584
}.route
9685
}
9786

http/src/main/scala/ru/itclover/tsp/http/routes/MonitoringRoutes.scala

Lines changed: 0 additions & 115 deletions
This file was deleted.

http/src/main/scala/ru/itclover/tsp/http/services/kafka/ArrowPkg.scala

Lines changed: 0 additions & 37 deletions
This file was deleted.

0 commit comments

Comments
 (0)