Skip to content

Commit d328e14

Browse files
committed
Fix EMR.create_cluster() for single node clusters
1 parent 93b946b commit d328e14

File tree

4 files changed

+123
-74
lines changed

4 files changed

+123
-74
lines changed

awswrangler/emr.py

Lines changed: 74 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -160,83 +160,85 @@ def _build_cluster_args(**pars):
160 160
args["Instances"]["InstanceFleets"].append(fleet_master)
161 161

162 162
# Core Instance Fleet
163-
timeout_action_core = "SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_core"] else "TERMINATE_CLUSTER"
164-
fleet_core: Dict = {
165-
"Name":
166-
"CORE",
167-
"InstanceFleetType":
168-
"CORE",
169-
"TargetOnDemandCapacity":
170-
pars["instance_num_on_demand_core"],
171-
"TargetSpotCapacity":
172-
pars["instance_num_spot_core"],
173-
"InstanceTypeConfigs": [
174-
{
175-
"InstanceType": pars["instance_type_core"],
176-
"WeightedCapacity": 1,
177-
"BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_core"],
178-
"EbsConfiguration": {
179-
"EbsBlockDeviceConfigs": [{
180-
"VolumeSpecification": {
181-
"SizeInGB": pars["instance_ebs_size_core"],
182-
"VolumeType": "gp2"
183-
},
184-
"VolumesPerInstance": 1
185-
}],
186-
"EbsOptimized":
187-
True
163+
if (pars["instance_num_spot_core"] > 0) or pars["instance_num_on_demand_core"] > 0:
164+
timeout_action_core = "SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_core"] else "TERMINATE_CLUSTER"
165+
fleet_core: Dict = {
166+
"Name":
167+
"CORE",
168+
"InstanceFleetType":
169+
"CORE",
170+
"TargetOnDemandCapacity":
171+
pars["instance_num_on_demand_core"],
172+
"TargetSpotCapacity":
173+
pars["instance_num_spot_core"],
174+
"InstanceTypeConfigs": [
175+
{
176+
"InstanceType": pars["instance_type_core"],
177+
"WeightedCapacity": 1,
178+
"BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_core"],
179+
"EbsConfiguration": {
180+
"EbsBlockDeviceConfigs": [{
181+
"VolumeSpecification": {
182+
"SizeInGB": pars["instance_ebs_size_core"],
183+
"VolumeType": "gp2"
184+
},
185+
"VolumesPerInstance": 1
186+
}],
187+
"EbsOptimized":
188+
True
189+
},
188 190
},
189-
},
190-
],
191-
}
192-
if pars["instance_num_spot_core"] > 0:
193-
fleet_core["LaunchSpecifications"]: Dict = {
194-
"SpotSpecification": {
195-
"TimeoutDurationMinutes": pars["spot_provisioning_timeout_core"],
196-
"TimeoutAction": timeout_action_core,
197-
}
191+
],
198 192
}
199-
args["Instances"]["InstanceFleets"].append(fleet_core)
193+
if pars["instance_num_spot_core"] > 0:
194+
fleet_core["LaunchSpecifications"]: Dict = {
195+
"SpotSpecification": {
196+
"TimeoutDurationMinutes": pars["spot_provisioning_timeout_core"],
197+
"TimeoutAction": timeout_action_core,
198+
}
199+
}
200+
args["Instances"]["InstanceFleets"].append(fleet_core)
200 201

