[Feature][Connector] Add BigQuery Sink Connector#10485
dybyte wants to merge 14 commits into apache:dev
Conversation
Pull request overview
This PR introduces a new BigQuery Sink connector (Storage Write API-based) and wires it into SeaTunnel distribution, plugin discovery, E2E module structure, and documentation. It also includes a small JSON format API refactor to promote the row-to-JSON converter interface to a top-level type used by the new connector.
Changes:
- Add the `connector-bigquery` implementation (sink factory/sink/writer/client factory/options/errors) and register it in the plugin mapping, plugin config, and distribution.
- Add a (currently disabled) BigQuery E2E module plus example configs.
- Refactor the JSON format converter API (`RowToJsonConverter` extracted to a top-level interface) and add `JsonSerializationSchema#convert(...)` for direct `JsonNode` conversion.
Reviewed changes
Copilot reviewed 30 out of 30 changed files in this pull request and generated 11 comments.
Summary per file:
| File | Description |
|---|---|
| seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/llm/LLMRequestJsonTest.java | Update test to use new RowToJsonConverter type. |
| seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/remote/AbstractModel.java | Switch internal converter field/getter to new RowToJsonConverter. |
| seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java | Remove nested converter interface (now top-level). |
| seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverter.java | New top-level converter interface. |
| seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java | Accept injected converter + add convert(SeaTunnelRow) returning JsonNode. |
| seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | Register new connector-bigquery-e2e module. |
| seatunnel-e2e/.../connector-bigquery-e2e/** | Add BigQuery E2E scaffolding + configs + (disabled) ITs. |
| seatunnel-dist/pom.xml | Add connector-bigquery dependency to distribution. |
| seatunnel-connectors-v2/connector-bigquery/** | New BigQuery sink connector implementation. |
| plugin-mapping.properties | Register seatunnel.sink.BigQuery = connector-bigquery. |
| config/plugin_config | Add connector-bigquery to plugin list. |
| docs/en/connectors/sink/BigQuery.md | Add English connector documentation. |
| docs/zh/connectors/sink/BigQuery.md | Add Chinese connector documentation. |
| docs/*/connectors/changelog/connector-bigquery.md | Add changelog stubs. |
Resolved review comment threads:
- ...bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/BigQuerySinkWriter.java (outdated, 2 threads)
- ...rc/main/java/org/apache/seatunnel/connectors/bigquery/client/BigQueryWriteClientFactory.java (2 threads)
- ...src/main/java/org/apache/seatunnel/connectors/bigquery/sink/writer/BigQueryStreamWriter.java
- ...uery/src/main/java/org/apache/seatunnel/connectors/bigquery/convert/RowToJsonConverters.java (outdated)
- ...ector-bigquery-e2e/src/test/java/org/apache/seatunnel/e2e/connector/bigquery/BigqueryIT.java (outdated)
Issue 1: Missing Exactly-Once Semantics Support

Location:

```java
public class BigQuerySink
        implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>, SupportMultiTableSink {
```

Issue Description: Since the BigQuery Storage Write API is a streaming API where data cannot be rolled back after writing, the current implementation cannot guarantee data consistency: on failure and restart, already-written batches may be replayed and written twice.

Severity: CRITICAL

Suggestions for Improvement: Consider pending-type write streams, which the Storage Write API provides for exactly-once batch commits (rows are buffered server-side until the stream is finalized and committed), or clearly document that the sink is at-least-once.
Issue 2: Incomplete Async Write Error Propagation Mechanism

Location:

```java
private void flush() throws IOException {
    // ... omitting the first part
    ApiFutures.addCallback(
            future,
            new ApiFutureCallback<AppendRowsResponse>() {
                @Override
                public void onSuccess(AppendRowsResponse result) {
                    inflightRequests.arriveAndDeregister();
                    log.info("Successfully appended {} rows.", dataToSend.length());
                }

                @Override
                public void onFailure(Throwable t) {
                    fatalError.compareAndSet(
                            null,
                            new BigQueryConnectorException(
                                    BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, t));
                    inflightRequests.arriveAndDeregister();
                    log.warn("Failed to append rows.", t);
                }
            },
            MoreExecutors.directExecutor());
}
```

Issue Description: An append failure is only recorded into `fatalError`; `flush()` itself returns normally, and the error surfaces only when `checkFatalError()` is next called. The rows of the failed batch are dropped, and subsequent writes continue as if nothing happened.

Severity: CRITICAL

Suggestions for Improvement:

```java
// Pseudocode: wait synchronously for the append result
private void flush() throws IOException {
    if (buffer.length() == 0) return;
    JSONArray dataToSend = buffer;
    buffer = new JSONArray();
    ApiFuture<AppendRowsResponse> future;
    try {
        future = streamWriter.append(dataToSend);
    } catch (Exception e) {
        // Restore buffer on failure
        buffer = dataToSend;
        throw new BigQueryConnectorException(BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, e);
    }
    try {
        // Wait synchronously for the result
        future.get();
        log.info("Successfully appended {} rows.", dataToSend.length());
    } catch (Exception e) {
        // Restore buffer on failure, or retry
        buffer = dataToSend;
        throw new BigQueryConnectorException(BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, e);
    }
}
```
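To make the difference concrete, here is a small self-contained sketch (using `java.util.concurrent.CompletableFuture` rather than the connector's `ApiFuture`, purely for illustration; all names are hypothetical) contrasting callback-only error recording, where the caller returns normally, with synchronous waiting, where the failure surfaces immediately:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ErrorPropagationDemo {
    static final AtomicReference<Throwable> fatalError = new AtomicReference<>();

    // Callback style: the failure is only recorded; the caller sees nothing.
    static void asyncAppend() {
        CompletableFuture<Void> f =
                CompletableFuture.failedFuture(new RuntimeException("append failed"));
        f.whenComplete((r, t) -> {
            if (t != null) fatalError.compareAndSet(null, t);
        });
    }

    // Synchronous style: the failure propagates to the caller right away.
    static void syncAppend() throws Exception {
        CompletableFuture<Void> f =
                CompletableFuture.failedFuture(new RuntimeException("append failed"));
        f.get(); // throws ExecutionException wrapping the cause
    }

    public static void main(String[] args) {
        asyncAppend();
        System.out.println("async append returned normally; fatalError recorded="
                + (fatalError.get() != null));
        try {
            syncAppend();
        } catch (Exception e) {
            System.out.println("sync append failed fast: " + e.getCause().getMessage());
        }
    }
}
```

The first style is what the current `flush()` does; the caller only learns about the failure at some later `checkFatalError()` call, by which point the batch is gone.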
Issue 3: close() Method May Cause Deadlock

Location:

```java
@Override
public void close() throws IOException {
    flush(); // Line 136
    inflightRequests.arriveAndAwaitAdvance(); // Line 137
    checkFatalError();
    streamWriter.close();
}
```

Issue Description: The Phaser's initial party count is 1 (the writer itself). If `flush()` throws, `close()` exits without ever waiting for in-flight requests. More seriously, if a callback never fires, so a registered party never arrives, `arriveAndAwaitAdvance()` blocks forever.

Severity: MAJOR

Suggestions for Improvement:

```java
@Override
public void close() throws IOException {
    try {
        flush();
    } finally {
        // Wait for in-flight requests regardless of whether flush succeeds
        inflightRequests.arriveAndAwaitAdvance();
    }
    checkFatalError();
    streamWriter.close();
}
```
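The party accounting behind this fix can be seen in a self-contained `java.util.concurrent.Phaser` sketch (illustrative names, not the connector's code): the writer holds one party, each in-flight append registers another, and `arriveAndAwaitAdvance()` only returns once every registered party has arrived — which is exactly why it must run even when `flush()` throws, and why a callback that never fires blocks it forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;

public class PhaserDemo {
    public static void main(String[] args) {
        Phaser inflight = new Phaser(1); // party 0: the writer thread itself
        for (int i = 0; i < 3; i++) {
            inflight.register(); // one extra party per in-flight append
            CompletableFuture.runAsync(
                    () -> {
                        try {
                            Thread.sleep(50); // simulate the append round-trip
                        } catch (InterruptedException ignored) {
                        }
                        inflight.arriveAndDeregister(); // the callback fired
                    });
        }
        // Blocks until the writer's own arrival plus all 3 callbacks have arrived.
        inflight.arriveAndAwaitAdvance();
        System.out.println("all in-flight requests completed; parties="
                + inflight.getRegisteredParties());
    }
}
```

If one of the async tasks never called `arriveAndDeregister()`, the main thread would wait in `arriveAndAwaitAdvance()` indefinitely — the deadlock scenario this issue describes.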
Issue 4: Incorrect Map Conversion Logic in RowToJsonConverters

Location:

```java
protected RowToJsonConverter createMapConverter(
        SeaTunnelDataType<?> keyType, SeaTunnelDataType<?> valueType) {
    // ... omitting the first part
    return new RowToJsonConverter() {
        @Override
        public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
            ObjectNode node = mapper.createObjectNode();
            Map<?, ?> mapData = (Map) value;
            for (Map.Entry<?, ?> entry : mapData.entrySet()) {
                JsonNode keyNode = keyConverter.convert(mapper, null, entry.getKey());
                String fieldName = keyNode.isTextual() ? keyNode.asText() : keyNode.toString();
                node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), entry.getValue()));
            }
            return mapper.getNodeFactory().textNode(node.toString()); // Line 298: Error!
        }
    };
}
```

Issue Description: The converter wraps the assembled object in a text node, so the map is serialized as a JSON string rather than a JSON object. For example, assuming the Map data is `{"a": 1}`, the output becomes the string `"{\"a\":1}"` instead of the object `{"a":1}`.

Severity: CRITICAL

Suggestions for Improvement:

```java
@Override
public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
    ObjectNode node = mapper.createObjectNode();
    Map<?, ?> mapData = (Map) value;
    for (Map.Entry<?, ?> entry : mapData.entrySet()) {
        JsonNode keyNode = keyConverter.convert(mapper, null, entry.getKey());
        String fieldName = keyNode.isTextual() ? keyNode.asText() : keyNode.toString();
        node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), entry.getValue()));
    }
    return node; // Return the ObjectNode itself instead of a text node
}
```
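The bug is easy to reproduce with plain Jackson (`jackson-databind` on the classpath; class and variable names here are illustrative): wrapping an `ObjectNode`'s `toString()` in a text node yields a JSON string value, while returning the node itself yields a JSON object.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class MapNodeDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode node = mapper.createObjectNode();
        node.put("a", 1);

        // Buggy variant: the object is flattened to its string form
        JsonNode buggy = mapper.getNodeFactory().textNode(node.toString());
        // Fixed variant: keep the object node itself
        JsonNode fixed = node;

        System.out.println(mapper.writeValueAsString(buggy)); // "{\"a\":1}"  -- a JSON string
        System.out.println(mapper.writeValueAsString(fixed)); // {"a":1}      -- a JSON object
    }
}
```

Any downstream consumer expecting a nested object (BigQuery's JSON ingestion included) would reject or misinterpret the quoted-string form.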
Issue 5: Code Duplication in RowToJsonConverters

Issue Description: The connector ships its own copy of `RowToJsonConverters` under `org.apache.seatunnel.connectors.bigquery.convert`, duplicating the converter in `seatunnel-format-json`. Two copies of the same logic will inevitably drift apart (as Issue 4 already shows).

Severity: MAJOR

Suggestions for Improvement: Reuse the shared implementation from `seatunnel-format-json`:

```java
// BigQuerySerializer.java
public BigQuerySerializer(CatalogTable catalogTable) {
    SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
    this.jsonSerializationSchema =
            new JsonSerializationSchema(
                    rowType,
                    new org.apache.seatunnel.format.json.RowToJsonConverters()
                            .createConverter(checkNotNull(rowType)));
}
```
Issue 6: New Public Method in JsonSerializationSchema

Location:

```java
public JsonNode convert(SeaTunnelRow row) {
    if (node == null) {
        node = mapper.createObjectNode();
    }
    try {
        return runtimeConverter.convert(mapper, node, row);
    } catch (Exception e) {
        throw CommonError.jsonOperationError(FORMAT, row.toString(), e);
    }
}
```

Issue Description: Although this method exists to support BigQuery's requirement (it needs a `JsonNode` instead of `byte[]`), adding a public method to a shared format class needs evaluation for its impact on other modules. Note also that the returned `JsonNode` may be the internally reused `node`, so callers must not hold a reference to it across calls.

Severity: MAJOR

Suggestions for Improvement: Document the reuse semantics of the returned `JsonNode`, keep `serialize(...)` delegating to `convert(...)` so the two paths cannot diverge, and add unit tests in `seatunnel-format-json` covering the new public method.
Issue 7: Insufficient Sensitive Information Handling

Location:

```java
public static final Option<String> SERVICE_ACCOUNT_KEY_PATH =
        Options.key("service_account_key_path")
                .stringType()
                .noDefaultValue()
                .withDescription("Path to GCP service account JSON key file");

public static final Option<String> SERVICE_ACCOUNT_KEY_JSON =
        Options.key("service_account_key_json")
                .stringType()
                .noDefaultValue()
                .withDescription("Inline GCP service account JSON key content");
```

Issue Description: The inline service account key is held as a plain string option; malformed key content is not validated up front, and the secret risks appearing in logs, error messages, or config dumps.

Severity: MAJOR

Suggestions for Improvement: Validate the key when building credentials and avoid echoing its content in errors:

```java
try {
    byte[] jsonBytes =
            config.get(BigQuerySinkOptions.SERVICE_ACCOUNT_KEY_JSON).getBytes(StandardCharsets.UTF_8);
    credentials = ServiceAccountCredentials.fromStream(new ByteArrayInputStream(jsonBytes));
} catch (IOException e) {
    throw new BigQueryConnectorException(
            BigQueryConnectorErrorCode.INVALID_CREDENTIALS,
            "Invalid service account JSON format",
            e);
}
```
Issue 8: Tests Disabled and Lacking Unit Tests

Location:

```java
@TestTemplate
@Disabled("bigquery-emulator does not support bigquery storage write api.")
void testBigQuerySink(TestContainer container) throws IOException, InterruptedException {
    Container.ExecResult execResult = container.executeJob("/fake_to_bigquery_sink.conf");
    Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
@Disabled("bigquery-emulator does not support bigquery storage write api.")
void testBigQuerySinkWithVariousType(TestContainer container)
        throws IOException, InterruptedException {
    Container.ExecResult execResult =
            container.executeJob("/fake_to_bigquery_sink_with_various_type.conf");
    Assertions.assertEquals(0, execResult.getExitCode());
}
```

Issue Description: Both E2E tests are `@Disabled` because the bigquery-emulator does not support the Storage Write API, and there are no unit tests, so the connector effectively ships with no automated coverage.

Severity: MAJOR

Suggestions for Improvement: Add mock-based unit tests for the writer and serializer, for example:

```java
@Test
public void testWrite() throws IOException {
    BigQueryWriter mockWriter = mock(BigQueryWriter.class);
    when(mockWriter.append(any())).thenReturn(ApiFutures.immediateFuture(mockResponse));
    BigQuerySinkWriter writer = new BigQuerySinkWriter(config, mockWriter, serializer);
    writer.write(testRow);
    verify(mockWriter).append(any());
}
```
Issue 9: Hardcoded Retry Parameters

Location:

```java
RetrySettings retrySettings =
        RetrySettings.newBuilder()
                .setInitialRetryDelay(Duration.ofMillis(500))
                .setRetryDelayMultiplier(1.1)
                .setMaxAttempts(5)
                .setMaxRetryDelay(Duration.ofMinutes(1))
                .build();
```

Issue Description: The retry delay, multiplier, and attempt count are hardcoded, so users cannot tune retry behavior for their workload or quota situation.

Severity: MINOR

Suggestions for Improvement: Expose the retry parameters as connector options:

```java
public static final Option<Long> INITIAL_RETRY_DELAY_MS =
        Options.key("initial_retry_delay_ms")
                .longType()
                .defaultValue(500L)
                .withDescription("Initial retry delay in milliseconds");

public static final Option<Integer> MAX_RETRY_ATTEMPTS =
        Options.key("max_retry_attempts")
                .intType()
                .defaultValue(5)
                .withDescription("Maximum number of retry attempts");
```

Then use the configured values in `BigQueryStreamWriter`.

Issue 10: Missing Metrics Collection
Issue Description: The writer does not report any metrics, so users cannot observe the sink's throughput or error rates.

Severity: MINOR

Suggestions for Improvement:

```java
public BigQuerySinkWriter(
        ReadonlyConfig readOnlyConfig,
        BigQueryStreamWriter streamWriter,
        BigQuerySerializer serializer,
        SinkWriter.Context context) { // Add a Context parameter
    this.batchSize = readOnlyConfig.get(BigQuerySinkOptions.BATCH_SIZE);
    this.streamWriter = streamWriter;
    this.serializer = serializer;
    // Initialize metrics
    this.rowsSent = context.getMetricsContext().counter("rowsSent");
    this.batchesSent = context.getMetricsContext().counter("batchesSent");
    this.sendErrors = context.getMetricsContext().counter("sendErrors");
}
```
…igquery (merge; conflicts resolved in):
- seatunnel-e2e/seatunnel-connector-v2-e2e/connector-bigquery-e2e/src/test/java/org/apache/seatunnel/e2e/connector/bigquery/BigqueryIT.java
- seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
#10355
Purpose of this pull request
This PR implements a simple BigQuery sink connector. Additional features will be implemented in follow-up PRs.
Does this PR introduce any user-facing change?
Yes. Please refer to the documentation for details.
How was this patch tested?
I've added `BigQueryIT` for testing. However, the bigquery-emulator doesn't appear to support the BigQuery Storage Write API, so I tested this using my GCP account and verified that it works successfully.
Check list
- New License Guide
- Update `incompatible-changes.md` to describe the incompatibility caused by this PR.