Skip to content

Commit cf2eaac

Browse files
committed
PMM-347 coordinator support (nearly full), Scala 2.13
1 parent 42f2744 commit cf2eaac

File tree

26 files changed

+322
-68
lines changed

26 files changed

+322
-68
lines changed

build.sbt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ dockerUsername in Docker := Some("clovergrp")
77
dockerUpdateLatest := true
88
dockerAlias in Docker := dockerAlias.value.withTag(dockerAlias.value.tag.map(_.replace("+", "_")))
99

10-
// Flink currently does not work with Scala 2.12.8+
11-
scalaVersion in ThisBuild := "2.12.7"
10+
scalaVersion in ThisBuild := "2.13.8"
1211
resolvers in ThisBuild ++= Seq(
1312
"Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
1413
Resolver.mavenLocal,
@@ -217,7 +216,7 @@ unmanagedResourceDirectories in Compile -= (resourceDirectory in Compile).value
217216
// Kind projector
218217
resolvers += Resolver.sonatypeRepo("releases")
219218
//addCompilerPlugin("org.spire-math" %% "kind-projector" % Version.kindProjector)
220-
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full)
219+
//addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full)
221220

222221

223222
// Git-specific settings

core/src/main/scala/ru/itclover/tsp/core/PQueue.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ object PQueue {
7373
val buffer = scala.collection.mutable.ArrayBuffer.empty[IdxValue[T]]
7474
import scala.collection.convert.ImplicitConversionsToScala._
7575
buffer ++= queue.iterator()
76+
buffer.toSeq
7677
}
7778
override def size: Int = queue.size
7879

@@ -140,7 +141,7 @@ object PQueue {
140141
)
141142
override def clean(): PQueue[T] = this.copy(queue = queue.clean())
142143

143-
override def toSeq: Seq[IdxValue[T]] = queue.toSeq.view.map(x => x.map(_ => func(x)))
144+
override def toSeq: Seq[IdxValue[T]] = queue.toSeq.view.map(x => x.map(_ => func(x))).toSeq
144145

145146
override def rewindTo(newStart: Idx): PQueue[T] = this.copy(queue = queue.rewindTo(newStart))
146147
}