201-
# # Task Instance Fleet
202-
timeout_action_task: str = "SWITCH_TO_ON_DEMAND" if pars[
203-
"spot_timeout_to_on_demand_task"] else "TERMINATE_CLUSTER"
204-
fleet_task: Dict = {
205-
"Name":
206-
"TASK",
207-
"InstanceFleetType":
208-
"TASK",
209-
"TargetOnDemandCapacity":
210-
pars["instance_num_on_demand_task"],
211-
"TargetSpotCapacity":
212-
pars["instance_num_spot_task"],
213-
"InstanceTypeConfigs": [
214-
{
215-
"InstanceType": pars["instance_type_task"],
216-
"WeightedCapacity": 1,
217-
"BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_task"],
218-
"EbsConfiguration": {
219-
"EbsBlockDeviceConfigs": [{
220-
"VolumeSpecification": {
221-
"SizeInGB": pars["instance_ebs_size_task"],
222-
"VolumeType": "gp2"
223-
},
224-
"VolumesPerInstance": 1
225-
}],
226-
"EbsOptimized":
227-
True
202+
# Task Instance Fleet
203+
if (pars["instance_num_spot_task"] > 0) or pars["instance_num_on_demand_task"] > 0:
204+
timeout_action_task: str = "SWITCH_TO_ON_DEMAND" if pars[
205+
"spot_timeout_to_on_demand_task"] else "TERMINATE_CLUSTER"
206+
fleet_task: Dict = {
207+
"Name":
208+
"TASK",
209+
"InstanceFleetType":
210+
"TASK",
211+
"TargetOnDemandCapacity":
212+
pars["instance_num_on_demand_task"],
213+
"TargetSpotCapacity":
214+
pars["instance_num_spot_task"],
215+
"InstanceTypeConfigs": [
216+
{
217+
"InstanceType": pars["instance_type_task"],
218+
"WeightedCapacity": 1,
219+
"BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_task"],
220+
"EbsConfiguration": {
221+
"EbsBlockDeviceConfigs": [{
222+
"VolumeSpecification": {
223+
"SizeInGB": pars["instance_ebs_size_task"],
224+
"VolumeType": "gp2"
225+
},
226+
"VolumesPerInstance": 1
227+
}],
228+
"EbsOptimized":
229+
True
230+
},
228 231
},
229-
},
230-
],
231-
}
232-
if pars["instance_num_spot_task"] > 0:
233-
fleet_task["LaunchSpecifications"]: Dict = {
234-
"SpotSpecification": {
235-
"TimeoutDurationMinutes": pars["spot_provisioning_timeout_task"],
236-
"TimeoutAction": timeout_action_task,
237-
}
232+
],
238 233
}
239-
args["Instances"]["InstanceFleets"].append(fleet_task)
234+
if pars["instance_num_spot_task"] > 0:
235+
fleet_task["LaunchSpecifications"]: Dict = {
236+
"SpotSpecification": {
237+
"TimeoutDurationMinutes": pars["spot_provisioning_timeout_task"],
238+
"TimeoutAction": timeout_action_task,
239+
}
240+
}
241+
args["Instances"]["InstanceFleets"].append(fleet_task)
240 242

241 243
logger.info(f"args: \n{json.dumps(args, default=str, indent=4)}")
242 244
return args

requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1 1
numpy~=1.17.3
2 2
pandas~=0.25.2
3 3
pyarrow~=0.14.0
4-
botocore~=1.12.253
5-
boto3~=1.9.253
4+
botocore~=1.13.2
5+
boto3~=1.10.2
6 6
s3fs~=0.3.5
7 7
tenacity~=5.1.1
8 8
pg8000~=1.13.2

setup-dev-env.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1 1
#!/usr/bin/env bash
2+
set -e
2 3

3 4
pip install --upgrade pip
4 5
pip install --upgrade -r requirements.txt

testing/test_awswrangler/test_emr.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,49 @@ def test_cluster(session, bucket, cloudformation_outputs):
84 84
print(f"step_state: {step_state}")
85 85
assert step_state == "PENDING"
86 86
session.emr.terminate_cluster(cluster_id=cluster_id)
87+
88+
89+
def test_cluster_single_node(session, bucket, cloudformation_outputs):
90+
cluster_id = session.emr.create_cluster(
91+
cluster_name="wrangler_cluster",
92+
logging_s3_path=f"s3://{bucket}/emr-logs/",
93+
emr_release="emr-5.27.0",
94+
subnet_id=cloudformation_outputs["SubnetId"],
95+
emr_ec2_role="EMR_EC2_DefaultRole",
96+
emr_role="EMR_DefaultRole",
97+
instance_type_master="m5.xlarge",
98+
instance_type_core="m5.xlarge",
99+
instance_type_task="m5.xlarge",
100+
instance_ebs_size_master=50,
101+
instance_ebs_size_core=50,
102+
instance_ebs_size_task=50,
103+
instance_num_on_demand_master=1,
104+
instance_num_on_demand_core=0,
105+
instance_num_on_demand_task=0,
106+
instance_num_spot_master=0,
107+
instance_num_spot_core=0,
108+
instance_num_spot_task=0,
109+
spot_bid_percentage_of_on_demand_master=100,
110+
spot_bid_percentage_of_on_demand_core=100,
111+
spot_bid_percentage_of_on_demand_task=100,
112+
spot_provisioning_timeout_master=5,
113+
spot_provisioning_timeout_core=5,
114+
spot_provisioning_timeout_task=5,
115+
spot_timeout_to_on_demand_master=False,
116+
spot_timeout_to_on_demand_core=False,
117+
spot_timeout_to_on_demand_task=False,
118+
python3=False,
119+
spark_glue_catalog=False,
120+
hive_glue_catalog=False,
121+
presto_glue_catalog=False,
122+
bootstraps_paths=None,
123+
debugging=False,
124+
applications=["Hadoop", "Spark", "Ganglia", "Hive"],
125+
visible_to_all_users=True,
126+
key_pair_name=None,
127+
)
128+
sleep(10)
129+
cluster_state = session.emr.get_cluster_state(cluster_id=cluster_id)
130+
print(f"cluster_state: {cluster_state}")
131+
assert cluster_state == "STARTING"
132+
session.emr.terminate_cluster(cluster_id=cluster_id)

0 commit comments

Comments
 (0)