Skip to content

Commit 826933f

Browse files
committed
Addressing comments
1 parent 273d7d4 commit 826933f

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriter.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,16 +101,16 @@ public SpannerTransactionWriter(
101101
@Override
102102
public SpannerTransactionWriter.Result expand(
103103
PCollection<FailsafeElement<String, String>> input) {
104-
PCollectionTuple groupByResults =
104+
PCollectionTuple keyedEvents =
105105
input.apply(
106-
"Produce Key value pairs with key as Primary Key Hash",
106+
"Key By PK Hash",
107107
ParDo.of(new CreateKeyValuePairsWithPrimaryKeyHashDoFn(ddlView))
108108
.withSideInputs(ddlView)
109109
.withOutputTags(
110110
DatastreamToSpannerConstants.SUCCESSFUL_KEYED_EVENT_TAG,
111111
TupleTagList.of(List.of(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))));
112112
PCollectionTuple spannerWriteResults =
113-
groupByResults
113+
keyedEvents
114114
.get(DatastreamToSpannerConstants.SUCCESSFUL_KEYED_EVENT_TAG)
115115
.setCoder(
116116
KvCoder.of(
@@ -136,13 +136,13 @@ public SpannerTransactionWriter.Result expand(
136136
DatastreamToSpannerConstants.PERMANENT_ERROR_TAG,
137137
DatastreamToSpannerConstants.RETRYABLE_ERROR_TAG))));
138138

139-
PCollection<FailsafeElement<String, String>> groupByErrorRecords =
140-
groupByResults.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG);
139+
PCollection<FailsafeElement<String, String>> keyedEventsErrorRecords =
140+
keyedEvents.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG);
141141

142142
PCollection<FailsafeElement<String, String>> permanentErrorRecords =
143143
PCollectionList.of(
144144
spannerWriteResults.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))
145-
.and(groupByErrorRecords)
145+
.and(keyedEventsErrorRecords)
146146
.apply(Flatten.pCollections());
147147

148148
return Result.create(

0 commit comments

Comments
 (0)