diff --git a/aztk/client/base/base_operations.py b/aztk/client/base/base_operations.py index a1cff5be..1d6001fe 100644 --- a/aztk/client/base/base_operations.py +++ b/aztk/client/base/base_operations.py @@ -1,9 +1,10 @@ from aztk import models from aztk.internal import cluster_data -from .helpers import (create_user_on_cluster, create_user_on_node, delete_user_on_cluster, delete_user_on_node, - generate_user_on_cluster, generate_user_on_node, get_application_log, get_recent_job, - get_remote_login_settings, get_task_state, list_tasks, node_run, run, ssh_into_node, task_table) +from .helpers import (create_batch_resources, create_user_on_cluster, create_user_on_node, delete_batch_resources, + delete_user_on_cluster, delete_user_on_node, generate_user_on_cluster, generate_user_on_node, + get_application_log, get_node, get_remote_login_settings, get_task, get_task_state, list_tasks, + node_run, run, ssh_into_node, task_table) class BaseOperations: @@ -12,15 +13,19 @@ class BaseOperations: Attributes: batch_client (:obj:`azure.batch.batch_service_client.BatchServiceClient`): Client used to interact with the Azure Batch service. - blob_client (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage - Blob service. + cloud_storage_account (:obj:`azure.storage.blob.CloudStorageAccount`): Azure Storage account used + block_blob_service (:obj:`azure.storage.blob.CloudStorageAccount`): Client used to interact with the + Azure Storage Blob service. + table_service (:obj:`azure.storage.blob.CloudStorageAccount`): Client used to interact with the Azure Storage + Table service. secrets_configuration (:obj:`aztk.models.SecretsConfiguration`): Model that holds AZTK secrets used to authenticate with Azure and the clusters. """ def __init__(self, context): self.batch_client = context["batch_client"] - self.blob_client = context["blob_client"] + self.cloud_storage_account = context["cloud_storage_account"] + self.block_blob_service = context["block_blob_service"] self.table_service = context["table_service"] self.secrets_configuration = context["secrets_configuration"] @@ -53,7 +58,7 @@ def get_cluster_data(self, id: str) -> cluster_data.ClusterData: Returns: :obj:`aztk.models.ClusterData`: Object used to manage the data and storage functions for a cluster """ - return cluster_data.ClusterData(self.blob_client, id) + return cluster_data.ClusterData(self.block_blob_service, id) def ssh_into_node(self, id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False): """Open an ssh tunnel to a node @@ -75,6 +80,56 @@ def ssh_into_node(self, id, node_id, username, ssh_key=None, password=None, port """ ssh_into_node.ssh_into_node(self, id, node_id, username, ssh_key, password, port_forward_list, internal) + def create_batch_resources( + self, + id, + start_task, + job_manager_task, + vm_size, + vm_image_model, + on_all_tasks_complete, + mixed_mode, + software_metadata_key, + mode_metadata_key, + size_dedicated, + size_low_priority, + subnet_id, + job_metadata, + ): + """Create the underlying batch resources for a cluster or a job + Args: + ... + Returns: + ... + """ + + return create_batch_resources.create_batch_resources( + self.batch_client, + id, + start_task, + job_manager_task, + vm_size, + vm_image_model, + on_all_tasks_complete, + mixed_mode, + software_metadata_key, + mode_metadata_key, + size_dedicated, + size_low_priority, + subnet_id, + job_metadata, + ) + + def delete_batch_resources(self, id, keep_logs): + """Delete the underlying batch resources for a cluster or a job + Args: + ... + Returns: + ... + """ + + return delete_batch_resources.delete_batch_resources(self, id, keep_logs) + def create_user_on_node(self, id, node_id, username, ssh_key=None, password=None): """Create a user on a node @@ -234,28 +289,6 @@ def create_task_table(self, id: str): """ return task_table.create_task_table(self.table_service, id) - def list_task_table_entries(self, id): - """list tasks in a storage table - - Args: - id (:obj:`str`): the id of the cluster - - Returns: - :obj:`[aztk.models.Task]`: a list of models representing all entries in the Task table - """ - return task_table.list_task_table_entries(self.table_service, id) - - def get_task_from_table(self, id, task_id): - """Create a storage table to track tasks - - Args: - id (:obj:`str`): the id of the cluster - - Returns: - :obj:`[aztk.models.Task]`: the task with id task_id from the cluster's storage table - """ - return task_table.get_task_from_table(self.table_service, id, task_id) - def insert_task_into_task_table(self, id, task): """Insert a task into the table @@ -300,16 +333,6 @@ def list_tasks(self, id): """ return list_tasks.list_tasks(self, id) - def get_recent_job(self, id): - """Get the most recently run job in an Azure Batch job schedule - - Args: - id (:obj:`str`): the id of the job schedule - Returns: - :obj:`[azure.batch.models.Job]`: the most recently run job on the job schedule - """ - return get_recent_job.get_recent_job(self, id) - def get_task_state(self, id: str, task_name: str): """Get the status of a submitted task @@ -322,25 +345,24 @@ def get_task_state(self, id: str, task_name: str): """ return get_task_state.get_task_state(self, id, task_name) - def list_batch_tasks(self, id: str): - """Get the status of a submitted task + def get_task(self, id: str, task_id: str): + """Get a task submitted to a cluster Args: - id (:obj:`str`): the name of the cluster the task was submitted to + id (:obj:`str`): the id of the cluster Returns: - :obj:`[aztk.models.Task]`: list of aztk tasks + :obj:`[aztk.models.Task]`: the submitted task with id task_id """ - return task_table.list_batch_tasks(self.batch_client, id) - - def get_batch_task(self, id: str, task_id: str): - """Get the status of a submitted task + return get_task.get_task(self, id, task_id) + def get_node(self, id: str, node_id: str): + """Get a node in a cluster Args: - id (:obj:`str`): the name of the cluster the task was submitted to - task_id (:obj:`str`): the name of the task to get + id (:obj:`str`): the id of the cluster + node_id (:obj:`str`): the id of the node Returns: - :obj:`aztk.models.Task`: aztk Task representing the Batch Task + :obj:`[azure.batch.models.ComputeNode]`: the requested node """ - return task_table.get_batch_task(self.batch_client, id, task_id) + return get_node.get_node(self, id, node_id) diff --git a/aztk/client/base/helpers/create_batch_resources.py b/aztk/client/base/helpers/create_batch_resources.py new file mode 100644 index 00000000..44fd1be5 --- /dev/null +++ b/aztk/client/base/helpers/create_batch_resources.py @@ -0,0 +1,69 @@ +from datetime import timedelta + +import azure.batch.models as batch_models + +from aztk.utils import constants, helpers + + +def create_batch_resources( + batch_client, + id, + start_task, + job_manager_task, + vm_size, + vm_image_model, + on_all_tasks_complete, + mixed_mode, + software_metadata_key, + mode_metadata_key, + size_dedicated, + size_low_priority, + subnet_id, + job_metadata, +): + autoscale_formula = "$TargetDedicatedNodes = {0}; " "$TargetLowPriorityNodes = {1}".format( + size_dedicated, size_low_priority) + + sku_to_use, image_ref_to_use = helpers.select_latest_verified_vm_image_with_node_agent_sku( + vm_image_model.publisher, vm_image_model.offer, vm_image_model.sku, batch_client) + + network_conf = None + if subnet_id is not None: + network_conf = batch_models.NetworkConfiguration(subnet_id=subnet_id) + + auto_pool_specification = batch_models.AutoPoolSpecification( + pool_lifetime_option=batch_models.PoolLifetimeOption.job, + auto_pool_id_prefix=id, + keep_alive=False, + pool=batch_models.PoolSpecification( + display_name=id, + virtual_machine_configuration=batch_models.VirtualMachineConfiguration( + image_reference=image_ref_to_use, node_agent_sku_id=sku_to_use), + vm_size=vm_size, + enable_auto_scale=True, + auto_scale_formula=autoscale_formula, + auto_scale_evaluation_interval=timedelta(minutes=5), + start_task=start_task, + enable_inter_node_communication=not mixed_mode, + network_configuration=network_conf, + max_tasks_per_node=4, + metadata=[ + batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key), + batch_models.MetadataItem( + name=constants.AZTK_MODE_METADATA_KEY, + value=constants.AZTK_JOB_MODE_METADATA) # dyanmically change to cluster/job metadata + ]), + ) + + job = batch_models.JobAddParameter( + id=id, + pool_info=batch_models.PoolInformation(auto_pool_specification=auto_pool_specification), + job_manager_task=job_manager_task, + on_all_tasks_complete=on_all_tasks_complete, + metadata=[ + batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key), + batch_models.MetadataItem(name=constants.AZTK_MODE_METADATA_KEY, value=mode_metadata_key) + ] + job_metadata, + ) + + return batch_client.job.add(job) diff --git a/aztk/client/base/helpers/delete_batch_resources.py b/aztk/client/base/helpers/delete_batch_resources.py new file mode 100644 index 00000000..b3c017d3 --- /dev/null +++ b/aztk/client/base/helpers/delete_batch_resources.py @@ -0,0 +1,29 @@ +from azure.batch.models import BatchErrorException +from msrest.exceptions import ClientRequestError + +from aztk.utils import BackOffPolicy, helpers, retry + + +@retry(retry_count=4, retry_interval=1, backoff_policy=BackOffPolicy.exponential, exceptions=(ClientRequestError)) +def delete_batch_resources(core_base_operations, job_id, keep_logs: bool = False): + success = False + + # delete batch job, autopool + try: + core_base_operations.batch_client.job.delete(job_id) + success = True + except BatchErrorException: + pass + + # delete storage container + if not keep_logs: + cluster_data = core_base_operations.get_cluster_data(job_id) + cluster_data.delete_container(job_id) + success = True + + table_exists = core_base_operations.table_service.exists(job_id) + if table_exists: + core_base_operations.delete_task_table(job_id) + success = True + + return success diff --git a/aztk/client/base/helpers/get_application_log.py b/aztk/client/base/helpers/get_application_log.py index b992f2b0..0a7c270c 100644 --- a/aztk/client/base/helpers/get_application_log.py +++ b/aztk/client/base/helpers/get_application_log.py @@ -6,7 +6,7 @@ from aztk import error, models from aztk.models import Task, TaskState -from aztk.utils import constants, helpers +from aztk.utils import batch_error_manager, constants, helpers output_file = constants.TASK_WORKING_DIR + "/" + constants.SPARK_SUBMIT_LOGS_FILE @@ -19,19 +19,15 @@ def __check_task_node_exist(batch_client, cluster_id: str, task: Task) -> bool: return False -def __wait_for_app_to_be_running(base_operations, cluster_id: str, application_name: str) -> Task: - """ - Wait for the batch task to leave the waiting state into running(or completed if it was fast enough) - """ - - while True: - task_state = base_operations.get_task_state(cluster_id, application_name) - - if task_state in [batch_models.TaskState.active, batch_models.TaskState.preparing]: - # TODO: log - time.sleep(5) - else: - return base_operations.get_batch_task(id=cluster_id, task_id=application_name) +def wait_for_task(base_operations, cluster_id, application_name): + # TODO: ensure get_task_state not None or throw + task = base_operations.get_task(cluster_id, application_name) + while task.state not in [TaskState.Completed, TaskState.Failed]: + time.sleep(3) + # TODO: enable logger + # log.debug("{} {}: application not yet complete".format(cluster_id, application_name)) + task = base_operations.get_task(cluster_id, application_name) + return task def __get_output_file_properties(batch_client, cluster_id: str, application_name: str): @@ -48,18 +44,18 @@ def __get_output_file_properties(batch_client, cluster_id: str, application_name raise e -def get_log_from_storage(blob_client, container_name, application_name, task): +def get_log_from_storage(block_blob_service, container_name, application_name, task): """ Args: - blob_client (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage + block_blob_service (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage Blob service. container_name (:obj:`str`): the name of the Azure Blob storage container to get data from application_name (:obj:`str`): the name of the application to get logs for task (:obj:`aztk.models.Task`): the aztk task for for this application - """ try: - blob = blob_client.get_blob_to_text(container_name, application_name + "/" + constants.SPARK_SUBMIT_LOGS_FILE) + blob = block_blob_service.get_blob_to_text(container_name, + application_name + "/" + constants.SPARK_SUBMIT_LOGS_FILE) except azure.common.AzureMissingResourceHttpError: raise error.AztkError("Logs not found in your storage account. They were either deleted or never existed.") @@ -77,10 +73,11 @@ def wait_for_scheduling_target_task(base_operations, cluster_id, application_nam application_state = base_operations.get_task_state(cluster_id, application_name) while TaskState(application_state) not in [TaskState.Completed, TaskState.Failed]: time.sleep(3) + print("Application {}: State {}".format(TaskState(application_state), application_name)) # TODO: enable logger # log.debug("{} {}: application not yet complete".format(cluster_id, application_name)) application_state = base_operations.get_task_state(cluster_id, application_name) - return base_operations.get_task_from_table(cluster_id, application_name) + return base_operations.get_task(cluster_id, application_name) def get_log(base_operations, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0): @@ -88,13 +85,12 @@ def get_log(base_operations, cluster_id: str, application_name: str, tail=False, task_id = application_name cluster_configuration = base_operations.get_cluster_configuration(cluster_id) + task = wait_for_task(base_operations, cluster_id, application_name) if cluster_configuration.scheduling_target is not models.SchedulingTarget.Any: - task = wait_for_scheduling_target_task(base_operations, cluster_id, application_name) - return get_log_from_storage(base_operations.blob_client, cluster_id, application_name, task) + return get_log_from_storage(base_operations.block_blob_service, cluster_id, application_name, task) else: - task = __wait_for_app_to_be_running(base_operations, cluster_id, application_name) if not __check_task_node_exist(base_operations.batch_client, cluster_id, task): - return get_log_from_storage(base_operations.blob_client, cluster_id, application_name, task) + return get_log_from_storage(base_operations.block_blob_service, cluster_id, application_name, task) file = __get_output_file_properties(base_operations.batch_client, cluster_id, application_name) target_bytes = file.content_length @@ -129,7 +125,5 @@ def get_log(base_operations, cluster_id: str, application_name: str, tail=False, def get_application_log(base_operations, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0): - try: + with batch_error_manager(): return get_log(base_operations, cluster_id, application_name, tail, current_bytes) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/client/base/helpers/get_node.py b/aztk/client/base/helpers/get_node.py new file mode 100644 index 00000000..a95692d5 --- /dev/null +++ b/aztk/client/base/helpers/get_node.py @@ -0,0 +1,7 @@ +from aztk.utils import batch_error_manager + + +def get_node(core_base_operations, cluster_id, node_id): + with batch_error_manager(): + cluster = core_base_operations.get(cluster_id) + return core_base_operations.batch_client.compute_node.get(cluster.pool.id, node_id) diff --git a/aztk/client/base/helpers/get_recent_job.py b/aztk/client/base/helpers/get_recent_job.py deleted file mode 100644 index 9ac089ee..00000000 --- a/aztk/client/base/helpers/get_recent_job.py +++ /dev/null @@ -1,14 +0,0 @@ -from azure.batch.models import BatchErrorException - -from aztk import error -from aztk.utils import helpers - - -# Note: this only works with jobs, not clusters -# cluster impl is planned to change to job schedule -def get_recent_job(core_job_operations, id): - try: - job_schedule = core_job_operations.batch_client.job_schedule.get(id) - return core_job_operations.batch_client.job.get(job_schedule.execution_info.recent_job.id) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/client/base/helpers/get_remote_login_settings.py b/aztk/client/base/helpers/get_remote_login_settings.py index 95e13a25..21baae93 100644 --- a/aztk/client/base/helpers/get_remote_login_settings.py +++ b/aztk/client/base/helpers/get_remote_login_settings.py @@ -1,22 +1,19 @@ -from azure.batch.models import BatchErrorException +from aztk import models +from aztk.utils import batch_error_manager -from aztk import error, models -from aztk.utils import helpers - -def _get_remote_login_settings(base_client, pool_id: str, node_id: str): +def _get_remote_login_settings(core_base_operations, cluster_id: str, node_id: str): """ Get the remote_login_settings for node - :param pool_id + :param cluster_id :param node_id :returns aztk.models.RemoteLogin """ - result = base_client.batch_client.compute_node.get_remote_login_settings(pool_id, node_id) + cluster = core_base_operations.get(cluster_id) + result = core_base_operations.batch_client.compute_node.get_remote_login_settings(cluster.pool.id, node_id) return models.RemoteLogin(ip_address=result.remote_login_ip_address, port=str(result.remote_login_port)) -def get_remote_login_settings(base_client, cluster_id: str, node_id: str): - try: - return _get_remote_login_settings(base_client, cluster_id, node_id) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) +def get_remote_login_settings(core_base_operations, cluster_id: str, node_id: str): + with batch_error_manager(): + return _get_remote_login_settings(core_base_operations, cluster_id, node_id) diff --git a/aztk/client/base/helpers/get_task.py b/aztk/client/base/helpers/get_task.py new file mode 100644 index 00000000..bb1efbd8 --- /dev/null +++ b/aztk/client/base/helpers/get_task.py @@ -0,0 +1,21 @@ +from aztk.models import SchedulingTarget + +from .task_table import get_batch_task, get_task_from_table + + +def get_task(core_base_operations, id, task_id): + """Get a task submitted to a job or cluster + + This will work for both Batch scheduling and scheduling_target + + Args: + id: cluster or job id + Returns: + List[aztk.models.Task] + + """ + scheduling_target = core_base_operations.get_cluster_configuration(id).scheduling_target + if scheduling_target is not SchedulingTarget.Any: + return get_task_from_table(core_base_operations.table_service, id, task_id) + else: + return get_batch_task(core_base_operations.batch_client, id, task_id) diff --git a/aztk/client/base/helpers/get_task_state.py b/aztk/client/base/helpers/get_task_state.py index 98074a4e..1e574fdf 100644 --- a/aztk/client/base/helpers/get_task_state.py +++ b/aztk/client/base/helpers/get_task_state.py @@ -1,18 +1,8 @@ -from azure.batch.models import BatchErrorException - -from aztk import error -from aztk.models import SchedulingTarget, TaskState -from aztk.utils import helpers +from aztk.models import SchedulingTarget +from aztk.utils import batch_error_manager def get_task_state(core_cluster_operations, cluster_id: str, task_id: str): - try: - scheduling_target = core_cluster_operations.get_cluster_configuration(cluster_id).scheduling_target - if scheduling_target is not SchedulingTarget.Any: - task = core_cluster_operations.get_task_from_table(cluster_id, task_id) - return task.state - else: - task = core_cluster_operations.get_batch_task(cluster_id, task_id) + with batch_error_manager(): + task = core_cluster_operations.get_task(cluster_id, task_id) return task.state - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/client/base/helpers/list_tasks.py b/aztk/client/base/helpers/list_tasks.py index 969c820b..89fce7c1 100644 --- a/aztk/client/base/helpers/list_tasks.py +++ b/aztk/client/base/helpers/list_tasks.py @@ -1,13 +1,14 @@ +from azure.batch.models import BatchErrorException + from aztk.models import SchedulingTarget -from .get_recent_job import get_recent_job -from .task_table import list_task_table_entries +from .task_table import list_batch_tasks, list_task_table_entries def list_tasks(core_base_operations, id): """List all tasks on a job or cluster - This will work for both Batch scheduling and scheduling_target + This works for both Batch scheduling and scheduling_target Args: id: cluster or job id @@ -19,8 +20,4 @@ def list_tasks(core_base_operations, id): if scheduling_target is not SchedulingTarget.Any: return list_task_table_entries(core_base_operations.table_service, id) else: - # note: this currently only works for job_schedules - # cluster impl is planned to move to job schedules - recent_run_job = get_recent_job(core_base_operations, id) - tasks = core_base_operations.list_batch_tasks(id=recent_run_job.id) - return tasks + return list_batch_tasks(batch_client=core_base_operations.batch_client, id=id) diff --git a/aztk/client/base/helpers/node_run.py b/aztk/client/base/helpers/node_run.py index 02320123..e9bb1216 100644 --- a/aztk/client/base/helpers/node_run.py +++ b/aztk/client/base/helpers/node_run.py @@ -4,17 +4,16 @@ def node_run(base_client, cluster_id, node_id, command, internal, container_name=None, timeout=None, block=True): cluster = base_client.get(cluster_id) - pool, nodes = cluster.pool, list(cluster.nodes) try: - node = next(node for node in nodes if node.id == node_id) + node = next(node for node in cluster.nodes if node.id == node_id) except StopIteration: raise error.AztkError("Node with id {} not found".format(node_id)) if internal: node_rls = models.RemoteLogin(ip_address=node.ip_address, port="22") else: - node_rls = base_client.get_remote_login_settings(pool.id, node.id) + node_rls = base_client.get_remote_login_settings(cluster_id, node.id) try: - generated_username, ssh_key = base_client.generate_user_on_node(pool.id, node.id) + generated_username, ssh_key = base_client.generate_user_on_node(cluster.pool.id, node.id) output = ssh_lib.node_exec_command( node.id, command, @@ -27,4 +26,4 @@ def node_run(base_client, cluster_id, node_id, command, internal, container_name block=block) return output finally: - base_client.delete_user_on_node(cluster_id, node.id, generated_username) + base_client.delete_user_on_node(cluster.pool.id, node.id, generated_username) diff --git a/aztk/client/base/helpers/run.py b/aztk/client/base/helpers/run.py index c82b9898..f234b70a 100644 --- a/aztk/client/base/helpers/run.py +++ b/aztk/client/base/helpers/run.py @@ -1,11 +1,8 @@ import asyncio -from azure.batch.models import BatchErrorException - import aztk.models as models -from aztk import error +from aztk.utils import batch_error_manager from aztk.utils import ssh as ssh_lib -from aztk.utils import helpers def cluster_run(base_operations, cluster_id, command, internal, container_name=None, timeout=None): @@ -14,11 +11,9 @@ def cluster_run(base_operations, cluster_id, command, internal, container_name=N if internal: cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes] else: - cluster_nodes = [(node, base_operations.get_remote_login_settings(pool.id, node.id)) for node in nodes] - try: + cluster_nodes = [(node, base_operations.get_remote_login_settings(cluster_id, node.id)) for node in nodes] + with batch_error_manager(): generated_username, ssh_key = base_operations.generate_user_on_cluster(pool.id, nodes) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) try: output = asyncio.get_event_loop().run_until_complete( diff --git a/aztk/client/base/helpers/task_table.py b/aztk/client/base/helpers/task_table.py index 5d92b80e..5dd9da25 100644 --- a/aztk/client/base/helpers/task_table.py +++ b/aztk/client/base/helpers/task_table.py @@ -42,7 +42,7 @@ def __convert_batch_task_to_aztk_task(batch_task): task.id = batch_task.id if batch_task.node_info: task.node_id = batch_task.node_info.node_id - task.state = batch_task.state + task.state = TaskState(batch_task.state.value) task.state_transition_time = batch_task.state_transition_time task.command_line = batch_task.command_line task.exit_code = batch_task.execution_info.exit_code diff --git a/aztk/client/client.py b/aztk/client/client.py index 49b8740c..b7a2e5bf 100644 --- a/aztk/client/client.py +++ b/aztk/client/client.py @@ -13,7 +13,7 @@ class CoreClient: def __init__(self): self.secrets_configuration = None self.batch_client = None - self.blob_client = None + self.cloud_storage_account = None self.table_service = None def _get_context(self, secrets_configuration: models.SecretsConfiguration): @@ -21,11 +21,13 @@ def _get_context(self, secrets_configuration: models.SecretsConfiguration): azure_api.validate_secrets(secrets_configuration) self.batch_client = azure_api.make_batch_client(secrets_configuration) - self.blob_client = azure_api.make_blob_client(secrets_configuration) + self.cloud_storage_account = azure_api.make_cloud_storage_account(secrets_configuration) + self.block_blob_service = self.cloud_storage_account.create_block_blob_service() self.table_service = azure_api.make_table_service(secrets_configuration) context = { "batch_client": self.batch_client, - "blob_client": self.blob_client, + "cloud_storage_account": self.cloud_storage_account, + "block_blob_service": self.block_blob_service, "table_service": self.table_service, "secrets_configuration": self.secrets_configuration, } diff --git a/aztk/client/cluster/helpers/copy.py b/aztk/client/cluster/helpers/copy.py index f0935723..25352679 100644 --- a/aztk/client/cluster/helpers/copy.py +++ b/aztk/client/cluster/helpers/copy.py @@ -2,10 +2,9 @@ from azure.batch.models import BatchErrorException -import aztk.models as models -from aztk import error +from aztk import models +from aztk.utils import batch_error_manager from aztk.utils import ssh as ssh_lib -from aztk.utils import helpers def cluster_copy( @@ -23,12 +22,10 @@ def cluster_copy( if internal: cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes] else: - cluster_nodes = [(node, cluster_operations.get_remote_login_settings(pool.id, node.id)) for node in nodes] + cluster_nodes = [(node, cluster_operations.get_remote_login_settings(cluster_id, node.id)) for node in nodes] - try: + with batch_error_manager(): generated_username, ssh_key = cluster_operations.generate_user_on_cluster(pool.id, nodes) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) try: output = asyncio.get_event_loop().run_until_complete( diff --git a/aztk/client/cluster/helpers/create.py b/aztk/client/cluster/helpers/create.py index 91ef524d..598f632f 100644 --- a/aztk/client/cluster/helpers/create.py +++ b/aztk/client/cluster/helpers/create.py @@ -1,9 +1,7 @@ -from datetime import timedelta - import azure.batch.models as batch_models from aztk import models -from aztk.utils import constants, helpers +from aztk.utils import constants def create_pool_and_job_and_table( @@ -11,7 +9,7 @@ def create_pool_and_job_and_table( cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task, - VmImageModel, + vm_image_model, ): """ Create a pool and job @@ -25,51 +23,24 @@ def create_pool_and_job_and_table( # save cluster configuration in storage core_cluster_operations.get_cluster_data(cluster_conf.cluster_id).save_cluster_config(cluster_conf) - # reuse pool_id as job_id - pool_id = cluster_conf.cluster_id - job_id = cluster_conf.cluster_id - - # Get a verified node agent sku - sku_to_use, image_ref_to_use = helpers.select_latest_verified_vm_image_with_node_agent_sku( - VmImageModel.publisher, VmImageModel.offer, VmImageModel.sku, core_cluster_operations.batch_client) - - network_conf = None - if cluster_conf.subnet_id is not None: - network_conf = batch_models.NetworkConfiguration(subnet_id=cluster_conf.subnet_id) - auto_scale_formula = "$TargetDedicatedNodes={0}; $TargetLowPriorityNodes={1}".format( - cluster_conf.size, cluster_conf.size_low_priority) - - # Configure the pool - pool = batch_models.PoolAddParameter( - id=pool_id, - virtual_machine_configuration=batch_models.VirtualMachineConfiguration( - image_reference=image_ref_to_use, node_agent_sku_id=sku_to_use), - vm_size=cluster_conf.vm_size, - enable_auto_scale=True, - auto_scale_formula=auto_scale_formula, - auto_scale_evaluation_interval=timedelta(minutes=5), + core_cluster_operations.create_batch_resources( + id=cluster_conf.cluster_id, start_task=start_task, - enable_inter_node_communication=True if not cluster_conf.subnet_id else False, - max_tasks_per_node=4, - network_configuration=network_conf, - metadata=[ - batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key), - batch_models.MetadataItem( - name=constants.AZTK_MODE_METADATA_KEY, value=constants.AZTK_CLUSTER_MODE_METADATA), - ], + job_manager_task=None, + vm_size=cluster_conf.vm_size, + vm_image_model=vm_image_model, + on_all_tasks_complete=batch_models.OnAllTasksComplete.no_action, + mixed_mode=cluster_conf.mixed_mode, + software_metadata_key=software_metadata_key, + mode_metadata_key=constants.AZTK_CLUSTER_MODE_METADATA, + size_dedicated=cluster_conf.size, + size_low_priority=cluster_conf.size_low_priority, + subnet_id=cluster_conf.subnet_id, + job_metadata=[], ) - # Create the pool + create user for the pool - helpers.create_pool_if_not_exist(pool, core_cluster_operations.batch_client) - - # Create job - job = batch_models.JobAddParameter(id=job_id, pool_info=batch_models.PoolInformation(pool_id=pool_id)) - - # Add job to batch - core_cluster_operations.batch_client.job.add(job) - # create storage task table if cluster_conf.scheduling_target != models.SchedulingTarget.Any: core_cluster_operations.create_task_table(cluster_conf.cluster_id) - return helpers.get_cluster(cluster_conf.cluster_id, core_cluster_operations.batch_client) + return core_cluster_operations.get(cluster_conf.cluster_id) diff --git a/aztk/client/cluster/helpers/delete.py b/aztk/client/cluster/helpers/delete.py deleted file mode 100644 index 1cbc13a7..00000000 --- a/aztk/client/cluster/helpers/delete.py +++ /dev/null @@ -1,43 +0,0 @@ -from azure.batch.models import BatchErrorException -from msrest.exceptions import ClientRequestError - -from aztk.utils import BackOffPolicy, retry - - -def delete_pool_and_job_and_table(core_cluster_operations, pool_id: str, keep_logs: bool = False): - """ - Delete a pool and it's associated job - :param cluster_id: the pool to add the user to - :return bool: deleted the pool if exists and job if exists - """ - # job id is equal to pool id - job_exists = True - - try: - core_cluster_operations.batch_client.job.get(pool_id) - except BatchErrorException: - job_exists = False - - pool_exists = core_cluster_operations.batch_client.pool.exists(pool_id) - - table_exists = core_cluster_operations.table_service.exists(pool_id) - - if job_exists: - delete_object(core_cluster_operations.batch_client.job.delete, pool_id) - - if pool_exists: - delete_object(core_cluster_operations.batch_client.pool.delete, pool_id) - - if table_exists: - delete_object(core_cluster_operations.delete_task_table, pool_id) - - if not keep_logs: - cluster_data = core_cluster_operations.get_cluster_data(pool_id) - cluster_data.delete_container(pool_id) - - return job_exists or pool_exists or table_exists - - -@retry(retry_count=4, retry_interval=1, backoff_policy=BackOffPolicy.exponential, exceptions=(ClientRequestError)) -def delete_object(function, *args, **kwargs): - return function(*args, **kwargs) diff --git a/aztk/client/cluster/helpers/get.py b/aztk/client/cluster/helpers/get.py index d8674afd..146d2e8e 100644 --- a/aztk/client/cluster/helpers/get.py +++ b/aztk/client/cluster/helpers/get.py @@ -1,5 +1,11 @@ -# TODO: return Cluster instead of (pool, nodes) -from aztk import models +from aztk import error, models + + +def convert_job_id_to_pool_id(batch_client, cluster_id): + job = batch_client.job.get(cluster_id) + if job.execution_info and job.execution_info.pool_id: + return job.execution_info.pool_id + raise error.AztkError("No cluster with id {} does not exist.".format(cluster_id)) def get_pool_details(core_cluster_operations, cluster_id: str): @@ -8,6 +14,7 @@ def get_pool_details(core_cluster_operations, cluster_id: str): :param cluster_id: Id of the cluster :return pool: CloudPool, nodes: ComputeNodePaged """ - pool = core_cluster_operations.batch_client.pool.get(cluster_id) - nodes = core_cluster_operations.batch_client.compute_node.list(pool_id=cluster_id) - return models.Cluster(pool, nodes) + pool_id = convert_job_id_to_pool_id(core_cluster_operations.batch_client, cluster_id) + pool = core_cluster_operations.batch_client.pool.get(pool_id) + nodes = core_cluster_operations.batch_client.compute_node.list(pool_id=pool_id) + return models.Cluster(cluster_id, pool, nodes) diff --git a/aztk/client/cluster/helpers/list.py b/aztk/client/cluster/helpers/list.py index b439d93e..087e369b 100644 --- a/aztk/client/cluster/helpers/list.py +++ b/aztk/client/cluster/helpers/list.py @@ -2,17 +2,23 @@ from aztk.utils import constants -def list_clusters(cluster_client, software_metadata_key): +def job_get_pool(core_cluster_operations, job): + if job.execution_info and job.execution_info.pool_id: + return core_cluster_operations.batch_client.pool.get(job.execution_info.pool_id) + + +def list_clusters(core_cluster_operations, software_metadata_key): """ List all the cluster on your account. """ - pools = cluster_client.batch_client.pool.list() + jobs = core_cluster_operations.batch_client.job.list() software_metadata = (constants.AZTK_SOFTWARE_METADATA_KEY, software_metadata_key) cluster_metadata = (constants.AZTK_MODE_METADATA_KEY, constants.AZTK_CLUSTER_MODE_METADATA) aztk_clusters = [] - for pool in [pool for pool in pools if pool.metadata]: - pool_metadata = [(metadata.name, metadata.value) for metadata in pool.metadata] - if all([metadata in pool_metadata for metadata in [software_metadata, cluster_metadata]]): - aztk_clusters.append(models.Cluster(pool)) + for job in jobs: + if job.metadata: + job_metadata = [(metadata.name, metadata.value) for metadata in job.metadata] + if all([metadata in job_metadata for metadata in [software_metadata, cluster_metadata]]): + aztk_clusters.append(models.Cluster(job.id, job_get_pool(core_cluster_operations, job))) return aztk_clusters diff --git a/aztk/client/cluster/operations.py b/aztk/client/cluster/operations.py index efac874e..5a96e041 100644 --- a/aztk/client/cluster/operations.py +++ b/aztk/client/cluster/operations.py @@ -1,7 +1,7 @@ from aztk.client.base import BaseOperations from aztk.models import ClusterConfiguration -from .helpers import copy, create, delete, get, list, wait_for_task_to_complete +from .helpers import copy, create, get, list, wait_for_task_to_complete class CoreClusterOperations(BaseOperations): @@ -58,20 +58,6 @@ def copy(self, id, source_path, destination_path=None, container_name=None, inte """ return copy.cluster_copy(self, id, source_path, destination_path, container_name, internal, get, timeout) - def delete(self, id: str, keep_logs: bool = False): - """Copy files to or from every node in a cluster. - - Args: - id (:obj:`str`): the id of the cluster to delete - keep_logs (:obj:`bool`): If True, the logs related to this cluster in Azure Storage are not deleted. - Defaults to False. - - Returns: - :obj:`List[aztk.models.NodeOutput]`: - A list of NodeOutput objects representing the output of the copy command. - """ - return delete.delete_pool_and_job_and_table(self, id, keep_logs) - def list(self, software_metadata_key): """List clusters running the specified software. diff --git a/aztk/client/job/helpers/get.py b/aztk/client/job/helpers/get.py new file mode 100644 index 00000000..ce7136ff --- /dev/null +++ b/aztk/client/job/helpers/get.py @@ -0,0 +1 @@ +# TODO: pass diff --git a/aztk/client/job/helpers/submit.py b/aztk/client/job/helpers/submit.py index 65a9d174..71ba29db 100644 --- a/aztk/client/job/helpers/submit.py +++ b/aztk/client/job/helpers/submit.py @@ -29,59 +29,23 @@ def submit_job( core_job_operations.get_cluster_data(job_configuration.id).save_cluster_config( job_configuration.to_cluster_config()) - # get a verified node agent sku - sku_to_use, image_ref_to_use = helpers.select_latest_verified_vm_image_with_node_agent_sku( - vm_image_model.publisher, vm_image_model.offer, vm_image_model.sku, core_job_operations.batch_client) - - # set up subnet if necessary - network_conf = None - if job_configuration.subnet_id: - network_conf = batch_models.NetworkConfiguration(subnet_id=job_configuration.subnet_id) - - # set up a schedule for a recurring job - auto_pool_specification = batch_models.AutoPoolSpecification( - pool_lifetime_option=batch_models.PoolLifetimeOption.job_schedule, - auto_pool_id_prefix=job_configuration.id, - keep_alive=False, - pool=batch_models.PoolSpecification( - display_name=job_configuration.id, - virtual_machine_configuration=batch_models.VirtualMachineConfiguration( - image_reference=image_ref_to_use, node_agent_sku_id=sku_to_use), - vm_size=job_configuration.vm_size, - enable_auto_scale=True, - auto_scale_formula=autoscale_formula, - auto_scale_evaluation_interval=timedelta(minutes=5), - start_task=start_task, - enable_inter_node_communication=not job_configuration.mixed_mode(), - network_configuration=network_conf, - max_tasks_per_node=4, - metadata=[ - batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key), - batch_models.MetadataItem( - name=constants.AZTK_MODE_METADATA_KEY, value=constants.AZTK_JOB_MODE_METADATA), - ], - ), - ) - - # define job specification - job_spec = batch_models.JobSpecification( - pool_info=batch_models.PoolInformation(auto_pool_specification=auto_pool_specification), - display_name=job_configuration.id, - on_all_tasks_complete=batch_models.OnAllTasksComplete.terminate_job, + core_job_operations.create_batch_resources( + id=job_configuration.id, + start_task=start_task, job_manager_task=job_manager_task, - metadata=[batch_models.MetadataItem(name="applications", value=application_metadata)], + vm_size=job_configuration.vm_size, + vm_image_model=vm_image_model, + on_all_tasks_complete=batch_models.OnAllTasksComplete.terminate_job, + mixed_mode=job_configuration.mixed_mode, + software_metadata_key=software_metadata_key, + mode_metadata_key=constants.AZTK_JOB_MODE_METADATA, + size_dedicated=job_configuration.max_dedicated_nodes, + size_low_priority=job_configuration.max_low_pri_nodes, + subnet_id=job_configuration.subnet_id, + job_metadata=[batch_models.MetadataItem(name="applications", value=application_metadata)], ) - # define schedule - schedule = batch_models.Schedule( - do_not_run_until=None, do_not_run_after=None, start_window=None, recurrence_interval=None) - - # create job schedule and add task - setup = batch_models.JobScheduleAddParameter(id=job_configuration.id, schedule=schedule, job_specification=job_spec) - - core_job_operations.batch_client.job_schedule.add(setup) - if job_configuration.scheduling_target != models.SchedulingTarget.Any: core_job_operations.create_task_table(job_configuration.id) - return core_job_operations.batch_client.job_schedule.get(job_schedule_id=job_configuration.id) + return core_job_operations.batch_client.job.get(job_id=job_configuration.id) diff --git a/aztk/internal/cluster_data/blob_data.py b/aztk/internal/cluster_data/blob_data.py index 6e3fe117..42a1fa53 100644 --- a/aztk/internal/cluster_data/blob_data.py +++ b/aztk/internal/cluster_data/blob_data.py @@ -9,20 +9,20 @@ class BlobData: Object mapping to a blob entry. Can generate resource files for batch """ - def __init__(self, blob_client: BlockBlobService, container: str, blob: str): + def __init__(self, block_blob_service: BlockBlobService, container: str, blob: str): self.container = container self.blob = blob self.dest = blob - self.blob_client = blob_client + self.block_blob_service = block_blob_service def to_resource_file(self, dest: str = None) -> batch_models.ResourceFile: - sas_token = self.blob_client.generate_blob_shared_access_signature( + sas_token = self.block_blob_service.generate_blob_shared_access_signature( self.container, self.blob, permission=BlobPermissions.READ, expiry=datetime.datetime.utcnow() + datetime.timedelta(days=365), ) - sas_url = self.blob_client.make_blob_url(self.container, self.blob, sas_token=sas_token) + sas_url = self.block_blob_service.make_blob_url(self.container, self.blob, sas_token=sas_token) return batch_models.ResourceFile(file_path=dest or self.dest, blob_source=sas_url) diff --git a/aztk/internal/cluster_data/cluster_data.py b/aztk/internal/cluster_data/cluster_data.py index adc10d5c..05f670af 100644 --- a/aztk/internal/cluster_data/cluster_data.py +++ b/aztk/internal/cluster_data/cluster_data.py @@ -3,6 +3,7 @@ import azure.common import yaml +from azure.storage.blob import BlockBlobService from msrest.exceptions import ClientRequestError from aztk import error @@ -23,8 +24,8 @@ class ClusterData: APPLICATIONS_DIR = "applications" CLUSTER_CONFIG_FILE = "config.yaml" - def __init__(self, blob_client, cluster_id: str): - self.blob_client = blob_client + def __init__(self, block_blob_service: BlockBlobService, cluster_id: str): + self.block_blob_service = block_blob_service self.cluster_id = cluster_id self._ensure_container() @@ -33,13 +34,13 @@ def save_cluster_config(self, cluster_config): blob_path = self.CLUSTER_DIR + "/" + self.CLUSTER_CONFIG_FILE content = yaml.dump(cluster_config) container_name = cluster_config.cluster_id - self.blob_client.create_blob_from_text(container_name, blob_path, content) + self.block_blob_service.create_blob_from_text(container_name, blob_path, content) @retry(retry_count=4, retry_interval=1, backoff_policy=BackOffPolicy.exponential, exceptions=(ClientRequestError)) def read_cluster_config(self): blob_path = self.CLUSTER_DIR + "/" + self.CLUSTER_CONFIG_FILE try: - result = self.blob_client.get_blob_to_text(self.cluster_id, blob_path) + result = self.block_blob_service.get_blob_to_text(self.cluster_id, blob_path) return yaml.load(result.content) except azure.common.AzureMissingResourceHttpError: raise error.AztkError("Cluster {} doesn't have cluster configuration in storage".format(self.cluster_id)) @@ -48,13 +49,13 @@ def read_cluster_config(self): @retry(retry_count=4, retry_interval=1, backoff_policy=BackOffPolicy.exponential, exceptions=(ClientRequestError)) def upload_file(self, blob_path: str, local_path: str) -> BlobData: - self.blob_client.create_blob_from_path(self.cluster_id, blob_path, local_path) - return BlobData(self.blob_client, self.cluster_id, blob_path) + self.block_blob_service.create_blob_from_path(self.cluster_id, blob_path, local_path) + return BlobData(self.block_blob_service, self.cluster_id, blob_path) @retry(retry_count=4, retry_interval=1, backoff_policy=BackOffPolicy.exponential, exceptions=(ClientRequestError)) def upload_bytes(self, blob_path: str, bytes_io: io.BytesIO) -> BlobData: - self.blob_client.create_blob_from_bytes(self.cluster_id, blob_path, bytes_io.getvalue()) - return BlobData(self.blob_client, self.cluster_id, blob_path) + self.block_blob_service.create_blob_from_bytes(self.cluster_id, blob_path, bytes_io.getvalue()) + return BlobData(self.block_blob_service, self.cluster_id, blob_path) def upload_cluster_file(self, blob_path: str, local_path: str) -> BlobData: blob_data = self.upload_bytes(self.CLUSTER_DIR + "/" + blob_path, local_path) @@ -71,8 +72,8 @@ def upload_node_data(self, node_data: NodeData) -> BlobData: @retry(retry_count=4, retry_interval=1, backoff_policy=BackOffPolicy.exponential, exceptions=(ClientRequestError)) def _ensure_container(self): - self.blob_client.create_container(self.cluster_id, fail_on_exist=False) + self.block_blob_service.create_container(self.cluster_id, fail_on_exist=False) @retry(retry_count=4, retry_interval=1, backoff_policy=BackOffPolicy.exponential, exceptions=(ClientRequestError)) def delete_container(self, container_name: str): - self.blob_client.delete_container(container_name) + self.block_blob_service.delete_container(container_name) diff --git a/aztk/models/cluster.py b/aztk/models/cluster.py index b4699508..120a7147 100644 --- a/aztk/models/cluster.py +++ b/aztk/models/cluster.py @@ -4,8 +4,8 @@ class Cluster: - def __init__(self, pool: batch_models.CloudPool, nodes: batch_models.ComputeNodePaged = None): - self.id = pool.id + def __init__(self, id, pool: batch_models.CloudPool, nodes: batch_models.ComputeNodePaged = None): + self.id = id self.pool = pool self.nodes = nodes self.vm_size = pool.vm_size diff --git a/aztk/models/task_state.py b/aztk/models/task_state.py index a76b881b..6dc9a8b0 100644 --- a/aztk/models/task_state.py +++ b/aztk/models/task_state.py @@ -2,6 +2,7 @@ class TaskState(Enum): + Active = "active" Running = "running" Completed = "completed" Failed = "failed" diff --git a/aztk/node_scripts/core/config.py b/aztk/node_scripts/core/config.py index 7a1727c9..342d0db7 100644 --- a/aztk/node_scripts/core/config.py +++ b/aztk/node_scripts/core/config.py @@ -1,14 +1,6 @@ import os import re -import azure.batch.batch_auth as batchauth -import azure.batch.batch_service_client as batch -import azure.storage.blob as blob -from azure.common.credentials import ServicePrincipalCredentials -from azure.mgmt.batch import BatchManagementClient -from azure.mgmt.storage import StorageManagementClient -from azure.storage.common import CloudStorageAccount - from aztk.node_scripts.core import log from aztk.spark import Client, models @@ -40,41 +32,6 @@ storage_account_suffix = os.environ.get("STORAGE_ACCOUNT_SUFFIX") -def get_blob_client() -> blob.BlockBlobService: - if not storage_resource_id: - return blob.BlockBlobService( - account_name=storage_account_name, account_key=storage_account_key, endpoint_suffix=storage_account_suffix) - else: - credentials = ServicePrincipalCredentials( - client_id=client_id, secret=credential, tenant=tenant_id, resource="https://management.core.windows.net/") - m = RESOURCE_ID_PATTERN.match(storage_resource_id) - accountname = m.group("account") - subscription = m.group("subscription") - resourcegroup = m.group("resourcegroup") - mgmt_client = StorageManagementClient(credentials, subscription) - key = (mgmt_client.storage_accounts.list_keys(resource_group_name=resourcegroup, account_name=accountname) - .keys[0].value) - storage_client = CloudStorageAccount(accountname, key) - return storage_client.create_block_blob_service() - - -def get_batch_client() -> batch.BatchServiceClient: - if not batch_resource_id: - base_url = batch_service_url - credentials = batchauth.SharedKeyCredentials(batch_account_name, batch_account_key) - else: - credentials = ServicePrincipalCredentials( - client_id=client_id, secret=credential, tenant=tenant_id, resource="https://management.core.windows.net/") - m = RESOURCE_ID_PATTERN.match(batch_resource_id) - batch_client = BatchManagementClient(credentials, m.group("subscription")) - account = batch_client.batch_account.get(m.group("resourcegroup"), m.group("account")) - base_url = "https://%s/" % account.account_endpoint - credentials = ServicePrincipalCredentials( - client_id=client_id, secret=credential, tenant=tenant_id, resource="https://batch.core.windows.net/") - - return batch.BatchServiceClient(credentials, base_url=base_url) - - def get_spark_client(): if all([batch_resource_id, client_id, credential, storage_resource_id, tenant_id]): serice_principle_configuration = models.ServicePrincipalConfiguration( @@ -106,10 +63,12 @@ def get_spark_client(): spark_client = get_spark_client() -# note: the batch_client and blob_client in _core_cluster_operations +# note: the batch_client and cloud_storage_account in _core_cluster_operations # is the same as in _core_job_operations batch_client = spark_client.cluster._core_cluster_operations.batch_client -blob_client = spark_client.cluster._core_cluster_operations.blob_client +cloud_storage_account = spark_client.cluster._core_cluster_operations.cloud_storage_account +block_blob_service = spark_client.cluster._core_cluster_operations.block_blob_service +table_service = spark_client.cluster._core_cluster_operations.table_service log.info("Pool id is %s", pool_id) log.info("Node id is %s", node_id) diff --git a/aztk/node_scripts/core/logger.py b/aztk/node_scripts/core/logger.py index dab61457..04003cf1 100644 --- a/aztk/node_scripts/core/logger.py +++ b/aztk/node_scripts/core/logger.py @@ -1,9 +1,10 @@ -import sys import logging +import sys log = logging.getLogger("aztk.node-agent") DEFAULT_FORMAT = "%(message)s" +VERBOSE_FORMAT = "[%(asctime)s] [%(filename)s:%(module)s:%(funcName)s:%(lineno)d] %(levelname)s - %(message)s" def setup_logging(): @@ -11,4 +12,7 @@ def setup_logging(): logging.root.removeHandler(handler) log.setLevel(logging.INFO) - logging.basicConfig(stream=sys.stdout, format=DEFAULT_FORMAT) + logging.basicConfig(stream=sys.stdout, format=VERBOSE_FORMAT) + + +setup_logging() diff --git a/aztk/node_scripts/core/utils.py b/aztk/node_scripts/core/utils.py new file mode 100644 index 00000000..512b381c --- /dev/null +++ b/aztk/node_scripts/core/utils.py @@ -0,0 +1,17 @@ +import azure.batch.models as batchmodels + +from aztk.node_scripts.core import config + + +def get_pool() -> batchmodels.CloudPool: + return config.batch_client.pool.get(config.pool_id) + + +def get_node(node_id: str) -> batchmodels.ComputeNode: + return config.batch_client.compute_node.get(config.pool_id, node_id) + + +def get_master_node_id(cluster_id) -> str: + cluster = config.spark_client.cluster.get(cluster_id) + if cluster.master_node_id: + return cluster.master_node_id diff --git a/aztk/node_scripts/install/create_user.py b/aztk/node_scripts/install/create_user.py index 650f0e26..12e63523 100644 --- a/aztk/node_scripts/install/create_user.py +++ b/aztk/node_scripts/install/create_user.py @@ -1,10 +1,13 @@ import os +from datetime import datetime, timedelta, timezone + import azure.batch.models as batch_models +import yaml from azure.batch.models import BatchErrorException -from Cryptodome.PublicKey import RSA from Cryptodome.Cipher import AES, PKCS1_OAEP -from datetime import datetime, timezone, timedelta -import yaml +from Cryptodome.PublicKey import RSA + +from aztk.node_scripts.core import log """ Creates a user if the user configuration file at $AZTK_WORKING_DIR/user.yaml exists """ @@ -14,7 +17,7 @@ def create_user(batch_client): path = os.path.join(os.environ["AZTK_WORKING_DIR"], "user.yaml") if not os.path.isfile(path): - print("No user to create.") + log.info("No user to create.") return with open(path, "r", encoding="UTF-8") as file: @@ -35,7 +38,8 @@ def create_user(batch_client): ), ) except BatchErrorException as e: - print(e) + import traceback + log.info(traceback.format_exc()) def decrypt_password(user_conf): diff --git a/aztk/node_scripts/install/install.py b/aztk/node_scripts/install/install.py index 11ade3be..24d9dc65 100644 --- a/aztk/node_scripts/install/install.py +++ b/aztk/node_scripts/install/install.py @@ -3,14 +3,14 @@ from aztk.internal import cluster_data from aztk.models.plugins import PluginTarget from aztk.node_scripts import wait_until_master_selected -from aztk.node_scripts.core import config +from aztk.node_scripts.core import config, log from aztk.node_scripts.install import (create_user, pick_master, plugins, spark, spark_container) def read_cluster_config(): - data = cluster_data.ClusterData(config.blob_client, config.cluster_id) + data = cluster_data.ClusterData(config.block_blob_service, config.cluster_id) cluster_config = data.read_cluster_config() - print("Got cluster config", cluster_config) + log.info("Got cluster config: %s", cluster_config) return cluster_config @@ -20,27 +20,20 @@ def setup_host(docker_repo: str, docker_run_options: str): :param docker_repo: location of the Docker image to use :param docker_run_options: additional command-line options to pass to docker run """ - client = config.batch_client - - create_user.create_user(batch_client=client) + create_user.create_user(batch_client=config.batch_client) if os.environ["AZ_BATCH_NODE_IS_DEDICATED"] == "true" or os.environ["AZTK_MIXED_MODE"] == "false": - is_master = pick_master.find_master(client) + is_master = pick_master.find_master(config.batch_client) else: is_master = False wait_until_master_selected.main() is_worker = not is_master or os.environ.get("AZTK_WORKER_ON_MASTER") == "true" - master_node_id = pick_master.get_master_node_id(config.batch_client.pool.get(config.pool_id)) - master_node = config.batch_client.compute_node.get(config.pool_id, master_node_id) - if is_master: - os.environ["AZTK_IS_MASTER"] = "true" - else: - os.environ["AZTK_IS_MASTER"] = "false" - if is_worker: - os.environ["AZTK_IS_WORKER"] = "true" - else: - os.environ["AZTK_IS_WORKER"] = "false" + cluster = config.spark_client.cluster.get(id=config.cluster_id) + master_node = config.batch_client.compute_node.get(config.pool_id, cluster.master_node_id) + + os.environ["AZTK_IS_MASTER"] = "true" if is_master else "false" + os.environ["AZTK_IS_WORKER"] = "true" if is_worker else "false" os.environ["AZTK_MASTER_IP"] = master_node.ip_address @@ -62,11 +55,11 @@ def setup_spark_container(): """ is_master = os.environ.get("AZTK_IS_MASTER") == "true" is_worker = os.environ.get("AZTK_IS_WORKER") == "true" - print("Setting spark container. Master: ", is_master, ", Worker: ", is_worker) + log.info("Setting spark container. Master: %s, Worker: %s", is_master, is_worker) - print("Copying spark setup config") + log.info("Copying spark setup config") spark.setup_conf() - print("Done copying spark setup config") + log.info("Done copying spark setup config") spark.setup_connection() @@ -78,4 +71,7 @@ def setup_spark_container(): plugins.setup_plugins(target=PluginTarget.SparkContainer, is_master=is_master, is_worker=is_worker) + # TODO: this is a good candidate for a lock. + # this function holds lock until completion, + # poller wait to aquire lock open("/tmp/setup_complete", "a").close() diff --git a/aztk/node_scripts/install/pick_master.py b/aztk/node_scripts/install/pick_master.py index a02f9a13..6364041d 100644 --- a/aztk/node_scripts/install/pick_master.py +++ b/aztk/node_scripts/install/pick_master.py @@ -6,7 +6,8 @@ from azure.batch.models import BatchErrorException from msrest.exceptions import ClientRequestError -from aztk.node_scripts.core import config +import aztk.models +from aztk.node_scripts.core import config, log MASTER_NODE_METADATA_KEY = "_spark_master_node" @@ -15,33 +16,19 @@ class CannotAllocateMasterError(Exception): pass -def get_master_node_id(pool: batchmodels.CloudPool): - """ - :returns: the id of the node that is the assigned master of this pool - """ - if pool.metadata is None: - return None - - for metadata in pool.metadata: - if metadata.name == MASTER_NODE_METADATA_KEY: - return metadata.value - - return None - - -def try_assign_self_as_master(client: batch.BatchServiceClient, pool: batchmodels.CloudPool): - current_metadata = pool.metadata or [] +def try_assign_self_as_master(client: batch.BatchServiceClient, cluster: aztk.models.Cluster): + current_metadata = cluster.pool.metadata or [] new_metadata = current_metadata + [{"name": MASTER_NODE_METADATA_KEY, "value": config.node_id}] try: client.pool.patch( config.pool_id, batchmodels.PoolPatchParameter(metadata=new_metadata), - batchmodels.PoolPatchOptions(if_match=pool.e_tag), + batchmodels.PoolPatchOptions(if_match=cluster.pool.e_tag), ) return True except (BatchErrorException, ClientRequestError): - print("Couldn't assign itself as master the pool because the pool was modified since last get.") + log.info("Couldn't assign itself as master the pool because the pool was modified since last get.") return False @@ -56,22 +43,21 @@ def find_master(client: batch.BatchServiceClient) -> bool: # return False for i in range(0, 5): - pool = client.pool.get(config.pool_id) - master = get_master_node_id(pool) + cluster = config.spark_client.cluster.get(config.cluster_id) - if master: - if master == config.node_id: - print("Node is already the master '{0}'".format(master)) + if cluster.master_node_id: + if cluster.master_node_id == config.node_id: + log.info("Node is already the master '{0}'".format(cluster.master_node_id)) return True else: - print("Pool already has a master '{0}'. This node will be a worker".format(master)) + log.info("Pool already has a master '{0}'. This node will be a worker".format(cluster.master_node_id)) return False else: - print("Pool has no master. Trying to assign itself! ({0}/5)".format(i + 1)) - result = try_assign_self_as_master(client, pool) + log.info("Pool has no master. Trying to assign itself! ({0}/5)".format(i + 1)) + result = try_assign_self_as_master(client, cluster) if result: - print("Assignment was successful! Node {0} is the new master.".format(config.node_id)) + log.info("Assignment was successful! Node {0} is the new master.".format(config.node_id)) return True raise CannotAllocateMasterError("Unable to assign node as a master in 5 tries") diff --git a/aztk/node_scripts/install/spark.py b/aztk/node_scripts/install/spark.py index a1ad2ff0..104b37ea 100644 --- a/aztk/node_scripts/install/spark.py +++ b/aztk/node_scripts/install/spark.py @@ -10,7 +10,7 @@ import azure.batch.models as batchmodels -from aztk.node_scripts.core import config +from aztk.node_scripts.core import config, log, utils from aztk.node_scripts.install import pick_master batch_client = config.batch_client @@ -31,29 +31,12 @@ def setup_as_worker(): start_spark_worker() -def get_pool() -> batchmodels.CloudPool: - return batch_client.pool.get(config.pool_id) - - -def get_node(node_id: str) -> batchmodels.ComputeNode: - return batch_client.compute_node.get(config.pool_id, node_id) - - -def list_nodes() -> List[batchmodels.ComputeNode]: - """ - List all the nodes in the pool. - """ - # TODO use continuation token & verify against current/target dedicated of - # pool - return batch_client.compute_node.list(config.pool_id) - - def setup_connection(): """ This setup spark config with which nodes are slaves and which are master """ - master_node_id = pick_master.get_master_node_id(batch_client.pool.get(config.pool_id)) - master_node = get_node(master_node_id) + master_node_id = utils.get_master_node_id(config.cluster_id) + master_node = utils.get_node(master_node_id) master_config_file = os.path.join(spark_conf_folder, "master") master_file = open(master_config_file, "w", encoding="UTF-8") @@ -66,13 +49,13 @@ def setup_connection(): def wait_for_master(): print("Waiting for master to be ready.") - master_node_id = pick_master.get_master_node_id(batch_client.pool.get(config.pool_id)) + master_node_id = utils.get_master_node_id(config.cluster_id) if master_node_id == config.node_id: return while True: - master_node = get_node(master_node_id) + master_node = utils.get_node(master_node_id) if master_node.state in [batchmodels.ComputeNodeState.idle, batchmodels.ComputeNodeState.running]: break @@ -82,7 +65,7 @@ def wait_for_master(): def start_spark_master(): - master_ip = get_node(config.node_id).ip_address + master_ip = utils.get_node(config.node_id).ip_address exe = os.path.join(spark_home, "sbin", "start-master.sh") cmd = [exe, "-h", master_ip, "--webui-port", str(config.spark_web_ui_port)] print("Starting master with '{0}'".format(" ".join(cmd))) @@ -97,8 +80,7 @@ def start_spark_master(): def start_spark_worker(): wait_for_master() exe = os.path.join(spark_home, "sbin", "start-slave.sh") - master_node_id = pick_master.get_master_node_id(batch_client.pool.get(config.pool_id)) - master_node = get_node(master_node_id) + master_node = utils.get_node(utils.get_master_node_id(config.cluster_id)) cmd = [exe, "spark://{0}:7077".format(master_node.ip_address), "--webui-port", str(config.spark_worker_ui_port)] print("Connecting to master with '{0}'".format(" ".join(cmd))) @@ -169,9 +151,9 @@ def copy_jars(): dest = os.path.join(spark_default_path_dest, jar) print("copy {} to {}".format(src, dest)) copyfile(src, dest) - except Exception as e: - print("Failed to copy jar files with error:") - print(e) + except Exception: + import traceback + print("Failed to copy jar files with error: {}".format(traceback.format_exc())) def parse_configuration_file(path_to_file: str): @@ -183,9 +165,9 @@ def parse_configuration_file(path_to_file: str): split = line.split() properties[split[0]] = split[1] return properties - except Exception as e: - print("Failed to parse configuration file:", path_to_file, "with error:") - print(e) + except Exception: + import traceback + log.print("Failed to parse configuration file: {} with error {}".format(path_to_file, traceback.format_exc())) def start_history_server(): diff --git a/aztk/node_scripts/main.py b/aztk/node_scripts/main.py index 91914201..16b808b1 100644 --- a/aztk/node_scripts/main.py +++ b/aztk/node_scripts/main.py @@ -20,5 +20,4 @@ def run(): if __name__ == "__main__": - logger.setup_logging() run() diff --git a/aztk/node_scripts/scheduling/common.py b/aztk/node_scripts/scheduling/common.py index 40593296..1995e5a1 100644 --- a/aztk/node_scripts/scheduling/common.py +++ b/aztk/node_scripts/scheduling/common.py @@ -1,10 +1,14 @@ import datetime import os +import shlex +import subprocess +import sys import azure.batch.models as batch_models import azure.storage.blob as blob import requests import yaml +from azure.storage.common import CloudStorageAccount from aztk.node_scripts.core import config from aztk.node_scripts.scheduling import scheduling_target @@ -19,7 +23,7 @@ def load_application(application_file_path): return application -def upload_log(blob_client, application): +def upload_log(block_blob_service, application): """ upload output.log to storage account """ @@ -28,7 +32,7 @@ def upload_log(blob_client, application): container_name=os.environ["STORAGE_LOGS_CONTAINER"], application_name=application.name, file_path=log_file, - blob_client=blob_client, + block_blob_service=block_blob_service, use_full_path=False, ) @@ -36,13 +40,13 @@ def upload_log(blob_client, application): def upload_file_to_container(container_name, application_name, file_path, - blob_client=None, + block_blob_service=None, use_full_path=False, node_path=None) -> batch_models.ResourceFile: """ Uploads a local file to an Azure Blob storage container. - :param blob_client: A blob service client. - :type blocblob_clientk_blob_client: `azure.storage.blob.BlockBlobService` + :param block_blob_service: A blob service client. + :type block_blob_service: `azure.storage.blob.BlockBlobService` :param str container_name: The name of the Azure Blob storage container. :param str file_path: The local path to the file. :param str node_path: Path on the local node. By default will be the same as file_path @@ -61,25 +65,24 @@ def upload_file_to_container(container_name, if not node_path: node_path = blob_name - blob_client.create_container(container_name, fail_on_exist=False) + block_blob_service.create_container(container_name, fail_on_exist=False) - blob_client.create_blob_from_path(container_name, blob_path, file_path) + block_blob_service.create_blob_from_path(container_name, blob_path, file_path) - sas_token = blob_client.generate_blob_shared_access_signature( + sas_token = block_blob_service.generate_blob_shared_access_signature( container_name, blob_path, permission=blob.BlobPermissions.READ, expiry=datetime.datetime.utcnow() + datetime.timedelta(days=7), ) - sas_url = blob_client.make_blob_url(container_name, blob_path, sas_token=sas_token) + sas_url = block_blob_service.make_blob_url(container_name, blob_path, sas_token=sas_token) return batch_models.ResourceFile(file_path=node_path, blob_source=sas_url) def upload_error_log(error, application_file_path): application = load_application(application_file_path) - blob_client = config.blob_client error_log_path = os.path.join(os.environ["AZ_BATCH_TASK_WORKING_DIR"], "error.log") with open(error_log_path, "w", encoding="UTF-8") as error_log: @@ -89,13 +92,19 @@ def upload_error_log(error, application_file_path): container_name=os.environ["STORAGE_LOGS_CONTAINER"], application_name=application.name, file_path=os.path.realpath(error_log.name), - blob_client=blob_client, + block_blob_service=config.block_blob_service, use_full_path=False, ) - upload_log(blob_client, application) + upload_log(config.block_blob_service, application) def download_task_definition(task_sas_url): response = scheduling_target.http_request_wrapper(requests.get, task_sas_url, timeout=10) yaml_serialized_task = response.content return yaml.load(yaml_serialized_task) + + +def run_command(command, cluster_id, application): + return_code = subprocess.call(command, shell=True) + upload_log(config.block_blob_service, application) + return return_code diff --git a/aztk/node_scripts/scheduling/job_submission.py b/aztk/node_scripts/scheduling/job_submission.py index c6e24c09..1180fba2 100644 --- a/aztk/node_scripts/scheduling/job_submission.py +++ b/aztk/node_scripts/scheduling/job_submission.py @@ -5,8 +5,7 @@ import azure.batch.models as batch_models import yaml -from aztk.node_scripts.core import config -from aztk.node_scripts.install.pick_master import get_master_node_id +from aztk.node_scripts.core import config, log, utils from aztk.node_scripts.scheduling import common, scheduling_target from aztk.spark.models import ApplicationState from aztk.utils import constants @@ -24,14 +23,13 @@ def read_downloaded_tasks(): try: tasks.append(yaml.load(stream)) except yaml.YAMLError as exc: - print(exc) + log.info(exc) return tasks -def affinitize_task_to_master(batch_client, cluster_id, task): - pool = batch_client.pool.get(config.pool_id) - master_node_id = get_master_node_id(pool) - master_node = batch_client.compute_node.get(pool_id=cluster_id, node_id=master_node_id) +def affinitize_task_to_master(cluster_id, task): + cluster = config.spark_client.cluster.get(id=cluster_id) + master_node = config.batch_client.compute_node.get(pool_id=cluster.pool.id, node_id=cluster.master_node_id) task.affinity_info = batch_models.AffinityInformation(affinity_id=master_node.affinity_id) return task @@ -40,13 +38,9 @@ def schedule_tasks(tasks): """ Handle the request to submit a task """ - batch_client = config.batch_client - for task in tasks: - # affinitize task to master - task = affinitize_task_to_master(batch_client, os.environ["AZ_BATCH_POOL_ID"], task) - # schedule the task - batch_client.task.add(job_id=os.environ["AZ_BATCH_JOB_ID"], task=task) + task = affinitize_task_to_master(config.cluster_id, task) + config.batch_client.task.add(job_id=config.cluster_id, task=task) def select_scheduling_target_node(spark_cluster_operations, cluster_id, scheduling_target): @@ -61,7 +55,6 @@ def schedule_with_target(scheduling_target, task_sas_urls): for task_sas_url in task_sas_urls: task_definition = common.download_task_definition(task_sas_url) task_working_dir = "/mnt/aztk/startup/tasks/workitems/{}".format(task_definition.id) - aztk_cluster_id = os.environ.get("AZTK_CLUSTER_ID") task_cmd = ( r"source ~/.bashrc; " r"mkdir -p {0};" @@ -70,12 +63,12 @@ def schedule_with_target(scheduling_target, task_sas_urls): r"export STORAGE_LOGS_CONTAINER={1};" r"cd $AZ_BATCH_TASK_WORKING_DIR; " r'$AZTK_WORKING_DIR/.aztk-env/.venv/bin/python $AZTK_WORKING_DIR/aztk/node_scripts/scheduling/submit.py "{2}" >> {3} 2>&1'. - format(task_working_dir, aztk_cluster_id, task_sas_url, constants.SPARK_SUBMIT_LOGS_FILE)) - node_id = select_scheduling_target_node(config.spark_client.cluster, config.pool_id, scheduling_target) + format(task_working_dir, config.cluster_id, task_sas_url, constants.SPARK_SUBMIT_LOGS_FILE)) + node_id = select_scheduling_target_node(config.spark_client.cluster, config.cluster_id, scheduling_target) node_run_output = config.spark_client.cluster.node_run( - config.pool_id, node_id, task_cmd, timeout=120, block=False, internal=True) + config.cluster_id, node_id, task_cmd, timeout=120, block=False, internal=True) # block job_manager_task until scheduling_target task completion - wait_until_tasks_complete(aztk_cluster_id) + wait_until_tasks_complete(config.cluster_id) def wait_until_tasks_complete(id): @@ -99,10 +92,10 @@ def wait_until_tasks_complete(id): scheduling_target = None if scheduling_target: - print("scheduling with target") + log.info("scheduling with target") task_sas_urls = [task_sas_url for task_sas_url in sys.argv[2:]] schedule_with_target(scheduling_target, task_sas_urls) else: - print("scheduling with batch") + log.info("scheduling with batch") tasks = read_downloaded_tasks() schedule_tasks(tasks) diff --git a/aztk/node_scripts/scheduling/scheduling_target.py b/aztk/node_scripts/scheduling/scheduling_target.py index 2500ee44..24f49420 100644 --- a/aztk/node_scripts/scheduling/scheduling_target.py +++ b/aztk/node_scripts/scheduling/scheduling_target.py @@ -1,10 +1,13 @@ import concurrent.futures +import datetime import os import time import requests from aztk import error +from aztk.models import Task, TaskState +from aztk.node_scripts.core import config def http_request_wrapper(func, *args, timeout=None, max_execution_time=300, **kwargs): @@ -51,3 +54,50 @@ def download_task_resource_files(task_id, resource_files): raise error.AztkError(errors) else: return [result.result() for result in done] + + +def insert_task_into_task_table(cluster_id, task_definition): + current_time = datetime.datetime.utcnow() + task = Task( + id=task_definition.id, + node_id=os.environ.get("AZ_BATCH_NODE_ID", None), + state=TaskState.Running, + state_transition_time=current_time, + command_line=task_definition.command_line, + start_time=current_time, + end_time=None, + exit_code=None, + failure_info=None, + ) + + config.spark_client.cluster._core_cluster_operations.insert_task_into_task_table(cluster_id, task) + return task + + +def get_task(cluster_id, task_id): + return config.spark_client.cluster._core_cluster_operations.get_task(cluster_id, task_id) + + +def mark_task_complete(cluster_id, task_id, exit_code): + current_time = datetime.datetime.utcnow() + + task = get_task(cluster_id, task_id) + task.end_time = current_time + task.exit_code = exit_code + task.state = TaskState.Completed + task.state_transition_time = current_time + + config.spark_client.cluster._core_cluster_operations.update_task_in_task_table(cluster_id, task) + + +def mark_task_failure(cluster_id, task_id, exit_code, failure_info): + current_time = datetime.datetime.utcnow() + + task = get_task(cluster_id, task_id) + task.end_time = current_time + task.exit_code = exit_code + task.state = TaskState.Failed + task.state_transition_time = current_time + task.failure_info = failure_info + + config.spark_client.cluster._core_cluster_operations.update_task_in_task_table(cluster_id, task) diff --git a/aztk/node_scripts/scheduling/submit.py b/aztk/node_scripts/scheduling/submit.py index f6eb26ed..b2e0b2d8 100644 --- a/aztk/node_scripts/scheduling/submit.py +++ b/aztk/node_scripts/scheduling/submit.py @@ -1,11 +1,9 @@ -import datetime import logging import os import subprocess import sys -from aztk.models import Task, TaskState -from aztk.node_scripts.core import config +from aztk.node_scripts.core import config, log from aztk.node_scripts.scheduling import common, scheduling_target from aztk.utils.command_builder import CommandBuilder @@ -49,9 +47,7 @@ def __app_submit_cmd(application): os.path.expandvars(application.application) + " " + " ".join(["'" + str(app_arg) + "'" for app_arg in (application.application_args or [])])) - with open("spark-submit.txt", mode="w", encoding="UTF-8") as stream: - stream.write(spark_submit_cmd.to_str()) - + log.info("Spark submit cmd: %s", spark_submit_cmd.to_str()) return spark_submit_cmd @@ -59,14 +55,12 @@ def receive_submit_request(application_file_path): """ Handle the request to submit a task """ - blob_client = config.blob_client application = common.load_application(application_file_path) cmd = __app_submit_cmd(application) exit_code = -1 try: - exit_code = subprocess.call(cmd.to_str(), shell=True) - common.upload_log(blob_client, application) + exit_code = common.run_command(cmd.to_str(), config.cluster_id, application) except Exception as e: common.upload_error_log(str(e), os.path.join(os.environ["AZ_BATCH_TASK_WORKING_DIR"], "application.yaml")) return exit_code @@ -84,68 +78,19 @@ def ssh_submit(task_sas_url): aztk_cluster_id = os.environ.get("AZTK_CLUSTER_ID") try: # update task table before running - task = insert_task_into_task_table(aztk_cluster_id, task_definition) + task = scheduling_target.insert_task_into_task_table(aztk_cluster_id, task_definition) # run task and upload log - exit_code = subprocess.call(cmd.to_str(), shell=True) - common.upload_log(config.blob_client, application) - #TODO: enable logging - # print("completed application, updating storage table") - mark_task_complete(aztk_cluster_id, task.id, exit_code) + exit_code = common.run_command(cmd.to_str(), config.cluster_id, application) + log.info("completed application, updating storage table") + scheduling_target.mark_task_complete(aztk_cluster_id, task.id, exit_code) except Exception as e: - #TODO: enable logging - # print("application failed, updating storage table") - mark_task_failure(aztk_cluster_id, task_definition.id, exit_code, str(e)) + log.info("application failed, updating storage table") + import traceback + scheduling_target.mark_task_failure(aztk_cluster_id, task_definition.id, exit_code, traceback.format_exc()) return exit_code -def insert_task_into_task_table(cluster_id, task_definition): - current_time = datetime.datetime.utcnow() - task = Task( - id=task_definition.id, - node_id=os.environ.get("AZ_BATCH_NODE_ID", None), - state=TaskState.Running, - state_transition_time=current_time, - command_line=task_definition.command_line, - start_time=current_time, - end_time=None, - exit_code=None, - failure_info=None, - ) - - config.spark_client.cluster._core_cluster_operations.insert_task_into_task_table(cluster_id, task) - return task - - -def get_task(cluster_id, task_id): - return config.spark_client.cluster._core_cluster_operations.get_task_from_table(cluster_id, task_id) - - -def mark_task_complete(cluster_id, task_id, exit_code): - current_time = datetime.datetime.utcnow() - - task = get_task(cluster_id, task_id) - task.end_time = current_time - task.exit_code = exit_code - task.state = TaskState.Completed - task.state_transition_time = current_time - - config.spark_client.cluster._core_cluster_operations.update_task_in_task_table(cluster_id, task) - - -def mark_task_failure(cluster_id, task_id, exit_code, failure_info): - current_time = datetime.datetime.utcnow() - - task = get_task(cluster_id, task_id) - task.end_time = current_time - task.exit_code = exit_code - task.state = TaskState.Failed - task.state_transition_time = current_time - task.failure_info = failure_info - - config.spark_client.cluster._core_cluster_operations.update_task_in_task_table(cluster_id, task) - - if __name__ == "__main__": exit_code = 1 @@ -160,5 +105,5 @@ def mark_task_failure(cluster_id, task_id, exit_code, failure_info): os.path.join(os.environ["AZ_BATCH_TASK_WORKING_DIR"], "application.yaml")) else: exit_code = receive_submit_request(os.path.join(os.environ["AZ_BATCH_TASK_WORKING_DIR"], "application.yaml")) - # print("exit code", exit_code) + log.info("Exit code: %s", str(exit_code)) sys.exit(exit_code) diff --git a/aztk/node_scripts/wait_until_master_selected.py b/aztk/node_scripts/wait_until_master_selected.py index 2ecd6864..ba06b54c 100644 --- a/aztk/node_scripts/wait_until_master_selected.py +++ b/aztk/node_scripts/wait_until_master_selected.py @@ -6,12 +6,11 @@ def main(): while master is None: try: - from aztk.node_scripts.core import config - from aztk.node_scripts.install.pick_master import get_master_node_id + from aztk.node_scripts.core import config, utils batch_client = config.batch_client pool = batch_client.pool.get(config.pool_id) - master = get_master_node_id(pool) + master = utils.get_master_node_id(pool) time.sleep(1) except Exception as e: diff --git a/aztk/spark/client/base/helpers/generate_application_task.py b/aztk/spark/client/base/helpers/generate_application_task.py index de255479..af630cbe 100644 --- a/aztk/spark/client/base/helpers/generate_application_task.py +++ b/aztk/spark/client/base/helpers/generate_application_task.py @@ -16,7 +16,7 @@ def generate_application_task(core_base_operations, container_id, application, r container_name=container_id, application_name=application.name, file_path=application.application, - blob_client=core_base_operations.blob_client, + block_blob_service=core_base_operations.block_blob_service, use_full_path=False, ) @@ -32,7 +32,7 @@ def generate_application_task(core_base_operations, container_id, application, r container_name=container_id, application_name=application.name, file_path=jar, - blob_client=core_base_operations.blob_client, + block_blob_service=core_base_operations.block_blob_service, use_full_path=False, ) jar_resource_file_paths.append(current_jar_resource_file_path) @@ -45,7 +45,7 @@ def generate_application_task(core_base_operations, container_id, application, r container_name=container_id, application_name=application.name, file_path=py_file, - blob_client=core_base_operations.blob_client, + block_blob_service=core_base_operations.block_blob_service, use_full_path=False, ) py_files_resource_file_paths.append(current_py_files_resource_file_path) @@ -58,7 +58,7 @@ def generate_application_task(core_base_operations, container_id, application, r container_name=container_id, application_name=application.name, file_path=file, - blob_client=core_base_operations.blob_client, + block_blob_service=core_base_operations.block_blob_service, use_full_path=False, ) files_resource_file_paths.append(files_resource_file_path) @@ -73,7 +73,7 @@ def generate_application_task(core_base_operations, container_id, application, r application_name=application.name, file_path="application.yaml", content=yaml.dump(application), - blob_client=core_base_operations.blob_client, + block_blob_service=core_base_operations.block_blob_service, ) resource_files.append(application_definition_file) diff --git a/aztk/spark/client/base/helpers/list_applications.py b/aztk/spark/client/base/helpers/list_applications.py index 8945a455..c5dafd5c 100644 --- a/aztk/spark/client/base/helpers/list_applications.py +++ b/aztk/spark/client/base/helpers/list_applications.py @@ -1,23 +1,7 @@ -import azure.batch.models as batch_models -from azure.batch.models import BatchErrorException - -from aztk import error from aztk.spark import models -from aztk.utils import helpers - - -def _list_applications(core_operations, id): - # info about the app - scheduling_target = core_operations.get_cluster_configuration(id).scheduling_target - if scheduling_target is not models.SchedulingTarget.Any: - return models.Application(core_operations.list_applications(id)) - - recent_run_job = core_operations.get_recent_job(id) - return core_operations.list_batch_tasks(id=recent_run_job.id) +from aztk.utils import batch_error_manager def list_applications(core_operations, id): - try: - return models.Application(_list_applications(core_operations, id)) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) + with batch_error_manager(): + return models.Application(core_operations.list_tasks(id)) diff --git a/aztk/spark/client/base/operations.py b/aztk/spark/client/base/operations.py index d2d428b1..0cfc460e 100644 --- a/aztk/spark/client/base/operations.py +++ b/aztk/spark/client/base/operations.py @@ -80,6 +80,6 @@ def list_applications(self, id): application_name (:obj:`str`): the name of the application to get Returns: - :obj:`aztk.spark.models.Application`: object representing that state and output of an application + :obj:`List[aztk.spark.models.Application]`: A list of the applications submitted in the job """ return list_applications.list_applications(self, id) diff --git a/aztk/spark/client/cluster/helpers/copy.py b/aztk/spark/client/cluster/helpers/copy.py index 6f76a409..9d62d836 100644 --- a/aztk/spark/client/cluster/helpers/copy.py +++ b/aztk/spark/client/cluster/helpers/copy.py @@ -1,7 +1,4 @@ -from azure.batch.models import BatchErrorException - -from aztk import error -from aztk.utils import helpers +from aztk.utils import batch_error_manager def cluster_copy( @@ -13,7 +10,7 @@ def cluster_copy( internal: bool = False, timeout: int = None, ): - try: + with batch_error_manager(): container_name = None if host else "spark" return core_cluster_operations.copy( cluster_id, @@ -24,5 +21,3 @@ def cluster_copy( internal=internal, timeout=timeout, ) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/create.py b/aztk/spark/client/cluster/helpers/create.py index aabfc347..605fb3ae 100644 --- a/aztk/spark/client/cluster/helpers/create.py +++ b/aztk/spark/client/cluster/helpers/create.py @@ -1,12 +1,10 @@ import azure.batch.models as batch_models -from azure.batch.models import BatchErrorException -from aztk import error from aztk import models as base_models from aztk.internal.cluster_data import NodeData from aztk.spark import models -from aztk.spark.utils import constants, util -from aztk.utils import helpers +from aztk.spark.utils import constants, wait_for_master +from aztk.utils import batch_error_manager POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity( auto_user=batch_models.AutoUserSpecification( @@ -43,7 +41,7 @@ def create_cluster(core_cluster_operations, cluster_conf.validate() cluster_data = core_cluster_operations.get_cluster_data(cluster_conf.cluster_id) - try: + with batch_error_manager(): zip_resource_files = None node_data = NodeData(cluster_conf).add_core().done() zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file() @@ -67,10 +65,7 @@ def create_cluster(core_cluster_operations, # Wait for the master to be ready if wait: - util.wait_for_master_to_be_ready(core_cluster_operations, spark_cluster_operations, cluster.id) + wait_for_master.wait_for_master(core_cluster_operations, spark_cluster_operations, cluster.id) cluster = spark_cluster_operations.get(cluster.id) return cluster - - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/create_user.py b/aztk/spark/client/cluster/helpers/create_user.py index d95016a8..b1edf44c 100644 --- a/aztk/spark/client/cluster/helpers/create_user.py +++ b/aztk/spark/client/cluster/helpers/create_user.py @@ -1,7 +1,5 @@ -from azure.batch.models import BatchErrorException - from aztk import error -from aztk.utils import helpers +from aztk.utils import batch_error_manager def create_user( @@ -12,11 +10,9 @@ def create_user( password: str = None, ssh_key: str = None, ) -> str: - try: + with batch_error_manager(): cluster = spark_cluster_operations.get(cluster_id) master_node_id = cluster.master_node_id if not master_node_id: raise error.ClusterNotReadyError("The master has not yet been picked, a user cannot be added.") core_cluster_operations.create_user_on_cluster(cluster.id, cluster.nodes, username, ssh_key, password) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/delete.py b/aztk/spark/client/cluster/helpers/delete.py index 063907a2..436e28f3 100644 --- a/aztk/spark/client/cluster/helpers/delete.py +++ b/aztk/spark/client/cluster/helpers/delete.py @@ -1,11 +1,6 @@ -from azure.batch.models import BatchErrorException - -from aztk import error -from aztk.utils import helpers +from aztk.utils import batch_error_manager def delete_cluster(core_cluster_operations, cluster_id: str, keep_logs: bool = False): - try: - return core_cluster_operations.delete(cluster_id, keep_logs) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) + with batch_error_manager(): + return core_cluster_operations.delete_batch_resources(cluster_id, keep_logs) diff --git a/aztk/spark/client/cluster/helpers/diagnostics.py b/aztk/spark/client/cluster/helpers/diagnostics.py index 3343f3c8..208641d6 100644 --- a/aztk/spark/client/cluster/helpers/diagnostics.py +++ b/aztk/spark/client/cluster/helpers/diagnostics.py @@ -1,9 +1,7 @@ import os -from azure.batch.models import BatchErrorException - from aztk import error -from aztk.utils import helpers +from aztk.utils import batch_error_manager def _run(spark_cluster_operations, cluster_id, output_directory=None, brief=False): @@ -40,8 +38,6 @@ def _build_diagnostic_ssh_command(brief): def run_cluster_diagnostics(spark_cluster_operations, cluster_id, output_directory=None, brief=False): - try: + with batch_error_manager(): output = _run(spark_cluster_operations, cluster_id, output_directory, brief) return output - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/download.py b/aztk/spark/client/cluster/helpers/download.py index 261640c2..94a9ba8a 100644 --- a/aztk/spark/client/cluster/helpers/download.py +++ b/aztk/spark/client/cluster/helpers/download.py @@ -1,7 +1,4 @@ -from azure.batch.models import BatchErrorException - -from aztk import error -from aztk.utils import helpers +from aztk.utils import batch_error_manager def cluster_download( @@ -13,7 +10,7 @@ def cluster_download( internal: bool = False, timeout: int = None, ): - try: + with batch_error_manager(): container_name = None if host else "spark" return core_cluster_operations.copy( cluster_id, @@ -24,5 +21,3 @@ def cluster_download( internal=internal, timeout=timeout, ) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/get.py b/aztk/spark/client/cluster/helpers/get.py index 11a7347e..30c4cd11 100644 --- a/aztk/spark/client/cluster/helpers/get.py +++ b/aztk/spark/client/cluster/helpers/get.py @@ -1,13 +1,8 @@ -from azure.batch.models import BatchErrorException - -from aztk import error from aztk.spark import models -from aztk.utils import helpers +from aztk.utils import batch_error_manager def get_cluster(core_cluster_operations, cluster_id: str): - try: + with batch_error_manager(): cluster = core_cluster_operations.get(cluster_id) return models.Cluster(cluster) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/get_application_state.py b/aztk/spark/client/cluster/helpers/get_application_state.py index e2e06e2e..17d4db9c 100644 --- a/aztk/spark/client/cluster/helpers/get_application_state.py +++ b/aztk/spark/client/cluster/helpers/get_application_state.py @@ -1,12 +1,7 @@ -from azure.batch.models import BatchErrorException - -from aztk import error from aztk.spark.models import ApplicationState -from aztk.utils import helpers +from aztk.utils import batch_error_manager def get_application_state(core_cluster_operations, cluster_id: str, app_name: str): - try: + with batch_error_manager(): return ApplicationState(core_cluster_operations.get_task_state(cluster_id, app_name).value) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/get_configuration.py b/aztk/spark/client/cluster/helpers/get_configuration.py index e0c19fb4..a722d188 100644 --- a/aztk/spark/client/cluster/helpers/get_configuration.py +++ b/aztk/spark/client/cluster/helpers/get_configuration.py @@ -1,11 +1,6 @@ -from azure.batch.models import BatchErrorException - -from aztk import error -from aztk.utils import helpers +from aztk.utils import batch_error_manager def get_configuration(core_cluster_operations, cluster_id: str): - try: + with batch_error_manager(): return core_cluster_operations.get_cluster_configuration(cluster_id) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/get_remote_login_settings.py b/aztk/spark/client/cluster/helpers/get_remote_login_settings.py index d40ccf16..9a5c55ef 100644 --- a/aztk/spark/client/cluster/helpers/get_remote_login_settings.py +++ b/aztk/spark/client/cluster/helpers/get_remote_login_settings.py @@ -1,12 +1,7 @@ -from azure.batch.models import BatchErrorException - -from aztk import error from aztk.spark import models -from aztk.utils import helpers +from aztk.utils import batch_error_manager def get_remote_login_settings(core_cluster_operations, id: str, node_id: str): - try: + with batch_error_manager(): return models.RemoteLogin(core_cluster_operations.get_remote_login_settings(id, node_id)) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/list.py b/aztk/spark/client/cluster/helpers/list.py index d893859f..9f2e5589 100644 --- a/aztk/spark/client/cluster/helpers/list.py +++ b/aztk/spark/client/cluster/helpers/list.py @@ -1,14 +1,9 @@ -from azure.batch.models import BatchErrorException - -from aztk import error from aztk import models as base_models from aztk.spark import models -from aztk.utils import helpers +from aztk.utils import batch_error_manager def list_clusters(core_cluster_operations): - try: + with batch_error_manager(): software_metadata_key = base_models.Software.spark return [models.Cluster(cluster) for cluster in core_cluster_operations.list(software_metadata_key)] - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/node_run.py b/aztk/spark/client/cluster/helpers/node_run.py index f32e066a..4ac142b9 100644 --- a/aztk/spark/client/cluster/helpers/node_run.py +++ b/aztk/spark/client/cluster/helpers/node_run.py @@ -1,7 +1,4 @@ -from azure.batch.models import BatchErrorException - -from aztk import error -from aztk.utils import helpers +from aztk.utils import batch_error_manager def node_run( @@ -14,7 +11,7 @@ def node_run( timeout=None, block=False, ): - try: + with batch_error_manager(): return core_cluster_operations.node_run( cluster_id, node_id, @@ -23,5 +20,3 @@ def node_run( container_name="spark" if not host else None, timeout=timeout, block=block) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/run.py b/aztk/spark/client/cluster/helpers/run.py index b5f36134..c1c7bf9d 100644 --- a/aztk/spark/client/cluster/helpers/run.py +++ b/aztk/spark/client/cluster/helpers/run.py @@ -1,7 +1,4 @@ -from azure.batch.models import BatchErrorException - -from aztk import error -from aztk.utils import helpers +from aztk.utils import batch_error_manager def cluster_run(core_cluster_operations, @@ -10,8 +7,6 @@ def cluster_run(core_cluster_operations, host=False, internal: bool = False, timeout=None): - try: + with batch_error_manager(): return core_cluster_operations.run( cluster_id, command, internal, container_name="spark" if not host else None, timeout=timeout) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/ssh_into_master.py b/aztk/spark/client/cluster/helpers/ssh_into_master.py index e79f2c31..1417b078 100644 --- a/aztk/spark/client/cluster/helpers/ssh_into_master.py +++ b/aztk/spark/client/cluster/helpers/ssh_into_master.py @@ -1,7 +1,4 @@ -from azure.batch.models import BatchErrorException - -from aztk import error -from aztk.utils import helpers +from aztk.utils import batch_error_manager def ssh_into_master( @@ -14,9 +11,7 @@ def ssh_into_master( port_forward_list=None, internal=False, ): - try: + with batch_error_manager(): master_node_id = spark_cluster_operations.get(cluster_id).master_node_id core_cluster_operations.ssh_into_node(cluster_id, master_node_id, username, ssh_key, password, port_forward_list, internal) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/submit.py b/aztk/spark/client/cluster/helpers/submit.py index 7a4b97e2..b8b174f8 100644 --- a/aztk/spark/client/cluster/helpers/submit.py +++ b/aztk/spark/client/cluster/helpers/submit.py @@ -1,11 +1,9 @@ import azure.batch.models as batch_models import yaml -from azure.batch.models import BatchErrorException -from aztk import error from aztk.error import AztkError from aztk.spark import models -from aztk.utils import constants, helpers +from aztk.utils import batch_error_manager, constants, helpers def __get_node(core_cluster_operations, node_id: str, cluster_id: str) -> batch_models.ComputeNode: @@ -17,18 +15,18 @@ def affinitize_task_to_master(core_cluster_operations, spark_cluster_operations, if cluster.master_node_id is None: raise AztkError("Master has not yet been selected. Please wait until the cluster is finished provisioning.") master_node = core_cluster_operations.batch_client.compute_node.get( - pool_id=cluster_id, node_id=cluster.master_node_id) + pool_id=cluster.pool.id, node_id=cluster.master_node_id) task.affinity_info = batch_models.AffinityInformation(affinity_id=master_node.affinity_id) return task -def upload_serialized_task_to_storage(blob_client, cluster_id, task): +def upload_serialized_task_to_storage(block_blob_service, cluster_id, task): return helpers.upload_text_to_container( container_name=cluster_id, application_name=task.id, file_path="task.yaml", content=yaml.dump(task), - blob_client=blob_client, + block_blob_service=block_blob_service, ) @@ -50,15 +48,8 @@ def schedule_with_target( internal, ): # upload "real" task definition to storage - serialized_task_resource_file = upload_serialized_task_to_storage(core_cluster_operations.blob_client, cluster_id, - task) - # # schedule "ghost" task - ghost_task = batch_models.TaskAddParameter( - id=task.id, - command_line="/bin/bash", - ) - # tell the node to run the task - core_cluster_operations.batch_client.task.add(cluster_id, task=ghost_task) + serialized_task_resource_file = upload_serialized_task_to_storage(core_cluster_operations.block_blob_service, + cluster_id, task) task_working_dir = "/mnt/aztk/startup/tasks/workitems/{}".format(task.id) @@ -119,8 +110,7 @@ def submit( wait: bool = False, internal: bool = False, ): - try: + with batch_error_manager(): + submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote, wait, internal) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/helpers/wait.py b/aztk/spark/client/cluster/helpers/wait.py index 95b13efa..a572177f 100644 --- a/aztk/spark/client/cluster/helpers/wait.py +++ b/aztk/spark/client/cluster/helpers/wait.py @@ -5,7 +5,6 @@ def wait_for_application_to_complete(core_cluster_operations, id, application_name): - try: + from aztk.utils import batch_error_manager + with batch_error_manager(): return core_cluster_operations.wait(id, application_name) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/job/helpers/delete.py b/aztk/spark/client/job/helpers/delete.py index 25b77f92..1131920b 100644 --- a/aztk/spark/client/job/helpers/delete.py +++ b/aztk/spark/client/job/helpers/delete.py @@ -1,35 +1,6 @@ -from azure.batch.models import BatchErrorException -from msrest.exceptions import ClientRequestError +from aztk.utils import batch_error_manager -from aztk import error -from aztk.utils import BackOffPolicy, helpers, retry - -def _delete(core_job_operations, spark_job_operations, job_id, keep_logs: bool = False): - deleted_job_schedule = False - - # delete job_schedule - try: - core_job_operations.batch_client.job_schedule.delete(job_id) - deleted_job_schedule = True - except BatchErrorException: - pass - - # delete storage container - if keep_logs: - cluster_data = core_job_operations.get_cluster_data(job_id) - cluster_data.delete_container(job_id) - - table_exists = core_job_operations.table_service.exists(job_id) - if table_exists: - core_job_operations.delete_task_table(job_id) - - return deleted_job_schedule - - -@retry(retry_count=4, retry_interval=1, backoff_policy=BackOffPolicy.exponential, exceptions=(ClientRequestError)) -def delete(core_job_operations, spark_job_operations, job_id: str, keep_logs: bool = False): - try: - return _delete(core_job_operations, spark_job_operations, job_id, keep_logs) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) +def delete(core_job_operations, job_id: str, keep_logs: bool = False): + with batch_error_manager(): + return core_job_operations.delete_batch_resources(job_id, keep_logs) diff --git a/aztk/spark/client/job/helpers/get.py b/aztk/spark/client/job/helpers/get.py index 0b904410..7a90965c 100644 --- a/aztk/spark/client/job/helpers/get.py +++ b/aztk/spark/client/job/helpers/get.py @@ -1,28 +1,21 @@ -from azure.batch.models import BatchErrorException - from aztk import error from aztk.spark import models -from aztk.utils import helpers +from aztk.utils import batch_error_manager def _get_job(core_job_operations, job_id): - job = core_job_operations.batch_client.job_schedule.get(job_id) + job = core_job_operations.batch_client.job.get(job_id) tasks = [app for app in core_job_operations.list_tasks(id=job_id) if app.id != job_id] - recent_run_job = core_job_operations.get_recent_job(job_id) - pool_prefix = recent_run_job.pool_info.auto_pool_specification.auto_pool_id_prefix + with batch_error_manager(): + pool = core_job_operations.batch_client.pool.get(job.execution_info.pool_id) pool = nodes = None - for cloud_pool in core_job_operations.batch_client.pool.list(): - if pool_prefix in cloud_pool.id: - pool = cloud_pool - break + if pool: nodes = core_job_operations.batch_client.compute_node.list(pool_id=pool.id) return job, tasks, pool, nodes def get_job(core_job_operations, job_id): - try: + with batch_error_manager(): job, tasks, pool, nodes = _get_job(core_job_operations, job_id) return models.Job(job, tasks, pool, nodes) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/job/helpers/get_application.py b/aztk/spark/client/job/helpers/get_application.py index cdcc6ee3..481635d0 100644 --- a/aztk/spark/client/job/helpers/get_application.py +++ b/aztk/spark/client/job/helpers/get_application.py @@ -7,20 +7,14 @@ def _get_application(core_operations, job_id, application_name): - # info about the app - recent_run_job = core_operations.get_recent_job(job_id) - scheduling_target = core_operations.get_cluster_configuration(job_id).scheduling_target - if scheduling_target is not models.SchedulingTarget.Any: - return core_operations.get_task_from_table(job_id, application_name) try: - return core_operations.get_batch_task(id=recent_run_job.id, task_id=application_name) + return core_operations.get_task(id=job_id, task_id=application_name) except batch_models.BatchErrorException: raise error.AztkError( "The Spark application {0} is still being provisioned or does not exist.".format(application_name)) def get_application(core_operations, job_id, application_name): - try: + from aztk.utils import batch_error_manager + with batch_error_manager(): return models.Application(_get_application(core_operations, job_id, application_name)) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/job/helpers/get_application_log.py b/aztk/spark/client/job/helpers/get_application_log.py index b4f23112..faba7cd9 100644 --- a/aztk/spark/client/job/helpers/get_application_log.py +++ b/aztk/spark/client/job/helpers/get_application_log.py @@ -1,9 +1,8 @@ import azure.batch.models as batch_models -from azure.batch.models import BatchErrorException from aztk import error +from aztk.models import TaskState from aztk.spark import models -from aztk.utils import helpers def _get_application_log(core_job_operations, spark_job_operations, job_id, application_name): @@ -11,12 +10,8 @@ def _get_application_log(core_job_operations, spark_job_operations, job_id, appl if scheduling_target is not models.SchedulingTarget.Any: return core_job_operations.get_application_log(job_id, application_name) - # TODO: change where the logs are uploaded so they aren't overwritten on scheduled runs - # current: job_id, application_name/output.log - # new: job_id, recent_run_job.id/application_name/output.log - recent_run_job = core_job_operations.get_recent_job(job_id) try: - task = core_job_operations.get_batch_task(id=recent_run_job.id, task_id=application_name) + task = core_job_operations.get_task(id=job_id, task_id=application_name) except batch_models.BatchErrorException as e: # task may not exist since it may not yet be scheduled # see if the task is written to metadata of pool @@ -28,9 +23,9 @@ def _get_application_log(core_job_operations, spark_job_operations, job_id, appl raise error.AztkError("The application {0} does not exist".format(application_name)) else: if task.state in ( - batch_models.TaskState.active, - batch_models.TaskState.running, - batch_models.TaskState.preparing, + TaskState.active, + TaskState.running, + TaskState.preparing, ): raise error.AztkError("The application {0} has not yet finished executing.".format(application_name)) @@ -38,8 +33,7 @@ def _get_application_log(core_job_operations, spark_job_operations, job_id, appl def get_job_application_log(core_job_operations, spark_job_operations, job_id, application_name): - try: + from aztk.utils import batch_error_manager + with batch_error_manager(): return models.ApplicationLog( _get_application_log(core_job_operations, spark_job_operations, job_id, application_name)) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/job/helpers/list.py b/aztk/spark/client/job/helpers/list.py index 3551fdd3..34eb63f5 100644 --- a/aztk/spark/client/job/helpers/list.py +++ b/aztk/spark/client/job/helpers/list.py @@ -5,12 +5,12 @@ from aztk.utils import helpers -def _list_jobs(core_job_operations): - return [cloud_job_schedule for cloud_job_schedule in core_job_operations.batch_client.job_schedule.list()] +def filter_aztk_jobs(jobs): + #TODO: filter by metadata + return jobs def list_jobs(core_job_operations): - try: - return [models.Job(cloud_job_schedule) for cloud_job_schedule in _list_jobs(core_job_operations)] - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) + from aztk.utils import batch_error_manager + with batch_error_manager(): + return [models.Job(job) for job in filter_aztk_jobs(core_job_operations.batch_client.job.list())] diff --git a/aztk/spark/client/job/helpers/list_applications.py b/aztk/spark/client/job/helpers/list_applications.py index ed7a75ec..4b579e9c 100644 --- a/aztk/spark/client/job/helpers/list_applications.py +++ b/aztk/spark/client/job/helpers/list_applications.py @@ -1,15 +1,12 @@ -from azure.batch.models import BatchErrorException - -from aztk import error from aztk.spark import models -from aztk.utils import helpers +from aztk.utils import batch_error_manager def _list_applications(core_job_operations, job_id): - recent_run_job = core_job_operations.get_recent_job(job_id) + job = core_job_operations.batch_client.job.get(job_id) # get application names from Batch job metadata applications = {} - for metadata_item in recent_run_job.metadata: + for metadata_item in job.metadata: if metadata_item.name == "applications": for app_name in metadata_item.value.split("\n"): applications[app_name] = None @@ -26,11 +23,9 @@ def _list_applications(core_job_operations, job_id): # currently, it returns a dictionary indicating whether # a task has been scheduled or not def list_applications(core_job_operations, job_id): - try: + with batch_error_manager(): applications = _list_applications(core_job_operations, job_id) for item in applications: if applications[item]: applications[item] = models.Application(applications[item]) return applications - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/job/helpers/stop.py b/aztk/spark/client/job/helpers/stop.py index 1d961915..3d701473 100644 --- a/aztk/spark/client/job/helpers/stop.py +++ b/aztk/spark/client/job/helpers/stop.py @@ -1,19 +1,12 @@ -from azure.batch.models import BatchErrorException - -from aztk import error -from aztk.utils import helpers +from aztk.utils import batch_error_manager def _stop(core_job_operations, job_id): # terminate currently running job and tasks - recent_run_job = core_job_operations.get_recent_job(job_id) - core_job_operations.batch_client.job.terminate(recent_run_job.id) - # terminate job_schedule - core_job_operations.batch_client.job_schedule.terminate(job_id) + job = core_job_operations.batch_client.job.get(job_id) + core_job_operations.batch_client.job.terminate(job.id) def stop(self, job_id): - try: + with batch_error_manager(): return _stop(self, job_id) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/job/helpers/stop_application.py b/aztk/spark/client/job/helpers/stop_application.py index e770316e..9abebf10 100644 --- a/aztk/spark/client/job/helpers/stop_application.py +++ b/aztk/spark/client/job/helpers/stop_application.py @@ -2,11 +2,11 @@ def stop_app(core_job_operations, job_id, application_name): - recent_run_job = core_job_operations.get_recent_job(job_id) + job = core_job_operations.batch_client.job.get(job_id) # stop batch task try: - core_job_operations.batch_client.task.terminate(job_id=recent_run_job.id, task_id=application_name) + core_job_operations.batch_client.task.terminate(job_id=job.id, task_id=application_name) return True except BatchErrorException: return False diff --git a/aztk/spark/client/job/helpers/submit.py b/aztk/spark/client/job/helpers/submit.py index 35a5c980..298287a6 100644 --- a/aztk/spark/client/job/helpers/submit.py +++ b/aztk/spark/client/job/helpers/submit.py @@ -1,13 +1,12 @@ import azure.batch.models as batch_models import yaml -from azure.batch.models import BatchErrorException -from aztk import error +import aztk.spark.utils.constants from aztk import models as base_models from aztk.internal.cluster_data import NodeData from aztk.spark import models from aztk.spark.models import SchedulingTarget -from aztk.utils import helpers +from aztk.utils import batch_error_manager, helpers from aztk.utils.command_builder import CommandBuilder @@ -39,7 +38,7 @@ def generate_job_manager_task(core_job_operations, job, application_tasks): application_name=application.name + ".yaml", file_path=application.name + ".yaml", content=yaml.dump(task), - blob_client=core_job_operations.blob_client, + block_blob_service=core_job_operations.block_blob_service, ) resource_files.append(task_definition_resource_file) @@ -54,7 +53,7 @@ def generate_job_manager_task(core_job_operations, job, application_tasks): command_line=helpers.wrap_commands_in_shell([task_cmd]), resource_files=resource_files, kill_job_on_completion=False, - allow_low_priority_node=True, + allow_low_priority_node=True, #TODO: false unless job only has low priority user_identity=batch_models.UserIdentity( auto_user=batch_models.AutoUserSpecification( scope=batch_models.AutoUserScope.task, elevation_level=batch_models.ElevationLevel.admin)), @@ -78,7 +77,7 @@ def submit_job(core_job_operations, spark_job_operations, job_configuration: models.JobConfiguration, wait: bool = False): - try: + with batch_error_manager(): job_configuration = _apply_default_for_job_config(job_configuration) job_configuration.validate() cluster_data = core_job_operations.get_cluster_data(job_configuration.id) @@ -107,8 +106,6 @@ def submit_job(core_job_operations, software_metadata_key = base_models.Software.spark - vm_image = models.VmImage(publisher="Canonical", offer="UbuntuServer", sku="16.04") - autoscale_formula = "$TargetDedicatedNodes = {0}; " "$TargetLowPriorityNodes = {1}".format( job_configuration.max_dedicated_nodes, job_configuration.max_low_pri_nodes) @@ -118,7 +115,7 @@ def submit_job(core_job_operations, job_manager_task=job_manager_task, autoscale_formula=autoscale_formula, software_metadata_key=software_metadata_key, - vm_image_model=vm_image, + vm_image_model=aztk.spark.utils.constants.SPARK_VM_IMAGE, application_metadata="\n".join(application.name for application in (job_configuration.applications or [])), ) @@ -126,6 +123,3 @@ def submit_job(core_job_operations, spark_job_operations.wait(id=job_configuration.id) return models.Job(job) - - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/job/helpers/wait_until_complete.py b/aztk/spark/client/job/helpers/wait_until_complete.py index 2ef44aa9..a5a79d49 100644 --- a/aztk/spark/client/job/helpers/wait_until_complete.py +++ b/aztk/spark/client/job/helpers/wait_until_complete.py @@ -1,22 +1,18 @@ import time import azure.batch.models as batch_models -from azure.batch.models import BatchErrorException -from aztk import error -from aztk.utils import helpers +from aztk.utils import batch_error_manager def _wait_until_job_finished(core_job_operations, job_id): - job_state = core_job_operations.batch_client.job_schedule.get(job_id).state + job_state = core_job_operations.batch_client.job.get(job_id).state - while job_state not in [batch_models.JobScheduleState.completed, batch_models.JobScheduleState.terminating]: + while job_state not in [batch_models.JobState.completed, batch_models.JobState.terminating]: time.sleep(3) - job_state = core_job_operations.batch_client.job_schedule.get(job_id).state + job_state = core_job_operations.batch_client.job.get(job_id).state def wait_until_job_finished(core_job_operations, job_id): - try: + with batch_error_manager(): _wait_until_job_finished(core_job_operations, job_id) - except BatchErrorException as e: - raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/job/operations.py b/aztk/spark/client/job/operations.py index 861229f6..4b12126f 100644 --- a/aztk/spark/client/job/operations.py +++ b/aztk/spark/client/job/operations.py @@ -35,7 +35,7 @@ def delete(self, id, keep_logs: bool = False): Returns: :obj:`bool`: True if the deletion process was successful. """ - return delete.delete(self._core_job_operations, self, id, keep_logs) + return delete.delete(self._core_job_operations, id, keep_logs) def get(self, id): """Get details about the state of a job. diff --git a/aztk/spark/models/models.py b/aztk/spark/models/models.py index 746fec10..91bf4646 100644 --- a/aztk/spark/models/models.py +++ b/aztk/spark/models/models.py @@ -19,7 +19,7 @@ def __init__(self, version: str, environment: str = None, environment_version: s class Cluster(aztk.models.Cluster): def __init__(self, cluster: aztk.models.Cluster): - super().__init__(cluster.pool, cluster.nodes) + super().__init__(cluster.id, cluster.pool, cluster.nodes) self.master_node_id = self.__get_master_node_id() self.gpu_enabled = helpers.is_gpu_enabled(cluster.pool.vm_size) @@ -154,6 +154,7 @@ def __init__( class ApplicationState(Enum): + Active = "active" #TODO Running = "running" Completed = "completed" Failed = "failed" @@ -261,19 +262,19 @@ class JobState(Enum): class Job: def __init__( self, - cloud_job_schedule: batch_models.CloudJobSchedule, + cloud_job: batch_models.CloudJob, tasks: List[aztk.models.Task] = None, pool: batch_models.CloudPool = None, nodes: batch_models.ComputeNodePaged = None, ): - self.id = cloud_job_schedule.id - self.last_modified = cloud_job_schedule.last_modified - self.state = JobState(cloud_job_schedule.state.name) - self.state_transition_time = cloud_job_schedule.state_transition_time - self.creation_time = cloud_job_schedule.creation_time + self.id = cloud_job.id + self.last_modified = cloud_job.last_modified + self.state = JobState(cloud_job.state.name) + self.state_transition_time = cloud_job.state_transition_time + self.creation_time = cloud_job.creation_time self.applications = [Application(task) for task in (tasks or [])] if pool: - self.cluster = Cluster(aztk.models.Cluster(pool, nodes)) + self.cluster = Cluster(aztk.models.Cluster(cloud_job.id, pool, nodes)) else: self.cluster = None diff --git a/aztk/spark/utils/util.py b/aztk/spark/utils/util.py deleted file mode 100644 index ec03be92..00000000 --- a/aztk/spark/utils/util.py +++ /dev/null @@ -1,41 +0,0 @@ -from __future__ import print_function - -import datetime -import time - -import azure.batch.models as batch_models - -from aztk.utils import constants - - -class MasterInvalidStateError(Exception): - pass - - -def wait_for_master_to_be_ready(core_operations, spark_operations, cluster_id: str): - - master_node_id = None - start_time = datetime.datetime.now() - while True: - if not master_node_id: - master_node_id = spark_operations.get(cluster_id).master_node_id - if not master_node_id: - time.sleep(5) - continue - - master_node = core_operations.batch_client.compute_node.get(cluster_id, master_node_id) - - if master_node.state in [batch_models.ComputeNodeState.idle, batch_models.ComputeNodeState.running]: - break - elif master_node.state is batch_models.ComputeNodeState.start_task_failed: - raise MasterInvalidStateError("Start task failed on master") - elif master_node.state in [batch_models.ComputeNodeState.unknown, batch_models.ComputeNodeState.unusable]: - raise MasterInvalidStateError("Master is in an invalid state") - else: - now = datetime.datetime.now() - - delta = now - start_time - if delta.total_seconds() > constants.WAIT_FOR_MASTER_TIMEOUT: - raise MasterInvalidStateError("Master didn't become ready before timeout.") - - time.sleep(10) diff --git a/aztk/spark/utils/wait_for_master.py b/aztk/spark/utils/wait_for_master.py new file mode 100644 index 00000000..5c4cd5bb --- /dev/null +++ b/aztk/spark/utils/wait_for_master.py @@ -0,0 +1,36 @@ +from __future__ import print_function + +import datetime +import time + +import azure.batch.models as batch_models + +from aztk.utils import constants + + +class MasterInvalidStateError(Exception): + pass + + +def wait_for_master(core_operations, spark_operations, cluster_id: str): + cluster = None + master_node = None + start_time = datetime.datetime.now() + while True: + delta = datetime.datetime.now() - start_time + if delta.total_seconds() > constants.WAIT_FOR_MASTER_TIMEOUT: + raise MasterInvalidStateError("Master didn't become ready before timeout.") + + cluster = spark_operations.get(cluster_id) + + if cluster.master_node_id: + master_node = core_operations.get_node(cluster_id, cluster.master_node_id) + if master_node: + if master_node.state in [batch_models.ComputeNodeState.idle, batch_models.ComputeNodeState.running]: + break + if master_node.state is batch_models.ComputeNodeState.start_task_failed: + raise MasterInvalidStateError("Start task failed on master") + elif master_node.state in [batch_models.ComputeNodeState.unknown, batch_models.ComputeNodeState.unusable]: + raise MasterInvalidStateError("Master is in an invalid state") + + time.sleep(5) diff --git a/aztk/utils/__init__.py b/aztk/utils/__init__.py index 9b640a3d..8a225f98 100644 --- a/aztk/utils/__init__.py +++ b/aztk/utils/__init__.py @@ -1,4 +1,5 @@ from . import (azure_api, command_builder, constants, file_utils, get_ssh_key, helpers, secure_utils) +from .batch_error_manager import batch_error_manager from .deprecation import deprecate, deprecated from .retry import BackOffPolicy, retry from .try_func import try_func diff --git a/aztk/utils/azure_api.py b/aztk/utils/azure_api.py index 7f49745a..ccfb17af 100644 --- a/aztk/utils/azure_api.py +++ b/aztk/utils/azure_api.py @@ -71,7 +71,7 @@ def make_batch_client(secrets): return batch_client -def make_blob_client(secrets): +def make_cloud_storage_account(secrets): """ Creates a blob client object :param str storage_account_key: storage account key @@ -81,10 +81,9 @@ def make_blob_client(secrets): if secrets.shared_key: # Set up SharedKeyCredentials - blob_client = blob.BlockBlobService( + cloud_storage_account = CloudStorageAccount( account_name=secrets.shared_key.storage_account_name, account_key=secrets.shared_key.storage_account_key, - endpoint_suffix=secrets.shared_key.storage_account_suffix, ) else: # Set up ServicePrincipalCredentials @@ -107,10 +106,9 @@ def make_blob_client(secrets): resource_group_name=resourcegroup, account_name=accountname, ).keys[0].value) - storage_client = CloudStorageAccount(accountname, key) - blob_client = storage_client.create_block_blob_service() + cloud_storage_account = CloudStorageAccount(accountname, key) - return blob_client + return cloud_storage_account def make_table_service(secrets): @@ -145,6 +143,7 @@ def make_table_service(secrets): return table_service +#TODO:replace with retry decorator def retry_function(function, retry_attempts: int, retry_interval: int, exception: Exception, *args, **kwargs): import time diff --git a/aztk/utils/batch_error_manager.py b/aztk/utils/batch_error_manager.py new file mode 100644 index 00000000..d2732d0d --- /dev/null +++ b/aztk/utils/batch_error_manager.py @@ -0,0 +1,18 @@ +import traceback +from contextlib import contextmanager + +from azure.batch.models import BatchErrorException + +from aztk import error +from aztk.utils import helpers + + +@contextmanager +def batch_error_manager(verbose=False): + try: + yield + except BatchErrorException as e: + if verbose: + # TODO: change to log.debug + print(traceback.format_exc()) + raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/utils/constants.py b/aztk/utils/constants.py index f4096615..67bcc55d 100644 --- a/aztk/utils/constants.py +++ b/aztk/utils/constants.py @@ -65,3 +65,7 @@ TASK_WORKING_DIR = "wd" SPARK_SUBMIT_LOGS_FILE = "output.log" +""" + log streaming +""" +STREAMING_DOWNLOAD_CHUNK_SIZE = 1048576 # 1024 * 1024 diff --git a/aztk/utils/helpers.py b/aztk/utils/helpers.py index 461c3756..a6c1681d 100644 --- a/aztk/utils/helpers.py +++ b/aztk/utils/helpers.py @@ -24,13 +24,6 @@ def is_gpu_enabled(vm_size: str): return bool(re.search("nv|nc", vm_size, flags=re.IGNORECASE)) -def get_cluster(cluster_id, batch_client): - pool = batch_client.pool.get(cluster_id) - nodes = batch_client.compute_node.list(pool_id=cluster_id) - - return aztk.models.Cluster(pool, nodes) - - def wait_for_tasks_to_complete(job_id, batch_client): """ Waits for all the tasks in a particular job to complete. @@ -63,21 +56,25 @@ def wait_for_task_to_complete(job_id: str, task_id: str, batch_client): return -def upload_text_to_container(container_name: str, application_name: str, content: str, file_path: str, - blob_client=None) -> batch_models.ResourceFile: +def upload_text_to_container(container_name: str, + application_name: str, + content: str, + file_path: str, + block_blob_service=None) -> batch_models.ResourceFile: + blob_name = file_path blob_path = application_name + "/" + blob_name # + '/' + time_stamp + '/' + blob_name - blob_client.create_container(container_name, fail_on_exist=False) - blob_client.create_blob_from_text(container_name, blob_path, content) + block_blob_service.create_container(container_name, fail_on_exist=False) + block_blob_service.create_blob_from_text(container_name, blob_path, content) - sas_token = blob_client.generate_blob_shared_access_signature( + sas_token = block_blob_service.generate_blob_shared_access_signature( container_name, blob_path, permission=blob.BlobPermissions.READ, expiry=datetime.datetime.utcnow() + datetime.timedelta(days=365), ) - sas_url = blob_client.make_blob_url(container_name, blob_path, sas_token=sas_token) + sas_url = block_blob_service.make_blob_url(container_name, blob_path, sas_token=sas_token) return batch_models.ResourceFile(file_path=blob_name, blob_source=sas_url) @@ -85,13 +82,13 @@ def upload_text_to_container(container_name: str, application_name: str, content def upload_file_to_container(container_name, application_name, file_path, - blob_client=None, + block_blob_service=None, use_full_path=False, node_path=None) -> batch_models.ResourceFile: """ Uploads a local file to an Azure Blob storage container. - :param blob_client: A blob service client. - :type blocblob_clientk_blob_client: `azure.storage.blob.BlockBlobService` + :param block_blob_service: A blob service client. + :type block_blob_service: `azure.storage.common.BlockBlobService` :param str container_name: The name of the Azure Blob storage container. :param str file_path: The local path to the file. :param str node_path: Path on the local node. By default will be the same as file_path @@ -99,6 +96,7 @@ def upload_file_to_container(container_name, :return: A ResourceFile initialized with a SAS URL appropriate for Batch tasks. """ + file_path = normalize_path(file_path) blob_name = None if use_full_path: @@ -110,41 +108,22 @@ def upload_file_to_container(container_name, if not node_path: node_path = blob_name - blob_client.create_container(container_name, fail_on_exist=False) + block_blob_service.create_container(container_name, fail_on_exist=False) - blob_client.create_blob_from_path(container_name, blob_path, file_path) + block_blob_service.create_blob_from_path(container_name, blob_path, file_path) - sas_token = blob_client.generate_blob_shared_access_signature( + sas_token = block_blob_service.generate_blob_shared_access_signature( container_name, blob_path, permission=blob.BlobPermissions.READ, expiry=datetime.datetime.utcnow() + datetime.timedelta(days=7), ) - sas_url = blob_client.make_blob_url(container_name, blob_path, sas_token=sas_token) + sas_url = block_blob_service.make_blob_url(container_name, blob_path, sas_token=sas_token) return batch_models.ResourceFile(file_path=node_path, blob_source=sas_url) -def create_pool_if_not_exist(pool, batch_client): - """ - Creates the specified pool if it doesn't already exist - :param batch_client: The batch client to use. - :type batch_client: `batchserviceclient.BatchServiceClient` - :param pool: The pool to create. - :type pool: `batchserviceclient.models.PoolAddParameter` - """ - try: - batch_client.pool.add(pool) - except batch_models.BatchErrorException as e: - if e.error.code == "PoolExists": - raise error.AztkError( - "A cluster with the same id already exists. Use a different id or delete the existing cluster") - else: - raise - return True - - def wait_for_all_nodes_state(pool, node_state, batch_client): """ Waits for all nodes in pool to reach any specified state in set @@ -196,61 +175,6 @@ def select_latest_verified_vm_image_with_node_agent_sku(publisher, offer, sku_st return (sku_to_use.id, image_ref_to_use) -def create_sas_token(container_name, blob_name, permission, blob_client, expiry=None, timeout=None): - """ - Create a blob sas token - :param blob_client: The storage block blob client to use. - :type blob_client: `azure.storage.blob.BlockBlobService` - :param str container_name: The name of the container to upload the blob to. - :param str blob_name: The name of the blob to upload the local file to. - :param expiry: The SAS expiry time. - :type expiry: `datetime.datetime` - :param int timeout: timeout in minutes from now for expiry, - will only be used if expiry is not specified - :return: A SAS token - :rtype: str - """ - if expiry is None: - if timeout is None: - timeout = 30 - expiry = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout) - return blob_client.generate_blob_shared_access_signature( - container_name, blob_name, permission=permission, expiry=expiry) - - -def upload_blob_and_create_sas(container_name, blob_name, file_name, expiry, blob_client, timeout=None): - """ - Uploads a file from local disk to Azure Storage and creates a SAS for it. - :param blob_client: The storage block blob client to use. - :type blob_client: `azure.storage.blob.BlockBlobService` - :param str container_name: The name of the container to upload the blob to. - :param str blob_name: The name of the blob to upload the local file to. - :param str file_name: The name of the local file to upload. - :param expiry: The SAS expiry time. - :type expiry: `datetime.datetime` - :param int timeout: timeout in minutes from now for expiry, - will only be used if expiry is not specified - :return: A SAS URL to the blob with the specified expiry time. - :rtype: str - """ - blob_client.create_container(container_name, fail_on_exist=False) - - blob_client.create_blob_from_path(container_name, blob_name, file_name) - - sas_token = create_sas_token( - container_name, - blob_name, - permission=blob.BlobPermissions.READ, - blob_client=None, - expiry=expiry, - timeout=timeout, - ) - - sas_url = blob_client.make_blob_url(container_name, blob_name, sas_token=sas_token) - - return sas_url - - def wrap_commands_in_shell(commands): """ Wrap commands in a shell @@ -262,34 +186,6 @@ def wrap_commands_in_shell(commands): return "/bin/bash -c 'set -e -o pipefail; {};'".format(";".join(commands)) -def get_connection_info(pool_id, node_id, batch_client): - """ - Get connection info of specified node in pool - :param batch_client: The batch client to use. - :type batch_client: `batchserviceclient.BatchServiceClient` - :param str pool_id: The pool id to look up - :param str node_id: The node id to look up - """ - rls = batch_client.compute_node.get_remote_login_settings(pool_id, node_id) - remote_ip = rls.remote_login_ip_address - ssh_port = str(rls.remote_login_port) - return (remote_ip, ssh_port) - - -def get_cluster_total_target_nodes(pool): - """ - Get the total number of target nodes (dedicated + low pri) for the pool - """ - return pool.target_dedicated_nodes + pool.target_low_priority_nodes - - -def get_cluster_total_current_nodes(pool): - """ - Get the total number of current nodes (dedicated + low pri) in the pool - """ - return pool.current_dedicated_nodes + pool.current_low_priority_nodes - - def normalize_path(path: str) -> str: """ Convert a path in a path that will work well with blob storage and unix @@ -350,18 +246,18 @@ def format_batch_exception(batch_exception): return "\n".join(l) -def save_cluster_config(cluster_config, blob_client): +def save_cluster_config(cluster_config, block_blob_service): blob_path = "config.yaml" content = yaml.dump(cluster_config) container_name = cluster_config.cluster_id - blob_client.create_container(container_name, fail_on_exist=False) - blob_client.create_blob_from_text(container_name, blob_path, content) + block_blob_service.create_container(container_name, fail_on_exist=False) + block_blob_service.create_blob_from_text(container_name, blob_path, content) -def read_cluster_config(cluster_id: str, blob_client: blob.BlockBlobService): +def read_cluster_config(cluster_id: str, block_blob_service: blob.BlockBlobService): blob_path = "config.yaml" try: - result = blob_client.get_blob_to_text(cluster_id, blob_path) + result = block_blob_service.get_blob_to_text(cluster_id, blob_path) return yaml.load(result.content) except azure.common.AzureMissingResourceHttpError: logging.warning("Cluster %s doesn't have cluster configuration in storage", cluster_id) diff --git a/aztk_cli/entrypoint.py b/aztk_cli/entrypoint.py index c1588587..ec24ccf8 100644 --- a/aztk_cli/entrypoint.py +++ b/aztk_cli/entrypoint.py @@ -5,6 +5,7 @@ pip install -e . """ import argparse +import traceback import warnings from typing import NamedTuple @@ -44,8 +45,10 @@ def main(): run_software(args) except BatchErrorException as e: utils.print_batch_exception(e) + log.debug(traceback.format_exc()) except aztk.error.AztkError as e: log.error(str(e)) + log.debug(traceback.format_exc()) def setup_common_args(parser: argparse.ArgumentParser): diff --git a/aztk_cli/spark/endpoints/cluster/cluster_app_logs.py b/aztk_cli/spark/endpoints/cluster/cluster_app_logs.py index 927a6719..8aacde85 100644 --- a/aztk_cli/spark/endpoints/cluster/cluster_app_logs.py +++ b/aztk_cli/spark/endpoints/cluster/cluster_app_logs.py @@ -3,7 +3,7 @@ import typing import aztk -from aztk_cli import config, utils, log +from aztk_cli import config, log, utils def setup_parser(parser: argparse.ArgumentParser): @@ -32,4 +32,4 @@ def execute(args: typing.NamedTuple): with open(os.path.abspath(os.path.expanduser(args.output)), "w", encoding="UTF-8") as f: f.write(app_log.log) else: - log.print(app_log.log) + log.print(app_log.log.read().decode('utf-8')) diff --git a/docs/14-azure-files.md b/docs/14-azure-files.md index cd081f07..3ffd539b 100644 --- a/docs/14-azure-files.md +++ b/docs/14-azure-files.md @@ -5,7 +5,7 @@ The ability to load a file share on the cluster is really useful when you want t Mounting an Azure Files share in the cluster only required updating the cluster.yaml file at `.aztk/cluster.yaml`. For example, the following configuration will load two files shares into the cluster, one with my notebooks and one will a small data set that I have previously uploaded to Azure Files. ```yaml -azure_files: +file_shares: - storage_account_name: STORAGE_ACCOUNT_NAME storage_account_key: STORAGE_ACCOUNT_KEY # Name of the file share in Azure Files diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py index 5839cc28..1b05a8e3 100755 --- a/examples/src/main/python/pi.py +++ b/examples/src/main/python/pi.py @@ -18,12 +18,11 @@ from __future__ import print_function import sys -from random import random from operator import add +from random import random from pyspark.sql import SparkSession - if __name__ == "__main__": """ Usage: pi [partitions] @@ -39,7 +38,7 @@ def f(_): x = random() * 2 - 1 y = random() * 2 - 1 - return 1 if x ** 2 + y ** 2 <= 1 else 0 + return 1 if x**2 + y**2 <= 1 else 0 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print("Pi is roughly %f" % (4.0 * count / n)) diff --git a/setup.py b/setup.py index a4671f4b..cff07793 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,7 @@ def find_package_files(root, directory, dest=""): "azure-batch~=5.1.0", "azure-mgmt-batch~=5.0.1", "azure-mgmt-storage~=3.0.0", - "azure-storage-blob~=1.3.1", + "azure-storage-blob~=1.4.0", "azure-cosmosdb-table~=1.0.5", "pycryptodomex~=3.6.6", "PyYAML~=3.13", diff --git a/tests/integration_tests/spark/sdk/clean_up_cluster.py b/tests/integration_tests/spark/sdk/clean_up_cluster.py index 275028e0..5f48f49a 100644 --- a/tests/integration_tests/spark/sdk/clean_up_cluster.py +++ b/tests/integration_tests/spark/sdk/clean_up_cluster.py @@ -1,5 +1,5 @@ import azure.batch.models as batch_models -from azure.batch.models import BatchErrorException +from azure.batch.models import BatchErrorException, ComputeNodeState from aztk.error import AztkError @@ -8,19 +8,8 @@ def clean_up_cluster(spark_client, id): try: cluster = spark_client.cluster.get(id) nodes = [node for node in cluster.nodes] - if not any([ - node.state in [batch_models.ComputeNodeState.unusable, batch_models.ComputeNodeState.start_task_failed] - for node in nodes - ]): + dont_delete_states = [ComputeNodeState.unusable, ComputeNodeState.start_task_failed] + if not any([node.state in dont_delete_states for node in nodes]): spark_client.cluster.delete(id=id) except (BatchErrorException, AztkError) as e: - # pass in the event that the cluster does not exist - print(str(e)) - acceptable_failures = [ - "The specified job has been marked for deletion and is being garbage collected.", - "The specified pool has been marked for deletion and is being reclaimed." - ] - if any(item in str(e) for item in acceptable_failures): - pass - else: - raise e + pass diff --git a/tests/integration_tests/spark/sdk/cluster/test_cluster.py b/tests/integration_tests/spark/sdk/cluster/test_cluster.py index bc21cdba..f9b47a68 100644 --- a/tests/integration_tests/spark/sdk/cluster/test_cluster.py +++ b/tests/integration_tests/spark/sdk/cluster/test_cluster.py @@ -73,7 +73,7 @@ def test_list_clusters(): def test_get_remote_login_settings(): - test_id = "get-remote-login-" + test_id = "remote-login-" cluster_configuration = aztk.spark.models.ClusterConfiguration( cluster_id=test_id + base_cluster_id, size=2, @@ -187,7 +187,7 @@ def test_create_user_ssh_key(): def test_get_application_state_complete(): - test_id = "app-status-complete-" + test_id = "app-status-" cluster_configuration = aztk.spark.models.ClusterConfiguration( cluster_id=test_id + base_cluster_id, size=2, @@ -214,7 +214,6 @@ def test_get_application_state_complete(): max_retry_count=None) try: spark_client.cluster.create(cluster_configuration, wait=True) - spark_client.cluster.submit( id=cluster_configuration.cluster_id, application=application_configuration, wait=True) state = spark_client.cluster.get_application_state( @@ -249,7 +248,7 @@ def test_delete_cluster(): def test_spark_processes_up(): - test_id = "spark-processes-up-" + test_id = "spark-up-" cluster_configuration = aztk.spark.models.ClusterConfiguration( cluster_id=test_id + base_cluster_id, size=2, diff --git a/tests/integration_tests/spark/sdk/get_client.py b/tests/integration_tests/spark/sdk/get_client.py index 1559e638..a39451c3 100644 --- a/tests/integration_tests/spark/sdk/get_client.py +++ b/tests/integration_tests/spark/sdk/get_client.py @@ -42,6 +42,6 @@ def get_spark_client(): def get_test_suffix(prefix: str): # base cluster name dt = datetime.now() - current_time = dt.microsecond + current_time = str(dt.microsecond)[:4] base_cluster_id = "{0}-{1}".format(prefix, current_time) return base_cluster_id