Skip to content

Commit 794f575

Browse files
ingore ct and tableReadersMap for serialization (#2488)
1 parent c5149e7 commit 794f575

File tree

3 files changed

+53
-21
lines changed

3 files changed

+53
-21
lines changed

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/AccumulatingTableReader.java

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import org.apache.beam.sdk.transforms.Flatten;
2626
import org.apache.beam.sdk.transforms.MapElements;
2727
import org.apache.beam.sdk.transforms.PTransform;
28-
import org.apache.beam.sdk.transforms.SerializableFunction;
29-
import org.apache.beam.sdk.transforms.SimpleFunction;
3028
import org.apache.beam.sdk.values.PBegin;
3129
import org.apache.beam.sdk.values.PCollection;
3230
import org.apache.beam.sdk.values.PCollectionList;
@@ -39,17 +37,25 @@
3937
*/
4038
@AutoValue
4139
abstract class AccumulatingTableReader extends PTransform<PBegin, PCollectionTuple> {
42-
public abstract ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>>
43-
tableTransforms();
40+
// Avoid serializing tableTransform as it's needed only for expand.
41+
private transient ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>>
42+
tableTransforms;
4443

4544
public abstract TupleTag<SourceRow> sourceRowTag();
4645

4746
public abstract TupleTag<SourceTableReference> sourceTableReferenceTag();
4847

48+
private AccumulatingTableReader setTableTransforms(
49+
ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>>
50+
tableTransforms) {
51+
this.tableTransforms = tableTransforms;
52+
return this;
53+
}
54+
4955
@Override
5056
public PCollectionTuple expand(PBegin input) {
5157
ImmutableMap<SourceTableReference, PCollection<SourceRow>> tablePCollections =
52-
this.tableTransforms().entrySet().stream()
58+
this.tableTransforms.entrySet().stream()
5359
.collect(
5460
ImmutableMap.toImmutableMap(
5561
Entry::getKey,
@@ -93,7 +99,7 @@ static Builder builder(
9399
@AutoValue.Builder
94100
abstract static class Builder {
95101

96-
abstract ImmutableMap.Builder tableTransformsBuilder();
102+
private ImmutableMap.Builder tableTransformsBuilder = new ImmutableMap.Builder();
97103

98104
abstract Builder setSourceRowTag(TupleTag<SourceRow> sourceRowTag);
99105

@@ -102,24 +108,14 @@ abstract static class Builder {
102108
Builder withTableReader(
103109
SourceTableReference sourceTableReference,
104110
PTransform<PBegin, PCollection<SourceRow>> tableReader) {
105-
this.tableTransformsBuilder().put(sourceTableReference, tableReader);
111+
this.tableTransformsBuilder.put(sourceTableReference, tableReader);
106112
return this;
107113
}
108114

109-
abstract AccumulatingTableReader build();
110-
}
111-
112-
public class SourceTableReferenceWithCount extends SimpleFunction<Long, SourceTableReference>
113-
implements SerializableFunction<Long, SourceTableReference> {
114-
private SourceTableReference tableReference;
115-
116-
SourceTableReferenceWithCount(SourceTableReference tableReference) {
117-
this.tableReference = tableReference;
118-
}
115+
abstract AccumulatingTableReader autoBuild();
119116

120-
@Override
121-
public SourceTableReference apply(Long input) {
122-
return this.tableReference.toBuilder().setRecordCount(input).build();
117+
AccumulatingTableReader build() {
118+
return autoBuild().setTableTransforms(this.tableTransformsBuilder.build());
123119
}
124120
}
125121
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.source.reader.io.transform;
17+
18+
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableReference;
19+
import org.apache.beam.sdk.transforms.SerializableFunction;
20+
import org.apache.beam.sdk.transforms.SimpleFunction;
21+
22+
/** Generates SourceTableReference once the table read is complete. */
23+
public class SourceTableReferenceWithCount extends SimpleFunction<Long, SourceTableReference>
24+
implements SerializableFunction<Long, SourceTableReference> {
25+
26+
private SourceTableReference tableReference;
27+
28+
SourceTableReferenceWithCount(SourceTableReference tableReference) {
29+
this.tableReference = tableReference;
30+
}
31+
32+
@Override
33+
public SourceTableReference apply(Long input) {
34+
return this.tableReference.toBuilder().setRecordCount(input).build();
35+
}
36+
}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/transformer/SourceRowToMutationDoFn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public abstract class SourceRowToMutationDoFn extends DoFn<SourceRow, RowContext
4747

4848
private static final Logger LOG = LoggerFactory.getLogger(SourceRowToMutationDoFn.class);
4949

50-
private ISpannerMigrationTransformer sourceDbToSpannerTransformer;
50+
private transient ISpannerMigrationTransformer sourceDbToSpannerTransformer;
5151

5252
public void setSourceDbToSpannerTransformer(
5353
ISpannerMigrationTransformer sourceDbToSpannerTransformer) {

0 commit comments

Comments
 (0)