Skip to content

Commit 10ba611

Browse files
authored
sql_server: add additional upstream constraint support (#33991)
Adds support for unique constraints and isolates constraint detection from table discovery. This also fixes an issue where a column appears multiple times in the source description if it is a member of multiple unique/PK constraints. ### Motivation Implements MaterializeInc/database-issues#9723 ### Tips for reviewer Functional changes: - `src/sql-server-util/src/inspect.rs` (changes SQL Server queries: simplify table desc query, break out constraint query) - `src/sql/src/pure/sql_server.rs` (building out SourceExportStatementDetails) ### Checklist - [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. --> - [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
1 parent f23e0ee commit 10ba611

File tree

11 files changed

+570
-97
lines changed

11 files changed

+570
-97
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

misc/python/materialize/checks/all_checks/sql_server_cdc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ def validate(self) -> Testdrive:
240240
# > SELECT regexp_match(create_sql, 'TEXT COLUMNS = \\((.*?)\\)')[1] FROM (SHOW CREATE SOURCE sql_server_source_tableA{self.suffix});
241241
# "\"f4\""
242242
243-
# Confirm that the primary key information has been propagated from Pg
243+
# Confirm that the primary key information has been propagated from SQL Server
244244
> SELECT key FROM (SHOW INDEXES ON sql_server_source_tableA{self.suffix});
245245
{{f1,f2}}
246246

src/adapter/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ mz-segment = { path = "../segment" }
6666
mz-service = { path = "../service" }
6767
mz-sql = { path = "../sql" }
6868
mz-sql-parser = { path = "../sql-parser" }
69+
mz-sql-server-util = { path = "../sql-server-util"}
6970
mz-ssh-util = { path = "../ssh-util" }
7071
mz-storage-client = { path = "../storage-client" }
7172
mz-storage-types = { path = "../storage-types" }

src/adapter/src/catalog/migrate.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ pub(crate) async fn migrate(
148148
// Migration functions may also take `tx` as input to stage
149149
// arbitrary changes to the catalog.
150150
ast_rewrite_create_sink_partition_strategy(stmt)?;
151+
ast_rewrite_sql_server_constraints(stmt)?;
151152
Ok(())
152153
})?;
153154

@@ -853,3 +854,90 @@ fn ast_rewrite_create_sink_partition_strategy(
853854
.retain(|op| op.name != CreateSinkOptionName::PartitionStrategy);
854855
Ok(())
855856
}
857+
858+
// Migrate SQL Server constraint information from the columns to dedicated constraints field.
859+
fn ast_rewrite_sql_server_constraints(stmt: &mut Statement<Raw>) -> Result<(), anyhow::Error> {
860+
use mz_sql::ast::{
861+
CreateSubsourceOptionName, TableFromSourceOptionName, Value, WithOptionValue,
862+
};
863+
use mz_sql_server_util::desc::{SqlServerTableConstraint, SqlServerTableConstraintType};
864+
use mz_storage_types::sources::ProtoSourceExportStatementDetails;
865+
use mz_storage_types::sources::proto_source_export_statement_details::Kind;
866+
867+
let deets: Option<&mut String> = match stmt {
868+
Statement::CreateSubsource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
869+
if matches!(option.name, CreateSubsourceOptionName::Details)
870+
&& let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
871+
{
872+
Some(details)
873+
} else {
874+
None
875+
}
876+
}),
877+
Statement::CreateTableFromSource(stmt) => stmt.with_options.iter_mut().find_map(|option| {
878+
if matches!(option.name, TableFromSourceOptionName::Details)
879+
&& let Some(WithOptionValue::Value(Value::String(ref mut details))) = option.value
880+
{
881+
Some(details)
882+
} else {
883+
None
884+
}
885+
}),
886+
_ => None,
887+
};
888+
let Some(deets) = deets else {
889+
return Ok(());
890+
};
891+
892+
let current_value = hex::decode(&mut *deets)?;
893+
let current_value = ProtoSourceExportStatementDetails::decode(&*current_value)?;
894+
895+
// avoid further work if this isn't SQL Server
896+
if !matches!(current_value.kind, Some(Kind::SqlServer(_))) {
897+
return Ok(());
898+
};
899+
900+
let SourceExportStatementDetails::SqlServer {
901+
mut table,
902+
capture_instance,
903+
initial_lsn,
904+
} = SourceExportStatementDetails::from_proto(current_value)?
905+
else {
906+
unreachable!("statement details must exist for SQL Server");
907+
};
908+
909+
// Migration has already occured or did not need to happen.
910+
if !table.constraints.is_empty() {
911+
return Ok(());
912+
}
913+
914+
// Relocates the primary key constraint information from the individual columns to the
915+
// constraints field. This ensures that the columns no longer hold constraint information.
916+
let mut migrated_constraints: BTreeMap<_, Vec<_>> = BTreeMap::new();
917+
for col in table.columns.iter_mut() {
918+
if let Some(constraint_name) = col.primary_key_constraint.take() {
919+
migrated_constraints
920+
.entry(constraint_name)
921+
.or_default()
922+
.push(col.name.to_string());
923+
}
924+
}
925+
926+
table.constraints = migrated_constraints
927+
.into_iter()
928+
.map(|(constraint_name, column_names)| SqlServerTableConstraint {
929+
constraint_name: constraint_name.to_string(),
930+
constraint_type: SqlServerTableConstraintType::PrimaryKey,
931+
column_names,
932+
})
933+
.collect();
934+
935+
let new_value = SourceExportStatementDetails::SqlServer {
936+
table,
937+
capture_instance,
938+
initial_lsn,
939+
};
940+
*deets = hex::encode(new_value.into_proto().encode_to_vec());
941+
942+
Ok(())
943+
}

