Skip to content

Commit 1b1096d

Browse files
committed
Fix missing coder for CleanupOperationMessage side outputs
1 parent cdcb384 commit 1b1096d

File tree

1 file changed

+16
-2
lines changed
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery

1 file changed

+16
-2
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,8 +1085,22 @@ public String apply(BigQueryDynamicReadDescriptor input) {
10851085
rowTag, getParseFn(), getBadRecordRouter(), cleanupInfoTag))
10861086
.withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG).and(cleanupInfoTag)));
10871087

1088-
PCollectionList.of(resultTuple.get(cleanupInfoTag))
1089-
.and(readResultTuple.get(cleanupInfoTag))
1088+
PCollection<KV<String, CleanupOperationMessage>> cleanupMessages1 =
1089+
resultTuple
1090+
.get(cleanupInfoTag)
1091+
.setCoder(
1092+
KvCoder.of(
1093+
StringUtf8Coder.of(), SerializableCoder.of(CleanupOperationMessage.class)));
1094+
1095+
PCollection<KV<String, CleanupOperationMessage>> cleanupMessages2 =
1096+
readResultTuple
1097+
.get(cleanupInfoTag)
1098+
.setCoder(
1099+
KvCoder.of(
1100+
StringUtf8Coder.of(), SerializableCoder.of(CleanupOperationMessage.class)));
1101+
1102+
PCollectionList.of(cleanupMessages1)
1103+
.and(cleanupMessages2)
10901104
.apply(Flatten.pCollections())
10911105
.apply("CleanupTempTables", ParDo.of(new CleanupTempTableDoFn(getBigQueryServices())));
10921106

0 commit comments

Comments
 (0)