|
12 | 12 | # Unless required by applicable law or agreed to in writing, software |
13 | 13 | # distributed under the License is distributed on an "AS IS" BASIS, |
14 | 14 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | | -# See the License for the specific langutage governing permissions and |
| 15 | +# See the License for the specific language governing permissions and |
16 | 16 | # limitations under the License. |
17 | 17 | # |
18 | 18 |
|
@@ -116,214 +116,3 @@ def enrichment_with_vertex_ai_legacy(): |
116 | 116 | | "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler) |
117 | 117 | | "Print" >> beam.Map(print)) |
118 | 118 | # [END enrichment_with_vertex_ai_legacy] |
119 | | - |
120 | | - |
121 | | -def enrichment_with_google_cloudsql_pg(): |
122 | | - # [START enrichment_with_google_cloudsql_pg] |
123 | | - import apache_beam as beam |
124 | | - from apache_beam.transforms.enrichment import Enrichment |
125 | | - from apache_beam.transforms.enrichment_handlers.cloudsql import ( |
126 | | - CloudSQLEnrichmentHandler, |
127 | | - DatabaseTypeAdapter, |
128 | | - TableFieldsQueryConfig, |
129 | | - CloudSQLConnectionConfig) |
130 | | - import os |
131 | | - |
132 | | - database_adapter = DatabaseTypeAdapter.POSTGRESQL |
133 | | - database_uri = os.environ.get("GOOGLE_CLOUD_SQL_DB_URI") |
134 | | - database_user = int(os.environ.get("GOOGLE_CLOUD_SQL_DB_USER")) |
135 | | - database_password = os.environ.get("GOOGLE_CLOUD_SQL_DB_PASSWORD") |
136 | | - database_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_ID") |
137 | | - table_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_TABLE_ID") |
138 | | - where_clause_template = "product_id = {}" |
139 | | - where_clause_fields = ["product_id"] |
140 | | - |
141 | | - data = [ |
142 | | - beam.Row(product_id=1, name='A'), |
143 | | - beam.Row(product_id=2, name='B'), |
144 | | - beam.Row(product_id=3, name='C'), |
145 | | - ] |
146 | | - |
147 | | - connection_config = CloudSQLConnectionConfig( |
148 | | - db_adapter=database_adapter, |
149 | | - instance_connection_uri=database_uri, |
150 | | - user=database_user, |
151 | | - password=database_password, |
152 | | - db_id=database_id) |
153 | | - |
154 | | - query_config = TableFieldsQueryConfig( |
155 | | - table_id=table_id, |
156 | | - where_clause_template=where_clause_template, |
157 | | - where_clause_fields=where_clause_fields) |
158 | | - |
159 | | - cloudsql_handler = CloudSQLEnrichmentHandler( |
160 | | - connection_config=connection_config, |
161 | | - table_id=table_id, |
162 | | - query_config=query_config) |
163 | | - with beam.Pipeline() as p: |
164 | | - _ = ( |
165 | | - p |
166 | | - | "Create" >> beam.Create(data) |
167 | | - | |
168 | | - "Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(cloudsql_handler) |
169 | | - | "Print" >> beam.Map(print)) |
170 | | - # [END enrichment_with_google_cloudsql_pg] |
171 | | - |
172 | | - |
173 | | -def enrichment_with_external_pg(): |
174 | | - # [START enrichment_with_external_pg] |
175 | | - import apache_beam as beam |
176 | | - from apache_beam.transforms.enrichment import Enrichment |
177 | | - from apache_beam.transforms.enrichment_handlers.cloudsql import ( |
178 | | - CloudSQLEnrichmentHandler, |
179 | | - DatabaseTypeAdapter, |
180 | | - TableFieldsQueryConfig, |
181 | | - ExternalSQLDBConnectionConfig) |
182 | | - import os |
183 | | - |
184 | | - database_adapter = DatabaseTypeAdapter.POSTGRESQL |
185 | | - database_host = os.environ.get("EXTERNAL_SQL_DB_HOST") |
186 | | - database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT")) |
187 | | - database_user = os.environ.get("EXTERNAL_SQL_DB_USER") |
188 | | - database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") |
189 | | - database_id = os.environ.get("EXTERNAL_SQL_DB_ID") |
190 | | - table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") |
191 | | - where_clause_template = "product_id = {}" |
192 | | - where_clause_fields = ["product_id"] |
193 | | - |
194 | | - data = [ |
195 | | - beam.Row(product_id=1, name='A'), |
196 | | - beam.Row(product_id=2, name='B'), |
197 | | - beam.Row(product_id=3, name='C'), |
198 | | - ] |
199 | | - |
200 | | - connection_config = ExternalSQLDBConnectionConfig( |
201 | | - db_adapter=database_adapter, |
202 | | - host=database_host, |
203 | | - port=database_port, |
204 | | - user=database_user, |
205 | | - password=database_password, |
206 | | - db_id=database_id) |
207 | | - |
208 | | - query_config = TableFieldsQueryConfig( |
209 | | - table_id=table_id, |
210 | | - where_clause_template=where_clause_template, |
211 | | - where_clause_fields=where_clause_fields) |
212 | | - |
213 | | - cloudsql_handler = CloudSQLEnrichmentHandler( |
214 | | - connection_config=connection_config, |
215 | | - table_id=table_id, |
216 | | - query_config=query_config) |
217 | | - with beam.Pipeline() as p: |
218 | | - _ = ( |
219 | | - p |
220 | | - | "Create" >> beam.Create(data) |
221 | | - | "Enrich W/ Unmanaged PostgreSQL" >> Enrichment(cloudsql_handler) |
222 | | - | "Print" >> beam.Map(print)) |
223 | | - # [END enrichment_with_external_pg] |
224 | | - |
225 | | - |
226 | | -def enrichment_with_external_mysql(): |
227 | | - # [START enrichment_with_external_mysql] |
228 | | - import apache_beam as beam |
229 | | - from apache_beam.transforms.enrichment import Enrichment |
230 | | - from apache_beam.transforms.enrichment_handlers.cloudsql import ( |
231 | | - CloudSQLEnrichmentHandler, |
232 | | - DatabaseTypeAdapter, |
233 | | - TableFieldsQueryConfig, |
234 | | - ExternalSQLDBConnectionConfig) |
235 | | - import os |
236 | | - |
237 | | - database_adapter = DatabaseTypeAdapter.MYSQL |
238 | | - database_host = os.environ.get("EXTERNAL_SQL_DB_HOST") |
239 | | - database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT")) |
240 | | - database_user = os.environ.get("EXTERNAL_SQL_DB_USER") |
241 | | - database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") |
242 | | - database_id = os.environ.get("EXTERNAL_SQL_DB_ID") |
243 | | - table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") |
244 | | - where_clause_template = "product_id = {}" |
245 | | - where_clause_fields = ["product_id"] |
246 | | - |
247 | | - data = [ |
248 | | - beam.Row(product_id=1, name='A'), |
249 | | - beam.Row(product_id=2, name='B'), |
250 | | - beam.Row(product_id=3, name='C'), |
251 | | - ] |
252 | | - |
253 | | - connection_config = ExternalSQLDBConnectionConfig( |
254 | | - db_adapter=database_adapter, |
255 | | - host=database_host, |
256 | | - port=database_port, |
257 | | - user=database_user, |
258 | | - password=database_password, |
259 | | - db_id=database_id) |
260 | | - |
261 | | - query_config = TableFieldsQueryConfig( |
262 | | - table_id=table_id, |
263 | | - where_clause_template=where_clause_template, |
264 | | - where_clause_fields=where_clause_fields) |
265 | | - |
266 | | - cloudsql_handler = CloudSQLEnrichmentHandler( |
267 | | - connection_config=connection_config, |
268 | | - table_id=table_id, |
269 | | - query_config=query_config) |
270 | | - with beam.Pipeline() as p: |
271 | | - _ = ( |
272 | | - p |
273 | | - | "Create" >> beam.Create(data) |
274 | | - | "Enrich W/ Unmanaged MySQL" >> Enrichment(cloudsql_handler) |
275 | | - | "Print" >> beam.Map(print)) |
276 | | - # [END enrichment_with_external_mysql] |
277 | | - |
278 | | - |
279 | | -def enrichment_with_external_sqlserver(): |
280 | | - # [START enrichment_with_external_sqlserver] |
281 | | - import apache_beam as beam |
282 | | - from apache_beam.transforms.enrichment import Enrichment |
283 | | - from apache_beam.transforms.enrichment_handlers.cloudsql import ( |
284 | | - CloudSQLEnrichmentHandler, |
285 | | - DatabaseTypeAdapter, |
286 | | - TableFieldsQueryConfig, |
287 | | - ExternalSQLDBConnectionConfig) |
288 | | - import os |
289 | | - |
290 | | - database_adapter = DatabaseTypeAdapter.SQLSERVER |
291 | | - database_host = os.environ.get("EXTERNAL_SQL_DB_HOST") |
292 | | - database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT")) |
293 | | - database_user = os.environ.get("EXTERNAL_SQL_DB_USER") |
294 | | - database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD") |
295 | | - database_id = os.environ.get("EXTERNAL_SQL_DB_ID") |
296 | | - table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID") |
297 | | - where_clause_template = "product_id = {}" |
298 | | - where_clause_fields = ["product_id"] |
299 | | - |
300 | | - data = [ |
301 | | - beam.Row(product_id=1, name='A'), |
302 | | - beam.Row(product_id=2, name='B'), |
303 | | - beam.Row(product_id=3, name='C'), |
304 | | - ] |
305 | | - |
306 | | - connection_config = ExternalSQLDBConnectionConfig( |
307 | | - db_adapter=database_adapter, |
308 | | - host=database_host, |
309 | | - port=database_port, |
310 | | - user=database_user, |
311 | | - password=database_password, |
312 | | - db_id=database_id) |
313 | | - |
314 | | - query_config = TableFieldsQueryConfig( |
315 | | - table_id=table_id, |
316 | | - where_clause_template=where_clause_template, |
317 | | - where_clause_fields=where_clause_fields) |
318 | | - |
319 | | - cloudsql_handler = CloudSQLEnrichmentHandler( |
320 | | - connection_config=connection_config, |
321 | | - table_id=table_id, |
322 | | - query_config=query_config) |
323 | | - with beam.Pipeline() as p: |
324 | | - _ = ( |
325 | | - p |
326 | | - | "Create" >> beam.Create(data) |
327 | | - | "Enrich W/ Unmanaged SQL Server" >> Enrichment(cloudsql_handler) |
328 | | - | "Print" >> beam.Map(print)) |
329 | | - # [END enrichment_with_external_sqlserver] |
0 commit comments