Skip to content

Commit 84c2d90

Browse files
committed
start, MVP of SQL Server Source purification
* implement purification for SQL Server sources, lists tables from upstream that have CDC enabled and their capture instances * returns an error for CREATE TABLE ... FROM SOURCE for SQL Server
1 parent 0df8e09 commit 84c2d90

File tree

11 files changed

+644
-27
lines changed

11 files changed

+644
-27
lines changed

src/sql-parser/src/ast/defs/ddl.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,15 +1185,15 @@ impl_display_t!(MySqlConfigOption);
11851185

11861186
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
11871187
pub enum SqlServerConfigOptionName {
1188-
/// The name for the "capture job" that will get spawn in SQL Server to
1189-
/// populate the change table that we read from.
1190-
CaptureInstance,
1188+
/// Hex encoded string of binary serialization of
1189+
/// `mz_storage_types::sources::sql_server::SqlServerSourceDetails`.
1190+
Details,
11911191
}
11921192

11931193
impl AstDisplay for SqlServerConfigOptionName {
11941194
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
11951195
f.write_str(match self {
1196-
SqlServerConfigOptionName::CaptureInstance => "CAPTURE INSTANCE",
1196+
SqlServerConfigOptionName::Details => "DETAILS",
11971197
})
11981198
}
11991199
}
@@ -1207,7 +1207,7 @@ impl WithOptionName for SqlServerConfigOptionName {
12071207
/// on the conservative side and return `true`.
12081208
fn redact_value(&self) -> bool {
12091209
match self {
1210-
SqlServerConfigOptionName::CaptureInstance => false,
1210+
SqlServerConfigOptionName::Details => false,
12111211
}
12121212
}
12131213
}

