Skip to content

Commit 709039c

Browse files
committed
Snowflake: CREATE DYNAMIC TABLE
1 parent 60a5c8d commit 709039c

File tree

10 files changed

+750
-72
lines changed

10 files changed

+750
-72
lines changed

src/ast/dml.rs

Lines changed: 495 additions & 0 deletions
Large diffs are not rendered by default.

src/ast/helpers/stmt_create_table.rs

Lines changed: 67 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub struct CreateTableBuilder {
7272
pub transient: bool,
7373
pub volatile: bool,
7474
pub iceberg: bool,
75+
pub dynamic: bool,
7576
pub name: ObjectName,
7677
pub columns: Vec<ColumnDef>,
7778
pub constraints: Vec<TableConstraint>,
@@ -83,6 +84,7 @@ pub struct CreateTableBuilder {
8384
pub without_rowid: bool,
8485
pub like: Option<ObjectName>,
8586
pub clone: Option<ObjectName>,
87+
pub version: Option<TableVersion>,
8688
pub comment: Option<CommentDef>,
8789
pub on_commit: Option<OnCommit>,
8890
pub on_cluster: Option<Ident>,
@@ -108,6 +110,11 @@ pub struct CreateTableBuilder {
108110
pub catalog_sync: Option<String>,
109111
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
110112
pub table_options: CreateTableOptions,
113+
pub target_lag: Option<String>,
114+
pub warehouse: Option<Ident>,
115+
pub refresh_mode: Option<RefreshModeKind>,
116+
pub initialize: Option<InitializeKind>,
117+
pub require_user: bool,
111118
}
112119

113120
impl CreateTableBuilder {
@@ -121,6 +128,7 @@ impl CreateTableBuilder {
121128
transient: false,
122129
volatile: false,
123130
iceberg: false,
131+
dynamic: false,
124132
name,
125133
columns: vec![],
126134
constraints: vec![],
@@ -132,6 +140,7 @@ impl CreateTableBuilder {
132140
without_rowid: false,
133141
like: None,
134142
clone: None,
143+
version: None,
135144
comment: None,
136145
on_commit: None,
137146
on_cluster: None,
@@ -157,6 +166,11 @@ impl CreateTableBuilder {
157166
catalog_sync: None,
158167
storage_serialization_policy: None,
159168
table_options: CreateTableOptions::None,
169+
target_lag: None,
170+
warehouse: None,
171+
refresh_mode: None,
172+
initialize: None,
173+
require_user: false,
160174
}
161175
}
162176
pub fn or_replace(mut self, or_replace: bool) -> Self {
@@ -199,6 +213,11 @@ impl CreateTableBuilder {
199213
self
200214
}
201215

216+
pub fn dynamic(mut self, dynamic: bool) -> Self {
217+
self.dynamic = dynamic;
218+
self
219+
}
220+
202221
pub fn columns(mut self, columns: Vec<ColumnDef>) -> Self {
203222
self.columns = columns;
204223
self
@@ -248,6 +267,11 @@ impl CreateTableBuilder {
248267
self
249268
}
250269

270+
pub fn version(mut self, version: Option<TableVersion>) -> Self {
271+
self.version = version;
272+
self
273+
}
274+
251275
pub fn comment_after_column_def(mut self, comment: Option<CommentDef>) -> Self {
252276
self.comment = comment;
253277
self
@@ -382,24 +406,29 @@ impl CreateTableBuilder {
382406
self
383407
}
384408

385-
/// Returns true if the statement has exactly one source of info on the schema of the new table.
386-
/// This is Snowflake-specific, some dialects allow more than one source.
387-
pub(crate) fn validate_schema_info(&self) -> bool {
388-
let mut sources = 0;
389-
if !self.columns.is_empty() {
390-
sources += 1;
391-
}
392-
if self.query.is_some() {
393-
sources += 1;
394-
}
395-
if self.like.is_some() {
396-
sources += 1;
397-
}
398-
if self.clone.is_some() {
399-
sources += 1;
400-
}
409+
pub fn target_lag(mut self, target_lag: Option<String>) -> Self {
410+
self.target_lag = target_lag;
411+
self
412+
}
413+
414+
pub fn warehouse(mut self, warehouse: Option<Ident>) -> Self {
415+
self.warehouse = warehouse;
416+
self
417+
}
401418

402-
sources == 1
419+
pub fn refresh_mode(mut self, refresh_mode: Option<RefreshModeKind>) -> Self {
420+
self.refresh_mode = refresh_mode;
421+
self
422+
}
423+
424+
pub fn initialize(mut self, initialize: Option<InitializeKind>) -> Self {
425+
self.initialize = initialize;
426+
self
427+
}
428+
429+
pub fn require_user(mut self, require_user: bool) -> Self {
430+
self.require_user = require_user;
431+
self
403432
}
404433

405434
pub fn build(self) -> Statement {
@@ -412,6 +441,7 @@ impl CreateTableBuilder {
412441
transient: self.transient,
413442
volatile: self.volatile,
414443
iceberg: self.iceberg,
444+
dynamic: self.dynamic,
415445
name: self.name,
416446
columns: self.columns,
417447
constraints: self.constraints,
@@ -423,6 +453,7 @@ impl CreateTableBuilder {
423453
without_rowid: self.without_rowid,
424454
like: self.like,
425455
clone: self.clone,
456+
version: self.version,
426457
comment: self.comment,
427458
on_commit: self.on_commit,
428459
on_cluster: self.on_cluster,
@@ -448,6 +479,11 @@ impl CreateTableBuilder {
448479
catalog_sync: self.catalog_sync,
449480
storage_serialization_policy: self.storage_serialization_policy,
450481
table_options: self.table_options,
482+
target_lag: self.target_lag,
483+
warehouse: self.warehouse,
484+
refresh_mode: self.refresh_mode,
485+
initialize: self.initialize,
486+
require_user: self.require_user,
451487
})
452488
}
453489
}
@@ -468,6 +504,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
468504
transient,
469505
volatile,
470506
iceberg,
507+
dynamic,
471508
name,
472509
columns,
473510
constraints,
@@ -479,6 +516,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
479516
without_rowid,
480517
like,
481518
clone,
519+
version,
482520
comment,
483521
on_commit,
484522
on_cluster,
@@ -504,13 +542,19 @@ impl TryFrom<Statement> for CreateTableBuilder {
504542
catalog_sync,
505543
storage_serialization_policy,
506544
table_options,
545+
target_lag,
546+
warehouse,
547+
refresh_mode,
548+
initialize,
549+
require_user,
507550
}) => Ok(Self {
508551
or_replace,
509552
temporary,
510553
external,
511554
global,
512555
if_not_exists,
513556
transient,
557+
dynamic,
514558
name,
515559
columns,
516560
constraints,
@@ -522,6 +566,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
522566
without_rowid,
523567
like,
524568
clone,
569+
version,
525570
comment,
526571
on_commit,
527572
on_cluster,
@@ -549,6 +594,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
549594
catalog_sync,
550595
storage_serialization_policy,
551596
table_options,
597+
target_lag,
598+
warehouse,
599+
refresh_mode,
600+
initialize,
601+
require_user,
552602
}),
553603
_ => Err(ParserError::ParserError(format!(
554604
"Expected create table statement, but received: {stmt}"

src/ast/mod.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10465,6 +10465,48 @@ impl fmt::Display for CreateUser {
1046510465
}
1046610466
}
1046710467

10468+
/// Specifies the refresh mode for the dynamic table.
10469+
///
10470+
/// [Snowflake](https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table)
10471+
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
10472+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
10473+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
10474+
pub enum RefreshModeKind {
10475+
Auto,
10476+
Full,
10477+
Incremental,
10478+
}
10479+
10480+
impl fmt::Display for RefreshModeKind {
10481+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
10482+
match self {
10483+
RefreshModeKind::Auto => write!(f, "AUTO"),
10484+
RefreshModeKind::Full => write!(f, "FULL"),
10485+
RefreshModeKind::Incremental => write!(f, "INCREMENTAL"),
10486+
}
10487+
}
10488+
}
10489+
10490+
/// Specifies the behavior of the initial refresh of the dynamic table.
10491+
///
10492+
/// [Snowflake](https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table)
10493+
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
10494+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
10495+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
10496+
pub enum InitializeKind {
10497+
OnCreate,
10498+
OnSchedule,
10499+
}
10500+
10501+
impl fmt::Display for InitializeKind {
10502+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
10503+
match self {
10504+
InitializeKind::OnCreate => write!(f, "ON_CREATE"),
10505+
InitializeKind::OnSchedule => write!(f, "ON_SCHEDULE"),
10506+
}
10507+
}
10508+
}
10509+
1046810510
#[cfg(test)]
1046910511
mod tests {
1047010512
use crate::tokenizer::Location;

src/ast/spans.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,7 @@ impl Spanned for CreateTable {
579579
temporary: _, // bool
580580
external: _, // bool
581581
global: _, // bool
582+
dynamic: _, // bool
582583
if_not_exists: _, // bool
583584
transient: _, // bool
584585
volatile: _, // bool
@@ -619,6 +620,12 @@ impl Spanned for CreateTable {
619620
catalog_sync: _, // todo, Snowflake specific
620621
storage_serialization_policy: _,
621622
table_options,
623+
target_lag: _,
624+
warehouse: _,
625+
version: _,
626+
refresh_mode: _,
627+
initialize: _,
628+
require_user: _,
622629
} = self;
623630

624631
union_spans(

src/dialect/snowflake.rs

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@ impl Dialect for SnowflakeDialect {
235235
_ => None,
236236
};
237237

238+
let dynamic = parser.parse_keyword(Keyword::DYNAMIC);
239+
238240
let mut temporary = false;
239241
let mut volatile = false;
240242
let mut transient = false;
@@ -259,7 +261,7 @@ impl Dialect for SnowflakeDialect {
259261
return Some(parse_create_stage(or_replace, temporary, parser));
260262
} else if parser.parse_keyword(Keyword::TABLE) {
261263
return Some(parse_create_table(
262-
or_replace, global, temporary, volatile, transient, iceberg, parser,
264+
or_replace, global, temporary, volatile, transient, iceberg, dynamic, parser,
263265
));
264266
} else if parser.parse_keyword(Keyword::DATABASE) {
265267
return Some(parse_create_database(or_replace, transient, parser));
@@ -614,13 +616,15 @@ fn parse_alter_session(parser: &mut Parser, set: bool) -> Result<Statement, Pars
614616
/// Parse snowflake create table statement.
615617
/// <https://docs.snowflake.com/en/sql-reference/sql/create-table>
616618
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
619+
#[allow(clippy::too_many_arguments)]
617620
pub fn parse_create_table(
618621
or_replace: bool,
619622
global: Option<bool>,
620623
temporary: bool,
621624
volatile: bool,
622625
transient: bool,
623626
iceberg: bool,
627+
dynamic: bool,
624628
parser: &mut Parser,
625629
) -> Result<Statement, ParserError> {
626630
let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
@@ -634,6 +638,7 @@ pub fn parse_create_table(
634638
.volatile(volatile)
635639
.iceberg(iceberg)
636640
.global(global)
641+
.dynamic(dynamic)
637642
.hive_formats(Some(Default::default()));
638643

639644
// Snowflake does not enforce order of the parameters in the statement. The parser needs to
@@ -767,6 +772,49 @@ pub fn parse_create_table(
767772
Keyword::IF if parser.parse_keywords(&[Keyword::NOT, Keyword::EXISTS]) => {
768773
builder = builder.if_not_exists(true);
769774
}
775+
Keyword::TARGET_LAG => {
776+
parser.expect_token(&Token::Eq)?;
777+
let target_lag = parser.parse_literal_string()?;
778+
builder = builder.target_lag(Some(target_lag));
779+
}
780+
Keyword::WAREHOUSE => {
781+
parser.expect_token(&Token::Eq)?;
782+
let warehouse = parser.parse_identifier()?;
783+
builder = builder.warehouse(Some(warehouse));
784+
}
785+
Keyword::AT | Keyword::BEFORE => {
786+
parser.prev_token();
787+
let version = parser.maybe_parse_table_version()?;
788+
builder = builder.version(version);
789+
}
790+
Keyword::REFRESH_MODE => {
791+
parser.expect_token(&Token::Eq)?;
792+
let refresh_mode = match parser.parse_one_of_keywords(&[
793+
Keyword::AUTO,
794+
Keyword::FULL,
795+
Keyword::INCREMENTAL,
796+
]) {
797+
Some(Keyword::AUTO) => Some(RefreshModeKind::Auto),
798+
Some(Keyword::FULL) => Some(RefreshModeKind::Full),
799+
Some(Keyword::INCREMENTAL) => Some(RefreshModeKind::Incremental),
800+
_ => return parser.expected("AUTO, FULL or INCREMENTAL", next_token),
801+
};
802+
builder = builder.refresh_mode(refresh_mode);
803+
}
804+
Keyword::INITIALIZE => {
805+
parser.expect_token(&Token::Eq)?;
806+
let initialize = match parser
807+
.parse_one_of_keywords(&[Keyword::ON_CREATE, Keyword::ON_SCHEDULE])
808+
{
809+
Some(Keyword::ON_CREATE) => Some(InitializeKind::OnCreate),
810+
Some(Keyword::ON_SCHEDULE) => Some(InitializeKind::OnSchedule),
811+
_ => return parser.expected("ON_CREATE or ON_SCHEDULE", next_token),
812+
};
813+
builder = builder.initialize(initialize);
814+
}
815+
Keyword::REQUIRE if parser.parse_keyword(Keyword::USER) => {
816+
builder = builder.require_user(true);
817+
}
770818
_ => {
771819
return parser.expected("end of statement", next_token);
772820
}
@@ -777,21 +825,9 @@ pub fn parse_create_table(
777825
builder = builder.columns(columns).constraints(constraints);
778826
}
779827
Token::EOF => {
780-
if !builder.validate_schema_info() {
781-
return Err(ParserError::ParserError(
782-
"unexpected end of input".to_string(),
783-
));
784-
}
785-
786828
break;
787829
}
788830
Token::SemiColon => {
789-
if !builder.validate_schema_info() {
790-
return Err(ParserError::ParserError(
791-
"unexpected end of input".to_string(),
792-
));
793-
}
794-
795831
parser.prev_token();
796832
break;
797833
}

0 commit comments

Comments
 (0)