src/sql-server-util/src/desc.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@ message ProtoSqlServerTableDesc {
2020
string name = 1;
2121
string schema_name = 2;
2222
repeated ProtoSqlServerColumnDesc columns = 3;
23+
repeated ProtoSqlServerTableConstraint constraints = 5;
24+
}
25+
26+
message ProtoSqlServerTableConstraint {
27+
string constraint_name = 1;
28+
oneof constraint_type {
29+
google.protobuf.Empty primary_key = 2;
30+
google.protobuf.Empty unique = 3;
31+
}
32+
repeated string column_names = 4;
2333
}
2434

2535
message ProtoSqlServerColumnDesc {

src/sql-server-util/src/desc.rs

Lines changed: 117 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use serde::{Deserialize, Serialize};
3838
use std::collections::BTreeSet;
3939
use std::sync::Arc;
4040

41+
use crate::desc::proto_sql_server_table_constraint::ConstraintType;
4142
use crate::{SqlServerDecodeError, SqlServerError};
4243

4344
include!(concat!(env!("OUT_DIR"), "/mz_sql_server_util.rs"));
@@ -59,24 +60,34 @@ pub struct SqlServerTableDesc {
5960
pub name: Arc<str>,
6061
/// Columns for the table.
6162
pub columns: Box<[SqlServerColumnDesc]>,
63+
/// Constraints for the table.
64+
pub constraints: Vec<SqlServerTableConstraint>,
6265
}
6366

6467
impl SqlServerTableDesc {
6568
/// Creating a [`SqlServerTableDesc`] from a [`SqlServerTableRaw`] description.
6669
///
6770
/// Note: Not all columns from SQL Server can be ingested into Materialize. To determine if a
6871
/// column is supported see [`SqlServerColumnDesc::decode_type`].
69-
pub fn new(raw: SqlServerTableRaw) -> Self {
72+
pub fn new(
73+
raw: SqlServerTableRaw,
74+
raw_constraints: Vec<SqlServerTableConstraintRaw>,
75+
) -> Result<Self, SqlServerError> {
7076
let columns: Box<[_]> = raw
7177
.columns
7278
.into_iter()
7379
.map(SqlServerColumnDesc::new)
7480
.collect();
75-
SqlServerTableDesc {
81+
let constraints = raw_constraints
82+
.into_iter()
83+
.map(SqlServerTableConstraint::try_from)
84+
.collect::<Result<Vec<_>, _>>()?;
85+
Ok(SqlServerTableDesc {
7686
schema_name: raw.schema_name,
7787
name: raw.name,
7888
columns,
79-
}
89+
constraints,
90+
})
8091
}
8192

8293
/// Returns the [`SqlServerQualifiedTableName`] for this [`SqlServerTableDesc`].
@@ -121,6 +132,7 @@ impl RustType<ProtoSqlServerTableDesc> for SqlServerTableDesc {
121132
name: self.name.to_string(),
122133
schema_name: self.schema_name.to_string(),
123134
columns: self.columns.iter().map(|c| c.into_proto()).collect(),
135+
constraints: self.constraints.iter().map(|c| c.into_proto()).collect(),
124136
}
125137
}
126138

@@ -130,10 +142,99 @@ impl RustType<ProtoSqlServerTableDesc> for SqlServerTableDesc {
130142
.into_iter()
131143
.map(|c| c.into_rust())
132144
.collect::<Result<_, _>>()?;
145+
let constraints = proto
146+
.constraints
147+
.into_iter()
148+
.map(|c| c.into_rust())
149+
.collect::<Result<_, _>>()?;
133150
Ok(SqlServerTableDesc {
134151
schema_name: proto.schema_name.into(),
135152
name: proto.name.into(),
136153
columns,
154+
constraints,
155+
})
156+
}
157+
}
158+
159+
/// SQL Server table constraint type (e.g. PRIMARY KEY, UNIQUE, etc.)
160+
/// See <https://learn.microsoft.com/en-us/sql/relational-databases/system-information-schema-views/table-constraints-transact-sql?view=sql-server-ver17>
161+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Arbitrary)]
162+
pub enum SqlServerTableConstraintType {
163+
PrimaryKey,
164+
Unique,
165+
}
166+
167+
impl TryFrom<String> for SqlServerTableConstraintType {
168+
type Error = SqlServerError;
169+
170+
fn try_from(value: String) -> Result<Self, Self::Error> {
171+
match value.as_str() {
172+
"PRIMARY KEY" => Ok(Self::PrimaryKey),
173+
"UNIQUE" => Ok(Self::Unique),
174+
name => Err(SqlServerError::InvalidData {
175+
column_name: "constraint_type".into(),
176+
error: format!("Unknown constraint type: {name}"),
177+
}),
178+
}
179+
}
180+
}
181+
182+
impl RustType<proto_sql_server_table_constraint::ConstraintType> for SqlServerTableConstraintType {
183+
fn into_proto(&self) -> proto_sql_server_table_constraint::ConstraintType {
184+
match self {
185+
SqlServerTableConstraintType::PrimaryKey => ConstraintType::PrimaryKey(()),
186+
SqlServerTableConstraintType::Unique => ConstraintType::Unique(()),
187+
}
188+
}
189+
190+
fn from_proto(
191+
proto: proto_sql_server_table_constraint::ConstraintType,
192+
) -> Result<Self, mz_proto::TryFromProtoError> {
193+
Ok(match proto {
194+
ConstraintType::PrimaryKey(_) => SqlServerTableConstraintType::PrimaryKey,
195+
ConstraintType::Unique(_) => SqlServerTableConstraintType::Unique,
196+
})
197+
}
198+
}
199+
200+
/// SQL Server table constraint.
201+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Arbitrary)]
202+
pub struct SqlServerTableConstraint {
203+
pub constraint_name: String,
204+
pub constraint_type: SqlServerTableConstraintType,
205+
pub column_names: Vec<String>,
206+
}
207+
208+
impl TryFrom<SqlServerTableConstraintRaw> for SqlServerTableConstraint {
209+
type Error = SqlServerError;
210+
211+
fn try_from(value: SqlServerTableConstraintRaw) -> Result<Self, Self::Error> {
212+
Ok(SqlServerTableConstraint {
213+
constraint_name: value.constraint_name,
214+
constraint_type: value.constraint_type.try_into()?,
215+
column_names: value.columns,
216+
})
217+
}
218+
}
219+
220+
impl RustType<ProtoSqlServerTableConstraint> for SqlServerTableConstraint {
221+
fn into_proto(&self) -> ProtoSqlServerTableConstraint {
222+
ProtoSqlServerTableConstraint {
223+
constraint_name: self.constraint_name.clone(),
224+
constraint_type: Some(self.constraint_type.into_proto()),
225+
column_names: self.column_names.clone(),
226+
}
227+
}
228+
229+
fn from_proto(
230+
proto: ProtoSqlServerTableConstraint,
231+
) -> Result<Self, mz_proto::TryFromProtoError> {
232+
Ok(SqlServerTableConstraint {
233+
constraint_name: proto.constraint_name,
234+
constraint_type: proto
235+
.constraint_type
236+
.into_rust_if_some("ProtoSqlServerTableConstraint::constraint_type")?,
237+
column_names: proto.column_names,
137238
})
138239
}
139240
}
@@ -189,7 +290,8 @@ pub struct SqlServerColumnDesc {
189290
/// Note: This type might differ from the `decode_type`, e.g. a user can
190291
/// specify `TEXT COLUMNS` to decode columns as text.
191292
pub column_type: Option<SqlColumnType>,
192-
/// If this column is part of the primary key for the table, and the name of the constraint.
293+
/// This field is deprecated and will be removed in a future version. This exists only for the
294+
/// purpose of migrating from old representations.
193295
pub primary_key_constraint: Option<Arc<str>>,
194296
/// Rust type we should parse the data from a [`tiberius::Row`] as.
195297
pub decode_type: SqlServerColumnDecodeType,
@@ -223,7 +325,7 @@ impl SqlServerColumnDesc {
223325
};
224326
SqlServerColumnDesc {
225327
name: Arc::clone(&raw.name),
226-
primary_key_constraint: raw.primary_key_constraint.clone(),
328+
primary_key_constraint: None,
227329
column_type,
228330
decode_type,
229331
raw_type: Arc::clone(&raw.data_type),
@@ -528,8 +630,6 @@ pub struct SqlServerColumnRaw {
528630
pub data_type: Arc<str>,
529631
/// Whether or not the column is nullable.
530632
pub is_nullable: bool,
531-
/// If the column is part of the primary key for the table, and the name of the constraint.
532-
pub primary_key_constraint: Option<Arc<str>>,
533633
/// Maximum length (in bytes) of the column.
534634
///
535635
/// For `varchar(max)`, `nvarchar(max)`, `varbinary(max)`, or `xml` this will be `-1`. For
@@ -548,6 +648,14 @@ pub struct SqlServerColumnRaw {
548648
pub is_computed: bool,
549649
}
550650

651+
/// Raw metadata for a table constraint.
652+
#[derive(Clone, Debug)]
653+
pub struct SqlServerTableConstraintRaw {
654+
pub constraint_name: String,
655+
pub constraint_type: String,
656+
pub columns: Vec<String>,
657+
}
658+
551659
/// Rust type that we should use when reading a column from SQL Server.
552660
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
553661
pub enum SqlServerColumnDecodeType {
@@ -1049,7 +1157,6 @@ mod tests {
10491157
name: name.into(),
10501158
data_type: data_type.into(),
10511159
is_nullable: false,
1052-
primary_key_constraint: None,
10531160
max_length: 0,
10541161
precision: 0,
10551162
scale: 0,
@@ -1147,7 +1254,7 @@ mod tests {
11471254
}),
11481255
columns: sql_server_columns.into(),
11491256
};
1150-
let sql_server_desc = SqlServerTableDesc::new(sql_server_desc);
1257+
let sql_server_desc = SqlServerTableDesc::new(sql_server_desc, vec![]).unwrap();
11511258

11521259
let max_length = Some(VarCharMaxLength::try_from(16).unwrap());
11531260
let relation_desc = RelationDesc::builder()
@@ -1231,7 +1338,7 @@ mod tests {
12311338
}),
12321339
columns: columns.into(),
12331340
};
1234-
let mut sql_server_desc = SqlServerTableDesc::new(sql_server_desc);
1341+
let mut sql_server_desc = SqlServerTableDesc::new(sql_server_desc, vec![]).unwrap();
12351342
sql_server_desc.apply_text_columns(&BTreeSet::from(["a"]));
12361343

12371344
// We should support decoding every datatype to a string.

0 commit comments

Comments
 (0)