Skip to content

Commit d37afd4

Browse files
committed
Parse v2 record.
pick 4d2a5a7 # Fix API surface test (apache#35028)
1 parent 39cb01d commit d37afd4

File tree

3 files changed

+62
-1
lines changed

3 files changed

+62
-1
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,7 @@ class BeamModulePlugin implements Plugin<Project> {
656656
def arrow_version = "15.0.2"
657657
def jmh_version = "1.34"
658658
def jupiter_version = "5.7.0"
659+
def spanner_grpc_proto_version = "6.95.1"
659660

660661
// Export Spark versions, so they are defined in a single place only
661662
project.ext.spark3_version = spark3_version
@@ -866,7 +867,7 @@ class BeamModulePlugin implements Plugin<Project> {
866867
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
867868
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
868869
proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version
869-
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
870+
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1:$spanner_grpc_proto_version", // google_cloud_platform_libraries_bom sets version
870871
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
871872
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
872873
qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version",

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,18 @@
4040
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
4141
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
4242
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
43+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
44+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
45+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
4346
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
4447
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
4548
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
4649
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
4750
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
51+
import java.util.Arrays;
4852
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
53+
import org.slf4j.Logger;
54+
import org.slf4j.LoggerFactory;
4955

5056
/**
5157
* This class is responsible for transforming a {@link Struct} to a {@link List} of {@link
@@ -56,6 +62,7 @@
5662
*/
5763
public class ChangeStreamRecordMapper {
5864

65+
private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamRecordMapper.class);
5966
private static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";
6067
private static final String HEARTBEAT_RECORD_COLUMN = "heartbeat_record";
6168
private static final String CHILD_PARTITIONS_RECORD_COLUMN = "child_partitions_record";
@@ -223,12 +230,45 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
223230
return Collections.singletonList(
224231
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
225232
}
233+
234+
if (containOnlyProtoType(resultSet.getCurrentRowAsStruct())) {
235+
return Arrays.asList(
236+
toV2ChangeStreamRecord(partition, resultSetMetadata, resultSet.getCurrentRowAsStruct()));
237+
}
238+
226239
// In GoogleSQL, change stream records are returned as an array of structs.
227240
return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
228241
.flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
229242
.collect(Collectors.toList());
230243
}
231244

245+
boolean containOnlyProtoType(Struct currentRow){
246+
LOG.info("changliiu containOnlyProtoType1");
247+
if(currentRow.getColumnCount() !=1 || currentRow.isNull(0)){
248+
LOG.info("changliiu containOnlyProtoType2");
249+
return false;
250+
}
251+
com.google.cloud.spanner.Type columnType = currentRow.getColumnType(i);
252+
if (columnType.getCode() == com.google.cloud.spanner.Type.Code.PROTO) {
253+
LOG.info("changliiu containOnlyProtoType3");
254+
return true;
255+
}
256+
LOG.info("changliiu containOnlyProtoType4");
257+
return false;
258+
}
259+
260+
ChangeStreamRecord toV2ChangeStreamRecord(PartitionMetadata partition,
261+
ChangeStreamResultSetMetadata resultSetMetadata,
262+
Struct currentRow) {
263+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecord =
264+
currentRow.getProtoMessage(0,
265+
com.google.spanner.v1.ChangeStreamRecord.getDefaultInstance());
266+
// todo finish this.
267+
LOG.info("changliiu toV2ChangeStreamRecord");
268+
return new PartitionEndRecord(Timestamp.MIN_VALUE, "xxxx",
269+
changeStreamRecordMetadataFrom(partition, Timestamp.MIN_VALUE, resultSetMetadata));
270+
}
271+
232272
Stream<ChangeStreamRecord> toChangeStreamRecord(
233273
PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
234274

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,4 +930,24 @@ public void testMappingJsonRowToChildPartitionRecord() {
930930
Collections.singletonList(childPartitionsRecord),
931931
mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
932932
}
933+
934+
@Test
935+
public void testMappingProtoRowToPartitionStartRecord() {
936+
com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecord =
937+
com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord.newBuilder()
938+
.setStartTimestamp(Timestamp.MIN_VALUE)
939+
.setRecordSequence("sequence")
940+
.addPartitionTokens("partitionTokens")
941+
.build();
942+
com.google.spanner.v1.ChangeStreamRecord record = com.google.spanner.v1.ChangeStreamRecord.newBuilder()
943+
.setPartitionStartRecord(partitionStartRecord).build();
944+
945+
assertNotNull(partitionStartRecord);
946+
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
947+
948+
when(resultSet.getPgJsonb(0)).thenReturn(jsonString);
949+
assertEquals(
950+
Collections.singletonList(partitionStartRecord),
951+
mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
952+
}
933953
}

0 commit comments

Comments
 (0)