core/src/main/scala/ru/itclover/tsp/core/QueueUtils.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,26 @@ object QueueUtils {
99

1010
private val trueFunction = (_: Any) => true
1111

12-
def takeWhileFromQueue[A](queue: m.Queue[A])(predicate: A => Boolean = trueFunction): (m.Queue[A], m.Queue[A]) =
13-
if (predicate.eq(trueFunction)) (queue, m.Queue.empty)
12+
def takeWhileFromQueue[A](queue: m.ArrayDeque[A])(predicate: A => Boolean = trueFunction): (m.ArrayDeque[A], m.ArrayDeque[A]) =
13+
if (predicate.eq(trueFunction)) (queue, m.ArrayDeque.empty)
1414
else {
1515

1616
@tailrec
17-
def inner(result: m.Queue[A], q: m.Queue[A]): (m.Queue[A], m.Queue[A]) =
17+
def inner(result: m.ArrayDeque[A], q: m.ArrayDeque[A]): (m.ArrayDeque[A], m.ArrayDeque[A]) =
1818
q.headOption match {
19-
case Some(x) if predicate(x) => inner({ result.enqueue(x); result }, { q.dequeue(); q })
19+
case Some(x) if predicate(x) => inner({ result.append(x); result }, { q.removeHead(); q })
2020
case _ => (result, q)
2121
}
2222

23-
inner(m.Queue.empty, queue)
23+
inner(m.ArrayDeque.empty, queue)
2424
}
2525

2626
/**
2727
* Splits inner `q` at point idx, so all records with id < idx are in first returned queue, and all with id >= idx are in second.
2828
*/
29-
def splitAtIdx(q: m.Queue[(Idx, Time)], idx: Idx, marginToFirst: Boolean = false)(
29+
def splitAtIdx(q: m.ArrayDeque[(Idx, Time)], idx: Idx, marginToFirst: Boolean = false)(
3030
implicit ord: Order[Idx]
31-
): (m.Queue[(Idx, Time)], m.Queue[(Idx, Time)]) = {
31+
): (m.ArrayDeque[(Idx, Time)], m.ArrayDeque[(Idx, Time)]) = {
3232
takeWhileFromQueue(q) {
3333
if (marginToFirst) {
3434
case (idx1: Idx, _: Time) => ord.lteqv(idx1, idx)

core/src/main/scala/ru/itclover/tsp/core/Result.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package ru.itclover.tsp.core
22

3-
sealed trait Result[+A] {
3+
sealed trait Result[+A] extends Serializable {
44
private def get: A = this match {
55
case Fail => throw new RuntimeException("Illegal get on Fail")
66
case Succ(t) => t

core/src/main/scala/ru/itclover/tsp/core/aggregators/AccumPattern.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ case class AggregatorPState[InnerState, InnerOut, AState](
1919
innerState: InnerState,
2020
innerQueue: PQueue[InnerOut],
2121
astate: AState,
22-
indexTimeMap: m.Queue[(Idx, Time)]
22+
indexTimeMap: m.ArrayDeque[(Idx, Time)]
2323
)
2424

2525
abstract class AccumPattern[Event: IdxExtractor: TimeExtractor, InnerState, InnerOut, Out, AState <: AccumState[
@@ -39,7 +39,7 @@ abstract class AccumPattern[Event: IdxExtractor: TimeExtractor, InnerState, Inne
3939
): F[(AggregatorPState[InnerState, InnerOut, AState], PQueue[Out])] = {
4040

4141
val idxTimeMapWithNewEvents =
42-
event.foldLeft(state.indexTimeMap) { case (a, b) => a.enqueue(b.index -> b.time); a }
42+
event.foldLeft(state.indexTimeMap) { case (a, b) => a.append(b.index -> b.time); a }
4343

4444
inner
4545
.apply[F, Cont](state.innerState, state.innerQueue, event)
@@ -63,8 +63,8 @@ abstract class AccumPattern[Event: IdxExtractor: TimeExtractor, InnerState, Inne
6363
innerQueue: QI[InnerOut],
6464
accumState: AState,
6565
results: QI[Out],
66-
indexTimeMap: m.Queue[(Idx, Time)]
67-
): (QI[InnerOut], AState, QI[Out], m.Queue[(Idx, Time)]) = {
66+
indexTimeMap: m.ArrayDeque[(Idx, Time)]
67+
): (QI[InnerOut], AState, QI[Out], m.ArrayDeque[(Idx, Time)]) = {
6868
innerQueue.dequeueOption() match {
6969
case None => (innerQueue, accumState, results, indexTimeMap)
7070
case Some((iv @ IdxValue(start, end, _), updatedQueue)) =>
@@ -91,5 +91,5 @@ trait AccumState[In, Out, Self <: AccumState[In, Out, Self]] extends Product wit
9191
* @param idxValue - result from inner pattern.
9292
* @return Tuple of updated state and queue of results to be emitted from this pattern.
9393
*/
94-
def updated(window: Window, times: m.Queue[(Idx, Time)], idxValue: IdxValue[In]): (Self, QI[Out])
94+
def updated(window: Window, times: m.ArrayDeque[(Idx, Time)], idxValue: IdxValue[In]): (Self, QI[Out])
9595
}

core/src/main/scala/ru/itclover/tsp/core/aggregators/GroupPattern.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ case class GroupPattern[Event: IdxExtractor: TimeExtractor, S, T: Group](
3333
)
3434
}
3535

36-
case class GroupAccumState[T: Group](lastValue: Option[GroupAccumResult[T]], windowQueue: m.Queue[GroupAccumValue[T]])
36+
case class GroupAccumState[T: Group](lastValue: Option[GroupAccumResult[T]], windowQueue: m.ArrayDeque[GroupAccumValue[T]])
3737
extends AccumState[T, GroupAccumResult[T], GroupAccumState[T]] {
3838

3939
override def updated(
4040
window: Window,
41-
times: m.Queue[(Idx, Time)],
41+
times: m.ArrayDeque[(Idx, Time)],
4242
idxValue: IdxValue[T]
4343
): (GroupAccumState[T], QI[GroupAccumResult[T]]) = {
4444

@@ -57,9 +57,9 @@ case class GroupAccumState[T: Group](lastValue: Option[GroupAccumResult[T]], win
5757
window: Window,
5858
value: Result[T],
5959
lastValue: Option[GroupAccumResult[T]],
60-
windowQueue: m.Queue[GroupAccumValue[T]],
60+
windowQueue: m.ArrayDeque[GroupAccumValue[T]],
6161
outputQueue: QI[GroupAccumResult[T]]
62-
): (Option[GroupAccumResult[T]], m.Queue[GroupAccumValue[T]], QI[GroupAccumResult[T]]) = {
62+
): (Option[GroupAccumResult[T]], m.ArrayDeque[GroupAccumValue[T]], QI[GroupAccumResult[T]]) = {
6363
value
6464
.map { t =>
6565
val newLastValue = lastValue
@@ -77,7 +77,7 @@ case class GroupAccumState[T: Group](lastValue: Option[GroupAccumResult[T]], win
7777
}
7878

7979
// add new element to queue
80-
val finalWindowQueue = { updatedWindowQueue.enqueue(GroupAccumValue(idx, time, t)); updatedWindowQueue }
80+
val finalWindowQueue = { updatedWindowQueue.append(GroupAccumValue(idx, time, t)); updatedWindowQueue }
8181

8282
Tuple3(
8383
finalNewLastValue,

core/src/main/scala/ru/itclover/tsp/core/aggregators/PreviousValue.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ case class PreviousValue[Event: IdxExtractor: TimeExtractor, State, Out](
2727
case class PreviousValueAccumState[T](queue: QI[(Time, T)]) extends AccumState[T, T, PreviousValueAccumState[T]] {
2828
override def updated(
2929
window: Window,
30-
times: m.Queue[(Idx, Time)],
30+
times: m.ArrayDeque[(Idx, Time)],
3131
idxValue: IdxValue[T]
3232
): (PreviousValueAccumState[T], QI[T]) = {
3333
val (newQueue, newOutputQueue) =

core/src/main/scala/ru/itclover/tsp/core/aggregators/TimerPattern.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ case class TimerPattern[Event: IdxExtractor: TimeExtractor, S, T](
2626
}
2727

2828
case class TimerAccumState[T](
29-
windowQueue: m.Queue[(Idx, Time)],
29+
windowQueue: m.ArrayDeque[(Idx, Time)],
3030
lastEnd: (Idx, Time),
3131
lastValue: Result[T],
3232
eventsMaxGapMs: Long
@@ -35,7 +35,7 @@ case class TimerAccumState[T](
3535
@inline
3636
override def updated(
3737
window: Window,
38-
times: m.Queue[(Idx, Time)],
38+
times: m.ArrayDeque[(Idx, Time)],
3939
idxValue: IdxValue[T]
4040
): (TimerAccumState[T], QI[Boolean]) = {
4141

@@ -66,7 +66,7 @@ case class TimerAccumState[T](
6666
val end: Time = times.last._2 // time corresponding to the idxValue.end
6767

6868
// don't use ++ here, slow!
69-
val windowQueueWithNewPoints = times.foldLeft(windowQueue) { case (a, b) => a.enqueue(b); a }
69+
val windowQueueWithNewPoints = times.foldLeft(windowQueue) { case (a, b) => a.append(b); a }
7070

7171
// output fail on older points (before the end of the window)
7272
// but don't clean the whole queue
@@ -86,7 +86,7 @@ case class TimerAccumState[T](
8686
// if event chunk is shorter than the window, and the next window is sufficiently close,
8787
// then save it in the queue, and return empty state (since the later events can continue the window)
8888
if (cleanedWindowQueue.isEmpty && times.head._2.toMillis - lastEnd._2.toMillis < eventsMaxGapMs) {
89-
updatedWindowQueue.enqueue(failOutputs: _*)
89+
updatedWindowQueue.appendAll(failOutputs)
9090
(
9191
TimerAccumState(updatedWindowQueue, times.last, idxValue.value, eventsMaxGapMs),
9292
PQueue.empty[Boolean]

core/src/main/scala/ru/itclover/tsp/core/aggregators/TimestampsAdderPattern.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ protected case class TimestampAdderAccumState[T]() extends AccumState[T, Segment
2727
@inline
2828
override def updated(
2929
window: Window,
30-
times: m.Queue[(Idx, Time)],
30+
times: m.ArrayDeque[(Idx, Time)],
3131
idxValue: IdxValue[T]
3232
): (TimestampAdderAccumState[T], QI[Segment]) = {
3333
if (times.isEmpty) (this, PQueue.empty)

core/src/main/scala/ru/itclover/tsp/core/aggregators/WaitPattern.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ case class WaitPattern[Event: IdxExtractor: TimeExtractor, S, T](
2626

2727
// Here, head and last are guaranteed to work, so suppress warnings for them
2828
@SuppressWarnings(Array("org.wartremover.warts.TraversableOps"))
29-
case class WaitAccumState[T](windowQueue: m.Queue[(Idx, Time)], lastFail: Boolean, lastTime: (Idx, Time))
29+
case class WaitAccumState[T](windowQueue: m.ArrayDeque[(Idx, Time)], lastFail: Boolean, lastTime: (Idx, Time))
3030
extends AccumState[T, T, WaitAccumState[T]] {
3131

3232
/** This method is called for each IdxValue produced by inner patterns.
@@ -40,7 +40,7 @@ case class WaitAccumState[T](windowQueue: m.Queue[(Idx, Time)], lastFail: Boolea
4040
@inline
4141
override def updated(
4242
window: Window,
43-
times: m.Queue[(Idx, Time)],
43+
times: m.ArrayDeque[(Idx, Time)],
4444
idxValue: IdxValue[T]
4545
): (WaitAccumState[T], QI[T]) = {
4646

@@ -50,7 +50,7 @@ case class WaitAccumState[T](windowQueue: m.Queue[(Idx, Time)], lastFail: Boolea
5050
val end = if (idxValue.value.isFail) times.last._2.minus(window) else times.last._2
5151

5252
// don't use ++ here, slow!
53-
val windowQueueWithNewPoints = times.foldLeft(windowQueue) { case (a, b) => a.enqueue(b); a }
53+
val windowQueueWithNewPoints = times.foldLeft(windowQueue) { case (a, b) => a.append(b); a }
5454

5555
val cleanedWindowQueue = windowQueueWithNewPoints.dropWhile {
5656
case (_, t) => t < start

0 commit comments

Comments
 (0)