Skip to content

Commit db2d527

Browse files
committed
Remove LocalSpannerIO
1 parent 3e25241 commit db2d527

20 files changed

+38
-3813
lines changed

v1/src/main/java/com/google/cloud/teleport/spanner/ApplyDDLTransform.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
2222
import java.util.List;
2323
import java.util.concurrent.ExecutionException;
24-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
24+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
2525
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
2626
import org.apache.beam.sdk.options.ValueProvider;
2727
import org.apache.beam.sdk.transforms.DoFn;
@@ -66,11 +66,11 @@ public PCollection<Ddl> expand(PCollection<Ddl> input) {
6666
ParDo.of(
6767
new DoFn<Ddl, Ddl>() {
6868

69-
private transient LocalSpannerAccessor spannerAccessor;
69+
private transient SpannerAccessor spannerAccessor;
7070

7171
@Setup
7272
public void setup() {
73-
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
73+
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
7474
}
7575

7676
@Teardown

v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@
7777
import org.apache.beam.sdk.io.WriteFilesResult;
7878
import org.apache.beam.sdk.io.fs.ResolveOptions;
7979
import org.apache.beam.sdk.io.fs.ResourceId;
80-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
8180
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
8281
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
82+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
8383
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
8484
import org.apache.beam.sdk.options.ValueProvider;
8585
import org.apache.beam.sdk.transforms.Combine;
@@ -182,7 +182,7 @@ public WriteFilesResult<String> expand(PBegin begin) {
182182

183183
/*
184184
* Allow users to specify read timestamp.
185-
* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
185+
* CreateTransaction and CreateTransactionFn classes in SpannerIO
186186
* only take a timestamp object for exact staleness which works when
187187
* parameters are provided during template compile time. They do not work with
188188
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
@@ -403,7 +403,7 @@ public void processElement(ProcessContext c) {
403403
PCollection<Struct> rows =
404404
tables.apply(
405405
"Read all rows from Spanner",
406-
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));
406+
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));
407407

408408
ValueProvider<ResourceId> resource =
409409
ValueProvider.NestedValueProvider.of(

v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@
5959
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
6060
import org.apache.beam.sdk.io.fs.MatchResult;
6161
import org.apache.beam.sdk.io.fs.ResourceId;
62-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
63-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
62+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
6463
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
64+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
6565
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
6666
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
6767
import org.apache.beam.sdk.options.ValueProvider;
@@ -166,7 +166,7 @@ public void processElement(ProcessContext c) {
166166
schemas.apply("Build avro DDL", Combine.globally(AsList.fn()));
167167

168168
PCollectionView<Transaction> tx =
169-
begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig));
169+
begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));
170170

171171
PCollection<Ddl> informationSchemaDdl =
172172
begin.apply(
@@ -266,7 +266,7 @@ public void processElement(ProcessContext c) {
266266
SpannerWriteResult result =
267267
mutations.apply(
268268
"Write mutations " + depth,
269-
LocalSpannerIO.write()
269+
SpannerIO.write()
270270
.withSchemaReadySignal(ddl)
271271
.withSpannerConfig(spannerConfig)
272272
.withCommitDeadline(Duration.standardMinutes(1))
@@ -401,7 +401,7 @@ private static class CreateTables extends PTransform<PBegin, PCollectionTuple> {
401401
private final ValueProvider<Boolean> earlyIndexCreateFlag;
402402
private final ValueProvider<Integer> ddlCreationTimeoutInMinutes;
403403

404-
private transient LocalSpannerAccessor spannerAccessor;
404+
private transient SpannerAccessor spannerAccessor;
405405
private static final Logger LOG = LoggerFactory.getLogger(CreateTables.class);
406406

407407
/* If the schema has a lot of DDL changes after data load, it's preferable to create
@@ -457,7 +457,7 @@ public PCollectionTuple expand(PBegin begin) {
457457

458458
@Setup
459459
public void setup() {
460-
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
460+
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
461461
}
462462

463463
@Teardown

v1/src/main/java/com/google/cloud/teleport/spanner/ReadDialect.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import com.google.cloud.spanner.DatabaseClient;
1919
import com.google.cloud.spanner.Dialect;
20-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
20+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
2121
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
2222
import org.apache.beam.sdk.transforms.Create;
2323
import org.apache.beam.sdk.transforms.DoFn;
@@ -43,15 +43,15 @@ public PCollection<Dialect> expand(PBegin p) {
4343

4444
private static class ReadDialectFn extends DoFn<Void, Dialect> {
4545
private final SpannerConfig spannerConfig;
46-
private transient LocalSpannerAccessor spannerAccessor;
46+
private transient SpannerAccessor spannerAccessor;
4747

4848
public ReadDialectFn(SpannerConfig spannerConfig) {
4949
this.spannerConfig = spannerConfig;
5050
}
5151

5252
@Setup
5353
public void setup() throws Exception {
54-
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
54+
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
5555
}
5656

5757
@Teardown

v1/src/main/java/com/google/cloud/teleport/spanner/ReadInformationSchema.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.google.cloud.spanner.Dialect;
2222
import com.google.cloud.teleport.spanner.ddl.Ddl;
2323
import com.google.cloud.teleport.spanner.ddl.InformationSchemaScanner;
24-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
24+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
2525
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
2626
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
2727
import org.apache.beam.sdk.transforms.Create;
@@ -59,7 +59,7 @@ public PCollection<Ddl> expand(PBegin p) {
5959

6060
private static class ReadInformationSchemaFn extends DoFn<Void, Ddl> {
6161
private final SpannerConfig spannerConfig;
62-
private transient LocalSpannerAccessor spannerAccessor;
62+
private transient SpannerAccessor spannerAccessor;
6363
private final PCollectionView<Transaction> tx;
6464
private final PCollectionView<Dialect> dialectView;
6565

@@ -74,7 +74,7 @@ public ReadInformationSchemaFn(
7474

7575
@Setup
7676
public void setup() throws Exception {
77-
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
77+
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
7878
}
7979

8080
@Teardown

v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@
5353
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
5454
import org.apache.beam.sdk.io.fs.MatchResult;
5555
import org.apache.beam.sdk.io.fs.ResourceId;
56-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
5756
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
57+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
5858
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
5959
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
6060
import org.apache.beam.sdk.options.ValueProvider;
@@ -105,7 +105,7 @@ public TextImportTransform(
105105
@Override
106106
public PDone expand(PBegin begin) {
107107
PCollectionView<Transaction> tx =
108-
begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig));
108+
begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));
109109

110110
PCollectionView<Dialect> dialectView =
111111
begin
@@ -203,7 +203,7 @@ public void processElement(ProcessContext c) {
203203
.apply("Wait for previous depth " + depth, Wait.on(previousComputation))
204204
.apply(
205205
"Write mutations " + depth,
206-
LocalSpannerIO.write()
206+
SpannerIO.write()
207207
.withSpannerConfig(spannerConfig)
208208
.withCommitDeadline(Duration.standardMinutes(1))
209209
.withMaxCumulativeBackoff(Duration.standardHours(2))

v1/src/main/java/com/google/cloud/teleport/templates/SpannerToText.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131
import org.apache.beam.sdk.io.FileSystems;
3232
import org.apache.beam.sdk.io.TextIO;
3333
import org.apache.beam.sdk.io.fs.ResourceId;
34-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
3534
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
3635
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
36+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
3737
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
3838
import org.apache.beam.sdk.options.PipelineOptions;
3939
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -142,7 +142,7 @@ public static void main(String[] args) {
142142
options.getTextWritePrefix(),
143143
options.getSpannerSnapshotTime());
144144

145-
/* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
145+
/* CreateTransaction and CreateTransactionFn classes in SpannerIO
146146
* only take a timestamp object for exact staleness which works when
147147
* parameters are provided during template compile time. They do not work with
148148
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
@@ -162,14 +162,14 @@ public static void main(String[] args) {
162162
PCollection<String> csv =
163163
pipeline
164164
.apply("Create export", spannerExport)
165-
// We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
165+
// We need to use SpannerIO.readAll() instead of SpannerIO.read()
166166
// because ValueProvider parameters such as table name required for
167-
// LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
167+
// SpannerIO.read() can be read only inside DoFn but SpannerIO.read() is of
168168
// type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
169169
// these parameters at the pipeline execution time.
170170
.apply(
171171
"Read all records",
172-
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
172+
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
173173
.apply(
174174
"Struct To Csv",
175175
MapElements.into(TypeDescriptors.strings())

v1/src/main/java/com/google/cloud/teleport/templates/SpannerVectorEmbeddingExport.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
import org.apache.beam.sdk.Pipeline;
2828
import org.apache.beam.sdk.io.FileSystems;
2929
import org.apache.beam.sdk.io.TextIO;
30-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
3130
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
3231
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
32+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
3333
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
3434
import org.apache.beam.sdk.options.Default;
3535
import org.apache.beam.sdk.options.PipelineOptions;
@@ -253,7 +253,7 @@ public static void main(String[] args) {
253253
options.getSpannerColumnsToExport(),
254254
ValueProvider.StaticValueProvider.of(/* disable_schema_export= */ false));
255255

256-
/* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
256+
/* CreateTransaction and CreateTransactionFn classes in SpannerIO
257257
* only take a timestamp object for exact staleness which works when
258258
* parameters are provided during template compile time. They do not work with
259259
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
@@ -273,14 +273,14 @@ public static void main(String[] args) {
273273
PCollection<String> json =
274274
pipeline
275275
.apply("Create export", spannerExport)
276-
// We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
276+
// We need to use SpannerIO.readAll() instead of SpannerIO.read()
277277
// because ValueProvider parameters such as table name required for
278-
// LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
278+
// SpannerIO.read() can be read only inside DoFn but SpannerIO.read() is of
279279
// type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
280280
// these parameters at the pipeline execution time.
281281
.apply(
282282
"Read all records",
283-
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
283+
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
284284
.apply(
285285
"Struct To JSON",
286286
MapElements.into(TypeDescriptors.strings())

v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@
5252
import java.util.function.BiFunction;
5353
import java.util.stream.Collectors;
5454
import org.apache.beam.sdk.io.FileSystems;
55-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
5655
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
56+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
5757
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
5858
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
5959
import org.apache.beam.sdk.options.Default;
@@ -249,16 +249,16 @@ public static Builder builder() {
249249
return new AutoValue_SpannerConverters_ExportTransform.Builder();
250250
}
251251

252-
private LocalSpannerAccessor spannerAccessor;
252+
private SpannerAccessor spannerAccessor;
253253
private DatabaseClient databaseClient;
254254

255-
// LocalSpannerAccessor is not serializable, thus can't be passed as a mock so we need to pass
255+
// SpannerAccessor is not serializable, thus can't be passed as a mock so we need to pass
256256
// mocked database client directly instead. We can't generate stub of ExportTransform because
257257
// AutoValue generates a final class.
258-
// TODO make LocalSpannerAccessor serializable
258+
// TODO make SpannerAccessor serializable
259259
DatabaseClient getDatabaseClient(SpannerConfig spannerConfig) {
260260
if (databaseClient == null) {
261-
this.spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
261+
this.spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
262262
return this.spannerAccessor.getDatabaseClient();
263263
} else {
264264
return this.databaseClient;
@@ -766,11 +766,11 @@ public CreateTransactionFnWithTimestamp(
766766
this.spannerSnapshotTime = spannerSnapshotTime;
767767
}
768768

769-
private transient LocalSpannerAccessor spannerAccessor;
769+
private transient SpannerAccessor spannerAccessor;
770770

771771
@DoFn.Setup
772772
public void setup() throws Exception {
773-
spannerAccessor = LocalSpannerAccessor.getOrCreate(config);
773+
spannerAccessor = SpannerAccessor.getOrCreate(config);
774774
}
775775

776776
@Teardown

0 commit comments

Comments
 (0)