
[destination-bigquery] Table quota exceeded for imports / query appends on temporary tables created in the airbyte_internal dataset #72869

@chinmay1994

Description

Connector Name

destination-bigquery

Connector Version

3.0.17

What step the error happened?

During the sync

Relevant information

We have some connections that sync data from MySQL to BigQuery using CDC. The syncs run frequently (every 10 minutes). After a couple of days, the connections start failing with the following error message:

Warning from destination: Quota exceeded: Your table exceeded quota for imports or query appends per table. For more information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas

In the BigQuery project's job history, I can see that the failures are on the DROP TABLE queries that drop the temporary tables.
The temporary table names are static per stream, so we most likely end up hitting the per-table DDL quota. Would it be possible to create the temporary tables with dynamic (unique) names so that we don't run into table-level quota issues?

[Screenshot: BigQuery job history showing the failed DROP TABLE queries]
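
For context, if the quota in play is BigQuery's documented limit of 1,500 table operations per table per day, the arithmetic is consistent with the "couple of days" onset: a connection syncing every 10 minutes runs 144 times a day, so on the order of ten load/DDL operations per sync against the same static temporary table is enough to exhaust the per-table limit.

A minimal sketch of the unique-name scheme proposed above, assuming plain Kotlin on the JVM. The helper name and suffix format are hypothetical, not the connector's actual code; the caller would still need to track the generated name so the table can be dropped at the end of the sync:

```kotlin
import java.util.UUID

// Hypothetical helper: derive a fresh temporary table name per sync so that
// repeated syncs spread load/DDL operations across many table IDs instead of
// accumulating them against a single table's daily quota.
fun uniqueTempTableName(streamName: String): String {
    // BigQuery table IDs allow letters, digits, and underscores; strip the
    // dashes from the UUID to stay within that character set.
    val suffix = UUID.randomUUID().toString().replace("-", "").take(8)
    return "${streamName}_airbyte_tmp_$suffix"
}

fun main() {
    // Prints something like "users_airbyte_tmp_3fa85f64", unique per call.
    println(uniqueTempTableName("users"))
}
```

One caveat with unique names: orphaned temp tables from failed syncs would need a separate cleanup path, since nothing would overwrite them on the next run.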

Relevant log output

[ {
  "failureOrigin" : "destination",
  "failureType" : "system_error",
  "internalMessage" : "com.google.cloud.bigquery.BigQueryException: Quota exceeded: Your table exceeded quota for imports or query appends per table. For more information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas",
  "externalMessage" : "Quota exceeded: Your table exceeded quota for imports or query appends per table. For more information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 9559,
    "from_trace_message" : true,
    "connector_command" : "write"
  },
  "stacktrace" : "com.google.cloud.bigquery.BigQueryException: Quota exceeded: Your table exceeded quota for imports or query appends per table. For more information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas\n\tat io.airbyte.integrations.destination.bigquery.write.typing_deduping.BigQueryDatabaseHandler.execute(BigQueryDatabaseHandler.kt:106)\n\tat io.airbyte.cdk.load.orchestration.db.direct_load_table.DefaultDirectLoadTableSqlOperations.dropTable$suspendImpl(DirectLoadTableOperations.kt:127)\n\tat io.airbyte.cdk.load.orchestration.db.direct_load_table.DefaultDirectLoadTableSqlOperations.dropTable(DirectLoadTableOperations.kt)\n\tat io.airbyte.integrations.destination.bigquery.write.typing_deduping.direct_load_tables.BigqueryDirectLoadSqlTableOperations.dropTable(BigqueryDirectLoadSqlTableOperations.kt)\n\tat io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableDedupStreamLoader.close(DirectLoadTableStreamLoader.kt:110)\n\tat io.airbyte.cdk.load.write.StreamLoader.close$default(StreamLoader.kt:28)\n\tat io.airbyte.cdk.load.task.implementor.CloseStreamTask.execute(CloseStreamTask.kt:24)\n\tat io.airbyte.cdk.load.task.DestinationTaskLauncher$WrappedTask.execute(DestinationTaskLauncher.kt:124)\n\tat io.airbyte.cdk.load.task.TaskScopeProvider$launch$job$1.invokeSuspend(TaskScopeProvider.kt:35)\n\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)\n\tat kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)\n\tat kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:124)\n\tat kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)\n\tat kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)\n\tat kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:820)\n\tat kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:717)\n\tat kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:704)\n\tSuppressed: io.airbyte.cdk.TransientErrorException: Input was fully read, but some streams did not receive a terminal stream status message. If the destination did not encounter other errors, this likely indicates an error in the source or platform. Streams without a status message: []\n\t\tat io.airbyte.cdk.load.state.SyncManager.markInputConsumed(SyncManager.kt:116)\n\t\tat io.airbyte.cdk.load.state.PipelineEventBookkeepingRouter.close(PipelineEventBookkeepingRouter.kt:300)\n\t\tat io.airbyte.cdk.load.task.internal.InputConsumerTask.execute(InputConsumerTask.kt:120)\n\t\tat io.airbyte.cdk.load.task.internal.InputConsumerTask$execute$1.invokeSuspend(InputConsumerTask.kt)\n\t\t... 8 more\n",
  "timestamp" : 1770269283288
}, {
  "failureOrigin" : "destination",
  "internalMessage" : "Destination process exited with non-zero exit code 1",
  "externalMessage" : "Something went wrong within the destination connector",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 9559,
    "connector_command" : "write"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.DestinationException: Destination process exited with non-zero exit code 1\n\tat io.airbyte.container.orchestrator.worker.DestinationReader.run(ReplicationTask.kt:54)\n\tat io.airbyte.container.orchestrator.worker.DestinationReader$run$1.invokeSuspend(ReplicationTask.kt)\n\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)\n\tat kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n",
  "timestamp" : 1770269284696
} ]
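
The stack trace shows the quota error surfacing from the cleanup DROP TABLE in BigQueryDatabaseHandler.execute when the stream loader closes. For anyone hitting this before a connector-side fix lands, one possible stopgap is to treat quota errors on that cleanup DROP as non-fatal, since a static temp table will be reused by the next sync anyway. A minimal sketch against the google-cloud-bigquery Java client; the skip-on-quota policy is my assumption, not current connector behavior:

```kotlin
import com.google.cloud.bigquery.BigQueryException
import com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.bigquery.QueryJobConfiguration

// Hypothetical guard: run the cleanup DROP and swallow only per-table quota
// errors (reason "quotaExceeded"), rethrowing everything else.
fun dropTableIgnoringQuota(dataset: String, table: String) {
    val bq = BigQueryOptions.getDefaultInstance().service
    val sql = "DROP TABLE IF EXISTS `$dataset`.`$table`"
    try {
        bq.query(QueryJobConfiguration.newBuilder(sql).build())
    } catch (e: BigQueryException) {
        if (e.error?.reason == "quotaExceeded") {
            // Leave the table in place; with static names it is reused and
            // dropped on a later sync once the daily quota window resets.
            println("Skipping DROP TABLE for $dataset.$table: ${e.message}")
        } else {
            throw e
        }
    }
}
```

This only masks the symptom, though: with syncs every 10 minutes, the load/append operations themselves still count against the same table, so unique temp-table names remain the real fix.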

Contribute

  • Yes, I want to contribute

Internal Tracking: https://github.com/airbytehq/oncall/issues/11167
