@@ -6,9 +6,12 @@ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
 import akka.http.scaladsl.model.{HttpRequest, Uri}
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.ActorMaterializer
-import cats.effect.IO
+import cats.effect.{Deferred, IO}
+import cats.effect.syntax.async
 import cats.effect.unsafe.implicits.global
+import cats.implicits.toTraverseOps
 import com.typesafe.scalalogging.Logger
+import fs2.concurrent.SignallingRef
 import ru.itclover.tsp.StreamSource.Row
 import ru.itclover.tsp.core.{Incident, RawPattern}
 import ru.itclover.tsp.{JdbcSource, KafkaSource, RowWithIdx, StreamSource}
@@ -17,7 +20,6 @@ import ru.itclover.tsp.dsl.PatternFieldExtractor
 import ru.itclover.tsp.http.domain.input.{FindPatternsRequest, QueueableRequest}
 import ru.itclover.tsp.http.protocols.RoutesProtocols
 import ru.itclover.tsp.http.services.coordinator.CoordinatorService
-import ru.itclover.tsp.http.services.streaming.MonitoringServiceModel.{JobDetails, Vertex, VertexMetrics}
 import ru.itclover.tsp.streaming.io.{InputConf, JDBCInputConf, KafkaInputConf}
 import ru.itclover.tsp.streaming.io.{JDBCOutputConf, KafkaOutputConf, OutputConf}
 import ru.itclover.tsp.streaming.mappers.PatternsToRowMapper
@@ -35,7 +37,7 @@ import scala.concurrent.{Await, ExecutionContextExecutor, Future}
 import scala.reflect.ClassTag
 import scala.util.{Failure, Success, Try}
 import collection.JavaConverters._
-
+import scala.collection.mutable.ListBuffer

 class QueueManagerService(id: String, blockingExecutionContext: ExecutionContextExecutor)(
   implicit executionContext: ExecutionContextExecutor,
@@ -49,7 +51,6 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext
   type TypedRequest = (QueueableRequest, String)
   type Request = FindPatternsRequest[RowWithIdx, Symbol, Any, Row]

-
   case class Metric(id: String, value: String)

   implicit val metricFmt = jsonFormat2(Metric.apply)
@@ -60,6 +61,8 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext
   // log.warn(s"Recovering job queue: ${jobQueue.count} entries found")
   val jobQueue = mutable.Queue[TypedRequest]()

+  val runningStreams = mutable.Map[String, SignallingRef[IO, Boolean]]()
+
   val isLocalhost: Boolean = true

   val ex = new ScheduledThreadPoolExecutor(1)
@@ -72,17 +75,15 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext

   def enqueue(r: Request): Unit = {
     jobQueue.enqueue(
-      (r,
-       confClassTagToString(ClassTag(r.inputConf.getClass))
-      )
-    )
+      (r, confClassTagToString(ClassTag(r.inputConf.getClass)))
+    )
     log.info(s"Job ${r.uuid} enqueued.")
   }

   def confClassTagToString(ct: ClassTag[_]): String = ct.runtimeClass match {
-    case c if c.isAssignableFrom(classOf[JDBCInputConf]) => "from-jdbc"
+    case c if c.isAssignableFrom(classOf[JDBCInputConf])  => "from-jdbc"
     case c if c.isAssignableFrom(classOf[KafkaInputConf]) => "from-kafka"
-    case _ => "unknown"
+    case _                                                => "unknown"
   }

   def getQueuedJobs: Seq[QueueableRequest] = jobQueue.map(_._1).toSeq
@@ -101,7 +102,9 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext
       _ = log.info("JDBC-to-JDBC: stream started")
     } yield result
     resultOrErr match {
-      case Left(error) => log.error(s"Cannot run request. Reason: $error")
+      case Left(error) =>
+        log.error(s"Cannot run request. Reason: $error")
+        CoordinatorService.notifyJobCompleted(uuid, Some(new Exception(error.toString)))
       case Right(_) => log.info(s"Stream successfully started!")
     }
   }
@@ -124,7 +127,6 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext
     }
   }

-
   /* def dequeueAndRun(slots: Int): Unit = {
     // TODO: Functional style
     var slotsRemaining = slots
@@ -204,24 +206,39 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext
     CoordinatorService.notifyJobStarted(uuid)

     // Run the streams (multiple sinks)
-    streams.foreach { stream =>
-      stream.compile.drain.unsafeRunAsync {
-        case Left(throwable) =>
-          log.error(s"Job $uuid failed: $throwable")
-          CoordinatorService.notifyJobCompleted(uuid, Some(throwable))
-        case Right(_) =>
-          // success
-          log.info(s"Job $uuid finished")
-          CoordinatorService.notifyJobCompleted(uuid, None)
-      }
-    }
+    SignallingRef[IO, Boolean](false)
+      .flatMap { signal =>
+        runningStreams(uuid) = signal
+        streams
+          .sequence
+          .interruptWhen(signal)
+          .compile
+          .drain
+      }
+      .unsafeRunAsync {
+        case Left(throwable) =>
+          log.error(s"Job $uuid failed: $throwable")
+          CoordinatorService.notifyJobCompleted(uuid, Some(throwable))
+          runningStreams.remove(uuid)
+        case Right(_) =>
+          // success
+          log.info(s"Job $uuid finished")
+          CoordinatorService.notifyJobCompleted(uuid, None)
+          runningStreams.remove(uuid)
+      }

     log.debug("runStream finished")
     Right(None)
   }

-  def availableSlots: Future[Int] = Future(32)
+  def stopStream(uuid: String): Unit = runningStreams.get(uuid).foreach { signal =>
+    log.info(s"Job $uuid stopped")
+    signal.set(true).unsafeRunSync()
+  }
+
+  def getRunningJobsIds: Seq[String] = runningStreams.keys.toSeq

+  def availableSlots: Future[Int] = Future(32)

   def onTimer(): Unit = {
     availableSlots.onComplete {
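
For context, here is a minimal, self-contained sketch of the interruption pattern this change wires into `runStream` and `stopStream`, assuming cats-effect 3 and fs2 3 (which the imports in the diff suggest): one `SignallingRef[IO, Boolean]` is kept per job, `interruptWhen(signal)` halts the compiled stream once the signal flips to `true`, and a finalizer removes the job from the registry. The names `runningJobs`, `jobStream`, `start` and `stop` are illustrative, not part of the TSP codebase; note that `signal.set(true)` is itself an `IO` and has no effect until it is run.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._

import cats.effect.{IO, IOApp}
import fs2.Stream
import fs2.concurrent.SignallingRef

object InterruptionSketch extends IOApp.Simple {

  // Hypothetical registry of running jobs, keyed by job id.
  val runningJobs = TrieMap.empty[String, SignallingRef[IO, Boolean]]

  // Stand-in for the real pattern-search pipeline: emits a tick every 100 ms.
  def jobStream(uuid: String): Stream[IO, Unit] =
    Stream
      .awakeEvery[IO](100.millis)
      .evalMap(t => IO.println(s"[$uuid] tick at $t"))

  // Start the job, remembering its interruption signal so it can be stopped later.
  def start(uuid: String): IO[Unit] =
    SignallingRef[IO, Boolean](false).flatMap { signal =>
      IO(runningJobs.update(uuid, signal)) *>
        jobStream(uuid)
          .interruptWhen(signal) // halts the stream once the signal becomes true
          .compile
          .drain
          .guarantee(IO(runningJobs.remove(uuid)).void)
    }

  // Flip the signal; set(true) is an IO and does nothing until it is actually run.
  def stop(uuid: String): IO[Unit] =
    runningJobs.get(uuid).fold(IO.unit)(_.set(true))

  def run: IO[Unit] =
    for {
      fiber <- start("job-1").start // run the job on a background fiber
      _     <- IO.sleep(500.millis)
      _     <- stop("job-1")        // interrupt it via the SignallingRef
      _     <- fiber.joinWithNever
      _     <- IO.println("job-1 interrupted")
    } yield ()
}
```

In the diff above the same signal is applied to the combined job streams (`streams.sequence.interruptWhen(signal)`), so stopping a uuid interrupts every sink belonging to that job at once.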