Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions ci/test/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -484,15 +484,18 @@ steps:
steps:
- id: sql-server-cdc
label: "SQL Server CDC tests"
parallelism: 2
depends_on: build-aarch64
parallelism: 1
depends_on: build-x86_64
timeout_in_minutes: 30
inputs: [test/sql-server-cdc]
plugins:
- ./ci/plugins/mzcompose:
composition: sql-server-cdc
agents:
queue: hetzner-aarch64-4cpu-8gb
# The SQL Server Docker image isn't available on ARM.
#
# See: <https://github.com/microsoft/mssql-docker/issues/864>
queue: hetzner-x86-64-4cpu-8gb

- group: "Connection tests"
key: connection-tests
Expand Down
109 changes: 109 additions & 0 deletions misc/python/materialize/checks/all_checks/sql_server_cdc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from random import Random
from textwrap import dedent
from typing import Any

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion
from materialize.mzcompose.services.sql_server import SqlServer


class SqlServerCdcBase:
base_version: MzVersion
current_version: MzVersion
wait: bool
suffix: str
repeats: int
expects: int

def __init__(self, wait: bool, **kwargs: Any) -> None:
self.wait = wait
self.repeats = 1024 if wait else 16384
self.expects = 97350 if wait else 1633350
self.suffix = f"_{str(wait).lower()}"
super().__init__(**kwargs) # forward unused args to Check

def _can_run(self, e: Executor) -> bool:
# TODO(sql_server1): The SQL Server Docker image only runs on x86, figure out how we can
# reasonably support running this check without switching all of Platform Checks.
return False

def initialize(self) -> Testdrive:
return Testdrive(
dedent(
f"""
$ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
ALTER SYSTEM SET enable_sql_server_source = true;

$ sql-server-connect name=sql-server
server=tcp:sql-server,1433;IntegratedSecurity=true;TrustServerCertificate=true;User ID={SqlServer.DEFAULT_USER};Password={SqlServer.DEFAULT_SA_PASSWORD}

$ sql-server-execute name=sql-server
DROP DATABASE IF EXISTS test;
CREATE DATABASE test;
USE test;

> CREATE SECRET sql_server_password_{self.suffix} AS '{SqlServer.DEFAULT_SA_PASSWORD}';

> CREATE CONNECTION sql_server_connection_{self.suffix} TO SQL SERVER (
HOST 'sql-server',
DATABASE test,
USER {SqlServer.DEFAULT_USER},
PASSWORD SECRET sql_server_password_{self.suffix}
)
"""
)
)

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(dedent(s))
for s in [
f"""
$ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
ALTER SYSTEM SET enable_sql_server_source = true;

> VALIDATE CONNECTION sql_server_password_{self.suffix};
""",
f"""
$ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
ALTER SYSTEM SET enable_sql_server_source = true;

> DROP CONNECTION sql_server_password_{self.suffix};
> CREATE CONNECTION sql_server_connection2_{self.suffix} TO SQL SERVER (
HOST 'sql-server',
DATABASE test,
USER {SqlServer.DEFAULT_USER},
PASSWORD SECRET sql_server_password_{self.suffix}
);
""",
]
]

def validate(self) -> Testdrive:
sql = dedent(
f"""
$ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
ALTER SYSTEM SET enable_sql_server_source = true;

> VALIDATE CONNECTION sql_server_connection2_{self.suffix};
"""
)

return Testdrive(sql)


@externally_idempotent(False)
class SqlServerCdc(SqlServerCdcBase, Check):
def __init__(self, base_version: MzVersion, rng: Random | None) -> None:
super().__init__(wait=True, base_version=base_version, rng=rng)
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/services/sql_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@


class SqlServer(Service):
DEFAULT_USER = "SA"
DEFAULT_SA_PASSWORD = "RPSsql12345"

