diff --git a/compose.yaml b/compose.yaml index 4534291a..402038d2 100644 --- a/compose.yaml +++ b/compose.yaml @@ -142,6 +142,32 @@ services: MYSQL_PASSWORD: ${MYSQL_PASSWORD:-c2VjcmV0Cg==} MYSQL_DATABASE: ${MYSQL_DATABASE:-shop} + sqlserver: + container_name: sqlserver + build: + context: ./integration/sqlserver + ports: + - 1433:1433 + restart: always + environment: + ACCEPT_EULA: Y + SA_PASSWORD: ${SQLSERVER_PASSWORD:-Password123!} + MSSQL_DB: ${SQLSERVER_DATABASE:-testdb} + MSSQL_USER: ${SQLSERVER_USER:-sa} + healthcheck: + test: | + if [ -f "/opt/mssql-tools18/bin/sqlcmd" ]; then + /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P "$${SA_PASSWORD}" -Q "SELECT 1" -C || exit 1 + elif [ -f "/opt/mssql-tools/bin/sqlcmd" ]; then + /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P "$${SA_PASSWORD}" -Q "SELECT 1" -C || exit 1 + else + exit 1 + fi + interval: 10s + timeout: 10s + retries: 15 + start_period: 60s + provider: build: context: . diff --git a/docs/resources/connection_sqlserver.md b/docs/resources/connection_sqlserver.md new file mode 100644 index 00000000..bbd4e6b6 --- /dev/null +++ b/docs/resources/connection_sqlserver.md @@ -0,0 +1,205 @@ +--- +# generated by https://github.com/hashicorp/terraform-plugin-docs +page_title: "materialize_connection_sqlserver Resource - terraform-provider-materialize" +subcategory: "" +description: |- + A SQL Server connection establishes a link to a single database of a SQL Server instance. +--- + +# materialize_connection_sqlserver (Resource) + +A SQL Server connection establishes a link to a single database of a SQL Server instance. + +## Example Usage + +```terraform +resource "materialize_secret" "sqlserver_password" { + name = "sqlserver_password" + value = base64encode("c2VjcmV0Cg==") +} + +# Basic SQL Server connection +resource "materialize_connection_sqlserver" "basic" { + name = "sqlserver_connection" + host = "sql-server.example.com" + port = 1433 + + user { + text = "sqluser" + } + + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + + database = "testdb" + validate = true +} + +# SQL Server connection with SSH tunnel +resource "materialize_connection_ssh_tunnel" "example_ssh_connection" { + name = "ssh_connection" + host = "bastion-host.example.com" + port = 22 + user = "ubuntu" +} + +resource "materialize_connection_sqlserver" "ssh_example" { + name = "sqlserver_ssh_connection" + host = "private-sql-server.example.com" + port = 1433 + + user { + text = "sqluser" + } + + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + + database = "testdb" + + ssh_tunnel { + name = materialize_connection_ssh_tunnel.example_ssh_connection.name + } + + validate = false +} + +# SQL Server connection with AWS PrivateLink +resource "materialize_connection_aws_privatelink" "sqlserver_privatelink" { + name = "sqlserver_privatelink" + service_name = "com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc" + availability_zones = ["use1-az1", "use1-az4"] +} + +resource "materialize_connection_sqlserver" "privatelink_example" { + name = "sqlserver_privatelink_connection" + host = "sqlserver.example.com" + port = 1433 + + user { + text = "sqluser" + } + + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + + database = "testdb" + + aws_privatelink { + name = materialize_connection_aws_privatelink.sqlserver_privatelink.name + } + + validate = false +} +``` + + +## Schema + +### Required + +- `database` (String) The target SQL Server database. +- `host` (String) The SQL Server database hostname. +- `name` (String) The identifier for the connection. +- `user` (Block List, Min: 1, Max: 1) The SQL Server database username.. Can be supplied as either free text using `text` or reference to a secret object using `secret`. (see [below for nested schema](#nestedblock--user)) + +### Optional + +- `aws_privatelink` (Block List, Max: 1) The AWS PrivateLink configuration for the SQL Server database. (see [below for nested schema](#nestedblock--aws_privatelink)) +- `comment` (String) Comment on an object in the database. +- `database_name` (String) The identifier for the connection database in Materialize. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set. +- `ownership_role` (String) The owernship role of the object. +- `password` (Block List, Max: 1) The SQL Server database password. (see [below for nested schema](#nestedblock--password)) +- `port` (Number) The SQL Server database port. +- `region` (String) The region to use for the resource connection. If not set, the default region is used. +- `schema_name` (String) The identifier for the connection schema in Materialize. Defaults to `public`. +- `ssh_tunnel` (Block List, Max: 1) The SSH tunnel configuration for the SQL Server database. (see [below for nested schema](#nestedblock--ssh_tunnel)) +- `validate` (Boolean) If the connection should wait for validation. + +### Read-Only + +- `id` (String) The ID of this resource. +- `qualified_sql_name` (String) The fully qualified name of the connection. + + +### Nested Schema for `user` + +Optional: + +- `secret` (Block List, Max: 1) The `user` secret value. Conflicts with `text` within this block. (see [below for nested schema](#nestedblock--user--secret)) +- `text` (String, Sensitive) The `user` text value. Conflicts with `secret` within this block + + +### Nested Schema for `user.secret` + +Required: + +- `name` (String) The user name. + +Optional: + +- `database_name` (String) The user database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set. +- `schema_name` (String) The user schema name. Defaults to `public`. + + + + +### Nested Schema for `aws_privatelink` + +Required: + +- `name` (String) The aws_privatelink name. + +Optional: + +- `database_name` (String) The aws_privatelink database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set. +- `schema_name` (String) The aws_privatelink schema name. Defaults to `public`. + + + +### Nested Schema for `password` + +Required: + +- `name` (String) The password name. + +Optional: + +- `database_name` (String) The password database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set. +- `schema_name` (String) The password schema name. Defaults to `public`. + + + +### Nested Schema for `ssh_tunnel` + +Required: + +- `name` (String) The ssh_tunnel name. + +Optional: + +- `database_name` (String) The ssh_tunnel database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set. +- `schema_name` (String) The ssh_tunnel schema name. Defaults to `public`. + +## Import + +Import is supported using the following syntax: + +```shell +#!/bin/bash + +# SQL Server connections can be imported using the connection name +terraform import materialize_connection_sqlserver.example : + +# Example +terraform import materialize_connection_sqlserver.example aws/us-east-1:my_sqlserver_connection +``` diff --git a/docs/resources/source_sqlserver.md b/docs/resources/source_sqlserver.md new file mode 100644 index 00000000..f8d8e08e --- /dev/null +++ b/docs/resources/source_sqlserver.md @@ -0,0 +1,193 @@ +--- +# generated by https://github.com/hashicorp/terraform-plugin-docs +page_title: "materialize_source_sqlserver Resource - terraform-provider-materialize" +subcategory: "" +description: |- + A SQL Server source describes a SQL Server database instance you want Materialize to read data from using Change Data Capture (CDC). +--- + +# materialize_source_sqlserver (Resource) + +A SQL Server source describes a SQL Server database instance you want Materialize to read data from using Change Data Capture (CDC). + +## Example Usage + +```terraform +resource "materialize_secret" "sqlserver_password" { + name = "sqlserver_password" + value = base64encode("c2VjcmV0Cg==") +} + +resource "materialize_connection_sqlserver" "sqlserver_connection" { + name = "sqlserver_connection" + host = "sql-server.example.com" + port = 1433 + + user { + text = "sqluser" + } + + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + + database = "testdb" +} + +# Basic SQL Server source for specific tables +resource "materialize_source_sqlserver" "example" { + name = "sqlserver_source" + cluster_name = "quickstart" + + sqlserver_connection { + name = materialize_connection_sqlserver.sqlserver_connection.name + schema_name = materialize_connection_sqlserver.sqlserver_connection.schema_name + database_name = materialize_connection_sqlserver.sqlserver_connection.database_name + } + + table { + upstream_name = "dbo.customers" + name = "customers" + } + + table { + upstream_name = "dbo.orders" + name = "orders" + } + + table { + upstream_name = "dbo.products" + name = "products" + } +} + +# SQL Server source for all tables +resource "materialize_source_sqlserver" "all_tables" { + name = "sqlserver_source_all" + cluster_name = "quickstart" + + sqlserver_connection { + name = materialize_connection_sqlserver.sqlserver_connection.name + schema_name = materialize_connection_sqlserver.sqlserver_connection.schema_name + database_name = materialize_connection_sqlserver.sqlserver_connection.database_name + } + + # No table blocks specified means all tables will be included +} + +# SQL Server source with text columns and excluded columns +resource "materialize_source_sqlserver" "with_options" { + name = "sqlserver_source_with_options" + cluster_name = "quickstart" + + sqlserver_connection { + name = materialize_connection_sqlserver.sqlserver_connection.name + schema_name = materialize_connection_sqlserver.sqlserver_connection.schema_name + database_name = materialize_connection_sqlserver.sqlserver_connection.database_name + } + + table { + upstream_name = "dbo.users" + name = "users" + } + + table { + upstream_name = "dbo.posts" + name = "posts" + } + + table { + upstream_name = "dbo.comments" + name = "comments" + } + + # Convert unsupported data types to text + text_columns = ["dbo.users.description", "dbo.posts.content", "dbo.comments.metadata"] + + # Exclude problematic columns + exclude_columns = ["dbo.users.image_data", "dbo.posts.binary_data"] +} +``` + + +## Schema + +### Required + +- `name` (String) The identifier for the source. +- `sqlserver_connection` (Block List, Min: 1, Max: 1) The SQL Server connection to use in the source. (see [below for nested schema](#nestedblock--sqlserver_connection)) + +### Optional + +- `cluster_name` (String) The cluster to maintain this source. +- `comment` (String) Comment on an object in the database. +- `database_name` (String) The identifier for the source database in Materialize. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set. +- `exclude_columns` (List of String) Exclude specific columns when reading data from SQL Server. Can only be updated in place when also updating a corresponding `table` attribute. +- `expose_progress` (Block List, Max: 1) The name of the progress collection for the source. If this is not specified, the collection will be named `_progress`. (see [below for nested schema](#nestedblock--expose_progress)) +- `ownership_role` (String) The owernship role of the object. +- `region` (String) The region to use for the resource connection. If not set, the default region is used. +- `schema_name` (String) The identifier for the source schema in Materialize. Defaults to `public`. +- `table` (Block Set) Specify the tables to be included in the source. If not specified, all tables are included. (see [below for nested schema](#nestedblock--table)) +- `text_columns` (List of String) Decode data as text for specific columns that contain SQL Server types that are unsupported in Materialize. Can only be updated in place when also updating a corresponding `table` attribute. + +### Read-Only + +- `id` (String) The ID of this resource. +- `qualified_sql_name` (String) The fully qualified name of the source. +- `size` (String) The size of the cluster maintaining this source. + + +### Nested Schema for `sqlserver_connection` + +Required: + +- `name` (String) The sqlserver_connection name. + +Optional: + +- `database_name` (String) The sqlserver_connection database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set. +- `schema_name` (String) The sqlserver_connection schema name. Defaults to `public`. + + + +### Nested Schema for `expose_progress` + +Required: + +- `name` (String) The expose_progress name. + +Optional: + +- `database_name` (String) The expose_progress database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set. +- `schema_name` (String) The expose_progress schema name. Defaults to `public`. + + + +### Nested Schema for `table` + +Required: + +- `upstream_name` (String) The name of the table in the upstream SQL Server database. + +Optional: + +- `database_name` (String) The database of the table in Materialize. +- `name` (String) The name for the table, used in Materialize. +- `schema_name` (String) The schema of the table in Materialize. +- `upstream_schema_name` (String) The schema of the table in the upstream SQL Server database. + +## Import + +Import is supported using the following syntax: + +```shell +#!/bin/bash + +# SQL Server sources can be imported using the source name +terraform import materialize_source_sqlserver.example : + +# Example +terraform import materialize_source_sqlserver.example aws/us-east-1:my_sqlserver_source +``` diff --git a/examples/resources/materialize_connection_sqlserver/import.sh b/examples/resources/materialize_connection_sqlserver/import.sh new file mode 100644 index 00000000..a3985af2 --- /dev/null +++ b/examples/resources/materialize_connection_sqlserver/import.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +# SQL Server connections can be imported using the connection name +terraform import materialize_connection_sqlserver.example : + +# Example +terraform import materialize_connection_sqlserver.example aws/us-east-1:my_sqlserver_connection diff --git a/examples/resources/materialize_connection_sqlserver/resource.tf b/examples/resources/materialize_connection_sqlserver/resource.tf new file mode 100644 index 00000000..358e13eb --- /dev/null +++ b/examples/resources/materialize_connection_sqlserver/resource.tf @@ -0,0 +1,87 @@ +resource "materialize_secret" "sqlserver_password" { + name = "sqlserver_password" + value = base64encode("c2VjcmV0Cg==") +} + +# Basic SQL Server connection +resource "materialize_connection_sqlserver" "basic" { + name = "sqlserver_connection" + host = "sql-server.example.com" + port = 1433 + + user { + text = "sqluser" + } + + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + + database = "testdb" + validate = true +} + +# SQL Server connection with SSH tunnel +resource "materialize_connection_ssh_tunnel" "example_ssh_connection" { + name = "ssh_connection" + host = "bastion-host.example.com" + port = 22 + user = "ubuntu" +} + +resource "materialize_connection_sqlserver" "ssh_example" { + name = "sqlserver_ssh_connection" + host = "private-sql-server.example.com" + port = 1433 + + user { + text = "sqluser" + } + + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + + database = "testdb" + + ssh_tunnel { + name = materialize_connection_ssh_tunnel.example_ssh_connection.name + } + + validate = false +} + +# SQL Server connection with AWS PrivateLink +resource "materialize_connection_aws_privatelink" "sqlserver_privatelink" { + name = "sqlserver_privatelink" + service_name = "com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc" + availability_zones = ["use1-az1", "use1-az4"] +} + +resource "materialize_connection_sqlserver" "privatelink_example" { + name = "sqlserver_privatelink_connection" + host = "sqlserver.example.com" + port = 1433 + + user { + text = "sqluser" + } + + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + + database = "testdb" + + aws_privatelink { + name = materialize_connection_aws_privatelink.sqlserver_privatelink.name + } + + validate = false +} diff --git a/examples/resources/materialize_source_sqlserver/import.sh b/examples/resources/materialize_source_sqlserver/import.sh new file mode 100644 index 00000000..30348045 --- /dev/null +++ b/examples/resources/materialize_source_sqlserver/import.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +# SQL Server sources can be imported using the source name +terraform import materialize_source_sqlserver.example : + +# Example +terraform import materialize_source_sqlserver.example aws/us-east-1:my_sqlserver_source diff --git a/examples/resources/materialize_source_sqlserver/resource.tf b/examples/resources/materialize_source_sqlserver/resource.tf new file mode 100644 index 00000000..a955dd90 --- /dev/null +++ b/examples/resources/materialize_source_sqlserver/resource.tf @@ -0,0 +1,96 @@ +resource "materialize_secret" "sqlserver_password" { + name = "sqlserver_password" + value = base64encode("c2VjcmV0Cg==") +} + +resource "materialize_connection_sqlserver" "sqlserver_connection" { + name = "sqlserver_connection" + host = "sql-server.example.com" + port = 1433 + + user { + text = "sqluser" + } + + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + + database = "testdb" +} + +# Basic SQL Server source for specific tables +resource "materialize_source_sqlserver" "example" { + name = "sqlserver_source" + cluster_name = "quickstart" + + sqlserver_connection { + name = materialize_connection_sqlserver.sqlserver_connection.name + schema_name = materialize_connection_sqlserver.sqlserver_connection.schema_name + database_name = materialize_connection_sqlserver.sqlserver_connection.database_name + } + + table { + upstream_name = "dbo.customers" + name = "customers" + } + + table { + upstream_name = "dbo.orders" + name = "orders" + } + + table { + upstream_name = "dbo.products" + name = "products" + } +} + +# SQL Server source for all tables +resource "materialize_source_sqlserver" "all_tables" { + name = "sqlserver_source_all" + cluster_name = "quickstart" + + sqlserver_connection { + name = materialize_connection_sqlserver.sqlserver_connection.name + schema_name = materialize_connection_sqlserver.sqlserver_connection.schema_name + database_name = materialize_connection_sqlserver.sqlserver_connection.database_name + } + + # No table blocks specified means all tables will be included +} + +# SQL Server source with text columns and excluded columns +resource "materialize_source_sqlserver" "with_options" { + name = "sqlserver_source_with_options" + cluster_name = "quickstart" + + sqlserver_connection { + name = materialize_connection_sqlserver.sqlserver_connection.name + schema_name = materialize_connection_sqlserver.sqlserver_connection.schema_name + database_name = materialize_connection_sqlserver.sqlserver_connection.database_name + } + + table { + upstream_name = "dbo.users" + name = "users" + } + + table { + upstream_name = "dbo.posts" + name = "posts" + } + + table { + upstream_name = "dbo.comments" + name = "comments" + } + + # Convert unsupported data types to text + text_columns = ["dbo.users.description", "dbo.posts.content", "dbo.comments.metadata"] + + # Exclude problematic columns + exclude_columns = ["dbo.users.image_data", "dbo.posts.binary_data"] +} diff --git a/integration/sqlserver.tf b/integration/sqlserver.tf new file mode 100644 index 00000000..8499c4ad --- /dev/null +++ b/integration/sqlserver.tf @@ -0,0 +1,58 @@ +# SQL Server Connection +resource "materialize_secret" "sqlserver_password" { + name = "sqlserver_password" + value = "Password123!" +} + +resource "materialize_connection_sqlserver" "sqlserver_connection" { + name = "sqlserver_connection" + host = "sqlserver" + port = 1433 + + user { + text = "sa" + } + + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + + database = "testdb" + validate = false +} + +# SQL Server Source for specific tables +resource "materialize_source_sqlserver" "sqlserver_source" { + name = "sqlserver_source" + cluster_name = "quickstart" + + sqlserver_connection { + name = materialize_connection_sqlserver.sqlserver_connection.name + schema_name = materialize_connection_sqlserver.sqlserver_connection.schema_name + database_name = materialize_connection_sqlserver.sqlserver_connection.database_name + } + + table { + upstream_name = "table1" + upstream_schema_name = "dbo" + name = "sqlserver_table1" + } + + exclude_columns = ["dbo.table1.about"] +} + +# SQL Server Source for all tables +resource "materialize_source_sqlserver" "sqlserver_source_all" { + name = "sqlserver_source_all" + cluster_name = "quickstart" + + sqlserver_connection { + name = materialize_connection_sqlserver.sqlserver_connection.name + schema_name = materialize_connection_sqlserver.sqlserver_connection.schema_name + database_name = materialize_connection_sqlserver.sqlserver_connection.database_name + } + + exclude_columns = ["dbo.table3.data", "dbo.table1.about", "dbo.table2.about"] +} diff --git a/integration/sqlserver/Dockerfile b/integration/sqlserver/Dockerfile new file mode 100644 index 00000000..c4c0fd5a --- /dev/null +++ b/integration/sqlserver/Dockerfile @@ -0,0 +1,26 @@ +FROM mcr.microsoft.com/mssql/server:2022-latest + +# Switch to root to install packages and copy files +USER root + +# Add SQL Server tools to PATH (they should already be installed in the base image) +ENV PATH="$PATH:/opt/mssql-tools/bin:/opt/mssql-tools18/bin" + +ENV MSSQL_AGENT_ENABLED=true +ENV MSSQL_TCP_PORT=1433 + +# Copy bootstrap files +COPY sqlserver_bootstrap.sql /docker-entrypoint-initdb.d/ +COPY entrypoint.sh /entrypoint.sh + +# Make script executable +RUN chmod +x /entrypoint.sh + +# Create directory for SQL Server to avoid permission issues +RUN mkdir -p /var/opt/mssql && chown -R mssql:root /var/opt/mssql + +# Set user back to mssql +USER mssql + +# Use custom entrypoint +ENTRYPOINT ["/entrypoint.sh"] diff --git a/integration/sqlserver/entrypoint.sh b/integration/sqlserver/entrypoint.sh new file mode 100755 index 00000000..9eb053ba --- /dev/null +++ b/integration/sqlserver/entrypoint.sh @@ -0,0 +1,57 @@ +#!/bin/bash + +# Start SQL Server in the background +/opt/mssql/bin/sqlservr & + +# Wait for SQL Server to start up +echo "Waiting for SQL Server to start..." +echo "SA_PASSWORD is set: ${SA_PASSWORD:+yes}" + +# Try to find sqlcmd - check different possible locations +SQLCMD="" +if [ -f "/opt/mssql-tools18/bin/sqlcmd" ]; then + SQLCMD="/opt/mssql-tools18/bin/sqlcmd" +elif [ -f "/opt/mssql-tools/bin/sqlcmd" ]; then + SQLCMD="/opt/mssql-tools/bin/sqlcmd" +else + echo "Error: sqlcmd not found!" + exit 1 +fi + +echo "Using sqlcmd at: $SQLCMD" + +# Wait longer for SQL Server to initialize properly +for i in {1..120} +do + $SQLCMD -S localhost -U sa -P "${SA_PASSWORD}" -Q "SELECT 1" -C > /dev/null 2>&1 + if [ $? -eq 0 ] + then + echo "SQL Server started successfully after $i attempts" + break + else + echo "Attempt $i: Not ready yet..." + sleep 2 + fi +done + +# Double-check that we can connect before running bootstrap +echo "Verifying SQL Server connection..." +$SQLCMD -S localhost -U sa -P "${SA_PASSWORD}" -Q "SELECT @@VERSION" -C +if [ $? -ne 0 ]; then + echo "Error: Could not connect to SQL Server for bootstrap" + exit 1 +fi + +# Run the bootstrap script +echo "Running bootstrap script..." +$SQLCMD -S localhost -U sa -P "${SA_PASSWORD}" -i /docker-entrypoint-initdb.d/sqlserver_bootstrap.sql -C -t 30 + +if [ $? -eq 0 ]; then + echo "Bootstrap script completed successfully" +else + echo "Error: Bootstrap script failed" + exit 1 +fi + +# Keep the container running +wait diff --git a/integration/sqlserver/sqlserver_bootstrap.sql b/integration/sqlserver/sqlserver_bootstrap.sql new file mode 100644 index 00000000..d7be83d0 --- /dev/null +++ b/integration/sqlserver/sqlserver_bootstrap.sql @@ -0,0 +1,130 @@ +-- Create the test database +IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = 'testdb') +BEGIN + CREATE DATABASE testdb; +END +GO + +-- Enable snapshot isolation settings required for CDC with Materialize +ALTER DATABASE testdb SET ALLOW_SNAPSHOT_ISOLATION ON; +GO + +ALTER DATABASE testdb SET READ_COMMITTED_SNAPSHOT ON; +GO + +USE testdb; +GO + +-- Enable Change Data Capture on the database +EXEC sys.sp_cdc_enable_db; +GO + +-- Create test tables +IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[table1]') AND type in (N'U')) +BEGIN + CREATE TABLE [dbo].[table1] ( + id INT IDENTITY(1,1) PRIMARY KEY, + name NVARCHAR(255), + about NTEXT, + banned BIT, + created_at DATETIME2 DEFAULT GETDATE() + ); +END +GO + +IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[table2]') AND type in (N'U')) +BEGIN + CREATE TABLE [dbo].[table2] ( + id INT PRIMARY KEY, + name NVARCHAR(255), + about NVARCHAR(255), + banned BIT, + updated_at DATETIME2 NOT NULL DEFAULT GETDATE() + ); +END +GO + +IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[table3]') AND type in (N'U')) +BEGIN + CREATE TABLE [dbo].[table3] ( + id INT IDENTITY(1,1) PRIMARY KEY, + status NVARCHAR(50) NOT NULL DEFAULT 'active', + data XML, + created_at DATETIME2 DEFAULT GETDATE() + ); +END +GO + +-- Enable CDC on the tables +EXEC sys.sp_cdc_enable_table + @source_schema = N'dbo', + @source_name = N'table1', + @role_name = NULL, + @supports_net_changes = 1; +GO + +EXEC sys.sp_cdc_enable_table + @source_schema = N'dbo', + @source_name = N'table2', + @role_name = NULL, + @supports_net_changes = 1; +GO + +EXEC sys.sp_cdc_enable_table + @source_schema = N'dbo', + @source_name = N'table3', + @role_name = NULL, + @supports_net_changes = 1; +GO + +-- Insert sample data +INSERT INTO [dbo].[table1] (name, about, banned) +VALUES + ('John Doe', 'Lorem ipsum dolor sit amet', 0), + ('Jane Doe', 'Lorem ipsum dolor sit amet', 1), + ('Alice Smith', 'Lorem ipsum dolor sit amet', 0), + ('Bob Johnson', 'Lorem ipsum dolor sit amet', 1), + ('Charlie Brown', 'Lorem ipsum dolor sit amet', 0); +GO + +INSERT INTO [dbo].[table2] (id, name, about, banned, updated_at) +VALUES + (1, 'Record 1', 'First record', 0, GETDATE()), + (2, 'Record 2', 'Second record', 1, GETDATE()), + (3, 'Record 3', 'Third record', 0, GETDATE()), + (4, 'Record 4', 'Fourth record', 1, GETDATE()), + (5, 'Record 5', 'Fifth record', 0, GETDATE()); +GO + +INSERT INTO [dbo].[table3] (status, data) +VALUES + ('active', 'Sample XML 1'), + ('inactive', 'Sample XML 2'), + ('active', 'Sample XML 3'), + ('deleted', 'Sample XML 4'), + ('active', 'Sample XML 5'); +GO + +-- Verify snapshot isolation settings +SELECT + name, + is_cdc_enabled, + snapshot_isolation_state, + snapshot_isolation_state_desc, + is_read_committed_snapshot_on +FROM sys.databases +WHERE name = 'testdb'; +GO + +-- Verify CDC is enabled +SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'testdb'; +GO + +SELECT + s.name AS schema_name, + t.name AS table_name, + t.is_tracked_by_cdc +FROM sys.tables t +INNER JOIN sys.schemas s ON t.schema_id = s.schema_id +WHERE t.is_tracked_by_cdc = 1; +GO diff --git a/pkg/materialize/connection_sqlserver.go b/pkg/materialize/connection_sqlserver.go new file mode 100644 index 00000000..80f79e2c --- /dev/null +++ b/pkg/materialize/connection_sqlserver.go @@ -0,0 +1,108 @@ +package materialize + +import ( + "fmt" + "strings" + + "github.com/jmoiron/sqlx" +) + +type ConnectionSQLServerBuilder struct { + Connection + connectionType string + sqlserverDatabase string + sqlserverHost string + sqlserverPort int + sqlserverUser ValueSecretStruct + sqlserverPassword IdentifierSchemaStruct + sqlserverSSHTunnel IdentifierSchemaStruct + sqlserverAWSPrivateLink IdentifierSchemaStruct + validate bool +} + +func NewConnectionSQLServerBuilder(conn *sqlx.DB, obj MaterializeObject) *ConnectionSQLServerBuilder { + b := Builder{conn, BaseConnection} + return &ConnectionSQLServerBuilder{ + Connection: Connection{b, obj.Name, obj.SchemaName, obj.DatabaseName}, + } +} + +func (b *ConnectionSQLServerBuilder) ConnectionType(connectionType string) *ConnectionSQLServerBuilder { + b.connectionType = connectionType + return b +} + +func (b *ConnectionSQLServerBuilder) SQLServerDatabase(sqlserverDatabase string) *ConnectionSQLServerBuilder { + b.sqlserverDatabase = sqlserverDatabase + return b +} + +func (b *ConnectionSQLServerBuilder) SQLServerHost(sqlserverHost string) *ConnectionSQLServerBuilder { + b.sqlserverHost = sqlserverHost + return b +} + +func (b *ConnectionSQLServerBuilder) SQLServerPort(sqlserverPort int) *ConnectionSQLServerBuilder { + b.sqlserverPort = sqlserverPort + return b +} + +func (b *ConnectionSQLServerBuilder) SQLServerUser(sqlserverUser ValueSecretStruct) *ConnectionSQLServerBuilder { + b.sqlserverUser = sqlserverUser + return b +} + +func (b *ConnectionSQLServerBuilder) SQLServerPassword(sqlserverPassword IdentifierSchemaStruct) *ConnectionSQLServerBuilder { + b.sqlserverPassword = sqlserverPassword + return b +} + +func (b *ConnectionSQLServerBuilder) SQLServerSSHTunnel(sqlserverSSHTunnel IdentifierSchemaStruct) *ConnectionSQLServerBuilder { + b.sqlserverSSHTunnel = sqlserverSSHTunnel + return b +} + +func (b *ConnectionSQLServerBuilder) SQLServerAWSPrivateLink(sqlserverAWSPrivateLink IdentifierSchemaStruct) *ConnectionSQLServerBuilder { + b.sqlserverAWSPrivateLink = sqlserverAWSPrivateLink + return b +} + +func (b *ConnectionSQLServerBuilder) Validate(validate bool) *ConnectionSQLServerBuilder { + b.validate = validate + return b +} + +func (b *ConnectionSQLServerBuilder) Create() error { + q := strings.Builder{} + q.WriteString(fmt.Sprintf(`CREATE CONNECTION %s TO SQL SERVER (`, b.QualifiedName())) + + q.WriteString(fmt.Sprintf(`HOST %s`, QuoteString(b.sqlserverHost))) + q.WriteString(fmt.Sprintf(`, PORT %d`, b.sqlserverPort)) + + if b.sqlserverUser.Text != "" { + q.WriteString(fmt.Sprintf(`, USER %s`, QuoteString(b.sqlserverUser.Text))) + } + if b.sqlserverUser.Secret.Name != "" { + q.WriteString(fmt.Sprintf(`, USER SECRET %s`, b.sqlserverUser.Secret.QualifiedName())) + } + if b.sqlserverPassword.Name != "" { + q.WriteString(fmt.Sprintf(`, PASSWORD SECRET %s`, b.sqlserverPassword.QualifiedName())) + } + if b.sqlserverSSHTunnel.Name != "" { + q.WriteString(fmt.Sprintf(`, SSH TUNNEL %s`, b.sqlserverSSHTunnel.QualifiedName())) + } + if b.sqlserverAWSPrivateLink.Name != "" { + q.WriteString(fmt.Sprintf(`, AWS PRIVATELINK %s`, b.sqlserverAWSPrivateLink.QualifiedName())) + } + + q.WriteString(fmt.Sprintf(`, DATABASE %s`, QuoteString(b.sqlserverDatabase))) + + q.WriteString(`)`) + + if !b.validate { + q.WriteString(` WITH (VALIDATE = false)`) + } + + q.WriteString(`;`) + return b.ddl.exec(q.String()) +} diff --git a/pkg/materialize/connection_sqlserver_test.go b/pkg/materialize/connection_sqlserver_test.go new file mode 100644 index 00000000..bf8598d0 --- /dev/null +++ b/pkg/materialize/connection_sqlserver_test.go @@ -0,0 +1,113 @@ +package materialize + +import ( + "testing" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/MaterializeInc/terraform-provider-materialize/pkg/testhelpers" + "github.com/jmoiron/sqlx" +) + +var connSQLServer = MaterializeObject{Name: "sqlserver_conn", SchemaName: "schema", DatabaseName: "database"} + +func TestConnectionSQLServerCreate(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE CONNECTION "database"."schema"."sqlserver_conn" TO SQL SERVER \(HOST 'sqlserver_host', PORT 1433, USER 'user', PASSWORD SECRET "database"."schema"."password", DATABASE 'testdb'\);`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewConnectionSQLServerBuilder(db, connSQLServer) + b.SQLServerHost("sqlserver_host") + b.SQLServerPort(1433) + b.SQLServerUser(ValueSecretStruct{Text: "user"}) + b.SQLServerPassword(IdentifierSchemaStruct{Name: "password", SchemaName: "schema", DatabaseName: "database"}) + b.SQLServerDatabase("testdb") + b.Validate(true) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestConnectionSQLServerSshCreate(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE CONNECTION "database"."schema"."sqlserver_conn" TO SQL SERVER \(HOST 'sqlserver_host', PORT 1433, USER 'user', PASSWORD SECRET "database"."schema"."password", SSH TUNNEL "database"."schema"."ssh_conn", DATABASE 'testdb'\);`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewConnectionSQLServerBuilder(db, connSQLServer) + b.SQLServerHost("sqlserver_host") + b.SQLServerPort(1433) + b.SQLServerUser(ValueSecretStruct{Text: "user"}) + b.SQLServerPassword(IdentifierSchemaStruct{Name: "password", SchemaName: "schema", DatabaseName: "database"}) + b.SQLServerSSHTunnel(IdentifierSchemaStruct{Name: "ssh_conn", SchemaName: "schema", DatabaseName: "database"}) + b.SQLServerDatabase("testdb") + b.Validate(true) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestConnectionSQLServerAWSPrivateLinkCreate(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE CONNECTION "database"."schema"."sqlserver_conn" TO SQL SERVER \(HOST 'sqlserver_host', PORT 1433, USER SECRET "database"."schema"."user", PASSWORD SECRET "database"."schema"."password", AWS PRIVATELINK "database"."schema"."aws_conn", DATABASE 'testdb'\);`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewConnectionSQLServerBuilder(db, connSQLServer) + b.SQLServerHost("sqlserver_host") + b.SQLServerPort(1433) + b.SQLServerUser(ValueSecretStruct{Secret: IdentifierSchemaStruct{Name: "user", SchemaName: "schema", DatabaseName: "database"}}) + b.SQLServerPassword(IdentifierSchemaStruct{Name: "password", SchemaName: "schema", DatabaseName: "database"}) + b.SQLServerAWSPrivateLink(IdentifierSchemaStruct{Name: "aws_conn", SchemaName: "schema", DatabaseName: "database"}) + b.SQLServerDatabase("testdb") + b.Validate(true) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestConnectionSQLServerWithoutValidation(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE CONNECTION "database"."schema"."sqlserver_conn" TO SQL SERVER \(HOST 'sqlserver_host', PORT 1433, USER 'user', PASSWORD SECRET "database"."schema"."password", DATABASE 'testdb'\) WITH \(VALIDATE = false\);`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewConnectionSQLServerBuilder(db, connSQLServer) + b.SQLServerHost("sqlserver_host") + b.SQLServerPort(1433) + b.SQLServerUser(ValueSecretStruct{Text: "user"}) + b.SQLServerPassword(IdentifierSchemaStruct{Name: "password", SchemaName: "schema", DatabaseName: "database"}) + b.SQLServerDatabase("testdb") + b.Validate(false) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestConnectionSQLServerDefaultPort(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE CONNECTION "database"."schema"."sqlserver_conn" TO SQL SERVER \(HOST 'sqlserver_host', PORT 0, USER 'user', PASSWORD SECRET "database"."schema"."password", DATABASE 'testdb'\);`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewConnectionSQLServerBuilder(db, connSQLServer) + b.SQLServerHost("sqlserver_host") + // Default port should be 0 when not set (will be corrected in future iterations) + b.SQLServerUser(ValueSecretStruct{Text: "user"}) + b.SQLServerPassword(IdentifierSchemaStruct{Name: "password", SchemaName: "schema", DatabaseName: "database"}) + b.SQLServerDatabase("testdb") + b.Validate(true) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} diff --git a/pkg/materialize/source_sqlserver.go b/pkg/materialize/source_sqlserver.go new file mode 100644 index 00000000..0e3466dc --- /dev/null +++ b/pkg/materialize/source_sqlserver.go @@ -0,0 +1,122 @@ +package materialize + +import ( + "fmt" + "strings" + + "github.com/jmoiron/sqlx" +) + +type SourceSQLServerBuilder struct { + Source + clusterName string + size string + sqlserverConnection IdentifierSchemaStruct + textColumns []string + excludeColumns []string + table []TableStruct + exposeProgress IdentifierSchemaStruct +} + +func NewSourceSQLServerBuilder(conn *sqlx.DB, obj MaterializeObject) *SourceSQLServerBuilder { + b := Builder{conn, BaseSource} + return &SourceSQLServerBuilder{ + Source: Source{b, obj.Name, obj.SchemaName, obj.DatabaseName}, + } +} + +func (b *SourceSQLServerBuilder) ClusterName(c string) *SourceSQLServerBuilder { + b.clusterName = c + return b +} + +func (b *SourceSQLServerBuilder) Size(s string) *SourceSQLServerBuilder { + b.size = s + return b +} + +func (b *SourceSQLServerBuilder) SQLServerConnection(conn IdentifierSchemaStruct) *SourceSQLServerBuilder { + b.sqlserverConnection = conn + return b +} + +func (b *SourceSQLServerBuilder) TextColumns(t []string) *SourceSQLServerBuilder { + b.textColumns = t + return b +} + +func (b *SourceSQLServerBuilder) ExcludeColumns(e []string) *SourceSQLServerBuilder { + b.excludeColumns = e + return b +} + +func (b *SourceSQLServerBuilder) Table(t []TableStruct) *SourceSQLServerBuilder { + b.table = t + return b +} + +func (b *SourceSQLServerBuilder) ExposeProgress(e IdentifierSchemaStruct) *SourceSQLServerBuilder { + b.exposeProgress = e + return b +} + +func (b *SourceSQLServerBuilder) Create() error { + q := strings.Builder{} + q.WriteString(fmt.Sprintf(`CREATE SOURCE %s`, b.QualifiedName())) + + if b.clusterName != "" { + q.WriteString(fmt.Sprintf(` IN CLUSTER %s`, QuoteIdentifier(b.clusterName))) + } + + q.WriteString(fmt.Sprintf(` FROM SQL SERVER CONNECTION %s`, b.sqlserverConnection.QualifiedName())) + + // Build options + var options []string + + if len(b.textColumns) > 0 { + s := strings.Join(b.textColumns, ", ") + options = append(options, fmt.Sprintf(`TEXT COLUMNS (%s)`, s)) + } + + if len(b.excludeColumns) > 0 { + s := strings.Join(b.excludeColumns, ", ") + options = append(options, fmt.Sprintf(`EXCLUDE COLUMNS (%s)`, s)) + } + + if len(options) > 0 { + q.WriteString(fmt.Sprintf(` (%s)`, strings.Join(options, ", "))) + } + + // Handle tables + if len(b.table) > 0 { + q.WriteString(` FOR TABLES (`) + for i, t := range b.table { + if t.UpstreamSchemaName == "" { + t.UpstreamSchemaName = "dbo" // Default SQL Server schema + } + if t.Name == "" { + t.Name = t.UpstreamName + } + if t.SchemaName == "" { + t.SchemaName = b.SchemaName + } + if t.DatabaseName == "" { + t.DatabaseName = b.DatabaseName + } + q.WriteString(fmt.Sprintf(`%s.%s AS %s.%s.%s`, QuoteIdentifier(t.UpstreamSchemaName), QuoteIdentifier(t.UpstreamName), QuoteIdentifier(t.DatabaseName), QuoteIdentifier(t.SchemaName), QuoteIdentifier(t.Name))) + if i < len(b.table)-1 { + q.WriteString(`, `) + } + } + q.WriteString(`)`) + } else { + q.WriteString(` FOR ALL TABLES`) + } + + if b.exposeProgress.Name != "" { + q.WriteString(fmt.Sprintf(` EXPOSE PROGRESS AS %s`, b.exposeProgress.QualifiedName())) + } + + q.WriteString(`;`) + return b.ddl.exec(q.String()) +} diff --git a/pkg/materialize/source_sqlserver_test.go b/pkg/materialize/source_sqlserver_test.go new file mode 100644 index 00000000..84c1cd94 --- /dev/null +++ b/pkg/materialize/source_sqlserver_test.go @@ -0,0 +1,174 @@ +package materialize + +import ( + "testing" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/MaterializeInc/terraform-provider-materialize/pkg/testhelpers" + "github.com/jmoiron/sqlx" +) + +var sourceSQLServer = MaterializeObject{Name: "source", SchemaName: "schema", DatabaseName: "database"} +var tableInputSQLServer = []TableStruct{ + {UpstreamName: "table_1", UpstreamSchemaName: "dbo"}, + {UpstreamName: "table_2", UpstreamSchemaName: "dbo", Name: "table_alias"}, +} + +func TestSourceSQLServerAllTablesCreate(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE SOURCE "database"."schema"."source" FROM SQL SERVER CONNECTION "database"."schema"."sqlserver_connection" FOR ALL TABLES;`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewSourceSQLServerBuilder(db, sourceSQLServer) + b.SQLServerConnection(IdentifierSchemaStruct{Name: "sqlserver_connection", SchemaName: "schema", DatabaseName: "database"}) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestSourceSQLServerSpecificTablesCreate(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE SOURCE "database"."schema"."source" FROM SQL SERVER CONNECTION "database"."schema"."sqlserver_connection" FOR TABLES \("dbo"."table_1" AS "database"."schema"."s1_table_1", "custom"."table_2" AS "database"."schema"."table_alias"\) EXPOSE PROGRESS AS "database"."schema"."progress";`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewSourceSQLServerBuilder(db, sourceSQLServer) + b.SQLServerConnection(IdentifierSchemaStruct{Name: "sqlserver_connection", SchemaName: "schema", DatabaseName: "database"}) + b.Table([]TableStruct{ + { + UpstreamName: "table_1", + UpstreamSchemaName: "dbo", + Name: "s1_table_1", + }, + { + UpstreamName: "table_2", + UpstreamSchemaName: "custom", + Name: "table_alias", + }, + }) + b.ExposeProgress(IdentifierSchemaStruct{Name: "progress", DatabaseName: "database", SchemaName: "schema"}) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestSourceSQLServerWithTextColumnsCreate(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE SOURCE "database"."schema"."source" FROM SQL SERVER CONNECTION "database"."schema"."sqlserver_connection" \(TEXT COLUMNS \(xml_column, ntext_column\)\) FOR ALL TABLES;`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewSourceSQLServerBuilder(db, sourceSQLServer) + b.SQLServerConnection(IdentifierSchemaStruct{Name: "sqlserver_connection", SchemaName: "schema", DatabaseName: "database"}) + b.TextColumns([]string{"xml_column", "ntext_column"}) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestSourceSQLServerWithExcludeColumnsCreate(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE SOURCE "database"."schema"."source" FROM SQL SERVER CONNECTION "database"."schema"."sqlserver_connection" \(EXCLUDE COLUMNS \(geometry_column, geography_column\)\) FOR ALL TABLES;`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewSourceSQLServerBuilder(db, sourceSQLServer) + b.SQLServerConnection(IdentifierSchemaStruct{Name: "sqlserver_connection", SchemaName: "schema", DatabaseName: "database"}) + b.ExcludeColumns([]string{"geometry_column", "geography_column"}) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestSourceSQLServerWithTextAndExcludeColumnsCreate(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE SOURCE "database"."schema"."source" FROM SQL SERVER CONNECTION "database"."schema"."sqlserver_connection" \(TEXT COLUMNS \(xml_column, ntext_column\), EXCLUDE COLUMNS \(geometry_column, geography_column\)\) FOR ALL TABLES;`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewSourceSQLServerBuilder(db, sourceSQLServer) + b.SQLServerConnection(IdentifierSchemaStruct{Name: "sqlserver_connection", SchemaName: "schema", DatabaseName: "database"}) + b.TextColumns([]string{"xml_column", "ntext_column"}) + b.ExcludeColumns([]string{"geometry_column", "geography_column"}) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestSourceSQLServerWithClusterCreate(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE SOURCE "database"."schema"."source" IN CLUSTER "test_cluster" FROM SQL SERVER CONNECTION "database"."schema"."sqlserver_connection" FOR ALL TABLES;`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewSourceSQLServerBuilder(db, sourceSQLServer) + b.ClusterName("test_cluster") + b.SQLServerConnection(IdentifierSchemaStruct{Name: "sqlserver_connection", SchemaName: "schema", DatabaseName: "database"}) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestSourceSQLServerDefaultSchemaHandling(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `CREATE SOURCE "database"."schema"."source" FROM SQL SERVER CONNECTION "database"."schema"."sqlserver_connection" FOR TABLES \("dbo"."users" AS "database"."schema"."users", "dbo"."orders" AS "database"."schema"."orders"\);`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewSourceSQLServerBuilder(db, sourceSQLServer) + b.SQLServerConnection(IdentifierSchemaStruct{Name: "sqlserver_connection", SchemaName: "schema", DatabaseName: "database"}) + b.Table([]TableStruct{ + { + UpstreamName: "users", + // Should default to "dbo" schema + }, + { + UpstreamName: "orders", + // Should default to "dbo" schema + }, + }) + + if err := b.Create(); err != nil { + t.Fatal(err) + } + }) +} + +func TestSourceSQLServerAddSubsource(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `ALTER SOURCE "database"."schema"."source" ADD SUBSOURCE "dbo"."table_1", "dbo"."table_2" AS "database"."schema"."table_alias";`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewSource(db, sourceSQLServer) + if err := b.AddSubsource(tableInputSQLServer, []string{}); err != nil { + t.Fatal(err) + } + }) +} + +func TestSourceSQLServerDropSubsource(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `DROP SOURCE "database"."schema"."table_1", "database"."schema"."table_alias"`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + b := NewSourceSQLServerBuilder(db, sourceSQLServer) + if err := b.DropSubsource(tableInputSQLServer); err != nil { + t.Fatal(err) + } + }) +} diff --git a/pkg/materialize/subsource.go b/pkg/materialize/subsource.go index 1adf9dee..23d2fbd9 100644 --- a/pkg/materialize/subsource.go +++ b/pkg/materialize/subsource.go @@ -63,6 +63,31 @@ var mysqlSubsourceQuery = NewBaseQuery(` ON mz_sources.id = mz_mysql_source_tables.id `) +var sqlserverSubsourceQuery = NewBaseQuery(` + SELECT DISTINCT + mz_sources.id AS object_id, + subsources.id AS referenced_object_id, + mz_sources.name AS object_name, + mz_schemas.name AS schema_name, + mz_databases.name AS database_name, + mz_sources.type + -- TODO: mz_sqlserver_source_tables.table_name and mz_sqlserver_source_tables.schema_name are not implemented yet + -- mz_sqlserver_source_tables.table_name AS upstream_table_name, + -- mz_sqlserver_source_tables.schema_name AS upstream_table_schema + FROM mz_sources AS subsources + JOIN mz_internal.mz_object_dependencies + ON subsources.id = mz_object_dependencies.referenced_object_id + JOIN mz_sources + ON mz_sources.id = mz_object_dependencies.object_id + JOIN mz_schemas + ON mz_sources.schema_id = mz_schemas.id + JOIN mz_databases + ON mz_schemas.database_id = mz_databases.id + -- TODO: Uncomment when mz_sqlserver_source_tables is implemented + -- LEFT JOIN mz_internal.mz_sqlserver_source_tables + -- ON mz_sources.id = mz_sqlserver_source_tables.id +`) + func ListPostgresSubsources(conn *sqlx.DB, sourceId string, objectType string) ([]SubsourceDetail, error) { p := map[string]string{ "mz_object_dependencies.referenced_object_id": sourceId, @@ -98,3 +123,21 @@ func ListMysqlSubsources(conn *sqlx.DB, sourceId string, objectType string) ([]S } return subsources, nil } + +func ListSQLServerSubsources(conn *sqlx.DB, sourceId string, objectType string) ([]SubsourceDetail, error) { + p := map[string]string{ + "mz_object_dependencies.referenced_object_id": sourceId, + } + + if objectType != "" { + p["mz_sources.type"] = objectType + } + + q := sqlserverSubsourceQuery.QueryPredicate(p) + + var subsources []SubsourceDetail + if err := conn.Select(&subsources, q); err != nil { + return nil, err + } + return subsources, nil +} diff --git a/pkg/provider/acceptance_connection_sqlserver_test.go b/pkg/provider/acceptance_connection_sqlserver_test.go new file mode 100644 index 00000000..cb12ecac --- /dev/null +++ b/pkg/provider/acceptance_connection_sqlserver_test.go @@ -0,0 +1,291 @@ +package provider + +import ( + "database/sql" + "fmt" + "testing" + + "github.com/MaterializeInc/terraform-provider-materialize/pkg/materialize" + "github.com/MaterializeInc/terraform-provider-materialize/pkg/utils" + "github.com/hashicorp/terraform-plugin-testing/helper/acctest" + "github.com/hashicorp/terraform-plugin-testing/helper/resource" + "github.com/hashicorp/terraform-plugin-testing/terraform" +) + +func TestAccConnectionSQLServer_basic(t *testing.T) { + secretName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + connectionName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + connection2Name := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + roleName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccConnectionSQLServerResource(roleName, secretName, connectionName, connection2Name, roleName), + Check: resource.ComposeTestCheckFunc( + testAccCheckConnectionSQLServerExists("materialize_connection_sqlserver.test"), + resource.TestMatchResourceAttr("materialize_connection_sqlserver.test", "id", terraformObjectIdRegex), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "name", connectionName), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "user.#", "1"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "user.0.text", "sa"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "password.#", "1"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "password.0.name", secretName), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "password.0.database_name", "materialize"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "password.0.schema_name", "public"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "database", "testdb"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "database_name", "materialize"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "schema_name", "public"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, connectionName)), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "ownership_role", "mz_system"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "comment", "object comment"), + testAccCheckConnectionSQLServerExists("materialize_connection_sqlserver.test_role"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test_role", "name", connection2Name), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test_role", "ownership_role", roleName), + ), + }, + { + ResourceName: "materialize_connection_sqlserver.test", + ImportState: true, + ImportStateVerify: false, + }, + }, + }) +} + +func TestAccConnectionSQLServer_update(t *testing.T) { + secretName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + slug := acctest.RandStringFromCharSet(5, acctest.CharSetAlpha) + connectionName := fmt.Sprintf("old_%s", slug) + newConnectionName := fmt.Sprintf("new_%s", slug) + connection2Name := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + roleName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccConnectionSQLServerResource(roleName, secretName, connectionName, connection2Name, "mz_system"), + }, + { + Config: testAccConnectionSQLServerResource(roleName, secretName, newConnectionName, connection2Name, roleName), + Check: resource.ComposeTestCheckFunc( + testAccCheckConnectionSQLServerExists("materialize_connection_sqlserver.test"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "name", newConnectionName), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "database_name", "materialize"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "schema_name", "public"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, newConnectionName)), + testAccCheckConnectionSQLServerExists("materialize_connection_sqlserver.test_role"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test_role", "ownership_role", roleName), + ), + }, + }, + }) +} + +func TestAccConnectionSQLServer_disappears(t *testing.T) { + secretName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + connectionName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + connection2Name := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + roleName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + CheckDestroy: testAccCheckAllConnectionSQLServerDestroyed, + Steps: []resource.TestStep{ + { + Config: testAccConnectionSQLServerResource(roleName, secretName, connectionName, connection2Name, roleName), + Check: resource.ComposeTestCheckFunc( + testAccCheckConnectionSQLServerExists("materialize_connection_sqlserver.test"), + testAccCheckObjectDisappears( + materialize.MaterializeObject{ + ObjectType: "CONNECTION", + Name: connectionName, + }, + ), + ), + PlanOnly: true, + ExpectNonEmptyPlan: true, + }, + }, + }) +} + +func TestAccConnectionSQLServer_updateConnectionAttributes(t *testing.T) { + initialSecretName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + updatedSecretName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + initialConnectionName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + updatedConnectionName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + roleName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + initialHost := "initial_host" + updatedHost := "updated_host" + initialPort := "1433" + updatedPort := "1434" + initialDatabase := "initial_database" + updatedDatabase := "updated_database" + sshTunnelName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + sshTunnel2Name := sshTunnelName + "_2" + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccConnectionSQLServerResourceUpdates(roleName, initialSecretName, initialConnectionName, initialHost, initialPort, sshTunnelName, initialDatabase), + Check: resource.ComposeTestCheckFunc( + testAccCheckConnectionSQLServerExists("materialize_connection_sqlserver.test"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "name", initialConnectionName), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "host", initialHost), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "port", initialPort), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "database", initialDatabase), + ), + }, + { + Config: testAccConnectionSQLServerResourceUpdates(roleName, updatedSecretName, updatedConnectionName, updatedHost, updatedPort, sshTunnel2Name, updatedDatabase), + Check: resource.ComposeTestCheckFunc( + testAccCheckConnectionSQLServerExists("materialize_connection_sqlserver.test"), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "name", updatedConnectionName), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "host", updatedHost), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "port", updatedPort), + resource.TestCheckResourceAttr("materialize_connection_sqlserver.test", "database", updatedDatabase), + ), + }, + }, + }) +} + +func testAccConnectionSQLServerResource(roleName, secretName, connectionName, connection2Name, connectionOwner string) string { + return fmt.Sprintf(` +resource "materialize_role" "test" { + name = "%[1]s" +} + +resource "materialize_secret" "sqlserver_password" { + name = "%[2]s" + value = "Password123!" +} + +resource "materialize_connection_sqlserver" "test" { + name = "%[3]s" + host = "sqlserver" + port = 1433 + user { + text = "sa" + } + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + database = "testdb" + comment = "object comment" +} + +resource "materialize_connection_sqlserver" "test_role" { + name = "%[4]s" + host = "sqlserver" + port = 1433 + user { + text = "sa" + } + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + database = "testdb" + ownership_role = "%[5]s" + + depends_on = [materialize_role.test] +} +`, roleName, secretName, connectionName, connection2Name, connectionOwner) +} + +func testAccConnectionSQLServerResourceUpdates(roleName, secretName, connectionName, host, port, sshTunnelName, database string) string { + return fmt.Sprintf(` + resource "materialize_role" "test" { + name = "%[1]s" + } + + resource "materialize_secret" "sqlserver_password" { + name = "%[2]s" + value = "Password123!" + } + + resource "materialize_connection_ssh_tunnel" "ssh_connection" { + name = "%[6]s" + schema_name = "public" + comment = "connection ssh tunnel comment" + + host = "ssh_host" + user = "ssh_user" + port = 22 + + validate = false + } + + resource "materialize_connection_sqlserver" "test" { + name = "%[3]s" + host = "%[4]s" + port = %[5]s + user { + text = "sa" + } + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + ssh_tunnel { + name = materialize_connection_ssh_tunnel.ssh_connection.name + } + database = "%[7]s" + + comment = "Test connection" + ownership_role = materialize_role.test.name + validate = false + } + `, roleName, secretName, connectionName, host, port, sshTunnelName, database) +} + +func testAccCheckConnectionSQLServerExists(name string) resource.TestCheckFunc { + return func(s *terraform.State) error { + meta := testAccProvider.Meta() + db, _, err := utils.GetDBClientFromMeta(meta, nil) + if err != nil { + return fmt.Errorf("error getting DB client: %s", err) + } + r, ok := s.RootModule().Resources[name] + if !ok { + return fmt.Errorf("connection sqlserver not found: %s", name) + } + _, err = materialize.ScanConnection(db, utils.ExtractId(r.Primary.ID)) + return err + } +} + +func testAccCheckAllConnectionSQLServerDestroyed(s *terraform.State) error { + meta := testAccProvider.Meta() + db, _, err := utils.GetDBClientFromMeta(meta, nil) + if err != nil { + return fmt.Errorf("error getting DB client: %s", err) + } + + for _, r := range s.RootModule().Resources { + if r.Type != "materialize_connection_sqlserver" { + continue + } + + _, err := materialize.ScanConnection(db, utils.ExtractId(r.Primary.ID)) + if err == nil { + return fmt.Errorf("connection %v still exists", utils.ExtractId(r.Primary.ID)) + } else if err != sql.ErrNoRows { + return err + } + } + + return nil +} diff --git a/pkg/provider/acceptance_source_sqlserver_test.go b/pkg/provider/acceptance_source_sqlserver_test.go new file mode 100644 index 00000000..8d27d783 --- /dev/null +++ b/pkg/provider/acceptance_source_sqlserver_test.go @@ -0,0 +1,434 @@ +package provider + +import ( + "database/sql" + "fmt" + "testing" + + "github.com/MaterializeInc/terraform-provider-materialize/pkg/materialize" + "github.com/MaterializeInc/terraform-provider-materialize/pkg/utils" + "github.com/hashicorp/terraform-plugin-testing/helper/acctest" + "github.com/hashicorp/terraform-plugin-testing/helper/resource" + "github.com/hashicorp/terraform-plugin-testing/terraform" +) + +func TestAccSourceSQLServer_basic(t *testing.T) { + nameSpace := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccSourceSQLServerBasicResource(nameSpace), + Check: resource.ComposeTestCheckFunc( + testAccCheckSourceSQLServerExists("materialize_source_sqlserver.test"), + resource.TestMatchResourceAttr("materialize_source_sqlserver.test", "id", terraformObjectIdRegex), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "name", fmt.Sprintf("%s_source", nameSpace)), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "schema_name", fmt.Sprintf("%s_schema", nameSpace)), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "database_name", fmt.Sprintf("%s_database", nameSpace)), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "qualified_sql_name", fmt.Sprintf(`"%s_database"."%s_schema"."%s_source"`, nameSpace, nameSpace, nameSpace)), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.#", "2"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.0.upstream_name", "table1"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.0.name", fmt.Sprintf(`%s_table1`, nameSpace)), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.1.upstream_name", "table2"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.1.name", fmt.Sprintf(`%s_table2`, nameSpace)), + ), + }, + }, + }) +} + +func TestAccSourceSQLServer_update(t *testing.T) { + slug := acctest.RandStringFromCharSet(5, acctest.CharSetAlpha) + roleName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + secretName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + connName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + sourceName := fmt.Sprintf("old_%s", slug) + newSourceName := fmt.Sprintf("new_%s", slug) + source2Name := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccSourceSQLServerResource(roleName, secretName, connName, sourceName, source2Name, "mz_system", "Comment"), + Check: resource.ComposeTestCheckFunc( + testAccCheckSourceSQLServerExists("materialize_source_sqlserver.test"), + testAccCheckSourceSQLServerExists("materialize_source_sqlserver.test_role"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "name", sourceName), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, sourceName)), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.#", "2"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.0.upstream_name", "table1"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.0.name", fmt.Sprintf(`%s_table1`, connName)), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.1.upstream_name", "table2"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.1.name", fmt.Sprintf(`%s_table2`, connName)), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test_role", "ownership_role", "mz_system"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test_role", "comment", "Comment"), + ), + }, + { + Config: testAccSourceSQLServerResourceUpdate(roleName, secretName, connName, newSourceName, source2Name, roleName, "New Comment"), + Check: resource.ComposeTestCheckFunc( + testAccCheckSourceSQLServerExists("materialize_source_sqlserver.test"), + testAccCheckSourceSQLServerExists("materialize_source_sqlserver.test_role"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "name", newSourceName), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, newSourceName)), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.#", "2"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.0.upstream_name", "table1"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.0.name", fmt.Sprintf(`%s_table1`, connName)), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.1.upstream_name", "table2"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "table.1.name", fmt.Sprintf(`%s_table2`, connName)), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test_role", "ownership_role", roleName), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test_role", "comment", "New Comment"), + ), + }, + { + Config: testAccSourceSQLServerResource(roleName, secretName, connName, newSourceName, source2Name, roleName, "New Comment"), + Check: resource.ComposeTestCheckFunc( + testAccCheckSourceSQLServerExists("materialize_source_sqlserver.test"), + testAccCheckSourceSQLServerExists("materialize_source_sqlserver.test_role"), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "name", newSourceName), + resource.TestCheckResourceAttr("materialize_source_sqlserver.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, newSourceName)), + ), + }, + }, + }) +} + +func TestAccSourceSQLServer_disappears(t *testing.T) { + roleName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + secretName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + connName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + sourceName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + source2Name := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + CheckDestroy: testAccCheckAllSourceSQLServerDestroyed, + Steps: []resource.TestStep{ + { + Config: testAccSourceSQLServerResource(roleName, secretName, connName, sourceName, source2Name, roleName, "Comment"), + Check: resource.ComposeTestCheckFunc( + testAccCheckSourceSQLServerExists("materialize_source_sqlserver.test"), + testAccCheckObjectDisappears( + materialize.MaterializeObject{ + ObjectType: "SOURCE", + Name: sourceName, + }, + ), + ), + PlanOnly: true, + ExpectNonEmptyPlan: true, + }, + }, + }) +} + +func testAccSourceSQLServerBasicResource(nameSpace string) string { + return fmt.Sprintf(` + resource "materialize_database" "test" { + name = "%[1]s_database" + } + + resource "materialize_schema" "test" { + name = "%[1]s_schema" + database_name = materialize_database.test.name + } + + resource "materialize_role" "test" { + name = "%[1]s_role" + } + + resource "materialize_secret" "sqlserver_password" { + name = "%[1]s_secret" + value = "Password123!" + } + + resource "materialize_cluster" "test" { + name = "%[1]s_cluster" + size = "25cc" + } + + resource "materialize_connection_sqlserver" "test" { + name = "%[1]s_conn" + host = "sqlserver" + port = 1433 + user { + text = "sa" + } + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + database = "testdb" + } + + resource "materialize_source_sqlserver" "test" { + name = "%[1]s_source" + schema_name = materialize_schema.test.name + database_name = materialize_database.test.name + + sqlserver_connection { + name = materialize_connection_sqlserver.test.name + schema_name = materialize_connection_sqlserver.test.schema_name + database_name = materialize_connection_sqlserver.test.database_name + } + + cluster_name = materialize_cluster.test.name + table { + upstream_name = "table1" + upstream_schema_name = "dbo" + name = "%[1]s_table1" + } + table { + upstream_name = "table2" + upstream_schema_name = "dbo" + name = "%[1]s_table2" + } + exclude_columns = ["dbo.table1.about"] + text_columns = ["dbo.table2.about"] + } + `, nameSpace) +} + +func testAccSourceSQLServerResource(roleName, secretName, connName, sourceName, source2Name, sourceOwner, comment string) string { + return fmt.Sprintf(` + resource "materialize_role" "test" { + name = "%[1]s" + } + + resource "materialize_secret" "sqlserver_password" { + name = "%[2]s" + value = "Password123!" + } + + resource "materialize_connection_sqlserver" "test" { + name = "%[3]s" + host = "sqlserver" + port = 1433 + user { + text = "sa" + } + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + database = "testdb" + } + + resource "materialize_cluster" "test" { + name = "%[3]s" + size = "25cc" + } + + resource "materialize_source_sqlserver" "test" { + name = "%[4]s" + sqlserver_connection { + name = materialize_connection_sqlserver.test.name + } + + cluster_name = materialize_cluster.test.name + table { + upstream_name = "table1" + upstream_schema_name = "dbo" + name = "%[3]s_table1" + } + table { + upstream_name = "table2" + upstream_schema_name = "dbo" + name = "%[3]s_table2" + } + exclude_columns = ["dbo.table1.about"] + text_columns = ["dbo.table2.about"] + } + + resource "materialize_source_sqlserver" "test_role" { + name = "%[5]s" + sqlserver_connection { + name = materialize_connection_sqlserver.test.name + } + + cluster_name = materialize_cluster.test.name + table { + upstream_name = "table1" + upstream_schema_name = "dbo" + name = "%[3]s_table_role_1" + } + table { + upstream_name = "table2" + upstream_schema_name = "dbo" + name = "%[3]s_table_role_2" + } + exclude_columns = ["dbo.table1.about"] + text_columns = ["dbo.table2.about"] + ownership_role = "%[6]s" + comment = "%[7]s" + + depends_on = [materialize_role.test] + } + `, roleName, secretName, connName, sourceName, source2Name, sourceOwner, comment) +} + +func testAccSourceSQLServerResourceUpdate(roleName, secretName, connName, sourceName, source2Name, sourceOwner, comment string) string { + return fmt.Sprintf(` + resource "materialize_role" "test" { + name = "%[1]s" + } + + resource "materialize_secret" "sqlserver_password" { + name = "%[2]s" + value = "Password123!" + } + + resource "materialize_cluster" "test" { + name = "%[3]s" + size = "25cc" + } + + resource "materialize_connection_sqlserver" "test" { + name = "%[3]s" + host = "sqlserver" + port = 1433 + user { + text = "sa" + } + password { + name = materialize_secret.sqlserver_password.name + schema_name = materialize_secret.sqlserver_password.schema_name + database_name = materialize_secret.sqlserver_password.database_name + } + database = "testdb" + } + + resource "materialize_source_sqlserver" "test" { + name = "%[4]s" + sqlserver_connection { + name = materialize_connection_sqlserver.test.name + } + + cluster_name = materialize_cluster.test.name + table { + upstream_name = "table1" + upstream_schema_name = "dbo" + name = "%[3]s_table1" + } + table { + upstream_name = "table2" + upstream_schema_name = "dbo" + name = "%[3]s_table2" + } + exclude_columns = ["dbo.table1.about"] + } + + resource "materialize_source_sqlserver" "test_role" { + name = "%[5]s" + sqlserver_connection { + name = materialize_connection_sqlserver.test.name + } + + cluster_name = materialize_cluster.test.name + table { + upstream_name = "table1" + upstream_schema_name = "dbo" + name = "%[3]s_table_role_1" + } + table { + upstream_name = "table2" + upstream_schema_name = "dbo" + name = "%[3]s_table_role_2" + } + exclude_columns = ["dbo.table1.about"] + ownership_role = "%[6]s" + comment = "%[7]s" + + depends_on = [materialize_role.test] + } + `, roleName, secretName, connName, sourceName, source2Name, sourceOwner, comment) +} + +func testAccSourceSQLServerResourceSchema(sourceName string) string { + return fmt.Sprintf(` + resource "materialize_secret" "test" { + name = "%[1]s_secret" + value = "Password123!" + } + + resource "materialize_cluster" "test" { + name = "%[1]s_cluster" + size = "25cc" + } + + resource "materialize_connection_sqlserver" "test" { + name = "%[1]s_conn" + host = "sqlserver" + port = 1433 + user { + text = "sa" + } + password { + name = materialize_secret.test.name + schema_name = materialize_secret.test.schema_name + database_name = materialize_secret.test.database_name + } + database = "testdb" + } + + resource "materialize_source_sqlserver" "test" { + name = "%[1]s_source" + cluster_name = materialize_cluster.test.name + sqlserver_connection { + name = materialize_connection_sqlserver.test.name + schema_name = materialize_connection_sqlserver.test.schema_name + database_name = materialize_connection_sqlserver.test.database_name + } + table { + upstream_name = "table1" + upstream_schema_name = "dbo" + name = "%[1]s_table1" + } + exclude_columns = ["dbo.table1.about"] + } + `, sourceName) +} + +func testAccCheckSourceSQLServerExists(name string) resource.TestCheckFunc { + return func(s *terraform.State) error { + meta := testAccProvider.Meta() + db, _, err := utils.GetDBClientFromMeta(meta, nil) + if err != nil { + return fmt.Errorf("error getting DB client: %s", err) + } + r, ok := s.RootModule().Resources[name] + if !ok { + return fmt.Errorf("source sqlserver not found: %s", name) + } + _, err = materialize.ScanSource(db, utils.ExtractId(r.Primary.ID)) + return err + } +} + +func testAccCheckAllSourceSQLServerDestroyed(s *terraform.State) error { + meta := testAccProvider.Meta() + db, _, err := utils.GetDBClientFromMeta(meta, nil) + if err != nil { + return fmt.Errorf("error getting DB client: %s", err) + } + + for _, r := range s.RootModule().Resources { + if r.Type != "materialize_source_sqlserver" { + continue + } + + _, err := materialize.ScanSource(db, utils.ExtractId(r.Primary.ID)) + if err == nil { + return fmt.Errorf("source %v still exists", utils.ExtractId(r.Primary.ID)) + } else if err != sql.ErrNoRows { + return err + } + } + return nil +} diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index ce9eb5ac..029605d3 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -95,6 +95,7 @@ func Provider(version string) *schema.Provider { "materialize_connection_kafka": resources.ConnectionKafka(), "materialize_connection_mysql": resources.ConnectionMySQL(), "materialize_connection_postgres": resources.ConnectionPostgres(), + "materialize_connection_sqlserver": resources.ConnectionSQLServer(), "materialize_connection_ssh_tunnel": resources.ConnectionSshTunnel(), "materialize_connection_grant": resources.GrantConnection(), "materialize_connection_grant_default_privilege": resources.GrantConnectionDefaultPrivilege(), @@ -129,6 +130,7 @@ func Provider(version string) *schema.Provider { "materialize_source_load_generator": resources.SourceLoadgen(), "materialize_source_mysql": resources.SourceMySQL(), "materialize_source_postgres": resources.SourcePostgres(), + "materialize_source_sqlserver": resources.SourceSQLServer(), "materialize_source_webhook": resources.SourceWebhook(), "materialize_source_grant": resources.GrantSource(), "materialize_system_parameter": resources.SystemParameter(), diff --git a/pkg/resources/resource_connection_sqlserver.go b/pkg/resources/resource_connection_sqlserver.go new file mode 100644 index 00000000..cbf11f81 --- /dev/null +++ b/pkg/resources/resource_connection_sqlserver.go @@ -0,0 +1,167 @@ +package resources + +import ( + "context" + "log" + + "github.com/MaterializeInc/terraform-provider-materialize/pkg/materialize" + "github.com/MaterializeInc/terraform-provider-materialize/pkg/utils" + + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +var connectionSQLServerSchema = map[string]*schema.Schema{ + "name": ObjectNameSchema("connection", true, false), + "schema_name": SchemaNameSchema("connection", false), + "database_name": DatabaseNameSchema("connection", false), + "qualified_sql_name": QualifiedNameSchema("connection"), + "comment": CommentSchema(false), + "database": { + Description: "The target SQL Server database.", + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "host": { + Description: "The SQL Server database hostname.", + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "port": { + Description: "The SQL Server database port.", + Type: schema.TypeInt, + Optional: true, + Default: 1433, + ForceNew: true, + }, + "user": ValueSecretSchema("user", "The SQL Server database username.", true, true), + "password": IdentifierSchema(IdentifierSchemaParams{ + Elem: "password", + Description: "The SQL Server database password.", + Required: false, + ForceNew: true, + }), + "ssh_tunnel": IdentifierSchema(IdentifierSchemaParams{ + Elem: "ssh_tunnel", + Description: "The SSH tunnel configuration for the SQL Server database.", + Required: false, + ForceNew: true, + }), + "aws_privatelink": IdentifierSchema(IdentifierSchemaParams{ + Elem: "aws_privatelink", + Description: "The AWS PrivateLink configuration for the SQL Server database.", + Required: false, + ForceNew: true, + }), + "validate": ValidateConnectionSchema(), + "ownership_role": OwnershipRoleSchema(), + "region": RegionSchema(), +} + +func ConnectionSQLServer() *schema.Resource { + return &schema.Resource{ + Description: "A SQL Server connection establishes a link to a single database of a SQL Server instance.", + + CreateContext: connectionSQLServerCreate, + ReadContext: connectionRead, + UpdateContext: connectionUpdate, + DeleteContext: connectionDelete, + + Importer: &schema.ResourceImporter{ + StateContext: schema.ImportStatePassthroughContext, + }, + + Schema: connectionSQLServerSchema, + } +} + +func connectionSQLServerCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + connectionName := d.Get("name").(string) + schemaName := d.Get("schema_name").(string) + databaseName := d.Get("database_name").(string) + + metaDb, region, err := utils.GetDBClientFromMeta(meta, d) + if err != nil { + return diag.FromErr(err) + } + o := materialize.MaterializeObject{ObjectType: "CONNECTION", Name: connectionName, SchemaName: schemaName, DatabaseName: databaseName} + b := materialize.NewConnectionSQLServerBuilder(metaDb, o) + + if v, ok := d.GetOk("connection_type"); ok { + b.ConnectionType(v.(string)) + } + + if v, ok := d.GetOk("host"); ok { + b.SQLServerHost(v.(string)) + } + + if v, ok := d.GetOk("port"); ok { + b.SQLServerPort(v.(int)) + } + + if v, ok := d.GetOk("user"); ok { + user := materialize.GetValueSecretStruct(v) + b.SQLServerUser(user) + } + + if v, ok := d.GetOk("password"); ok { + pass := materialize.GetIdentifierSchemaStruct(v) + b.SQLServerPassword(pass) + } + + if v, ok := d.GetOk("database"); ok { + b.SQLServerDatabase(v.(string)) + } + + if v, ok := d.GetOk("aws_privatelink"); ok { + conn := materialize.GetIdentifierSchemaStruct(v) + b.SQLServerAWSPrivateLink(conn) + } + + if v, ok := d.GetOk("ssh_tunnel"); ok { + conn := materialize.GetIdentifierSchemaStruct(v) + b.SQLServerSSHTunnel(conn) + } + + if v, ok := d.GetOk("validate"); ok { + b.Validate(v.(bool)) + } + + // create resource + if err := b.Create(); err != nil { + return diag.FromErr(err) + } + + // ownership + if v, ok := d.GetOk("ownership_role"); ok { + ownership := materialize.NewOwnershipBuilder(metaDb, o) + + if err := ownership.Alter(v.(string)); err != nil { + log.Printf("[DEBUG] resource failed ownership, dropping object: %s", o.Name) + b.Drop() + return diag.FromErr(err) + } + } + + // object comment + if v, ok := d.GetOk("comment"); ok { + comment := materialize.NewCommentBuilder(metaDb, o) + + if err := comment.Object(v.(string)); err != nil { + log.Printf("[DEBUG] resource failed comment, dropping object: %s", o.Name) + b.Drop() + return diag.FromErr(err) + } + } + + // set id + i, err := materialize.ConnectionId(metaDb, o) + if err != nil { + return diag.FromErr(err) + } + d.SetId(utils.TransformIdWithRegion(string(region), i)) + + return connectionRead(ctx, d, meta) +} diff --git a/pkg/resources/resource_connection_sqlserver_test.go b/pkg/resources/resource_connection_sqlserver_test.go new file mode 100644 index 00000000..ff802a3e --- /dev/null +++ b/pkg/resources/resource_connection_sqlserver_test.go @@ -0,0 +1,220 @@ +package resources + +import ( + "context" + "testing" + + "github.com/MaterializeInc/terraform-provider-materialize/pkg/testhelpers" + "github.com/MaterializeInc/terraform-provider-materialize/pkg/utils" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/stretchr/testify/require" +) + +var inSQLServer = map[string]interface{}{ + "name": "conn", + "schema_name": "schema", + "database_name": "database", + "host": "sqlserver_host", + "port": 1433, + "database": "testdb", + "user": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "user"}}}}, + "password": []interface{}{map[string]interface{}{"name": "password"}}, + "ssh_tunnel": []interface{}{ + map[string]interface{}{ + "name": "ssh_conn", + "schema_name": "tunnel_schema", + "database_name": "tunnel_database", + }, + }, + "aws_privatelink": []interface{}{ + map[string]interface{}{ + "name": "aws_conn", + "schema_name": "aws_schema", + "database_name": "aws_database", + }, + }, + "validate": true, + "comment": "SQL Server connection comment", +} + +func TestResourceConnectionSQLServerCreate(t *testing.T) { + r := require.New(t) + d := schema.TestResourceDataRaw(t, ConnectionSQLServer().Schema, inSQLServer) + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Create + mock.ExpectExec( + `CREATE CONNECTION "database"."schema"."conn" TO SQL SERVER \(HOST 'sqlserver_host', PORT 1433, USER SECRET "materialize"."public"."user", PASSWORD SECRET "materialize"."public"."password", SSH TUNNEL "tunnel_database"."tunnel_schema"."ssh_conn", AWS PRIVATELINK "aws_database"."aws_schema"."aws_conn", DATABASE 'testdb'\);`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Comment + mock.ExpectExec(`COMMENT ON CONNECTION "database"."schema"."conn" IS 'SQL Server connection comment';`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Query Id + ip := `WHERE mz_connections.name = 'conn' AND mz_databases.name = 'database' AND mz_schemas.name = 'schema'` + testhelpers.MockConnectionScan(mock, ip) + + // Query Params + pp := `WHERE mz_connections.id = 'u1'` + testhelpers.MockConnectionScan(mock, pp) + + if err := connectionSQLServerCreate(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} + +func TestResourceConnectionSQLServerCreateMinimal(t *testing.T) { + r := require.New(t) + minimalInput := map[string]interface{}{ + "name": "minimal_conn", + "schema_name": "schema", + "database_name": "database", + "host": "sqlserver_host", + "database": "testdb", + "user": []interface{}{map[string]interface{}{"text": "plaintext_user"}}, + } + d := schema.TestResourceDataRaw(t, ConnectionSQLServer().Schema, minimalInput) + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Create with minimal configuration (should use default port 1433) + mock.ExpectExec( + `CREATE CONNECTION "database"."schema"."minimal_conn" TO SQL SERVER \(HOST 'sqlserver_host', PORT 1433, USER 'plaintext_user', DATABASE 'testdb'\);`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Query Id + ip := `WHERE mz_connections.name = 'minimal_conn' AND mz_databases.name = 'database' AND mz_schemas.name = 'schema'` + testhelpers.MockConnectionScan(mock, ip) + + // Query Params + pp := `WHERE mz_connections.id = 'u1'` + testhelpers.MockConnectionScan(mock, pp) + + if err := connectionSQLServerCreate(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} + +func TestResourceConnectionSQLServerCreateWithoutValidation(t *testing.T) { + r := require.New(t) + noValidateInput := map[string]interface{}{ + "name": "no_validate_conn", + "schema_name": "schema", + "database_name": "database", + "host": "sqlserver_host", + "port": 1433, + "database": "testdb", + "user": []interface{}{map[string]interface{}{"text": "user"}}, + "password": []interface{}{map[string]interface{}{"name": "password"}}, + "validate": false, + } + d := schema.TestResourceDataRaw(t, ConnectionSQLServer().Schema, noValidateInput) + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Create without validation + mock.ExpectExec( + `CREATE CONNECTION "database"."schema"."no_validate_conn" TO SQL SERVER \(HOST 'sqlserver_host', PORT 1433, USER 'user', PASSWORD SECRET "materialize"."public"."password", DATABASE 'testdb'\) WITH \(VALIDATE = false\);`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Query Id + ip := `WHERE mz_connections.name = 'no_validate_conn' AND mz_databases.name = 'database' AND mz_schemas.name = 'schema'` + testhelpers.MockConnectionScan(mock, ip) + + // Query Params + pp := `WHERE mz_connections.id = 'u1'` + testhelpers.MockConnectionScan(mock, pp) + + if err := connectionSQLServerCreate(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} + +func TestResourceConnectionSQLServerUpdate(t *testing.T) { + r := require.New(t) + d := schema.TestResourceDataRaw(t, ConnectionSQLServer().Schema, inSQLServer) + + // Set current state + d.SetId("u1") + d.Set("name", "conn") + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Name Change + mock.ExpectExec(`ALTER CONNECTION "database"."schema"."" RENAME TO "conn";`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Host + mock.ExpectExec(`ALTER CONNECTION "database"."schema"."conn" SET \(HOST = 'sqlserver_host'\);`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Port + mock.ExpectExec(`ALTER CONNECTION "database"."schema"."conn" SET \(PORT = 1433\);`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // User + mock.ExpectExec(`ALTER CONNECTION "database"."schema"."conn" SET \(USER = SECRET "materialize"."public"."user"\);`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Password + mock.ExpectExec(`ALTER CONNECTION "database"."schema"."conn" SET \(PASSWORD = SECRET "materialize"."public"."password"\);`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Database + mock.ExpectExec(`ALTER CONNECTION "database"."schema"."conn" SET \(DATABASE = 'testdb'\);`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // SSH Tunnel + mock.ExpectExec(`ALTER CONNECTION "database"."schema"."conn" SET \(SSH TUNNEL = "tunnel_database"."tunnel_schema"."ssh_conn"\);`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // AWS PrivateLink + mock.ExpectExec(`ALTER CONNECTION "database"."schema"."conn" SET \(AWS PRIVATELINK = "aws_database"."aws_schema"."aws_conn"\);`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Comment + mock.ExpectExec(`COMMENT ON CONNECTION "database"."schema"."conn" IS 'SQL Server connection comment';`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Query Params + p := `WHERE mz_connections.id = 'u1'` + testhelpers.MockConnectionScan(mock, p) + + // Execute the update function + if err := connectionUpdate(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} + +func TestResourceConnectionSQLServerRead(t *testing.T) { + r := require.New(t) + d := schema.TestResourceDataRaw(t, ConnectionSQLServer().Schema, inSQLServer) + d.SetId("aws/us-east-1:u1") + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Query Params + p := `WHERE mz_connections.id = 'u1'` + testhelpers.MockConnectionScan(mock, p) + + if err := connectionRead(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + + r.Equal("connection", d.Get("name")) + r.Equal("schema", d.Get("schema_name")) + r.Equal("database", d.Get("database_name")) + }) +} + +func TestResourceConnectionSQLServerDelete(t *testing.T) { + r := require.New(t) + d := schema.TestResourceDataRaw(t, ConnectionSQLServer().Schema, inSQLServer) + d.SetId("u1") + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + mock.ExpectExec(`DROP CONNECTION "database"."schema"."conn";`).WillReturnResult(sqlmock.NewResult(1, 1)) + + if err := connectionDelete(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} diff --git a/pkg/resources/resource_source_sqlserver.go b/pkg/resources/resource_source_sqlserver.go new file mode 100644 index 00000000..76c92bc0 --- /dev/null +++ b/pkg/resources/resource_source_sqlserver.go @@ -0,0 +1,357 @@ +package resources + +import ( + "context" + "database/sql" + "log" + + "github.com/MaterializeInc/terraform-provider-materialize/pkg/materialize" + "github.com/MaterializeInc/terraform-provider-materialize/pkg/utils" + + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +var sourceSQLServerSchema = map[string]*schema.Schema{ + "name": ObjectNameSchema("source", true, false), + "schema_name": SchemaNameSchema("source", false), + "database_name": DatabaseNameSchema("source", false), + "qualified_sql_name": QualifiedNameSchema("source"), + "comment": CommentSchema(false), + "cluster_name": ObjectClusterNameSchema("source"), + "size": ObjectSizeSchema("source"), + "sqlserver_connection": IdentifierSchema(IdentifierSchemaParams{ + Elem: "sqlserver_connection", + Description: "The SQL Server connection to use in the source.", + Required: true, + ForceNew: true, + }), + "exclude_columns": { + Description: "Exclude specific columns when reading data from SQL Server. Can only be updated in place when also updating a corresponding `table` attribute.", + Type: schema.TypeList, + Elem: &schema.Schema{Type: schema.TypeString}, + Optional: true, + }, + "text_columns": { + Description: "Decode data as text for specific columns that contain SQL Server types that are unsupported in Materialize. Can only be updated in place when also updating a corresponding `table` attribute.", + Type: schema.TypeList, + Elem: &schema.Schema{Type: schema.TypeString}, + Optional: true, + }, + "table": { + Description: "Specify the tables to be included in the source. If not specified, all tables are included.", + Type: schema.TypeSet, + Optional: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "upstream_name": { + Description: "The name of the table in the upstream SQL Server database.", + Type: schema.TypeString, + Required: true, + }, + "upstream_schema_name": { + Description: "The schema of the table in the upstream SQL Server database.", + Type: schema.TypeString, + Optional: true, + Computed: true, + }, + "name": { + Description: "The name for the table, used in Materialize.", + Type: schema.TypeString, + Optional: true, + Computed: true, + }, + "schema_name": { + Description: "The schema of the table in Materialize.", + Type: schema.TypeString, + Optional: true, + Computed: true, + }, + "database_name": { + Description: "The database of the table in Materialize.", + Type: schema.TypeString, + Optional: true, + Computed: true, + }, + }, + }, + }, + "expose_progress": IdentifierSchema(IdentifierSchemaParams{ + Elem: "expose_progress", + Description: "The name of the progress collection for the source. If this is not specified, the collection will be named `_progress`.", + Required: false, + ForceNew: true, + }), + "ownership_role": OwnershipRoleSchema(), + "region": RegionSchema(), +} + +func SourceSQLServer() *schema.Resource { + return &schema.Resource{ + Description: "A SQL Server source describes a SQL Server database instance you want Materialize to read data from using Change Data Capture (CDC).", + + CreateContext: sourceSQLServerCreate, + ReadContext: sourceSQLServerRead, + UpdateContext: sourceSQLServerUpdate, + DeleteContext: sourceSQLServerDelete, + + Importer: &schema.ResourceImporter{ + StateContext: schema.ImportStatePassthroughContext, + }, + + Schema: sourceSQLServerSchema, + } +} + +func sourceSQLServerCreate(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { + sourceName := d.Get("name").(string) + schemaName := d.Get("schema_name").(string) + databaseName := d.Get("database_name").(string) + + metaDb, region, err := utils.GetDBClientFromMeta(meta, d) + if err != nil { + return diag.FromErr(err) + } + + o := materialize.MaterializeObject{ObjectType: "SOURCE", Name: sourceName, SchemaName: schemaName, DatabaseName: databaseName} + b := materialize.NewSourceSQLServerBuilder(metaDb, o) + + if v, ok := d.GetOk("cluster_name"); ok { + b.ClusterName(v.(string)) + } + + if v, ok := d.GetOk("sqlserver_connection"); ok { + conn := materialize.GetIdentifierSchemaStruct(v) + b.SQLServerConnection(conn) + } + + if v, ok := d.GetOk("table"); ok { + tables := v.(*schema.Set).List() + t := materialize.GetTableStruct(tables) + b.Table(t) + } + + if v, ok := d.GetOk("exclude_columns"); ok && len(v.([]interface{})) > 0 { + columns, err := materialize.GetSliceValueString("exclude_columns", v.([]interface{})) + if err != nil { + return diag.FromErr(err) + } + b.ExcludeColumns(columns) + } + + if v, ok := d.GetOk("text_columns"); ok && len(v.([]interface{})) > 0 { + columns, err := materialize.GetSliceValueString("text_columns", v.([]interface{})) + if err != nil { + return diag.FromErr(err) + } + b.TextColumns(columns) + } + + if v, ok := d.GetOk("expose_progress"); ok { + e := materialize.GetIdentifierSchemaStruct(v) + b.ExposeProgress(e) + } + + if err := b.Create(); err != nil { + return diag.FromErr(err) + } + + // Handle ownership + if v, ok := d.GetOk("ownership_role"); ok { + ownership := materialize.NewOwnershipBuilder(metaDb, o) + + if err := ownership.Alter(v.(string)); err != nil { + log.Printf("[DEBUG] resource failed ownership, dropping object: %s", o.Name) + b.Drop() + return diag.FromErr(err) + } + } + + // Handle comments + if v, ok := d.GetOk("comment"); ok { + comment := materialize.NewCommentBuilder(metaDb, o) + + if err := comment.Object(v.(string)); err != nil { + log.Printf("[DEBUG] resource failed comment, dropping object: %s", o.Name) + b.Drop() + return diag.FromErr(err) + } + } + + i, err := materialize.SourceId(metaDb, o) + if err != nil { + return diag.FromErr(err) + } + d.SetId(utils.TransformIdWithRegion(string(region), i)) + + return sourceSQLServerRead(ctx, d, meta) +} + +func sourceSQLServerRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + i := d.Id() + + metaDb, region, err := utils.GetDBClientFromMeta(meta, d) + if err != nil { + return diag.FromErr(err) + } + s, err := materialize.ScanSource(metaDb, utils.ExtractId(i)) + if err == sql.ErrNoRows { + d.SetId("") + return nil + } else if err != nil { + return diag.FromErr(err) + } + + d.SetId(utils.TransformIdWithRegion(string(region), i)) + + if err := d.Set("name", s.SourceName.String); err != nil { + return diag.FromErr(err) + } + + if err := d.Set("schema_name", s.SchemaName.String); err != nil { + return diag.FromErr(err) + } + + if err := d.Set("database_name", s.DatabaseName.String); err != nil { + return diag.FromErr(err) + } + + if err := d.Set("size", s.Size.String); err != nil { + return diag.FromErr(err) + } + + if err := d.Set("cluster_name", s.ClusterName.String); err != nil { + return diag.FromErr(err) + } + + if err := d.Set("ownership_role", s.OwnerName.String); err != nil { + return diag.FromErr(err) + } + + b := materialize.Source{SourceName: s.SourceName.String, SchemaName: s.SchemaName.String, DatabaseName: s.DatabaseName.String} + if err := d.Set("qualified_sql_name", b.QualifiedName()); err != nil { + return diag.FromErr(err) + } + + if err := d.Set("comment", s.Comment.String); err != nil { + return diag.FromErr(err) + } + + if s.ConnectionName.Valid && s.ConnectionSchemaName.Valid && s.ConnectionDatabaseName.Valid { + sqlserverConnection := []interface{}{ + map[string]interface{}{ + "name": s.ConnectionName.String, + "schema_name": s.ConnectionSchemaName.String, + "database_name": s.ConnectionDatabaseName.String, + }, + } + if err := d.Set("sqlserver_connection", sqlserverConnection); err != nil { + return diag.FromErr(err) + } + } + + // TODO: Uncomment when mz_sqlserver_source_tables is implemented + // deps, err := materialize.ListSQLServerSubsources(metaDb, utils.ExtractId(i), "subsource") + // if err != nil { + // return diag.FromErr(err) + // } + + // Tables + // tMaps := []interface{}{} + // for _, dep := range deps { + // tMap := map[string]interface{}{} + // tMap["upstream_name"] = dep.UpstreamTableName.String + // tMap["upstream_schema_name"] = dep.UpstreamTableSchemaName.String + // tMap["name"] = dep.ObjectName.String + // tMap["schema_name"] = dep.ObjectSchemaName.String + // tMap["database_name"] = dep.ObjectDatabaseName.String + // tMaps = append(tMaps, tMap) + // } + // if err := d.Set("table", tMaps); err != nil { + // return diag.FromErr(err) + // } + + return nil +} + +func sourceSQLServerUpdate(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { + sourceName := d.Get("name").(string) + schemaName := d.Get("schema_name").(string) + databaseName := d.Get("database_name").(string) + + metaDb, _, err := utils.GetDBClientFromMeta(meta, d) + if err != nil { + return diag.FromErr(err) + } + o := materialize.MaterializeObject{ObjectType: "SOURCE", Name: sourceName, SchemaName: schemaName, DatabaseName: databaseName} + b := materialize.NewSource(metaDb, o) + + if d.HasChange("name") { + oldName, newName := d.GetChange("name") + o := materialize.MaterializeObject{ObjectType: "SOURCE", Name: oldName.(string), SchemaName: schemaName, DatabaseName: databaseName} + b := materialize.NewSource(metaDb, o) + if err := b.Rename(newName.(string)); err != nil { + return diag.FromErr(err) + } + } + + if d.HasChange("ownership_role") { + _, newRole := d.GetChange("ownership_role") + b := materialize.NewOwnershipBuilder(metaDb, o) + + if err := b.Alter(newRole.(string)); err != nil { + return diag.FromErr(err) + } + } + + if d.HasChange("table") { + ot, nt := d.GetChange("table") + addTables := materialize.DiffTableStructs(nt.(*schema.Set).List(), ot.(*schema.Set).List()) + dropTables := materialize.DiffTableStructs(ot.(*schema.Set).List(), nt.(*schema.Set).List()) + if len(dropTables) > 0 { + if err := b.DropSubsource(dropTables); err != nil { + return diag.FromErr(err) + } + } + if len(addTables) > 0 { + var colDiff []string + if d.HasChange("text_columns") { + oc, nc := d.GetChange("text_columns") + colDiff = diffTextColumns(nc.([]interface{}), oc.([]interface{})) + } + + if err := b.AddSubsource(addTables, colDiff); err != nil { + return diag.FromErr(err) + } + } + } + + if d.HasChange("comment") { + _, newComment := d.GetChange("comment") + b := materialize.NewCommentBuilder(metaDb, o) + + if err := b.Object(newComment.(string)); err != nil { + return diag.FromErr(err) + } + } + + return sourceSQLServerRead(ctx, d, meta) +} + +func sourceSQLServerDelete(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { + sourceName := d.Get("name").(string) + schemaName := d.Get("schema_name").(string) + databaseName := d.Get("database_name").(string) + + metaDb, _, err := utils.GetDBClientFromMeta(meta, d) + if err != nil { + return diag.FromErr(err) + } + o := materialize.MaterializeObject{Name: sourceName, SchemaName: schemaName, DatabaseName: databaseName} + b := materialize.NewSource(metaDb, o) + + if err := b.DropCascade(); err != nil { + return diag.FromErr(err) + } + return nil +} diff --git a/pkg/resources/resource_source_sqlserver_test.go b/pkg/resources/resource_source_sqlserver_test.go new file mode 100644 index 00000000..7cc71608 --- /dev/null +++ b/pkg/resources/resource_source_sqlserver_test.go @@ -0,0 +1,281 @@ +package resources + +import ( + "context" + "testing" + + "github.com/MaterializeInc/terraform-provider-materialize/pkg/testhelpers" + "github.com/MaterializeInc/terraform-provider-materialize/pkg/utils" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/stretchr/testify/require" +) + +var inSourceSQLServer = map[string]interface{}{ + "name": "source", + "schema_name": "schema", + "database_name": "database", + "cluster_name": "test_cluster", + "sqlserver_connection": []interface{}{ + map[string]interface{}{ + "name": "sqlserver_connection", + "schema_name": "schema", + "database_name": "database", + }, + }, + "text_columns": []interface{}{"dbo.table1.xml_column", "custom.table2.ntext_column"}, + "exclude_columns": []interface{}{"dbo.table1.geometry_column", "custom.table2.geography_column"}, + "table": []interface{}{ + map[string]interface{}{ + "upstream_name": "table1", + "upstream_schema_name": "dbo", + "name": "renamed_table1", + }, + map[string]interface{}{ + "upstream_name": "table2", + "upstream_schema_name": "custom", + }, + }, + "expose_progress": []interface{}{ + map[string]interface{}{ + "name": "progress", + "schema_name": "schema", + "database_name": "database", + }, + }, + "comment": "SQL Server source comment", +} + +func TestResourceSourceSQLServerCreate(t *testing.T) { + r := require.New(t) + d := schema.TestResourceDataRaw(t, SourceSQLServer().Schema, inSourceSQLServer) + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Create + mock.ExpectExec( + `CREATE SOURCE "database"."schema"."source" IN CLUSTER "test_cluster" FROM SQL SERVER CONNECTION "database"."schema"."sqlserver_connection" \(TEXT COLUMNS \(dbo.table1.xml_column, custom.table2.ntext_column\), EXCLUDE COLUMNS \(dbo.table1.geometry_column, custom.table2.geography_column\)\) FOR TABLES \("dbo"."table1" AS "database"."schema"."renamed_table1", "custom"."table2" AS "database"."schema"."table2"\) EXPOSE PROGRESS AS "database"."schema"."progress";`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Comment + mock.ExpectExec(`COMMENT ON SOURCE "database"."schema"."source" IS 'SQL Server source comment';`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Query Id + ip := `WHERE mz_databases.name = 'database' AND mz_schemas.name = 'schema' AND mz_sources.name = 'source'` + testhelpers.MockSourceScan(mock, ip) + + // Query Params + pp := `WHERE mz_sources.id = 'u1'` + testhelpers.MockSourceScan(mock, pp) + + // Query Subsources - SQL Server + ps := `WHERE mz_object_dependencies.referenced_object_id = 'u1' AND mz_sources.type = 'subsource'` + testhelpers.MockSQLServerSubsourceScan(mock, ps) + + if err := sourceSQLServerCreate(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} + +func TestResourceSourceSQLServerCreateAllTablesMinimal(t *testing.T) { + minimalInput := map[string]interface{}{ + "name": "minimal_source", + "schema_name": "schema", + "database_name": "database", + "sqlserver_connection": []interface{}{ + map[string]interface{}{ + "name": "sqlserver_connection", + "schema_name": "schema", + "database_name": "database", + }, + }, + } + + r := require.New(t) + d := schema.TestResourceDataRaw(t, SourceSQLServer().Schema, minimalInput) + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Create with minimal configuration (FOR ALL TABLES) + mock.ExpectExec( + `CREATE SOURCE "database"."schema"."minimal_source" FROM SQL SERVER CONNECTION "database"."schema"."sqlserver_connection" FOR ALL TABLES;`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Query Id + ip := `WHERE mz_databases.name = 'database' AND mz_schemas.name = 'schema' AND mz_sources.name = 'minimal_source'` + testhelpers.MockSourceScan(mock, ip) + + // Query Params + pp := `WHERE mz_sources.id = 'u1'` + testhelpers.MockSourceScan(mock, pp) + + // Query Subsources - SQL Server + ps := `WHERE mz_object_dependencies.referenced_object_id = 'u1' AND mz_sources.type = 'subsource'` + testhelpers.MockSQLServerSubsourceScan(mock, ps) + + if err := sourceSQLServerCreate(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} + +func TestResourceSourceSQLServerCreateWithTextColumnsOnly(t *testing.T) { + textColumnsInput := map[string]interface{}{ + "name": "text_cols_source", + "schema_name": "schema", + "database_name": "database", + "sqlserver_connection": []interface{}{ + map[string]interface{}{ + "name": "sqlserver_connection", + "schema_name": "schema", + "database_name": "database", + }, + }, + "text_columns": []interface{}{"xml_data", "ntext_data"}, + } + + r := require.New(t) + d := schema.TestResourceDataRaw(t, SourceSQLServer().Schema, textColumnsInput) + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Create with TEXT COLUMNS only + mock.ExpectExec( + `CREATE SOURCE "database"."schema"."text_cols_source" FROM SQL SERVER CONNECTION "database"."schema"."sqlserver_connection" \(TEXT COLUMNS \(xml_data, ntext_data\)\) FOR ALL TABLES;`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Query Id + ip := `WHERE mz_databases.name = 'database' AND mz_schemas.name = 'schema' AND mz_sources.name = 'text_cols_source'` + testhelpers.MockSourceScan(mock, ip) + + // Query Params + pp := `WHERE mz_sources.id = 'u1'` + testhelpers.MockSourceScan(mock, pp) + + // Query Subsources - SQL Server + ps := `WHERE mz_object_dependencies.referenced_object_id = 'u1' AND mz_sources.type = 'subsource'` + testhelpers.MockSQLServerSubsourceScan(mock, ps) + + if err := sourceSQLServerCreate(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} + +func TestResourceSourceSQLServerCreateWithExcludeColumnsOnly(t *testing.T) { + excludeColumnsInput := map[string]interface{}{ + "name": "exclude_cols_source", + "schema_name": "schema", + "database_name": "database", + "sqlserver_connection": []interface{}{ + map[string]interface{}{ + "name": "sqlserver_connection", + "schema_name": "schema", + "database_name": "database", + }, + }, + "exclude_columns": []interface{}{"geometry_data", "geography_data"}, + } + + r := require.New(t) + d := schema.TestResourceDataRaw(t, SourceSQLServer().Schema, excludeColumnsInput) + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Create with EXCLUDE COLUMNS only + mock.ExpectExec( + `CREATE SOURCE "database"."schema"."exclude_cols_source" FROM SQL SERVER CONNECTION "database"."schema"."sqlserver_connection" \(EXCLUDE COLUMNS \(geometry_data, geography_data\)\) FOR ALL TABLES;`, + ).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Query Id + ip := `WHERE mz_databases.name = 'database' AND mz_schemas.name = 'schema' AND mz_sources.name = 'exclude_cols_source'` + testhelpers.MockSourceScan(mock, ip) + + // Query Params + pp := `WHERE mz_sources.id = 'u1'` + testhelpers.MockSourceScan(mock, pp) + + // Query Subsources - SQL Server + ps := `WHERE mz_object_dependencies.referenced_object_id = 'u1' AND mz_sources.type = 'subsource'` + testhelpers.MockSQLServerSubsourceScan(mock, ps) + + if err := sourceSQLServerCreate(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} + +func TestResourceSourceSQLServerRead(t *testing.T) { + r := require.New(t) + d := schema.TestResourceDataRaw(t, SourceSQLServer().Schema, inSourceSQLServer) + d.SetId("aws/us-east-1:u1") + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Query Params + p := `WHERE mz_sources.id = 'u1'` + testhelpers.MockSourceScan(mock, p) + + // Query Subsources - SQL Server + ps := `WHERE mz_object_dependencies.referenced_object_id = 'u1' AND mz_sources.type = 'subsource'` + testhelpers.MockSQLServerSubsourceScan(mock, ps) + + if err := sourceSQLServerRead(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + + r.Equal("source", d.Get("name")) + r.Equal("schema", d.Get("schema_name")) + r.Equal("database", d.Get("database_name")) + }) +} + +func TestResourceSourceSQLServerUpdate(t *testing.T) { + r := require.New(t) + d := schema.TestResourceDataRaw(t, SourceSQLServer().Schema, inSourceSQLServer) + + // Set current state + d.SetId("u1") + d.Set("name", "source") + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + // Name change - unit tests always see empty string as old name + mock.ExpectExec(`ALTER SOURCE "database"."schema"."" RENAME TO "source";`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Add subsources (tables) - detected as changes in unit test + mock.ExpectExec(`ALTER SOURCE "database"."schema"."source" ADD SUBSOURCE "dbo"."table1" AS "database"."schema"."renamed_table1", "custom"."table2" WITH \(TEXT COLUMNS \[dbo.table1.xml_column, custom.table2.ntext_column\]\);`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Comment change + mock.ExpectExec(`COMMENT ON SOURCE "database"."schema"."source" IS 'SQL Server source comment';`).WillReturnResult(sqlmock.NewResult(1, 1)) + + // Query Params + p := `WHERE mz_sources.id = 'u1'` + testhelpers.MockSourceScan(mock, p) + + // Query Subsources - SQL Server + ps := `WHERE mz_object_dependencies.referenced_object_id = 'u1' AND mz_sources.type = 'subsource'` + testhelpers.MockSQLServerSubsourceScan(mock, ps) + + // Execute the update function + if err := sourceSQLServerUpdate(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} + +func TestResourceSourceSQLServerDelete(t *testing.T) { + r := require.New(t) + d := schema.TestResourceDataRaw(t, SourceSQLServer().Schema, inSourceSQLServer) + d.SetId("u1") + r.NotNil(d) + + testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) { + mock.ExpectExec(`DROP SOURCE "database"."schema"."source";`).WillReturnResult(sqlmock.NewResult(1, 1)) + + if err := sourceDelete(context.TODO(), d, db); err != nil { + t.Fatal(err) + } + }) +} diff --git a/pkg/testhelpers/mock_scans.go b/pkg/testhelpers/mock_scans.go index 059f398a..a44a7322 100644 --- a/pkg/testhelpers/mock_scans.go +++ b/pkg/testhelpers/mock_scans.go @@ -682,6 +682,40 @@ func MockMysqlSubsourceScan(mock sqlmock.Sqlmock, predicate string) { mock.ExpectQuery(q).WillReturnRows(ir) } +// MockSQLServerSubsourceScan mocks the scan of a SQL Server source +func MockSQLServerSubsourceScan(mock sqlmock.Sqlmock, predicate string) { + b := ` + SELECT DISTINCT + mz_sources.id AS object_id, + subsources.id AS referenced_object_id, + mz_sources.name AS object_name, + mz_schemas.name AS schema_name, + mz_databases.name AS database_name, + mz_sources.type + -- TODO: mz_sqlserver_source_tables.table_name and mz_sqlserver_source_tables.schema_name are not implemented yet + -- mz_sqlserver_source_tables.table_name AS upstream_table_name, + -- mz_sqlserver_source_tables.schema_name AS upstream_table_schema + FROM mz_sources AS subsources + JOIN mz_internal.mz_object_dependencies + ON subsources.id = mz_object_dependencies.referenced_object_id + JOIN mz_sources + ON mz_sources.id = mz_object_dependencies.object_id + JOIN mz_schemas + ON mz_sources.schema_id = mz_schemas.id + JOIN mz_databases + ON mz_schemas.database_id = mz_databases.id + -- TODO: Uncomment when mz_sqlserver_source_tables is implemented + -- LEFT JOIN mz_internal.mz_sqlserver_source_tables + -- ON mz_sources.id = mz_sqlserver_source_tables.id` + + q := mockQueryBuilder(b, predicate, "") + // TODO: Add back upstream_table_name and upstream_table_schema columns when mz_sqlserver_source_tables is implemented + ir := mock.NewRows([]string{"object_id", "referenced_object_id", "object_name", "schema_name", "database_name", "type"}). + AddRow("u1", "u2", "table1", "schema", "database", "subsource"). + AddRow("u1", "u2", "table2", "schema", "database", "subsource") + mock.ExpectQuery(q).WillReturnRows(ir) +} + func MockTableColumnScan(mock sqlmock.Sqlmock, predicate string) { b := ` SELECT