Skip to content

Commit becc7f8

Browse files
authored
Add UUID support in import/export templates (#2145)
* Add UUID support in import/export templates * Add UUID support in import/export templates * Add UUID support in import/export templates * Spotless fix * Separate out UUID Avro integration tests * Separate out UUID CSV import test * Fix ImportPipelineIT tests * Separate out UUID SpannerToTextIT tests * Separate out UUID SpannerToTextIT tests * Add more unit tests * Remove println * Add todos for cleanups * Add null tests * Add comment * Associate bug id with todos * Add todos * Fix test * Add todos
1 parent e6a1f63 commit becc7f8

33 files changed

+1207
-103
lines changed

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,24 @@ public static List<Map<String, Object>> mutationsToRecords(List<Mutation> mutati
7474
try {
7575
List<Map<String, Object>> records = new ArrayList<>();
7676
mutations.forEach(
77-
entry ->
78-
records.add(
79-
entry.asMap().entrySet().stream()
80-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
81-
77+
entry -> {
78+
Map<String, Object> record = new HashMap<>();
79+
entry
80+
.asMap()
81+
.forEach(
82+
(key, value) -> {
83+
Object newValue = value;
84+
// null `Value` objects `toString()` returns "NULL", causing test failures
85+
// when compared with Java nulls `toString()`. This workaround handles that.
86+
if (value.isNull()) {
87+
newValue = null;
88+
} else if (value.getType() == Type.array(Type.string())) {
89+
newValue = new ArrayList<>(value.getAsStringList());
90+
}
91+
record.put(key, newValue);
92+
});
93+
records.add(record);
94+
});
8295
return records;
8396
} catch (Exception e) {
8497
throw new RuntimeException("Error converting TableResult to Records", e);

v1/src/main/java/com/google/cloud/teleport/spanner/AvroRecordConverter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ public Mutation apply(GenericRecord record) {
9898
case PG_TEXT:
9999
case JSON:
100100
case PG_JSONB:
101+
case UUID:
102+
case PG_UUID:
101103
builder.set(column.name()).to(readString(record, avroType, fieldName).orElse(null));
102104
break;
103105
case BYTES:
@@ -168,6 +170,8 @@ public Mutation apply(GenericRecord record) {
168170
case PG_VARCHAR:
169171
case PG_TEXT:
170172
case JSON:
173+
case UUID:
174+
case PG_UUID:
171175
builder
172176
.set(column.name())
173177
.toStringArray(readStringArray(record, arrayType, fieldName).orElse(null));

v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java

Lines changed: 7 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555

5656
import com.google.cloud.spanner.Dialect;
5757
import com.google.cloud.teleport.spanner.common.NumericUtils;
58+
import com.google.cloud.teleport.spanner.common.SizedType;
5859
import com.google.cloud.teleport.spanner.common.Type;
5960
import com.google.cloud.teleport.spanner.ddl.ChangeStream;
6061
import com.google.cloud.teleport.spanner.ddl.Column;
@@ -561,7 +562,7 @@ public Table toTable(String tableName, Schema schema) {
561562
}
562563
if (Strings.isNullOrEmpty(sqlType)) {
563564
Type spannerType = inferType(avroType, true);
564-
sqlType = toString(spannerType, true);
565+
sqlType = SizedType.typeString(spannerType, -1, true);
565566
}
566567
String defaultExpression = f.getProp(DEFAULT_EXPRESSION);
567568
column.parseType(sqlType).notNull(!nullable).defaultExpression(defaultExpression);
@@ -685,6 +686,11 @@ com.google.cloud.teleport.spanner.common.Type inferType(Schema f, boolean suppor
685686
? com.google.cloud.teleport.spanner.common.Type.float64()
686687
: com.google.cloud.teleport.spanner.common.Type.pgFloat8();
687688
case STRING:
689+
if (LogicalTypes.uuid().equals(logicalType)) {
690+
return (dialect == Dialect.GOOGLE_STANDARD_SQL)
691+
? com.google.cloud.teleport.spanner.common.Type.uuid()
692+
: com.google.cloud.teleport.spanner.common.Type.pgUuid();
693+
}
688694
return (dialect == Dialect.GOOGLE_STANDARD_SQL)
689695
? com.google.cloud.teleport.spanner.common.Type.string()
690696
: com.google.cloud.teleport.spanner.common.Type.pgVarchar();
@@ -727,79 +733,4 @@ com.google.cloud.teleport.spanner.common.Type inferType(Schema f, boolean suppor
727733
}
728734
throw new IllegalArgumentException("Cannot infer a type " + f);
729735
}
730-
731-
private String toString(
732-
com.google.cloud.teleport.spanner.common.Type spannerType, boolean supportArray) {
733-
switch (spannerType.getCode()) {
734-
case BOOL:
735-
return "BOOL";
736-
case PG_BOOL:
737-
return "boolean";
738-
case INT64:
739-
return "INT64";
740-
case PG_INT8:
741-
return "bigint";
742-
case FLOAT32:
743-
return "FLOAT32";
744-
case PG_FLOAT4:
745-
return "real";
746-
case FLOAT64:
747-
return "FLOAT64";
748-
case PG_FLOAT8:
749-
return "double precision";
750-
case STRING:
751-
return "STRING(MAX)";
752-
case PG_TEXT:
753-
return "text";
754-
case PG_VARCHAR:
755-
return "character varying";
756-
case BYTES:
757-
return "BYTES(MAX)";
758-
case PG_BYTEA:
759-
return "bytea";
760-
case TIMESTAMP:
761-
return "TIMESTAMP";
762-
case PG_TIMESTAMPTZ:
763-
return "timestamp with time zone";
764-
case PG_SPANNER_COMMIT_TIMESTAMP:
765-
return "spanner.commit_timestamp";
766-
case DATE:
767-
return "DATE";
768-
case PG_DATE:
769-
return "date";
770-
case NUMERIC:
771-
return "NUMERIC";
772-
case PG_NUMERIC:
773-
return "numeric";
774-
case JSON:
775-
return "JSON";
776-
case PROTO:
777-
return "PROTO<" + spannerType.getProtoTypeFqn() + ">";
778-
case ENUM:
779-
return "ENUM<" + spannerType.getProtoTypeFqn() + ">";
780-
case ARRAY:
781-
{
782-
if (supportArray) {
783-
com.google.cloud.teleport.spanner.common.Type element =
784-
spannerType.getArrayElementType();
785-
String elementStr = toString(element, false);
786-
return "ARRAY<" + elementStr + ">";
787-
}
788-
// otherwise fall through and throw an error.
789-
break;
790-
}
791-
case PG_ARRAY:
792-
{
793-
if (supportArray) {
794-
com.google.cloud.teleport.spanner.common.Type element =
795-
spannerType.getArrayElementType();
796-
String elementStr = toString(element, false);
797-
return elementStr + "[]";
798-
}
799-
// otherwise fall through and throw an error.
800-
break;
801-
}
802-
}
803-
throw new IllegalArgumentException("Cannot to string the type " + spannerType);
804-
}
805736
}

v1/src/main/java/com/google/cloud/teleport/spanner/BuildReadFromTableOperations.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,28 @@ String createColumnExpression(Column col) {
164164
+ ") AS element), []) END AS "
165165
+ col.name();
166166
}
167+
// TODO(b/394493438): Remove casting once google-cloud-spanner supports UUID type
168+
if (col.typeString().equals("UUID")) {
169+
return String.format("CAST(t.`%s` AS STRING) AS %s", col.name(), col.name());
170+
}
171+
if (col.typeString().equals("ARRAY<UUID>")) {
172+
return String.format(
173+
"CASE WHEN t.`%s` IS NULL THEN NULL ELSE "
174+
+ "ARRAY(SELECT CAST(e AS STRING) FROM UNNEST(%s) AS e) END AS %s",
175+
col.name(), col.name(), col.name());
176+
}
167177
return "t.`" + col.name() + "`";
168178
case POSTGRESQL:
179+
// TODO(b/394493438): Remove casting once google-cloud-spanner supports UUID type
180+
if (col.typeString().equals("uuid")) {
181+
return String.format("t.\"%s\"::text AS \"%s\"", col.name(), col.name());
182+
}
183+
if (col.typeString().equals("uuid[]")) {
184+
return String.format(
185+
"CASE WHEN t.\"%s\" IS NULL THEN NULL ELSE "
186+
+ "ARRAY(SELECT e::text FROM UNNEST(t.\"%s\") AS e) END AS \"%s\"",
187+
col.name(), col.name(), col.name());
188+
}
169189
return "t.\"" + col.name() + "\"";
170190
default:
171191
throw new IllegalArgumentException(

v1/src/main/java/com/google/cloud/teleport/spanner/CSVRecordToMutation.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ protected final Mutation parseRow(
194194
case PG_TEXT:
195195
columnValue = Value.string(cellValue);
196196
break;
197+
case UUID:
198+
case PG_UUID:
199+
columnValue = Value.string(isNullValue ? null : cellValue);
200+
break;
197201
case DATE:
198202
case PG_DATE:
199203
if (isNullValue) {

v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,9 @@ private Schema avroType(
486486
case JSON:
487487
case PG_JSONB:
488488
return SchemaBuilder.builder().stringType();
489+
case UUID:
490+
case PG_UUID:
491+
return LogicalTypes.uuid().addToSchema(SchemaBuilder.builder().stringType());
489492
case BYTES:
490493
case PG_BYTEA:
491494
case PROTO:

v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ public void processElement(ProcessContext c) throws Exception {
282282
c.output(ddl);
283283
}
284284
}));
285-
PCollection<ReadOperation> tables =
285+
PCollection<ReadOperation> tableReadOperations =
286286
ddl.apply("Build table read operations", new BuildReadFromTableOperations(tableNames));
287287

288288
PCollection<KV<String, Void>> allTableAndViewNames =
@@ -455,7 +455,7 @@ public void processElement(ProcessContext c) {
455455
.apply("As view", View.asMap());
456456

457457
PCollection<Struct> rows =
458-
tables.apply(
458+
tableReadOperations.apply(
459459
"Read all rows from Spanner",
460460
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));
461461

v1/src/main/java/com/google/cloud/teleport/spanner/SpannerRecordConverter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ public GenericRecord convert(Struct row) {
254254
builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
255255
} else if (spannerType.equals("DATE")) {
256256
builder.set(field, nullValue ? null : dateToString(row.getDate(fieldIndex)));
257+
} else if (spannerType.equals("UUID")) {
258+
builder.set(field, nullValue ? null : row.getString(fieldIndex));
257259
}
258260
} else if (dialect == Dialect.POSTGRESQL) {
259261
if (spannerType.equals("jsonb")) {
@@ -266,6 +268,8 @@ public GenericRecord convert(Struct row) {
266268
builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
267269
} else if (spannerType.equals("date")) {
268270
builder.set(field, nullValue ? null : dateToString(row.getDate(fieldIndex)));
271+
} else if (spannerType.equals("uuid")) {
272+
builder.set(field, nullValue ? null : row.getString(fieldIndex));
269273
}
270274
}
271275
break;
@@ -368,7 +372,9 @@ public GenericRecord convert(Struct row) {
368372
case STRING:
369373
{
370374
if (dialect == Dialect.GOOGLE_STANDARD_SQL) {
371-
if (fieldInfo.matchesArrayPattern() || spannerType.equals("ARRAY<JSON>")) {
375+
if (fieldInfo.matchesArrayPattern()
376+
|| spannerType.equals("ARRAY<JSON>")
377+
|| spannerType.equals("ARRAY<UUID>")) {
372378
builder.set(field, nullValue ? null : row.getStringList(fieldIndex));
373379
} else if (spannerType.equals("ARRAY<TIMESTAMP>")) {
374380
setTimestampArray(row, builder, field, fieldIndex, nullValue);
@@ -380,7 +386,8 @@ public GenericRecord convert(Struct row) {
380386
if (spannerType.equals("jsonb[]")) {
381387
builder.set(field, nullValue ? null : row.getPgJsonbList(fieldIndex));
382388
} else if (fieldInfo.matchesVarcharArrayPattern()
383-
|| spannerType.equals("text[]")) {
389+
|| spannerType.equals("text[]")
390+
|| spannerType.equals("uuid[]")) {
384391
builder.set(field, nullValue ? null : row.getStringList(fieldIndex));
385392
} else if (spannerType.equals("timestamp with time zone[]")) {
386393
setTimestampArray(row, builder, field, fieldIndex, nullValue);

v1/src/main/java/com/google/cloud/teleport/spanner/TextImportPipeline.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
*
5252
* <p>Schema file must have all column and type definition in one line. Schema file must use the
5353
* data type names of Cloud Spanner. We currently support the following Cloud Spanner data types: -
54-
* BOOL - DATE - FLOAT32 - FLOAT64 - INT64 - STRING - TIMESTAMP
54+
* BOOL - DATE - FLOAT32 - FLOAT64 - INT64 - STRING - TIMESTAMP - UUID
5555
*
5656
* <p>Input format properties: - \\N in the source column will be considered as NULL value when
5757
* writing to Cloud Spanner. - If you need to escape characters, you can use the "fieldQualifier"

v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,8 @@ public static Code parseSpannerDataType(String columnType, Dialect dialect) {
481481
return Code.PROTO;
482482
} else if (columnType.startsWith("ENUM") && dialect == Dialect.GOOGLE_STANDARD_SQL) {
483483
return Code.ENUM;
484+
} else if (columnType.equalsIgnoreCase("UUID") && dialect == Dialect.GOOGLE_STANDARD_SQL) {
485+
return Code.UUID;
484486
} else if (columnType.equalsIgnoreCase("bigint") && dialect == Dialect.POSTGRESQL) {
485487
return Code.PG_INT8;
486488
} else if (columnType.equalsIgnoreCase("real") && dialect == Dialect.POSTGRESQL) {
@@ -508,6 +510,8 @@ public static Code parseSpannerDataType(String columnType, Dialect dialect) {
508510
} else if (columnType.equalsIgnoreCase("spanner.commit_timestamp")
509511
&& dialect == Dialect.POSTGRESQL) {
510512
return Code.PG_SPANNER_COMMIT_TIMESTAMP;
513+
} else if (columnType.equalsIgnoreCase("uuid") && dialect == Dialect.POSTGRESQL) {
514+
return Code.PG_UUID;
511515
} else {
512516
throw new IllegalArgumentException(
513517
"Unrecognized or unsupported column data type: " + columnType);

0 commit comments

Comments
 (0)