def __init__(
Expand Down
6 changes: 5 additions & 1 deletion src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,8 @@ impl CatalogState {
diff,
)
}
// TODO(sql_server1): Catalog item updates.
"sql-server" => vec![],
// Load generator sources don't have any special
// updates.
"load-generator" => vec![],
Expand Down Expand Up @@ -1086,6 +1088,7 @@ impl CatalogState {
ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
ConnectionDetails::Ssh { .. } => "ssh-tunnel",
ConnectionDetails::MySql { .. } => "mysql",
ConnectionDetails::SqlServer(_) => "sql-server",
}),
Datum::String(&owner_id.to_string()),
privileges,
Expand Down Expand Up @@ -1128,7 +1131,8 @@ impl CatalogState {
}
ConnectionDetails::Csr(_)
| ConnectionDetails::Postgres(_)
| ConnectionDetails::MySql(_) => (),
| ConnectionDetails::MySql(_)
| ConnectionDetails::SqlServer(_) => (),
};
updates
}
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2637,6 +2637,7 @@ impl ConnectionResolver for CatalogState {
Aws(conn) => Aws(conn),
AwsPrivatelink(conn) => AwsPrivatelink(conn),
MySql(conn) => MySql(conn.into_inline_connection(self)),
SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use mz_sql::session::vars::{
MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
MAX_SOURCES, MAX_TABLES,
MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES,
};
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::inline::IntoInlineConnection;
Expand Down Expand Up @@ -1406,6 +1406,7 @@ impl Coordinator {
let mut new_kafka_connections = 0;
let mut new_postgres_connections = 0;
let mut new_mysql_connections = 0;
let mut new_sql_server_connections = 0;
let mut new_aws_privatelink_connections = 0;
let mut new_tables = 0;
let mut new_sources = 0;
Expand Down Expand Up @@ -1473,6 +1474,7 @@ impl Coordinator {
ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
ConnectionDetails::MySql(_) => new_mysql_connections += 1,
ConnectionDetails::SqlServer(_) => new_sql_server_connections += 1,
ConnectionDetails::AwsPrivatelink(_) => {
new_aws_privatelink_connections += 1
}
Expand Down Expand Up @@ -1643,6 +1645,7 @@ impl Coordinator {
let mut current_aws_privatelink_connections = 0;
let mut current_postgres_connections = 0;
let mut current_mysql_connections = 0;
let mut current_sql_server_connections = 0;
let mut current_kafka_connections = 0;
for c in self.catalog().user_connections() {
let connection = c
Expand All @@ -1653,6 +1656,7 @@ impl Coordinator {
ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
ConnectionDetails::MySql(_) => current_mysql_connections += 1,
ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
ConnectionDetails::Csr(_)
| ConnectionDetails::Ssh { .. }
Expand Down Expand Up @@ -1680,6 +1684,13 @@ impl Coordinator {
"MySQL Connection",
MAX_MYSQL_CONNECTIONS.name(),
)?;
self.validate_resource_limit(
current_sql_server_connections,
new_sql_server_connections,
SystemVars::max_sql_server_connections,
"SQL Server Connection",
MAX_SQL_SERVER_CONNECTIONS.name(),
)?;
self.validate_resource_limit(
current_aws_privatelink_connections,
new_aws_privatelink_connections,
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ impl Coordinator {
match ingestion.desc.connection {
GenericSourceConnection::Postgres(_)
| GenericSourceConnection::MySql(_)
| GenericSourceConnection::SqlServer(_)
| GenericSourceConnection::Kafka(_)
| GenericSourceConnection::LoadGenerator(_) => {
if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
Expand Down
4 changes: 3 additions & 1 deletion src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,9 @@ impl Source {
// These multi-output sources do not use their primary
// source's data shard, so we don't include it in accounting
// for users.
GenericSourceConnection::Postgres(_) | GenericSourceConnection::MySql(_) => 0,
GenericSourceConnection::Postgres(_)
| GenericSourceConnection::MySql(_)
| GenericSourceConnection::SqlServer(_) => 0,
GenericSourceConnection::LoadGenerator(lg) => {
// TODO: make this a method on the load generator.
if lg.load_generator.views().is_empty() {
Expand Down
5 changes: 5 additions & 0 deletions src/sql-server-util/src/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ pub enum CdcError {
pub struct Lsn([u8; 10]);

impl Lsn {
/// Interpret the provided bytes as an [`Lsn`].
pub fn interpret(bytes: [u8; 10]) -> Self {
Lsn(bytes)
}

/// Return the underlying byte slice for this [`Lsn`].
pub fn as_bytes(&self) -> &[u8] {
self.0.as_slice()
Expand Down
3 changes: 3 additions & 0 deletions src/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ rust_library(
"//src/sql-lexer:mz_sql_lexer",
"//src/sql-parser:mz_sql_parser",
"//src/sql-pretty:mz_sql_pretty",
"//src/sql-server-util:mz_sql_server_util",
"//src/ssh-util:mz_ssh_util",
"//src/storage-types:mz_storage_types",
"//src/tracing:mz_tracing",
Expand Down Expand Up @@ -123,6 +124,7 @@ rust_test(
"//src/sql-lexer:mz_sql_lexer",
"//src/sql-parser:mz_sql_parser",
"//src/sql-pretty:mz_sql_pretty",
"//src/sql-server-util:mz_sql_server_util",
"//src/ssh-util:mz_ssh_util",
"//src/storage-types:mz_storage_types",
"//src/tracing:mz_tracing",
Expand Down Expand Up @@ -166,6 +168,7 @@ rust_doc_test(
"//src/sql-lexer:mz_sql_lexer",
"//src/sql-parser:mz_sql_parser",
"//src/sql-pretty:mz_sql_pretty",
"//src/sql-server-util:mz_sql_server_util",
"//src/ssh-util:mz_ssh_util",
"//src/storage-types:mz_storage_types",
"//src/tracing:mz_tracing",
Expand Down
1 change: 1 addition & 0 deletions src/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ mz-secrets = { path = "../secrets" }
mz-sql-parser = { path = "../sql-parser" }
mz-sql-pretty = { path = "../sql-pretty" }
mz-sql-lexer = { path = "../sql-lexer" }
mz-sql-server-util = { path = "../sql-server-util" }
mz-ssh-util = { path = "../ssh-util" }
mz-storage-types = { path = "../storage-types" }
mz-tracing = { path = "../tracing" }
Expand Down
6 changes: 5 additions & 1 deletion src/sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use mz_storage_types::connections::aws::AwsConnection;
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::{
AwsPrivatelinkConnection, CsrConnection, KafkaConnection, MySqlConnection, PostgresConnection,
SshConnection,
SqlServerConnectionDetails, SshConnection,
};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
Expand Down Expand Up @@ -1613,6 +1613,7 @@ pub enum ConnectionDetails {
Aws(AwsConnection),
AwsPrivatelink(AwsPrivatelinkConnection),
MySql(MySqlConnection<ReferencedConnection>),
SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
}

impl ConnectionDetails {
Expand All @@ -1635,6 +1636,9 @@ impl ConnectionDetails {
ConnectionDetails::MySql(c) => {
mz_storage_types::connections::Connection::MySql(c.clone())
}
ConnectionDetails::SqlServer(c) => {
mz_storage_types::connections::Connection::SqlServer(c.clone())
}
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/sql/src/plan/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use mz_repr::{strconv, CatalogItemId, ColumnName};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{IdentError, UnresolvedItemName};
use mz_sql_parser::parser::{ParserError, ParserStatementError};
use mz_sql_server_util::SqlServerError;
use mz_storage_types::sources::ExternalReferenceResolutionError;

use crate::catalog::{
Expand Down Expand Up @@ -163,6 +164,9 @@ pub enum PlanError {
MySqlConnectionErr {
cause: Arc<MySqlError>,
},
SqlServerConnectionErr {
cause: Arc<SqlServerError>,
},
SubsourceNameConflict {
name: UnresolvedItemName,
upstream_references: Vec<UnresolvedItemName>,
Expand Down Expand Up @@ -619,6 +623,9 @@ impl fmt::Display for PlanError {
Self::MySqlConnectionErr { cause } => {
write!(f, "failed to connect to MySQL database: {}", cause)
}
Self::SqlServerConnectionErr { cause } => {
write!(f, "failed to connect to SQL Server database: {}", cause)
}
Self::SubsourceNameConflict {
name , upstream_references: _,
} => {
Expand Down Expand Up @@ -885,6 +892,12 @@ impl From<MySqlError> for PlanError {
}
}

impl From<SqlServerError> for PlanError {
fn from(e: SqlServerError) -> PlanError {
PlanError::SqlServerConnectionErr { cause: Arc::new(e) }
}
}

impl From<VarError> for PlanError {
fn from(e: VarError) -> Self {
PlanError::VarError(e)
Expand Down
Loading