Skip to content

Commit 723cd4e

Browse files
committed
Add support for EMR consistent view
1 parent 8e0f23a commit 723cd4e

File tree

5 files changed

+40
-15
lines changed

5 files changed

+40
-15
lines changed

awswrangler/__init__.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
from typing import Optional
2-
import logging
31
import importlib
2+
from logging import getLogger, NullHandler
3+
from typing import Optional
4+
from sys import version_info
45

56
from awswrangler.__version__ import __title__, __description__, __version__ # noqa
67
from awswrangler.session import Session # noqa
@@ -41,7 +42,7 @@ def __getattr__(self, name):
4142
return getattr(getattr(DynamicInstantiate.__default_session, self._module_name), name)
4243

4344

44-
if importlib.util.find_spec("pyspark"): # type: ignore
45+
if version_info < (3, 8) and importlib.util.find_spec("pyspark"): # type: ignore
4546
from awswrangler.spark import Spark # noqa
4647
spark: Spark = DynamicInstantiate("spark", Spark) # type: ignore
4748

@@ -56,4 +57,4 @@ def __getattr__(self, name):
5657
sagemaker: SageMaker = DynamicInstantiate("sagemaker", SageMaker) # type: ignore
5758
cloudwatchlogs: CloudWatchLogs = DynamicInstantiate("cloudwatchlogs", CloudWatchLogs) # type: ignore
5859

59-
logging.getLogger("awswrangler").addHandler(logging.NullHandler())
60+
getLogger("awswrangler").addHandler(NullHandler())