src/sql-parser/src/parser.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3469,11 +3469,8 @@ impl<'a> Parser<'a> {
34693469
fn parse_sql_server_connection_option(
34703470
&mut self,
34713471
) -> Result<SqlServerConfigOption<Raw>, ParserError> {
3472-
let name = match self.expect_one_of_keywords(&[CAPTURE])? {
3473-
CAPTURE => {
3474-
self.expect_keyword(INSTANCE)?;
3475-
SqlServerConfigOptionName::CaptureInstance
3476-
}
3472+
let name = match self.expect_one_of_keywords(&[DETAILS])? {
3473+
DETAILS => SqlServerConfigOptionName::Details,
34773474
_ => unreachable!(),
34783475
};
34793476

src/sql-server-util/src/desc.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ pub struct SqlServerTableRaw {
114114
pub schema_name: Arc<str>,
115115
/// Name of the table.
116116
pub name: Arc<str>,
117+
/// The capture instance replicating changes.
118+
pub capture_instance: Arc<str>,
117119
/// Columns for the table.
118120
pub columns: Arc<[SqlServerColumnRaw]>,
119121
/// Whether or not CDC is enabled for this table.
@@ -543,6 +545,7 @@ impl RustType<proto_sql_server_column_desc::DecodeType> for SqlServerColumnDecod
543545
///
544546
/// The goal of this type is to perform any expensive "downcasts" so in the hot
545547
/// path of decoding rows we do the minimal amount of work.
548+
#[derive(Debug)]
546549
pub struct SqlServerRowDecoder {
547550
decoders: Vec<(Arc<str>, ColumnType, SqlServerColumnDecodeType)>,
548551
}
@@ -701,6 +704,7 @@ mod tests {
701704
let sql_server_desc = SqlServerTableRaw {
702705
schema_name: "my_schema".into(),
703706
name: "my_table".into(),
707+
capture_instance: "my_table_CT".into(),
704708
columns: sql_server_columns.into(),
705709
is_cdc_enabled: true,
706710
};

src/sql-server-util/src/inspect.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111
1212
use itertools::Itertools;
1313
use smallvec::SmallVec;
14+
use std::collections::BTreeMap;
15+
use std::fmt;
1416
use std::sync::Arc;
1517

1618
use crate::cdc::{Lsn, RowFilterOption};
19+
use crate::desc::{SqlServerColumnRaw, SqlServerTableRaw};
1720
use crate::{Client, SqlServerError};
1821

1922
/// Returns the minimum log sequence number for the specified `capture_instance`.
@@ -159,3 +162,151 @@ WHERE c.capture_instance IN ({param_indexes});"
159162

160163
Ok(tables)
161164
}
165+
166+
/// Ensure change data capture (CDC) is enabled for the database the provided
167+
/// `client` is currently connected to.
168+
///
169+
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-ver16>
170+
pub async fn ensure_database_cdc_enabled(client: &mut Client) -> Result<(), SqlServerError> {
171+
static DATABASE_CDC_ENABLED_QUERY: &str =
172+
"SELECT is_cdc_enabled FROM sys.databases WHERE database_id = DB_ID();";
173+
let result = client.simple_query(DATABASE_CDC_ENABLED_QUERY).await?;
174+
175+
check_system_result(&result, "database CDC".to_string(), true)?;
176+
Ok(())
177+
}
178+
179+
/// Ensure change data capture (CDC) is enabled for the specified table.
180+
///
181+
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-ver16#enable-for-a-table>
182+
pub async fn ensure_table_cdc_enabled(
183+
client: &mut Client,
184+
schema: &str,
185+
table: &str,
186+
) -> Result<(), SqlServerError> {
187+
static TABLE_CDC_ENABLED_QUERY: &str = "
188+
SELECT is_tracked_by_cdc FROM sys.tables tables
189+
JOIN sys.schemas schemas
190+
ON tables.schema_id = schemas.schema_id
191+
WHERE schemas.name = @P1 AND tables.name = @P2;
192+
";
193+
let result = client
194+
.query(TABLE_CDC_ENABLED_QUERY, &[&schema, &table])
195+
.await?;
196+
197+
check_system_result(&result, "table CDC".to_string(), true)?;
198+
Ok(())
199+
}
200+
201+
/// Ensure the `SNAPSHOT` transaction isolation level is enabled for the
202+
/// database the provided `client` is currently connected to.
203+
///
204+
/// See: <https://learn.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver16>
205+
pub async fn ensure_snapshot_isolation_enabled(client: &mut Client) -> Result<(), SqlServerError> {
206+
static SNAPSHOT_ISOLATION_QUERY: &str =
207+
"SELECT snapshot_isolation_state FROM sys.databases WHERE database_id = DB_ID();";
208+
let result = client.simple_query(SNAPSHOT_ISOLATION_QUERY).await?;
209+
210+
check_system_result(&result, "snapshot isolation".to_string(), 1u8)?;
211+
Ok(())
212+
}
213+
214+
pub async fn get_tables(client: &mut Client) -> Result<Vec<SqlServerTableRaw>, SqlServerError> {
215+
static GET_TABLES_QUERY: &str = "
216+
SELECT
217+
s.name as schema_name,
218+
t.name as table_name,
219+
ch.capture_instance as capture_instance,
220+
c.name as col_name,
221+
ty.name as col_type,
222+
c.is_nullable as col_nullable,
223+
c.max_length as col_max_length,
224+
c.precision as col_precision,
225+
c.scale as col_scale
226+
FROM sys.tables t
227+
JOIN sys.schemas s ON t.schema_id = s.schema_id
228+
JOIN sys.columns c ON t.object_id = c.object_id
229+
JOIN sys.types ty ON c.system_type_id = ty.system_type_id
230+
JOIN cdc.change_tables ch ON t.object_id = ch.source_object_id
231+
";
232+
fn get_value<'a, T: tiberius::FromSql<'a>>(
233+
row: &'a tiberius::Row,
234+
name: &'static str,
235+
) -> Result<T, SqlServerError> {
236+
row.try_get(name)?
237+
.ok_or(SqlServerError::MissingColumn(name))
238+
}
239+
240+
let result = client.simple_query(GET_TABLES_QUERY).await?;
241+
242+
// Group our columns by (schema, name).
243+
let mut tables = BTreeMap::default();
244+
for row in result {
245+
let schema_name: Arc<str> = get_value::<&str>(&row, "schema_name")?.into();
246+
let table_name: Arc<str> = get_value::<&str>(&row, "table_name")?.into();
247+
let capture_instance: Arc<str> = get_value::<&str>(&row, "capture_instance")?.into();
248+
249+
let column_name = get_value::<&str>(&row, "col_name")?.into();
250+
let column = SqlServerColumnRaw {
251+
name: Arc::clone(&column_name),
252+
data_type: get_value::<&str>(&row, "col_type")?.into(),
253+
is_nullable: get_value(&row, "col_nullable")?,
254+
max_length: get_value(&row, "col_max_length")?,
255+
precision: get_value(&row, "col_precision")?,
256+
scale: get_value(&row, "col_scale")?,
257+
};
258+
259+
let columns = tables
260+
.entry((
261+
Arc::clone(&schema_name),
262+
Arc::clone(&table_name),
263+
Arc::clone(&capture_instance),
264+
))
265+
.or_insert_with(|| Vec::default());
266+
columns.push(column);
267+
}
268+
269+
// Flatten into our raw Table description.
270+
let tables = tables
271+
.into_iter()
272+
.map(|((schema, name, capture_instance), columns)| {
273+
Ok::<_, SqlServerError>(SqlServerTableRaw {
274+
schema_name: schema,
275+
name,
276+
capture_instance,
277+
columns: columns.into(),
278+
is_cdc_enabled: true,
279+
})
280+
})
281+
.collect::<Result<_, _>>()?;
282+
283+
Ok(tables)
284+
}
285+
286+
/// Helper function to parse an expected result from a "system" query.
287+
fn check_system_result<'a, T>(
288+
result: &'a SmallVec<[tiberius::Row; 1]>,
289+
name: String,
290+
expected: T,
291+
) -> Result<(), SqlServerError>
292+
where
293+
T: tiberius::FromSql<'a> + Copy + fmt::Debug + fmt::Display + PartialEq,
294+
{
295+
match &result[..] {
296+
[row] => {
297+
let result: Option<T> = row.try_get(0)?;
298+
if result == Some(expected) {
299+
Ok(())
300+
} else {
301+
Err(SqlServerError::InvalidSystemSetting {
302+
name,
303+
expected: expected.to_string(),
304+
actual: format!("{result:?}"),
305+
})
306+
}
307+
}
308+
other => Err(SqlServerError::InvariantViolated(format!(
309+
"expected 1 row, got {other:?}"
310+
))),
311+
}
312+
}

src/sql-server-util/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,8 @@ pub enum SqlServerError {
739739
SqlServer(#[from] tiberius::error::Error),
740740
#[error(transparent)]
741741
CdcError(#[from] crate::cdc::CdcError),
742+
#[error("expected column '{0}' to be present")]
743+
MissingColumn(&'static str),
742744
#[error("'{column_type}' from column '{column_name}' is not supported: {reason}")]
743745
UnsupportedDataType {
744746
column_name: String,
@@ -749,6 +751,12 @@ pub enum SqlServerError {
749751
IO(#[from] tokio::io::Error),
750752
#[error("found invalid data in the column '{column_name}': {error}")]
751753
InvalidData { column_name: String, error: String },
754+
#[error("invalid SQL Server system setting '{name}'. Expected '{expected}'. Got '{actual}'.")]
755+
InvalidSystemSetting {
756+
name: String,
757+
expected: String,
758+
actual: String,
759+
},
752760
#[error("invariant was violated: {0}")]
753761
InvariantViolated(String),
754762
#[error(transparent)]

src/sql/src/plan/error.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::plan::typeconv::CastContext;
4444
use crate::pure::error::{
4545
CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
4646
LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
47+
SqlServerSourcePurificationError,
4748
};
4849
use crate::session::vars::VarError;
4950

@@ -250,6 +251,7 @@ pub enum PlanError {
250251
LoadGeneratorSourcePurification(LoadGeneratorSourcePurificationError),
251252
CsrPurification(CsrPurificationError),
252253
MySqlSourcePurification(MySqlSourcePurificationError),
254+
SqlServerSourcePurificationError(SqlServerSourcePurificationError),
253255
UseTablesForSources(String),
254256
MissingName(CatalogItemType),
255257
InvalidRefreshAt,
@@ -331,6 +333,7 @@ impl PlanError {
331333
Self::InternalFunctionCall => Some("This function is for the internal use of the database system and cannot be called directly.".into()),
332334
Self::PgSourcePurification(e) => e.detail(),
333335
Self::MySqlSourcePurification(e) => e.detail(),
336+
Self::SqlServerSourcePurificationError(e) => e.detail(),
334337
Self::KafkaSourcePurification(e) => e.detail(),
335338
Self::LoadGeneratorSourcePurification(e) => e.detail(),
336339
Self::CsrPurification(e) => e.detail(),
@@ -439,6 +442,8 @@ impl PlanError {
439442
Self::Catalog(e) => e.hint(),
440443
Self::VarError(e) => e.hint(),
441444
Self::PgSourcePurification(e) => e.hint(),
445+
Self::MySqlSourcePurification(e) => e.hint(),
446+
Self::SqlServerSourcePurificationError(e) => e.hint(),
442447
Self::KafkaSourcePurification(e) => e.hint(),
443448
Self::LoadGeneratorSourcePurification(e) => e.hint(),
444449
Self::CsrPurification(e) => e.hint(),
@@ -739,6 +744,7 @@ impl fmt::Display for PlanError {
739744
Self::KafkaSinkPurification(e) => write!(f, "KAFKA sink validation: {}", e),
740745
Self::CsrPurification(e) => write!(f, "CONFLUENT SCHEMA REGISTRY validation: {}", e),
741746
Self::MySqlSourcePurification(e) => write!(f, "MYSQL source validation: {}", e),
747+
Self::SqlServerSourcePurificationError(e) => write!(f, "SQL SERVER source validation: {}", e),
742748
Self::UseTablesForSources(command) => write!(f, "{command} not supported; use CREATE TABLE .. FROM SOURCE instead"),
743749
Self::MangedReplicaName(name) => {
744750
write!(f, "{name} is reserved for replicas of managed clusters")
@@ -940,6 +946,12 @@ impl From<MySqlSourcePurificationError> for PlanError {
940946
}
941947
}
942948

949+
impl From<SqlServerSourcePurificationError> for PlanError {
950+
fn from(e: SqlServerSourcePurificationError) -> Self {
951+
PlanError::SqlServerSourcePurificationError(e)
952+
}
953+
}
954+
943955
impl From<IdentError> for PlanError {
944956
fn from(e: IdentError) -> Self {
945957
PlanError::InvalidIdent(e)

src/sql/src/plan/statement/ddl.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ generate_extracted_config!(
532532
(ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
533533
);
534534

535-
generate_extracted_config!(SqlServerConfigOption, (CaptureInstance, String));
535+
generate_extracted_config!(SqlServerConfigOption, (Details, String));
536536

537537
pub fn plan_create_webhook_source(
538538
scx: &StatementContext,

0 commit comments

Comments
 (0)