Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,22 @@ public static List<Map<String, Object>> mutationsToRecords(List<Mutation> mutati
try {
List<Map<String, Object>> records = new ArrayList<>();
mutations.forEach(
entry ->
records.add(
entry.asMap().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));

entry -> {
Map<String, Object> record = new HashMap<>();
entry
.asMap()
.forEach(
(key, value) -> {
Object newValue = value;
if (value.isNull()) {
newValue = null;
} else if (value.getType() == Type.array(Type.string())) {
newValue = new ArrayList<>(value.getAsStringList());
}
record.put(key, newValue);
});
records.add(record);
});
return records;
} catch (Exception e) {
throw new RuntimeException("Error converting TableResult to Records", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public Mutation apply(GenericRecord record) {
case PG_TEXT:
case JSON:
case PG_JSONB:
case UUID:
case PG_UUID:
builder.set(column.name()).to(readString(record, avroType, fieldName).orElse(null));
break;
case BYTES:
Expand Down Expand Up @@ -168,6 +170,8 @@ public Mutation apply(GenericRecord record) {
case PG_VARCHAR:
case PG_TEXT:
case JSON:
case UUID:
case PG_UUID:
builder
.set(column.name())
.toStringArray(readStringArray(record, arrayType, fieldName).orElse(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

import com.google.cloud.spanner.Dialect;
import com.google.cloud.teleport.spanner.common.NumericUtils;
import com.google.cloud.teleport.spanner.common.SizedType;
import com.google.cloud.teleport.spanner.common.Type;
import com.google.cloud.teleport.spanner.ddl.ChangeStream;
import com.google.cloud.teleport.spanner.ddl.Column;
Expand Down Expand Up @@ -561,7 +562,7 @@ public Table toTable(String tableName, Schema schema) {
}
if (Strings.isNullOrEmpty(sqlType)) {
Type spannerType = inferType(avroType, true);
sqlType = toString(spannerType, true);
sqlType = SizedType.typeString(spannerType, -1, true);
}
String defaultExpression = f.getProp(DEFAULT_EXPRESSION);
column.parseType(sqlType).notNull(!nullable).defaultExpression(defaultExpression);
Expand Down Expand Up @@ -685,6 +686,11 @@ com.google.cloud.teleport.spanner.common.Type inferType(Schema f, boolean suppor
? com.google.cloud.teleport.spanner.common.Type.float64()
: com.google.cloud.teleport.spanner.common.Type.pgFloat8();
case STRING:
if (LogicalTypes.uuid().equals(logicalType)) {
return (dialect == Dialect.GOOGLE_STANDARD_SQL)
? com.google.cloud.teleport.spanner.common.Type.uuid()
: com.google.cloud.teleport.spanner.common.Type.pgUuid();
}
return (dialect == Dialect.GOOGLE_STANDARD_SQL)
? com.google.cloud.teleport.spanner.common.Type.string()
: com.google.cloud.teleport.spanner.common.Type.pgVarchar();
Expand Down Expand Up @@ -727,79 +733,4 @@ com.google.cloud.teleport.spanner.common.Type inferType(Schema f, boolean suppor
}
throw new IllegalArgumentException("Cannot infer a type " + f);
}

/**
 * Renders a Spanner type as the DDL type name used when no explicit SQL type string was
 * supplied for a column.
 *
 * <p>GoogleSQL codes render as upper-case names (e.g. {@code "STRING(MAX)"}) and PostgreSQL
 * codes render as lower-case names (e.g. {@code "text"}); sized types always use their
 * MAX/default form here.
 *
 * @param spannerType the Spanner type to render
 * @param supportArray whether an array type is acceptable at this nesting level; element types
 *     are rendered with this set to {@code false}, so arrays of arrays are rejected
 * @return the DDL type string for {@code spannerType}
 * @throws IllegalArgumentException if the type (or a nested array) cannot be rendered
 */
private String toString(
    com.google.cloud.teleport.spanner.common.Type spannerType, boolean supportArray) {
  switch (spannerType.getCode()) {
      // Arrays: render the element with supportArray=false so a nested array falls
      // through to the IllegalArgumentException below.
    case ARRAY:
      if (supportArray) {
        return "ARRAY<" + toString(spannerType.getArrayElementType(), false) + ">";
      }
      break;
    case PG_ARRAY:
      if (supportArray) {
        return toString(spannerType.getArrayElementType(), false) + "[]";
      }
      break;
      // GoogleSQL scalar types.
    case BOOL:
      return "BOOL";
    case INT64:
      return "INT64";
    case FLOAT32:
      return "FLOAT32";
    case FLOAT64:
      return "FLOAT64";
    case STRING:
      return "STRING(MAX)";
    case BYTES:
      return "BYTES(MAX)";
    case TIMESTAMP:
      return "TIMESTAMP";
    case DATE:
      return "DATE";
    case NUMERIC:
      return "NUMERIC";
    case JSON:
      return "JSON";
    case PROTO:
      return "PROTO<" + spannerType.getProtoTypeFqn() + ">";
    case ENUM:
      return "ENUM<" + spannerType.getProtoTypeFqn() + ">";
      // PostgreSQL scalar types.
    case PG_BOOL:
      return "boolean";
    case PG_INT8:
      return "bigint";
    case PG_FLOAT4:
      return "real";
    case PG_FLOAT8:
      return "double precision";
    case PG_TEXT:
      return "text";
    case PG_VARCHAR:
      return "character varying";
    case PG_BYTEA:
      return "bytea";
    case PG_TIMESTAMPTZ:
      return "timestamp with time zone";
    case PG_SPANNER_COMMIT_TIMESTAMP:
      return "spanner.commit_timestamp";
    case PG_DATE:
      return "date";
    case PG_NUMERIC:
      return "numeric";
  }
  throw new IllegalArgumentException("Cannot to string the type " + spannerType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,26 @@ String createColumnExpression(Column col) {
+ ") AS element), []) END AS "
+ col.name();
}
if (col.typeString().equals("UUID")) {
return String.format("CAST(t.`%s` AS STRING) AS %s", col.name(), col.name());
}
if (col.typeString().equals("ARRAY<UUID>")) {
return String.format(
"CASE WHEN t.`%s` IS NULL THEN NULL ELSE "
+ "ARRAY(SELECT CAST(e AS STRING) FROM UNNEST(%s) AS e) END AS %s",
col.name(), col.name(), col.name());
}
return "t.`" + col.name() + "`";
case POSTGRESQL:
if (col.typeString().equals("uuid")) {
return String.format("t.\"%s\"::text AS \"%s\"", col.name(), col.name());
}
if (col.typeString().equals("uuid[]")) {
return String.format(
"CASE WHEN t.\"%s\" IS NULL THEN NULL ELSE "
+ "ARRAY(SELECT e::text FROM UNNEST(t.\"%s\") AS e) END AS \"%s\"",
col.name(), col.name(), col.name());
}
return "t.\"" + col.name() + "\"";
default:
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ protected final Mutation parseRow(
case PG_TEXT:
columnValue = Value.string(cellValue);
break;
case UUID:
case PG_UUID:
columnValue = Value.string(isNullValue ? null : cellValue);
break;
case DATE:
case PG_DATE:
if (isNullValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,9 @@ private Schema avroType(
case JSON:
case PG_JSONB:
return SchemaBuilder.builder().stringType();
case UUID:
case PG_UUID:
return LogicalTypes.uuid().addToSchema(SchemaBuilder.builder().stringType());
case BYTES:
case PG_BYTEA:
case PROTO:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public void processElement(ProcessContext c) throws Exception {
c.output(ddl);
}
}));
PCollection<ReadOperation> tables =
PCollection<ReadOperation> tableReadOperations =
ddl.apply("Build table read operations", new BuildReadFromTableOperations(tableNames));

PCollection<KV<String, Void>> allTableAndViewNames =
Expand Down Expand Up @@ -455,7 +455,7 @@ public void processElement(ProcessContext c) {
.apply("As view", View.asMap());

PCollection<Struct> rows =
tables.apply(
tableReadOperations.apply(
"Read all rows from Spanner",
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ public GenericRecord convert(Struct row) {
builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
} else if (spannerType.equals("DATE")) {
builder.set(field, nullValue ? null : dateToString(row.getDate(fieldIndex)));
} else if (spannerType.equals("UUID")) {
builder.set(field, nullValue ? null : row.getString(fieldIndex));
}
} else if (dialect == Dialect.POSTGRESQL) {
if (spannerType.equals("jsonb")) {
Expand All @@ -266,6 +268,8 @@ public GenericRecord convert(Struct row) {
builder.set(field, nullValue ? null : row.getTimestamp(fieldIndex).toString());
} else if (spannerType.equals("date")) {
builder.set(field, nullValue ? null : dateToString(row.getDate(fieldIndex)));
} else if (spannerType.equals("uuid")) {
builder.set(field, nullValue ? null : row.getString(fieldIndex));
}
}
break;
Expand Down Expand Up @@ -368,7 +372,9 @@ public GenericRecord convert(Struct row) {
case STRING:
{
if (dialect == Dialect.GOOGLE_STANDARD_SQL) {
if (fieldInfo.matchesArrayPattern() || spannerType.equals("ARRAY<JSON>")) {
if (fieldInfo.matchesArrayPattern()
|| spannerType.equals("ARRAY<JSON>")
|| spannerType.equals("ARRAY<UUID>")) {
builder.set(field, nullValue ? null : row.getStringList(fieldIndex));
} else if (spannerType.equals("ARRAY<TIMESTAMP>")) {
setTimestampArray(row, builder, field, fieldIndex, nullValue);
Expand All @@ -380,7 +386,8 @@ public GenericRecord convert(Struct row) {
if (spannerType.equals("jsonb[]")) {
builder.set(field, nullValue ? null : row.getPgJsonbList(fieldIndex));
} else if (fieldInfo.matchesVarcharArrayPattern()
|| spannerType.equals("text[]")) {
|| spannerType.equals("text[]")
|| spannerType.equals("uuid[]")) {
builder.set(field, nullValue ? null : row.getStringList(fieldIndex));
} else if (spannerType.equals("timestamp with time zone[]")) {
setTimestampArray(row, builder, field, fieldIndex, nullValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
*
* <p>Schema file must have all column and type definition in one line. Schema file must use the
* data type names of Cloud Spanner. We currently support the following Cloud Spanner data types: -
* BOOL - DATE - FLOAT32 - FLOAT64 - INT64 - STRING - TIMESTAMP
* BOOL - DATE - FLOAT32 - FLOAT64 - INT64 - STRING - TIMESTAMP - UUID
*
* <p>Input format properties: - \\N in the source column will be considered as NULL value when
* writing to Cloud Spanner. - If you need to escape characters, you can use the "fieldQualifier"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ public static Code parseSpannerDataType(String columnType, Dialect dialect) {
return Code.PROTO;
} else if (columnType.startsWith("ENUM") && dialect == Dialect.GOOGLE_STANDARD_SQL) {
return Code.ENUM;
} else if (columnType.equalsIgnoreCase("UUID") && dialect == Dialect.GOOGLE_STANDARD_SQL) {
return Code.UUID;
} else if (columnType.equalsIgnoreCase("bigint") && dialect == Dialect.POSTGRESQL) {
return Code.PG_INT8;
} else if (columnType.equalsIgnoreCase("real") && dialect == Dialect.POSTGRESQL) {
Expand Down Expand Up @@ -508,6 +510,8 @@ public static Code parseSpannerDataType(String columnType, Dialect dialect) {
} else if (columnType.equalsIgnoreCase("spanner.commit_timestamp")
&& dialect == Dialect.POSTGRESQL) {
return Code.PG_SPANNER_COMMIT_TIMESTAMP;
} else if (columnType.equalsIgnoreCase("uuid") && dialect == Dialect.POSTGRESQL) {
return Code.PG_UUID;
} else {
throw new IllegalArgumentException(
"Unrecognized or unsupported column data type: " + columnType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public static String typeString(Type type, Integer size, boolean outputAsDdlRepr
return "FLOAT64";
case PG_FLOAT8:
return "double precision";
case UUID:
return "UUID";
case PG_UUID:
return "uuid";
case STRING:
return "STRING(" + (size == -1 ? "MAX" : Integer.toString(size)) + ")";
case PG_VARCHAR:
Expand Down Expand Up @@ -195,6 +199,9 @@ public static SizedType parseSpannerType(String spannerType, Dialect dialect) {
if (spannerType.equals("FLOAT64")) {
return t(Type.float64(), null);
}
if (spannerType.equals("UUID")) {
return t(Type.uuid(), null);
}
if (spannerType.startsWith("STRING")) {
String sizeStr = spannerType.substring(7, spannerType.length() - 1);
int size = sizeStr.equals("MAX") ? -1 : Integer.parseInt(sizeStr);
Expand Down Expand Up @@ -329,6 +336,9 @@ public static SizedType parseSpannerType(String spannerType, Dialect dialect) {
if (spannerType.equals("text")) {
return t(Type.pgText(), -1);
}
if (spannerType.equals("uuid")) {
return t(Type.pgUuid(), null);
}
if (spannerType.startsWith("character varying")) {
int size = -1;
if (spannerType.length() > 18) {
Expand Down
Loading