Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
* [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)).
* [YAML] WriteToTFRecord and ReadFromTFRecord Beam YAML support
* Python: Added JupyterLab 4.x extension compatibility for enhanced notebook integration ([#34495](https://github.com/apache/beam/pull/34495)).
* [Python] Adding GCP Spanner Change Stream support for Python (apache_beam.io.gcp.spanner). ([#24103] https://github.com/apache/beam/issues/24103).

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -1015,6 +1017,32 @@ public PCollection<Row> expand(PBegin input) {
}
}

static class ChangeStreamRead extends PTransform<PBegin, PCollection<String>> {

ReadChangeStream readChangeStream;

public ChangeStreamRead(ReadChangeStream readChangeStream) {
this.readChangeStream = readChangeStream;
}

@Override
public PCollection<String> expand(PBegin input) {
return input
.apply(readChangeStream)
.apply("DataChangeRecordToStringJSON", ParDo.of(new DataChangeRecordToJsonFn()));
}
}

private static class DataChangeRecordToJsonFn extends DoFn<DataChangeRecord, String> {
private static Gson gson = new GsonBuilder().disableHtmlEscaping().create();

@ProcessElement
public void process(@Element DataChangeRecord input, OutputReceiver<String> receiver) {
String modJsonString = gson.toJson(input, DataChangeRecord.class);
receiver.output(modJsonString);
}
}

/**
* A {@link PTransform} that create a transaction. If applied to a {@link PCollection}, it will
* create a transaction after the {@link PCollection} is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.auto.service.AutoService;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.TimestampBound;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -43,8 +44,8 @@
import org.joda.time.Duration;

/**
* Exposes {@link SpannerIO.WriteRows} and {@link SpannerIO.ReadRows} as an external transform for
* cross-language usage.
* Exposes {@link SpannerIO.WriteRows}, {@link SpannerIO.ReadRows} and {@link
* SpannerIO.ChangeStreamRead} as an external transform for cross-language usage.
*/
@AutoService(ExternalTransformRegistrar.class)
public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
Expand All @@ -55,6 +56,8 @@ public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
"beam:transform:org.apache.beam:spanner_insert_or_update:v1";
public static final String DELETE_URN = "beam:transform:org.apache.beam:spanner_delete:v1";
public static final String READ_URN = "beam:transform:org.apache.beam:spanner_read:v1";
public static final String READ_CHANGE_STREAM_URN =
"beam:transform:org.apache.beam:spanner_change_stream_reader:v1";

@Override
@NonNull
Expand All @@ -66,6 +69,7 @@ public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
.put(INSERT_OR_UPDATE_URN, new InsertOrUpdateBuilder())
.put(DELETE_URN, new DeleteBuilder())
.put(READ_URN, new ReadBuilder())
.put(READ_CHANGE_STREAM_URN, new ChangeStreamReaderBuilder())
.build();
}

Expand Down Expand Up @@ -382,4 +386,113 @@ public PTransform<PCollection<Row>, PDone> buildExternal(
return SpannerIO.WriteRows.of(writeTransform, operation, configuration.table);
}
}

public static class ChangeStreamReaderBuilder
implements ExternalTransformBuilder<
ChangeStreamReaderBuilder.Configuration, PBegin, PCollection<String>> {

public static class Configuration extends CrossLanguageConfiguration {
private String changeStreamName = "";
private String metadataDatabase = "";
private String metadataInstance = "";
private @Nullable Timestamp inclusiveStartAt;
private @Nullable Timestamp inclusiveEndAt;
private @Nullable String metadataTable;
private @Nullable RpcPriority rpcPriority;
private @Nullable Duration watermarkRefreshRate;

public void setChangeStreamName(String changeStreamName) {
this.changeStreamName = changeStreamName;
}

public void setInclusiveStartAt(@Nullable String inclusiveStartAtString) {
if (inclusiveStartAtString != null) {
this.inclusiveStartAt = Timestamp.parseTimestamp(inclusiveStartAtString);
}
}

public void setInclusiveEndAt(@Nullable String inclusiveEndAtString) {
if (inclusiveEndAtString != null) {
this.inclusiveEndAt = Timestamp.parseTimestamp(inclusiveEndAtString);
}
}

public void setMetadataDatabase(String metadataDatabase) {
this.metadataDatabase = metadataDatabase;
}

public void setMetadataInstance(String metadataInstance) {
this.metadataInstance = metadataInstance;
}

public void setMetadataTable(@Nullable String metadataTable) {
this.metadataTable = metadataTable;
}

public void setRpcPriority(@Nullable String rpcPriorityString) {
if (rpcPriorityString != null) {
this.rpcPriority = RpcPriority.valueOf(rpcPriorityString);
}
}

public void setWatermarkRefreshRate(@Nullable String watermarkRefreshRateString) {
if (watermarkRefreshRateString != null) {
this.watermarkRefreshRate = Duration.parse(watermarkRefreshRateString);
}
}
}

@Override
@NonNull
public PTransform<PBegin, PCollection<String>> buildExternal(
ChangeStreamReaderBuilder.Configuration configuration) {

configuration.checkMandatoryFields();

if (configuration.changeStreamName.isEmpty()) {
throw new IllegalArgumentException("ChangeStreamName can't be empty");
}

if (configuration.metadataInstance.isEmpty()) {
throw new IllegalArgumentException("MetadataInstance can't be empty");
}

if (configuration.metadataDatabase.isEmpty()) {
throw new IllegalArgumentException("MetadataDatabase can't be empty");
}

SpannerIO.ReadChangeStream readChangeStream =
SpannerIO.readChangeStream()
.withProjectId(configuration.projectId)
.withInstanceId(configuration.instanceId)
.withDatabaseId(configuration.databaseId)
.withChangeStreamName(configuration.changeStreamName)
.withMetadataInstance(configuration.metadataInstance)
.withMetadataDatabase(configuration.metadataDatabase);

if (configuration.inclusiveStartAt != null) {
readChangeStream = readChangeStream.withInclusiveStartAt(configuration.inclusiveStartAt);
}

if (configuration.inclusiveEndAt != null) {
readChangeStream = readChangeStream.withInclusiveEndAt(configuration.inclusiveEndAt);
}

if (configuration.metadataTable != null) {
readChangeStream = readChangeStream.withMetadataTable(configuration.metadataTable);
}

if (configuration.rpcPriority != null) {

readChangeStream = readChangeStream.withRpcPriority(configuration.rpcPriority);
}

if (configuration.watermarkRefreshRate != null) {
readChangeStream =
readChangeStream.withWatermarkRefreshRate(configuration.watermarkRefreshRate);
}

return new SpannerIO.ChangeStreamRead(readChangeStream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.ChangeStreamReaderBuilder;
import org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.InsertBuilder;
import org.apache.beam.sdk.io.gcp.spanner.SpannerTransformRegistrar.ReadBuilder;
import org.apache.beam.sdk.schemas.Schema;
Expand All @@ -48,22 +49,29 @@ public class SpannerTransformRegistrarTest {
public static final String SPANNER_PROJECT = "spanner-project";
public static final String SPANNER_TABLE = "spanner-table";
public static final String SPANNER_SQL_QUERY = "SELECT * from spanner_table;";
public static final String SPANNER_CHANGE_STREAM_NAME = "spanner-change-stream-name";
public static final String SPANNER_CHANGE_STREAM_METADATA_INSTANCE =
"spanner-change-stream-instance";
public static final String SPANNER_CHANGE_STREAM_METADATA_DATABASE =
"spanner-change-stream-database";
private SpannerTransformRegistrar spannerTransformRegistrar;
private ReadBuilder readBuilder;
private InsertBuilder writeBuilder;
private ChangeStreamReaderBuilder changeStreamReaderBuilder;

@Before
public void setup() {
spannerTransformRegistrar = new SpannerTransformRegistrar();
readBuilder = new ReadBuilder();
writeBuilder = new InsertBuilder();
changeStreamReaderBuilder = new ChangeStreamReaderBuilder();
}

@Test
public void testKnownBuilderInstances() {
Map<String, ExternalTransformBuilder<?, ?, ?>> builderInstancesMap =
spannerTransformRegistrar.knownBuilderInstances();
assertEquals(6, builderInstancesMap.size());
assertEquals(7, builderInstancesMap.size());
assertThat(builderInstancesMap, IsMapContaining.hasKey(SpannerTransformRegistrar.INSERT_URN));
assertThat(builderInstancesMap, IsMapContaining.hasKey(SpannerTransformRegistrar.UPDATE_URN));
assertThat(builderInstancesMap, IsMapContaining.hasKey(SpannerTransformRegistrar.REPLACE_URN));
Expand All @@ -72,6 +80,9 @@ public void testKnownBuilderInstances() {
IsMapContaining.hasKey(SpannerTransformRegistrar.INSERT_OR_UPDATE_URN));
assertThat(builderInstancesMap, IsMapContaining.hasKey(SpannerTransformRegistrar.DELETE_URN));
assertThat(builderInstancesMap, IsMapContaining.hasKey(SpannerTransformRegistrar.READ_URN));
assertThat(
builderInstancesMap,
IsMapContaining.hasKey(SpannerTransformRegistrar.READ_CHANGE_STREAM_URN));
}

@Test(expected = IllegalArgumentException.class)
Expand Down Expand Up @@ -207,4 +218,136 @@ private InsertBuilder.Configuration getBasicWriteConfiguration() {
configuration.setMaxCumulativeBackoff(100L);
return configuration;
}

@Test(expected = IllegalArgumentException.class)
public void testChangeStreamReaderBuilderBuildExternalWithMissingMandatoryFields() {
changeStreamReaderBuilder.buildExternal(new ChangeStreamReaderBuilder.Configuration());
}

@Test(expected = IllegalArgumentException.class)
public void testChangeStreamReaderBuilderBuildExternalWithMissingDatabaseId() {
ChangeStreamReaderBuilder.Configuration configuration =
new ChangeStreamReaderBuilder.Configuration();
configuration.setProjectId(SPANNER_PROJECT);
configuration.setInstanceId(SPANNER_INSTANCE);
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
changeStreamReaderBuilder.buildExternal(configuration);
}

@Test(expected = IllegalArgumentException.class)
public void testChangeStreamReaderBuilderBuildExternalWithMissingInstanceId() {
ChangeStreamReaderBuilder.Configuration configuration =
new ChangeStreamReaderBuilder.Configuration();
configuration.setProjectId(SPANNER_PROJECT);
configuration.setDatabaseId(SPANNER_DATABASE);
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
changeStreamReaderBuilder.buildExternal(configuration);
}

@Test(expected = IllegalArgumentException.class)
public void testChangeStreamReaderBuilderBuildExternalWithMissingChangeStreamName() {
ChangeStreamReaderBuilder.Configuration configuration =
new ChangeStreamReaderBuilder.Configuration();
configuration.setProjectId(SPANNER_PROJECT);
configuration.setDatabaseId(SPANNER_DATABASE);
configuration.setInstanceId(SPANNER_INSTANCE);
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
changeStreamReaderBuilder.buildExternal(configuration);
}

@Test(expected = IllegalArgumentException.class)
public void testChangeStreamReaderBuilderBuildExternalWithMissingMetadataInstance() {
ChangeStreamReaderBuilder.Configuration configuration =
new ChangeStreamReaderBuilder.Configuration();
configuration.setProjectId(SPANNER_PROJECT);
configuration.setDatabaseId(SPANNER_DATABASE);
configuration.setInstanceId(SPANNER_INSTANCE);
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
changeStreamReaderBuilder.buildExternal(configuration);
}

@Test(expected = IllegalArgumentException.class)
public void testChangeStreamReaderBuilderBuildExternalWithMissingMetadataDatabase() {
ChangeStreamReaderBuilder.Configuration configuration =
new ChangeStreamReaderBuilder.Configuration();
configuration.setProjectId(SPANNER_PROJECT);
configuration.setDatabaseId(SPANNER_DATABASE);
configuration.setInstanceId(SPANNER_INSTANCE);
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
changeStreamReaderBuilder.buildExternal(configuration);
}

@Test
public void testChangeStreamReaderBuilderBuildExternalWithRequiredFields() {
ChangeStreamReaderBuilder.Configuration configuration =
new ChangeStreamReaderBuilder.Configuration();

configuration.setProjectId(SPANNER_PROJECT);
configuration.setDatabaseId(SPANNER_DATABASE);
configuration.setInstanceId(SPANNER_INSTANCE);
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);

PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
changeStreamReaderBuilder.buildExternal(configuration);
assertNotNull(changeStreamReaderTransform);
}

@Test
public void testChangeStreamReaderBuilderBuildExternalWithAllFields() {
String startAt = "2023-01-01T00:00:00Z";
String endAt = "2023-01-02T00:00:00Z";
String metadataTable = "meta-table";
String rpcPriority = "HIGH";
String refreshRate = "PT30S";

ChangeStreamReaderBuilder.Configuration configuration =
new ChangeStreamReaderBuilder.Configuration();

configuration.setProjectId(SPANNER_PROJECT);
configuration.setDatabaseId(SPANNER_DATABASE);
configuration.setInstanceId(SPANNER_INSTANCE);
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
configuration.setInclusiveStartAt(startAt);
configuration.setInclusiveEndAt(endAt);
configuration.setMetadataTable(metadataTable);
configuration.setRpcPriority(rpcPriority);
configuration.setWatermarkRefreshRate(refreshRate);

PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
changeStreamReaderBuilder.buildExternal(configuration);
assertNotNull(changeStreamReaderTransform);
}

@Test
public void testChangeStreamReaderBuilderBuildExternalWithNullOptionalValues() {
ChangeStreamReaderBuilder.Configuration configuration =
new ChangeStreamReaderBuilder.Configuration();

configuration.setProjectId(SPANNER_PROJECT);
configuration.setDatabaseId(SPANNER_DATABASE);
configuration.setInstanceId(SPANNER_INSTANCE);
configuration.setChangeStreamName(SPANNER_CHANGE_STREAM_NAME);
configuration.setMetadataInstance(SPANNER_CHANGE_STREAM_METADATA_INSTANCE);
configuration.setMetadataDatabase(SPANNER_CHANGE_STREAM_METADATA_DATABASE);
configuration.setInclusiveStartAt(null);
configuration.setInclusiveEndAt(null);
configuration.setMetadataTable(null);
configuration.setRpcPriority(null);
configuration.setWatermarkRefreshRate(null);

PTransform<PBegin, PCollection<String>> changeStreamReaderTransform =
changeStreamReaderBuilder.buildExternal(configuration);
assertNotNull(changeStreamReaderTransform);
}
}
Loading
Loading