@@ -58,6 +58,8 @@ abstract class AppendClientInfo {

abstract DescriptorProtos.DescriptorProto getDescriptor();

abstract boolean getUseEnhancedTableRowConversion();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setStreamAppendClient(@Nullable BigQueryServices.StreamAppendClient value);
@@ -74,6 +76,8 @@ abstract static class Builder {

abstract Builder setStreamName(@Nullable String name);

abstract Builder setUseEnhancedTableRowConversion(boolean value);

abstract AppendClientInfo build();
};

@@ -84,13 +88,23 @@ static AppendClientInfo of(
DescriptorProtos.DescriptorProto descriptor,
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
throws Exception {
return of(tableSchema, descriptor, closeAppendClient, false);
}

static AppendClientInfo of(
TableSchema tableSchema,
DescriptorProtos.DescriptorProto descriptor,
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient,
boolean useEnhancedTableRowConversion)
throws Exception {
return new AutoValue_AppendClientInfo.Builder()
.setTableSchema(tableSchema)
.setCloseAppendClient(closeAppendClient)
.setJsonTableSchema(TableRowToStorageApiProto.protoSchemaToTableSchema(tableSchema))
.setSchemaInformation(
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema))
.setDescriptor(descriptor)
.setUseEnhancedTableRowConversion(useEnhancedTableRowConversion)
.build();
}

@@ -106,6 +120,20 @@ static AppendClientInfo of(
closeAppendClient);
}

static AppendClientInfo of(
TableSchema tableSchema,
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient,
boolean includeCdcColumns,
boolean useEnhancedTableRowConversion)
throws Exception {
return of(
tableSchema,
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
tableSchema, true, includeCdcColumns),
closeAppendClient,
useEnhancedTableRowConversion);
}
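// The overloads above keep the flag strictly opt-in: a caller that omits it gets the
// legacy conversion. A minimal sketch of the equivalence (illustrative only; the helper
// name is made up, AppendClientInfo is internal to the BigQuery sink, and the no-op
// close handler is a placeholder):
static void overloadEquivalenceSketch(
    TableSchema tableSchema, DescriptorProtos.DescriptorProto descriptor) throws Exception {
  Consumer<BigQueryServices.StreamAppendClient> noOpClose = client -> {};
  // The three-argument form delegates with the flag defaulted to false...
  AppendClientInfo legacy = AppendClientInfo.of(tableSchema, descriptor, noOpClose);
  // ...so it is equivalent to passing the flag explicitly:
  AppendClientInfo explicit =
      AppendClientInfo.of(tableSchema, descriptor, noOpClose, false);
}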

public AppendClientInfo withNoAppendClient() {
return toBuilder().setStreamAppendClient(null).build();
}
@@ -173,7 +201,9 @@ public TableRow toTableRow(ByteString protoBytes, Predicate<String> includeField
DynamicMessage.parseFrom(
TableRowToStorageApiProto.wrapDescriptorProto(getDescriptor()), protoBytes),
true,
includeField);
includeField,
"",
getUseEnhancedTableRowConversion());
} catch (Exception e) {
throw new RuntimeException(e);
}
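// The updated tableRowFromMessage overload itself does not appear in this diff. Judging
// from the call sites, the two new trailing arguments are a field-name prefix (always ""
// here) and the new conversion flag. An assumed stub to help read the call-site changes;
// parameter names after includeField are guesses, not confirmed Beam API:
//
//   public static TableRow tableRowFromMessage(
//       DynamicMessage message,
//       boolean includeCdcColumns,
//       Predicate<String> includeField,
//       String fieldNamePrefix,                  // "" at every call site in this PR
//       boolean useEnhancedTableRowConversion) { // the new opt-in flag
//     ...
//   }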
@@ -2451,6 +2451,8 @@ public enum Method {

abstract @Nullable String getWriteTempDataset();

abstract @Nullable Boolean getUseEnhancedTableRowConversion();

abstract @Nullable SerializableFunction<T, RowMutationInformation>
getRowMutationInformationFn();

@@ -2576,6 +2578,8 @@ abstract Builder<T> setBadRecordErrorHandler(

abstract Builder<T> setBadRecordRouter(BadRecordRouter badRecordRouter);

abstract Builder<T> setUseEnhancedTableRowConversion(Boolean useEnhancedTableRowConversion);

abstract Write<T> build();
}

@@ -3326,6 +3330,20 @@ public Write<T> withWriteTempDataset(String writeTempDataset) {
return toBuilder().setWriteTempDataset(writeTempDataset).build();
}

/**
* Sets whether to use enhanced table row conversion. When enabled, improved conversion
* logic handles nested fields and complex data types with better compatibility with
* BigQuery's data model. When disabled (the default), the legacy conversion behavior is
* kept for backward compatibility.
*
* @param useEnhancedTableRowConversion true to enable enhanced conversion, false for legacy
*     behavior
* @return the updated Write transform
*/
public Write<T> withEnhancedTableRowConversion(boolean useEnhancedTableRowConversion) {
return toBuilder().setUseEnhancedTableRowConversion(useEnhancedTableRowConversion).build();
}

Review thread on this method:

Contributor: Users on a new Beam version could still trigger the bug if they do not opt in to this PTransform configuration. Also, this is only used in the DLQ of the storage_write_api write methods. Personally I prefer this over #36425 as the fix, and the new tests added in this PR are valuable and could be checked in after the fix is merged.

Contributor Author: Sounds good. Thanks!
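For reference, a minimal opt-in sketch of the new knob in a pipeline. The table name and schema are placeholders; everything else is existing BigQueryIO API, plus the method added in this PR:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.Create;

public class EnhancedConversionExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    TableSchema schema =
        new TableSchema()
            .setFields(
                Collections.singletonList(
                    new TableFieldSchema().setName("name").setType("STRING")));
    p.apply(Create.of(new TableRow().set("name", "beam")).withCoder(TableRowJsonCoder.of()))
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // placeholder destination
                .withSchema(schema)
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
                .withEnhancedTableRowConversion(true)); // new opt-in flag from this PR
    p.run().waitUntilFinish();
  }
}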

public Write<T> withErrorHandler(ErrorHandler<BadRecord, ?> errorHandler) {
return toBuilder()
.setBadRecordErrorHandler(errorHandler)
@@ -3939,7 +3957,10 @@ private <DestinationT> WriteResult continueExpandTyped(
getRowMutationInformationFn() != null,
getCreateDisposition(),
getIgnoreUnknownValues(),
getAutoSchemaUpdate());
getAutoSchemaUpdate(),
getUseEnhancedTableRowConversion() != null
? getUseEnhancedTableRowConversion()
: false);
}
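// Note: getUseEnhancedTableRowConversion() returns a nullable Boolean (null when the
// user never called withEnhancedTableRowConversion), so the ternary above coerces null
// to false before it reaches the primitive boolean constructor parameter. An equivalent
// null-safe idiom, shown standalone:
Boolean configured = null;                             // flag left unset by the user
boolean useEnhanced = Boolean.TRUE.equals(configured); // false; no unboxing NPE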

int numShards = getStorageApiNumStreams(bqOptions);
@@ -29,7 +29,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.checkerframework.checker.nullness.qual.NonNull;

/** Storage API DynamicDestinations used when the input is a compiled protocol buffer. */
@@ -108,7 +107,10 @@ public TableRow toFailsafeTableRow(T element) {
TableRowToStorageApiProto.wrapDescriptorProto(descriptorProto),
element.toByteArray()),
true,
Predicates.alwaysTrue());
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates
.alwaysTrue(),
"",
false);
} catch (Exception e) {
throw new RuntimeException(e);
}
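// Note: this proto-based destination passes the new flag as a hard-coded false; in this
// PR the enhanced conversion is wired up only for TableRow inputs.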
@@ -42,6 +42,7 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
private final CreateDisposition createDisposition;
private final boolean ignoreUnknownValues;
private final boolean autoSchemaUpdates;
private final boolean useEnhancedTableRowConversion;
private static final TableSchemaCache SCHEMA_CACHE =
new TableSchemaCache(Duration.standardSeconds(1));

@@ -56,20 +57,26 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
boolean usesCdc,
CreateDisposition createDisposition,
boolean ignoreUnknownValues,
boolean autoSchemaUpdates) {
boolean autoSchemaUpdates,
boolean useEnhancedTableRowConversion) {
super(inner);
this.formatFunction = formatFunction;
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
this.usesCdc = usesCdc;
this.createDisposition = createDisposition;
this.ignoreUnknownValues = ignoreUnknownValues;
this.autoSchemaUpdates = autoSchemaUpdates;
this.useEnhancedTableRowConversion = useEnhancedTableRowConversion;
}

static void clearSchemaCache() throws ExecutionException, InterruptedException {
SCHEMA_CACHE.clear();
}

public boolean getUseEnhancedTableRowConversion() {
return useEnhancedTableRowConversion;
}

@Override
public MessageConverter<T> getMessageConverter(
DestinationT destination, DatasetService datasetService) throws Exception {
@@ -189,7 +196,8 @@ public StorageApiWritePayload toMessage(
allowMissingFields,
unknownFields,
changeType,
changeSequenceNum);
changeSequenceNum,
useEnhancedTableRowConversion);
return StorageApiWritePayload.of(
msg.toByteArray(),
unknownFields,
@@ -663,7 +663,9 @@ long flush(
getAppendClientInfo(true, null).getDescriptor()),
rowBytes),
true,
successfulRowsPredicate);
successfulRowsPredicate,
"",
getEnhancedConversionFlag());
}
org.joda.time.Instant timestamp = insertTimestamps.get(i);
failedRowsReceiver.outputWithTimestamp(
@@ -748,7 +750,9 @@ long flush(
.getDescriptor()),
protoBytes),
true,
Predicates.alwaysTrue());
Predicates.alwaysTrue(),
"",
getEnhancedConversionFlag());
}
element =
new BigQueryStorageApiInsertError(
@@ -900,7 +904,9 @@ long flush(
TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(descriptor, rowBytes),
true,
successfulRowsPredicate);
successfulRowsPredicate,
"",
getEnhancedConversionFlag());
org.joda.time.Instant timestamp = c.timestamps.get(i);
successfulRowsReceiver.outputWithTimestamp(row, timestamp);
} catch (Exception e) {
@@ -967,6 +973,14 @@ void postFlush() {
private int streamAppendClientCount;
private final @Nullable Map<String, String> bigLakeConfiguration;

private boolean getEnhancedConversionFlag() {
if (dynamicDestinations instanceof StorageApiDynamicDestinationsTableRow) {
return ((StorageApiDynamicDestinationsTableRow<?, ?>) dynamicDestinations)
.getUseEnhancedTableRowConversion();
}
return false;
}
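// The DoFn only holds the StorageApiDynamicDestinations supertype, so the flag is
// recovered with a guarded cast, and every non-TableRow destination falls back to the
// legacy conversion. A self-contained sketch of the same lookup pattern, using
// stand-in types rather than the Beam classes:
abstract static class Destinations {}

static class TableRowDestinations extends Destinations {
  private final boolean useEnhancedTableRowConversion;

  TableRowDestinations(boolean useEnhancedTableRowConversion) {
    this.useEnhancedTableRowConversion = useEnhancedTableRowConversion;
  }

  boolean getUseEnhancedTableRowConversion() {
    return useEnhancedTableRowConversion;
  }
}

static final class FlagLookup {
  private FlagLookup() {}

  static boolean enhancedConversionFlag(Destinations destinations) {
    if (destinations instanceof TableRowDestinations) {
      return ((TableRowDestinations) destinations).getUseEnhancedTableRowConversion();
    }
    return false; // proto-based and other destinations keep the legacy conversion
  }
}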

WriteRecordsDoFn(
String operationName,
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,