
Commit 7279ff1

Add pyarrow option for EMR
1 parent 3e26250 commit 7279ff1
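
In short: with the new spark_pyarrow=True flag, the cluster is created with spark.sql.execution.arrow.enabled=true in spark-defaults and ARROW_PRE_0_15_IPC_FORMAT=1 exported via the spark-env, yarn-env, and livy-env classifications. The environment variable matters because pyarrow 0.15 changed Arrow's IPC format, while Spark 2.4.x still expects the older one. For context, a minimal PySpark sketch of the behavior the Arrow flag enables (assumes a local session with pandas and pyarrow installed):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")  # what spark_pyarrow turns on

df = spark.range(100000).toDF("n")
pdf = df.toPandas()  # JVM -> pandas transfer goes through Arrow when the flag is on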

4 files changed: 65 additions & 20 deletions

awswrangler/emr.py

Lines changed: 57 additions & 13 deletions
@@ -21,6 +21,33 @@ def __init__(self, session):
 
     @staticmethod
     def _build_cluster_args(**pars):
+
+        spark_env: Optional[Dict[str, str]] = None
+        yarn_env: Optional[Dict[str, str]] = None
+        livy_env: Optional[Dict[str, str]] = None
+
+        if pars["spark_pyarrow"] is True:
+            if pars["spark_defaults"] is None:
+                pars["spark_defaults"]: Dict[str, str] = {"spark.sql.execution.arrow.enabled": "true"}
+            else:
+                pars["spark_defaults"]["spark.sql.execution.arrow.enabled"]: str = "true"
+            spark_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}
+            yarn_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}
+            livy_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}
+
+        if pars["python3"] is True:
+            if spark_env is None:
+                spark_env: Dict[str, str] = {"PYSPARK_PYTHON": "/usr/bin/python3"}
+            else:
+                spark_env["PYSPARK_PYTHON"]: str = "/usr/bin/python3"
+
+        if pars["spark_jars_path"] is not None:
+            paths: str = ",".join(pars["spark_jars_path"])
+            if pars["spark_defaults"] is None:
+                pars["spark_defaults"]: Dict[str, str] = {"spark.jars": paths}
+            else:
+                pars["spark_defaults"]["spark.jars"]: str = paths
+
         args: Dict = {
             "Name": pars["cluster_name"],
             "LogUri": pars["logging_s3_path"],
@@ -59,16 +86,36 @@ def _build_cluster_args(**pars):
                 "log4j.rootCategory": f"{pars['spark_log_level']}, console"
             }
         }]
-        if pars["python3"]:
+        if spark_env is not None:
             args["Configurations"].append({
                 "Classification":
                 "spark-env",
                 "Properties": {},
                 "Configurations": [{
                     "Classification": "export",
-                    "Properties": {
-                        "PYSPARK_PYTHON": "/usr/bin/python3"
-                    },
+                    "Properties": spark_env,
+                    "Configurations": []
+                }]
+            })
+        if yarn_env is not None:
+            args["Configurations"].append({
+                "Classification":
+                "yarn-env",
+                "Properties": {},
+                "Configurations": [{
+                    "Classification": "export",
+                    "Properties": yarn_env,
+                    "Configurations": []
+                }]
+            })
+        if livy_env is not None:
+            args["Configurations"].append({
+                "Classification":
+                "livy-env",
+                "Properties": {},
+                "Configurations": [{
+                    "Classification": "export",
+                    "Properties": livy_env,
                     "Configurations": []
                 }]
             })
@@ -105,16 +152,11 @@ def _build_cluster_args(**pars):
                 "maximizeResourceAllocation": "true"
             }
         })
-        if (pars["spark_jars_path"] is not None) or (pars["spark_defaults"] is not None):
+        if pars["spark_defaults"] is not None:
             spark_defaults: Dict[str, Union[str, Dict[str, str]]] = {
                 "Classification": "spark-defaults",
-                "Properties": {}
+                "Properties": pars["spark_defaults"]
             }
-            if pars["spark_jars_path"] is not None:
-                spark_defaults["Properties"]["spark.jars"]: str = ",".join(pars["spark_jars_path"])
-            if pars["spark_defaults"] is not None:
-                for k, v in pars["spark_defaults"].items():
-                    spark_defaults["Properties"][k]: str = v
             args["Configurations"].append(spark_defaults)
 
         # Applications
@@ -318,7 +360,8 @@ def create_cluster(self,
                        security_group_service_access: Optional[str] = None,
                        spark_log_level: str = "WARN",
                        spark_jars_path: Optional[List[str]] = None,
-                       spark_defaults: Dict[str, str] = None,
+                       spark_defaults: Optional[Dict[str, str]] = None,
+                       spark_pyarrow: bool = False,
                        maximize_resource_allocation: bool = False,
                        steps: Optional[List[Dict[str, Collection[str]]]] = None,
                        keep_cluster_alive_when_no_steps: bool = True,
@@ -329,7 +372,7 @@ def create_cluster(self,
         https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-instance-fleet.html
         :param cluster_name: Cluster name
         :param logging_s3_path: Logging s3 path (e.g. s3://BUCKET_NAME/DIRECTORY_NAME/)
-        :param emr_release: EMR release (e.g. emr-5.27.0)
+        :param emr_release: EMR release (e.g. emr-5.28.0)
         :param subnet_id: VPC subnet ID
         :param emr_ec2_role: IAM role name
         :param emr_role: IAM role name
@@ -371,6 +414,7 @@ def create_cluster(self,
         :param spark_log_level: log4j.rootCategory log level (ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF, TRACE)
         :param spark_jars_path: spark.jars (e.g. [s3://.../foo.jar, s3://.../boo.jar]) (https://spark.apache.org/docs/latest/configuration.html)
         :param spark_defaults: (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#spark-defaults)
+        :param spark_pyarrow: Enable PySpark to use PyArrow behind the scenes. (P.S. You must install pyarrow by your self via bootstrap)
         :param maximize_resource_allocation: Configure your executors to utilize the maximum resources possible (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation)
         :param steps: Steps definitions (Obs: Use EMR.build_step() to build that)
         :param keep_cluster_alive_when_no_steps: Specifies whether the cluster should remain available after completing all steps
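
Putting the new branches together: with spark_pyarrow=True and no other Spark options set, _build_cluster_args contributes the entries below to the EMR Configurations list (a sketch derived from the diff above; with python3=True the spark-env export additionally carries PYSPARK_PYTHON=/usr/bin/python3):

# ARROW_PRE_0_15_IPC_FORMAT=1 keeps pyarrow >= 0.15 speaking the pre-0.15
# Arrow IPC format that Spark 2.4.x expects.
env_export = {
    "Classification": "export",
    "Properties": {"ARROW_PRE_0_15_IPC_FORMAT": "1"},
    "Configurations": [],
}
configurations = [
    {"Classification": c, "Properties": {}, "Configurations": [env_export]}
    for c in ("spark-env", "yarn-env", "livy-env")
] + [{
    "Classification": "spark-defaults",
    "Properties": {"spark.sql.execution.arrow.enabled": "true"},
}]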

requirements-dev.txt

Lines changed: 5 additions & 5 deletions
@@ -1,10 +1,10 @@
-yapf~=0.28.0
-mypy~=0.740
+yapf~=0.29.0
+mypy~=0.750
 flake8~=3.7.9
 pytest-cov~=2.8.1
-cfn-lint~=0.25.2
-twine~=3.0.0
+cfn-lint~=0.25.7
+twine~=3.1.1
 wheel~=0.33.6
-sphinx~=2.2.1
+sphinx~=2.2.2
 pyspark~=2.4.4
 pyspark-stubs~=2.4.0.post6

requirements.txt

Lines changed: 2 additions & 2 deletions
@@ -1,8 +1,8 @@
 numpy~=1.17.4
 pandas~=0.25.3
 pyarrow~=0.15.1
-botocore~=1.13.25
-boto3~=1.10.25
+botocore~=1.13.30
+boto3~=1.10.30
 s3fs~=0.4.0
 tenacity~=6.0.0
 pg8000~=1.13.2

testing/test_awswrangler/test_emr.py

Lines changed: 1 addition & 0 deletions
@@ -75,6 +75,7 @@ def test_cluster(session, bucket, cloudformation_outputs):
         applications=["Hadoop", "Spark", "Ganglia", "Hive"],
         visible_to_all_users=True,
         key_pair_name=None,
+        spark_pyarrow=True,
         steps=steps)
     sleep(10)
     cluster_state = session.emr.get_cluster_state(cluster_id=cluster_id)
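
On a cluster created this way, the effect is observable from any PySpark job running on it (a sketch; assumes the code runs on the EMR cluster itself):

import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Both settings are put in place at cluster creation by spark_pyarrow=True.
assert spark.conf.get("spark.sql.execution.arrow.enabled") == "true"
assert os.environ.get("ARROW_PRE_0_15_IPC_FORMAT") == "1"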
