
Ignore additional field in kafka record if not present in BQ table #306

@subham611

Description


I have a BQ table with the fields {"id": "string", "amount": "long"}.
When I send a Kafka record {"id": "1", "amount": 1, "extraField": "someField"}, the connector fails with the following error:

Task failed with java.lang.NullPointerException error: null (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:70)
Exception in thread "pool-5-thread-2" java.lang.NullPointerException
	at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertSchema(BigQuerySchemaConverter.java:104)
	at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertSchema(BigQuerySchemaConverter.java:46)
	at com.wepay.kafka.connect.bigquery.SchemaManager.getBigQuerySchema(SchemaManager.java:479)
	at com.wepay.kafka.connect.bigquery.SchemaManager.convertRecordSchema(SchemaManager.java:315)
	at com.wepay.kafka.connect.bigquery.SchemaManager.getAndValidateProposedSchema(SchemaManager.java:286)
	at com.wepay.kafka.connect.bigquery.SchemaManager.getTableInfo(SchemaManager.java:269)
	at com.wepay.kafka.connect.bigquery.SchemaManager.updateSchema(SchemaManager.java:242)
	at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.attemptSchemaUpdate(AdaptiveBigQueryWriter.java:159)
	at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:110)
	at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:118)
	at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:96)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[2022-08-05 05:22:34,130] ERROR [client_event_receiver_bq_connector|task-0] Task failed with java.lang.NullPointerException error: null (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:70)

My connector config is as follows:

{
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "topics": "kafka_bq_receiver-1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "errors.deadletterqueue.topic.name": "kafka-dq-sink",
    "errors.log.include.messages": "true",
    "sanitizeTopics": "true",
    "autoCreateTables": "false",
    "autoUpdateSchemas": "false",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
    "defaultDataset": "df",
    "allowNewBigQueryFields": "true",
    "allowBigQueryRequiredFieldRelaxation": "true",
    "keyfile": "key.json",
    "bigQueryPartitionDecorator": "false",
    "timestamp": "UTC",
    "transforms": "setTopic",
    "transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
    "transforms.setTopic.field": "id"
}

Is there any way to ignore this additional field in the Kafka record instead of failing with an error?
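One possible workaround (my own suggestion, not confirmed connector behavior) is to drop the known extra field before it reaches the connector, using Kafka Connect's built-in ReplaceField SMT chained in front of the existing setTopic transform. The field name extraField below matches the example record; substitute your own. A sketch of the relevant config fragment:

```json
{
    "transforms": "dropExtra,setTopic",
    "transforms.dropExtra.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.dropExtra.exclude": "extraField",
    "transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
    "transforms.setTopic.field": "id"
}
```

Note that on older Kafka Connect versions the exclude option is named blacklist, and this approach only helps when the set of extra fields is known in advance.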
