
Commit 795d4a9

Add data path helper to ease execution in gcp dataproc
1 parent 379c314 commit 795d4a9

2 files changed: 10 additions, 4 deletions

spark/compras_top_ten_countries.py

Lines changed: 5 additions & 4 deletions
@@ -1,13 +1,14 @@
+import sys
 from pyspark import SparkContext
-from helpers import item_fields, parse_item
+from helpers import dataUrl, item_fields, parse_item
 
 sc = SparkContext('local', 'compras')
-txt = sc.textFile('data/compras_tiny.csv')
+txt = sc.textFile(dataUrl('compras_tiny.csv'))
 no_header = txt.filter(lambda s: not s.startswith(item_fields[0]))
 parsed = no_header.map(lambda s: parse_item(s)).cache()
 
 countries_rdd = sc \
-    .textFile('./data/country_codes.csv') \
+    .textFile(dataUrl('country_codes.csv')) \
     .map(lambda c: tuple(reversed(c.split(','))))
 
 join_rdd = parsed \
@@ -20,5 +21,5 @@
 print(join_rdd.take(10))
 
 # print map(lambda i: (i[0], i[1][1], i[1][0]), join_rdd.take(10))
-# join_rdd.saveAsTextFile('./top10countries', 'org.apache.hadoop.io.compress.GzipCodec')
+# join_rdd.saveAsTextFile(dataUrl('out/top10countries'), 'org.apache.hadoop.io.compress.GzipCodec')
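With this change every data access in the script goes through the new dataUrl() helper, so the same code can read either from the local ./data directory or from a Cloud Storage bucket, depending on which base is active in helpers.py. A minimal usage sketch (illustrative paths, assuming the two bases shown in helpers.py below):

from helpers import dataUrl

# With base = "./data" (local runs):
print(dataUrl('compras_tiny.csv'))    # ./data/compras_tiny.csv

# With base = "gs://bigdataupv_data" (Dataproc runs) the same call resolves to
# gs://bigdataupv_data/compras_tiny.csv, which sc.textFile() can read directly
# on Dataproc clusters, where the GCS connector is preinstalled.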

spark/helpers.py

Lines changed: 5 additions & 0 deletions
@@ -52,3 +52,8 @@ def isoDate(raw_string):
     except Exception:
         return None
 
+def dataUrl(fileName):
+    base = "./data"
+    # base = "gs://bigdataupv_data"
+    return os.path.join(base, fileName)
+
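The helper relies on os.path.join, so helpers.py presumably already imports os (this diff does not add the import). An alternative, not part of this commit, would be to read the base from an environment variable instead of editing the commented-out line when switching between local and Dataproc runs; a minimal sketch, using a hypothetical DATA_BASE variable:

import os

def dataUrl(fileName):
    # Default to the local data directory; on Dataproc, export
    # DATA_BASE=gs://bigdataupv_data before submitting the job.
    base = os.environ.get("DATA_BASE", "./data")
    return os.path.join(base, fileName)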
