Skip to content

Commit b02d871

Browse files
committed
Parse v2 record.
pick 4d2a5a7 # Fix API surface test (apache#35028)
1 parent 39cb01d commit b02d871

File tree

3 files changed

+118
-1
lines changed

3 files changed

+118
-1
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,7 @@ class BeamModulePlugin implements Plugin<Project> {
656656
def arrow_version = "15.0.2"
657657
def jmh_version = "1.34"
658658
def jupiter_version = "5.7.0"
659+
def spanner_grpc_proto_version = "6.95.1"
659660

660661
// Export Spark versions, so they are defined in a single place only
661662
project.ext.spark3_version = spark3_version
@@ -866,7 +867,7 @@ class BeamModulePlugin implements Plugin<Project> {
866867
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
867868
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
868869
proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version
869-
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
870+
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1:$spanner_grpc_proto_version", // google_cloud_platform_libraries_bom sets version
870871
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
871872
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
872873
qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version",

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.protobuf.InvalidProtocolBufferException;
2424
import com.google.protobuf.Value;
2525
import com.google.protobuf.util.JsonFormat;
26+
import java.util.Arrays;
2627
import java.util.Collections;
2728
import java.util.HashSet;
2829
import java.util.List;
@@ -42,10 +43,14 @@
4243
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
4344
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
4445
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
46+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
4547
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
48+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
4649
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
4750
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
4851
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
52+
import org.slf4j.Logger;
53+
import org.slf4j.LoggerFactory;
4954

5055
/**
5156
* This class is responsible for transforming a {@link Struct} to a {@link List} of {@link
@@ -56,6 +61,7 @@
5661
*/
5762
public class ChangeStreamRecordMapper {
5863

64+
private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamRecordMapper.class);
5965
private static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";
6066
private static final String HEARTBEAT_RECORD_COLUMN = "heartbeat_record";
6167
private static final String CHILD_PARTITIONS_RECORD_COLUMN = "child_partitions_record";
@@ -223,12 +229,100 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
223229
return Collections.singletonList(
224230
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
225231
}
232+
233+
if (maybeV2ChangeRecord(resultSet.getCurrentRowAsStruct())) {
234+
return Arrays.asList(
235+
toV2ChangeStreamRecord(partition, resultSetMetadata, resultSet.getCurrentRowAsStruct()));
236+
}
237+
226238
// In GoogleSQL, change stream records are returned as an array of structs.
227239
return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
228240
.flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
229241
.collect(Collectors.toList());
230242
}
231243

244+
boolean maybeV2ChangeRecord(Struct currentRow) {
245+
return currentRow.getColumnCount() == 1
246+
&& !currentRow.isNull(0)
247+
&& currentRow.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
248+
}
249+
250+
ChangeStreamRecord toV2ChangeStreamRecord(
251+
PartitionMetadata partition,
252+
ChangeStreamResultSetMetadata resultSetMetadata,
253+
Struct currentRow) {
254+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
255+
currentRow.getProtoMessage(
256+
0, com.google.spanner.v1.ChangeStreamRecord.getDefaultInstance());
257+
if (changeStreamRecordProto.hasPartitionStartRecord()) {
258+
return parseV2PartitionStartRecord(
259+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord());
260+
} else if (changeStreamRecordProto.hasPartitionEndRecord()) {
261+
return parseV2PartitionEndRecord(
262+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEndRecord());
263+
} else if (changeStreamRecordProto.hasPartitionEventRecord()) {
264+
return parseV2PartitionEventRecord(
265+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEventRecord());
266+
} else if (changeStreamRecordProto.hasHeartbeatRecord()) {
267+
return parseV2HeartbeatRecord(
268+
partition, resultSetMetadata, changeStreamRecordProto.getHeartbeatRecord());
269+
}
270+
271+
// todo data change record
272+
return new PartitionEndRecord(
273+
Timestamp.MIN_VALUE,
274+
"xxxx",
275+
changeStreamRecordMetadataFrom(partition, Timestamp.MIN_VALUE, resultSetMetadata));
276+
}
277+
278+
ChangeStreamRecord parseV2PartitionStartRecord(
279+
PartitionMetadata partition,
280+
ChangeStreamResultSetMetadata resultSetMetadata,
281+
com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecordProto) {
282+
final Timestamp startTimestamp =
283+
Timestamp.fromProto(partitionStartRecordProto.getStartTimestamp());
284+
return new PartitionStartRecord(
285+
startTimestamp,
286+
partitionStartRecordProto.getRecordSequence(),
287+
partitionStartRecordProto.getPartitionTokensList(),
288+
changeStreamRecordMetadataFrom(partition, startTimestamp, resultSetMetadata));
289+
}
290+
291+
ChangeStreamrecord parseV2PartitionEndRecord(
292+
PartitionMetadata partition,
293+
ChangeStreamResultSetMetadata resultSetMetadata,
294+
com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord partitionEndRecordProto) {
295+
final Timestamp endTimestamp = Timestamp.fromProto(partitionEndRecordProto.getEndTimestamp());
296+
return new PartitionEndRecord(
297+
endTimestamp,
298+
partitionEndRecordProto.getRecordSequence(),
299+
changeStreamRecordMetadataFrom(partition, endTimestamp, resultSetMetadata));
300+
}
301+
302+
ChangeStreamrecord parseV2PartitionEventRecord(
303+
PartitionMetadata partition,
304+
ChangeStreamResultSetMetadata resultSetMetadata,
305+
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto) {
306+
final Timestamp commitTimestamp =
307+
Timestamp.fromProto(partitionEndRecordProto.getCommitTimestamp());
308+
return new PartitionEventRecord(
309+
commitTimestamp,
310+
partitionEventRecordProto.getRecordSequence(),
311+
partitionEventRecordProto.getMoveInEventsList(),
312+
partitionEventRecordProto.getMoveOutEventsList(),
313+
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
314+
}
315+
316+
ChangeStreamRecord parseV2HeartbeatRecord(
317+
PartitionMetadata partition,
318+
ChangeStreamResultSetMetadata resultSetMetadata,
319+
com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord heartbeatRecordProto) {
320+
final Timestamp heartbeatTimestamp = Timestamp.fromProto(heartbeatRecordProto.getTimestamp());
321+
return new HeartbeatRecord(
322+
heartbeatTimestamp,
323+
changeStreamRecordMetadataFrom(partition, heartbeatTimestamp, resultSetMetadata));
324+
}
325+
232326
Stream<ChangeStreamRecord> toChangeStreamRecord(
233327
PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
234328

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,4 +930,26 @@ public void testMappingJsonRowToChildPartitionRecord() {
930930
Collections.singletonList(childPartitionsRecord),
931931
mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
932932
}
933+
934+
@Test
public void testMappingProtoRowToPartitionStartRecord() {
  // Build a v2 change stream proto whose oneof is a PartitionStartRecord.
  com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecord =
      com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord.newBuilder()
          .setRecordSequence("sequence")
          .addPartitionTokens("partitionTokens")
          .build();
  com.google.spanner.v1.ChangeStreamRecord record =
      com.google.spanner.v1.ChangeStreamRecord.newBuilder()
          .setPartitionStartRecord(partitionStartRecord)
          .build();

  assertNotNull(record);
  ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);

  // Single PROTO-typed column so the mapper takes the v2 parsing path.
  Struct ret = Struct.newBuilder().set("ChangeStreamRecord").to(record).build();
  when(resultSet.getCurrentRowAsStruct()).thenReturn(ret);
  // NOTE(review): the expected value here is the *proto* PartitionStartRecord, but the mapper
  // returns the *model* PartitionStartRecord (different class), so this equality looks like it
  // can never hold — confirm against the mapper and, if so, build the expected model record
  // (with its metadata) instead.
  assertEquals(
      Collections.singletonList(partitionStartRecord),
      mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
}
933955
}

0 commit comments

Comments
 (0)