awswrangler/emr.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def _build_cluster_args(**pars):
122122
"Configurations": []
123123
}]
124124
})
125-
if pars["spark_glue_catalog"]:
125+
if pars["spark_glue_catalog"] is True:
126126
args["Configurations"].append({
127127
"Classification": "spark-hive-site",
128128
"Properties": {
@@ -131,7 +131,7 @@ def _build_cluster_args(**pars):
131131
},
132132
"Configurations": []
133133
})
134-
if pars["hive_glue_catalog"]:
134+
if pars["hive_glue_catalog"] is True:
135135
args["Configurations"].append({
136136
"Classification": "hive-site",
137137
"Properties": {
@@ -140,15 +140,25 @@ def _build_cluster_args(**pars):
140140
},
141141
"Configurations": []
142142
})
143-
if pars["presto_glue_catalog"]:
143+
if pars["presto_glue_catalog"] is True:
144144
args["Configurations"].append({
145145
"Classification": "presto-connector-hive",
146146
"Properties": {
147147
"hive.metastore.glue.datacatalog.enabled": "true"
148148
},
149149
"Configurations": []
150150
})
151-
if pars["maximize_resource_allocation"]:
151+
if pars["consistent_view"] is True:
152+
args["Configurations"].append({
153+
"Classification": "emrfs-site",
154+
"Properties": {
155+
"fs.s3.consistent.retryPeriodSeconds": str(pars.get("consistent_view_retry_seconds", "10")),
156+
"fs.s3.consistent": "true",
157+
"fs.s3.consistent.retryCount": str(pars.get("consistent_view_retry_count", "5")),
158+
"fs.s3.consistent.metadata.tableName": pars.get("consistent_view_table_name", "EmrFSMetadata")
159+
}
160+
})
161+
if pars["maximize_resource_allocation"] is True:
152162
args["Configurations"].append({
153163
"Classification": "spark",
154164
"Properties": {
@@ -351,6 +361,10 @@ def create_cluster(self,
351361
spark_glue_catalog: bool = True,
352362
hive_glue_catalog: bool = True,
353363
presto_glue_catalog: bool = True,
364+
consistent_view: bool = False,
365+
consistent_view_retry_seconds: int = 10,
366+
consistent_view_retry_count: int = 5,
367+
consistent_view_table_name: str = "EmrFSMetadata",
354368
bootstraps_paths: Optional[List[str]] = None,
355369
debugging: bool = True,
356370
applications: Optional[List[str]] = None,
@@ -369,7 +383,7 @@ def create_cluster(self,
369383
steps: Optional[List[Dict[str, Collection[str]]]] = None,
370384
keep_cluster_alive_when_no_steps: bool = True,
371385
termination_protected: bool = False,
372-
tags: Optional[Dict[str, str]] = None):
386+
tags: Optional[Dict[str, str]] = None) -> str:
373387
"""
374388
Create an EMR cluster with instance fleets configuration
375389
https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-instance-fleet.html
@@ -404,6 +418,10 @@ def create_cluster(self,
404418
:param spark_glue_catalog: Spark integration with Glue Catalog?
405419
:param hive_glue_catalog: Hive integration with Glue Catalog?
406420
:param presto_glue_catalog: Presto integration with Glue Catalog?
421+
:param consistent_view: Consistent view allows EMR clusters to check for list and read-after-write consistency for Amazon S3 objects written by or synced with EMRFS. (https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html)
422+
:param consistent_view_retry_seconds: Delay between retries, in seconds
423+
:param consistent_view_retry_count: Number of tries
424+
:param consistent_view_table_name: Name of the DynamoDB table to store the consistent view data
407425
:param bootstraps_paths: Bootstraps paths (e.g. ["s3://BUCKET_NAME/script.sh"])
408426
:param debugging: Debugging enabled?
409427
:param applications: List of applications (e.g. ["Hadoop", "Spark", "Ganglia", "Hive"])

awswrangler/session.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
from typing import Optional, Dict
21
import os
3-
from logging import getLogger, Logger
42
import importlib
3+
from typing import Optional, Dict
4+
from sys import version_info
5+
from logging import getLogger, Logger
56

67
import boto3 # type: ignore
78
from botocore.config import Config # type: ignore
@@ -19,7 +20,7 @@
1920
from awswrangler.exceptions import AWSCredentialsNotFound
2021

2122
PYSPARK_INSTALLED: bool = False
22-
if importlib.util.find_spec("pyspark"): # type: ignore
23+
if version_info < (3, 8) and importlib.util.find_spec("pyspark"): # type: ignore
2324
PYSPARK_INSTALLED = True
2425
from awswrangler.spark import Spark
2526

requirements-dev.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
awscli~=1.17.13
12
yapf~=0.29.0
23
mypy~=0.761
34
flake8~=3.7.9
@@ -6,7 +7,7 @@ scikit-learn~=0.22.1
67
cfn-lint~=0.27.4
78
twine~=3.1.1
89
wheel~=0.34.2
9-
sphinx~=2.3.1
10+
sphinx~=2.4.0
1011
pyspark~=2.4.5
1112
pyspark-stubs~=2.4.0.post7
1213
jupyter~=1.0.0

testing/test_awswrangler/test_emr.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def test_cluster(session, bucket, cloudformation_outputs):
4141
steps.append(session.emr.build_step(name=cmd, command=cmd))
4242
cluster_id = session.emr.create_cluster(cluster_name="wrangler_cluster",
4343
logging_s3_path=f"s3://{bucket}/emr-logs/",
44-
emr_release="emr-5.27.0",
44+
emr_release="emr-5.29.0",
4545
subnet_id=cloudformation_outputs["SubnetId"],
4646
emr_ec2_role="EMR_EC2_DefaultRole",
4747
emr_role="EMR_DefaultRole",
@@ -93,7 +93,7 @@ def test_cluster(session, bucket, cloudformation_outputs):
9393
def test_cluster_single_node(session, bucket, cloudformation_outputs):
9494
cluster_id = session.emr.create_cluster(cluster_name="wrangler_cluster",
9595
logging_s3_path=f"s3://{bucket}/emr-logs/",
96-
emr_release="emr-5.27.0",
96+
emr_release="emr-5.29.0",
9797
subnet_id=cloudformation_outputs["SubnetId"],
9898
emr_ec2_role="EMR_EC2_DefaultRole",
9999
emr_role="EMR_DefaultRole",
@@ -122,6 +122,10 @@ def test_cluster_single_node(session, bucket, cloudformation_outputs):
122122
spark_glue_catalog=False,
123123
hive_glue_catalog=False,
124124
presto_glue_catalog=False,
125+
consistent_view=True,
126+
consistent_view_retry_count=6,
127+
consistent_view_retry_seconds=15,
128+
consistent_view_table_name="EMRConsistentView",
125129
bootstraps_paths=None,
126130
debugging=False,
127131
applications=["Hadoop", "Spark", "Ganglia", "Hive"],

0 commit comments

Comments
 (0)