Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ExternalSqlTransformRegistrar implements ExternalTransformRegistrar
public static class Configuration {
String query = "";
@Nullable String dialect;
@Nullable String ddl;

public void setQuery(String query) {
this.query = query;
Expand All @@ -58,6 +59,10 @@ public void setQuery(String query) {
public void setDialect(@Nullable String dialect) {
this.dialect = dialect;
}

public void setDdl(@Nullable String ddl) {
this.ddl = ddl;
}
}

private static class Builder
Expand All @@ -76,6 +81,10 @@ public PTransform<PInput, PCollection<Row>> buildExternal(Configuration configur
}
transform = transform.withQueryPlannerClass(queryPlanner);
}
// Add any DDL string
if (configuration.ddl != null) {
transform = transform.withDdlString(configuration.ddl);
}
return transform;
}
}
Expand Down
9 changes: 6 additions & 3 deletions sdks/python/apache_beam/transforms/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
__all__ = ['SqlTransform']

SqlTransformSchema = typing.NamedTuple(
'SqlTransformSchema', [('query', str), ('dialect', typing.Optional[str])])
'SqlTransformSchema',
[('query', str), ('dialect', typing.Optional[str]),
('ddl', typing.Optional[str])])


class SqlTransform(ExternalTransform):
Expand Down Expand Up @@ -75,18 +77,19 @@ class SqlTransform(ExternalTransform):
"""
URN = 'beam:external:java:sql:v1'

def __init__(self, query, dialect=None, expansion_service=None):
def __init__(self, query, dialect=None, ddl=None, expansion_service=None):
"""
Creates a SqlTransform which will be expanded to Java's SqlTransform.
(See class docs).
:param query: The SQL query.
:param dialect: (optional) The dialect, e.g. use 'zetasql' for ZetaSQL.
:param ddl: (optional) The DDL statement.
:param expansion_service: (optional) The URL of the expansion service to use
"""
expansion_service = expansion_service or BeamJarExpansionService(
':sdks:java:extensions:sql:expansion-service:shadowJar')
super().__init__(
self.URN,
NamedTupleBasedPayloadBuilder(
SqlTransformSchema(query=query, dialect=dialect)),
SqlTransformSchema(query=query, dialect=dialect, ddl=ddl)),
expansion_service=expansion_service)
24 changes: 24 additions & 0 deletions sdks/python/apache_beam/transforms/sql_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,30 @@ def test_map(self):
| SqlTransform("SELECT * FROM PCOLLECTION WHERE shopper = 'alice'"))
assert_that(out, equal_to([('alice', {'apples': 2, 'bananas': 3})]))

def test_sql_ddl_set_option(self):
with TestPipeline() as p:
input_data = [
beam.Row(id=1, value=10),
beam.Row(id=2, value=20),
beam.Row(id=3, value=30)
]
# DDL uses SET to modify a session option (tests DDL parsing)
# Using a known Calcite option like sqlConformance
ddl_statement = """
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is more like a dumpy test. Not very familiar with the DDL support under Beam. Use this dumpy one for now.

SET sqlConformance = 'LENIENT'
"""
# Query still operates on the implicit PCOLLECTION
query_statement = "SELECT * FROM PCOLLECTION WHERE id > 2"

# Input PCollection is piped directly
out = (
p | beam.Create(input_data)
# Pass both the query and the DDL
| SqlTransform(query=query_statement, ddl=ddl_statement))

# Verify the output matches the query (unaffected by the SET DDL)
assert_that(out, equal_to([(3, 30)]))


if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
Expand Down
Loading