diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java index 7b22d121f0..6c6bcdfead 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java @@ -577,7 +577,11 @@ protected void executeSqlScript(JDBCResourceManager resourceManager, String reso for (String d : ddls) { if (!d.isBlank()) { try { - resourceManager.runSQLUpdate(d); + if (d.toLowerCase().trim().startsWith("select")) { + resourceManager.runSQLQuery(d); + } else { + resourceManager.runSQLUpdate(d); + } } catch (Exception e) { LOG.error("Exception while executing DDL {}", d, e); throw e; diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLDatastreamToSpannerDataTypesIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLDatastreamToSpannerDataTypesIT.java new file mode 100644 index 0000000000..7273df2f46 --- /dev/null +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLDatastreamToSpannerDataTypesIT.java @@ -0,0 +1,671 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates; + +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; + +import com.google.cloud.spanner.Struct; +import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; +import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.conditions.ChainedConditionCheck; +import org.apache.beam.it.conditions.ConditionCheck; +import org.apache.beam.it.gcp.cloudsql.CloudPostgresResourceManager; +import org.apache.beam.it.gcp.datastream.DatastreamResourceManager; +import org.apache.beam.it.gcp.datastream.PostgresqlSource; +import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; +import org.apache.beam.it.gcp.spanner.SpannerResourceManager; +import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck; +import org.apache.beam.it.gcp.spanner.matchers.SpannerAsserts; +import org.apache.beam.it.gcp.storage.GcsResourceManager; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An integration test for {@link DataStreamToSpanner} Flex template which tests migration of all + * PostgreSQL data types. + */ +@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) +@TemplateIntegrationTest(DataStreamToSpanner.class) +@RunWith(JUnit4.class) +public class PostgreSQLDatastreamToSpannerDataTypesIT extends DataStreamToSpannerITBase { + + private static final Logger LOG = + LoggerFactory.getLogger(PostgreSQLDatastreamToSpannerDataTypesIT.class); + + private static final String POSTGRESQL_DDL_RESOURCE = + "PostgreSQLDataTypesIT/postgresql-data-types.sql"; + private static final String SPANNER_DDL_RESOURCE = "PostgreSQLDataTypesIT/spanner-schema.sql"; + private static final String PG_DIALECT_SPANNER_DDL_RESOURCE = + "PostgreSQLDataTypesIT/pg-dialect-spanner-schema.sql"; + + private static final List UNSUPPORTED_TYPE_TABLES = + List.of( + "t_bigint_array_to_int64_array", + "t_bigint_array_to_string", + "t_bit_to_bool_array", + "t_bit_varying_to_bool_array", + "t_bool_array_to_bool_array", + "t_bool_array_to_string", + "t_box", + "t_box_to_float64_array", + "t_circle", + "t_circle_to_float64_array", + "t_datemultirange", + "t_daterange", + "t_enum", + "t_float_array_to_float64_array", + "t_float_array_to_string", + "t_int_array_to_int64_array", + "t_int_array_to_string", + "t_int4multirange", + "t_int4range", + "t_int8multirange", + "t_int8range", + "t_interval", + "t_interval_to_int64", + "t_line_to_float64_array", + "t_lseg_to_float64_array", + "t_money_to_int64", + "t_nummultirange", + "t_numrange", + "t_path", + "t_path_to_float64_array", + "t_pg_lsn", + "t_pg_snapshot", + "t_point_to_float64_array", + "t_polygon_to_float64_array", + "t_real_array_to_float32_array", + "t_real_array_to_string", + "t_smallint_array_to_int64_array", + "t_smallint_array_to_string", + "t_time", + "t_time_with_time_zone", + "t_time_without_time_zone", + "t_timetz", + "t_tsmultirange", + "t_tsquery", + "t_tsrange", + "t_tstzmultirange", + "t_tstzrange", + "t_tsvector", + "t_txid_snapshot", + "t_varbit_to_bool_array"); + private static final String PUBLICATION_NAME = "data_types_test_publication"; + private static final String REPLICATION_SLOT_NAME = "data_types_test_replication_slot"; + private static final String PG_DIALECT_REPLICATION_SLOT_NAME = + "pg_dialect_data_types_test_replication_slot"; + + private static boolean initialized = false; + private static CloudPostgresResourceManager postgresResourceManager; + private static SpannerResourceManager spannerResourceManager; + private static SpannerResourceManager pgDialectSpannerResourceManager; + private static GcsResourceManager gcsResourceManager; + private static PubsubResourceManager pubsubResourceManager; + private static DatastreamResourceManager datastreamResourceManager; + + private static HashSet testInstances = new HashSet<>(); + + @Before + public void setUp() throws IOException { + skipBaseCleanup = true; + synchronized (PostgreSQLDatastreamToSpannerDataTypesIT.class) { + testInstances.add(this); + if (!initialized) { + LOG.info("Setting up PostgreSQL resource manager..."); + postgresResourceManager = CloudPostgresResourceManager.builder(testName).build(); + LOG.info( + "PostgreSQL resource manager created with URI: {}", postgresResourceManager.getUri()); + LOG.info("Setting up Spanner resource manager..."); + spannerResourceManager = setUpSpannerResourceManager(); + LOG.info( + "Spanner resource manager created with instance ID: {}", + spannerResourceManager.getInstanceId()); + LOG.info("Setting up PG dialect Spanner resource manager..."); + pgDialectSpannerResourceManager = setUpPGDialectSpannerResourceManager(); + LOG.info( + "PG dialect Spanner resource manager created with instance ID: {}", + pgDialectSpannerResourceManager.getInstanceId()); + LOG.info("Setting up GCS resource manager..."); + gcsResourceManager = setUpSpannerITGcsResourceManager(); + LOG.info("GCS resource manager created with bucket: {}", gcsResourceManager.getBucket()); + LOG.info("Setting up Pub/Sub resource manager..."); + pubsubResourceManager = setUpPubSubResourceManager(); + LOG.info("Pub/Sub resource manager created."); + LOG.info("Setting up Datastream resource manager..."); + datastreamResourceManager = + DatastreamResourceManager.builder(testName, PROJECT, REGION) + .setCredentialsProvider(credentialsProvider) + .setPrivateConnectivity("datastream-connect-2") + .build(); + LOG.info("Datastream resource manager created"); + + LOG.info("Executing PostgreSQL DDL script..."); + executeSqlScript(postgresResourceManager, POSTGRESQL_DDL_RESOURCE); + + initialized = true; + } + } + } + + @AfterClass + public static void cleanUp() throws IOException { + LOG.info("Cleaning up resources..."); + for (PostgreSQLDatastreamToSpannerDataTypesIT instance : testInstances) { + instance.tearDownBase(); + } + + // It is important to clean up Datastream before trying to drop the replication slot. + ResourceManagerUtils.cleanResources(datastreamResourceManager); + + try { + postgresResourceManager.runSQLQuery( + "SELECT pg_drop_replication_slot('" + REPLICATION_SLOT_NAME + "')"); + } catch (Exception e) { + LOG.warn("Failed to drop replication slot {}:", REPLICATION_SLOT_NAME, e); + } + try { + postgresResourceManager.runSQLQuery( + "SELECT pg_drop_replication_slot('" + PG_DIALECT_REPLICATION_SLOT_NAME + "')"); + } catch (Exception e) { + LOG.warn("Failed to drop replication slot {}:", PG_DIALECT_REPLICATION_SLOT_NAME, e); + } + try { + postgresResourceManager.runSQLUpdate("DROP PUBLICATION IF EXISTS " + PUBLICATION_NAME); + } catch (Exception e) { + LOG.warn("Failed to drop publication {}:", PUBLICATION_NAME, e); + } + + ResourceManagerUtils.cleanResources( + postgresResourceManager, + spannerResourceManager, + pgDialectSpannerResourceManager, + gcsResourceManager, + pubsubResourceManager); + } + + @Test + public void testPostgreSqlDataTypes() throws Exception { + LOG.info("Creating Spanner DDL..."); + createSpannerDDL(spannerResourceManager, SPANNER_DDL_RESOURCE); + + PostgresqlSource postgresqlSource = + PostgresqlSource.builder( + postgresResourceManager.getHost(), + postgresResourceManager.getUsername(), + postgresResourceManager.getPassword(), + postgresResourceManager.getPort(), + postgresResourceManager.getDatabaseName(), + REPLICATION_SLOT_NAME, + PUBLICATION_NAME) + .setAllowedTables(Map.of("public", getAllowedTables())) + .build(); + + LOG.info("Launching Dataflow job..."); + PipelineLauncher.LaunchInfo jobInfo = + launchDataflowJob( + "postgresql-data-types", + null, + null, + "postgresql-datastream-to-spanner-data-types", + spannerResourceManager, + pubsubResourceManager, + new HashMap<>(), + null, + null, + gcsResourceManager, + datastreamResourceManager, + null, + postgresqlSource); + assertThatPipeline(jobInfo).isRunning(); + + Map>> expectedData = getExpectedData(); + + ChainedConditionCheck condition = buildConditionCheck(spannerResourceManager, expectedData); + LOG.info("Waiting for pipeline to process data..."); + PipelineOperator.Result result = + pipelineOperator() + .waitForCondition(createConfig(jobInfo, Duration.ofMinutes(20)), condition); + assertThatResult(result).meetsConditions(); + + validateResult(spannerResourceManager, expectedData); + } + + @Test + public void testPostgreSqlDataTypesPGDialect() throws Exception { + LOG.info("Creating PG Dialect Spanner DDL..."); + createSpannerDDL(pgDialectSpannerResourceManager, PG_DIALECT_SPANNER_DDL_RESOURCE); + + PostgresqlSource postgresqlSource = + PostgresqlSource.builder( + postgresResourceManager.getHost(), + postgresResourceManager.getUsername(), + postgresResourceManager.getPassword(), + postgresResourceManager.getPort(), + postgresResourceManager.getDatabaseName(), + PG_DIALECT_REPLICATION_SLOT_NAME, + PUBLICATION_NAME) + .setAllowedTables(Map.of("public", getAllowedTables())) + .build(); + + LOG.info("Launching Dataflow job..."); + PipelineLauncher.LaunchInfo jobInfo = + launchDataflowJob( + "postgresql-data-types-pg-dialect", + null, + null, + "postgresql-datastream-to-spanner-data-types-pg-dialect", + pgDialectSpannerResourceManager, + pubsubResourceManager, + new HashMap<>(), + null, + null, + gcsResourceManager, + datastreamResourceManager, + null, + postgresqlSource); + assertThatPipeline(jobInfo).isRunning(); + + Map>> expectedData = getExpectedDataPGDialect(); + + ChainedConditionCheck condition = + buildConditionCheck(pgDialectSpannerResourceManager, expectedData); + LOG.info("Waiting for pipeline to process data..."); + PipelineOperator.Result result = + pipelineOperator() + .waitForCondition(createConfig(jobInfo, Duration.ofMinutes(20)), condition); + assertThatResult(result).meetsConditions(); + + validateResult(pgDialectSpannerResourceManager, expectedData); + } + + private void validateResult( + SpannerResourceManager resourceManager, Map>> expectedData) { + // These types are not mapped as expected, ignore them to avoid failing the test. + Set ignoredTypeMappings = + Set.of( + "bit", + "bit_to_string", + "bit_varying", + "bit_varying_to_string", + "bytea", + "macaddr", + "macaddr8", + "uuid_to_bytes", + "varbit", + "varbit_to_string", + "t_bigint_array_to_int64_array", + "t_bigint_array_to_string", + "t_bit_to_bool_array", + "t_bit_varying_to_bool_array", + "t_bool_array_to_bool_array", + "t_bool_array_to_string", + "t_box_to_float64_array", + "t_circle_to_float64_array", + "t_float_array_to_float64_array", + "t_float_array_to_string", + "t_int_array_to_int64_array", + "t_int_array_to_string", + "t_line_to_float64_array", + "t_lseg_to_float64_array", + "t_money_to_int64", + "t_path_to_float64_array", + "t_point_to_float64_array", + "t_polygon_to_float64_array", + "t_real_array_to_float32_array", + "t_real_array_to_string", + "t_smallint_array_to_int64_array", + "t_smallint_array_to_string", + "t_varbit_to_bool_array"); + // Validate supported data types. + for (Map.Entry>> entry : expectedData.entrySet()) { + String type = entry.getKey(); + String tableName = String.format("t_%s", type); + if (ignoredTypeMappings.contains(type) || ignoredTypeMappings.contains(tableName)) { + LOG.warn( + "Mapping for {} is ignored to avoid failing the test (it does not map as expected)...", + type); + continue; + } + LOG.info("Asserting type: {}", type); + + List rows = resourceManager.readTableRecords(tableName, "id", "col"); + for (Struct row : rows) { + // Limit logs printed for very large strings. + String rowString = row.toString(); + if (rowString.length() > 1000) { + rowString = rowString.substring(0, 1000); + } + LOG.info("Found row: {}", rowString); + } + SpannerAsserts.assertThatStructs(rows) + .hasRecordsUnorderedCaseInsensitiveColumns(entry.getValue()); + } + + // Validate unsupported types. + for (String table : UNSUPPORTED_TYPE_TABLES) { + if (ignoredTypeMappings.contains(table)) { + LOG.warn( + "Mapping for {} is ignored to avoid failing the test (it does not map as expected)...", + table); + continue; + } + // Unsupported rows should still be migrated. Each source table has 2 rows. + assertThat(resourceManager.getRowCount(table)).isEqualTo(2L); + } + } + + private List> createRows(Object... values) { + List vals = Arrays.asList(values); + List> rows = new ArrayList<>(); + for (int i = 0; i < vals.size(); i++) { + Map row = new HashMap<>(); + row.put("id", i + 1); + row.put("col", vals.get(i)); + rows.add(row); + } + return rows; + } + + private List getAllowedTables() { + Map>> expectedData = getExpectedData(); + List tableNames = new ArrayList<>(expectedData.size() + UNSUPPORTED_TYPE_TABLES.size()); + for (String tableSuffix : expectedData.keySet()) { + tableNames.add("t_" + tableSuffix); + } + tableNames.addAll(UNSUPPORTED_TYPE_TABLES); + return tableNames; + } + + private ChainedConditionCheck buildConditionCheck( + SpannerResourceManager resourceManager, Map>> expectedData) { + // These tables fail to migrate the expected number of rows, ignore them to avoid having to wait + // for the timeout. + Set ignoredTables = + Set.of( + "t_bigint_array_to_int64_array", + "t_bigint_array_to_string", + "t_bit_to_bool_array", + "t_bit_varying_to_bool_array", + "t_bool_array_to_bool_array", + "t_bool_array_to_string", + "t_box_to_float64_array", + "t_circle_to_float64_array", + "t_float_array_to_float64_array", + "t_float_array_to_string", + "t_int_array_to_int64_array", + "t_int_array_to_string", + "t_line_to_float64_array", + "t_lseg_to_float64_array", + "t_path_to_float64_array", + "t_point_to_float64_array", + "t_polygon_to_float64_array", + "t_real_array_to_float32_array", + "t_real_array_to_string", + "t_smallint_array_to_int64_array", + "t_smallint_array_to_string", + "t_varbit_to_bool_array"); + List conditions = new ArrayList<>(expectedData.size()); + + ConditionCheck combinedCondition = null; + int numCombinedConditions = 0; + for (Map.Entry>> entry : expectedData.entrySet()) { + if (ignoredTables.contains(entry.getKey())) { + continue; + } + String tableName = String.format("t_%s", entry.getKey()); + int numRows = entry.getValue().size(); + ConditionCheck c = + SpannerRowsCheck.builder(resourceManager, tableName).setMinRows(numRows).build(); + if (combinedCondition == null) { + combinedCondition = c; + } else { + combinedCondition.and(c); + } + numCombinedConditions += 1; + if (numCombinedConditions >= 3) { + conditions.add(combinedCondition); + combinedCondition = null; + numCombinedConditions = 0; + } + } + + ConditionCheck unsupportedTableCondition = null; + for (String unsupportedTypeTable : UNSUPPORTED_TYPE_TABLES) { + if (ignoredTables.contains(unsupportedTypeTable)) { + continue; + } + ConditionCheck c = + SpannerRowsCheck.builder(resourceManager, unsupportedTypeTable).setMinRows(2).build(); + if (unsupportedTableCondition == null) { + unsupportedTableCondition = c; + } else { + unsupportedTableCondition.and(c); + } + } + conditions.add(unsupportedTableCondition); + + return ChainedConditionCheck.builder(conditions).build(); + } + + private Map>> getExpectedData() { + HashMap>> result = new HashMap<>(); + result.put("bigint", createRows("-9223372036854775808", "9223372036854775807", "42", "NULL")); + result.put( + "bigint_to_string", + createRows("-9223372036854775808", "9223372036854775807", "42", "NULL")); + result.put("bigserial", createRows("-9223372036854775808", "9223372036854775807", "42")); + result.put( + "bigserial_to_string", createRows("-9223372036854775808", "9223372036854775807", "42")); + result.put("bit", createRows("AA==", "gA==", "NULL")); + result.put("bit_to_string", createRows("AA==", "gA==", "NULL")); + result.put("bit_varying", createRows("UA==", "NULL")); + result.put("bit_varying_to_string", createRows("UA==", "NULL")); + result.put("bool", createRows("false", "true", "NULL")); + result.put("bool_to_string", createRows("false", "true", "NULL")); + result.put("boolean", createRows("false", "true", "NULL")); + result.put("boolean_to_string", createRows("false", "true", "NULL")); + result.put("bytea", createRows("YWJj", "NULL")); + result.put("bytea_to_string", createRows("YWJj", "NULL")); + result.put("char", createRows("a", "Θ", "NULL")); + result.put("character", createRows("a", "Ξ", "NULL")); + result.put("character_varying", createRows("testing character varying", "NULL")); + result.put("cidr", createRows("192.168.100.128/25", "NULL")); + result.put("date", createRows("0001-01-01", "9999-12-31", "NULL")); + result.put("date_to_string", createRows("0001-01-01", "9999-12-31", "NULL")); + result.put("decimal", createRows("0.12", "NULL")); + result.put("decimal_to_string", createRows("0.12", "NULL")); + result.put( + "double_precision", + createRows( + "-1.9876542E307", "1.9876542E307", "NaN", "-Infinity", "Infinity", "1.23", "NULL")); + result.put( + "double_precision_to_string", + createRows( + "-1.9876542E+307", "1.9876542E+307", "NaN", "-Infinity", "Infinity", "1.23", "NULL")); + result.put( + "float_to_float64", + createRows( + "-1.9876542E307", "1.9876542E307", "NaN", "-Infinity", "Infinity", "1.23", "NULL")); + result.put( + "float_to_string", + createRows( + "-1.9876542E+307", "1.9876542E+307", "NaN", "-Infinity", "Infinity", "1.23", "NULL")); + result.put( + "float4", + createRows( + "-1.9876542E38", "1.9876542E38", "NaN", "-Infinity", "Infinity", "2.34", "NULL")); + result.put( + "float4_to_float32", + createRows( + "-1.9876542E38", "1.9876542E38", "NaN", "-Infinity", "Infinity", "2.34", "NULL")); + result.put( + "float4_to_string", + createRows( + "-1.9876542E+38", "1.9876542E+38", "NaN", "-Infinity", "Infinity", "2.34", "NULL")); + result.put( + "float8", + createRows( + "-1.9876542E307", "1.9876542E307", "NaN", "-Infinity", "Infinity", "3.45", "NULL")); + result.put( + "float8_to_string", + createRows( + "-1.9876542E+307", "1.9876542E+307", "NaN", "-Infinity", "Infinity", "3.45", "NULL")); + result.put("inet", createRows("192.168.1.0/24", "NULL")); + result.put("int", createRows("-2147483648", "2147483647", "1", "NULL")); + result.put("int_to_string", createRows("-2147483648", "2147483647", "1", "NULL")); + result.put("integer", createRows("-2147483648", "2147483647", "2", "NULL")); + result.put("integer_to_string", createRows("-2147483648", "2147483647", "2", "NULL")); + result.put("int2", createRows("-32768", "32767", "3", "NULL")); + result.put("int2_to_string", createRows("-32768", "32767", "3", "NULL")); + result.put("int4", createRows("-2147483648", "2147483647", "4", "NULL")); + result.put("int4_to_string", createRows("-2147483648", "2147483647", "4", "NULL")); + result.put("int8", createRows("-9223372036854775808", "9223372036854775807", "5", "NULL")); + result.put( + "int8_to_string", createRows("-9223372036854775808", "9223372036854775807", "5", "NULL")); + result.put("json", createRows("{\"duplicate_key\":2}", "{\"null_key\":null}", "NULL")); + result.put( + "json_to_string", createRows("{\"duplicate_key\": 2}", "{\"null_key\": null}", "NULL")); + result.put("jsonb", createRows("{\"duplicate_key\":2}", "{\"null_key\":null}", "NULL")); + result.put( + "jsonb_to_string", createRows("{\"duplicate_key\": 2}", "{\"null_key\": null}", "NULL")); + result.put( + "large_decimal_to_numeric", + createRows( + // Decimals with scale larger than supported in Spanner are rounded + "0.12", "100000000000000000000000", "12345678901234567890.123456789", "NULL")); + result.put( + "large_decimal_to_string", + createRows( + "0.1200000000", + "99999999999999999999999.9999999999", + "123456789012345678901234567890.12...", + "NULL")); + result.put( + "large_numeric_to_numeric", + createRows( + // Decimals with scale larger than supported in Spanner are rounded + "0.12", "100000000000000000000000", "12345678901234567890.123456789", "NULL")); + result.put( + "large_numeric_to_string", + createRows( + "0.1200000000", + "99999999999999999999999.9999999999", + "123456789012345678901234567890.12...", + "NULL")); + result.put("macaddr", createRows("08:00:2b:01:02:03", "NULL")); + result.put("macaddr8", createRows("08:00:2b:01:02:03:04:05", "NULL")); + result.put("money", createRows("123.45", "NULL")); + result.put("numeric", createRows("4.56", "NULL")); + result.put("numeric_to_string", createRows("4.56", "NULL")); + result.put("oid", createRows("1000", "NULL")); + result.put( + "real", + createRows( + "-1.9876542E38", "1.9876542E38", "NaN", "-Infinity", "Infinity", "5.67", "NULL")); + result.put( + "real_to_float32", + createRows( + "-1.9876542E38", "1.9876542E38", "NaN", "-Infinity", "Infinity", "5.67", "NULL")); + result.put( + "real_to_string", + createRows( + "-1.9876542E+38", "1.9876542E+38", "NaN", "-Infinity", "Infinity", "5.67", "NULL")); + result.put("serial", createRows("-2147483648", "2147483647", "6")); + result.put("serial_to_string", createRows("-2147483648", "2147483647", "6")); + result.put("serial2", createRows("-32768", "32767", "7")); + result.put("serial2_to_string", createRows("-32768", "32767", "7")); + result.put("serial4", createRows("-2147483648", "2147483647", "8")); + result.put("serial4_to_string", createRows("-2147483648", "2147483647", "8")); + result.put("serial8", createRows("-9223372036854775808", "9223372036854775807", "9")); + result.put("serial8_to_string", createRows("-9223372036854775808", "9223372036854775807", "9")); + result.put("smallint", createRows("-32768", "32767", "10", "NULL")); + result.put("smallint_to_string", createRows("-32768", "32767", "10", "NULL")); + result.put("smallserial", createRows("-32768", "32767", "11")); + result.put("smallserial_to_string", createRows("-32768", "32767", "11")); + result.put("text", createRows("testing text", "NULL")); + result.put("timestamp", createRows("1970-01-02T03:04:05.123456Z", "NULL")); + result.put("timestamp_to_timestamp", createRows("1970-01-02T03:04:05.123456000Z", "NULL")); + result.put( + "timestamptz", + createRows("1970-02-02T18:05:06.123456000Z", "1970-02-03T05:05:06.123456000Z", "NULL")); + result.put( + "timestamptz_to_string", + createRows("1970-02-02T18:05:06.123456Z", "1970-02-03T05:05:06.123456Z", "NULL")); + result.put( + "timestamp_with_time_zone", + createRows("1970-02-02T18:05:06.123456000Z", "1970-02-03T05:05:06.123456000Z", "NULL")); + result.put( + "timestamp_with_timezone_to_string", + createRows("1970-02-02T18:05:06.123456Z", "1970-02-03T05:05:06.123456Z", "NULL")); + result.put("timestamp_without_time_zone", createRows("1970-01-02T03:04:05.123456Z", "NULL")); + result.put("uuid", createRows("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", "NULL")); + result.put("uuid_to_bytes", createRows("oO68mZwLTvi7bWu5vTgKEQ==", "NULL")); + result.put("varbit", createRows("wA==", "NULL")); + result.put("varbit_to_string", createRows("wA==", "NULL")); + result.put("varchar", createRows("testing varchar", "NULL")); + result.put("xml", createRows("123", "NULL")); + return result; + } + + private Map>> getExpectedDataPGDialect() { + // Expected data for PG dialect is roughly similar to the spanner dialect data, with some minor + // differences. Notably, some data types like numeric have slightly different behaviour. + Map>> expectedData = getExpectedData(); + + expectedData.put("decimal", createRows("0.120000000", "NULL")); + expectedData.put("json", createRows("{\"duplicate_key\": 2}", "{\"null_key\": null}", "NULL")); + expectedData.put("jsonb", createRows("{\"duplicate_key\": 2}", "{\"null_key\": null}", "NULL")); + expectedData.put( + "large_decimal_to_numeric", + createRows( + // Decimals with scale larger than supported in Spanner are rounded + "0.120000000", + "100000000000000000000000.000000000", + "12345678901234567890.123456789", + "NULL")); + expectedData.put( + "large_numeric_to_numeric", + createRows( + // Decimals with scale larger than supported in Spanner are rounded + "0.120000000", + "100000000000000000000000.000000000", + "12345678901234567890.123456789", + "NULL")); + expectedData.put("numeric", createRows("4.560000000", "NULL")); + + return expectedData; + } +} diff --git a/v2/datastream-to-spanner/src/test/resources/PostgreSQLDataTypesIT/pg-dialect-spanner-schema.sql b/v2/datastream-to-spanner/src/test/resources/PostgreSQLDataTypesIT/pg-dialect-spanner-schema.sql new file mode 100644 index 0000000000..ae237f29c4 --- /dev/null +++ b/v2/datastream-to-spanner/src/test/resources/PostgreSQLDataTypesIT/pg-dialect-spanner-schema.sql @@ -0,0 +1,139 @@ +CREATE TABLE IF NOT EXISTS t_bigint (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bigint_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bigint_array_to_int64_array (id INT8, col INT8[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bigint_array_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bigserial (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bigserial_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bit (id INT8, col BYTEA, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bit_to_bool_array (id INT8, col BOOL[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bit_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bit_varying (id INT8, col BYTEA, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bit_varying_to_bool_array (id INT8, col BOOL[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bit_varying_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bool (id INT8, col BOOL, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bool_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bool_array_to_bool_array (id INT8, col BOOL[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bool_array_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_boolean (id INT8, col BOOL, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_boolean_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_box (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_box_to_float64_array (id INT8, col FLOAT8[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bytea (id INT8, col BYTEA, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_bytea_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_char (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_character (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_character_varying (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_cidr (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_circle (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_circle_to_float64_array (id INT8, col FLOAT8[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_date (id INT8, col DATE, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_date_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_datemultirange (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_daterange (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_decimal (id INT8, col NUMERIC, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_decimal_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_double_precision (id INT8, col FLOAT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_double_precision_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_enum (id INT8 , col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_float_to_float64 (id INT8, col FLOAT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_float_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_float_array_to_float64_array (id INT8, col FLOAT8[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_float_array_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_float4 (id INT8, col FLOAT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_float4_to_float32 (id INT8, col FLOAT4, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_float4_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_float8 (id INT8, col FLOAT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_float8_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_inet (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int_array_to_int64_array (id INT8, col INT8[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int_array_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int2 (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int2_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int4 (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int4_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int4multirange (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int4range (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int8 (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int8_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int8multirange (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_int8range (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_integer (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_integer_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_interval (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_interval_to_int64 (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_json (id INT8, col JSONB, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_json_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_jsonb (id INT8, col JSONB, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_jsonb_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_large_decimal_to_numeric (id INT8, col NUMERIC, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_large_decimal_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_large_numeric_to_numeric (id INT8, col NUMERIC, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_large_numeric_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_line (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_line_to_float64_array (id INT8, col FLOAT8[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_lseg (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_lseg_to_float64_array (id INT8, col FLOAT8[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_macaddr (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_macaddr8 (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_money (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_money_to_int64 (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_numeric (id INT8, col NUMERIC, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_numeric_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_nummultirange (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_numrange (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_oid (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_path (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_path_to_float64_array (id INT8, col FLOAT8[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_pg_lsn (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_pg_snapshot (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_point (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_point_to_float64_array (id INT8, col FLOAT8[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_polygon (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_polygon_to_float64_array (id INT8, col FLOAT8[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_real (id INT8, col FLOAT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_real_to_float32 (id INT8, col FLOAT4, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_real_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_real_array_to_float32_array (id INT8, col FLOAT4[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_real_array_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_serial (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_serial_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_serial2 (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_serial2_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_serial4 (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_serial4_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_serial8 (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_serial8_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_smallint (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_smallint_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_smallint_array_to_int64_array (id INT8, col INT8[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_smallint_array_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_smallserial (id INT8, col INT8, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_smallserial_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_text (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_time (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_time_with_time_zone (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_time_without_time_zone (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_timestamp (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_timestamp_to_timestamp (id INT8, col TIMESTAMPTZ, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_timestamp_with_time_zone (id INT8, col TIMESTAMPTZ, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_timestamp_with_timezone_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_timestamp_without_time_zone (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_timestamptz (id INT8, col TIMESTAMPTZ, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_timestamptz_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_timetz (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_tsmultirange (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_tsquery (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_tsrange (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_tstzmultirange (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_tstzrange (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_tsvector (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_txid_snapshot (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_uuid (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_uuid_to_bytes (id INT8, col BYTEA, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_varbit (id INT8, col BYTEA, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_varbit_to_bool_array (id INT8, col BOOL[], PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_varbit_to_string (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_varchar (id INT8, col VARCHAR, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS t_xml (id INT8, col VARCHAR, PRIMARY KEY (id)); diff --git a/v2/datastream-to-spanner/src/test/resources/PostgreSQLDataTypesIT/postgresql-data-types.sql b/v2/datastream-to-spanner/src/test/resources/PostgreSQLDataTypesIT/postgresql-data-types.sql new file mode 100644 index 0000000000..f022b582bb --- /dev/null +++ b/v2/datastream-to-spanner/src/test/resources/PostgreSQLDataTypesIT/postgresql-data-types.sql @@ -0,0 +1,287 @@ +ALTER USER CURRENT_USER WITH REPLICATION; + +CREATE TYPE myenum AS ENUM ('enum1', 'enum2', 'enum3'); + +CREATE TABLE t_bigint (id serial primary key, col bigint); +CREATE TABLE t_bigint_to_string (id serial primary key, col bigint); +CREATE TABLE t_bigint_array_to_int64_array (id serial primary key, col bigint[]); +CREATE TABLE t_bigint_array_to_string (id serial primary key, col bigint[]); +CREATE TABLE t_bigserial (id serial primary key, col bigserial); +CREATE TABLE t_bigserial_to_string (id serial primary key, col bigserial); +CREATE TABLE t_bit (id serial primary key, col bit); +CREATE TABLE t_bit_to_string (id serial primary key, col bit(32)); +CREATE TABLE t_bit_to_bool_array (id serial primary key, col bit(32)); +CREATE TABLE t_bit_varying (id serial primary key, col bit varying); +CREATE TABLE t_bit_varying_to_bool_array (id serial primary key, col bit varying(32)); +CREATE TABLE t_bit_varying_to_string (id serial primary key, col bit varying(32)); +CREATE TABLE t_bool (id serial primary key, col bool); +CREATE TABLE t_bool_to_string (id serial primary key, col bool); +CREATE TABLE t_bool_array_to_bool_array (id serial primary key, col bool[]); +CREATE TABLE t_bool_array_to_string (id serial primary key, col bool[]); +CREATE TABLE t_boolean (id serial primary key, col boolean); +CREATE TABLE t_boolean_to_string (id serial primary key, col boolean); +CREATE TABLE t_box (id serial primary key, col box); +CREATE TABLE t_box_to_float64_array (id serial primary key, col box); +CREATE TABLE t_bytea (id serial primary key, col bytea); +CREATE TABLE t_bytea_to_string (id serial primary key, col bytea); +CREATE TABLE t_char (id serial primary key, col char); +CREATE TABLE t_character (id serial primary key, col character); +CREATE TABLE t_character_varying (id serial primary key, col character varying); +CREATE TABLE t_cidr (id serial primary key, col cidr); +CREATE TABLE t_circle (id serial primary key, col circle); +CREATE TABLE t_circle_to_float64_array (id serial primary key, col circle); +CREATE TABLE t_date (id serial primary key, col date); +CREATE TABLE t_date_to_string (id serial primary key, col date); +CREATE TABLE t_datemultirange (id serial primary key, col datemultirange); +CREATE TABLE t_daterange (id serial primary key, col daterange); +CREATE TABLE t_decimal (id serial primary key, col numeric(10,2)); +CREATE TABLE t_decimal_to_string (id serial primary key, col decimal(10,2)); +CREATE TABLE t_double_precision (id serial primary key, col double precision); +CREATE TABLE t_double_precision_to_string (id serial primary key, col double precision); +CREATE TABLE t_enum (id serial primary key, col myenum); +CREATE TABLE t_float_to_float64 (id serial primary key, col float); +CREATE TABLE t_float_to_string (id serial primary key, col float); +CREATE TABLE t_float_array_to_float64_array (id serial primary key, col float[]); +CREATE TABLE t_float_array_to_string (id serial primary key, col float[]); +CREATE TABLE t_float4 (id serial primary key, col float4); +CREATE TABLE t_float4_to_float32 (id serial primary key, col float4); +CREATE TABLE t_float4_to_string (id serial primary key, col float4); +CREATE TABLE t_float8 (id serial primary key, col float8); +CREATE TABLE t_float8_to_string (id serial primary key, col float8); +CREATE TABLE t_inet (id serial primary key, col inet); +CREATE TABLE t_int (id serial primary key, col int); +CREATE TABLE t_int_to_string (id serial primary key, col int); +CREATE TABLE t_int_array_to_int64_array (id serial primary key, col int[]); +CREATE TABLE t_int_array_to_string (id serial primary key, col int[]); +CREATE TABLE t_int2 (id serial primary key, col int2); +CREATE TABLE t_int2_to_string (id serial primary key, col int2); +CREATE TABLE t_int4 (id serial primary key, col int4); +CREATE TABLE t_int4_to_string (id serial primary key, col int4); +CREATE TABLE t_int4multirange (id serial primary key, col int4multirange); +CREATE TABLE t_int4range (id serial primary key, col int4range); +CREATE TABLE t_int8 (id serial primary key, col int8); +CREATE TABLE t_int8_to_string (id serial primary key, col int8); +CREATE TABLE t_int8multirange (id serial primary key, col int8multirange); +CREATE TABLE t_int8range (id serial primary key, col int8range); +CREATE TABLE t_integer (id serial primary key, col integer); +CREATE TABLE t_integer_to_string (id serial primary key, col integer); +CREATE TABLE t_interval (id serial primary key, col interval); +CREATE TABLE t_interval_to_int64 (id serial primary key, col interval); +CREATE TABLE t_json (id serial primary key, col json); +CREATE TABLE t_json_to_string (id serial primary key, col json); +CREATE TABLE t_jsonb (id serial primary key, col jsonb); +CREATE TABLE t_jsonb_to_string (id serial primary key, col jsonb); +CREATE TABLE t_large_decimal_to_numeric (id serial primary key, col decimal(40,10)); +CREATE TABLE t_large_decimal_to_string (id serial primary key, col decimal(40,10)); +CREATE TABLE t_large_numeric_to_numeric (id serial primary key, col numeric(40,10)); +CREATE TABLE t_large_numeric_to_string (id serial primary key, col numeric(40,10)); +CREATE TABLE t_line (id serial primary key, col line); +CREATE TABLE t_line_to_float64_array (id serial primary key, col line); +CREATE TABLE t_lseg (id serial primary key, col lseg); +CREATE TABLE t_lseg_to_float64_array (id serial primary key, col lseg); +CREATE TABLE t_macaddr (id serial primary key, col macaddr); +CREATE TABLE t_macaddr8 (id serial primary key, col macaddr8); +CREATE TABLE t_money (id serial primary key, col money); +CREATE TABLE t_money_to_int64 (id serial primary key, col money); +CREATE TABLE t_numeric (id serial primary key, col numeric(10,2)); +CREATE TABLE t_numeric_to_string (id serial primary key, col numeric(10,2)); +CREATE TABLE t_nummultirange (id serial primary key, col nummultirange); +CREATE TABLE t_numrange (id serial primary key, col numrange); +CREATE TABLE t_oid (id serial primary key, col oid); +CREATE TABLE t_path (id serial primary key, col path); +CREATE TABLE t_path_to_float64_array (id serial primary key, col path); +CREATE TABLE t_pg_lsn (id serial primary key, col pg_lsn); +CREATE TABLE t_pg_snapshot (id serial primary key, col pg_snapshot); +CREATE TABLE t_point (id serial primary key, col point); +CREATE TABLE t_point_to_float64_array (id serial primary key, col point); +CREATE TABLE t_polygon (id serial primary key, col polygon); +CREATE TABLE t_polygon_to_float64_array (id serial primary key, col polygon); +CREATE TABLE t_real (id serial primary key, col real); +CREATE TABLE t_real_to_float32 (id serial primary key, col real); +CREATE TABLE t_real_to_string (id serial primary key, col real); +CREATE TABLE t_real_array_to_float32_array (id serial primary key, col real[]); +CREATE TABLE t_real_array_to_string (id serial primary key, col real[]); +CREATE TABLE t_serial (id serial primary key, col serial); +CREATE TABLE t_serial_to_string (id serial primary key, col serial); +CREATE TABLE t_serial2 (id serial primary key, col serial2); +CREATE TABLE t_serial2_to_string (id serial primary key, col serial2); +CREATE TABLE t_serial4 (id serial primary key, col serial4); +CREATE TABLE t_serial4_to_string (id serial primary key, col serial4); +CREATE TABLE t_serial8 (id serial primary key, col serial8); +CREATE TABLE t_serial8_to_string (id serial primary key, col serial8); +CREATE TABLE t_smallint (id serial primary key, col smallint); +CREATE TABLE t_smallint_to_string (id serial primary key, col smallint); +CREATE TABLE t_smallint_array_to_int64_array (id serial primary key, col smallint[]); +CREATE TABLE t_smallint_array_to_string (id serial primary key, col smallint[]); +CREATE TABLE t_smallserial (id serial primary key, col smallserial); +CREATE TABLE t_smallserial_to_string (id serial primary key, col smallserial); +CREATE TABLE t_text (id serial primary key, col text); +CREATE TABLE t_time (id serial primary key, col time); +CREATE TABLE t_time_with_time_zone (id serial primary key, col time with time zone); +CREATE TABLE t_time_without_time_zone (id serial primary key, col time without time zone); +CREATE TABLE t_timestamp (id serial primary key, col timestamp); +CREATE TABLE t_timestamp_to_timestamp (id serial primary key, col timestamp); +CREATE TABLE t_timestamp_with_time_zone (id serial primary key, col timestamp with time zone); +CREATE TABLE t_timestamp_with_timezone_to_string (id serial primary key, col timestamp with time zone); +CREATE TABLE t_timestamp_without_time_zone (id serial primary key, col timestamp without time zone); +CREATE TABLE t_timestamptz (id serial primary key, col timestamptz); +CREATE TABLE t_timestamptz_to_string (id serial primary key, col timestamptz); +CREATE TABLE t_timetz (id serial primary key, col timetz); +CREATE TABLE t_tsmultirange (id serial primary key, col tsmultirange); +CREATE TABLE t_tsquery (id serial primary key, col tsquery); +CREATE TABLE t_tsrange (id serial primary key, col tsrange); +CREATE TABLE t_tstzmultirange (id serial primary key, col tstzmultirange); +CREATE TABLE t_tstzrange (id serial primary key, col tstzrange); +CREATE TABLE t_tsvector (id serial primary key, col tsvector); +CREATE TABLE t_txid_snapshot (id serial primary key, col txid_snapshot); +CREATE TABLE t_uuid (id serial primary key, col uuid); +CREATE TABLE t_uuid_to_bytes (id serial primary key, col uuid); +CREATE TABLE t_varbit (id serial primary key, col varbit); +CREATE TABLE t_varbit_to_string (id serial primary key, col varbit(32)); +CREATE TABLE t_varbit_to_bool_array (id serial primary key, col varbit(32)); +CREATE TABLE t_varchar (id serial primary key, col varchar); +CREATE TABLE t_xml (id serial primary key, col xml); + +INSERT INTO t_bigint (col) VALUES (-9223372036854775808), (9223372036854775807), (42), (NULL); +INSERT INTO t_bigint_to_string (col) VALUES (-9223372036854775808), (9223372036854775807), (42), (NULL); +INSERT INTO t_bigint_array_to_int64_array (col) VALUES ('{-9223372036854775808, 9223372036854775807}'), (NULL); +INSERT INTO t_bigint_array_to_string (col) VALUES ('{-9223372036854775808, 9223372036854775807}'), (NULL); +INSERT INTO t_bigserial (col) VALUES (-9223372036854775808), (9223372036854775807), (42); +INSERT INTO t_bigserial_to_string (col) VALUES (-9223372036854775808), (9223372036854775807), (42); +INSERT INTO t_bit (col) VALUES (0::bit), (1::bit), (NULL); +INSERT INTO t_bit_to_string (col) VALUES (0::bit(32)), (1::bit(32)), (NULL); +INSERT INTO t_bit_to_bool_array (col) VALUES (0::bit(32)), (NULL); +INSERT INTO t_bit_varying (col) VALUES ('0101'::bit varying), (NULL); +INSERT INTO t_bit_varying_to_string (col) VALUES ('0101'::bit varying(32)), (NULL); +INSERT INTO t_bit_varying_to_bool_array (col) VALUES ('0101'::bit varying(32)), (NULL); +INSERT INTO t_bool (col) VALUES (false), (true), (NULL); +INSERT INTO t_bool_to_string (col) VALUES (false), (true), (NULL); +INSERT INTO t_bool_array_to_bool_array (col) VALUES ('{false, true}'), (NULL); +INSERT INTO t_bool_array_to_string (col) VALUES ('{false, true}'), (NULL); +INSERT INTO t_boolean (col) VALUES (false), (true), (NULL); +INSERT INTO t_boolean_to_string (col) VALUES (false), (true), (NULL); +INSERT INTO t_box (col) VALUES ('((1, 2), (3, 4))'), (NULL); +INSERT INTO t_box_to_float64_array (col) VALUES ('((1, 2), (3, 4))'), (NULL); +INSERT INTO t_bytea (col) VALUES ('abc'::bytea), (NULL); +INSERT INTO t_bytea_to_string (col) VALUES ('abc'::bytea), (NULL); +INSERT INTO t_char (col) VALUES ('a'), ('Θ'), (NULL); +INSERT INTO t_character (col) VALUES ('a'), ('Ξ'), (NULL); +INSERT INTO t_character_varying (col) VALUES ('testing character varying'), (NULL); +INSERT INTO t_cidr (col) VALUES ('192.168.100.128/25'), (NULL); +INSERT INTO t_circle (col) VALUES ('((1, 2), 3)'), (NULL); +INSERT INTO t_circle_to_float64_array (col) VALUES ('((1, 2), 3)'), (NULL); +INSERT INTO t_date (col) VALUES ('0001-01-01'::date), ('9999-12-31'::date), (NULL); +INSERT INTO t_date_to_string (col) VALUES ('0001-01-01'::date), ('9999-12-31'::date), (NULL); +INSERT INTO t_datemultirange (col) VALUES ('{[0001-01-01, 9999-12-31]}'), (NULL); +INSERT INTO t_daterange (col) VALUES ('[0001-01-01, 9999-12-31]'), (NULL); +INSERT INTO t_decimal (col) VALUES (0.12), (NULL); +INSERT INTO t_decimal_to_string (col) VALUES (0.12), (NULL); +INSERT INTO t_double_precision (col) VALUES ('-1.9876542e307'), ('1.9876542e307'), ('NaN'), ('-Infinity'), ('Infinity'), (1.23), (NULL); +INSERT INTO t_double_precision_to_string (col) VALUES ('-1.9876542e307'), ('1.9876542e307'), ('NaN'), ('-Infinity'), ('Infinity'), (1.23), (NULL); +INSERT INTO t_enum (col) VALUES ('enum1'), (NULL); +INSERT INTO t_float_to_float64 (col) VALUES ('-1.9876542e307'), ('1.9876542e307'), ('NaN'), ('-Infinity'), ('Infinity'), (1.23), (NULL); +INSERT INTO t_float_to_string (col) VALUES ('-1.9876542e307'), ('1.9876542e307'), ('NaN'), ('-Infinity'), ('Infinity'), (1.23), (NULL); +INSERT INTO t_float_array_to_float64_array (col) VALUES ('{-1.9876542e307, 1.9876542e307}'), (NULL); +INSERT INTO t_float_array_to_string (col) VALUES ('{-1.9876542e307, 1.9876542e307}'), (NULL); +INSERT INTO t_float4 (col) VALUES ('-1.9876542e38'), ('1.9876542e38'), ('NaN'), ('-Infinity'), ('Infinity'), (2.34), (NULL); +INSERT INTO t_float4_to_float32 (col) VALUES ('-1.9876542e38'), ('1.9876542e38'), ('NaN'), ('-Infinity'), ('Infinity'), (2.34), (NULL); +INSERT INTO t_float4_to_string (col) VALUES ('-1.9876542e38'), ('1.9876542e38'), ('NaN'), ('-Infinity'), ('Infinity'), (2.34), (NULL); +INSERT INTO t_float8 (col) VALUES ('-1.9876542e307'), ('1.9876542e307'), ('NaN'), ('-Infinity'), ('Infinity'), (3.45), (NULL); +INSERT INTO t_float8_to_string (col) VALUES ('-1.9876542e307'), ('1.9876542e307'), ('NaN'), ('-Infinity'), ('Infinity'), (3.45), (NULL); +INSERT INTO t_inet (col) VALUES ('192.168.1.0/24'), (NULL); +INSERT INTO t_int (col) VALUES (-2147483648), (2147483647), (1), (NULL); +INSERT INTO t_int_to_string (col) VALUES (-2147483648), (2147483647), (1), (NULL); +INSERT INTO t_int_array_to_int64_array (col) VALUES ('{-2147483648, 2147483647}'), (NULL); +INSERT INTO t_int_array_to_string (col) VALUES ('{-2147483648, 2147483647}'), (NULL); +INSERT INTO t_int2 (col) VALUES (-32768), (32767), (3), (NULL); +INSERT INTO t_int2_to_string (col) VALUES (-32768), (32767), (3), (NULL); +INSERT INTO t_int4 (col) VALUES (-2147483648), (2147483647), (4), (NULL); +INSERT INTO t_int4_to_string (col) VALUES (-2147483648), (2147483647), (4), (NULL); +INSERT INTO t_int4multirange (col) VALUES ('{[10, 20]}'), (NULL); +INSERT INTO t_int4range (col) VALUES ('[10, 20]'), (NULL); +INSERT INTO t_int8 (col) VALUES (-9223372036854775808), (9223372036854775807), (5), (NULL); +INSERT INTO t_int8_to_string (col) VALUES (-9223372036854775808), (9223372036854775807), (5), (NULL); +INSERT INTO t_int8multirange (col) VALUES ('{[30, 40]}'), (NULL); +INSERT INTO t_int8range (col) VALUES ('[30, 40]'), (NULL); +INSERT INTO t_integer (col) VALUES (-2147483648), (2147483647), (2), (NULL); +INSERT INTO t_integer_to_string (col) VALUES (-2147483648), (2147483647), (2), (NULL); +INSERT INTO t_interval (col) VALUES ('1 hour'), (NULL); +INSERT INTO t_interval_to_int64 (col) VALUES ('1 hour'), (NULL); +INSERT INTO t_json (col) VALUES ('{"duplicate_key": 1, "duplicate_key": 2}'), ('{"null_key": null}'), (NULL); +INSERT INTO t_json_to_string (col) VALUES ('{"duplicate_key": 1, "duplicate_key": 2}'), ('{"null_key": null}'), (NULL); +INSERT INTO t_jsonb (col) VALUES ('{"duplicate_key": 1, "duplicate_key": 2}'), ('{"null_key": null}'), (NULL); +INSERT INTO t_jsonb_to_string (col) VALUES ('{"duplicate_key": 1, "duplicate_key": 2}'), ('{"null_key": null}'), (NULL); +INSERT INTO t_large_decimal_to_numeric (col) VALUES (0.12), (99999999999999999999999.9999999999), (12345678901234567890.1234567890), (NULL); +INSERT INTO t_large_decimal_to_string (col) VALUES (0.12), (99999999999999999999999.9999999999), (123456789012345678901234567890.1234567890), (NULL); +INSERT INTO t_large_numeric_to_numeric (col) VALUES (0.12), (99999999999999999999999.9999999999), (12345678901234567890.1234567890), (NULL); +INSERT INTO t_large_numeric_to_string (col) VALUES (0.12), (99999999999999999999999.9999999999), (123456789012345678901234567890.1234567890), (NULL); +INSERT INTO t_line (col) VALUES ('{ 1, 2, 3 }'), (NULL); +INSERT INTO t_line_to_float64_array (col) VALUES ('{ 1, 2, 3 }'), (NULL); +INSERT INTO t_lseg (col) VALUES ('[ (1, 2), (3, 4) ]'), (NULL); +INSERT INTO t_lseg_to_float64_array (col) VALUES ('[ (1, 2), (3, 4) ]'), (NULL); +INSERT INTO t_macaddr (col) VALUES ('08:00:2b:01:02:03'), (NULL); +INSERT INTO t_macaddr8 (col) VALUES ('08:00:2b:01:02:03:04:05'), (NULL); +INSERT INTO t_money (col) VALUES ('123.45'::money), (NULL); +INSERT INTO t_money_to_int64 (col) VALUES ('123.45'::money), (NULL); +INSERT INTO t_numeric (col) VALUES (4.56), (NULL); +INSERT INTO t_numeric_to_string (col) VALUES (4.56), (NULL); +INSERT INTO t_nummultirange (col) VALUES ('{[50, 60]}'), (NULL); +INSERT INTO t_numrange (col) VALUES ('[50, 60]'), (NULL); +INSERT INTO t_oid (col) VALUES (1000::oid), (NULL); +INSERT INTO t_path (col) VALUES ('[ (1, 2), (3, 4), (5, 6) ]'), (NULL); +INSERT INTO t_path_to_float64_array (col) VALUES ('[ (1, 2), (3, 4), (5, 6) ]'), (NULL); +INSERT INTO t_pg_lsn (col) VALUES ('123/0'::pg_lsn), (NULL); +INSERT INTO t_pg_snapshot (col) VALUES ('1000:1000:'::pg_snapshot), (NULL); +INSERT INTO t_point (col) VALUES ('(1, 2)'), (NULL); +INSERT INTO t_point_to_float64_array (col) VALUES ('(1, 2)'), (NULL); +INSERT INTO t_polygon (col) VALUES ('( (1, 2), (3, 4) )'), (NULL); +INSERT INTO t_polygon_to_float64_array (col) VALUES ('( (1, 2), (3, 4) )'), (NULL); +INSERT INTO t_real (col) VALUES ('-1.9876542e38'), ('1.9876542e38'), ('NaN'), ('-Infinity'), ('Infinity'), (5.67), (NULL); +INSERT INTO t_real_to_float32 (col) VALUES ('-1.9876542e38'), ('1.9876542e38'), ('NaN'), ('-Infinity'), ('Infinity'), (5.67), (NULL); +INSERT INTO t_real_to_string (col) VALUES ('-1.9876542e38'), ('1.9876542e38'), ('NaN'), ('-Infinity'), ('Infinity'), (5.67), (NULL); +INSERT INTO t_real_array_to_float32_array (col) VALUES ('{-1.9876542e38, 1.9876542e38}'), (NULL); +INSERT INTO t_real_array_to_string (col) VALUES ('{-1.9876542e38, 1.9876542e38}'), (NULL); +INSERT INTO t_serial (col) VALUES (-2147483648), (2147483647), (6); +INSERT INTO t_serial_to_string (col) VALUES (-2147483648), (2147483647), (6); +INSERT INTO t_serial2 (col) VALUES (-32768), (32767), (7); +INSERT INTO t_serial2_to_string (col) VALUES (-32768), (32767), (7); +INSERT INTO t_serial4 (col) VALUES (-2147483648), (2147483647), (8); +INSERT INTO t_serial4_to_string (col) VALUES (-2147483648), (2147483647), (8); +INSERT INTO t_serial8 (col) VALUES (-9223372036854775808), (9223372036854775807), (9); +INSERT INTO t_serial8_to_string (col) VALUES (-9223372036854775808), (9223372036854775807), (9); +INSERT INTO t_smallint (col) VALUES (-32768), (32767), (10), (NULL); +INSERT INTO t_smallint_to_string (col) VALUES (-32768), (32767), (10), (NULL); +INSERT INTO t_smallint_array_to_int64_array (col) VALUES ('{-32768, 32767}'), (NULL); +INSERT INTO t_smallint_array_to_string (col) VALUES ('{-32768, 32767}'), (NULL); +INSERT INTO t_smallserial (col) VALUES (-32768), (32767), (11); +INSERT INTO t_smallserial_to_string (col) VALUES (-32768), (32767), (11); +INSERT INTO t_text (col) VALUES ('testing text'), (NULL); +INSERT INTO t_time (col) VALUES ('24:00:00'::time), (NULL); +INSERT INTO t_time_with_time_zone (col) VALUES ('23:59:59+10:00'), (NULL); +INSERT INTO t_time_without_time_zone (col) VALUES ('24:00:00'::time), (NULL); +INSERT INTO t_timestamp (col) VALUES ('1970-01-02 03:04:05.123456'::timestamp), (NULL); +INSERT INTO t_timestamp_to_timestamp (col) VALUES ('1970-01-02 03:04:05.123456'::timestamp), (NULL); +INSERT INTO t_timestamp_with_time_zone (col) VALUES ('1970-02-03 04:05:06.123456+10:00'::timestamptz), ('1970-02-03 04:05:06.123456-01'::timestamptz), (NULL); +INSERT INTO t_timestamp_with_timezone_to_string (col) VALUES ('1970-02-03 04:05:06.123456+10:00'::timestamptz), ('1970-02-03 04:05:06.123456-01'::timestamptz), (NULL); +INSERT INTO t_timestamp_without_time_zone (col) VALUES ('1970-01-02 03:04:05.123456'::timestamp), (NULL); +INSERT INTO t_timestamptz (col) VALUES ('1970-02-03 04:05:06.123456+10:00'::timestamptz), ('1970-02-03 04:05:06.123456-01'::timestamptz), (NULL); +INSERT INTO t_timestamptz_to_string (col) VALUES ('1970-02-03 04:05:06.123456+10:00'::timestamptz), ('1970-02-03 04:05:06.123456-01'::timestamptz), (NULL); +INSERT INTO t_timetz (col) VALUES ('23:59:59+10:00'), (NULL); +INSERT INTO t_tsmultirange (col) VALUES ('{[1970-01-01 01:00, 1970-01-01 02:00]}'), (NULL); +INSERT INTO t_tsquery (col) VALUES ('fat & rat'::tsquery), (NULL); +INSERT INTO t_tsrange (col) VALUES ('[1970-01-01 01:00, 1970-01-01 02:00]'), (NULL); +INSERT INTO t_tstzmultirange (col) VALUES ('{[1970-01-01 01:00+10:00, 1970-01-01 02:00+10:00]}'), (NULL); +INSERT INTO t_tstzrange (col) VALUES ('[1970-01-01 01:00+10:00, 1970-01-01 02:00+10:00]'), (NULL); +INSERT INTO t_tsvector (col) VALUES ('a fat cat sat on a mat'::tsvector), (NULL); +INSERT INTO t_txid_snapshot (col) VALUES ('10:20:10,14,15'::txid_snapshot), (NULL); +INSERT INTO t_uuid (col) VALUES ('a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid), (NULL); +INSERT INTO t_uuid_to_bytes (col) VALUES ('a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid), (NULL); +INSERT INTO t_varbit (col) VALUES ('1100'::varbit), (NULL); +INSERT INTO t_varbit_to_string (col) VALUES ('1100'::varbit(32)), (NULL); +INSERT INTO t_varbit_to_bool_array (col) VALUES ('1100'::varbit(32)), (NULL); +INSERT INTO t_varchar (col) VALUES ('testing varchar'), (NULL); +INSERT INTO t_xml (col) VALUES ('123'::xml), (NULL); + +CREATE PUBLICATION data_types_test_publication FOR ALL TABLES; +SELECT pg_create_logical_replication_slot('data_types_test_replication_slot', 'pgoutput'); +SELECT pg_create_logical_replication_slot('pg_dialect_data_types_test_replication_slot', 'pgoutput'); \ No newline at end of file diff --git a/v2/datastream-to-spanner/src/test/resources/PostgreSQLDataTypesIT/spanner-schema.sql b/v2/datastream-to-spanner/src/test/resources/PostgreSQLDataTypesIT/spanner-schema.sql new file mode 100644 index 0000000000..c86609cf98 --- /dev/null +++ b/v2/datastream-to-spanner/src/test/resources/PostgreSQLDataTypesIT/spanner-schema.sql @@ -0,0 +1,139 @@ +CREATE TABLE IF NOT EXISTS t_bigint (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bigint_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bigint_array_to_int64_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bigint_array_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bigserial (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bigserial_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bit (id INT64, col BYTES(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bit_to_bool_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bit_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bit_varying (id INT64, col BYTES(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bit_varying_to_bool_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bit_varying_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bool (id INT64, col BOOL) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bool_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bool_array_to_bool_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bool_array_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_boolean (id INT64, col BOOL) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_boolean_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_box (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_box_to_float64_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bytea (id INT64, col BYTES(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_bytea_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_char (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_character (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_character_varying (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_cidr (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_circle (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_circle_to_float64_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_date (id INT64, col DATE) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_date_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_datemultirange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_daterange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_decimal (id INT64, col NUMERIC) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_decimal_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_double_precision (id INT64, col FLOAT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_double_precision_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_enum (id INT64 , col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_float_to_float64 (id INT64, col FLOAT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_float_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_float_array_to_float64_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_float_array_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_float4 (id INT64, col FLOAT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_float4_to_float32 (id INT64, col FLOAT32) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_float4_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_float8 (id INT64, col FLOAT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_float8_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_inet (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int_array_to_int64_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int_array_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int2 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int2_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int4 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int4_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int4multirange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int4range (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int8 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int8_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int8multirange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_int8range (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_integer (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_integer_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_interval (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_interval_to_int64 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_json (id INT64, col JSON) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_json_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_jsonb (id INT64, col JSON) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_jsonb_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_large_decimal_to_numeric (id INT64, col NUMERIC) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_large_decimal_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_large_numeric_to_numeric (id INT64, col NUMERIC) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_large_numeric_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_line (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_line_to_float64_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_lseg (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_lseg_to_float64_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_macaddr (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_macaddr8 (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_money (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_money_to_int64 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_numeric (id INT64, col NUMERIC) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_numeric_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_nummultirange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_numrange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_oid (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_path (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_path_to_float64_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_pg_lsn (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_pg_snapshot (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_point (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_point_to_float64_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_polygon (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_polygon_to_float64_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_real (id INT64, col FLOAT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_real_to_float32 (id INT64, col FLOAT32) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_real_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_real_array_to_float32_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_real_array_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_serial (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_serial_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_serial2 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_serial2_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_serial4 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_serial4_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_serial8 (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_serial8_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_smallint (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_smallint_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_smallint_array_to_int64_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_smallint_array_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_smallserial (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_smallserial_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_text (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_time (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_time_with_time_zone (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_time_without_time_zone (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_timestamp (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_timestamp_to_timestamp (id INT64, col TIMESTAMP) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_timestamp_with_time_zone (id INT64, col TIMESTAMP) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_timestamp_with_timezone_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_timestamp_without_time_zone (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_timestamptz (id INT64, col TIMESTAMP) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_timestamptz_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_timetz (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_tsmultirange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_tsquery (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_tsrange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_tstzmultirange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_tstzrange (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_tsvector (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_txid_snapshot (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_uuid (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_uuid_to_bytes (id INT64, col BYTES(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_varbit (id INT64, col BYTES(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_varbit_to_bool_array (id INT64, col ARRAY) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_varbit_to_string (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_varchar (id INT64, col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_xml (id INT64, col STRING(MAX)) PRIMARY KEY (id);