diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index c5867cc7f522..9b67fcf10541 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -58,6 +58,8 @@ abstract class AppendClientInfo {
 
   abstract DescriptorProtos.DescriptorProto getDescriptor();
 
+  abstract boolean getUseEnhancedTableRowConversion();
+
   @AutoValue.Builder
   abstract static class Builder {
     abstract Builder setStreamAppendClient(@Nullable BigQueryServices.StreamAppendClient value);
@@ -74,6 +76,8 @@ abstract static class Builder {
 
     abstract Builder setStreamName(@Nullable String name);
 
+    abstract Builder setUseEnhancedTableRowConversion(boolean value);
+
     abstract AppendClientInfo build();
   };
 
@@ -84,6 +88,15 @@ static AppendClientInfo of(
       DescriptorProtos.DescriptorProto descriptor,
       Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
       throws Exception {
+    return of(tableSchema, descriptor, closeAppendClient, false);
+  }
+
+  static AppendClientInfo of(
+      TableSchema tableSchema,
+      DescriptorProtos.DescriptorProto descriptor,
+      Consumer<BigQueryServices.StreamAppendClient> closeAppendClient,
+      boolean useEnhancedTableRowConversion)
+      throws Exception {
     return new AutoValue_AppendClientInfo.Builder()
         .setTableSchema(tableSchema)
         .setCloseAppendClient(closeAppendClient)
@@ -91,6 +104,7 @@ static AppendClientInfo of(
         .setSchemaInformation(
             TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema))
         .setDescriptor(descriptor)
+        .setUseEnhancedTableRowConversion(useEnhancedTableRowConversion)
         .build();
   }
 
@@ -106,6 +120,20 @@ static AppendClientInfo of(
         closeAppendClient);
   }
 
+  static AppendClientInfo of(
+      TableSchema tableSchema,
+      Consumer<BigQueryServices.StreamAppendClient> closeAppendClient,
+      boolean includeCdcColumns,
+      boolean useEnhancedTableRowConversion)
+      throws Exception {
+    return of(
+        tableSchema,
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+            tableSchema, true, includeCdcColumns),
+        closeAppendClient,
+        useEnhancedTableRowConversion);
+  }
+
   public AppendClientInfo withNoAppendClient() {
     return toBuilder().setStreamAppendClient(null).build();
   }
@@ -173,7 +201,9 @@ public TableRow toTableRow(ByteString protoBytes, Predicate<String> includeField
           DynamicMessage.parseFrom(
               TableRowToStorageApiProto.wrapDescriptorProto(getDescriptor()), protoBytes),
           true,
-          includeField);
+          includeField,
+          "",
+          getUseEnhancedTableRowConversion());
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e3f9de3b7ab3..d1be4ce04f50 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2451,6 +2451,8 @@ public enum Method {
 
     abstract @Nullable String getWriteTempDataset();
 
+    abstract @Nullable Boolean getUseEnhancedTableRowConversion();
+
     abstract @Nullable SerializableFunction<T, RowMutationInformation>
         getRowMutationInformationFn();
 
@@ -2576,6 +2578,8 @@ abstract Builder<T> setBadRecordErrorHandler(
 
       abstract Builder<T> setBadRecordRouter(BadRecordRouter badRecordRouter);
 
+      abstract Builder<T> setUseEnhancedTableRowConversion(Boolean useEnhancedTableRowConversion);
+
       abstract Write<T> build();
     }
 
@@ -3326,6 +3330,20 @@ public Write<T> withWriteTempDataset(String writeTempDataset) {
       return toBuilder().setWriteTempDataset(writeTempDataset).build();
     }
 
+    /**
+     * Enables enhanced table row conversion. When enabled, {@link TableRow}s reconstructed from
+     * Storage API protos (for example, the failed and successful row outputs) are rendered as a
+     * positional list of {@code f}/{@code v} cells in descriptor field order, which avoids
+     * clashes with columns literally named {@code f}. When disabled (the default), the legacy
+     * behavior of setting fields by name is kept for backward compatibility.
+     *
+     * @param useEnhancedTableRowConversion true to enable enhanced conversion, false for legacy
+     *     behavior
+     * @return the updated Write transform
+     */
+    public Write<T> withEnhancedTableRowConversion(boolean useEnhancedTableRowConversion) {
+      return toBuilder().setUseEnhancedTableRowConversion(useEnhancedTableRowConversion).build();
+    }
+
     public Write<T> withErrorHandler(ErrorHandler<BadRecord, ?> errorHandler) {
       return toBuilder()
           .setBadRecordErrorHandler(errorHandler)
@@ -3939,7 +3957,10 @@ private WriteResult continueExpandTyped(
                 getRowMutationInformationFn() != null,
                 getCreateDisposition(),
                 getIgnoreUnknownValues(),
-                getAutoSchemaUpdate());
+                getAutoSchemaUpdate(),
+                getUseEnhancedTableRowConversion() != null
+                    ? getUseEnhancedTableRowConversion()
+                    : false);
         }
         int numShards = getStorageApiNumStreams(bqOptions);
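
For reviewers: a minimal usage sketch of the new knob. `rows` is assumed to be an existing `PCollection<TableRow>`, and the table spec and schema are placeholders, not part of this PR:

```java
TableSchema schema =
    new TableSchema()
        .setFields(
            ImmutableList.of(
                new TableFieldSchema().setName("name").setType("STRING"),
                // A column literally named "f" is the case this option targets.
                new TableFieldSchema().setName("f").setType("FLOAT")));

rows.apply(
    "WriteWithEnhancedConversion",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table") // placeholder table spec
        .withSchema(schema)
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        // New in this PR; defaults to false (legacy name-keyed conversion).
        .withEnhancedTableRowConversion(true));
```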
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
index 7f4ec4a77d0b..98093d2a6f52 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
@@ -29,7 +29,6 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.Preconditions;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
 import org.checkerframework.checker.nullness.qual.NonNull;
 
 /** Storage API DynamicDestinations used when the input is a compiled protocol buffer. */
@@ -108,7 +107,10 @@ public TableRow toFailsafeTableRow(T element) {
                   TableRowToStorageApiProto.wrapDescriptorProto(descriptorProto),
                   element.toByteArray()),
               true,
-              Predicates.alwaysTrue());
+              org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates
+                  .alwaysTrue(),
+              "",
+              false);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index 08588cfc7850..5ebc0f5d62b1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -42,6 +42,7 @@ public class StorageApiDynamicDestinationsTableRow
       getMessageConverter(
           DestinationT destination, DatasetService datasetService) throws Exception {
@@ -189,7 +196,8 @@ public StorageApiWritePayload toMessage(
             allowMissingFields,
             unknownFields,
             changeType,
-            changeSequenceNum);
+            changeSequenceNum,
+            useEnhancedTableRowConversion);
     return StorageApiWritePayload.of(
         msg.toByteArray(),
         unknownFields,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index ab8de041be8f..43162a33b5e5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -663,7 +663,9 @@ long flush(
                         getAppendClientInfo(true, null).getDescriptor()),
                     rowBytes),
                 true,
-                successfulRowsPredicate);
+                successfulRowsPredicate,
+                "",
+                getEnhancedConversionFlag());
           }
           org.joda.time.Instant timestamp = insertTimestamps.get(i);
           failedRowsReceiver.outputWithTimestamp(
@@ -748,7 +750,9 @@ long flush(
                           .getDescriptor()),
                       protoBytes),
                   true,
-                  Predicates.alwaysTrue());
+                  Predicates.alwaysTrue(),
+                  "",
+                  getEnhancedConversionFlag());
             }
             element =
                 new BigQueryStorageApiInsertError(
@@ -900,7 +904,9 @@ long flush(
                   TableRowToStorageApiProto.tableRowFromMessage(
                       DynamicMessage.parseFrom(descriptor, rowBytes),
                       true,
-                      successfulRowsPredicate);
+                      successfulRowsPredicate,
+                      "",
+                      getEnhancedConversionFlag());
               org.joda.time.Instant timestamp = c.timestamps.get(i);
               successfulRowsReceiver.outputWithTimestamp(row, timestamp);
             } catch (Exception e) {
@@ -967,6 +973,14 @@ void postFlush() {
     private int streamAppendClientCount;
     private final @Nullable Map<String, String> bigLakeConfiguration;
 
+    private boolean getEnhancedConversionFlag() {
+      if (dynamicDestinations instanceof StorageApiDynamicDestinationsTableRow) {
+        return ((StorageApiDynamicDestinationsTableRow<?, ?>) dynamicDestinations)
+            .getUseEnhancedTableRowConversion();
+      }
+      return false;
+    }
+
     WriteRecordsDoFn(
         String operationName,
         StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index bf9c4c28bc1b..d395f5e062c1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -624,6 +624,34 @@ public static DynamicMessage messageFromTableRow(
       @Nullable String changeType,
       @Nullable String changeSequenceNum)
       throws SchemaConversionException {
+    return messageFromTableRow(
+        schemaInformation,
+        descriptor,
+        tableRow,
+        ignoreUnknownValues,
+        allowMissingRequiredFields,
+        unknownFields,
+        changeType,
+        changeSequenceNum,
+        false);
+  }
+
+  /**
+   * Given a BigQuery TableRow, returns a protocol-buffer message that can be used to write data
+   * using the BigQuery Storage API.
+   */
+  @SuppressWarnings("nullness")
+  public static DynamicMessage messageFromTableRow(
+      SchemaInformation schemaInformation,
+      Descriptor descriptor,
+      TableRow tableRow,
+      boolean ignoreUnknownValues,
+      boolean allowMissingRequiredFields,
+      final @Nullable TableRow unknownFields,
+      @Nullable String changeType,
+      @Nullable String changeSequenceNum,
+      boolean useEnhancedTableRowConversion)
+      throws SchemaConversionException {
     @Nullable Object fValue = tableRow.get("f");
     if (fValue instanceof List) {
       List<AbstractMap<String, Object>> cells = (List<AbstractMap<String, Object>>) fValue;
@@ -1129,7 +1157,7 @@ private static long toEpochMicros(Instant timestamp) {
   @VisibleForTesting
   public static TableRow tableRowFromMessage(
       Message message, boolean includeCdcColumns, Predicate<String> includeField) {
-    return tableRowFromMessage(message, includeCdcColumns, includeField, "");
+    return tableRowFromMessage(message, includeCdcColumns, includeField, "", false);
   }
 
   public static TableRow tableRowFromMessage(
@@ -1137,22 +1165,71 @@ public static TableRow tableRowFromMessage(
       boolean includeCdcColumns,
       Predicate<String> includeField,
       String namePrefix) {
-    // TODO: Would be more correct to generate TableRows using setF.
+    return tableRowFromMessage(message, includeCdcColumns, includeField, namePrefix, false);
+  }
+
+  public static TableRow tableRowFromMessage(
+      Message message,
+      boolean includeCdcColumns,
+      Predicate<String> includeField,
+      String namePrefix,
+      boolean useEnhancedConversion) {
     TableRow tableRow = new TableRow();
-    for (Map.Entry<FieldDescriptor, Object> field : message.getAllFields().entrySet()) {
-      StringBuilder fullName = new StringBuilder();
-      FieldDescriptor fieldDescriptor = field.getKey();
-      String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
-      fullName = fullName.append(namePrefix).append(fieldName);
-      Object fieldValue = field.getValue();
-      if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
-          && includeField.test(fieldName)) {
-        tableRow.put(
-            fieldName,
-            jsonValueFromMessageValue(
-                fieldDescriptor, fieldValue, true, includeField, fullName.append(".").toString()));
+
+    if (useEnhancedConversion) {
+      // New behavior: Process fields in descriptor order and use F list format
+      List<TableCell> tableCells = Lists.newArrayList();
+
+      for (FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) {
+        StringBuilder fullName = new StringBuilder();
+        String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
+        fullName = fullName.append(namePrefix).append(fieldName);
+
+        TableCell tableCell = new TableCell();
+
+        if (message.hasField(fieldDescriptor)
+            && (includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
+            && includeField.test(fieldName)) {
+          Object fieldValue = message.getField(fieldDescriptor);
+          Object jsonValue =
+              jsonValueFromMessageValue(
+                  fieldDescriptor,
+                  fieldValue,
+                  true,
+                  includeField,
+                  fullName.append(".").toString(),
+                  useEnhancedConversion);
+          tableCell.setV(jsonValue);
+        }
+
+        tableCells.add(tableCell);
+      }
+
+      tableRow.setF(tableCells);
+    } else {
+      // Original behavior: Set fields directly on TableRow by name
+      for (FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) {
+        StringBuilder fullName = new StringBuilder();
+        String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
+        fullName = fullName.append(namePrefix).append(fieldName);
+
+        if (message.hasField(fieldDescriptor)
+            && (includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
+            && includeField.test(fieldName)) {
+          Object fieldValue = message.getField(fieldDescriptor);
+          Object jsonValue =
+              jsonValueFromMessageValue(
+                  fieldDescriptor,
+                  fieldValue,
+                  true,
+                  includeField,
+                  fullName.append(".").toString(),
+                  useEnhancedConversion);
+          tableRow.set(fieldName, jsonValue);
+        }
       }
     }
+
     return tableRow;
   }
 
@@ -1161,18 +1238,23 @@ public static Object jsonValueFromMessageValue(
       Object fieldValue,
       boolean expandRepeated,
       Predicate<String> includeField,
-      String prefix) {
+      String prefix,
+      boolean useEnhancedConversion) {
     if (expandRepeated && fieldDescriptor.isRepeated()) {
       List<Object> valueList = (List<Object>) fieldValue;
       return valueList.stream()
-          .map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false, includeField, prefix))
+          .map(
+              v ->
+                  jsonValueFromMessageValue(
+                      fieldDescriptor, v, false, includeField, prefix, useEnhancedConversion))
           .collect(toList());
     }
 
     switch (fieldDescriptor.getType()) {
       case GROUP:
       case MESSAGE:
-        return tableRowFromMessage((Message) fieldValue, false, includeField, prefix);
+        return tableRowFromMessage(
+            (Message) fieldValue, false, includeField, prefix, useEnhancedConversion);
       case BYTES:
         return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray());
       case ENUM:
@@ -1192,4 +1274,15 @@ public static Object jsonValueFromMessageValue(
         return fieldValue.toString();
     }
   }
+
+  // Backward-compatible overload for jsonValueFromMessageValue
+  public static Object jsonValueFromMessageValue(
+      FieldDescriptor fieldDescriptor,
+      Object fieldValue,
+      boolean expandRepeated,
+      Predicate<String> includeField,
+      String prefix) {
+    return jsonValueFromMessageValue(
+        fieldDescriptor, fieldValue, expandRepeated, includeField, prefix, false);
+  }
 }
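
The behavioral split above is easiest to see side by side. A sketch of the two modes for a message with fields `stringvalue` and `f` (mirroring the unit tests added below; `msg` is assumed to be a `DynamicMessage` built from that two-field schema):

```java
// Legacy mode (useEnhancedConversion == false): fields are set by name via
// TableRow.set(fieldName, value). TableRow declares a typed property "f"
// (List<TableCell>), so a scalar column named "f" cannot be set this way --
// the clash this PR works around.
// TableRowToStorageApiProto.tableRowFromMessage(msg, false, field -> true, "", false);

// Enhanced mode (useEnhancedConversion == true): each field becomes a positional
// TableCell in descriptor order, attached via TableRow.setF, so a column named
// "f" is just another cell.
TableRow enhanced =
    TableRowToStorageApiProto.tableRowFromMessage(msg, false, field -> true, "", true);
// enhanced.getF() -> [{"v": "test"}, {"v": 3.14}]  (rendering approximate)
```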
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 77fc7cab0245..fe9ee71a774f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -652,7 +652,9 @@ public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
               TableRowToStorageApiProto.tableRowFromMessage(
                   DynamicMessage.parseFrom(protoDescriptor, bytes),
                   false,
-                  Predicates.alwaysTrue());
+                  Predicates.alwaysTrue(),
+                  "",
+                  false);
           if (shouldFailRow.apply(tableRow)) {
             rowIndexToErrorMessage.put(i, "Failing row " + tableRow.toPrettyString());
           }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryNestedFFieldIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryNestedFFieldIT.java
new file mode 100644
index 000000000000..afc0f9af72e4
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryNestedFFieldIT.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.TreeMap;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for BigQuery Storage API writes with nested structures containing an 'f'
+ * field. It verifies the fix for the IllegalArgumentException thrown when a List field is set to
+ * a Double in nested TableRow structures, based on the scenario from BigQuerySetFPipeline.java.
+ *
+ * <p>This test requires enhanced table row conversion to be enabled to properly handle nested
+ * structures with 'f' fields. Without it, the legacy behavior treats 'f' fields specially, which
+ * conflicts with nested structures that contain a field named 'f'.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryNestedFFieldIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryNestedFFieldIT.class);
+  private static String project;
+  private static final String DATASET_ID =
+      "nested_f_field_it_" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
+  private static final String TABLE_NAME = "nested_f_field_test";
+
+  private static TestBigQueryOptions bqOptions;
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryNestedFFieldIT");
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    bqOptions = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class);
+    project = bqOptions.as(GcpOptions.class).getProject();
+    // Create one BQ dataset for all test cases.
+    BQ_CLIENT.createNewDataset(project, DATASET_ID, null, bqOptions.getBigQueryLocation());
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    BQ_CLIENT.deleteDataset(project, DATASET_ID);
+  }
+
+  /**
+   * Reproduces the scenario from BigQuerySetFPipeline.java, where a nested structure contains an
+   * 'f' field with a float value. This exercises the fix for the IllegalArgumentException that
+   * occurred when TableRowToStorageApiProto tried to set a List field to a Double value.
+   *
+   * <p>This test uses enhanced table row conversion to properly handle the nested 'f' field.
+   * Enhanced conversion avoids the legacy special handling of 'f' fields that causes conflicts
+   * in nested structures.
+   */
+  @Test
+  public void testNestedFFieldWithFloat() throws IOException, InterruptedException {
+    // Define the table schema with a nested structure containing an 'f' field.
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("bytes").setType("BYTES"),
+                    new TableFieldSchema()
+                        .setName("sub")
+                        .setType("RECORD")
+                        .setFields(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("a").setType("STRING"),
+                                new TableFieldSchema().setName("c").setType("INTEGER"),
+                                new TableFieldSchema().setName("f").setType("FLOAT")))));
+
+    String tableSpec = String.format("%s:%s.%s", project, DATASET_ID, TABLE_NAME);
+
+    // Set up pipeline options for the Storage Write API.
+    bqOptions.setUseStorageWriteApi(true);
+    bqOptions.setUseStorageWriteApiAtLeastOnce(true);
+
+    Pipeline pipeline = Pipeline.create(bqOptions);
+
+    // Create test data similar to BigQuerySetFPipeline.java.
+    WriteResult result =
+        pipeline
+            .apply("CreateInput", Create.of("test"))
+            .apply("GenerateTestData", ParDo.of(new GenerateTestDataFn()))
+            .apply("CreateTableRows", MapElements.via(new CreateTableRowFn()))
+            .apply(
+                "WriteToBigQuery",
+                BigQueryIO.writeTableRows()
+                    .to(tableSpec)
+                    .withSchema(schema)
+                    .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
+                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+                    .withEnhancedTableRowConversion(true));
+
+    // Validate failed inserts using PAssert.
+    PCollection<BigQueryStorageApiInsertError> failedInserts = result.getFailedStorageApiInserts();
+
+    // Expect exactly 1 failed insert: the test intentionally creates a batch with one row that
+    // exceeds BigQuery's row size limit.
+    PAssert.that(failedInserts)
+        .satisfies(
+            (Iterable<BigQueryStorageApiInsertError> errors) -> {
+              int count = 0;
+              for (BigQueryStorageApiInsertError error : errors) {
+                count++;
+                LOG.info(
+                    "Failed insert error: {}",
+                    error.getErrorMessage()); // Log the error for debugging
+                if (!error.getErrorMessage().contains("Row payload too large")) {
+                  throw new AssertionError(
+                      "Expected 'Row payload too large' error, got: " + error.getErrorMessage());
+                }
+              }
+              LOG.info("Total failed inserts: {}", count);
+              if (count != 1) {
+                throw new AssertionError("Expected exactly 1 failed insert, got: " + count);
+              }
+              return null;
+            });
+
+    // Run the pipeline.
+    PipelineResult pipelineResult = pipeline.run();
+    pipelineResult.waitUntilFinish();
+
+    // Check that the BigQuery table exists and contains the expected rows.
+    String testQuery =
+        String.format("SELECT sub.a, sub.c, sub.f FROM [%s.%s];", DATASET_ID, TABLE_NAME);
+
+    QueryResponse response = BQ_CLIENT.queryWithRetries(testQuery, project);
+
+    if (response.getRows() != null) {
+      int actualRows = response.getRows().size();
+      LOG.info("Found {} successful inserts in BigQuery table", actualRows);
+
+      if (actualRows == 2) {
+        // Verify the nested 'f' field value for all rows.
+        for (int i = 0; i < response.getRows().size(); i++) {
+          TableRow resultRow = response.getRows().get(i);
+          assertEquals("hello", resultRow.getF().get(0).getV()); // sub.a
+          assertEquals("3", resultRow.getF().get(1).getV()); // sub.c
+          assertEquals("1.2", resultRow.getF().get(2).getV()); // sub.f
+        }
+
+        LOG.info(
+            "Successfully wrote and verified nested structure with 'f' field containing float "
+                + "value. Verified {} rows",
+            response.getRows().size());
+      } else {
+        fail("Expected exactly 2 successful inserts in BigQuery table, got: " + actualRows);
+      }
+    } else {
+      fail("No successful inserts found in BigQuery table - all rows may have failed");
+    }
+  }
+
+  private static class GenerateTestDataFn extends DoFn<String, Integer> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(1_000); // Small byte array size for test
+      c.output(1_000_000); // Medium byte array size for test
+      c.output(10_000_000); // Large byte array size; this row is expected to be rejected
+    }
+  }
+
+  /** Static SimpleFunction for creating TableRows, to avoid serialization issues. */
+  private static class CreateTableRowFn extends SimpleFunction<Integer, TableRow> {
+    @Override
+    public TableRow apply(Integer bytesSize) {
+      // Create a nested structure with an 'f' field containing a float value.
+      // This reproduces the exact scenario from BigQuerySetFPipeline.java.
+      ImmutableMap<String, Object> data =
+          ImmutableMap.of(
+              "bytes",
+              new byte[bytesSize],
+              "sub",
+              new TreeMap<>(ImmutableMap.of("a", "hello", "c", 3, "f", 1.2f)));
+
+      TableRow row = new TableRow();
+      row.putAll(new TreeMap<>(data));
+      return row;
+    }
+  }
+}
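
For context, the failure mode this IT guards against comes from `TableRow`'s typed accessors rather than from BigQuery itself. A minimal repro sketch, no pipeline required (exception message approximate):

```java
import com.google.api.services.bigquery.model.TableRow;

// TableRow extends GenericJson and declares a strongly typed property
// List<TableCell> f, so "f" is not an ordinary map key:
TableRow sub = new TableRow();
sub.set("a", "hello");
sub.set("c", 3);
// Throws IllegalArgumentException ("can not set java.util.List field ... to
// java.lang.Double"). The legacy tableRowFromMessage path hits this whenever a
// schema has a scalar column named "f".
sub.set("f", 1.2);
```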
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 1a6b83c5ebd6..5b7888d2958e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1860,4 +1861,73 @@ public void testCdcFields() throws Exception {
     assertEquals(
         Long.toHexString(42L), msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_SQN_COLUMN)));
   }
+
+  @Test
+  public void testTableRowFromMessageWithFieldNamedF() throws Exception {
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setType("STRING").setName("stringValue"),
+                    new TableFieldSchema().setType("FLOAT64").setName("f")));
+
+    // Create a DynamicMessage directly to test the tableRowFromMessage fix.
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(schema, false, false);
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+
+    // Set field values in the message.
+    FieldDescriptor stringField = descriptor.findFieldByName("stringvalue");
+    FieldDescriptor fField = descriptor.findFieldByName("f");
+
+    builder.setField(stringField, "test");
+    builder.setField(fField, 3.14);
+
+    DynamicMessage msg = builder.build();
+
+    // Convert the DynamicMessage to a TableRow with enhanced conversion (new behavior).
+    TableRow result =
+        TableRowToStorageApiProto.tableRowFromMessage(msg, false, field -> true, "", true);
+
+    // Verify the conversion worked correctly: all fields are now in the F list as TableCells.
+    List<TableCell> tableCells = (List<TableCell>) result.getF();
+    assertEquals(2, tableCells.size()); // Should have 2 fields: stringvalue and f
+    assertEquals("test", tableCells.get(0).getV()); // First field (stringvalue)
+    assertEquals(3.14, tableCells.get(1).getV()); // Second field (f)
+  }
+
+  @Test
+  public void testTableRowFromMessageWithFieldNamedFBackwardCompatible() throws Exception {
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setType("STRING").setName("stringValue"),
+                    new TableFieldSchema().setType("FLOAT64").setName("floatField")));
+
+    // Create a DynamicMessage directly to test tableRowFromMessage backward compatibility.
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(schema, false, false);
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+
+    // Set field values in the message.
+    FieldDescriptor stringField = descriptor.findFieldByName("stringvalue");
+    FieldDescriptor floatField = descriptor.findFieldByName("floatfield");
+
+    builder.setField(stringField, "test");
+    builder.setField(floatField, 3.14);
+
+    DynamicMessage msg = builder.build();
+
+    // Convert the DynamicMessage to a TableRow with backward-compatible conversion (old behavior).
+    TableRow result =
+        TableRowToStorageApiProto.tableRowFromMessage(msg, false, field -> true, "", false);
+
+    // Verify the conversion worked correctly: fields are set by name on the TableRow.
+    assertEquals("test", result.get("stringvalue"));
+    assertEquals(3.14, result.get("floatfield"));
+
+    // The F list should not be set in backward-compatible mode.
+    assertNull(result.getF());
+  }
 }