Skip to content

Commit 328be5f

Browse files
chloeguramatthchr
authored andcommitted
Add Sample 4 Job Scheduler (#271)
* Add Sample 4 Job Scheduler
1 parent d5ac798 commit 328be5f

File tree

4 files changed

+305
-1
lines changed

4 files changed

+305
-1
lines changed

Python/Batch/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,21 @@ path invocation. This sample can be run on Windows with an appropriate openssl
4949
binary and modified openssl invocations (i.e., `openssl.exe` instead of
5050
`openssl`).
5151

52+
#### [sample4\_job\_scheduler.py](./sample4_job_scheduler.py)
53+
This sample demonstrates how to use a Job Schedule to run recurring work. The
54+
sample creates a Job Schedule with a Job specification that has an AutoPool
55+
with a StartTask and a JobManager Task. The Job Schedule will create a Job,
56+
at which point the AutoPool for that Job is created. The AutoPool's StartTask
57+
will run on every Compute Node, downloading and installing Python. Once completed,
58+
the Job's JobManager Task will execute, running a simple Python program. The Job
59+
will complete once all tasks under it (here, only the Job Manager) have
60+
completed, at which point the Job Schedule is able to create the next Job
61+
recurrence based on its schedule. This Job Schedule is configured to run every
62+
10 minutes, for 30 minutes in total. The Jobs created underneath the Job
63+
Schedule will each create their own CloudServices AutoPool. The AutoPool's
64+
lifetime is scoped to the Job.
65+
66+
5267
## Azure Batch on Linux Best Practices
5368

5469
Although some of the Python samples are not specific to Linux, the Azure Batch

Python/Batch/common/helpers.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import azure.storage.blob as azureblob
3232
import azure.batch.models as batchmodels
3333

34-
3534
_STANDARD_OUT_FILE_NAME = 'stdout.txt'
3635
_STANDARD_ERROR_FILE_NAME = 'stderr.txt'
3736
_SAMPLES_CONFIG_FILE_NAME = 'configuration.cfg'
@@ -40,6 +39,7 @@
4039
class TimeoutError(Exception):
4140
"""An error which can occur if a timeout has expired.
4241
"""
42+
4343
def __init__(self, message):
4444
self.message = message
4545

@@ -516,3 +516,49 @@ def wrap_commands_in_shell(ostype, commands):
516516
return 'cmd.exe /c "{}"'.format('&'.join(commands))
517517
else:
518518
raise ValueError('unknown ostype: {}'.format(ostype))
519+
520+
521+
def wait_for_job_under_job_schedule(batch_client, job_schedule_id, timeout):
522+
"""Waits for a job to be created and returns a job id.
523+
524+
:param batch_client: The batch client to use.
525+
:type batch_client: `batchserviceclient.BatchServiceClient`
526+
:param str job_schedule_id: The id of the job schedule to monitor.
527+
:param timeout: The maximum amount of time to wait.
528+
:type timeout: `datetime.timedelta`
529+
"""
530+
cloud_job_schedule = batch_client.job_schedule.get(
531+
job_schedule_id=job_schedule_id)
532+
533+
time_to_timeout_at = datetime.datetime.now() + timeout
534+
535+
while datetime.datetime.now() < time_to_timeout_at:
536+
print("Checking if job exists...")
537+
if cloud_job_schedule.execution_info.recent_job.id is not None:
538+
return cloud_job_schedule.execution_info.recent_job.id
539+
time.sleep(1)
540+
541+
raise TimeoutError("Timed out waiting for tasks to complete")
542+
543+
544+
def wait_for_job_schedule_to_complete(batch_client, job_schedule_id, timeout):
545+
"""Waits for a job schedule to complete.
546+
547+
:param batch_client: The batch client to use.
548+
:type batch_client: `batchserviceclient.BatchServiceClient`
549+
:param str job_schedule_id: The id of the job schedule to monitor.
550+
:param timeout: The maximum amount of time to wait.
551+
:type timeout: `datetime.datetime`
552+
"""
553+
554+
cloud_job_schedule = batch_client.job_schedule.get(
555+
job_schedule_id=job_schedule_id)
556+
557+
while datetime.datetime.now() < timeout:
558+
print("Checking if job schedule is complete...")
559+
state = cloud_job_schedule.state
560+
if state == batchmodels.JobScheduleState.completed:
561+
return
562+
time.sleep(10)
563+
564+
return
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[DEFAULT]
2+
shoulddeletejobschedule=true
3+
poolvmsize=STANDARD_D1_V2
4+
poolvmcount=1
5+
shoulddeletecontainer=true
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
# sample4_job_scheduler.py Code Sample
2+
#
3+
# Copyright (c) Microsoft Corporation
4+
#
5+
# All rights reserved.
6+
#
7+
# MIT License
8+
#
9+
# Permission is hereby granted, free of charge, to any person obtaining a
10+
# copy of this software and associated documentation files (the "Software"),
11+
# to deal in the Software without restriction, including without limitation
12+
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
13+
# and/or sell copies of the Software, and to permit persons to whom the
14+
# Software is furnished to do so, subject to the following conditions:
15+
#
16+
# The above copyright notice and this permission notice shall be included in
17+
# all copies or substantial portions of the Software.
18+
#
19+
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24+
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25+
# DEALINGS IN THE SOFTWARE.
26+
27+
from __future__ import print_function
28+
29+
try:
30+
import configparser
31+
except ImportError:
32+
import ConfigParser as configparser
33+
34+
import datetime
35+
import os
36+
37+
import azure.batch.batch_service_client as batch
38+
import azure.storage.blob as azureblob
39+
import azure.batch.batch_auth as batchauth
40+
import azure.batch.models as batchmodels
41+
42+
import common.helpers
43+
44+
_CONTAINER_NAME = 'jobscheduler'
45+
_SIMPLE_TASK_NAME = 'simple_task.py'
46+
_SIMPLE_TASK_PATH = os.path.join('resources', 'simple_task.py')
47+
_PYTHON_DOWNLOAD = \
48+
'https://www.python.org/ftp/python/3.7.3/python-3.7.3-amd64.exe'
49+
_PYTHON_INSTALL = \
50+
r'.\python373.exe /passive InstallAllUsers=1 PrependPath=1 Include_test=0'
51+
_USER_ELEVATION_LEVEL = 'admin'
52+
_START_TIME = datetime.datetime.utcnow()
53+
_END_TIME = _START_TIME + datetime.timedelta(minutes=30)
54+
55+
56+
def create_job_schedule(batch_client, job_schedule_id, vm_size, vm_count,
57+
block_blob_client):
58+
"""Creates an Azure Batch pool and job schedule with the specified ids.
59+
60+
:param batch_client: The batch client to use.
61+
:type batch_client: `batchserviceclient.BatchServiceClient`
62+
:param str job_schedule_id: The id of the job schedule to create
63+
:param str vm_size: vm size (sku)
64+
:param int vm_count: number of vms to allocate
65+
:param block_blob_client: The storage block blob client to use.
66+
:type block_blob_client: `azure.storage.blob.BlockBlobService`
67+
"""
68+
cloud_service_config = batchmodels.CloudServiceConfiguration(os_family='6')
69+
70+
user_id = batchmodels.UserIdentity(
71+
auto_user=batchmodels.AutoUserSpecification(
72+
elevation_level=_USER_ELEVATION_LEVEL))
73+
74+
python_download = batchmodels.ResourceFile(
75+
http_url=_PYTHON_DOWNLOAD,
76+
file_path='python373.exe')
77+
78+
pool_info = batchmodels.PoolInformation(
79+
auto_pool_specification=batchmodels.AutoPoolSpecification(
80+
auto_pool_id_prefix="JobScheduler",
81+
pool=batchmodels.PoolSpecification(
82+
vm_size=vm_size,
83+
target_dedicated_nodes=vm_count,
84+
cloud_service_configuration=cloud_service_config,
85+
start_task=batchmodels.StartTask(
86+
command_line=common.helpers.wrap_commands_in_shell(
87+
'windows', ['{}'.format(_PYTHON_INSTALL)]),
88+
resource_files=[python_download],
89+
wait_for_success=True,
90+
user_identity=user_id)),
91+
keep_alive=False,
92+
pool_lifetime_option=batchmodels.PoolLifetimeOption.job))
93+
94+
sas_url = common.helpers.upload_blob_and_create_sas(
95+
block_blob_client,
96+
_CONTAINER_NAME,
97+
_SIMPLE_TASK_NAME,
98+
_SIMPLE_TASK_PATH,
99+
datetime.datetime.utcnow() + datetime.timedelta(minutes=30))
100+
101+
job_spec = batchmodels.JobSpecification(
102+
pool_info=pool_info,
103+
# Terminate job once all tasks under it are complete to allow for a new
104+
# job to be created under the schedule
105+
on_all_tasks_complete=batchmodels.OnAllTasksComplete.terminate_job,
106+
job_manager_task=batchmodels.JobManagerTask(
107+
id="JobManagerTask",
108+
command_line=common.helpers.wrap_commands_in_shell(
109+
'windows', ['python {}'.format(_SIMPLE_TASK_NAME)]),
110+
resource_files=[batchmodels.ResourceFile(
111+
file_path=_SIMPLE_TASK_NAME,
112+
http_url=sas_url)]))
113+
114+
do_not_run_after = datetime.datetime.utcnow() \
115+
+ datetime.timedelta(minutes=30)
116+
117+
schedule = batchmodels.Schedule(
118+
do_not_run_after=do_not_run_after,
119+
recurrence_interval=datetime.timedelta(minutes=10))
120+
121+
scheduled_job = batchmodels.JobScheduleAddParameter(
122+
id=job_schedule_id,
123+
schedule=schedule,
124+
job_specification=job_spec)
125+
126+
batch_client.job_schedule.add(cloud_job_schedule=scheduled_job)
127+
128+
129+
def execute_sample(global_config, sample_config):
130+
"""Executes the sample with the specified configurations.
131+
132+
:param global_config: The global configuration to use.
133+
:type global_config: `configparser.ConfigParser`
134+
:param sample_config: The sample specific configuration to use.
135+
:type sample_config: `configparser.ConfigParser`
136+
"""
137+
# Set up the configuration
138+
batch_account_key = global_config.get('Batch', 'batchaccountkey')
139+
batch_account_name = global_config.get('Batch', 'batchaccountname')
140+
batch_service_url = global_config.get('Batch', 'batchserviceurl')
141+
142+
storage_account_key = global_config.get('Storage', 'storageaccountkey')
143+
storage_account_name = global_config.get('Storage', 'storageaccountname')
144+
storage_account_suffix = global_config.get(
145+
'Storage',
146+
'storageaccountsuffix')
147+
148+
should_delete_container = sample_config.getboolean(
149+
'DEFAULT',
150+
'shoulddeletecontainer')
151+
should_delete_job_schedule = sample_config.getboolean(
152+
'DEFAULT',
153+
'shoulddeletejobschedule')
154+
pool_vm_size = sample_config.get(
155+
'DEFAULT',
156+
'poolvmsize')
157+
pool_vm_count = sample_config.getint(
158+
'DEFAULT',
159+
'poolvmcount')
160+
161+
# Print the settings we are running with
162+
common.helpers.print_configuration(global_config)
163+
common.helpers.print_configuration(sample_config)
164+
165+
credentials = batchauth.SharedKeyCredentials(
166+
batch_account_name,
167+
batch_account_key)
168+
169+
batch_client = batch.BatchServiceClient(
170+
credentials,
171+
batch_url=batch_service_url)
172+
173+
block_blob_client = azureblob.BlockBlobService(
174+
account_name=storage_account_name,
175+
account_key=storage_account_key,
176+
endpoint_suffix=storage_account_suffix)
177+
178+
batch_client.config.retry_policy.retries = 5
179+
job_schedule_id = common.helpers.generate_unique_resource_name(
180+
"JobScheduler")
181+
182+
try:
183+
create_job_schedule(
184+
batch_client,
185+
job_schedule_id,
186+
pool_vm_size,
187+
pool_vm_count,
188+
block_blob_client)
189+
190+
print("Start time: ", _START_TIME)
191+
print("Delete time: ", _END_TIME)
192+
193+
recent_job = common.helpers.wait_for_job_under_job_schedule(
194+
batch_client,
195+
job_schedule_id,
196+
datetime.timedelta(minutes=5))
197+
198+
common.helpers.wait_for_tasks_to_complete(
199+
batch_client,
200+
recent_job,
201+
datetime.timedelta(minutes=25))
202+
203+
tasks = batch_client.task.list(recent_job)
204+
task_ids = [task.id for task in tasks]
205+
206+
common.helpers.print_task_output(
207+
batch_client,
208+
recent_job,
209+
task_ids)
210+
211+
common.helpers.wait_for_job_schedule_to_complete(
212+
batch_client,
213+
job_schedule_id,
214+
_END_TIME + datetime.timedelta(minutes=10))
215+
216+
except batchmodels.BatchErrorException as e:
217+
for x in e.error.values:
218+
print("BatchErrorException: ", x)
219+
220+
finally:
221+
if should_delete_job_schedule:
222+
print("Deleting job schedule: ", job_schedule_id)
223+
batch_client.job_schedule.delete(job_schedule_id)
224+
if should_delete_container:
225+
block_blob_client.delete_container(
226+
_CONTAINER_NAME,
227+
fail_not_exist=False)
228+
229+
230+
if __name__ == '__main__':
231+
global_config = configparser.ConfigParser()
232+
global_config.read(common.helpers._SAMPLES_CONFIG_FILE_NAME)
233+
234+
sample_config = configparser.ConfigParser()
235+
sample_config.read(
236+
os.path.splitext(os.path.basename(__file__))[0] + '.cfg')
237+
238+
execute_sample(global_config, sample_config)

0 commit comments

Comments
 (0)