Skip to content

Commit 214e73d

Browse files
committed
tableRowFromMessage should use setF
1 parent c8df4da commit 214e73d

File tree

1 file changed

+89
-10
lines changed

1 file changed

+89
-10
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,10 +1129,12 @@ private static long toEpochMicros(Instant timestamp) {
11291129
@VisibleForTesting
11301130
public static TableRow tableRowFromMessage(
11311131
Message message, boolean includeCdcColumns, Predicate<String> includeField) {
1132-
return tableRowFromMessage(message, includeCdcColumns, includeField, "");
1132+
Optional<TableRow> tableRow = tableRowFromMessage(message, includeCdcColumns, includeField, "");
1133+
return tableRow.orElseGet(
1134+
() -> tableRowFromMessageUseSetF(message, includeCdcColumns, includeField, ""));
11331135
}
11341136

1135-
public static TableRow tableRowFromMessage(
1137+
public static Optional<TableRow> tableRowFromMessage(
11361138
Message message,
11371139
boolean includeCdcColumns,
11381140
Predicate<String> includeField,
@@ -1143,36 +1145,113 @@ public static TableRow tableRowFromMessage(
11431145
StringBuilder fullName = new StringBuilder();
11441146
FieldDescriptor fieldDescriptor = field.getKey();
11451147
String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
1148+
if ("f".equals(fieldName)) {
1149+
return Optional.empty();
1150+
}
11461151
fullName = fullName.append(namePrefix).append(fieldName);
11471152
Object fieldValue = field.getValue();
11481153
if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
11491154
&& includeField.test(fieldName)) {
1150-
tableRow.put(
1151-
fieldName,
1155+
Object convertedFieldValue =
11521156
jsonValueFromMessageValue(
1153-
fieldDescriptor, fieldValue, true, includeField, fullName.append(".").toString()));
1157+
fieldDescriptor,
1158+
fieldValue,
1159+
true,
1160+
includeField,
1161+
fullName.append(".").toString(),
1162+
false);
1163+
if (convertedFieldValue instanceof Optional) {
1164+
Optional<?> optional = (Optional<?>) convertedFieldValue;
1165+
if (!optional.isPresent()) {
1166+
return Optional.empty();
1167+
} else {
1168+
convertedFieldValue = optional.get();
1169+
}
1170+
}
1171+
tableRow.put(fieldName, convertedFieldValue);
11541172
}
11551173
}
1174+
return Optional.of(tableRow);
1175+
}
1176+
1177+
public static TableRow tableRowFromMessageUseSetF(
1178+
Message message,
1179+
boolean includeCdcColumns,
1180+
Predicate<String> includeField,
1181+
String namePrefix) {
1182+
List<TableCell> tableCells =
1183+
Lists.newArrayListWithCapacity(message.getDescriptorForType().getFields().size());
1184+
1185+
for (FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) {
1186+
TableCell tableCell = new TableCell();
1187+
1188+
boolean isPresent =
1189+
(fieldDescriptor.isRepeated() && message.getRepeatedFieldCount(fieldDescriptor) > 0)
1190+
|| (!fieldDescriptor.isRepeated() && message.hasField(fieldDescriptor));
1191+
if (isPresent) {
1192+
StringBuilder fullName = new StringBuilder();
1193+
String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
1194+
fullName = fullName.append(namePrefix).append(fieldName);
1195+
if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
1196+
&& includeField.test(fieldName)) {
1197+
Object fieldValue = message.getField(fieldDescriptor);
1198+
Object converted =
1199+
jsonValueFromMessageValue(
1200+
fieldDescriptor,
1201+
fieldValue,
1202+
true,
1203+
includeField,
1204+
fullName.append(".").toString(),
1205+
true);
1206+
tableCell.setV(converted);
1207+
}
1208+
}
1209+
tableCells.add(tableCell);
1210+
}
1211+
1212+
TableRow tableRow = new TableRow();
1213+
tableRow.setF(tableCells);
1214+
11561215
return tableRow;
11571216
}
11581217

1218+
// Translate a proto message value into a json value. If useSetF==false, this will fail with
1219+
// Optional.empty() if
1220+
// any fields named "f" are found (due to restrictions on the TableRow class). In that case, the
1221+
// top level will retry
1222+
with useSetF==true. We fall back this way in order to maintain backwards compatibility with
1223+
// existing users.
11591224
public static Object jsonValueFromMessageValue(
11601225
FieldDescriptor fieldDescriptor,
11611226
Object fieldValue,
11621227
boolean expandRepeated,
11631228
Predicate<String> includeField,
1164-
String prefix) {
1229+
String prefix,
1230+
boolean useSetF) {
11651231
if (expandRepeated && fieldDescriptor.isRepeated()) {
11661232
List<Object> valueList = (List<Object>) fieldValue;
1167-
return valueList.stream()
1168-
.map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false, includeField, prefix))
1169-
.collect(toList());
1233+
List<Object> expanded = Lists.newArrayListWithCapacity(valueList.size());
1234+
for (Object value : valueList) {
1235+
Object translatedValue =
1236+
jsonValueFromMessageValue(fieldDescriptor, value, false, includeField, prefix, useSetF);
1237+
if (!useSetF && translatedValue instanceof Optional) {
1238+
Optional<?> optional = (Optional<?>) translatedValue;
1239+
if (!optional.isPresent()) {
1240+
return Optional.empty();
1241+
}
1242+
translatedValue = optional.get();
1243+
}
1244+
expanded.add(translatedValue);
1245+
}
1246+
return expanded;
11701247
}
11711248

11721249
switch (fieldDescriptor.getType()) {
11731250
case GROUP:
11741251
case MESSAGE:
1175-
return tableRowFromMessage((Message) fieldValue, false, includeField, prefix);
1252+
return useSetF
1253+
? tableRowFromMessageUseSetF((Message) fieldValue, false, includeField, prefix)
1254+
: tableRowFromMessage((Message) fieldValue, false, includeField, prefix);
11761255
case BYTES:
11771256
return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray());
11781257
case ENUM:

0 commit comments

Comments
 (0)