Skip to content

Commit e1a899c

Browse files
committed
Parse v2 record.
pick 4d2a5a7 # Fix API surface test (apache#35028)
1 parent 39cb01d commit e1a899c

File tree

5 files changed

+633
-2
lines changed

5 files changed

+633
-2
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,7 @@ class BeamModulePlugin implements Plugin<Project> {
656656
def arrow_version = "15.0.2"
657657
def jmh_version = "1.34"
658658
def jupiter_version = "5.7.0"
659+
def spanner_grpc_proto_version = "6.95.1"
659660

660661
// Export Spark versions, so they are defined in a single place only
661662
project.ext.spark3_version = spark3_version
@@ -866,7 +867,7 @@ class BeamModulePlugin implements Plugin<Project> {
866867
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
867868
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
868869
proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version
869-
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
870+
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1:$spanner_grpc_proto_version", // google_cloud_platform_libraries_bom sets version
870871
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
871872
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
872873
qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version",

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.protobuf.InvalidProtocolBufferException;
2424
import com.google.protobuf.Value;
2525
import com.google.protobuf.util.JsonFormat;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
2628
import java.util.Collections;
2729
import java.util.HashSet;
2830
import java.util.List;
@@ -42,7 +44,12 @@
4244
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
4345
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
4446
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
47+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveInEvent;
48+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.MoveOutEvent;
49+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
50+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
4551
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
52+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
4653
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
4754
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
4855
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -223,12 +230,236 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
223230
return Collections.singletonList(
224231
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
225232
}
233+
234+
if (isProtoChangeRecord(resultSet.getCurrentRowAsStruct())) {
235+
return Arrays.asList(
236+
toProtoChangeStreamRecord(
237+
partition, resultSetMetadata, resultSet.getCurrentRowAsStruct()));
238+
}
239+
226240
// In GoogleSQL, change stream records are returned as an array of structs.
227241
return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
228242
.flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
229243
.collect(Collectors.toList());
230244
}
231245

246+
boolean isProtoChangeRecord(Struct currentRow) {
247+
return currentRow.getColumnCount() == 1
248+
&& !currentRow.isNull(0)
249+
&& currentRow.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
250+
}
251+
252+
ChangeStreamRecord toProtoChangeStreamRecord(
253+
PartitionMetadata partition,
254+
ChangeStreamResultSetMetadata resultSetMetadata,
255+
Struct currentRow) {
256+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
257+
currentRow.getProtoMessage(
258+
0, com.google.spanner.v1.ChangeStreamRecord.getDefaultInstance());
259+
if (changeStreamRecordProto.hasPartitionStartRecord()) {
260+
return parseProtoPartitionStartRecord(
261+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord());
262+
} else if (changeStreamRecordProto.hasPartitionEndRecord()) {
263+
return parseProtoPartitionEndRecord(
264+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEndRecord());
265+
} else if (changeStreamRecordProto.hasPartitionEventRecord()) {
266+
return parseProtoPartitionEventRecord(
267+
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEventRecord());
268+
} else if (changeStreamRecordProto.hasHeartbeatRecord()) {
269+
return parseProtoHeartbeatRecord(
270+
partition, resultSetMetadata, changeStreamRecordProto.getHeartbeatRecord());
271+
} else if (changeStreamRecordProto.hasDataChangeRecord()) {
272+
return parseProtoDataChangeRecord(
273+
partition, resultSetMetadata, changeStreamRecordProto.getDataChangeRecord());
274+
} else {
275+
throw new IllegalArgumentException(
276+
"Unknown change stream record type " + currentRow.toString());
277+
}
278+
}
279+
280+
ChangeStreamRecord parseProtoPartitionStartRecord(
281+
PartitionMetadata partition,
282+
ChangeStreamResultSetMetadata resultSetMetadata,
283+
com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecordProto) {
284+
final Timestamp startTimestamp =
285+
Timestamp.fromProto(partitionStartRecordProto.getStartTimestamp());
286+
return new PartitionStartRecord(
287+
startTimestamp,
288+
partitionStartRecordProto.getRecordSequence(),
289+
partitionStartRecordProto.getPartitionTokensList(),
290+
changeStreamRecordMetadataFrom(partition, startTimestamp, resultSetMetadata));
291+
}
292+
293+
ChangeStreamRecord parseProtoPartitionEndRecord(
294+
PartitionMetadata partition,
295+
ChangeStreamResultSetMetadata resultSetMetadata,
296+
com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord partitionEndRecordProto) {
297+
final Timestamp endTimestamp = Timestamp.fromProto(partitionEndRecordProto.getEndTimestamp());
298+
return new PartitionEndRecord(
299+
endTimestamp,
300+
partitionEndRecordProto.getRecordSequence(),
301+
changeStreamRecordMetadataFrom(partition, endTimestamp, resultSetMetadata));
302+
}
303+
304+
ChangeStreamRecord parseProtoPartitionEventRecord(
305+
PartitionMetadata partition,
306+
ChangeStreamResultSetMetadata resultSetMetadata,
307+
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto) {
308+
final Timestamp commitTimestamp =
309+
Timestamp.fromProto(partitionEventRecordProto.getCommitTimestamp());
310+
List<MoveInEvent> moveInEvents = new ArrayList<>();
311+
for (com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveInEvent
312+
moveInEventProto : partitionEventRecordProto.getMoveInEventsList()) {
313+
moveInEvents.add(new MoveInEvent(moveInEventProto.getSourcePartitionToken()));
314+
}
315+
List<MoveOutEvent> moveOutEvents = new ArrayList<>();
316+
for (com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord.MoveOutEvent
317+
moveOutEventProto : partitionEventRecordProto.getMoveOutEventsList()) {
318+
moveOutEvents.add(new MoveOutEvent(moveOutEventProto.getDestinationPartitionToken()));
319+
}
320+
return new PartitionEventRecord(
321+
commitTimestamp,
322+
partitionEventRecordProto.getRecordSequence(),
323+
moveInEvents,
324+
moveOutEvents,
325+
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
326+
}
327+
328+
ChangeStreamRecord parseProtoHeartbeatRecord(
329+
PartitionMetadata partition,
330+
ChangeStreamResultSetMetadata resultSetMetadata,
331+
com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord heartbeatRecordProto) {
332+
final Timestamp heartbeatTimestamp = Timestamp.fromProto(heartbeatRecordProto.getTimestamp());
333+
return new HeartbeatRecord(
334+
heartbeatTimestamp,
335+
changeStreamRecordMetadataFrom(partition, heartbeatTimestamp, resultSetMetadata));
336+
}
337+
338+
ChangeStreamRecord parseProtoDataChangeRecord(
339+
PartitionMetadata partition,
340+
ChangeStreamResultSetMetadata resultSetMetadata,
341+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord dataChangeRecordProto) {
342+
final Timestamp commitTimestamp =
343+
Timestamp.fromProto(dataChangeRecordProto.getCommitTimestamp());
344+
return new DataChangeRecord(
345+
partition.getPartitionToken(),
346+
commitTimestamp,
347+
dataChangeRecordProto.getServerTransactionId(),
348+
dataChangeRecordProto.getIsLastRecordInTransactionInPartition(),
349+
dataChangeRecordProto.getRecordSequence(),
350+
dataChangeRecordProto.getTable(),
351+
parseProtoColumnMetadata(dataChangeRecordProto.getColumnMetadataList()),
352+
parseProtoMod(
353+
dataChangeRecordProto.getModsList(), dataChangeRecordProto.getColumnMetadataList()),
354+
parseProtoModType(dataChangeRecordProto.getModType()),
355+
parseProtoValueCaptureType(dataChangeRecordProto.getValueCaptureType()),
356+
dataChangeRecordProto.getNumberOfRecordsInTransaction(),
357+
dataChangeRecordProto.getNumberOfPartitionsInTransaction(),
358+
dataChangeRecordProto.getTransactionTag(),
359+
dataChangeRecordProto.getIsSystemTransaction(),
360+
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
361+
}
362+
363+
List<ColumnType> parseProtoColumnMetadata(
364+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
365+
columnMetadataProtos) {
366+
List<ColumnType> columnTypes = new ArrayList<>();
367+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata
368+
columnMetadataProto : columnMetadataProtos) {
369+
String typeCodeJson;
370+
try {
371+
typeCodeJson = this.printer.print(columnMetadataProto.getType());
372+
} catch (InvalidProtocolBufferException exc) {
373+
throw new IllegalArgumentException(
374+
"Failed to print type: " + columnMetadataProto.getType().toString());
375+
}
376+
377+
ColumnType columnType =
378+
new ColumnType(
379+
columnMetadataProto.getName(),
380+
new TypeCode(typeCodeJson),
381+
columnMetadataProto.getIsPrimaryKey(),
382+
columnMetadataProto.getOrdinalPosition());
383+
columnTypes.add(columnType);
384+
}
385+
return columnTypes;
386+
}
387+
388+
String ConvertModValueProtosToJson(
389+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue> modValueProtos,
390+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
391+
columnMetadataProtos) {
392+
com.google.protobuf.Struct.Builder modStructValueBuilder =
393+
com.google.protobuf.Struct.newBuilder();
394+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue modValueProto :
395+
modValueProtos) {
396+
String columnName =
397+
columnMetadataProtos.get(modValueProto.getColumnMetadataIndex()).getName();
398+
Value columnValue = modValueProto.getValue();
399+
modStructValueBuilder.putFields(columnName, columnValue);
400+
}
401+
Value modStructValue = Value.newBuilder().setStructValue(modStructValueBuilder.build()).build();
402+
String modValueJson;
403+
try {
404+
modValueJson = this.printer.print(modStructValue);
405+
} catch (InvalidProtocolBufferException exc) {
406+
throw new IllegalArgumentException("Failed to print type: " + modStructValue);
407+
}
408+
return modValueJson;
409+
}
410+
411+
List<Mod> parseProtoMod(
412+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod> modProtos,
413+
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
414+
columnMetadataProtos) {
415+
List<Mod> mods = new ArrayList<>();
416+
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod modProto : modProtos) {
417+
String keysJson = ConvertModValueProtosToJson(modProto.getKeysList(), columnMetadataProtos);
418+
String oldValuesJson =
419+
ConvertModValueProtosToJson(modProto.getOldValuesList(), columnMetadataProtos);
420+
String newValuesJson =
421+
ConvertModValueProtosToJson(modProto.getNewValuesList(), columnMetadataProtos);
422+
Mod mod = new Mod(keysJson, oldValuesJson, newValuesJson);
423+
mods.add(mod);
424+
}
425+
return mods;
426+
}
427+
428+
ModType parseProtoModType(
429+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType modTypeProto) {
430+
if (modTypeProto == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.INSERT) {
431+
return ModType.INSERT;
432+
} else if (modTypeProto
433+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.UPDATE) {
434+
return ModType.UPDATE;
435+
} else if (modTypeProto
436+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.DELETE) {
437+
return ModType.DELETE;
438+
}
439+
return ModType.UNKNOWN;
440+
}
441+
442+
ValueCaptureType parseProtoValueCaptureType(
443+
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
444+
valueCaptureTypeProto) {
445+
if (valueCaptureTypeProto
446+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_ROW) {
447+
return ValueCaptureType.NEW_ROW;
448+
} else if (valueCaptureTypeProto
449+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_VALUES) {
450+
return ValueCaptureType.NEW_VALUES;
451+
} else if (valueCaptureTypeProto
452+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
453+
.OLD_AND_NEW_VALUES) {
454+
return ValueCaptureType.OLD_AND_NEW_VALUES;
455+
} else if (valueCaptureTypeProto
456+
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
457+
.NEW_ROW_AND_OLD_VALUES) {
458+
return ValueCaptureType.NEW_ROW_AND_OLD_VALUES;
459+
}
460+
return ValueCaptureType.UNKNOWN;
461+
}
462+
232463
Stream<ChangeStreamRecord> toChangeStreamRecord(
233464
PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
234465

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/DataChangeRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ public String toString() {
288288
+ '\''
289289
+ ", isSystemTransaction="
290290
+ isSystemTransaction
291-
+ ", metadata"
291+
+ ", metadata="
292292
+ metadata
293293
+ '}';
294294
}

0 commit comments

Comments
 (0)