Skip to content

Commit fafb60b

Browse files
committed
Remove LocalSpannerIO
1 parent 40e1a8b commit fafb60b

19 files changed

+38
-3755
lines changed

v1/src/main/java/com/google/cloud/teleport/spanner/ApplyDDLTransform.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
2222
import java.util.List;
2323
import java.util.concurrent.ExecutionException;
24-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
24+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
2525
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
2626
import org.apache.beam.sdk.options.ValueProvider;
2727
import org.apache.beam.sdk.transforms.DoFn;
@@ -66,11 +66,11 @@ public PCollection<Ddl> expand(PCollection<Ddl> input) {
6666
ParDo.of(
6767
new DoFn<Ddl, Ddl>() {
6868

69-
private transient LocalSpannerAccessor spannerAccessor;
69+
private transient SpannerAccessor spannerAccessor;
7070

7171
@Setup
7272
public void setup() {
73-
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
73+
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
7474
}
7575

7676
@Teardown

v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@
8080
import org.apache.beam.sdk.io.WriteFilesResult;
8181
import org.apache.beam.sdk.io.fs.ResolveOptions;
8282
import org.apache.beam.sdk.io.fs.ResourceId;
83-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
8483
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
8584
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
85+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
8686
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
8787
import org.apache.beam.sdk.options.ValueProvider;
8888
import org.apache.beam.sdk.transforms.Combine;
@@ -185,7 +185,7 @@ public WriteFilesResult<String> expand(PBegin begin) {
185185

186186
/*
187187
* Allow users to specify read timestamp.
188-
* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
188+
* CreateTransaction and CreateTransactionFn classes in SpannerIO
189189
* only take a timestamp object for exact staleness which works when
190190
* parameters are provided during template compile time. They do not work with
191191
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
@@ -457,7 +457,7 @@ public void processElement(ProcessContext c) {
457457
PCollection<Struct> rows =
458458
tableReadOperations.apply(
459459
"Read all rows from Spanner",
460-
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));
460+
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));
461461

462462
ValueProvider<ResourceId> resource =
463463
ValueProvider.NestedValueProvider.of(

v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@
6666
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
6767
import org.apache.beam.sdk.io.fs.MatchResult;
6868
import org.apache.beam.sdk.io.fs.ResourceId;
69-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
70-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
69+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
7170
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
71+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
7272
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
7373
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
7474
import org.apache.beam.sdk.options.ValueProvider;
@@ -173,7 +173,7 @@ public void processElement(ProcessContext c) {
173173
schemas.apply("Build avro DDL", Combine.globally(AsList.fn()));
174174

175175
PCollectionView<Transaction> tx =
176-
begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig));
176+
begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));
177177

178178
PCollection<Ddl> informationSchemaDdl =
179179
begin.apply(
@@ -273,7 +273,7 @@ public void processElement(ProcessContext c) {
273273
SpannerWriteResult result =
274274
mutations.apply(
275275
"Write mutations " + depth,
276-
LocalSpannerIO.write()
276+
SpannerIO.write()
277277
.withSchemaReadySignal(ddl)
278278
.withSpannerConfig(spannerConfig)
279279
.withCommitDeadline(Duration.standardMinutes(1))
@@ -408,7 +408,7 @@ private static class CreateTables extends PTransform<PBegin, PCollectionTuple> {
408408
private final ValueProvider<Boolean> earlyIndexCreateFlag;
409409
private final ValueProvider<Integer> ddlCreationTimeoutInMinutes;
410410

411-
private transient LocalSpannerAccessor spannerAccessor;
411+
private transient SpannerAccessor spannerAccessor;
412412
private static final Logger LOG = LoggerFactory.getLogger(CreateTables.class);
413413

414414
/* If the schema has a lot of DDL changes after data load, it's preferable to create
@@ -464,7 +464,7 @@ public PCollectionTuple expand(PBegin begin) {
464464

465465
@Setup
466466
public void setup() {
467-
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
467+
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
468468
}
469469

470470
@Teardown

v1/src/main/java/com/google/cloud/teleport/spanner/ReadDialect.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import com.google.cloud.spanner.DatabaseClient;
1919
import com.google.cloud.spanner.Dialect;
20-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
20+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
2121
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
2222
import org.apache.beam.sdk.transforms.Create;
2323
import org.apache.beam.sdk.transforms.DoFn;
@@ -43,15 +43,15 @@ public PCollection<Dialect> expand(PBegin p) {
4343

4444
private static class ReadDialectFn extends DoFn<Void, Dialect> {
4545
private final SpannerConfig spannerConfig;
46-
private transient LocalSpannerAccessor spannerAccessor;
46+
private transient SpannerAccessor spannerAccessor;
4747

4848
public ReadDialectFn(SpannerConfig spannerConfig) {
4949
this.spannerConfig = spannerConfig;
5050
}
5151

5252
@Setup
5353
public void setup() throws Exception {
54-
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
54+
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
5555
}
5656

5757
@Teardown

v1/src/main/java/com/google/cloud/teleport/spanner/ReadInformationSchema.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.google.cloud.spanner.Dialect;
2222
import com.google.cloud.teleport.spanner.ddl.Ddl;
2323
import com.google.cloud.teleport.spanner.ddl.InformationSchemaScanner;
24-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
24+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
2525
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
2626
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
2727
import org.apache.beam.sdk.transforms.Create;
@@ -59,7 +59,7 @@ public PCollection<Ddl> expand(PBegin p) {
5959

6060
private static class ReadInformationSchemaFn extends DoFn<Void, Ddl> {
6161
private final SpannerConfig spannerConfig;
62-
private transient LocalSpannerAccessor spannerAccessor;
62+
private transient SpannerAccessor spannerAccessor;
6363
private final PCollectionView<Transaction> tx;
6464
private final PCollectionView<Dialect> dialectView;
6565

@@ -74,7 +74,7 @@ public ReadInformationSchemaFn(
7474

7575
@Setup
7676
public void setup() throws Exception {
77-
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
77+
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
7878
}
7979

8080
@Teardown

v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@
5353
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
5454
import org.apache.beam.sdk.io.fs.MatchResult;
5555
import org.apache.beam.sdk.io.fs.ResourceId;
56-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
5756
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
57+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
5858
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
5959
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
6060
import org.apache.beam.sdk.options.ValueProvider;
@@ -105,7 +105,7 @@ public TextImportTransform(
105105
@Override
106106
public PDone expand(PBegin begin) {
107107
PCollectionView<Transaction> tx =
108-
begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig));
108+
begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));
109109

110110
PCollectionView<Dialect> dialectView =
111111
begin
@@ -203,7 +203,7 @@ public void processElement(ProcessContext c) {
203203
.apply("Wait for previous depth " + depth, Wait.on(previousComputation))
204204
.apply(
205205
"Write mutations " + depth,
206-
LocalSpannerIO.write()
206+
SpannerIO.write()
207207
.withSpannerConfig(spannerConfig)
208208
.withCommitDeadline(Duration.standardMinutes(1))
209209
.withMaxCumulativeBackoff(Duration.standardHours(2))

v1/src/main/java/com/google/cloud/teleport/templates/SpannerToText.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131
import org.apache.beam.sdk.io.FileSystems;
3232
import org.apache.beam.sdk.io.TextIO;
3333
import org.apache.beam.sdk.io.fs.ResourceId;
34-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
3534
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
3635
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
36+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
3737
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
3838
import org.apache.beam.sdk.options.PipelineOptions;
3939
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -144,7 +144,7 @@ public static void main(String[] args) {
144144
options.getTextWritePrefix(),
145145
options.getSpannerSnapshotTime());
146146

147-
/* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
147+
/* CreateTransaction and CreateTransactionFn classes in SpannerIO
148148
* only take a timestamp object for exact staleness which works when
149149
* parameters are provided during template compile time. They do not work with
150150
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
@@ -164,14 +164,14 @@ public static void main(String[] args) {
164164
PCollection<String> csv =
165165
pipeline
166166
.apply("Create export", spannerExport)
167-
// We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
167+
// We need to use SpannerIO.readAll() instead of SpannerIO.read()
168168
// because ValueProvider parameters such as table name required for
169-
// LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
169+
// SpannerIO.read() can be read only inside DoFn but SpannerIO.read() is of
170170
// type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
171171
// these parameters at the pipeline execution time.
172172
.apply(
173173
"Read all records",
174-
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
174+
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
175175
.apply(
176176
"Struct To Csv",
177177
MapElements.into(TypeDescriptors.strings())

v1/src/main/java/com/google/cloud/teleport/templates/SpannerVectorEmbeddingExport.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
import org.apache.beam.sdk.Pipeline;
2828
import org.apache.beam.sdk.io.FileSystems;
2929
import org.apache.beam.sdk.io.TextIO;
30-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
3130
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
3231
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
32+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
3333
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
3434
import org.apache.beam.sdk.options.Default;
3535
import org.apache.beam.sdk.options.PipelineOptions;
@@ -264,7 +264,7 @@ public static void main(String[] args) {
264264
options.getSpannerColumnsToExport(),
265265
ValueProvider.StaticValueProvider.of(/* disable_schema_export= */ false));
266266

267-
/* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
267+
/* CreateTransaction and CreateTransactionFn classes in SpannerIO
268268
* only take a timestamp object for exact staleness which works when
269269
* parameters are provided during template compile time. They do not work with
270270
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
@@ -284,14 +284,14 @@ public static void main(String[] args) {
284284
PCollection<String> json =
285285
pipeline
286286
.apply("Create export", spannerExport)
287-
// We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
287+
// We need to use SpannerIO.readAll() instead of SpannerIO.read()
288288
// because ValueProvider parameters such as table name required for
289-
// LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
289+
// SpannerIO.read() can be read only inside DoFn but SpannerIO.read() is of
290290
// type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
291291
// these parameters at the pipeline execution time.
292292
.apply(
293293
"Read all records",
294-
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
294+
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
295295
.apply(
296296
"Struct To JSON",
297297
MapElements.into(TypeDescriptors.strings())

v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@
5656
import java.util.stream.Collectors;
5757
import kotlin.Pair;
5858
import org.apache.beam.sdk.io.FileSystems;
59-
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
6059
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
60+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
6161
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
6262
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
6363
import org.apache.beam.sdk.options.Default;
@@ -261,16 +261,16 @@ public static Builder builder() {
261261
return new AutoValue_SpannerConverters_ExportTransform.Builder();
262262
}
263263

264-
private LocalSpannerAccessor spannerAccessor;
264+
private SpannerAccessor spannerAccessor;
265265
private DatabaseClient databaseClient;
266266

267-
// LocalSpannerAccessor is not serializable, thus can't be passed as a mock so we need to pass
267+
// SpannerAccessor is not serializable, thus can't be passed as a mock so we need to pass
268268
// mocked database client directly instead. We can't generate stub of ExportTransform because
269269
// AutoValue generates a final class.
270-
// TODO make LocalSpannerAccessor serializable
270+
// TODO make SpannerAccessor serializable
271271
DatabaseClient getDatabaseClient(SpannerConfig spannerConfig) {
272272
if (databaseClient == null) {
273-
this.spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
273+
this.spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
274274
return this.spannerAccessor.getDatabaseClient();
275275
} else {
276276
return this.databaseClient;
@@ -817,11 +817,11 @@ public CreateTransactionFnWithTimestamp(
817817
this.spannerSnapshotTime = spannerSnapshotTime;
818818
}
819819

820-
private transient LocalSpannerAccessor spannerAccessor;
820+
private transient SpannerAccessor spannerAccessor;
821821

822822
@DoFn.Setup
823823
public void setup() throws Exception {
824-
spannerAccessor = LocalSpannerAccessor.getOrCreate(config);
824+
spannerAccessor = SpannerAccessor.getOrCreate(config);
825825
}
826826

827827
@Teardown

0 commit comments

Comments
 (0)