Skip to content

Commit aa96a6c

Browse files
Iss375 observability (#418)
* basic quarantine logic for failed CLI Signed-off-by: Marcus A. Henry, Jr <marcus.henry@databricks.com> * cleanup Signed-off-by: Marcus A. Henry, Jr <marcus.henry@databricks.com> * scalafmt --signoff Signed-off-by: Marcus A. Henry, Jr <marcus.henry@databricks.com> * scalastyle Signed-off-by: Marcus A. Henry, Jr <marcus.henry@databricks.com> * updates/cleanup for comments to PR Signed-off-by: Marcus A. Henry, Jr <marcus.henry@databricks.com> * tests and cleanup Signed-off-by: Marcus A. Henry, Jr <marcus.henry@databricks.com> * sbt file needed formatting Signed-off-by: Marcus A. Henry, Jr <marcus.henry@databricks.com> * merge and updates for scalastyle Signed-off-by: Marcus A. Henry, Jr <marcus.henry@databricks.com> * delete temp files Signed-off-by: Marcus A. Henry, Jr <marcus.henry@databricks.com> * a few more lines of coverage Signed-off-by: Marcus A. Henry, Jr <marcus.henry@databricks.com> * test scalafmt Signed-off-by: Marcus A. Henry, Jr <marcus.henry@databricks.com>
1 parent 484c886 commit aa96a6c

File tree

5 files changed

+171
-25
lines changed

5 files changed

+171
-25
lines changed

build.sbt

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ ThisBuild / installHail := {
231231
"conda create -y --name hail &&" +
232232
"conda activate hail --stack &&" +
233233
"cd \"hail/hail\" &&" +
234-
"sed " + "\""+ s"s/^pyspark.*/pyspark==${sparkVersion.value}/" + "\"" + " python/requirements.txt | grep -v '^#' | xargs pip3 install -U &&" +
234+
"sed " + "\"" + s"s/^pyspark.*/pyspark==${sparkVersion.value}/" + "\"" + " python/requirements.txt | grep -v '^#' | xargs pip3 install -U &&" +
235235
s"make SCALA_VERSION=${scalaVersion.value} SPARK_VERSION=${sparkVersion.value} shadowJar wheel &&" +
236236
s"pip3 install --no-deps build/deploy/dist/hail-${hailVersion.value}-py3-none-any.whl"
237237
) !
@@ -414,15 +414,16 @@ updateCondaEnv := {
414414
"conda env update -f python/environment.yml" !
415415
}
416416

417-
def crossReleaseStep(step: ReleaseStep, requiresPySpark: Boolean, requiresHail: Boolean): Seq[ReleaseStep] = {
417+
def crossReleaseStep(
418+
step: ReleaseStep,
419+
requiresPySpark: Boolean,
420+
requiresHail: Boolean): Seq[ReleaseStep] = {
418421
val updateCondaEnvStep = releaseStepCommandAndRemaining(
419422
if (requiresPySpark) "updateCondaEnv" else "")
420423
val changePySparkVersionStep = releaseStepCommandAndRemaining(
421424
if (requiresPySpark) "changePySparkVersion" else "")
422-
val installHailStep = releaseStepCommandAndRemaining(
423-
if (requiresHail) "installHail" else "")
424-
val uninstallHailStep = releaseStepCommandAndRemaining(
425-
if (requiresHail) "uninstallHail" else "")
425+
val installHailStep = releaseStepCommandAndRemaining(if (requiresHail) "installHail" else "")
426+
val uninstallHailStep = releaseStepCommandAndRemaining(if (requiresHail) "uninstallHail" else "")
426427

427428
Seq(
428429
updateCondaEnvStep,
@@ -457,20 +458,38 @@ releaseProcess := Seq[ReleaseStep](
457458
inquireVersions,
458459
runClean
459460
) ++
460-
crossReleaseStep(releaseStepCommandAndRemaining("core/test"), requiresPySpark = false, requiresHail = false) ++
461-
crossReleaseStep(releaseStepCommandAndRemaining("python/test"), requiresPySpark = true, requiresHail = false) ++
462-
crossReleaseStep(releaseStepCommandAndRemaining("docs/test"), requiresPySpark = true, requiresHail = false) ++
463-
crossReleaseStep(releaseStepCommandAndRemaining("hail/test"), requiresPySpark = true, requiresHail = true) ++
461+
crossReleaseStep(
462+
releaseStepCommandAndRemaining("core/test"),
463+
requiresPySpark = false,
464+
requiresHail = false) ++
465+
crossReleaseStep(
466+
releaseStepCommandAndRemaining("python/test"),
467+
requiresPySpark = true,
468+
requiresHail = false) ++
469+
crossReleaseStep(
470+
releaseStepCommandAndRemaining("docs/test"),
471+
requiresPySpark = true,
472+
requiresHail = false) ++
473+
crossReleaseStep(
474+
releaseStepCommandAndRemaining("hail/test"),
475+
requiresPySpark = true,
476+
requiresHail = true) ++
464477
Seq(
465478
setReleaseVersion,
466479
updateStableVersion,
467480
commitReleaseVersion,
468481
commitStableVersion,
469482
tagRelease
470483
) ++
471-
crossReleaseStep(releaseStepCommandAndRemaining("publishSigned"), requiresPySpark = false, requiresHail = false) ++
484+
crossReleaseStep(
485+
releaseStepCommandAndRemaining("publishSigned"),
486+
requiresPySpark = false,
487+
requiresHail = false) ++
472488
sonatypeSteps ++
473-
crossReleaseStep(releaseStepCommandAndRemaining("stagedRelease/test"), requiresPySpark = false, requiresHail = false) ++
489+
crossReleaseStep(
490+
releaseStepCommandAndRemaining("stagedRelease/test"),
491+
requiresPySpark = false,
492+
requiresHail = false) ++
474493
Seq(
475494
setNextVersion,
476495
commitNextVersion

core/src/main/scala/io/projectglow/transformers/pipe/PipeTransformer.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ class PipeTransformer extends DataFrameTransformer {
7676
}.makeOutputFormatter(new SnakeCaseMap(outputFormatterOptions))
7777
}
7878

79+
private def getQuarantineLocation: Option[String] =
80+
options.get(QUARANTINE_TABLE_KEY)
81+
private def getQuarantineFlavor: Option[String] =
82+
options.get(QUARANTINE_FLAVOR_KEY)
83+
7984
private def getCmd: Seq[String] = {
8085
val mapper = new ObjectMapper()
8186
mapper.registerModule(DefaultScalaModule)
@@ -118,13 +123,19 @@ class PipeTransformer extends DataFrameTransformer {
118123

119124
val inputFormatter = getInputFormatter
120125
val outputFormatter = getOutputFormatter
126+
val quarantineLocation = getQuarantineLocation
127+
val quarantineFlavor = getQuarantineFlavor
128+
val quarantine = quarantineLocation.flatMap { a =>
129+
quarantineFlavor.map { b =>
130+
(a, b)
131+
}
132+
}
121133
val env = options.collect {
122134
case (k, v) if k.startsWith(ENV_PREFIX) =>
123135
(k.stripPrefix(ENV_PREFIX), v)
124136
}
125137

126-
Piper.pipe(inputFormatter, outputFormatter, cmd, env, df)
127-
138+
Piper.pipe(inputFormatter, outputFormatter, cmd, env, df, quarantine)
128139
}
129140
}
130141

@@ -137,6 +148,9 @@ object PipeTransformer {
137148
private val ENV_PREFIX = "env_"
138149
private val INPUT_FORMATTER_PREFIX = "in_"
139150
private val OUTPUT_FORMATTER_PREFIX = "out_"
151+
private val QUARANTINE_TABLE_KEY = "quarantineTable"
152+
private val QUARANTINE_FLAVOR_KEY = "quarantineFlavor"
153+
140154
val LOGGING_BLOB_KEY = "pipeCmdTool"
141155

142156
private def lookupInputFormatterFactory(name: String): Option[InputFormatterFactory] =

core/src/main/scala/io/projectglow/transformers/pipe/Piper.scala

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.projectglow.transformers.pipe
1818

19+
import java.lang.{IllegalStateException => ISE}
1920
import java.io._
2021
import java.util.concurrent.atomic.AtomicReference
2122

@@ -60,10 +61,14 @@ private[projectglow] object Piper extends GlowLogging {
6061
outputformatter: OutputFormatter,
6162
cmd: Seq[String],
6263
env: Map[String, String],
63-
df: DataFrame): DataFrame = {
64-
64+
df: DataFrame,
65+
quarantineLocation: Option[(String, String)] = None): DataFrame = {
6566
logger.info(s"Beginning pipe with cmd $cmd")
6667

68+
val quarantineInfo = quarantineLocation.map { a =>
69+
val (location, flavor) = a
70+
PipeIterator.QuarantineInfo(df, location, PipeIterator.QuarantineWriter(flavor))
71+
}
6772
val rawRdd = df.queryExecution.toRdd
6873
val inputRdd = if (rawRdd.getNumPartitions == 0) {
6974
logger.warn("Not piping any rows, as the input DataFrame has zero partitions.")
@@ -86,6 +91,25 @@ private[projectglow] object Piper extends GlowLogging {
8691
cachedRdds.append(schemaInternalRowRDD)
8792
}
8893

94+
// Quarantining is potentially very wasteful due to the throw-based control
95+
// flow implemented at the level below.
96+
quarantineInfo.foreach { quarantineInfo =>
97+
try {
98+
schemaInternalRowRDD.mapPartitions { it =>
99+
if (it.nonEmpty) {
100+
val result = if (it.asInstanceOf[PipeIterator].error) {
101+
Iterator(true)
102+
} else {
103+
Iterator.empty
104+
}
105+
result
106+
} else {
107+
Iterator.empty
108+
}
109+
}.filter(identity).take(1).nonEmpty
110+
} catch { case _: Throwable => quarantineInfo.flavor.quarantine(quarantineInfo) }
111+
}
112+
89113
val schemaSeq = schemaInternalRowRDD.mapPartitions { it =>
90114
if (it.hasNext) {
91115
Iterator(it.next.asInstanceOf[StructType])
@@ -115,11 +139,10 @@ private[projectglow] class ProcessHelper(
115139
context: TaskContext)
116140
extends GlowLogging {
117141

118-
private val childThreadException = new AtomicReference[Throwable](null)
142+
private val _childThreadException = new AtomicReference[Throwable](null)
119143
private var process: Process = _
120144

121145
def startProcess(): BufferedInputStream = {
122-
123146
val pb = new ProcessBuilder(cmd.asJava)
124147
val pbEnv = pb.environment()
125148
environment.foreach { case (k, v) => pbEnv.put(k, v) }
@@ -132,7 +155,7 @@ private[projectglow] class ProcessHelper(
132155
try {
133156
inputFn(out)
134157
} catch {
135-
case t: Throwable => childThreadException.set(t)
158+
case t: Throwable => _childThreadException.set(t)
136159
} finally {
137160
out.close()
138161
}
@@ -151,7 +174,7 @@ private[projectglow] class ProcessHelper(
151174
// scalastyle:on println
152175
}
153176
} catch {
154-
case t: Throwable => childThreadException.set(t)
177+
case t: Throwable => _childThreadException.set(t)
155178
} finally {
156179
err.close()
157180
}
@@ -170,12 +193,14 @@ private[projectglow] class ProcessHelper(
170193
}
171194

172195
def propagateChildException(): Unit = {
173-
val t = childThreadException.get()
174-
if (t != null) {
175-
Option(process).foreach(_.destroy())
196+
childThreadExceptionO.foreach { t =>
197+
processO.foreach(_.destroy())
176198
throw t
177199
}
178200
}
201+
202+
def childThreadExceptionO: Option[Throwable] = Option(_childThreadException.get())
203+
def processO: Option[Process] = Option(process)
179204
}
180205

181206
object ProcessHelper {
@@ -186,11 +211,13 @@ object ProcessHelper {
186211
class PipeIterator(
187212
cmd: Seq[String],
188213
environment: Map[String, String],
189-
input: Iterator[InternalRow],
214+
_input: Iterator[InternalRow],
190215
inputFormatter: InputFormatter,
191216
outputFormatter: OutputFormatter)
192217
extends Iterator[Any] {
218+
import PipeIterator.illegalStateException
193219

220+
private val input = _input.toSeq
194221
private val processHelper = new ProcessHelper(cmd, environment, writeInput, TaskContext.get)
195222
private val inputStream = processHelper.startProcess()
196223
private val baseIterator = outputFormatter.makeIterator(inputStream)
@@ -208,7 +235,7 @@ class PipeIterator(
208235
} else {
209236
val exitStatus = processHelper.waitForProcess()
210237
if (exitStatus != 0) {
211-
throw new IllegalStateException(s"Subprocess exited with status $exitStatus")
238+
throw illegalStateException(s"Subprocess exited with status $exitStatus")
212239
}
213240
false
214241
}
@@ -217,4 +244,44 @@ class PipeIterator(
217244
}
218245

219246
override def next(): Any = baseIterator.next()
247+
248+
def error: Boolean = {
249+
(0 == processHelper.waitForProcess())
250+
}
251+
}
252+
253+
object PipeIterator {
254+
// This would typically be a typeclass, but this code base appears to be
255+
// subclass polymorphic rather than typeclass polymorphic
256+
trait QuarantineWriter extends Product with Serializable {
257+
def quarantine(qi: QuarantineInfo): Unit
258+
}
259+
final object QuarantineWriter {
260+
def apply(flavor: String): QuarantineWriter = flavor match {
261+
case "delta" => QuarantineWriterDelta
262+
case "csv" => QuarantineWriterCsv
263+
case _ => throw illegalStateException(s"unknown QuarantineWriter flavor: $flavor")
264+
}
265+
}
266+
final case object QuarantineWriterDelta extends QuarantineWriter {
267+
override def quarantine(qi: QuarantineInfo): Unit = {
268+
qi.df.write.format("delta").mode("append").saveAsTable(qi.location)
269+
}
270+
}
271+
final case object QuarantineWriterCsv extends QuarantineWriter {
272+
override def quarantine(qi: QuarantineInfo): Unit = {
273+
val df = qi.df.write.mode("append").csv(qi.location)
274+
}
275+
}
276+
277+
/* ~~~Scalastyle template evidently does not accept standard scaladoc comments~~~
278+
* ~~~Scalastyle states "Insert a space after the start of the comment" ~~~
279+
* Data for Quarantining records which fail in process.
280+
* @param df The [[DataFrame]] being processed.
281+
* @param location The delta table to write to. Typically of the form `classifier.tableName`.
282+
*/
283+
final case class QuarantineInfo(df: DataFrame, location: String, flavor: QuarantineWriter)
284+
285+
def illegalStateException(message: String): IllegalStateException =
286+
new ISE("[PipeIterator] " + message)
220287
}

core/src/test/scala/io/projectglow/transformers/pipe/PipeTransformerSuite.scala

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,46 @@ class PipeTransformerSuite extends GlowBaseTest {
114114
assert(output == "monkey!")
115115
Glow.transform("pipe_cleanup", inputDf, Map.empty[String, String])
116116
}
117+
118+
test("quarantine on failure") {
119+
{ // coverage
120+
PipeIterator.QuarantineWriter("delta")
121+
PipeIterator.QuarantineWriter("csv")
122+
intercept[IllegalStateException] {
123+
PipeIterator.QuarantineWriter("will fail")
124+
}
125+
intercept[NullPointerException] {
126+
PipeIterator.QuarantineWriterDelta.quarantine(null)
127+
}
128+
intercept[NullPointerException] {
129+
PipeIterator.QuarantineWriterCsv.quarantine(null)
130+
}
131+
}
132+
133+
val sess = spark
134+
import sess.implicits._
135+
val inputDf = Seq("monkey", "dolphin").toDF
136+
val testTable = s"default.test_test_test"
137+
val options = Map(
138+
"inputFormatter" -> "text",
139+
"outputFormatter" -> "text",
140+
"quarantineTable" -> testTable,
141+
"quarantineFlavor" -> "csv",
142+
"cmd" -> Seq("python", "identity.py", "2"))
143+
val exc = intercept[org.apache.spark.SparkException] {
144+
Glow.transform("pipe", inputDf, options).as[String].head
145+
}
146+
assert(exc.getMessage.contains("Subprocess exited with status"))
147+
Glow.transform("pipe_cleanup", inputDf, Map.empty[String, String])
148+
149+
// delete temp files
150+
// Yes, there is a shiny new and fancy nio way of doing temp files but it
151+
// blows super hard; this "old school" way is far simpler and more readable
152+
import scala.reflect.io.Directory
153+
import java.io.File
154+
val directory = new Directory(new File(testTable))
155+
directory.deleteRecursively()
156+
}
117157
}
118158

119159
class DummyInputFormatterFactory() extends InputFormatterFactory {

identity.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import sys
2+
3+
a = sys.argv[1]
4+
5+
sys.exit(a)
6+

0 commit comments

Comments
 (0)