
Commit 50abdfd

dataflow example
1 parent d0ca1c2 commit 50abdfd

3 files changed: +27 -159 lines changed


README.md

Lines changed: 13 additions & 32 deletions
@@ -249,34 +249,13 @@ list for currently supported runtime options.
 ### Cross-language example
 This example shows simple usage of [cross-language](https://beam.apache.org/roadmap/portability/) by writing objects into Snowflake and reading them back.
 
-Currently, cross-language is supporting only by [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
-For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
-and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+For more information about cross-language, please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/).
 
 #### Extra setup:
-Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup. The specific setup for current version of snowflake is following:
-1. Setup a Flink cluster by following the Flink [Setup Quickstart](https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/tutorials/local_setup.html)
-or [Setting up Apache Flink on Mac OS X](https://streambench.wordpress.com/2017/10/26/setting-up-apache-flink-on-mac-os-x/)
-2. Download Job server image:
+1. Follow the steps from the previous section `Setup required by all examples`.
+1. Download and install the Apache Beam Python SDK:
 ```
-docker pull gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
-```
-3. Download Apache Beam Java SDK image:
-```
-docker pull gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev
-```
-4. Change tag of downloaded Java SDK image to make the whole setup work:
-```
-docker tag gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev apache/beam_java_sdk:2.20.0.dev
-```
-5. Start Job server:
-```
-docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
-```
-6. Download [Apache Beam Python SDK](https://storage.cloud.google.com/snowflake_artifacts/apachebeam_snowflake.whl?_ga=2.54472813.-471657054.1583857613).
-7. Install python Apache Beam Python SDK using Python 2.7
-```
-python -m pip install apachebeam_snowflake.whl
+pip install apache-beam
 ```
 
 #### Executing:
@@ -288,16 +267,18 @@ python -m pip install apachebeam_snowflake.whl
 SCHEMA = <SNOWFLAKE SCHEMA>
 DATABASE = <SNOWFLAKE DATABASE>
 STAGING_BUCKET_NAME = <SNOWFLAKE STORAGE INTEGRATION NAME>
-STORAGE_INTEGRATION = <SNOWFLAKE STORAGE INTEGRATION NAME>
+STORAGE_INTEGRATION_NAME = <SNOWFLAKE STORAGE INTEGRATION NAME>
 TABLE = <SNOWFLAKE TABLE NAME>
+
+OPTIONS =[
+    "--runner=DataflowRunner",
+    "--project=<GCP PROJECT ID>",
+    "--staging_location=gs://<BUCKET NAME>/tmp/",
+    "--region=<REGION>",
+    "--temp_location=gs://<BUCKET NAME>/tmp/"
+]
 ```
 2. Run xlang_example.py:
 ```
 python xlang_example.py
 ```
-2. [Go to Flink console](http://localhost:8081/)
-![Xlang Flink result](./images/xlang_flink_result.png)
-3. Go to GCS bucket to check saved files:
-![Xlang GCS result](./images/xlang_gcs_result.png)
-4. Check console
-![Xlang console result](./images/xlang_console_result.png)
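
For reference, the new `OPTIONS` list is ordinary Beam flag parsing with nothing Flink-specific left. A minimal sketch of how those flags are consumed, using hypothetical placeholder values (the project ID, bucket, and region are assumptions to be replaced):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical placeholders: substitute a real GCP project, bucket, and region.
OPTIONS = [
    "--runner=DataflowRunner",
    "--project=my-gcp-project",
    "--staging_location=gs://my-bucket/tmp/",
    "--region=us-central1",
    "--temp_location=gs://my-bucket/tmp/",
]

# The example script builds its pipeline directly from these flags.
p = beam.Pipeline(options=PipelineOptions(OPTIONS))
```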

wip.py

Lines changed: 0 additions & 115 deletions
This file was deleted.

xlang_example.py

Lines changed: 14 additions & 12 deletions
@@ -2,6 +2,7 @@
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.io.external.generate_sequence import GenerateSequence
 from apache_beam.io.external.snowflake import ReadFromSnowflake, WriteToSnowflake
+import logging
 
 SERVER_NAME = <SNOWFLAKE SERVER NAME>
 USERNAME = <SNOWFLAKE USERNAME>
@@ -11,7 +12,6 @@
 STAGING_BUCKET_NAME = <SNOWFLAKE STORAGE INTEGRATION NAME>
 STORAGE_INTEGRATION = <SNOWFLAKE STORAGE INTEGRATION NAME>
 TABLE = <SNOWFLAKE TABLE NAME>
-EXPANSION_SERVICE = 'localhost:8097'
 SCHEMA_STRING = """
 {"schema":[
     {"dataType":{"type":"text","length":null},"name":"text_column","nullable":true},
@@ -21,10 +21,11 @@
 """
 
 OPTIONS =[
-    "--runner=FlinkRunner",
-    "--flink_version=1.10",
-    "--flink_master=localhost:8081",
-    "--environment_type=LOOPBACK"
+    "--runner=DataflowRunner",
+    "--project=<GCP PROJECT ID>",
+    "--staging_location=gs://<BUCKET NAME>/tmp/",
+    "--region=<REGION>",
+    "--temp_location=gs://<BUCKET NAME>/tmp/"
 ]
 
 class Row(object):
@@ -50,7 +51,7 @@ def user_data_mapper(test_row):
 
 p = beam.Pipeline(options=PipelineOptions(OPTIONS))
 (p
-    | GenerateSequence(start=1, stop=3, expansion_service=EXPANSION_SERVICE)
+    | GenerateSequence(start=1, stop=3)
     | beam.Map(lambda num: Row("test" + str(num), num, True))
     | "Writing into Snowflake" >> WriteToSnowflake(
         server_name=SERVER_NAME,
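
`Row` and `user_data_mapper` are referenced by this hunk but defined outside it. A sketch of what they presumably look like, assuming one attribute per schema column and a mapper that serializes every field to bytes for the CSV files staged in the bucket:

```python
# Assumed definitions: the diff only references these names.
class Row(object):
    def __init__(self, text_column, number_column, boolean_column):
        self.text_column = text_column
        self.number_column = number_column
        self.boolean_column = boolean_column

    def __repr__(self):
        return 'Row: %s %s %s' % (self.text_column, self.number_column, self.boolean_column)


def user_data_mapper(test_row):
    # WriteToSnowflake expects each element mapped to a list of byte strings,
    # one entry per table column.
    return [
        str(test_row.text_column).encode('utf-8'),
        str(test_row.number_column).encode('utf-8'),
        str(test_row.boolean_column).encode('utf-8'),
    ]
```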
@@ -65,8 +66,7 @@ def user_data_mapper(test_row):
         table_schema=SCHEMA_STRING,
         user_data_mapper=user_data_mapper,
         table=TABLE,
-        query=None,
-        expansion_service=EXPANSION_SERVICE)
+        query=None)
 )
 result = p.run()
 result.wait_until_finish()
@@ -91,18 +91,20 @@ def print_row(row):
         schema=SCHEMA,
         database=DATABASE,
         staging_bucket_name=STAGING_BUCKET_NAME,
-        storage_integration=STORAGE_INTEGRATION,
+        storage_integration_name=STORAGE_INTEGRATION,
         csv_mapper=csv_mapper,
-        table=TABLE,
-        expansion_service=EXPANSION_SERVICE)
+        table=TABLE)
     | "Print" >> beam.Map(print_row)
 )
 result = p.run()
 result.wait_until_finish()
 
+
 def run():
-    run_write()
+    # run_write()
     run_read()
 
+
 if __name__ == '__main__':
+    logging.getLogger().setLevel(logging.INFO)
     run()
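
On the read side, `csv_mapper` and `print_row` are likewise only referenced in the hunk. A sketch under the same assumptions: `ReadFromSnowflake` passes the mapper one array of strings per staged CSV record, and the mapper rebuilds a `Row` from it:

```python
# Assumed helpers for the read pipeline; the boolean parsing is a guess,
# since the staged CSV represents all values as text.
def csv_mapper(strings_array):
    return Row(
        strings_array[0],
        int(strings_array[1]),
        strings_array[2] == 'true',
    )


def print_row(row):
    print(row)
```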
