From 8f3424754e84368b48a1e723a65c7b2eb2ccb546 Mon Sep 17 00:00:00 2001 From: Richard Holland Date: Wed, 13 Aug 2025 12:19:08 +1200 Subject: [PATCH 1/5] Introduce unique IDs for Dask clusters running in the same ECS cluster. --- dask_cloudprovider/aws/ecs.py | 51 ++++++++++++++++++++------- dask_cloudprovider/cloudprovider.yaml | 4 +-- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/dask_cloudprovider/aws/ecs.py b/dask_cloudprovider/aws/ecs.py index 7e1ce1ce..735331f0 100644 --- a/dask_cloudprovider/aws/ecs.py +++ b/dask_cloudprovider/aws/ecs.py @@ -573,6 +573,12 @@ class ECSCluster(SpecCluster, ConfigMixin): Name workers by adding multiples of `workers_name_step` to `workers_name_start`. Default to `1`. + dask_cluster_id: str (optional) + A unique identifier for this Dask cluster, useful if multiple Dask clusters are + running in the same ECS cluster. The value will be included in the names and/or + tags of all newly-created AWS resources related to this Dask cluster. + + Defaults to a random 10-digit UUID. cluster_arn: str (optional if fargate is true) The ARN of an existing ECS cluster to use for launching tasks. @@ -581,7 +587,7 @@ class ECSCluster(SpecCluster, ConfigMixin): A template to use for the cluster name if ``cluster_arn`` is set to ``None``. - Defaults to ``'dask-{uuid}'`` + Defaults to ``'dask-{dask_cluster_id}'`` execution_role_arn: str (optional) The ARN of an existing IAM role to use for ECS execution. @@ -628,7 +634,7 @@ class ECSCluster(SpecCluster, ConfigMixin): cloudwatch_logs_stream_prefix: str (optional) Prefix for log streams. - Defaults to the cluster name. + Defaults to ``{cluster_name}/{dask_cluster_id}``. cloudwatch_logs_default_retention: int (optional) Retention for logs in days. For use when log group is auto created. @@ -738,6 +744,7 @@ def __init__( n_workers=None, workers_name_start=0, workers_name_step=1, + dask_cluster_id=None, cluster_arn=None, cluster_name_template=None, execution_role_arn=None, @@ -791,6 +798,7 @@ def __init__( self._n_workers = n_workers self._workers_name_start = workers_name_start self._workers_name_step = workers_name_step + self.dask_cluster_id = dask_cluster_id or str(uuid.uuid4())[:10] self.cluster_arn = cluster_arn self.cluster_name = None self._cluster_name_template = cluster_name_template @@ -921,7 +929,10 @@ async def _start( if self._cloudwatch_logs_stream_prefix is None: self._cloudwatch_logs_stream_prefix = self.config.get( "cloudwatch_logs_stream_prefix" - ).format(cluster_name=self.cluster_name) + ).format( + cluster_name=self.cluster_name, + dask_cluster_id=self.dask_cluster_id, + ) if self.cloudwatch_logs_group is None: self.cloudwatch_logs_group = ( @@ -1025,7 +1036,12 @@ def _new_worker_name(self, worker_number): @property def tags(self): - return {**self._tags, **DEFAULT_TAGS, "cluster": self.cluster_name} + return { + **self._tags, + **DEFAULT_TAGS, + "cluster": self.cluster_name, + "dask_cluster_id": self.dask_cluster_id + } async def _create_cluster(self): if not self._fargate_scheduler or not self._fargate_workers: @@ -1038,7 +1054,10 @@ async def _create_cluster(self): self.cluster_name = dask.config.expand_environment_variables( self._cluster_name_template ) - self.cluster_name = self.cluster_name.format(uuid=str(uuid.uuid4())[:10]) + self.cluster_name = self.cluster_name.format( + dask_cluster_id=self.dask_cluster_id, + uuid=self.dask_cluster_id, # backwards-compatible + ) async with self._client("ecs") as ecs: response = await ecs.create_cluster( clusterName=self.cluster_name, @@ -1059,7 +1078,7 @@ async def _delete_cluster(self): @property def _execution_role_name(self): - return "{}-{}".format(self.cluster_name, "execution-role") + return "dask-{}-execution-role".format(self.dask_cluster_id) async def _create_execution_role(self): async with self._client("iam") as iam: @@ -1099,7 +1118,7 @@ async def _create_execution_role(self): @property def _task_role_name(self): - return "{}-{}".format(self.cluster_name, "task-role") + return "dask-{}-task-role".format(self.dask_cluster_id) async def _create_task_role(self): async with self._client("iam") as iam: @@ -1141,6 +1160,9 @@ async def _delete_role(self, role): await iam.delete_role(RoleName=role) async def _create_cloudwatch_logs_group(self): + # The log group does not include `dask_cluster_id` because it is + # shared by all Dask ECS clusters. But, log streams do because they are + # specific to each Dask cluster. log_group_name = "dask-ecs" async with self._client("logs") as logs: groups = await logs.describe_log_groups() @@ -1160,23 +1182,28 @@ async def _create_cloudwatch_logs_group(self): # Note: Not cleaning up the logs here as they may be useful after the cluster is destroyed return log_group_name + + @property + def _security_group_name(self): + return "dask-{}-security-group".format(self.dask_cluster_id) + async def _create_security_groups(self): async with self._client("ec2") as client: group = await create_default_security_group( - client, self.cluster_name, self._vpc, self.tags + client, self._security_group_name, self._vpc, self.tags ) weakref.finalize(self, self.sync, self._delete_security_groups) return [group] async def _delete_security_groups(self): timeout = Timeout( - 30, "Unable to delete AWS security group " + self.cluster_name, warn=True + 30, "Unable to delete AWS security group {}".format(self._security_group_name), warn=True ) async with self._client("ec2") as ec2: while timeout.run(): try: await ec2.delete_security_group( - GroupName=self.cluster_name, DryRun=False + GroupName=self._security_group_name, DryRun=False ) except Exception: await asyncio.sleep(2) @@ -1185,7 +1212,7 @@ async def _delete_security_groups(self): async def _create_scheduler_task_definition_arn(self): async with self._client("ecs") as ecs: response = await ecs.register_task_definition( - family="{}-{}".format(self.cluster_name, "scheduler"), + family="dask-{}-scheduler".format(self.dask_cluster_id), taskRoleArn=self._task_role_arn, executionRoleArn=self._execution_role_arn, networkMode="awsvpc", @@ -1255,7 +1282,7 @@ async def _create_worker_task_definition_arn(self): ) async with self._client("ecs") as ecs: response = await ecs.register_task_definition( - family="{}-{}".format(self.cluster_name, "worker"), + family="dask-{}-worker".format(self.dask_cluster_id), taskRoleArn=self._task_role_arn, executionRoleArn=self._execution_role_arn, networkMode="awsvpc", diff --git a/dask_cloudprovider/cloudprovider.yaml b/dask_cloudprovider/cloudprovider.yaml index ce5b8df2..23b502e7 100755 --- a/dask_cloudprovider/cloudprovider.yaml +++ b/dask_cloudprovider/cloudprovider.yaml @@ -17,7 +17,7 @@ cloudprovider: image: "daskdev/dask:latest" # Docker image to use for non GPU tasks cpu_architecture: "X86_64" # Runtime platform CPU architecture gpu_image: "rapidsai/rapidsai:latest" # Docker image to use for GPU tasks - cluster_name_template: "dask-{uuid}" # Template to use when creating a cluster + cluster_name_template: "dask-{dask_cluster_id}" # Template to use when creating a cluster cluster_arn: "" # ARN of existing ECS cluster to use (if not set one will be created) execution_role_arn: "" # Arn of existing execution role to use (if not set one will be created) task_role_arn: "" # Arn of existing task role to use (if not set one will be created) @@ -25,7 +25,7 @@ cloudprovider: # platform_version: "LATEST" # Fargate platformVersion string like "1.4.0" or "LATEST" cloudwatch_logs_group: "" # Name of existing cloudwatch logs group to use (if not set one will be created) - cloudwatch_logs_stream_prefix: "{cluster_name}" # Stream prefix template + cloudwatch_logs_stream_prefix: "{cluster_name}/{dask_cluster_id}" # Stream prefix template cloudwatch_logs_default_retention: 30 # Number of days to retain logs (only applied if not using existing group) vpc: "default" # VPC to use for tasks From 26bb42ef71a4a1fc6ffb095a843c9ddb045eae54 Mon Sep 17 00:00:00 2001 From: Richard Holland Date: Wed, 13 Aug 2025 12:26:08 +1200 Subject: [PATCH 2/5] Black passing --- dask_cloudprovider/aws/ecs.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/dask_cloudprovider/aws/ecs.py b/dask_cloudprovider/aws/ecs.py index 735331f0..ed2b5817 100644 --- a/dask_cloudprovider/aws/ecs.py +++ b/dask_cloudprovider/aws/ecs.py @@ -224,9 +224,9 @@ async def start(self): "awsvpcConfiguration": { "subnets": self._vpc_subnets, "securityGroups": self._security_groups, - "assignPublicIp": "ENABLED" - if self._use_public_ip - else "DISABLED", + "assignPublicIp": ( + "ENABLED" if self._use_public_ip else "DISABLED" + ), } }, } @@ -1040,7 +1040,7 @@ def tags(self): **self._tags, **DEFAULT_TAGS, "cluster": self.cluster_name, - "dask_cluster_id": self.dask_cluster_id + "dask_cluster_id": self.dask_cluster_id, } async def _create_cluster(self): @@ -1182,7 +1182,6 @@ async def _create_cloudwatch_logs_group(self): # Note: Not cleaning up the logs here as they may be useful after the cluster is destroyed return log_group_name - @property def _security_group_name(self): return "dask-{}-security-group".format(self.dask_cluster_id) @@ -1197,7 +1196,9 @@ async def _create_security_groups(self): async def _delete_security_groups(self): timeout = Timeout( - 30, "Unable to delete AWS security group {}".format(self._security_group_name), warn=True + 30, + "Unable to delete AWS security group {}".format(self._security_group_name), + warn=True, ) async with self._client("ec2") as ec2: while timeout.run(): @@ -1250,14 +1251,18 @@ async def _create_scheduler_task_definition_arn(self): "awslogs-create-group": "true", }, }, - "mountPoints": self._mount_points - if self._mount_points and self._mount_volumes_on_scheduler - else [], + "mountPoints": ( + self._mount_points + if self._mount_points and self._mount_volumes_on_scheduler + else [] + ), } ], - volumes=self._volumes - if self._volumes and self._mount_volumes_on_scheduler - else [], + volumes=( + self._volumes + if self._volumes and self._mount_volumes_on_scheduler + else [] + ), requiresCompatibilities=["FARGATE"] if self._fargate_scheduler else [], runtimePlatform={"cpuArchitecture": self._cpu_architecture}, cpu=str(self._scheduler_cpu), From 2ae04772810f9e0c5bc111e063d68f45ebfec54e Mon Sep 17 00:00:00 2001 From: Richard Holland Date: Thu, 14 Aug 2025 08:07:06 +1200 Subject: [PATCH 3/5] Use cluster.name instead of creating a new dask cluster id. --- dask_cloudprovider/aws/ecs.py | 48 +++++++++++++-------------- dask_cloudprovider/cloudprovider.yaml | 4 +-- 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/dask_cloudprovider/aws/ecs.py b/dask_cloudprovider/aws/ecs.py index ed2b5817..c2941a8a 100644 --- a/dask_cloudprovider/aws/ecs.py +++ b/dask_cloudprovider/aws/ecs.py @@ -461,7 +461,9 @@ class ECSCluster(SpecCluster, ConfigMixin): This creates a dask scheduler and workers on an existing ECS cluster. All the other required resources such as roles, task definitions, tasks, etc - will be created automatically like in :class:`FargateCluster`. + will be created automatically like in :class:`FargateCluster`. Resource names will + include the value of `self.name` to uniquely associate them with this cluster, and + they will also be tagged with `dask_cluster_name` using the same value. Parameters ---------- @@ -573,21 +575,17 @@ class ECSCluster(SpecCluster, ConfigMixin): Name workers by adding multiples of `workers_name_step` to `workers_name_start`. Default to `1`. - dask_cluster_id: str (optional) - A unique identifier for this Dask cluster, useful if multiple Dask clusters are - running in the same ECS cluster. The value will be included in the names and/or - tags of all newly-created AWS resources related to this Dask cluster. - - Defaults to a random 10-digit UUID. cluster_arn: str (optional if fargate is true) The ARN of an existing ECS cluster to use for launching tasks. Defaults to ``None`` which results in a new cluster being created for you. cluster_name_template: str (optional) A template to use for the cluster name if ``cluster_arn`` is set to - ``None``. + ``None``. Valid substitution variables are: + + ``name`` <= self.name (usually a UUID) - Defaults to ``'dask-{dask_cluster_id}'`` + Defaults to ``'dask-{name}'`` execution_role_arn: str (optional) The ARN of an existing IAM role to use for ECS execution. @@ -632,9 +630,12 @@ class ECSCluster(SpecCluster, ConfigMixin): Default ``None`` (one will be created called ``dask-ecs``) cloudwatch_logs_stream_prefix: str (optional) - Prefix for log streams. + Prefix for log streams. Valid substitution variables are: + + ``name`` <= self.name (usually a UUID) + ``cluster_name`` <= self.cluster_name (ECS cluster name) - Defaults to ``{cluster_name}/{dask_cluster_id}``. + Defaults to ``{cluster_name}/{name}``. cloudwatch_logs_default_retention: int (optional) Retention for logs in days. For use when log group is auto created. @@ -744,7 +745,6 @@ def __init__( n_workers=None, workers_name_start=0, workers_name_step=1, - dask_cluster_id=None, cluster_arn=None, cluster_name_template=None, execution_role_arn=None, @@ -798,7 +798,6 @@ def __init__( self._n_workers = n_workers self._workers_name_start = workers_name_start self._workers_name_step = workers_name_step - self.dask_cluster_id = dask_cluster_id or str(uuid.uuid4())[:10] self.cluster_arn = cluster_arn self.cluster_name = None self._cluster_name_template = cluster_name_template @@ -931,7 +930,7 @@ async def _start( "cloudwatch_logs_stream_prefix" ).format( cluster_name=self.cluster_name, - dask_cluster_id=self.dask_cluster_id, + name=self.name, ) if self.cloudwatch_logs_group is None: @@ -1040,7 +1039,7 @@ def tags(self): **self._tags, **DEFAULT_TAGS, "cluster": self.cluster_name, - "dask_cluster_id": self.dask_cluster_id, + "dask_cluster_name": self.name, } async def _create_cluster(self): @@ -1055,8 +1054,8 @@ async def _create_cluster(self): self._cluster_name_template ) self.cluster_name = self.cluster_name.format( - dask_cluster_id=self.dask_cluster_id, - uuid=self.dask_cluster_id, # backwards-compatible + name=self.name, + uuid=self.name, # backwards-compatible ) async with self._client("ecs") as ecs: response = await ecs.create_cluster( @@ -1078,7 +1077,7 @@ async def _delete_cluster(self): @property def _execution_role_name(self): - return "dask-{}-execution-role".format(self.dask_cluster_id) + return "dask-{}-execution-role".format(self.name) async def _create_execution_role(self): async with self._client("iam") as iam: @@ -1118,7 +1117,7 @@ async def _create_execution_role(self): @property def _task_role_name(self): - return "dask-{}-task-role".format(self.dask_cluster_id) + return "dask-{}-task-role".format(self.name) async def _create_task_role(self): async with self._client("iam") as iam: @@ -1160,9 +1159,8 @@ async def _delete_role(self, role): await iam.delete_role(RoleName=role) async def _create_cloudwatch_logs_group(self): - # The log group does not include `dask_cluster_id` because it is - # shared by all Dask ECS clusters. But, log streams do because they are - # specific to each Dask cluster. + # The log group does not include `name` because it is shared by all Dask ECS clusters. But, + # log streams do because they are specific to each Dask cluster. log_group_name = "dask-ecs" async with self._client("logs") as logs: groups = await logs.describe_log_groups() @@ -1184,7 +1182,7 @@ async def _create_cloudwatch_logs_group(self): @property def _security_group_name(self): - return "dask-{}-security-group".format(self.dask_cluster_id) + return "dask-{}-security-group".format(self.name) async def _create_security_groups(self): async with self._client("ec2") as client: @@ -1213,7 +1211,7 @@ async def _delete_security_groups(self): async def _create_scheduler_task_definition_arn(self): async with self._client("ecs") as ecs: response = await ecs.register_task_definition( - family="dask-{}-scheduler".format(self.dask_cluster_id), + family="dask-{}-scheduler".format(self.name), taskRoleArn=self._task_role_arn, executionRoleArn=self._execution_role_arn, networkMode="awsvpc", @@ -1287,7 +1285,7 @@ async def _create_worker_task_definition_arn(self): ) async with self._client("ecs") as ecs: response = await ecs.register_task_definition( - family="dask-{}-worker".format(self.dask_cluster_id), + family="dask-{}-worker".format(self.name), taskRoleArn=self._task_role_arn, executionRoleArn=self._execution_role_arn, networkMode="awsvpc", diff --git a/dask_cloudprovider/cloudprovider.yaml b/dask_cloudprovider/cloudprovider.yaml index 23b502e7..2a20106d 100755 --- a/dask_cloudprovider/cloudprovider.yaml +++ b/dask_cloudprovider/cloudprovider.yaml @@ -17,7 +17,7 @@ cloudprovider: image: "daskdev/dask:latest" # Docker image to use for non GPU tasks cpu_architecture: "X86_64" # Runtime platform CPU architecture gpu_image: "rapidsai/rapidsai:latest" # Docker image to use for GPU tasks - cluster_name_template: "dask-{dask_cluster_id}" # Template to use when creating a cluster + cluster_name_template: "dask-{name}" # Template to use when creating a cluster cluster_arn: "" # ARN of existing ECS cluster to use (if not set one will be created) execution_role_arn: "" # Arn of existing execution role to use (if not set one will be created) task_role_arn: "" # Arn of existing task role to use (if not set one will be created) @@ -25,7 +25,7 @@ cloudprovider: # platform_version: "LATEST" # Fargate platformVersion string like "1.4.0" or "LATEST" cloudwatch_logs_group: "" # Name of existing cloudwatch logs group to use (if not set one will be created) - cloudwatch_logs_stream_prefix: "{cluster_name}/{dask_cluster_id}" # Stream prefix template + cloudwatch_logs_stream_prefix: "{cluster_name}/{name}" # Stream prefix template cloudwatch_logs_default_retention: 30 # Number of days to retain logs (only applied if not using existing group) vpc: "default" # VPC to use for tasks From 64f65e7de03746ba8ce504bd700733228fff8ead Mon Sep 17 00:00:00 2001 From: Richard Holland Date: Thu, 14 Aug 2025 08:12:52 +1200 Subject: [PATCH 4/5] Unused import --- dask_cloudprovider/aws/ecs.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_cloudprovider/aws/ecs.py b/dask_cloudprovider/aws/ecs.py index c2941a8a..492029d0 100644 --- a/dask_cloudprovider/aws/ecs.py +++ b/dask_cloudprovider/aws/ecs.py @@ -1,6 +1,5 @@ import asyncio import logging -import uuid import warnings import weakref from typing import List, Optional From 4f9542364113b1d792b41905832fd7b6f048947c Mon Sep 17 00:00:00 2001 From: Richard Holland Date: Mon, 25 Aug 2025 14:39:53 +1200 Subject: [PATCH 5/5] Unit tests --- dask_cloudprovider/aws/tests/test_ecs.py | 127 +++++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/dask_cloudprovider/aws/tests/test_ecs.py b/dask_cloudprovider/aws/tests/test_ecs.py index 7f633c05..cbe58af0 100644 --- a/dask_cloudprovider/aws/tests/test_ecs.py +++ b/dask_cloudprovider/aws/tests/test_ecs.py @@ -1,3 +1,6 @@ +from unittest import mock +from unittest.mock import AsyncMock + import pytest aiobotocore = pytest.importorskip("aiobotocore") @@ -6,3 +9,127 @@ def test_import(): from dask_cloudprovider.aws import ECSCluster # noqa from dask_cloudprovider.aws import FargateCluster # noqa + + +def test_reuse_ecs_cluster(): + from dask_cloudprovider.aws import ECSCluster # noqa + + fc1_name = "Spooky" + fc2_name = "Weevil" + vpc_name = "MyNetwork" + vpc_subnets = ["MySubnet1", "MySubnet2"] + cluster_arn = "CompletelyMadeUp" + cluster_name = "Crunchy" + log_group_name = "dask-ecs" + + expected_execution_role_name1 = f"dask-{fc1_name}-execution-role" + expected_task_role_name1 = f"dask-{fc1_name}-task-role" + expected_log_stream_prefix1 = f"{cluster_name}/{fc1_name}" + expected_security_group_name1 = f"dask-{fc1_name}-security-group" + expected_scheduler_task_name1 = f"dask-{fc1_name}-scheduler" + expected_worker_task_name1 = f"dask-{fc1_name}-worker" + + expected_execution_role_name2 = f"dask-{fc2_name}-execution-role" + expected_task_role_name2 = f"dask-{fc2_name}-task-role" + expected_log_stream_prefix2 = f"{cluster_name}/{fc2_name}" + expected_security_group_name2 = f"dask-{fc2_name}-security-group" + expected_scheduler_task_name2 = f"dask-{fc2_name}-scheduler" + expected_worker_task_name2 = f"dask-{fc2_name}-worker" + + mock_client = AsyncMock() + mock_client.describe_clusters.return_value = { + "clusters": [{"clusterName": cluster_name}] + } + mock_client.list_account_settings.return_value = {"settings": {"value": "enabled"}} + mock_client.create_role.return_value = {"Role": {"Arn": "Random"}} + mock_client.describe_log_groups.return_value = {"logGroups": []} + + class MockSession: + class MockClient: + async def __aenter__(self, *args, **kwargs): + return mock_client + + async def __aexit__(self, *args, **kwargs): + return + + def create_client(self, *args, **kwargs): + return MockSession.MockClient() + + with ( + mock.patch( + "dask_cloudprovider.aws.ecs.get_session", return_value=MockSession() + ), + mock.patch("distributed.deploy.spec.SpecCluster._start"), + mock.patch("weakref.finalize"), + ): + # Make ourselves a test cluster + fc1 = ECSCluster( + name=fc1_name, + cluster_arn=cluster_arn, + vpc=vpc_name, + subnets=vpc_subnets, + skip_cleanup=True, + ) + # Are we re-using the existing ECS cluster? + assert fc1.cluster_name == cluster_name + # Have we made completely unique AWS resources to run on that cluster? + assert fc1._execution_role_name == expected_execution_role_name1 + assert fc1._task_role_name == expected_task_role_name1 + assert fc1._cloudwatch_logs_stream_prefix == expected_log_stream_prefix1 + assert ( + fc1.scheduler_spec["options"]["log_stream_prefix"] + == expected_log_stream_prefix1 + ) + assert ( + fc1.new_spec["options"]["log_stream_prefix"] == expected_log_stream_prefix1 + ) + assert fc1.cloudwatch_logs_group == log_group_name + assert fc1.scheduler_spec["options"]["log_group"] == log_group_name + assert fc1.new_spec["options"]["log_group"] == log_group_name + sg_calls = mock_client.create_security_group.call_args_list + assert len(sg_calls) == 1 + assert sg_calls[0].kwargs["GroupName"] == expected_security_group_name1 + td_calls = mock_client.register_task_definition.call_args_list + assert len(td_calls) == 2 + assert td_calls[0].kwargs["family"] == expected_scheduler_task_name1 + assert td_calls[1].kwargs["family"] == expected_worker_task_name1 + + # Reset mocks ready for second cluster + mock_client.create_security_group.reset_mock() + mock_client.register_task_definition.reset_mock() + + # Make ourselves a second test cluster on the same ECS cluster + fc2 = ECSCluster( + name=fc2_name, + cluster_arn=cluster_arn, + vpc=vpc_name, + subnets=vpc_subnets, + skip_cleanup=True, + ) + # Are we re-using the existing ECS cluster? + assert fc2.cluster_name == cluster_name + # Have we made completely unique AWS resources to run on that cluster? + assert fc2._execution_role_name == expected_execution_role_name2 + assert fc2._task_role_name == expected_task_role_name2 + assert fc2._cloudwatch_logs_stream_prefix == expected_log_stream_prefix2 + assert ( + fc2.scheduler_spec["options"]["log_stream_prefix"] + == expected_log_stream_prefix2 + ) + assert ( + fc2.new_spec["options"]["log_stream_prefix"] == expected_log_stream_prefix2 + ) + assert fc2.cloudwatch_logs_group == log_group_name + assert fc2.scheduler_spec["options"]["log_group"] == log_group_name + assert fc2.new_spec["options"]["log_group"] == log_group_name + sg_calls = mock_client.create_security_group.call_args_list + assert len(sg_calls) == 1 + assert sg_calls[0].kwargs["GroupName"] == expected_security_group_name2 + td_calls = mock_client.register_task_definition.call_args_list + assert len(td_calls) == 2 + assert td_calls[0].kwargs["family"] == expected_scheduler_task_name2 + assert td_calls[1].kwargs["family"] == expected_worker_task_name2 + + # Finish up + fc1.close() + fc2.close()