Skip to content

Commit e5e58c1

Browse files
committed
tableRowFromMessage should use setF
1 parent 6d5b984 commit e5e58c1

File tree

2 files changed

+25
-15
lines changed

2 files changed

+25
-15
lines changed

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.Collection;
2424
import java.util.Collections;
2525
import java.util.List;
26-
import java.util.Objects;
2726
import java.util.stream.Collectors;
2827
import org.apache.beam.sdk.coders.RowCoder;
2928
import org.apache.beam.sdk.schemas.Schema;

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,22 +1137,33 @@ public static TableRow tableRowFromMessage(
11371137
boolean includeCdcColumns,
11381138
Predicate<String> includeField,
11391139
String namePrefix) {
1140-
// TODO: Would be more correct to generate TableRows using setF.
1141-
TableRow tableRow = new TableRow();
1142-
for (Map.Entry<FieldDescriptor, Object> field : message.getAllFields().entrySet()) {
1143-
StringBuilder fullName = new StringBuilder();
1144-
FieldDescriptor fieldDescriptor = field.getKey();
1145-
String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
1146-
fullName = fullName.append(namePrefix).append(fieldName);
1147-
Object fieldValue = field.getValue();
1148-
if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
1149-
&& includeField.test(fieldName)) {
1150-
tableRow.put(
1151-
fieldName,
1152-
jsonValueFromMessageValue(
1153-
fieldDescriptor, fieldValue, true, includeField, fullName.append(".").toString()));
1140+
List<TableCell> tableCells =
1141+
Lists.newArrayListWithCapacity(message.getDescriptorForType().getFields().size());
1142+
1143+
for (FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) {
1144+
TableCell tableCell = new TableCell();
1145+
if (message.hasField(fieldDescriptor)) {
1146+
StringBuilder fullName = new StringBuilder();
1147+
String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
1148+
fullName = fullName.append(namePrefix).append(fieldName);
1149+
if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
1150+
&& includeField.test(fieldName)) {
1151+
Object fieldValue = message.getField(fieldDescriptor);
1152+
tableCell.setV(
1153+
jsonValueFromMessageValue(
1154+
fieldDescriptor,
1155+
fieldValue,
1156+
true,
1157+
includeField,
1158+
fullName.append(".").toString()));
1159+
}
11541160
}
1161+
tableCells.add(tableCell);
11551162
}
1163+
1164+
TableRow tableRow = new TableRow();
1165+
tableRow.setF(tableCells);
1166+
11561167
return tableRow;
11571168
}
11581169

0 commit comments

Comments
 (0)