Skip to content

Commit 7086ac7

Browse files
committed
Add syntax for DDL for arroyo dialect
1 parent 8c1c36b commit 7086ac7

File tree

11 files changed

+194
-10
lines changed

11 files changed

+194
-10
lines changed

src/ast/ddl.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1759,7 +1759,9 @@ impl fmt::Display for ColumnOption {
17591759
Collation(n) => write!(f, "COLLATE {n}"),
17601760
Comment(v) => write!(f, "COMMENT '{}'", escape_single_quote_string(v)),
17611761
OnUpdate(expr) => write!(f, "ON UPDATE {expr}"),
1762-
MetadataField(key, _) => write!(f, "METADATA FROM '{}'", escape_single_quote_string(key)),
1762+
MetadataField(key, _) => {
1763+
write!(f, "METADATA FROM '{}'", escape_single_quote_string(key))
1764+
}
17631765
Generated {
17641766
generated_as,
17651767
sequence_options,

src/ast/dml.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,10 @@ pub struct CreateTable {
209209
/// Snowflake "STORAGE_SERIALIZATION_POLICY" clause for Iceberg tables
210210
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
211211
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
212+
/// Arroyo-specific: Iceberg partition transforms
213+
/// Syntax: PARTITIONED BY (hour(ts), bucket(32, id), truncate(8, color))
214+
/// <https://iceberg.apache.org/spec/#partitioning>
215+
pub arroyo_partitions: Option<Vec<Expr>>,
212216
}
213217

214218
impl Display for CreateTable {
@@ -392,6 +396,13 @@ impl Display for CreateTable {
392396
if let Some(cluster_by) = self.cluster_by.as_ref() {
393397
write!(f, " CLUSTER BY {cluster_by}")?;
394398
}
399+
if let Some(arroyo_partitions) = &self.arroyo_partitions {
400+
write!(
401+
f,
402+
" PARTITIONED BY ({})",
403+
display_comma_separated(arroyo_partitions)
404+
)?;
405+
}
395406

396407
if let Some(options) = self.options.as_ref() {
397408
write!(

src/ast/helpers/stmt_create_table.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ pub struct CreateTableBuilder {
112112
pub catalog: Option<String>,
113113
pub catalog_sync: Option<String>,
114114
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
115+
pub arroyo_partitions: Option<Vec<Expr>>,
115116
}
116117

117118
impl CreateTableBuilder {
@@ -166,6 +167,7 @@ impl CreateTableBuilder {
166167
catalog: None,
167168
catalog_sync: None,
168169
storage_serialization_policy: None,
170+
arroyo_partitions: None,
169171
}
170172
}
171173
pub fn or_replace(mut self, or_replace: bool) -> Self {
@@ -415,6 +417,11 @@ impl CreateTableBuilder {
415417
self
416418
}
417419

420+
/// Builder setter for the Arroyo `PARTITIONED BY (...)` expressions.
///
/// Stores the value as given and returns the builder for chaining; `build`
/// copies it into `CreateTable::arroyo_partitions`. Passing `None` leaves the
/// statement without that clause.
pub fn arroyo_partitions(mut self, arroyo_partitions: Option<Vec<Expr>>) -> Self {
421+
self.arroyo_partitions = arroyo_partitions;
422+
self
423+
}
424+
418425
pub fn build(self) -> Statement {
419426
Statement::CreateTable(CreateTable {
420427
or_replace: self.or_replace,
@@ -466,6 +473,7 @@ impl CreateTableBuilder {
466473
catalog: self.catalog,
467474
catalog_sync: self.catalog_sync,
468475
storage_serialization_policy: self.storage_serialization_policy,
476+
arroyo_partitions: self.arroyo_partitions,
469477
})
470478
}
471479
}
@@ -527,6 +535,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
527535
catalog,
528536
catalog_sync,
529537
storage_serialization_policy,
538+
arroyo_partitions,
530539
}) => Ok(Self {
531540
or_replace,
532541
temporary,
@@ -577,6 +586,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
577586
catalog,
578587
catalog_sync,
579588
storage_serialization_policy,
589+
arroyo_partitions,
580590
}),
581591
_ => Err(ParserError::ParserError(format!(
582592
"Expected create table statement, but received: {stmt}"

src/ast/spans.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,7 @@ impl Spanned for CreateTable {
585585
catalog: _, // todo, Snowflake specific
586586
catalog_sync: _, // todo, Snowflake specific
587587
storage_serialization_policy: _, // todo, Snowflake specific
588+
arroyo_partitions: _, // todo, Arroyo specific
588589
} = self;
589590

590591
union_spans(
@@ -695,9 +696,15 @@ impl Spanned for TableConstraint {
695696
.map(|i| i.span)
696697
.chain(columns.iter().map(|i| i.span)),
697698
),
698-
TableConstraint::Watermark { column_name, watermark_expr } => {
699-
union_spans(watermark_expr.iter().map(|e| e.span()).chain(core::iter::once(column_name.span)))
700-
}
699+
TableConstraint::Watermark {
700+
column_name,
701+
watermark_expr,
702+
} => union_spans(
703+
watermark_expr
704+
.iter()
705+
.map(|e| e.span())
706+
.chain(core::iter::once(column_name.span)),
707+
),
701708
}
702709
}
703710
}

src/dialect/arroyo.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ impl Dialect for ArroyoDialect {
122122
}
123123
}
124124

125-
126125
fn supports_filter_during_aggregation(&self) -> bool {
127126
true
128127
}
@@ -171,7 +170,6 @@ impl Dialect for ArroyoDialect {
171170
true
172171
}
173172

174-
175173
/// Return true if the dialect supports empty projections in SELECT statements
176174
///
177175
/// Example

src/parser/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6785,6 +6785,18 @@ impl<'a> Parser<'a> {
67856785
None
67866786
};
67876787

6788+
// Parse Arroyo-specific PARTITIONED BY for Iceberg
6789+
let arroyo_partitions = if dialect_of!(self is ArroyoDialect | GenericDialect)
6790+
&& self.parse_keywords(&[Keyword::PARTITIONED, Keyword::BY])
6791+
{
6792+
self.expect_token(&Token::LParen)?;
6793+
let partitions = self.parse_comma_separated(Parser::parse_expr)?;
6794+
self.expect_token(&Token::RParen)?;
6795+
Some(partitions)
6796+
} else {
6797+
None
6798+
};
6799+
67886800
let create_table_config = self.parse_optional_create_table_config()?;
67896801

67906802
let default_charset = if self.parse_keywords(&[Keyword::DEFAULT, Keyword::CHARSET]) {
@@ -6866,6 +6878,7 @@ impl<'a> Parser<'a> {
68666878
.options(create_table_config.options)
68676879
.primary_key(primary_key)
68686880
.strict(strict)
6881+
.arroyo_partitions(arroyo_partitions)
68696882
.build())
68706883
}
68716884

src/tokenizer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use serde::{Deserialize, Serialize};
4040
#[cfg(feature = "visitor")]
4141
use sqlparser_derive::{Visit, VisitMut};
4242

43-
use crate::dialect::{Dialect};
43+
use crate::dialect::Dialect;
4444
use crate::dialect::{
4545
BigQueryDialect, DuckDbDialect, GenericDialect, MySqlDialect, PostgreSqlDialect,
4646
SnowflakeDialect,

tests/sqlparser_arroyo.rs

Lines changed: 142 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ fn test_watermark_with_expr() {
4646
watermark_expr: Some(Expr::BinaryOp {
4747
left: Box::new(Expr::Identifier(Ident::new("timestamp"))),
4848
op: BinaryOperator::Plus,
49-
right: Box::new(Expr::Value(test_utils::number("5").with_span(Span::new(
50-
Location::new(5, 4), Location::new(5, 10)
51-
)))),
49+
right: Box::new(Expr::Value(
50+
test_utils::number("5")
51+
.with_span(Span::new(Location::new(5, 4), Location::new(5, 10)))
52+
)),
5253
}),
5354
}]
5455
);
@@ -115,3 +116,141 @@ fn test_metadata_field() {
115116
"Expected METADATA FROM option in column definition"
116117
);
117118
}
119+
120+
// Checks that the Arroyo dialect parses a three-transform `PARTITIONED BY (...)`
// clause written after `WITH (...)` into `CreateTable::arroyo_partitions`, and
// that the partition list survives a Display -> parse round trip.
#[test]
121+
fn test_iceberg_partitioned_by() {
122+
let sql = "CREATE TABLE ice (
123+
ts TIMESTAMP NOT NULL,
124+
id INT NOT NULL,
125+
favorite_color TEXT
126+
) WITH (
127+
connector = 'iceberg',
128+
format = 'parquet',
129+
table_name = 'arroyo_test'
130+
) PARTITIONED BY (
131+
hour(ts),
132+
bucket(32, id),
133+
truncate(8, favorite_color)
134+
)";
135+
136+
let parse = Parser::parse_sql(&ArroyoDialect {}, sql).unwrap();
137+
// Only the first parsed statement is inspected; anything other than
// CREATE TABLE fails the test.
let Statement::CreateTable(ct) = parse.get(0).unwrap() else {
138+
panic!("not create table")
139+
};
140+
141+
// Verify basic structure: table name and column count
142+
assert_eq!(ct.name.to_string(), "ice");
143+
assert_eq!(ct.columns.len(), 3);
144+
145+
// Verify arroyo_partitions is present and holds one entry per transform
146+
let partitions = ct
147+
.arroyo_partitions
148+
.as_ref()
149+
.expect("Expected arroyo_partitions to be Some");
150+
assert_eq!(partitions.len(), 3);
151+
152+
// Check each partition transform: it must be a function call with the
// expected name and argument count (argument values are not inspected)
153+
// hour(ts)
154+
match &partitions[0] {
155+
Expr::Function(f) => {
156+
assert_eq!(f.name.to_string(), "hour");
157+
if let sqlparser::ast::FunctionArguments::List(list) = &f.args {
158+
assert_eq!(list.args.len(), 1);
159+
} else {
160+
panic!("Expected List arguments");
161+
}
162+
}
163+
_ => panic!("Expected Function for hour(ts)"),
164+
}
165+
166+
// bucket(32, id)
167+
match &partitions[1] {
168+
Expr::Function(f) => {
169+
assert_eq!(f.name.to_string(), "bucket");
170+
if let sqlparser::ast::FunctionArguments::List(list) = &f.args {
171+
assert_eq!(list.args.len(), 2);
172+
} else {
173+
panic!("Expected List arguments");
174+
}
175+
}
176+
_ => panic!("Expected Function for bucket(32, id)"),
177+
}
178+
179+
// truncate(8, favorite_color)
180+
match &partitions[2] {
181+
Expr::Function(f) => {
182+
assert_eq!(f.name.to_string(), "truncate");
183+
if let sqlparser::ast::FunctionArguments::List(list) = &f.args {
184+
assert_eq!(list.args.len(), 2);
185+
} else {
186+
panic!("Expected List arguments");
187+
}
188+
}
189+
_ => panic!("Expected Function for truncate(8, favorite_color)"),
190+
}
191+
192+
// Test round-trip: render the statement, parse the rendered text again, and
// compare the partition lists. Only arroyo_partitions is compared here, not
// the whole statement.
193+
let formatted = ct.to_string();
194+
let reparsed = Parser::parse_sql(&ArroyoDialect {}, &formatted).unwrap();
195+
let Statement::CreateTable(ct2) = reparsed.get(0).unwrap() else {
196+
panic!("not create table on reparse")
197+
};
198+
199+
assert_eq!(ct.arroyo_partitions, ct2.arroyo_partitions);
200+
}
201+
202+
// Checks that a `PARTITIONED BY` clause with a single transform,
// `day(event_time)`, produces a one-element partition list whose entry is a
// function call named `day`.
#[test]
203+
fn test_iceberg_partitioned_by_single() {
204+
let sql = "CREATE TABLE events (
205+
event_time TIMESTAMP
206+
) WITH (
207+
connector = 'iceberg'
208+
) PARTITIONED BY (day(event_time))";
209+
210+
let parse = Parser::parse_sql(&ArroyoDialect {}, sql).unwrap();
211+
// Only the first parsed statement is inspected.
let Statement::CreateTable(ct) = parse.get(0).unwrap() else {
212+
panic!("not create table")
213+
};
214+
215+
let partitions = ct
216+
.arroyo_partitions
217+
.as_ref()
218+
.expect("Expected arroyo_partitions");
219+
assert_eq!(partitions.len(), 1);
220+
221+
// Only the function name is checked; its arguments are not inspected.
match &partitions[0] {
222+
Expr::Function(f) => {
223+
assert_eq!(f.name.to_string(), "day");
224+
}
225+
_ => panic!("Expected Function for day(event_time)"),
226+
}
227+
}
228+
229+
// Checks that a bare column name inside `PARTITIONED BY (...)` is stored as
// `Expr::Identifier` (not a function call) in a one-element partition list.
#[test]
230+
fn test_iceberg_partitioned_by_identity() {
231+
// Test identity transform (just a column name)
232+
let sql = "CREATE TABLE data (
233+
region TEXT,
234+
value INT
235+
) WITH (
236+
connector = 'iceberg'
237+
) PARTITIONED BY (region)";
238+
239+
let parse = Parser::parse_sql(&ArroyoDialect {}, sql).unwrap();
240+
// Only the first parsed statement is inspected.
let Statement::CreateTable(ct) = parse.get(0).unwrap() else {
241+
panic!("not create table")
242+
};
243+
244+
let partitions = ct
245+
.arroyo_partitions
246+
.as_ref()
247+
.expect("Expected arroyo_partitions");
248+
assert_eq!(partitions.len(), 1);
249+
250+
// The single entry must be the plain identifier `region`.
match &partitions[0] {
251+
Expr::Identifier(ident) => {
252+
assert_eq!(ident.value, "region");
253+
}
254+
_ => panic!("Expected Identifier for region"),
255+
}
256+
}

tests/sqlparser_duckdb.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,7 @@ fn test_duckdb_union_datatype() {
745745
catalog: Default::default(),
746746
catalog_sync: Default::default(),
747747
storage_serialization_policy: Default::default(),
748+
arroyo_partitions: Default::default(),
748749
}),
749750
stmt
750751
);

tests/sqlparser_mssql.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1615,6 +1615,7 @@ fn parse_create_table_with_valid_options() {
16151615
catalog: None,
16161616
catalog_sync: None,
16171617
storage_serialization_policy: None,
1618+
arroyo_partitions: None,
16181619
})
16191620
);
16201621
}
@@ -1768,6 +1769,7 @@ fn parse_create_table_with_identity_column() {
17681769
catalog: None,
17691770
catalog_sync: None,
17701771
storage_serialization_policy: None,
1772+
arroyo_partitions: None,
17711773
}),
17721774
);
17731775
}

0 commit comments

Comments
 (0)