diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index 63a0eeecf..442c5cf52 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -260,7 +260,11 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) { if (testSchemaManager != null) { return testSchemaManager; } - schemaRetriever = config.getSchemaRetriever(); + + if ( schemaRetriever == null) { + schemaRetriever = config.getSchemaRetriever(); + } + SchemaConverter schemaConverter = config.getSchemaConverter(); Optional kafkaKeyFieldName = config.getKafkaKeyFieldName(); diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java index 8c8b6b696..53f7c4f79 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java @@ -108,8 +108,20 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdat } } catch (BigQueryException exception) { // Should only perform one table creation attempt. - if (isTableNotExistedException(exception) && autoCreateTables && bigQuery.getTable(tableId.getBaseTableId()) == null) { - attemptTableCreate(tableId.getBaseTableId(), topic); + if (isTableNotExistedException(exception) && autoCreateTables ) { + //In some cases the table exists but the partition still is not created, + // so we should no throw an exception but wait + try { + if (bigQuery.getTable(tableId.getBaseTableId()) == null) { + attemptTableCreate(tableId.getBaseTableId(), topic); + } + } catch(BigQueryConnectException ex) { + BigQueryException cause = (BigQueryException)ex.getCause(); + //Ignore the exception because the table already exists + if ( !(cause.getCode() == 409 && cause.getReason().equalsIgnoreCase("duplicate"))) { + throw ex; + } + } } else if (isTableMissingSchema(exception) && autoUpdateSchemas) { attemptSchemaUpdate(tableId, topic); } else { @@ -129,7 +141,11 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdat logger.debug("re-attempting insertion"); writeResponse = bigQuery.insertAll(request); } catch (BigQueryException exception) { - // no-op, we want to keep retrying the insert + // When autoCreateTable is used with autoUpdateSchemas and the MemorySchemaRetriever + // a table with no schema can appear and is necessary to check again + if (isTableMissingSchema(exception) && autoUpdateSchemas) { + attemptSchemaUpdate(tableId, topic); + } } } else { return writeResponse.getInsertErrors(); @@ -140,10 +156,12 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdat "Failed to write rows after BQ schema update within " + RETRY_LIMIT + " attempts for: " + tableId.getBaseTableId()); } - try { - Thread.sleep(RETRY_WAIT_TIME); - } catch (InterruptedException e) { - // no-op, we want to keep retrying the insert + if (writeResponse == null || writeResponse.hasErrors()) { + try { + Thread.sleep(RETRY_WAIT_TIME); + } catch (InterruptedException e) { + // no-op, we want to keep retrying the insert + } } } logger.debug("table insertion completed successfully");