
Commit 4aa3401

Feature/197 process all available (#198)
* Revert "#149: Remove ProcessAllAvailable". This reverts commit f86fcfb.
* Keep default termination method
* Don't revert formatting
* Fix readme
1 parent 556c86c commit 4aa3401

File tree: 6 files changed (+95, -15 lines)


README.md

Lines changed: 7 additions & 4 deletions
```diff
@@ -99,6 +99,7 @@ to identify which configuration options belong to a certain transformer instance
 | Property Name | Required | Description |
 | :--- | :---: | :--- |
 | `ingestor.spark.app.name` | Yes | User-defined name of the Spark application. See Spark property `spark.app.name` |
+| `ingestor.spark.termination.method` | No | Either `processAllAvailable` (stop the query when no more messages are incoming) or `awaitTermination` (stop the query on a signal, e.g. Ctrl-C). Default: `awaitTermination`. See also [Combination of trigger and termination method](#combination-of-trigger-and-termination-method) |
 | `ingestor.spark.await.termination.timeout` | No | Timeout in milliseconds. Stops the query when the timeout is reached. This option is only valid with termination method `awaitTermination` |
 
 #### Settings for built-in components
@@ -322,11 +323,13 @@ More on these options: https://docs.mongodb.com/spark-connector/current/configur
 
 #### Behavior of Triggers
 
-| Trigger (`writer.common.trigger.type`) | Timeout (`ingestor.spark.termination.timeout`) | Runtime | Details |
+| Trigger (`writer.common.trigger.type`) | Termination method (`ingestor.spark.termination.method`) | Runtime | Details |
 | :--- | :--- | :--- | :--- |
-| Once | No timeout | Limited | Consumes all data that is available at the beginning of the micro-batch. The query processes exactly one micro-batch and then stops, even if more data is available at the end of the micro-batch. |
-| ProcessingTime | With timeout | Limited | Consumes data in micro-batches and only stops when the timeout is reached or the query is killed. |
-| ProcessingTime | No timeout | Long-running | Consumes data in micro-batches and only stops when the query is killed. |
+| Once | AwaitTermination or ProcessAllAvailable | Limited | Consumes all data that is available at the beginning of the micro-batch. The query processes exactly one micro-batch and then stops, even if more data is available at the end of the micro-batch. |
+| Once | AwaitTermination with timeout | Limited | Same as above, but terminates when the timeout is reached. If the timeout expires before the micro-batch has been processed, the batch is not completed and no data is committed. |
+| ProcessingTime | ProcessAllAvailable | Long-running only if the topic continuously produces messages, otherwise limited | Consumes all available data in micro-batches and only stops when no new data arrives, i.e. when the available offsets are the same as in the previous micro-batch. Whether and when the query terminates therefore depends entirely on the topic. |
+| ProcessingTime | AwaitTermination with timeout | Limited | Consumes data in micro-batches and only stops when the timeout is reached or the query is killed. |
+| ProcessingTime | AwaitTermination | Long-running | Consumes data in micro-batches and only stops when the query is killed. |
 
 - Note 1: The first micro-batch of the query will contain all available messages to consume and can therefore be quite large,
 even if the trigger `ProcessingTime` is configured, and regardless of the configured micro-batch interval.
```
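
For illustration, a minimal sketch of setting these properties programmatically with Apache Commons Configuration (the library used by the tests in this commit); the application name and values are hypothetical:

```scala
import org.apache.commons.configuration2.BaseConfiguration

// Hypothetical setup: drain all available messages, then stop.
val conf = new BaseConfiguration
conf.addProperty("ingestor.spark.app.name", "my-ingestor") // required
conf.addProperty("ingestor.spark.termination.method", "processAllAvailable")

// Alternative: run until killed, or until a timeout elapses.
// The timeout is only valid with the awaitTermination method.
// conf.addProperty("ingestor.spark.termination.method", "awaitTermination")
// conf.addProperty("ingestor.spark.await.termination.timeout", "60000") // ms
```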

driver/src/main/scala/za/co/absa/hyperdrive/driver/SparkIngestor.scala

Lines changed: 32 additions & 7 deletions
```diff
@@ -20,19 +20,22 @@ import java.util.UUID
 import org.apache.commons.configuration2.Configuration
 import org.apache.logging.log4j.LogManager
 import org.apache.spark.sql.SparkSession
+import za.co.absa.hyperdrive.driver.TerminationMethodEnum.{AwaitTermination, ProcessAllAvailable, TerminationMethod}
 import za.co.absa.hyperdrive.ingestor.api.reader.StreamReader
 import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformer
 import za.co.absa.hyperdrive.ingestor.api.utils.{ComponentFactoryUtil, ConfigUtils}
 import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriter
 import za.co.absa.hyperdrive.shared.exceptions.{IngestionException, IngestionStartException}
 
 import scala.util.control.NonFatal
+import scala.util.{Failure, Success}
 
 /**
  * This object is responsible for running the ingestion job by using the components it
  * receives upon invocation.
  */
 class SparkIngestor(val spark: SparkSession,
+                    val terminationMethod: TerminationMethod,
                     val awaitTerminationTimeout: Option[Long],
                     val conf: Configuration) {
 
@@ -69,12 +72,18 @@ class SparkIngestor(val spark: SparkSession,
     }
 
     try {
-      awaitTerminationTimeout match {
-        case Some(timeout) =>
-          ingestionQuery.awaitTermination(timeout)
+      terminationMethod match {
+        case ProcessAllAvailable =>
+          ingestionQuery.processAllAvailable()
           ingestionQuery.stop()
-        case None =>
-          ingestionQuery.awaitTermination()
+        case AwaitTermination =>
+          awaitTerminationTimeout match {
+            case Some(timeout) =>
+              ingestionQuery.awaitTermination(timeout)
+              ingestionQuery.stop()
+            case None =>
+              ingestionQuery.awaitTermination()
+          }
       }
     } catch {
       case NonFatal(e) =>
@@ -96,10 +105,26 @@ object SparkIngestor extends SparkIngestorAttributes {
   def apply(conf: Configuration): SparkIngestor = {
     ComponentFactoryUtil.validateConfiguration(conf, getProperties)
     val spark = getSparkSession(conf)
+    val terminationMethod = getTerminationMethod(conf)
     val awaitTerminationTimeout = getAwaitTerminationTimeoutMs(conf)
 
-    logger.info(s"Creating ingestor: await termination timeout = '$awaitTerminationTimeout'")
-    new SparkIngestor(spark, awaitTerminationTimeout, conf)
+    logger.info(s"Creating ingestor: termination method = '$terminationMethod', " +
+      s"await termination timeout = '$awaitTerminationTimeout'")
+    new SparkIngestor(spark, terminationMethod, awaitTerminationTimeout, conf)
+  }
+
+  private def getTerminationMethod(conf: Configuration): TerminationMethod = {
+    ConfigUtils.getOrNone(KEY_TERMINATION_METHOD, conf) match {
+      case Some(name) => parseTerminationMethod(name)
+      case None => AwaitTermination
+    }
+  }
+
+  private def parseTerminationMethod(name: String) = {
+    TerminationMethodEnum.of(name) match {
+      case Failure(exception) => throw new IllegalArgumentException(s"Invalid value for $KEY_TERMINATION_METHOD", exception)
+      case Success(value) => value
+    }
   }
 
   private def getAwaitTerminationTimeoutMs(conf: Configuration): Option[Long] = {
```
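
A minimal usage sketch of the factory and dispatch above, assuming a populated configuration; `streamReader`, `streamTransformer` and `streamWriter` are placeholders that would come from Hyperdrive's component factories in a real run:

```scala
import org.apache.commons.configuration2.BaseConfiguration

// The config keys are defined in SparkIngestorAttributes below.
val conf = new BaseConfiguration
conf.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
conf.addProperty(SparkIngestor.KEY_TERMINATION_METHOD, "ProcessAllAvailable")

// apply() resolves the termination method, defaulting to AwaitTermination.
val ingestor = SparkIngestor(conf)

// With ProcessAllAvailable, ingest blocks until the source offers no new
// offsets, stops the query and returns; with AwaitTermination it blocks
// until the query is killed or the optional timeout elapses.
ingestor.ingest(streamReader, Seq(streamTransformer), streamWriter)
```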

driver/src/main/scala/za/co/absa/hyperdrive/driver/SparkIngestorAttributes.scala

Lines changed: 7 additions & 1 deletion
```diff
@@ -15,11 +15,13 @@
 
 package za.co.absa.hyperdrive.driver
 
+import za.co.absa.hyperdrive.driver.TerminationMethodEnum.{AwaitTermination, ProcessAllAvailable}
 import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
 
 trait SparkIngestorAttributes extends HasComponentAttributes {
   val keysPrefix = "ingestor.spark"
   val KEY_APP_NAME = s"$keysPrefix.app.name"
+  val KEY_TERMINATION_METHOD = s"$keysPrefix.termination.method"
   val KEY_AWAIT_TERMINATION_TIMEOUT = s"$keysPrefix.await.termination.timeout"
 
   override def getName: String = "Spark Ingestor"
@@ -28,7 +30,11 @@ trait SparkIngestorAttributes extends HasComponentAttributes {
 
   override def getProperties: Map[String, PropertyMetadata] = Map(
     KEY_APP_NAME -> PropertyMetadata("Name of Spark application", None, required = true),
-    KEY_AWAIT_TERMINATION_TIMEOUT -> PropertyMetadata("Await Termination: Timeout(ms)", Some("Stops query when timeout is reached."), required = false)
+    KEY_TERMINATION_METHOD -> PropertyMetadata("Termination method",
+      Some(s"Either '$ProcessAllAvailable' (stop when no more messages arrive) or '$AwaitTermination' (stop on signal)." +
+        s" Default is '$AwaitTermination'"), required = false),
+    KEY_AWAIT_TERMINATION_TIMEOUT -> PropertyMetadata("Await Termination: Timeout(ms)", Some("Stops query when timeout is reached." +
+      s" This option is only valid with termination method '$AwaitTermination'"), required = false)
   )
 
   override def getExtraConfigurationPrefix: Option[String] = None
```

driver/src/main/scala/za/co/absa/hyperdrive/driver/TerminationMethodEnum.scala

Lines changed: 34 additions & 0 deletions

```diff
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.driver
+
+import scala.util.{Failure, Success, Try}
+
+private[hyperdrive] object TerminationMethodEnum {
+  sealed abstract class TerminationMethod(val name: String) {
+    override def toString: String = name
+  }
+
+  case object ProcessAllAvailable extends TerminationMethod("ProcessAllAvailable")
+  case object AwaitTermination extends TerminationMethod("AwaitTermination")
+  val values = Seq(ProcessAllAvailable, AwaitTermination)
+
+  def of(name: String): Try[TerminationMethod] = {
+    values.find(p => p.toString.toUpperCase == name.toUpperCase)
+      .map(Success(_))
+      .getOrElse(Failure(new IllegalArgumentException(s"No TerminationMethod with name $name exists")))
+  }
+}
```
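
A brief sketch of how `of` behaves: the lookup compares names case-insensitively, so the lowercase spellings from the README (`processAllAvailable`, `awaitTermination`) parse to the same case objects. Note the object is `private[hyperdrive]`, so this only compiles inside that package:

```scala
import za.co.absa.hyperdrive.driver.TerminationMethodEnum
import za.co.absa.hyperdrive.driver.TerminationMethodEnum.ProcessAllAvailable
import scala.util.Success

// The case-insensitive match accepts README-style lowercase names.
assert(TerminationMethodEnum.of("processAllAvailable") == Success(ProcessAllAvailable))

// Unknown names yield a Failure; SparkIngestor rethrows it as an
// IllegalArgumentException that names the offending configuration key.
assert(TerminationMethodEnum.of("non-existent").isFailure)
```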

driver/src/test/scala/za/co/absa/hyperdrive/driver/TestSparkIngestor.scala

Lines changed: 12 additions & 2 deletions
```diff
@@ -24,6 +24,7 @@ import org.mockito.Mockito._
 import org.scalatest.mockito.MockitoSugar
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
 import za.co.absa.commons.spark.SparkTestBase
+import za.co.absa.hyperdrive.driver.TerminationMethodEnum.AwaitTermination
 import za.co.absa.hyperdrive.ingestor.api.reader.StreamReader
 import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformer
 import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriter
@@ -80,7 +81,7 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug
     when(streamReader.read(any[SparkSession])).thenReturn(dataFrame)
     when(streamTransformer.transform(dataFrame)).thenReturn(dataFrame)
     when(streamWriter.write(dataFrame)).thenReturn(streamingQuery)
-    when(streamingQuery.awaitTermination).thenThrow(classOf[RuntimeException])
+    when(streamingQuery.awaitTermination()).thenThrow(classOf[RuntimeException])
     assertThrows[IngestionException](sparkIngestor.ingest(streamReader, Seq(streamTransformer), streamWriter))
   }
 
@@ -96,7 +97,7 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug
     inOrderCheck.verify(streamReader).read(any[SparkSession])
     inOrderCheck.verify(streamTransformer).transform(dataFrame)
     inOrderCheck.verify(streamWriter).write(dataFrame)
-    verify(streamingQuery).awaitTermination
+    verify(streamingQuery).awaitTermination()
   }
 
   it should "use the configured app name" in {
@@ -140,6 +141,15 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug
     verify(streamingQuery).awaitTermination(eqTo(10000L))
   }
 
+  it should "throw if an invalid terminationMethod is configured" in {
+    val config = new BaseConfiguration
+    config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
+    config.addProperty(SparkIngestor.KEY_TERMINATION_METHOD, "non-existent")
+    val throwable = intercept[IllegalArgumentException](SparkIngestor(config))
+
+    throwable.getMessage should include(SparkIngestor.KEY_TERMINATION_METHOD)
+  }
+
   it should "throw if a timeout is not a number" in {
     val config = new BaseConfiguration
     config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
```

driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDockerTest.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -94,6 +94,7 @@ class KafkaToKafkaDockerTest extends FlatSpec with Matchers with SparkTestBase w
 
     // Spark settings
     "ingestor.spark.app.name" -> "ingestor-app",
+    "ingestor.spark.termination.method" -> "ProcessAllAvailable",
 
     // Source(Kafka) settings
     "reader.kafka.topic" -> sourceTopic,
@@ -122,7 +123,8 @@ class KafkaToKafkaDockerTest extends FlatSpec with Matchers with SparkTestBase w
 
     // Sink(Kafka) settings
     "writer.common.checkpoint.location" -> (checkpointDir + "/${reader.kafka.topic}"),
-    "writer.common.trigger.type" -> "Once",
+    "writer.common.trigger.type" -> "ProcessingTime",
+    "writer.common.trigger.processing.time" -> "1000",
     "writer.kafka.topic" -> destinationTopic,
     "writer.kafka.brokers" -> "${reader.kafka.brokers}"
   )
```
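
Switching the test from `Once` to `ProcessingTime` paired with `ProcessAllAvailable` keeps it deterministic: micro-batches fire every second until the source topic is drained, after which the query stops on its own. In raw Spark terms, the two trigger lines correspond roughly to the following sketch; the Kafka options and `df` are placeholders, not Hyperdrive API:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Rough writer-side equivalent of the trigger config above.
def startSinkQuery(df: DataFrame): StreamingQuery = df.writeStream
  .trigger(Trigger.ProcessingTime(1000L)) // "writer.common.trigger.processing.time" -> "1000"
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("topic", "destination-topic")                // placeholder
  .option("checkpointLocation", "/tmp/checkpoints")    // placeholder
  .start()
```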
