Skip to content

Commit 6451fd8

Browse files
committed
Parse v2 record.
pick 4d2a5a7 # Fix API surface test (apache#35028)
1 parent 39cb01d commit 6451fd8

File tree

4 files changed

+312
-1
lines changed

4 files changed

+312
-1
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,7 @@ class BeamModulePlugin implements Plugin<Project> {
656656
def arrow_version = "15.0.2"
657657
def jmh_version = "1.34"
658658
def jupiter_version = "5.7.0"
659+
def spanner_grpc_proto_version = "6.95.1"
659660

660661
// Export Spark versions, so they are defined in a single place only
661662
project.ext.spark3_version = spark3_version
@@ -866,7 +867,7 @@ class BeamModulePlugin implements Plugin<Project> {
866867
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
867868
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
868869
proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version
869-
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
870+
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1:$spanner_grpc_proto_version", // google_cloud_platform_libraries_bom sets version
870871
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
871872
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
872873
qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version",

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.protobuf.InvalidProtocolBufferException;
2424
import com.google.protobuf.Value;
2525
import com.google.protobuf.util.JsonFormat;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
2628
import java.util.Collections;
2729
import java.util.HashSet;
2830
import java.util.List;
@@ -42,7 +44,12 @@
4244
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
4345
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
4446
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
47+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveInEvent;
48+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveOutEvent;
49+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
50+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
4551
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
52+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
4653
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
4754
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
4855
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -223,12 +230,111 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
223230
return Collections.singletonList(
224231
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
225232
}
233+
234+
// TODO(changliiu): wording - v2 or mutable? comments
235+
if (maybeV2ChangeRecord(resultSet.getCurrentRowAsStruct())) {
236+
return Arrays.asList(
237+
toV2ChangeStreamRecord(partition, resultSetMetadata, resultSet.getCurrentRowAsStruct()));
238+
}
239+
226240
// In GoogleSQL, change stream records are returned as an array of structs.
227241
return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
228242
.flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
229243
.collect(Collectors.toList());
230244
}
231245

246+
boolean maybeV2ChangeRecord(Struct currentRow) {
247+
return currentRow.getColumnCount() == 1
248+
&& !currentRow.isNull(0)
249+
&& currentRow.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
250+
}
251+
252+
ChangeStreamRecord toV2ChangeStreamRecord(
253+
PartitionMetadata partition,
254+
ChangeStreamResultSetMetadata resultSetMetadata,
255+
Struct currentRow) {
256+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
257+
currentRow.getProtoMessage(
258+
0, com.google.spanner.v1.ChangeStreamRecord.getDefaultInstance());
259+
if (changeStreamRecordProto.hasPartitionStartRecord()) {
260+
return parseV2PartitionStartRecord(
261+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord());
262+
} else if (changeStreamRecordProto.hasPartitionEndRecord()) {
263+
return parseV2PartitionEndRecord(
264+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEndRecord());
265+
} else if (changeStreamRecordProto.hasPartitionEventRecord()) {
266+
return parseV2PartitionEventRecord(
267+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEventRecord());
268+
} else if (changeStreamRecordProto.hasHeartbeatRecord()) {
269+
return parseV2HeartbeatRecord(
270+
partition, resultSetMetadata, changeStreamRecordProto.getHeartbeatRecord());
271+
}
272+
273+
// todo data change record
274+
return new PartitionEndRecord(
275+
Timestamp.MIN_VALUE,
276+
"xxxx",
277+
changeStreamRecordMetadataFrom(partition, Timestamp.MIN_VALUE, resultSetMetadata));
278+
}
279+
280+
ChangeStreamRecord parseV2PartitionStartRecord(
281+
PartitionMetadata partition,
282+
ChangeStreamResultSetMetadata resultSetMetadata,
283+
com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecordProto) {
284+
final Timestamp startTimestamp =
285+
Timestamp.fromProto(partitionStartRecordProto.getStartTimestamp());
286+
return new PartitionStartRecord(
287+
startTimestamp,
288+
partitionStartRecordProto.getRecordSequence(),
289+
partitionStartRecordProto.getPartitionTokensList(),
290+
changeStreamRecordMetadataFrom(partition, startTimestamp, resultSetMetadata));
291+
}
292+
293+
ChangeStreamRecord parseV2PartitionEndRecord(
294+
PartitionMetadata partition,
295+
ChangeStreamResultSetMetadata resultSetMetadata,
296+
com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord partitionEndRecordProto) {
297+
final Timestamp endTimestamp = Timestamp.fromProto(partitionEndRecordProto.getEndTimestamp());
298+
return new PartitionEndRecord(
299+
endTimestamp,
300+
partitionEndRecordProto.getRecordSequence(),
301+
changeStreamRecordMetadataFrom(partition, endTimestamp, resultSetMetadata));
302+
}
303+
304+
ChangeStreamRecord parseV2PartitionEventRecord(
305+
PartitionMetadata partition,
306+
ChangeStreamResultSetMetadata resultSetMetadata,
307+
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto) {
308+
final Timestamp commitTimestamp =
309+
Timestamp.fromProto(partitionEventRecordProto.getCommitTimestamp());
310+
List<MoveInEvent> moveInEvents = new ArrayList<>();
311+
for (com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveInEvent
312+
moveInEventProto : partitionEventRecordProto.getMoveInEventsList()) {
313+
moveInEvents.add(new MoveInEvent(moveInEventProto.getSourcePartitionToken()));
314+
}
315+
List<MoveOutEvent> moveOutEvents = new ArrayList<>();
316+
for (com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveOutEvent
317+
moveOutEventProto : partitionEventRecordProto.getMoveOutEventsList()) {
318+
moveOutEvents.add(new MoveOutEvent(moveOutEventProto.getDestinationPartitionToken()));
319+
}
320+
return new PartitionEventRecord(
321+
commitTimestamp,
322+
partitionEventRecordProto.getRecordSequence(),
323+
moveInEvents,
324+
moveOutEvents,
325+
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
326+
}
327+
328+
ChangeStreamRecord parseV2HeartbeatRecord(
329+
PartitionMetadata partition,
330+
ChangeStreamResultSetMetadata resultSetMetadata,
331+
com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord heartbeatRecordProto) {
332+
final Timestamp heartbeatTimestamp = Timestamp.fromProto(heartbeatRecordProto.getTimestamp());
333+
return new HeartbeatRecord(
334+
heartbeatTimestamp,
335+
changeStreamRecordMetadataFrom(partition, heartbeatTimestamp, resultSetMetadata));
336+
}
337+
232338
Stream<ChangeStreamRecord> toChangeStreamRecord(
233339
PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
234340

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
1919

2020
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestJsonMapper.recordToJson;
21+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestProtoMapper.recordToProto;
2122
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithJson;
2223
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithStrings;
2324
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsWithUnknownModTypeAndValueCaptureType;
@@ -29,6 +30,7 @@
2930
import com.google.cloud.Timestamp;
3031
import com.google.cloud.spanner.Dialect;
3132
import com.google.cloud.spanner.Struct;
33+
import com.google.cloud.spanner.Value;
3234
import java.util.Arrays;
3335
import java.util.Collections;
3436
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet;
@@ -41,8 +43,13 @@
4143
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
4244
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
4345
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
46+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveInEvent;
47+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveOutEvent;
48+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
49+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
4450
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
4551
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
52+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
4653
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
4754
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
4855
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -930,4 +937,80 @@ public void testMappingJsonRowToChildPartitionRecord() {
930937
Collections.singletonList(childPartitionsRecord),
931938
mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
932939
}
940+
941+
@Test
942+
public void testMappingProtoRowToPartitionStartRecord() {
943+
PartitionStartRecord partitionStartChangeStreamRecord =
944+
new PartitionStartRecord(
945+
Timestamp.MIN_VALUE,
946+
"fakeRecordSequence",
947+
Arrays.asList("partitionToken1", "partitionToken2"),
948+
null);
949+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
950+
recordToProto(partitionStartChangeStreamRecord);
951+
assertNotNull(changeStreamRecordProto);
952+
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
953+
Struct changeStreamRecordProtoWrapper =
954+
Struct.newBuilder().add(Value.protoMessage(changeStreamRecordProto)).build();
955+
956+
when(resultSet.getCurrentRowAsStruct()).thenReturn(changeStreamRecordProtoWrapper);
957+
assertEquals(
958+
Collections.singletonList(partitionStartChangeStreamRecord),
959+
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
960+
}
961+
962+
@Test
963+
public void testMappingProtoRowToPartitionEndRecord() {
964+
PartitionEndRecord partitionEndChangeStreamRecord =
965+
new PartitionEndRecord(Timestamp.MIN_VALUE, "fakeRecordSequence", null);
966+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
967+
recordToProto(partitionEndChangeStreamRecord);
968+
assertNotNull(changeStreamRecordProto);
969+
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
970+
Struct changeStreamRecordProtoWrapper =
971+
Struct.newBuilder().add(Value.protoMessage(changeStreamRecordProto)).build();
972+
973+
when(resultSet.getCurrentRowAsStruct()).thenReturn(changeStreamRecordProtoWrapper);
974+
assertEquals(
975+
Collections.singletonList(partitionEndChangeStreamRecord),
976+
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
977+
}
978+
979+
@Test
980+
public void testMappingProtoRowToPartitionEventRecord() {
981+
PartitionEventRecord partitionEventChangeStreamRecord =
982+
new PartitionEventRecord(
983+
Timestamp.MIN_VALUE,
984+
"fakeRecordSequence",
985+
Arrays.asList(new MoveInEvent("token1"), new MoveInEvent("token2")),
986+
Arrays.asList(new MoveOutEvent("token1"), new MoveOutEvent("token2")),
987+
null);
988+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
989+
recordToProto(partitionEventChangeStreamRecord);
990+
assertNotNull(changeStreamRecordProto);
991+
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
992+
Struct changeStreamRecordProtoWrapper =
993+
Struct.newBuilder().add(Value.protoMessage(changeStreamRecordProto)).build();
994+
995+
when(resultSet.getCurrentRowAsStruct()).thenReturn(changeStreamRecordProtoWrapper);
996+
assertEquals(
997+
Collections.singletonList(partitionEventChangeStreamRecord),
998+
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
999+
}
1000+
1001+
@Test
1002+
public void testMappingProtoRowToHeartRecord() {
1003+
HeartbeatRecord HeartbeatChangeStreamRecord = new HeartbeatRecord(Timestamp.MIN_VALUE, null);
1004+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
1005+
recordToProto(HeartbeatChangeStreamRecord);
1006+
assertNotNull(changeStreamRecordProto);
1007+
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
1008+
Struct changeStreamRecordProtoWrapper =
1009+
Struct.newBuilder().add(Value.protoMessage(changeStreamRecordProto)).build();
1010+
1011+
when(resultSet.getCurrentRowAsStruct()).thenReturn(changeStreamRecordProtoWrapper);
1012+
assertEquals(
1013+
Collections.singletonList(HeartbeatChangeStreamRecord),
1014+
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
1015+
}
9331016
}

0 commit comments

Comments
 (0)