@@ -56,6 +56,7 @@ def __init__(
5656 volume_tags : None ,
5757 use_private_ip : False ,
5858 enable_detailed_monitoring = None ,
59+ spot = False ,
5960 ** kwargs ,
6061 ):
6162 super ().__init__ (* args , ** kwargs )
@@ -81,14 +82,13 @@ def __init__(
8182 self .volume_tags = volume_tags
8283 self .use_private_ip = use_private_ip
8384 self .enable_detailed_monitoring = enable_detailed_monitoring
85+ self .spot = spot
8486
8587 async def create_vm (self ):
8688 """
8789
8890 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Client.run_instances
8991 """
90- # TODO Enable Spot support
91-
9292 boto_config = botocore .config .Config (retries = dict (max_attempts = 10 ))
9393 async with self .cluster .boto_session .create_client (
9494 "ec2" , region_name = self .region , config = boto_config
@@ -161,6 +161,12 @@ async def create_vm(self):
161161 self .availability_zone = random .choice (self .availability_zone )
162162 vm_kwargs ["Placement" ] = {"AvailabilityZone" : self .availability_zone }
163163
164+ if self .spot :
165+ vm_kwargs ["InstanceMarketOptions" ] = {
166+ "MarketType" : "spot" ,
167+ "SpotOptions" : {"SpotInstanceType" : "one-time" },
168+ }
169+
164170 response = await client .run_instances (** vm_kwargs )
165171 [self .instance ] = response ["Instances" ]
166172
@@ -486,10 +492,12 @@ def __init__(
486492 volume_tags = None ,
487493 use_private_ip = None ,
488494 enable_detailed_monitoring = None ,
495+ spot = False ,
489496 ** kwargs ,
490497 ):
491498 self .boto_session = get_session ()
492499 self .config = dask .config .get ("cloudprovider.ec2" , {})
500+ self .spot = spot
493501 self .scheduler_class = EC2Scheduler
494502 self .worker_class = EC2Worker
495503 self .region = region if region is not None else self .config .get ("region" )
@@ -602,4 +610,5 @@ def __init__(
602610 self .worker_options = {** self .options }
603611 self .scheduler_options ["instance_type" ] = self .scheduler_instance_type
604612 self .worker_options ["instance_type" ] = self .worker_instance_type
613+ self .worker_options ["spot" ] = self .spot
605614 super ().__init__ (debug = debug , ** kwargs )
0 commit comments