from __future__ import absolute_import

import logging

import apache_beam as beam
from apache_beam.io.external.generate_sequence import GenerateSequence
from apache_beam.io.external.snowflake import ReadFromSnowflake, WriteToSnowflake
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

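# Example: writes a small generated sequence of rows to a Snowflake table with
# the cross-language WriteToSnowflake transform, then reads the table back with
# ReadFromSnowflake and logs each row.
#
# NOTE: connection settings and credentials below are hardcoded for
# demonstration purposes only; in a real pipeline pass them via pipeline
# options or a secret manager rather than committing them to source control.
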
SERVER_NAME = 'polideapartner.europe-west4.gcp.snowflakecomputing.com'
USERNAME = 'PURBANOWICZ'
PASSWORD = '12QWASZX34erdfcv!'
SCHEMA = 'PUBLIC'
DATABASE = 'TEST_PAWEL'
STAGING_BUCKET_NAME = 'gcs://iot-beam-snowflake/'
STORAGE_INTEGRATION = 'iot_beam_snowflake_integration'
TABLE = 'WORDSxxx'
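
# Table schema in the JSON format expected by WriteToSnowflake. Combined with
# create_disposition="CREATE_IF_NEEDED" below, it allows the transform to
# create the table if it does not already exist.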
SCHEMA_STRING = """
{"schema":[
{"dataType":{"type":"text","length":null},"name":"text_column","nullable":true},
{"dataType":{"type":"integer","precision":38,"scale":0},"name":"number_column","nullable":false},
{"dataType":{"type":"boolean"},"name":"boolean_column","nullable":false}
]}
"""

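# Plain value class mirroring the three columns declared in SCHEMA_STRING.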
class Row(object):
  def __init__(self, text_column, number_column, boolean_column):
    self.text_column = text_column
    self.number_column = number_column
    self.boolean_column = boolean_column

  def __eq__(self, other):
    return self.text_column == other.text_column and \
        self.number_column == other.number_column and \
        self.boolean_column == other.boolean_column

  def __str__(self):
    return self.text_column + " " + str(self.number_column) + " " + str(self.boolean_column)

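# Builds and runs the write pipeline: generates a few Rows and writes them to
# TABLE in Snowflake, truncating any existing contents.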
def run_write(pipeline_options):
  # Maps each Row to a list of byte strings, one entry per Snowflake column,
  # in the column order declared in SCHEMA_STRING.
  def user_data_mapper(test_row):
    return [
        str(test_row.text_column).encode('utf-8'),
        str(test_row.number_column).encode('utf-8'),
        str(test_row.boolean_column).encode('utf-8'),
    ]

  p = beam.Pipeline(options=pipeline_options)
  (p
   | GenerateSequence(start=1, stop=3)
   | beam.Map(lambda num: Row("test" + str(num), num, True))
   | "Writing into Snowflake" >> WriteToSnowflake(
       server_name=SERVER_NAME,
       username=USERNAME,
       password=PASSWORD,
       schema=SCHEMA,
       database=DATABASE,
       staging_bucket_name=STAGING_BUCKET_NAME,
       storage_integration_name=STORAGE_INTEGRATION,
       create_disposition="CREATE_IF_NEEDED",
       write_disposition="TRUNCATE",
       table_schema=SCHEMA_STRING,
       user_data_mapper=user_data_mapper,
       table=TABLE,
       query=None))
  result = p.run()
  result.wait_until_finish()

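# Builds and runs the read pipeline: reads the rows back from TABLE and logs
# each one.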
def run_read(pipeline_options):
  # Maps the list of CSV column strings for each record back to a Row. Values
  # arrive as strings; Snowflake renders booleans as 'true'/'false', so compare
  # against 'true' instead of calling bool(), which is truthy for any
  # non-empty string.
  def csv_mapper(strings_array):
    return Row(
        strings_array[0],
        int(strings_array[1]),
        strings_array[2] == 'true')

  def print_row(row):
    logging.info(row)

  p = beam.Pipeline(options=pipeline_options)
  (p
   | "Reading from Snowflake" >> ReadFromSnowflake(
       server_name=SERVER_NAME,
       username=USERNAME,
       password=PASSWORD,
       schema=SCHEMA,
       database=DATABASE,
       staging_bucket_name=STAGING_BUCKET_NAME,
       storage_integration_name=STORAGE_INTEGRATION,
       csv_mapper=csv_mapper,
       table=TABLE)
   | "Print" >> beam.Map(print_row))
  result = p.run()
  result.wait_until_finish()


def run(argv=None, save_main_session=True):
  # We use the save_main_session option because one or more DoFns in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(argv)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  run_write(pipeline_options)
  run_read(pipeline_options)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
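
# Usage note (an assumption, not verified against every Beam release): these
# cross-language transforms run on portable runners such as Flink and rely on
# a Java expansion service; both ReadFromSnowflake and WriteToSnowflake accept
# an expansion_service argument if the default endpoint needs to be overridden.
# The usual pipeline options (e.g. --runner) are picked up from the command
# line via PipelineOptions(argv).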