
Avro Schema Format for Bigtable not working #65

@DHRUV6029

Description


We have the following catalog definition:

{ "table": { "name": "crm-test-1", "tableCoder": "PrimitiveType" }, "rowkey": "id_rowkey", "columns": { "id": { "cf": "rowkey", "col": "id_rowkey", "type": "string" }, "name": { "cf": "info", "col": "name", "type": "string" }, "birthYear": { "cf": "info", "col": "birth_year", "type": "long" }, "locations": { "cf": "location", "col": "locations", "avro": "avroSchema" } } }

The DataFrame is created as follows:

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

def createTestDataFrameForBTWrite(spark: SparkSession): DataFrame = {
  // Define location schema
  val locationSchema = StructType(Array(
    StructField("address", StringType, true),
    StructField("country", StringType, true)
  ))

  // Define the schema with locations as an array of structs
  val schema = StructType(Array(
    StructField("id", StringType, false),
    StructField("name", StringType, true),
    StructField("birthYear", LongType, true),
    StructField("locations", ArrayType(locationSchema), true)
  ))

  // Create data with array of location structs
  val data = Seq(
    Row("user1", "Dhruv Patel", 1985L,
      Seq(Row("123 Main St", "usa"), Row("3943 hope ave", "germany"))),
    Row("user2", "Jane Smith", 1990L,
      Seq(Row("456 Oak Ave", "india"))),
    Row("user3", "Bob Johnson", 1975L,
      Seq(Row("789 Pine Rd", "canada"), Row("555 Maple St", "usa"))),
    Row("user4", "Alice Williams", 1982L,
      Seq(Row("101 Maple Dr", "test"))),
    Row("user5", "Charlie Brown", 1995L,
      Seq(Row("202 Cedar Ln", "usa")))
  )

  // Create the base DataFrame
  spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
}
```
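
For reference, `df.printSchema()` on this frame should print the tree below (reconstructed from the `StructType` above, not captured from a run); note that `locations` comes through as an array of structs, which is the Spark type the connector rejects later on.

```scala
// Reconstructed from the StructType definition above (not captured output):
df.printSchema()
// root
//  |-- id: string (nullable = false)
//  |-- name: string (nullable = true)
//  |-- birthYear: long (nullable = true)
//  |-- locations: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- address: string (nullable = true)
//  |    |    |-- country: string (nullable = true)
```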

Bigtable Write Utils
```scala
object BTUtils {
  @transient lazy val logger: Logger = Logger.getLogger(this.getClass.getName)

  private val avroSchemaString = """
    {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Location",
        "fields": [
          {"name": "address", "type": "string"},
          {"name": "country", "type": "string"}
        ]
      }
    }
    """

  case class BTConfig(
    catalogName: String,
    rowKey: String,
    projectId: String,
    instanceId: String,
    batchMutateSize: String
  )

  // Loads the catalog JSON from resources (bigtableschema/<name>.json) and
  // renders it as a compact JSON string to pass to the connector.
  private def getCatalog(catalog: String): String = {
    if (catalog == null || catalog.isEmpty) {
      throw new ConfigError("catalog name for Bigtable is empty")
    }
    val fileName = "bigtableschema/%s.json".format(catalog)
    ConfigSource.resources(fileName).config() match {
      case Left(x) => throw new ConfigError(x.prettyPrint())
      case Right(config) =>
        logger.info("Successfully retrieved the schema file %s".format(fileName))
        val renderOptions = com.typesafe.config.ConfigRenderOptions.defaults()
          .setJson(true)
          .setOriginComments(false)
          .setComments(false)
          .setFormatted(false)
        config.root().render(renderOptions)
    }
  }

  def readBigtableToDataframe(sparkSession: SparkSession, config: BTConfig, rowKeyValue: String): DataFrame = {
    val dataframe = sparkSession.read
      .format("bigtable")
      .option("catalog", getCatalog(config.catalogName))
      .option("spark.bigtable.project.id", config.projectId)
      .option("spark.bigtable.instance.id", config.instanceId)
      .option("avroSchema", avroSchemaString)
      .load

    val filteredDf = dataframe.filter("item_id == '10000000024'")
    filteredDf.show()
    filteredDf
  }

  def writeDataframeToBigtable(dataframe: DataFrame, config: BTConfig): Unit = {
    dataframe.write
      .format("bigtable")
      .option("catalog", getCatalog(config.catalogName))
      .option("spark.bigtable.project.id", config.projectId)
      .option("spark.bigtable.instance.id", config.instanceId)
      .option("avroSchema", avroSchemaString)
      .option("spark.bigtable.batch.mutate.size", config.batchMutateSize)
      .save
  }
}
```
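
As a sanity check, the Avro schema string itself is valid: it parses with the plain Avro API (nothing connector-specific) as an ARRAY schema whose items are a `Location` record. A minimal standalone check, assuming `org.apache.avro` is on the classpath:

```scala
import org.apache.avro.Schema

// Standalone validation of the schema string used above; this only proves the
// JSON is a well-formed Avro schema, it does not exercise the connector.
object AvroSchemaCheck {
  def main(args: Array[String]): Unit = {
    val avroSchemaString =
      """
        |{
        |  "type": "array",
        |  "items": {
        |    "type": "record",
        |    "name": "Location",
        |    "fields": [
        |      {"name": "address", "type": "string"},
        |      {"name": "country", "type": "string"}
        |    ]
        |  }
        |}
      """.stripMargin

    val schema = new Schema.Parser().parse(avroSchemaString)
    println(schema.getType)                 // ARRAY
    println(schema.getElementType.getName)  // Location
  }
}
```

So this does not look like a schema-parsing problem; the failure below happens when the connector serializes the Spark column value.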

Exception:
```
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (m-jc23hpxpjl.homeoffice.wal-mart.com executor driver): java.lang.Exception: unsupported data type ARRAY
    at com.google.cloud.spark.bigtable.datasources.AvroSerdes$.serialize(SchemaConverters.scala:478)
    at com.google.cloud.spark.bigtable.datasources.Utils$.toBytes(Utils.scala:72)
    at com.google.cloud.spark.bigtable.WriteRowConversions.$anonfun$convertToBigtableRowMutation$2(WriteRowConversions.scala:128)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at com.google.cloud.spark.bigtable.WriteRowConversions.convertToBigtableRowMutation(WriteRowConversions.scala:125)
    at com.google.cloud.spark.bigtable.BigtableRelation.$anonfun$insert$1(BigtableDefaultSource.scala:177)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at com.google.cloud.spark.bigtable.BigtableRelation.$anonfun$insert$2(BigtableDefaultSource.scala:183)
    at com.google.cloud.spark.bigtable.BigtableRelation.$anonfun$insert$2$adapted(BigtableDefaultSource.scala:178)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1039)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1039)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.
```
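
The failing frame is `AvroSerdes$.serialize` rejecting the Spark `ARRAY` type before anything reaches Bigtable. As a stopgap, one idea we are considering (our own workaround, not something from the connector docs, and not verified end to end) is to avoid handing the connector an array column at all: flatten `locations` to a JSON string with `to_json` and declare it in the catalog as `"type": "string"` instead of `"avro": "avroSchema"`. That gives up the Avro encoding in Bigtable, but it should sidestep the ARRAY serialization path entirely:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json, to_json}
import org.apache.spark.sql.types.{ArrayType, StructType}

// Workaround sketch: store the locations array as a JSON string so the
// connector only ever sees primitive column types.
def flattenLocationsToJson(df: DataFrame): DataFrame =
  df.withColumn("locations", to_json(col("locations")))

// On the read side the string can be turned back into the original array,
// reusing the locationSchema StructType defined earlier.
def expandLocationsFromJson(df: DataFrame, locationSchema: StructType): DataFrame =
  df.withColumn("locations", from_json(col("locations"), ArrayType(locationSchema)))
```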

Ideally we would like the Avro column type to work as intended, but I'm not able to find any documentation in the Google docs on how the Avro schema is supposed to be specified for this connector.
