Skip to content

Commit 9f0fb28

Browse files
authored
Streams core: Eager processing of inner streams (#151)
Common streams is built around the concept of a stream of streams: `Stream[F, Stream[F, A]]`. The nesting of `Stream` is needed so the Source has a way to force the app to immediately checkpoint. This is needed e.g. at the end of a kinesis shard, or when kafka rebalances. We already allowed eager processing of inner streams under some circumstances, i.e. the app can start processing the next inner stream while the preceding inner stream is still finalizing. But we did not have eager processing under all circumstances. This PR allows the eager processing feature to work in more scenarios. It will have most impact on the kinesis source near a shard end, and in particular an app that works with timed windows, e.g. Lake Loader.
1 parent 5fee6cc commit 9f0fb28

File tree

2 files changed

+34
-31
lines changed

2 files changed

+34
-31
lines changed

modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/streams/internal/LowLevelSource.scala

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@ private[streams] trait LowLevelSource[F[_], C] {
3838
/**
3939
* Provides a stream of stream of low level events
4040
*
41-
* The inner streams are processed one at a time, with clean separation before starting the next
42-
* inner stream. This is required e.g. for Kafka, where the end of a stream represents client
43-
* rebalancing.
41+
* The inner streams are processed sequentially, though processing may be interleaved during
42+
* finalization — the next inner stream can begin while the previous is still finalizing.
43+
*
44+
* A new inner stream is emitted when the source requires a hard boundary, e.g. for Kafka when a
45+
* client rebalance occurs, or for Kinesis when a shard ends.
4446
*
4547
* A new [[EventProcessor]] will be invoked for each inner stream
4648
*
@@ -107,35 +109,35 @@ private[streams] object LowLevelSource {
107109
def toSourceAndAck[F[_]: Async, C](source: LowLevelSource[F, C]): F[SourceAndAck[F]] =
108110
for {
109111
stateRef <- Ref[F].of[InternalState](InternalState.Disconnected)
110-
} yield sourceAndAckImpl(source, stateRef)
112+
acksRef <- Ref[F].of(Map.empty[Unique.Token, C])
113+
} yield sourceAndAckImpl(source, stateRef, acksRef)
111114

112115
private def sourceAndAckImpl[F[_]: Async, C](
113116
source: LowLevelSource[F, C],
114-
stateRef: Ref[F, InternalState]
117+
stateRef: Ref[F, InternalState],
118+
acksRef: Ref[F, Map[Unique.Token, C]]
115119
): SourceAndAck[F] = new SourceAndAck[F] {
116120
def stream(config: EventProcessingConfig[F], processor: EventProcessor[F]): Stream[F, Nothing] = {
117-
val str = for {
121+
val sinks = EagerWindows.pipes { control: EagerWindows.Control[F] =>
122+
CleanCancellation(messageSink(processor, acksRef, source.checkpointer, control, source.debounceCheckpoints))
123+
}
124+
125+
val tokenedSources = for {
118126
s2 <- source.stream
119-
acksRef <- Stream.bracket(Ref[F].of(Map.empty[Unique.Token, C]))(nackUnhandled(source.checkpointer, _))
120127
now <- Stream.eval(Sync[F].realTime)
121128
_ <- Stream.bracket(stateRef.set(InternalState.AwaitingUpstream(now)))(_ => stateRef.set(InternalState.Disconnected))
122-
} yield {
123-
val tokenedSources = s2
124-
.through(monitorLatency(config, stateRef))
125-
.through(tokened(acksRef))
126-
.through(windowed(config.windowing))
127-
128-
val sinks = EagerWindows.pipes { control: EagerWindows.Control[F] =>
129-
CleanCancellation(messageSink(processor, acksRef, source.checkpointer, control, source.debounceCheckpoints))
129+
} yield s2
130+
.through(monitorLatency(config, stateRef))
131+
.through(tokened(acksRef))
132+
.through(windowed(config.windowing))
133+
134+
tokenedSources.flatten
135+
.zip(sinks)
136+
.map { case (tokenedSource, sink) => sink(tokenedSource) }
137+
.parJoin(eagerness(config.windowing)) // so we start processing the next window while the previous window is still finishing up.
138+
.onFinalize {
139+
nackUnhandled(source.checkpointer, acksRef)
130140
}
131-
132-
tokenedSources
133-
.zip(sinks)
134-
.map { case (tokenedSource, sink) => sink(tokenedSource) }
135-
.parJoin(eagerness(config.windowing)) // so we start processing the next window while the previous window is still finishing up.
136-
}
137-
138-
str.flatten
139141
}
140142

141143
def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus] =

modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/streams/internal/LowLevelSourceSpec.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,10 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
317317
Action.ProcessorStartedWindow("1970-01-01T00:01:00Z"),
318318
Action.ProcessorReceivedEvents("1970-01-01T00:01:00Z", List("25", "26", "27", "28", "29", "30", "31", "32")),
319319
Action.ProcessorReceivedEvents("1970-01-01T00:01:20Z", List("33", "34", "35", "36", "37", "38", "39", "40")),
320-
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:40Z"),
321-
Action.Checkpointed(List("25", "26", "27", "28", "29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "40")),
322320
Action.ProcessorStartedWindow("1970-01-01T00:01:40Z"),
323321
Action.ProcessorReceivedEvents("1970-01-01T00:01:40Z", List("41", "42", "43", "44", "45", "46", "47", "48")),
322+
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:40Z"),
323+
Action.Checkpointed(List("25", "26", "27", "28", "29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "40")),
324324
Action.ProcessorReceivedEvents("1970-01-01T00:02:00Z", List("49", "50", "51", "52", "53", "54", "55", "56")),
325325
Action.ProcessorReceivedEvents("1970-01-01T00:02:20Z", List("57", "58", "59", "60", "61", "62", "63", "64")),
326326
Action.ProcessorReachedEndOfWindow("1970-01-01T00:02:25Z"),
@@ -355,10 +355,10 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
355355
Action.ProcessorStartedWindow("1970-01-01T00:02:40Z"),
356356
Action.ProcessorReceivedEvents("1970-01-01T00:02:40Z", List("65", "66", "67", "68", "69", "70", "71", "72")),
357357
Action.ProcessorReceivedEvents("1970-01-01T00:03:00Z", List("73", "74", "75", "76", "77", "78", "79", "80")),
358-
Action.ProcessorReachedEndOfWindow("1970-01-01T00:03:20Z"),
359-
Action.Checkpointed(List("65", "66", "67", "68", "69", "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "80")),
360358
Action.ProcessorStartedWindow("1970-01-01T00:03:20Z"),
361359
Action.ProcessorReceivedEvents("1970-01-01T00:03:20Z", List("81", "82", "83", "84", "85", "86", "87", "88")),
360+
Action.ProcessorReachedEndOfWindow("1970-01-01T00:03:20Z"),
361+
Action.Checkpointed(List("65", "66", "67", "68", "69", "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "80")),
362362
Action.ProcessorReceivedEvents("1970-01-01T00:03:40Z", List("89", "90", "91", "92", "93", "94", "95", "96")),
363363
Action.ProcessorReachedEndOfWindow("1970-01-01T00:03:55Z"),
364364
Action.Checkpointed(List("81", "82", "83", "84", "85", "86", "87", "88", "89", "90", "91", "92", "93", "94", "95", "96"))
@@ -624,10 +624,10 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
624624
Action.ProcessorStartedWindow("1970-01-01T00:01:00Z"),
625625
Action.ProcessorReceivedEvents("1970-01-01T00:01:00Z", List("25", "26", "27", "28", "29", "30", "31", "32")),
626626
Action.ProcessorReceivedEvents("1970-01-01T00:01:20Z", List("33", "34", "35", "36", "37", "38", "39", "40")),
627-
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:40Z"),
628-
Action.Checkpointed(List("25", "26", "27", "28", "29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "40")),
629627
Action.ProcessorStartedWindow("1970-01-01T00:01:40Z"),
630628
Action.ProcessorReceivedEvents("1970-01-01T00:01:40Z", List("41", "42", "43", "44", "45", "46", "47", "48")),
629+
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:40Z"),
630+
Action.Checkpointed(List("25", "26", "27", "28", "29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "40")),
631631
Action.ProcessorReceivedEvents("1970-01-01T00:02:00Z", List("49", "50", "51", "52", "53", "54", "55", "56")),
632632
Action.ProcessorReceivedEvents("1970-01-01T00:02:20Z", List("57", "58", "59", "60", "61", "62", "63", "64")),
633633
Action.ProcessorReachedEndOfWindow("1970-01-01T00:02:25Z"),
@@ -662,10 +662,10 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
662662
Action.ProcessorStartedWindow("1970-01-01T00:02:40Z"),
663663
Action.ProcessorReceivedEvents("1970-01-01T00:02:40Z", List("65", "66", "67", "68", "69", "70", "71", "72")),
664664
Action.ProcessorReceivedEvents("1970-01-01T00:03:00Z", List("73", "74", "75", "76", "77", "78", "79", "80")),
665-
Action.ProcessorReachedEndOfWindow("1970-01-01T00:03:20Z"),
666-
Action.Checkpointed(List("65", "66", "67", "68", "69", "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "80")),
667665
Action.ProcessorStartedWindow("1970-01-01T00:03:20Z"),
668666
Action.ProcessorReceivedEvents("1970-01-01T00:03:20Z", List("81", "82", "83", "84", "85", "86", "87", "88")),
667+
Action.ProcessorReachedEndOfWindow("1970-01-01T00:03:20Z"),
668+
Action.Checkpointed(List("65", "66", "67", "68", "69", "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "80")),
669669
Action.ProcessorReceivedEvents("1970-01-01T00:03:40Z", List("89", "90", "91", "92", "93", "94", "95", "96")),
670670
Action.ProcessorReachedEndOfWindow("1970-01-01T00:03:55Z"),
671671
Action.Checkpointed(List("81", "82", "83", "84", "85", "86", "87", "88", "89", "90", "91", "92", "93", "94", "95", "96"))
@@ -1058,6 +1058,7 @@ object LowLevelSourceSpec {
10581058
val start = IO.realTimeInstant.flatMap(t => ref.update(_ :+ Action.ProcessorStartedWindow(t.toString)))
10591059
val end = for {
10601060
t <- IO.realTimeInstant
1061+
_ <- IO.sleep(1.nanos) // Forces deterministic order of Actions
10611062
_ <- ref.update(_ :+ Action.ProcessorReachedEndOfWindow(t.toString))
10621063
_ <- IO.sleep(config.timeToFinalizeWindow)
10631064
tokens <- checkpoints.get

0 commit comments

Comments
 (0)