@@ -210,20 +210,36 @@ def test_map(self):
210210 assert_that (out , equal_to ([('alice' , {'apples' : 2 , 'bananas' : 3 })]))
211211
212212 def test_sql_transform_schema_parameter (self ):
213- # Define the schema using the actual SqlTransformSchema
214- from apache_beam .transforms .sql import SqlTransformSchema
215- schema = SqlTransformSchema (
216- query = """SELECT
217- CAST(2 AS INT) AS `id`,
218- CAST('bar' AS VARCHAR) AS `str`,
219- CAST(1.618 AS DOUBLE) AS `flt`""" ,
220- dialect = None ,
221- ddl = None )
222-
213+ # Test DDL
223214 with TestPipeline () as p :
224- out = p | SqlTransform (sql_transform_schema = schema )
215+ input_data = [
216+ beam .Row (id = 1 , name = 'Alice' ),
217+ beam .Row (id = 2 , name = 'Bob' ),
218+ beam .Row (id = 3 , name = 'Charlie' )
219+ ]
220+ ddl_statement_pcoll = """
221+ CREATE EXTERNAL TABLE PCOLLECTION (
222+ id INT,
223+ name VARCHAR
224+ )
225+ TYPE 'beam'
226+ LOCATION 'pcollection'
227+ """
228+ query_statement_pcoll = "SELECT id, name FROM PCOLLECTION WHERE id > 1"
229+
230+ schema = typing .NamedTuple (
231+ 'SqlTransformSchema' , [('query' , query_statement_pcoll ),
232+ ('ddl' , ddl_statement_pcoll )])
233+
234+ out = (
235+ p | beam .Create (input_data )
236+ | SqlTransform (sql_transform_schema = schema ))
237+
225238 # Verify the output matches the query defined in the schema
226- assert_that (out , equal_to ([(2 , "bar" , 1.618 )]))
239+ assert_that (
240+ out ,
241+ equal_to ([beam .Row (id = 2 , name = 'Bob' ), beam .Row (id = 3 ,
242+ name = 'Charlie' )]))
227243
228244 def test_sql_transform_schema_parameter_with_warning (self ):
229245 from apache_beam .transforms .sql import SqlTransformSchema
0 commit comments