Skip to content

Commit 90cea0e

Browse files
authored
Update GCS Sink writer to include additional metadata (#3317)
* build: upstream source modifications * add tests
1 parent efa629c commit 90cea0e

File tree

8 files changed

+149
-10
lines changed

8 files changed

+149
-10
lines changed

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ static SourceSchema getSourceSchema(
223223
tableConfigs.stream().map(TableConfig::tableName).collect(ImmutableList.toImmutableList());
224224
ImmutableMap<String, ImmutableMap<String, SourceColumnType>> tableSchemas =
225225
schemaDiscovery.discoverTableSchema(dataSource, config.sourceSchemaReference(), tables);
226+
227+
ImmutableMap<String, ImmutableList<SourceColumnIndexInfo>> tableIndexes =
228+
schemaDiscovery.discoverTableIndexes(dataSource, config.sourceSchemaReference(), tables);
229+
226230
LOG.info("Found table schemas: {}", tableSchemas);
227231
tableSchemas.entrySet().stream()
228232
.map(
@@ -237,6 +241,16 @@ static SourceSchema getSourceSchema(
237241
colEntry ->
238242
sourceTableSchemaBuilder.addSourceColumnNameToSourceColumnType(
239243
colEntry.getKey(), colEntry.getValue()));
244+
245+
if (tableIndexes.containsKey(tableEntry.getKey())) {
246+
sourceTableSchemaBuilder.setPrimaryKeyColumns(
247+
tableIndexes.get(tableEntry.getKey()).stream()
248+
.filter(SourceColumnIndexInfo::isPrimary)
249+
.sorted()
250+
.map(SourceColumnIndexInfo::columnName)
251+
.collect(ImmutableList.toImmutableList()));
252+
}
253+
240254
return sourceTableSchemaBuilder.build();
241255
})
242256
.forEach(sourceSchemaBuilder::addTableSchema);

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/row/SourceRow.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
import com.google.auto.value.AutoValue;
1919
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference;
2020
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema;
21+
import com.google.common.collect.ImmutableList;
2122
import java.io.Serializable;
2223
import javax.annotation.Nullable;
24+
import org.apache.avro.Schema;
25+
import org.apache.avro.SchemaBuilder;
2326
import org.apache.avro.generic.GenericRecord;
2427
import org.apache.avro.generic.GenericRecordBuilder;
2528

@@ -59,6 +62,8 @@ public abstract class SourceRow implements Serializable {
5962
@Nullable
6063
public abstract String shardId();
6164

65+
public abstract ImmutableList<String> primaryKeyColumns();
66+
6267
/**
6368
* Get the readTime epoch in microseconds.
6469
*
@@ -79,6 +84,45 @@ public GenericRecord getPayload() {
7984

8085
abstract SerializableGenericRecord record();
8186

87+
/**
88+
* Generates a schema that wraps the payload with metadata fields. We prioritize using the
89+
* payload's existing schema definition.
90+
*/
91+
public Schema gcsSchema() {
92+
return SchemaBuilder.record("SourceRowWithMetadata")
93+
.fields()
94+
.name("tableName")
95+
.type()
96+
.stringType()
97+
.noDefault()
98+
.name("shardId")
99+
.type()
100+
.nullable()
101+
.stringType()
102+
.noDefault()
103+
.name("primaryKeys")
104+
.type()
105+
.array()
106+
.items()
107+
.stringType()
108+
.noDefault()
109+
// We reuse the exact schema from the payload to ensure compatibility
110+
.name("payload")
111+
.type(this.getPayload().getSchema())
112+
.noDefault()
113+
.endRecord();
114+
}
115+
116+
/** Converts the current SourceRow into a GenericRecord suitable for GCS writing. */
117+
public GenericRecord toGcsRecord() {
118+
return new GenericRecordBuilder(this.gcsSchema())
119+
.set("tableName", this.tableName())
120+
.set("shardId", this.shardId())
121+
.set("primaryKeys", this.primaryKeyColumns())
122+
.set("payload", this.getPayload())
123+
.build();
124+
}
125+
82126
/**
83127
* returns an initialized builder for SourceRow.
84128
*
@@ -125,6 +169,8 @@ public abstract static class Builder {
125169
@SuppressWarnings("CheckReturnValue")
126170
public abstract Builder setShardId(String value);
127171

172+
public abstract Builder setPrimaryKeyColumns(ImmutableList<String> value);
173+
128174
@SuppressWarnings("CheckReturnValue")
129175
abstract Builder setRecord(SerializableGenericRecord value);
130176

@@ -150,6 +196,7 @@ protected void initialize(
150196
this.setTableSchemaUUID(sourceTableSchema.tableSchemaUUID());
151197
this.setTableName(sourceTableSchema.tableName());
152198
this.setShardId(shardId);
199+
this.setPrimaryKeyColumns(sourceTableSchema.primaryKeyColumns());
153200
this.recordBuilder = new GenericRecordBuilder(sourceTableSchema.avroSchema());
154201
this.recordBuilder.set(SourceTableSchema.READ_TIME_STAMP_FIELD_NAME, readTimeMicros);
155202
this.payloadBuilder =

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/SourceTableSchema.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapper.MapperType;
2222
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType;
2323
import com.google.common.base.Preconditions;
24+
import com.google.common.collect.ImmutableList;
2425
import com.google.common.collect.ImmutableMap;
2526
import java.io.Serializable;
2627
import java.util.UUID;
@@ -59,6 +60,8 @@ public abstract class SourceTableSchema implements Serializable {
5960
// Mapped Avro Schema (to unified types) that each row will carry.
6061
public abstract Schema avroSchema();
6162

63+
public abstract ImmutableList<String> primaryKeyColumns();
64+
6265
public Schema getAvroPayload() {
6366
return avroSchema().getField(PAYLOAD_FIELD_NAME).schema();
6467
}
@@ -91,6 +94,8 @@ public abstract static class Builder {
9194
abstract ImmutableMap.Builder<String, SourceColumnType>
9295
sourceColumnNameToSourceColumnTypeBuilder();
9396

97+
public abstract Builder setPrimaryKeyColumns(ImmutableList<String> value);
98+
9499
private FieldAssembler<RecordDefault<Schema>> payloadFieldAssembler;
95100

96101
public final Builder addSourceColumnNameToSourceColumnType(
@@ -132,6 +137,7 @@ public Builder() {
132137

133138
public Builder initialize(UnifiedTypeMapper.MapperType mapperType) {
134139
this.mapperType = mapperType;
140+
this.setPrimaryKeyColumns(ImmutableList.of());
135141
return this;
136142
}
137143

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/MigrateTableTransform.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,15 +183,12 @@ public WriteFilesResult<AvroDestination> writeToGCS(
183183
return sourceRows.apply(
184184
"WriteAvroToGCS",
185185
FileIO.<AvroDestination, SourceRow>writeDynamic()
186-
.by(
187-
(record) ->
188-
AvroDestination.of(
189-
record.tableName(), record.getPayload().getSchema().toString()))
186+
.by((record) -> AvroDestination.of(record.tableName(), record.gcsSchema().toString()))
190187
.via(
191188
Contextful.fn(
192189
record -> {
193190
Metrics.counter(MigrateTableTransform.class, metricName).inc();
194-
return record.getPayload();
191+
return record.toGcsRecord();
195192
}),
196193
Contextful.fn(destination -> AvroIO.sink(destination.jsonSchema)))
197194
.withDestinationCoder(AvroCoder.of(AvroDestination.class))

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public void testJdbcIoWrapperBasic() throws RetriableSchemaDiscoveryException {
125125
assertThat(tableSchema.tableName()).isEqualTo("testTable");
126126
assertThat(tableSchema.sourceColumnNameToSourceColumnType())
127127
.isEqualTo(ImmutableMap.of(testCol, testColType));
128+
assertThat(tableSchema.primaryKeyColumns()).isEqualTo(ImmutableList.of(testCol));
128129
ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>> tableReaders =
129130
jdbcIoWrapper.getTableReaders();
130131
assertThat(tableReaders.size()).isEqualTo(1);
@@ -177,6 +178,7 @@ public void testJdbcIoWrapperWithoutInference() throws RetriableSchemaDiscoveryE
177178
assertThat(tableSchema.tableName()).isEqualTo("testTable");
178179
assertThat(tableSchema.sourceColumnNameToSourceColumnType())
179180
.isEqualTo(ImmutableMap.of(testCol, testColType));
181+
assertThat(tableSchema.primaryKeyColumns()).isEqualTo(ImmutableList.of(testCol));
180182
ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>> tableReaders =
181183
jdbcIoWrapper.getTableReaders();
182184
assertThat(tableReaders.size()).isEqualTo(1);

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/row/SourceRowTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import static com.google.common.truth.Truth.assertThat;
1919

2020
import com.google.cloud.teleport.v2.source.reader.io.schema.SchemaTestUtils;
21+
import com.google.common.collect.ImmutableList;
2122
import junit.framework.TestCase;
23+
import org.apache.avro.Schema;
24+
import org.apache.avro.generic.GenericRecord;
2225
import org.junit.Assert;
2326
import org.junit.Test;
2427
import org.junit.runner.RunWith;
@@ -47,6 +50,56 @@ public void testSourceRowBuilds() {
4750
assertThat(sourceRow.getReadTimeMicros()).isEqualTo(testReadTime);
4851
assertThat(sourceRow.getPayload().get("firstName")).isEqualTo("abc");
4952
assertThat(sourceRow.getPayload().get("lastName")).isEqualTo("def");
53+
assertThat(sourceRow.primaryKeyColumns()).isEqualTo(ImmutableList.of());
54+
}
55+
56+
@Test
57+
public void testGcsSchema() {
58+
final String testTable = "testTable";
59+
final String shardId = "id1";
60+
final long testReadTime = 1712751118L;
61+
var schemaRef = SchemaTestUtils.generateSchemaReference("public", "mydb");
62+
var schema = SchemaTestUtils.generateTestTableSchema(testTable);
63+
SourceRow sourceRow =
64+
SourceRow.builder(schemaRef, schema, shardId, testReadTime)
65+
.setField("firstName", "abc")
66+
.setField("lastName", "def")
67+
.build();
68+
69+
Schema gcsSchema = sourceRow.gcsSchema();
70+
assertThat(gcsSchema.getName()).isEqualTo("SourceRowWithMetadata");
71+
assertThat(gcsSchema.getField("tableName")).isNotNull();
72+
assertThat(gcsSchema.getField("shardId")).isNotNull();
73+
assertThat(gcsSchema.getField("primaryKeys")).isNotNull();
74+
assertThat(gcsSchema.getField("primaryKeys").schema().getType()).isEqualTo(Schema.Type.ARRAY);
75+
assertThat(gcsSchema.getField("payload")).isNotNull();
76+
assertThat(gcsSchema.getField("payload").schema())
77+
.isEqualTo(sourceRow.getPayload().getSchema());
78+
}
79+
80+
@Test
81+
public void testToGcsRecord() {
82+
final String testTable = "testTable";
83+
final String shardId = "id1";
84+
final long testReadTime = 1712751118L;
85+
var schemaRef = SchemaTestUtils.generateSchemaReference("public", "mydb");
86+
var schema =
87+
SchemaTestUtils.generateTestTableSchemaBuilder(testTable)
88+
.setPrimaryKeyColumns(ImmutableList.of(SchemaTestUtils.TEST_FIELD_NAME_1))
89+
.build();
90+
SourceRow sourceRow =
91+
SourceRow.builder(schemaRef, schema, shardId, testReadTime)
92+
.setField("firstName", "abc")
93+
.setField("lastName", "def")
94+
.build();
95+
96+
GenericRecord gcsRecord = sourceRow.toGcsRecord();
97+
assertThat(gcsRecord.getSchema().getName()).isEqualTo("SourceRowWithMetadata");
98+
assertThat(gcsRecord.get("tableName")).isEqualTo(testTable);
99+
assertThat(gcsRecord.get("shardId")).isEqualTo(shardId);
100+
assertThat((java.util.List<?>) gcsRecord.get("primaryKeys"))
101+
.containsExactly(SchemaTestUtils.TEST_FIELD_NAME_1);
102+
assertThat(gcsRecord.get("payload")).isEqualTo(sourceRow.getPayload());
50103
}
51104

52105
@Test

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/SchemaTestUtils.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,24 @@
2222
/** Test Utility class for Generating test schema. */
2323
public class SchemaTestUtils {
2424

25-
static final String TEST_FIELD_NAME_1 = "firstName";
26-
static final String TEST_FIELD_NAME_2 = "lastName";
25+
public static final String TEST_FIELD_NAME_1 = "firstName";
26+
public static final String TEST_FIELD_NAME_2 = "lastName";
2727

2828
public static SourceSchemaReference generateSchemaReference(String namespace, String dbName) {
2929
return SourceSchemaReference.ofJdbc(
3030
JdbcSchemaReference.builder().setNamespace(namespace).setDbName(dbName).build());
3131
}
3232

33-
public static SourceTableSchema generateTestTableSchema(String tableName) {
33+
public static SourceTableSchema.Builder generateTestTableSchemaBuilder(String tableName) {
3434
return SourceTableSchema.builder(SQLDialect.MYSQL)
3535
.setTableName(tableName)
3636
.addSourceColumnNameToSourceColumnType(
3737
TEST_FIELD_NAME_1, new SourceColumnType("varchar", new Long[] {20L}, null))
3838
.addSourceColumnNameToSourceColumnType(
39-
TEST_FIELD_NAME_2, new SourceColumnType("varchar", new Long[] {20L}, null))
40-
.build();
39+
TEST_FIELD_NAME_2, new SourceColumnType("varchar", new Long[] {20L}, null));
40+
}
41+
42+
public static SourceTableSchema generateTestTableSchema(String tableName) {
43+
return generateTestTableSchemaBuilder(tableName).build();
4144
}
4245
}

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/SourceTableSchemaTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
2121
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapper.MapperType;
22+
import com.google.common.collect.ImmutableList;
2223
import junit.framework.TestCase;
2324
import org.apache.avro.SchemaBuilder;
2425
import org.junit.Assert;
@@ -48,6 +49,22 @@ public void testTableSchemaBuilds() {
4849
sourceTableSchema.getAvroPayload().getField(SchemaTestUtils.TEST_FIELD_NAME_2).schema())
4950
.isEqualTo(SchemaBuilder.unionOf().nullType().and().stringType().endUnion());
5051
assertThat(sourceTableSchema.tableName()).isEqualTo(testTableName);
52+
assertThat(sourceTableSchema.primaryKeyColumns()).isEmpty();
53+
}
54+
55+
@Test
56+
public void testTableSchemaWithPrimaryKey() {
57+
final String testTableName = "testTableName";
58+
var sourceTableSchema =
59+
SchemaTestUtils.generateTestTableSchemaBuilder(testTableName)
60+
.setPrimaryKeyColumns(
61+
ImmutableList.of(
62+
SchemaTestUtils.TEST_FIELD_NAME_1, SchemaTestUtils.TEST_FIELD_NAME_2))
63+
.build();
64+
assertThat(sourceTableSchema.tableName()).isEqualTo(testTableName);
65+
assertThat(sourceTableSchema.primaryKeyColumns())
66+
.containsExactly(SchemaTestUtils.TEST_FIELD_NAME_1, SchemaTestUtils.TEST_FIELD_NAME_2)
67+
.inOrder();
5168
}
5269

5370
@Test

0 commit comments

Comments
 (0)