From 7bf5469b967e588c4a66b248cbd1dc3894f71b82 Mon Sep 17 00:00:00 2001 From: donderom Date: Fri, 1 Dec 2017 12:42:06 +0200 Subject: [PATCH] SPARKC-516: Add column name to the error message in case of conversion error --- .../spark/connector/writer/SqlRowWriter.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/SqlRowWriter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/SqlRowWriter.scala index af7fb453d..a2378881a 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/SqlRowWriter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/SqlRowWriter.scala @@ -5,8 +5,11 @@ import java.util.UUID import com.datastax.spark.connector.ColumnRef import com.datastax.spark.connector.cql.TableDef +import com.datastax.spark.connector.types.TypeConversionException import org.apache.spark.sql.Row +import scala.util.{Failure, Success, Try} + /** A [[RowWriter]] that can write SparkSQL `Row` objects. */ class SqlRowWriter(val table: TableDef, val selectedColumns: IndexedSeq[ColumnRef]) @@ -23,9 +26,16 @@ class SqlRowWriter(val table: TableDef, val selectedColumns: IndexedSeq[ColumnRe require(row.size == columnNames.size, s"Invalid row size: ${row.size} instead of ${columnNames.size}.") for (i <- 0 until row.size) { val colValue = row(i) - buffer(i) = converters(i).convert(colValue) + buffer(i) = Try(converters(i).convert(colValue)) match { + case Success(value) => value + case Failure(ex: IllegalArgumentException) => + val columnType = columnTypes(i).scalaTypeName + throw new TypeConversionException( + s"Cannot convert value $colValue of column ${columnNames(i)} to type $columnType", ex) + case Failure(ex) => throw ex } } + } }