Skip to content

Commit 965de82

Browse files
committed
Fixed TopicSourceParquetSink
- Throw an exception in doUpdateDf (TopicSourceParquetSink) when the DataFrame is empty, so that SinkData can retry reading the Parquet file. Also moved the ConfluentSparkAvroUtils factory to its appropriate file.
1 parent 5953727 commit 965de82

File tree

5 files changed

+40
-37
lines changed

5 files changed

+40
-37
lines changed

src/main/scala/com/databricks/spark/avro/ConfluentSparkAvroUtils.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import scala.collection.JavaConverters._
3535
import scalaz.Memo
3636
import play.api.libs.json.Json
3737

38+
import scala.collection.mutable
3839
import scala.util.Try
3940

4041
/**
@@ -128,4 +129,21 @@ class ConfluentSparkAvroUtils(schemaRegistryURLs: String) extends Serializable {
128129
schemaAndType =>
129130
SchemaConverters.createConverterToSQL(schemaAndType._1, schemaAndType._2)
130131
}
131-
}
132+
}
133+
134+
/**
 * Factory for [[ConfluentSparkAvroUtils]].
 *
 * Memoizes one utils instance per schema-registry URL so that repeated
 * lookups for the same registry reuse the already-created instance.
 */
object ConfluentSparkAvroUtils {

  // Cache of utils instances keyed by schema-registry URL.
  // NOTE(review): kept as a plain mutable.Map to preserve the existing
  // public interface; all access goes through the synchronized apply()
  // below because this object-level cache may be hit from multiple
  // threads concurrently (e.g. several streaming queries), and
  // getOrElseUpdate on a non-concurrent Map is not thread-safe.
  val avroRegistries: mutable.Map[String, ConfluentSparkAvroUtils] =
    mutable.Map[String, ConfluentSparkAvroUtils]()

  /**
   * Returns the cached [[ConfluentSparkAvroUtils]] for the given URL,
   * creating and caching a new instance on first use.
   *
   * @param schemaRegistryURL schema-registry URL used as the cache key
   * @return the memoized utils instance for that URL
   */
  def apply(schemaRegistryURL: String): ConfluentSparkAvroUtils =
    avroRegistries.synchronized {
      avroRegistries.getOrElseUpdate(
        schemaRegistryURL,
        new ConfluentSparkAvroUtils(schemaRegistryURL)
      )
    }
}

src/main/scala/com/haufe/umantis/ds/sources/kafka/KafkaDeserializer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class KafkaDeserializer(conf: TopicConf) {
2828

2929
lazy val avroUtils: Option[ConfluentSparkAvroUtils] =
3030
conf.kafkaConf.schemaRegistryURL match {
31-
case Some(url) => Some(TopicSourceParquetSink.getAvroUtils(url))
31+
case Some(url) => Some(ConfluentSparkAvroUtils(url))
3232
case _ => None
3333
}
3434

src/main/scala/com/haufe/umantis/ds/sources/kafka/SinkData.scala

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,22 +46,16 @@ trait SinkData extends Source {
4646
log("Updating DataFrame")
4747
// Thread.sleep(100)
4848

49-
sink match {
50-
case Some(_) =>
51-
(1 to 30).foreach(retryNr => {
52-
log(s"Reading Fresh Data Try # $retryNr")
53-
54-
try{
55-
return doUpdateDf()
56-
} catch {
57-
case _: AnalysisException =>
58-
Thread.sleep(1000)
59-
}
60-
})
61-
case _ =>
62-
// in case isReadOnly == true
49+
(1 to 30).foreach(retryNr => {
50+
log(s"Reading Fresh Data Try # $retryNr")
51+
52+
try{
6353
return doUpdateDf()
64-
}
54+
} catch {
55+
case _: AnalysisException => Thread.sleep(1000)
56+
case _: NoSuchElementException => Thread.sleep(1000)
57+
}
58+
})
6559

6660
throw new KafkaTopicNotAvailableException(
6761
s"Kafka topic ${conf.kafkaTopic.topic} not ready!")

src/main/scala/com/haufe/umantis/ds/sources/kafka/TopicSourceParquetSink.scala

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ extends TopicSourceSink(conf)
7474

7575
outputSchema = sourceDf.schema
7676

77+
sourceDf.printSchema()
78+
7779
val s = sourceDf
7880
.writeStream
7981
.outputMode("append")
@@ -181,11 +183,18 @@ extends TopicSourceSink(conf)
181183
.sql(s"select * from parquet.`$fname`")
182184
.toDF()
183185
} else {
184-
currentSparkSession
186+
val df = currentSparkSession
185187
.read
186188
.schema(outputSchema)
187189
.format("parquet")
188190
.load(fname)
191+
192+
// if the df is empty (because no data has been written yet)
193+
// we want to trigger an exception (caught in SinkData.updateDf()
194+
// so that we can retry to read the df
195+
df.head()
196+
197+
df
189198
}
190199
.repartition(conf.sinkConf.numPartitions)
191200

@@ -208,24 +217,6 @@ extends TopicSourceSink(conf)
208217
}
209218
}
210219

211-
/**
212-
* Factory for [[TopicSourceParquetSink]].
213-
*/
214-
object TopicSourceParquetSink {
215-
216-
val avroRegistries: mutable.Map[String, ConfluentSparkAvroUtils] =
217-
mutable.Map[String, ConfluentSparkAvroUtils]()
218-
219-
def getAvroUtils(schemaRegistryURL: String): ConfluentSparkAvroUtils = {
220-
avroRegistries
221-
.getOrElseUpdate(
222-
schemaRegistryURL,
223-
new ConfluentSparkAvroUtils(schemaRegistryURL)
224-
)
225-
}
226-
}
227-
228-
229220

230221
/**
231222
* Exception thrown if a Kafka topic is not available.

src/test/scala/com/haufe/umantis/ds/sources/kafka/TopicSourceEventSourcingSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ trait TopicSourceEventSourcingSpec
8888
).pipeline
8989
)
9090
}
91-
def sinkConf = SinkConf(transformationFunction, 1 /* seconds */, 4 /* num partitions */)
91+
def sinkConf = SinkConf(transformationFunction, refreshTime = 1 /* seconds */, numPartitions = 4)
9292
def conf = TopicConf(kafkaConf, topicName, sinkConf)
9393
def ts: TopicSourceSink
9494

0 commit comments

Comments
 (0)