From f52860fe4a1f80a018e8d8c28529b644d9710768 Mon Sep 17 00:00:00 2001 From: Vatsal Mevada Date: Tue, 24 Mar 2020 15:23:55 +0530 Subject: [PATCH 1/6] Enabling fail fast type casting behavior for string to numeric type conversions based on configuration property `snappydata.failFastTypeCasting`. --- .../spark/sql/catalyst/expressions/Cast.scala | 90 +++++++++---------- .../spark/sql/execution/QueryExecution.scala | 9 +- 2 files changed, 48 insertions(+), 51 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index f15ae3255ca98..d34c7192ae082 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import java.math.{BigDecimal => JavaBigDecimal} -import org.apache.spark.SparkException +import org.apache.spark.{SparkContext, SparkException, TaskContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ @@ -248,7 +248,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToLong(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toLong catch { - case _: NumberFormatException => null + case _: NumberFormatException if !failFastTypeCastingEnabled => null }) case BooleanType => buildCast[Boolean](_, b => if (b) 1L else 0L) @@ -264,7 +264,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToInt(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toInt catch { - case _: NumberFormatException => null + case _: NumberFormatException if !failFastTypeCastingEnabled => null }) case BooleanType => buildCast[Boolean](_, b => if (b) 1 else 0) @@ -280,7 +280,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToShort(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toShort catch { - case _: NumberFormatException => null + case _: NumberFormatException if !failFastTypeCastingEnabled => null }) case BooleanType => buildCast[Boolean](_, b => if (b) 1.toShort else 0.toShort) @@ -296,7 +296,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToByte(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toByte catch { - case _: NumberFormatException => null + case _: NumberFormatException if !failFastTypeCastingEnabled => null }) case BooleanType => buildCast[Boolean](_, b => if (b) 1.toByte else 0.toByte) @@ -323,7 +323,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w buildCast[UTF8String](_, s => try { changePrecision(Decimal(new JavaBigDecimal(s.toString)), target) } catch { - case _: NumberFormatException => null + case _: NumberFormatException if !failFastTypeCastingEnabled => null }) case BooleanType => buildCast[Boolean](_, b => changePrecision(if (b) Decimal.ONE else Decimal.ZERO, target)) @@ -348,7 +348,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToDouble(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toString.toDouble catch { - case _: NumberFormatException => null + case _: NumberFormatException if !failFastTypeCastingEnabled => null }) case BooleanType => buildCast[Boolean](_, b => if (b) 1d else 0d) @@ -364,7 +364,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToFloat(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toString.toFloat catch { - case _: NumberFormatException => null + case _: NumberFormatException if !failFastTypeCastingEnabled => null }) case BooleanType => buildCast[Boolean](_, b => if (b) 1f else 0f) @@ -703,13 +703,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToByteCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - s""" - try { - $evPrim = $c.toByte(); - } catch (java.lang.NumberFormatException e) { - $evNull = true; - } - """ + castStringToNumberCode(s"$evPrim = $c.toByte();", evNull) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? (byte) 1 : (byte) 0;" case DateType => @@ -725,13 +719,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToShortCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - s""" - try { - $evPrim = $c.toShort(); - } catch (java.lang.NumberFormatException e) { - $evNull = true; - } - """ + castStringToNumberCode(s"$evPrim = $c.toShort();", evNull) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? (short) 1 : (short) 0;" case DateType => @@ -747,13 +735,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToIntCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - s""" - try { - $evPrim = $c.toInt(); - } catch (java.lang.NumberFormatException e) { - $evNull = true; - } - """ + castStringToNumberCode(s"$evPrim = $c.toInt();", evNull) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? 1 : 0;" case DateType => @@ -769,13 +751,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToLongCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - s""" - try { - $evPrim = $c.toLong(); - } catch (java.lang.NumberFormatException e) { - $evNull = true; - } - """ + castStringToNumberCode(s" $evPrim = $c.toLong();", evNull) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? 1L : 0L;" case DateType => @@ -791,13 +767,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToFloatCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - s""" - try { - $evPrim = Float.valueOf($c.toString()); - } catch (java.lang.NumberFormatException e) { - $evNull = true; - } - """ + castStringToNumberCode(s"$evPrim = Float.valueOf($c.toString());", evNull) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? 1.0f : 0.0f;" case DateType => @@ -809,17 +779,23 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case x: NumericType => (c, evPrim, evNull) => s"$evPrim = (float) $c;" } + + private def failFastTypeCastingEnabled = { + val failFastCasting : String = if (TaskContext.get() != null) { + TaskContext.get().getLocalProperty("snappydata.failFastTypeCasting") + } else { + SparkContext.activeContext.get().getLocalProperty("snappydata.failFastTypeCasting") + } + Option(failFastCasting) match { + case Some(value) => value.toBoolean + case None => false + } + } private[this] def castToDoubleCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - s""" - try { - $evPrim = Double.valueOf($c.toString()); - } catch (java.lang.NumberFormatException e) { - $evNull = true; - } - """ + castStringToNumberCode(s"$evPrim = Double.valueOf($c.toString());", evNull) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? 1.0d : 0.0d;" case DateType => @@ -832,6 +808,20 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w (c, evPrim, evNull) => s"$evPrim = (double) $c;" } + private[this] def castStringToNumberCode(code: String, evNull: String): String = { + if (failFastTypeCastingEnabled) { + code + } else { + s""" + try { + $code + } catch (java.lang.NumberFormatException e) { + $evNull = true; + } + """ + } + } + private[this] def castArrayCode( fromType: DataType, toType: DataType, ctx: CodegenContext): CastFunction = { val elementCast = nullSafeCastFunction(fromType, toType, ctx) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 9b53d21deed97..d885590c39b18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -89,7 +89,14 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) /** Internal version of the RDD. Avoids copies and has no schema */ - lazy val toRdd: RDD[InternalRow] = executedPlan.execute() + lazy val toRdd: RDD[InternalRow] = { + // setting snappydata.failFastTypeCasting local property every time before + // executing the query to make the change to the property effective + sparkSession.sparkContext.setLocalProperty("snappydata.failFastTypeCasting", + sparkSession.sessionState.conf.getConfString("snappydata.failFastTypeCasting")) + + executedPlan.execute() + } /** * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal From f706cda35c8ce9bcb2602c2f943f7b67041f0c5d Mon Sep 17 00:00:00 2001 From: Vatsal Mevada Date: Thu, 26 Mar 2020 11:41:22 +0530 Subject: [PATCH 2/6] Enforcing fail fast behavior for the following type conversions: - string to date - string to decimal - NaN, Infinite value of fractional numbers to timestamp - string to boolean - decimal precision overflow - date to decimal Enhancing string to numeric type casting failure to include the data value for which casting failed. Also unifying the exception of string to numeric conversion failure to match the exception of other casting failures. --- .../spark/sql/catalyst/expressions/Cast.scala | 195 ++++++++++++++---- 1 file changed, 151 insertions(+), 44 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index d34c7192ae082..80b9ab2d64376 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -122,6 +122,9 @@ object Cast { """) case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with NullIntolerant { + private val fractionalToTimestampCastingErrorMessage = "Can not cast NaN or infinite" + + s" fractional value to ${TimestampType.simpleString}." + override def toString: String = s"cast($child as ${dataType.simpleString})" override def checkInputDataTypes(): TypeCheckResult = { @@ -161,7 +164,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w } else if (StringUtils.isFalseString(s)) { false } else { - null + if (failFastTypeCastingEnabled) { + throw new RuntimeException(s"Can not cast '$s' to ${BooleanType.simpleString}.") + } else { + null + } } }) case TimestampType => @@ -215,8 +222,16 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def decimalToTimestamp(d: Decimal): Long = { (d.toBigDecimal * 1000000L).longValue() } + private[this] def doubleToTimestamp(d: Double): Any = { - if (d.isNaN || d.isInfinite) null else (d * 1000000L).toLong + if (d.isNaN || d.isInfinite) { + if (failFastTypeCastingEnabled) { + throw new RuntimeException(fractionalToTimestampCastingErrorMessage) + } else { + null + } + } + else (d * 1000000L).toLong } // converting seconds to us @@ -231,7 +246,13 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w // DateConverter private[this] def castToDate(from: DataType): Any => Any = from match { case StringType => - buildCast[UTF8String](_, s => DateTimeUtils.stringToDate(s).orNull) + buildCast[UTF8String](_, s => DateTimeUtils.stringToDate(s).getOrElse(() => { + if (failFastTypeCastingEnabled) { + throw new RuntimeException(s"Can not cast '$s' to ${DateType.simpleString}.") + } else { + null + } + })) case TimestampType => // throw valid precision more than seconds, according to Hive. // Timestamp.nanos is in 0 to 999,999,999, no more than a second. @@ -248,7 +269,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToLong(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toLong catch { - case _: NumberFormatException if !failFastTypeCastingEnabled => null + case _: NumberFormatException => if (failFastTypeCastingEnabled) { + throw new RuntimeException(s"Can not cast '$s' to ${LongType.simpleString}.") + } else { + null + } }) case BooleanType => buildCast[Boolean](_, b => if (b) 1L else 0L) @@ -264,7 +289,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToInt(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toInt catch { - case _: NumberFormatException if !failFastTypeCastingEnabled => null + case _: NumberFormatException => if (failFastTypeCastingEnabled) { + throw new RuntimeException(s"Can not cast '$s' to ${IntegerType.simpleString}.") + } else { + null + } }) case BooleanType => buildCast[Boolean](_, b => if (b) 1 else 0) @@ -280,7 +309,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToShort(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toShort catch { - case _: NumberFormatException if !failFastTypeCastingEnabled => null + case _: NumberFormatException => if (failFastTypeCastingEnabled) { + throw new RuntimeException(s"Can not cast '$s' to ${ShortType.simpleString}.") + } else { + null + } }) case BooleanType => buildCast[Boolean](_, b => if (b) 1.toShort else 0.toShort) @@ -296,7 +329,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToByte(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toByte catch { - case _: NumberFormatException if !failFastTypeCastingEnabled => null + case _: NumberFormatException => if (failFastTypeCastingEnabled) { + throw new RuntimeException(s"Can not cast '$s' to ${ByteType.simpleString}.") + } else { + null + } }) case BooleanType => buildCast[Boolean](_, b => if (b) 1.toByte else 0.toByte) @@ -315,7 +352,18 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w * NOTE: this modifies `value` in-place, so don't call it on external data. */ private[this] def changePrecision(value: Decimal, decimalType: DecimalType): Decimal = { - if (value.changePrecision(decimalType.precision, decimalType.scale)) value else null + if (value.changePrecision(decimalType.precision, decimalType.scale)) { + value + } else { + if (failFastTypeCastingEnabled) { + throw new RuntimeException( + s"Casting decimal with precision: ${value.precision}" + + s" and scale: ${value.scale} to ${decimalType.simpleString} will lead to loss of" + + " precision.") + } else { + null + } + } } private[this] def castToDecimal(from: DataType, target: DecimalType): Any => Any = from match { @@ -323,7 +371,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w buildCast[UTF8String](_, s => try { changePrecision(Decimal(new JavaBigDecimal(s.toString)), target) } catch { - case _: NumberFormatException if !failFastTypeCastingEnabled => null + case _: NumberFormatException => if (failFastTypeCastingEnabled) { + throw new RuntimeException(s"Can not cast '$s' to ${target.simpleString}.") + } else { + null + } }) case BooleanType => buildCast[Boolean](_, b => changePrecision(if (b) Decimal.ONE else Decimal.ZERO, target)) @@ -348,7 +400,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToDouble(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toString.toDouble catch { - case _: NumberFormatException if !failFastTypeCastingEnabled => null + case _: NumberFormatException => if (failFastTypeCastingEnabled) { + throw new RuntimeException(s"Can not cast '$s' to ${DoubleType.simpleString}.") + } else { + null + } }) case BooleanType => buildCast[Boolean](_, b => if (b) 1d else 0d) @@ -364,7 +420,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToFloat(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toString.toFloat catch { - case _: NumberFormatException if !failFastTypeCastingEnabled => null + case _: NumberFormatException => if (failFastTypeCastingEnabled) { + throw new RuntimeException(s"Can not cast '$s' to ${FloatType.simpleString}.") + } else { + null + } }) case BooleanType => buildCast[Boolean](_, b => if (b) 1f else 0f) @@ -535,14 +595,27 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w if ($intOpt.isDefined()) { $evPrim = ((Integer) $intOpt.get()).intValue(); } else { - $evNull = true; + ${ + if (failFastTypeCastingEnabled) { + s"""throw new java.lang.RuntimeException("Can not cast '" + $c + "'""" + + s""" to ${DateType.simpleString}.");""" + } else { + s"$evNull = true;" + } + } } """ case TimestampType => (c, evPrim, evNull) => s"$evPrim = org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToDays($c / 1000L);"; - case _ => - (c, evPrim, evNull) => s"$evNull = true;" + case t => + (c, evPrim, evNull) => + if (failFastTypeCastingEnabled) { + s"""throw new java.lang.RuntimeException("Can not cast ${t.simpleString} value""" + + s""" to ${DateType.simpleString}.");""" + } else { + s"$evNull = true;" + } } private[this] def changePrecision(d: String, decimalType: DecimalType, @@ -551,7 +624,15 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w if ($d.changePrecision(${decimalType.precision}, ${decimalType.scale})) { $evPrim = $d; } else { - $evNull = true; + ${ + if (failFastTypeCastingEnabled) { + s"""throw new java.lang.RuntimeException("Casting decimal with precision:""" + + s""" " + $d.precision() + " and scale: " + $d.scale() + " to""" + + s""" ${decimalType.simpleString} will lead to loss of precision.");""" + } else { + s"$evNull = true;" + } + } } """ @@ -563,14 +644,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w from match { case StringType => (c, evPrim, evNull) => - s""" - try { - Decimal $tmp = Decimal.apply(new java.math.BigDecimal($c.toString())); - ${changePrecision(tmp, target, evPrim, evNull)} - } catch (java.lang.NumberFormatException e) { - $evNull = true; - } - """ + castStringToNumberCode( + s"""Decimal $tmp = Decimal.apply(new java.math.BigDecimal($c.toString())); + ${changePrecision(tmp, target, evPrim, evNull)}""", evNull, c, target) case BooleanType => (c, evPrim, evNull) => s""" @@ -579,7 +655,13 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w """ case DateType => // date can't cast to decimal in Hive - (c, evPrim, evNull) => s"$evNull = true;" + (c, evPrim, evNull) => + if (failFastTypeCastingEnabled) { + s"""throw new java.lang.RuntimeException("Can not cast ${DateType.simpleString} to""" + + s""" ${target.simpleString}.");""" + } else { + s"$evNull = true;" + } case TimestampType => // Note that we lose precision here. (c, evPrim, evNull) => @@ -642,7 +724,14 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w (c, evPrim, evNull) => s""" if (Double.isNaN($c) || Double.isInfinite($c)) { - $evNull = true; + ${ + if (failFastTypeCastingEnabled) { + "throw new java.lang.RuntimeException(" + + s""""$fractionalToTimestampCastingErrorMessage");""" + } else { + s"$evNull = true; " + } + } } else { $evPrim = (long)($c * 1000000L); } @@ -651,7 +740,14 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w (c, evPrim, evNull) => s""" if (Float.isNaN($c) || Float.isInfinite($c)) { - $evNull = true; + ${ + if (failFastTypeCastingEnabled) { + "throw new java.lang.RuntimeException(" + + s""""$fractionalToTimestampCastingErrorMessage");""" + } else { + s"$evNull = true;" + } + } } else { $evPrim = (long)($c * 1000000L); } @@ -686,7 +782,14 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w } else if ($stringUtils.isFalseString($c)) { $evPrim = false; } else { - $evNull = true; + ${ + if (failFastTypeCastingEnabled) { + s"""throw new java.lang.RuntimeException("Can not cast '"+ $c +"'""" + + s""" to ${BooleanType.simpleString}.");""" + } else { + s"$evNull = true;" + } + } } """ case TimestampType => @@ -703,7 +806,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToByteCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - castStringToNumberCode(s"$evPrim = $c.toByte();", evNull) + castStringToNumberCode(s"$evPrim = $c.toByte();", evNull, c, ByteType) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? (byte) 1 : (byte) 0;" case DateType => @@ -719,7 +822,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToShortCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - castStringToNumberCode(s"$evPrim = $c.toShort();", evNull) + castStringToNumberCode(s"$evPrim = $c.toShort();", evNull, c, ShortType) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? (short) 1 : (short) 0;" case DateType => @@ -735,7 +838,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToIntCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - castStringToNumberCode(s"$evPrim = $c.toInt();", evNull) + castStringToNumberCode(s"$evPrim = $c.toInt();", evNull, c, IntegerType) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? 1 : 0;" case DateType => @@ -751,7 +854,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToLongCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - castStringToNumberCode(s" $evPrim = $c.toLong();", evNull) + castStringToNumberCode(s" $evPrim = $c.toLong();", evNull, c, LongType) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? 1L : 0L;" case DateType => @@ -767,7 +870,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToFloatCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - castStringToNumberCode(s"$evPrim = Float.valueOf($c.toString());", evNull) + castStringToNumberCode(s"$evPrim = Float.valueOf($c.toString());", evNull, c, FloatType) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? 1.0f : 0.0f;" case DateType => @@ -795,7 +898,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToDoubleCode(from: DataType): CastFunction = from match { case StringType => (c, evPrim, evNull) => - castStringToNumberCode(s"$evPrim = Double.valueOf($c.toString());", evNull) + castStringToNumberCode(s"$evPrim = Double.valueOf($c.toString());", evNull, c, DoubleType) case BooleanType => (c, evPrim, evNull) => s"$evPrim = $c ? 1.0d : 0.0d;" case DateType => @@ -808,18 +911,22 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w (c, evPrim, evNull) => s"$evPrim = (double) $c;" } - private[this] def castStringToNumberCode(code: String, evNull: String): String = { - if (failFastTypeCastingEnabled) { - code - } else { - s""" - try { - $code - } catch (java.lang.NumberFormatException e) { - $evNull = true; + private[this] def castStringToNumberCode(code: String, evNull: String, c: String, + dataType: DataType): String = { + s""" + try { + $code + } catch (java.lang.NumberFormatException e) { + ${ + if (failFastTypeCastingEnabled) { + s"""throw new java.lang.RuntimeException("Can not cast '" + $c + "'""" + + s""" to ${dataType.simpleString}.");""" + } else { + s"$evNull = true;" } - """ - } + } + } + """ } private[this] def castArrayCode( From 6faf2e29d2a1a5fae5006f05d3aa81b791b0d629 Mon Sep 17 00:00:00 2001 From: Vatsal Mevada Date: Fri, 27 Mar 2020 13:00:53 +0530 Subject: [PATCH 3/6] - Using specific exception (namely TypeCastException) for cast failures instead of using generic RuntimeException - Adding tests for more scenarios --- .../spark/sql/catalyst/expressions/Cast.scala | 110 ++++++++++++------ 1 file changed, 76 insertions(+), 34 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 80b9ab2d64376..25f76eea31a09 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.{DataTypes, _} import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -226,7 +226,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def doubleToTimestamp(d: Double): Any = { if (d.isNaN || d.isInfinite) { if (failFastTypeCastingEnabled) { - throw new RuntimeException(fractionalToTimestampCastingErrorMessage) + throw new TypeCastException(DoubleType, TimestampType, d) } else { null } @@ -248,7 +248,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case StringType => buildCast[UTF8String](_, s => DateTimeUtils.stringToDate(s).getOrElse(() => { if (failFastTypeCastingEnabled) { - throw new RuntimeException(s"Can not cast '$s' to ${DateType.simpleString}.") + throw new TypeCastException(StringType, DateType, s) } else { null } @@ -270,7 +270,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case StringType => buildCast[UTF8String](_, s => try s.toLong catch { case _: NumberFormatException => if (failFastTypeCastingEnabled) { - throw new RuntimeException(s"Can not cast '$s' to ${LongType.simpleString}.") + throw new TypeCastException(StringType, LongType, s) } else { null } @@ -290,7 +290,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case StringType => buildCast[UTF8String](_, s => try s.toInt catch { case _: NumberFormatException => if (failFastTypeCastingEnabled) { - throw new RuntimeException(s"Can not cast '$s' to ${IntegerType.simpleString}.") + throw new TypeCastException(StringType, IntegerType, s) } else { null } @@ -310,7 +310,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case StringType => buildCast[UTF8String](_, s => try s.toShort catch { case _: NumberFormatException => if (failFastTypeCastingEnabled) { - throw new RuntimeException(s"Can not cast '$s' to ${ShortType.simpleString}.") + throw new TypeCastException(StringType, ShortType, s) } else { null } @@ -330,7 +330,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case StringType => buildCast[UTF8String](_, s => try s.toByte catch { case _: NumberFormatException => if (failFastTypeCastingEnabled) { - throw new RuntimeException(s"Can not cast '$s' to ${ByteType.simpleString}.") + throw new TypeCastException(StringType, ByteType, s) } else { null } @@ -356,10 +356,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w value } else { if (failFastTypeCastingEnabled) { - throw new RuntimeException( - s"Casting decimal with precision: ${value.precision}" + - s" and scale: ${value.scale} to ${decimalType.simpleString} will lead to loss of" + - " precision.") + throw new TypeCastException(DecimalType(value.precision, value.scale), decimalType, value) } else { null } @@ -372,7 +369,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w changePrecision(Decimal(new JavaBigDecimal(s.toString)), target) } catch { case _: NumberFormatException => if (failFastTypeCastingEnabled) { - throw new RuntimeException(s"Can not cast '$s' to ${target.simpleString}.") + throw new TypeCastException(StringType, target, s) } else { null } @@ -392,7 +389,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w b => try { changePrecision(Decimal(x.fractional.asInstanceOf[Fractional[Any]].toDouble(b)), target) } catch { - case _: NumberFormatException => null + case _: NumberFormatException => if (failFastTypeCastingEnabled) { + throw new TypeCastException(StringType, target, b) + } else { + null + } } } @@ -401,7 +402,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case StringType => buildCast[UTF8String](_, s => try s.toString.toDouble catch { case _: NumberFormatException => if (failFastTypeCastingEnabled) { - throw new RuntimeException(s"Can not cast '$s' to ${DoubleType.simpleString}.") + throw new TypeCastException(StringType, DoubleType, s) } else { null } @@ -421,7 +422,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case StringType => buildCast[UTF8String](_, s => try s.toString.toFloat catch { case _: NumberFormatException => if (failFastTypeCastingEnabled) { - throw new RuntimeException(s"Can not cast '$s' to ${FloatType.simpleString}.") + throw new TypeCastException(StringType, FloatType, s) } else { null } @@ -597,8 +598,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w } else { ${ if (failFastTypeCastingEnabled) { - s"""throw new java.lang.RuntimeException("Can not cast '" + $c + "'""" + - s""" to ${DateType.simpleString}.");""" + s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + + s"${javaDataTypeName(StringType)}, ${javaDataTypeName(DateType)}," + + s" $c);" } else { s"$evNull = true;" } @@ -611,8 +613,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case t => (c, evPrim, evNull) => if (failFastTypeCastingEnabled) { - s"""throw new java.lang.RuntimeException("Can not cast ${t.simpleString} value""" + - s""" to ${DateType.simpleString}.");""" + s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + + s"${javaDataTypeName(t)}, ${javaDataTypeName(DateType)}," + + s" $c);" } else { s"$evNull = true;" } @@ -626,9 +629,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w } else { ${ if (failFastTypeCastingEnabled) { - s"""throw new java.lang.RuntimeException("Casting decimal with precision:""" + - s""" " + $d.precision() + " and scale: " + $d.scale() + " to""" + - s""" ${decimalType.simpleString} will lead to loss of precision.");""" + s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + + s"$javaDataTypesClassName.createDecimalType($d.precision(), $d.scale())," + + s" ${javaDataTypeName(decimalType)}, $d);" } else { s"$evNull = true;" } @@ -657,8 +660,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w // date can't cast to decimal in Hive (c, evPrim, evNull) => if (failFastTypeCastingEnabled) { - s"""throw new java.lang.RuntimeException("Can not cast ${DateType.simpleString} to""" + - s""" ${target.simpleString}.");""" + s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + + s"${javaDataTypeName(DateType)}, ${javaDataTypeName(target)}," + + s" $c);" } else { s"$evNull = true;" } @@ -690,7 +694,15 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w Decimal $tmp = Decimal.apply(scala.math.BigDecimal.valueOf((double) $c)); ${changePrecision(tmp, target, evPrim, evNull)} } catch (java.lang.NumberFormatException e) { - $evNull = true; + ${ + if (failFastTypeCastingEnabled) { + s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + + s"${javaDataTypeName(x)}, ${javaDataTypeName(target)}," + + s" $c);" + } else { + s"$evNull = true;" + } + } } """ } @@ -708,7 +720,15 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w if ($longOpt.isDefined()) { $evPrim = ((Long) $longOpt.get()).longValue(); } else { - $evNull = true; + ${ + if (failFastTypeCastingEnabled) { + s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + + s"${javaDataTypeName(StringType)}, ${javaDataTypeName(TimestampType)}," + + s" $c);" + } else { + s"$evNull = true;" + } + } } """ case BooleanType => @@ -726,8 +746,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w if (Double.isNaN($c) || Double.isInfinite($c)) { ${ if (failFastTypeCastingEnabled) { - "throw new java.lang.RuntimeException(" + - s""""$fractionalToTimestampCastingErrorMessage");""" + s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + + s"${javaDataTypeName(DoubleType)}, ${javaDataTypeName(TimestampType)}," + + s" $c);" } else { s"$evNull = true; " } @@ -742,8 +763,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w if (Float.isNaN($c) || Float.isInfinite($c)) { ${ if (failFastTypeCastingEnabled) { - "throw new java.lang.RuntimeException(" + - s""""$fractionalToTimestampCastingErrorMessage");""" + s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + + s"${javaDataTypeName(FloatType)}, ${javaDataTypeName(TimestampType)}," + + s" $c);" } else { s"$evNull = true;" } @@ -784,8 +806,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w } else { ${ if (failFastTypeCastingEnabled) { - s"""throw new java.lang.RuntimeException("Can not cast '"+ $c +"'""" + - s""" to ${BooleanType.simpleString}.");""" + s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + + s"${javaDataTypeName(StringType)}, ${javaDataTypeName(BooleanType)}," + + s" $c);" } else { s"$evNull = true;" } @@ -911,6 +934,17 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w (c, evPrim, evNull) => s"$evPrim = (double) $c;" } + private val javaDataTypesClassName = classOf[DataTypes].getCanonicalName + + private def javaDataTypeName(dataType: DataType) : String = { + dataType match { + case decimalType: DecimalType => + s"$javaDataTypesClassName.createDecimalType(${decimalType.precision}," + + s" ${decimalType.scale})" + case _ => s"$javaDataTypesClassName.$dataType" + } + } + private[this] def castStringToNumberCode(code: String, evNull: String, c: String, dataType: DataType): String = { s""" @@ -919,8 +953,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w } catch (java.lang.NumberFormatException e) { ${ if (failFastTypeCastingEnabled) { - s"""throw new java.lang.RuntimeException("Can not cast '" + $c + "'""" + - s""" to ${dataType.simpleString}.");""" + s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + + s"${javaDataTypeName(StringType)}, ${javaDataTypeName(dataType)}," + + s" $c);" } else { s"$evNull = true;" } @@ -1052,3 +1087,10 @@ case class UpCast(child: Expression, dataType: DataType, walkedTypePath: Seq[Str extends UnaryExpression with Unevaluable { override lazy val resolved = false } + +class TypeCastException(sourceType: DataType, targetType: DataType, value: Any) + extends RuntimeException { + override def getMessage: String = { + s"Can not cast ${sourceType.simpleString} type value '$value' to ${targetType.simpleString}." + } +} \ No newline at end of file From bfae852f56aa40348ebac325eaed301f4e4769b2 Mon Sep 17 00:00:00 2001 From: Vatsal Mevada Date: Fri, 27 Mar 2020 13:46:09 +0530 Subject: [PATCH 4/6] Reverting some changes done for casting incompatible type to date as the code flow is not reaching there due to analysis check performed by `checkInputDataTypes` method. --- .../apache/spark/sql/catalyst/expressions/Cast.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 25f76eea31a09..728628723a7b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -610,15 +610,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case TimestampType => (c, evPrim, evNull) => s"$evPrim = org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToDays($c / 1000L);"; - case t => - (c, evPrim, evNull) => - if (failFastTypeCastingEnabled) { - s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + - s"${javaDataTypeName(t)}, ${javaDataTypeName(DateType)}," + - s" $c);" - } else { - s"$evNull = true;" - } + case _ => + (c, evPrim, evNull) => s"$evNull = true;" } private[this] def changePrecision(d: String, decimalType: DecimalType, From dbedcd051d73533cfa1284e3feb18070144a2c5b Mon Sep 17 00:00:00 2001 From: Vatsal Mevada Date: Fri, 27 Mar 2020 16:04:57 +0530 Subject: [PATCH 5/6] incorporating review comments --- .../spark/sql/catalyst/expressions/Cast.scala | 52 +++++++++---------- .../spark/sql/execution/QueryExecution.scala | 6 +-- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 728628723a7b2..826d225737f33 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -164,7 +164,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w } else if (StringUtils.isFalseString(s)) { false } else { - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { throw new RuntimeException(s"Can not cast '$s' to ${BooleanType.simpleString}.") } else { null @@ -225,7 +225,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def doubleToTimestamp(d: Double): Any = { if (d.isNaN || d.isInfinite) { - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { throw new TypeCastException(DoubleType, TimestampType, d) } else { null @@ -247,7 +247,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToDate(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => DateTimeUtils.stringToDate(s).getOrElse(() => { - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { throw new TypeCastException(StringType, DateType, s) } else { null @@ -269,7 +269,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToLong(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toLong catch { - case _: NumberFormatException => if (failFastTypeCastingEnabled) { + case _: NumberFormatException => if (failOnCastErrorEnabled) { throw new TypeCastException(StringType, LongType, s) } else { null @@ -289,7 +289,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToInt(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toInt catch { - case _: NumberFormatException => if (failFastTypeCastingEnabled) { + case _: NumberFormatException => if (failOnCastErrorEnabled) { throw new TypeCastException(StringType, IntegerType, s) } else { null @@ -309,7 +309,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToShort(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toShort catch { - case _: NumberFormatException => if (failFastTypeCastingEnabled) { + case _: NumberFormatException => if (failOnCastErrorEnabled) { throw new TypeCastException(StringType, ShortType, s) } else { null @@ -329,7 +329,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToByte(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toByte catch { - case _: NumberFormatException => if (failFastTypeCastingEnabled) { + case _: NumberFormatException => if (failOnCastErrorEnabled) { throw new TypeCastException(StringType, ByteType, s) } else { null @@ -355,7 +355,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w if (value.changePrecision(decimalType.precision, decimalType.scale)) { value } else { - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { throw new TypeCastException(DecimalType(value.precision, value.scale), decimalType, value) } else { null @@ -368,7 +368,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w buildCast[UTF8String](_, s => try { changePrecision(Decimal(new JavaBigDecimal(s.toString)), target) } catch { - case _: NumberFormatException => if (failFastTypeCastingEnabled) { + case _: NumberFormatException => if (failOnCastErrorEnabled) { throw new TypeCastException(StringType, target, s) } else { null @@ -389,7 +389,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w b => try { changePrecision(Decimal(x.fractional.asInstanceOf[Fractional[Any]].toDouble(b)), target) } catch { - case _: NumberFormatException => if (failFastTypeCastingEnabled) { + case _: NumberFormatException => if (failOnCastErrorEnabled) { throw new TypeCastException(StringType, target, b) } else { null @@ -401,7 +401,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToDouble(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toString.toDouble catch { - case _: NumberFormatException => if (failFastTypeCastingEnabled) { + case _: NumberFormatException => if (failOnCastErrorEnabled) { throw new TypeCastException(StringType, DoubleType, s) } else { null @@ -421,7 +421,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToFloat(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => try s.toString.toFloat catch { - case _: NumberFormatException => if (failFastTypeCastingEnabled) { + case _: NumberFormatException => if (failOnCastErrorEnabled) { throw new TypeCastException(StringType, FloatType, s) } else { null @@ -597,7 +597,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w $evPrim = ((Integer) $intOpt.get()).intValue(); } else { ${ - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + s"${javaDataTypeName(StringType)}, ${javaDataTypeName(DateType)}," + s" $c);" @@ -621,7 +621,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w $evPrim = $d; } else { ${ - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + s"$javaDataTypesClassName.createDecimalType($d.precision(), $d.scale())," + s" ${javaDataTypeName(decimalType)}, $d);" @@ -652,7 +652,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case DateType => // date can't cast to decimal in Hive (c, evPrim, evNull) => - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + s"${javaDataTypeName(DateType)}, ${javaDataTypeName(target)}," + s" $c);" @@ -688,7 +688,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w ${changePrecision(tmp, target, evPrim, evNull)} } catch (java.lang.NumberFormatException e) { ${ - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + s"${javaDataTypeName(x)}, ${javaDataTypeName(target)}," + s" $c);" @@ -714,7 +714,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w $evPrim = ((Long) $longOpt.get()).longValue(); } else { ${ - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + s"${javaDataTypeName(StringType)}, ${javaDataTypeName(TimestampType)}," + s" $c);" @@ -738,7 +738,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w s""" if (Double.isNaN($c) || Double.isInfinite($c)) { ${ - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + s"${javaDataTypeName(DoubleType)}, ${javaDataTypeName(TimestampType)}," + s" $c);" @@ -755,7 +755,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w s""" if (Float.isNaN($c) || Float.isInfinite($c)) { ${ - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + s"${javaDataTypeName(FloatType)}, ${javaDataTypeName(TimestampType)}," + s" $c);" @@ -798,7 +798,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w $evPrim = false; } else { ${ - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + s"${javaDataTypeName(StringType)}, ${javaDataTypeName(BooleanType)}," + s" $c);" @@ -899,13 +899,13 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w (c, evPrim, evNull) => s"$evPrim = (float) $c;" } - private def failFastTypeCastingEnabled = { - val failFastCasting : String = if (TaskContext.get() != null) { - TaskContext.get().getLocalProperty("snappydata.failFastTypeCasting") + private def failOnCastErrorEnabled = { + val failOnCastError : String = if (TaskContext.get() != null) { + TaskContext.get().getLocalProperty("snappydata.failOnCastError") } else { - SparkContext.activeContext.get().getLocalProperty("snappydata.failFastTypeCasting") + SparkContext.activeContext.get().getLocalProperty("snappydata.failOnCastError") } - Option(failFastCasting) match { + Option(failOnCastError) match { case Some(value) => value.toBoolean case None => false } @@ -945,7 +945,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w $code } catch (java.lang.NumberFormatException e) { ${ - if (failFastTypeCastingEnabled) { + if (failOnCastErrorEnabled) { s"throw new org.apache.spark.sql.catalyst.expressions.TypeCastException(" + s"${javaDataTypeName(StringType)}, ${javaDataTypeName(dataType)}," + s" $c);" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index d885590c39b18..46d8a9a529974 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -90,10 +90,10 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { /** Internal version of the RDD. Avoids copies and has no schema */ lazy val toRdd: RDD[InternalRow] = { - // setting snappydata.failFastTypeCasting local property every time before + // setting snappydata.failOnCastError local property every time before // executing the query to make the change to the property effective - sparkSession.sparkContext.setLocalProperty("snappydata.failFastTypeCasting", - sparkSession.sessionState.conf.getConfString("snappydata.failFastTypeCasting")) + sparkSession.sparkContext.setLocalProperty("snappydata.failOnCastError", + sparkSession.sessionState.conf.getConfString("snappydata.failOnCastError")) executedPlan.execute() } From 5bef40282a090042b726efe4e13a757affc99af8 Mon Sep 17 00:00:00 2001 From: Vatsal Mevada Date: Mon, 30 Mar 2020 13:45:43 +0530 Subject: [PATCH 6/6] Fixing spark test failures --- .../spark/sql/catalyst/expressions/Cast.scala | 10 ++++++---- .../spark/sql/execution/QueryExecution.scala | 14 ++++++++++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 826d225737f33..cb3aafa7da04c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -898,12 +898,14 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case x: NumericType => (c, evPrim, evNull) => s"$evPrim = (float) $c;" } - - private def failOnCastErrorEnabled = { - val failOnCastError : String = if (TaskContext.get() != null) { + + private def failOnCastErrorEnabled: Boolean = { + val failOnCastError: String = if (TaskContext.get() != null) { TaskContext.get().getLocalProperty("snappydata.failOnCastError") - } else { + } else if (SparkContext.activeContext.get() != null){ SparkContext.activeContext.get().getLocalProperty("snappydata.failOnCastError") + } else { + "false" } Option(failOnCastError) match { case Some(value) => value.toBoolean diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 46d8a9a529974..dd2fafd7d2077 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import java.nio.charset.StandardCharsets import java.sql.Timestamp +import java.util.NoSuchElementException import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} @@ -92,8 +93,17 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { lazy val toRdd: RDD[InternalRow] = { // setting snappydata.failOnCastError local property every time before // executing the query to make the change to the property effective - sparkSession.sparkContext.setLocalProperty("snappydata.failOnCastError", - sparkSession.sessionState.conf.getConfString("snappydata.failOnCastError")) + + try { + sparkSession.sparkContext.setLocalProperty("snappydata.failOnCastError", + sparkSession.sessionState.conf.getConfString("snappydata.failOnCastError")) + } catch { + case ex: NoSuchElementException + if (ex.getMessage.equalsIgnoreCase("snappydata.failOnCastError")) => + // Only SnappySession config will have "snappydata.failOnCastError" set. + // While using spark session this config won't be there hence ignoring this + // failure. + } executedPlan.execute() }