Description
We use the sink connector in the following setup:
Strimzi Version: 0.45.1
Kafka Version: 3.8.1
Sink connector version: 2.10.3
Connector configs:
(...)
timePartitioningType: DAY
autoCreateTables: true
allBQFieldsNullable: true
allowNewBigQueryFields: true
allowBigQueryRequiredFieldRelaxation: true
allowSchemaUnionization: true
useStorageWriteApi: true
(...)
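For context, these settings sit in a connector definition roughly like the sketch below. This is a hedged illustration, not our exact manifest: the resource and cluster names are placeholders, and the topic/project/dataset values are taken from the redacted log lines.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: bigquery-sink                  # placeholder name
  labels:
    strimzi.io/cluster: my-connect     # placeholder Connect cluster
spec:
  class: com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
  tasksMax: 1
  config:
    topics: topic-name
    project: some-project
    defaultDataset: some-dataset
    timePartitioningType: DAY
    autoCreateTables: true
    allBQFieldsNullable: true
    allowNewBigQueryFields: true
    allowBigQueryRequiredFieldRelaxation: true
    allowSchemaUnionization: true
    useStorageWriteApi: true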
We encountered the following problem:
If a new field is added to the Kafka event, the sink connector adds the corresponding new column to the target table without any problems, but then fails to write to that table until the whole KafkaConnect deployment is restarted.
When the problem occurs, the following errors are logged:
com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: A write thread has failed with an unrecoverable error
Caused by: Exceeded 130 attempts to write to table projects/some-project/datasets/some-dataset/tables/topic-name$20260106
at com.wepay.kafka.connect.bigquery.write.batch.KcbqThreadPoolExecutor.lambda$maybeThrowEncounteredError$0(KcbqThreadPoolExecutor.java:111)
at java.base/java.util.Optional.ifPresent(Optional.java:178)
at com.wepay.kafka.connect.bigquery.write.batch.KcbqThreadPoolExecutor.maybeThrowEncounteredError(KcbqThreadPoolExecutor.java:110)
at com.wepay.kafka.connect.bigquery.write.batch.KcbqThreadPoolExecutor.awaitCurrentTasks(KcbqThreadPoolExecutor.java:100)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:189)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.preCommit(BigQuerySinkTask.java:209)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:399)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:230)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectException: Exceeded 130 attempts to write to table projects/some-project/datasets/some-dataset/tables/topic-name$20260106
at com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiRetryHandler.maybeRetry(StorageWriteApiRetryHandler.java:127)
at com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiBase.initializeAndWriteRecords(StorageWriteApiBase.java:208)
at com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiWriter.run(StorageWriteApiWriter.java:88)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask$MdcContextThreadFactory.lambda$newThread$0(BigQuerySinkTask.java:671)
... 1 more
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectException: Failed to write rows on table projects/some-project/datasets/some-dataset/tables/topic-name$20260106 due to INVALID_ARGUMENT: Append serialization failed for writer: projects/some-project/datasets/some-dataset/tables/topic-name$20260106/_default
at com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiBase.writeBatch(StorageWriteApiBase.java:286)
at com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiBase.initializeAndWriteRecords(StorageWriteApiBase.java:205)
... 5 more
Caused by: com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Append serialization failed for writer: projects/some-project/datasets/some-dataset/tables/topic-name$20260106/_default
at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.append(SchemaAwareStreamWriter.java:212)
at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.append(SchemaAwareStreamWriter.java:125)
at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:65)
at com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiDefaultStream$DefaultStreamWriter.appendRows(StorageWriteApiDefaultStream.java:197)
at com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiBase.writeBatch(StorageWriteApiBase.java:253)
... 6 more
It looks like the connector keeps trying to write with the old schema against the updated table schema, and this fails without ever recovering on its own, so the lag on the topic grows until there is manual intervention.
Restarting the KafkaConnect deployment "fixes" the issue and the connector can ingest data again.
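For reference, the recovery that a restart effectively performs can be sketched with the plain google-cloud-bigquerystorage client. This is a minimal illustration of the suspected mechanism (JsonStreamWriter caches the table schema when it is built, so a writer created before the ALTER TABLE keeps serializing against the old schema); it is not the connector's code, and SchemaRefreshingWriter with its retry-once logic is hypothetical:

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;

public class SchemaRefreshingWriter {

  private final BigQueryWriteClient client;
  private final String tableName; // "projects/<p>/datasets/<d>/tables/<t>" (placeholder)
  private JsonStreamWriter writer;

  public SchemaRefreshingWriter(BigQueryWriteClient client, String tableName) throws Exception {
    this.client = client;
    this.tableName = tableName;
    this.writer = newWriter();
  }

  // Passing the client makes the builder fetch the table's *current* schema,
  // so a freshly built writer knows about newly added columns.
  private JsonStreamWriter newWriter() throws Exception {
    return JsonStreamWriter.newBuilder(tableName, client).build();
  }

  public AppendRowsResponse append(JSONArray rows) throws Exception {
    ApiFuture<AppendRowsResponse> future = writer.append(rows);
    try {
      return future.get();
    } catch (ExecutionException e) {
      if (e.getCause() instanceof AppendSerializationError) {
        // The writer's cached schema is stale after the table gained a column.
        // Rebuild the writer against the current schema and retry once,
        // instead of retrying the same stale writer until attempts run out.
        writer.close();
        writer = newWriter();
        return writer.append(rows).get();
      }
      throw e;
    }
  }
}

If the connector rebuilt its stream writer on this error instead of retrying with the same writer, the 130 failed attempts and the manual restart should not be necessary.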