Skip to content

Commit c23859f

Browse files
bgkleinxingwu1
authored and committed
Fix MergeTask Bug (#100)
* Fix mergetask * pylint * Update recordings * Update documentation and minor edits
1 parent 6c20516 commit c23859f

File tree

10 files changed

+3145
-995
lines changed

10 files changed

+3145
-995
lines changed

HISTORY.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
SDK Release History
44
===================
55

6+
5.0.4 (2019-02-25)
7+
------------------
8+
9+
* Fix bug where specifying a mergeTask in a template would cause task add failures.
10+
611
5.0.3 (2019-02-19)
712
------------------
813

azext/batch/_template_utils.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -883,6 +883,11 @@ def _transform_repeat_task(task, context, index, transformer):
883883
return new_task
884884

885885

886+
def _transform_merge_task(task):
887+
new_task = models.ExtendedTaskParameter(**copy.deepcopy(task.__dict__))
888+
return new_task
889+
890+
886891
def _parse_parameter_sets(parameter_sets):
887892
"""Parse parametric sweep set, and return all possible values in array.
888893
:param list parameter_sets: An array of parameter sets.
@@ -907,8 +912,8 @@ def _expand_parametric_sweep(factory):
907912
try:
908913
factory.merge_task.id = 'merge'
909914
factory.merge_task.depends_on = models.TaskDependencies(
910-
task_id_ranges=models.TaskIdRange(start=0, end=len(task_objs) - 1))
911-
task_objs.append(factory.merge_task)
915+
task_id_ranges=[models.TaskIdRange(start=0, end=len(task_objs) - 1)])
916+
task_objs.append(_transform_merge_task(factory.merge_task))
912917
except AttributeError: # No merge task
913918
pass
914919
return task_objs
@@ -931,8 +936,8 @@ def _expand_task_per_file(factory, fileutils):
931936
try:
932937
factory.merge_task.id = 'merge'
933938
factory.merge_task.depends_on = models.TaskDependencies(
934-
task_id_ranges=models.TaskIdRange(start=0, end=len(task_objs) - 1))
935-
task_objs.append(factory.merge_task)
939+
task_id_ranges=[models.TaskIdRange(start=0, end=len(task_objs) - 1)])
940+
task_objs.append(_transform_merge_task(factory.merge_task))
936941
except AttributeError: # No merge task
937942
pass
938943
return task_objs
@@ -994,6 +999,16 @@ def expand_task_factory(job, fileutils):
994999
return tasks
9951000

9961001

1002+
def has_merge_task(job):
1003+
""" Check if user has specified a mergeTask on the task factory
1004+
:param job: The JSON job entity loaded from a template.
1005+
:return: True if a merge task is present, otherwise False.
1006+
"""
1007+
if job.task_factory.type in ['parametricSweep', 'taskPerFile'] and job.task_factory.merge_task:
1008+
return True
1009+
return False
1010+
1011+
9971012
def construct_setup_task(existing_task, command_info, os_flavor):
9981013
"""Constructs a command line for the start task/job prep task which will
9991014
run the setup script.

azext/batch/operations/job_operations.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ def add(self, job, job_add_options=None, custom_headers=None, raw=False,
157157
task_collection = []
158158
file_utils = FileUtils(self.get_storage_client)
159159
if hasattr(job, 'task_factory') and job.task_factory:
160+
if templates.has_merge_task(job):
161+
job.uses_task_dependencies = True
160162
task_collection = templates.expand_task_factory(job, file_utils)
161163

162164
# If job has a task factory and terminate job on all tasks complete is set, the job will

azext/batch/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
55

6-
VERSION = "5.0.3"
6+
VERSION = "5.0.4"

doc/taskFactories.md

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,64 @@ Where the following tasks would be created:
188188
]
189189
```
190190

191+
A `mergeTask` may also be specified to run after all tasks in the parametric sweep factory. This allows for map-reduce patterns where one task does parsing or combining of output upon completion of
192+
all tasks in the task factory. Similar to the `repeatTask`, both the `id` and `dependsOn` properties are not allowed, as these are auto-populated with an `id` of 'merge' and the dependent tasks
193+
automatically.
194+
195+
**Note:** It is not advised to use file groups as inputs to a `mergeTask`. File groups are expanded at the time of adding a task to the job and, as such, will not contain output generated from dependent
196+
tasks. If you want to pass output data from dependent tasks to the `mergeTask`, please consider using either the `autoStorageContainerName` or `containerUrl` REST API properties.
197+
198+
A basic example of using mergeTask:
199+
```json
200+
"job": {
201+
"type": "Microsoft.Batch/batchAccounts/jobs",
202+
"apiVersion": "2018-12-01",
203+
"properties": {
204+
"id": "mergetask",
205+
"poolInfo": {
206+
"poolId": "my-mergetask-pool"
207+
},
208+
"taskFactory": {
209+
"type": "parametricSweep",
210+
"parameterSets": [
211+
{
212+
"start": 1,
213+
"end": 500,
214+
"step": 1
215+
}
216+
],
217+
"repeatTask": {
218+
"commandLine": "/bin/bash -c 'echo {0}'",
219+
"outputFiles": [
220+
{
221+
"filePattern": "**/stdout.txt",
222+
"destination": {
223+
"autoStorage": {
224+
"path": "output-{0}",
225+
"fileGroup": "outputData"
226+
}
227+
},
228+
"uploadOptions": {
229+
"uploadCondition": "TaskSuccess"
230+
}
231+
}
232+
]
233+
},
234+
"mergeTask" : {
235+
"displayName": "myMergeTask",
236+
"commandLine": "/bin/bash -c 'ls'",
237+
"resourceFiles": [
238+
{
239+
"autoStorageContainerName": "fgrp-outputData"
240+
}
241+
]
242+
}
243+
}
244+
}
245+
}
246+
```
247+
248+
191249
### Samples
192250

193251
The following samples use the parametric sweep task factory:
@@ -305,6 +363,63 @@ The above task factory would be expanded into the following tasks:
305363
]
306364
```
307365

366+
A `mergeTask` may also be specified to run after all tasks in the task per file factory. This allows for map-reduce patterns where one task does parsing or combining of output upon completion of
367+
all tasks in the task factory. Similar to the `repeatTask`, both the `id` and `dependsOn` properties are not allowed, as these are auto-populated with an `id` of 'merge' and the dependent tasks
368+
automatically.
369+
370+
**Note:** It is not advised to use file groups as inputs to a `mergeTask`. File groups are expanded at the time of adding a task to the job and, as such, will not contain output generated from dependent
371+
tasks. If you want to pass output data from dependent tasks to the `mergeTask`, please consider using either the `autoStorageContainerName` or `containerUrl` REST API properties.
372+
373+
A basic example of using mergeTask:
374+
375+
```json
376+
"job": {
377+
"type": "Microsoft.Batch/batchAccounts/jobs",
378+
"apiVersion": "2018-12-01",
379+
"properties": {
380+
"id": "mergetask",
381+
"poolInfo": {
382+
"poolId": "my-mergetask-pool"
383+
},
384+
"taskFactory": {
385+
"type": "taskPerFile",
386+
"source" : {
387+
"fileGroup" : "inputData"
388+
},
389+
"repeatTask": {
390+
"commandLine": "/bin/bash -c 'cat {fileName}'",
391+
"resourceFiles": [
392+
{
393+
"httpUrl" : "{url}",
394+
"filePath" : "{fileName}"
395+
}
396+
],
397+
"outputFiles": [
398+
{
399+
"filePattern": "**/stdout.txt",
400+
"destination": {
401+
"autoStorage": {
402+
"path": "output-{fileName}",
403+
"fileGroup": "outputData"
404+
}
405+
},
406+
"uploadOptions": {
407+
"uploadCondition": "TaskSuccess"
408+
}
409+
}
410+
]
411+
},
412+
"mergeTask" : {
413+
"displayName": "myMergeTask",
414+
"commandLine": "/bin/bash -c 'ls'",
415+
"resourceFiles": [
416+
{
417+
"autoStorageContainerName": "fgrp-outputData"
418+
}
419+
]
420+
}
421+
```
422+
308423
### Samples
309424

310425
The following samples use the task per file task factory:
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
{
2+
"parameters": {
3+
"jobId": {
4+
"type": "string",
5+
"defaultValue": "helloworld-job3",
6+
"metadata": {
7+
"description": "The id of Azure Batch job"
8+
}
9+
},
10+
"poolId": {
11+
"type": "string",
12+
"defaultValue": "helloworld-pool3",
13+
"metadata": {
14+
"description": "The id of Azure Batch pool which runs the job"
15+
}
16+
},
17+
"vmSize": {
18+
"type": "string",
19+
"defaultValue": "STANDARD_D1_V2",
20+
"metadata": {
21+
"description": "The size of the virtual machines that run the application"
22+
}
23+
},
24+
"vmCount": {
25+
"type": "int",
26+
"defaultValue": 1,
27+
"metadata": {
28+
"description": "The number of virtual machines"
29+
}
30+
},
31+
"testData": {
32+
"type": "string",
33+
"defaultValue": "in",
34+
"metadata": {
35+
"description": "The auto-storage group where the input data is stored"
36+
}
37+
},
38+
"outputData": {
39+
"type": "string",
40+
"defaultValue": "output",
41+
"metadata": {
42+
"description": "The auto-storage group where the output data is uploaded"
43+
}
44+
}
45+
},
46+
"variables": {
47+
"osType": {
48+
"publisher": "Canonical",
49+
"offer": "UbuntuServer",
50+
"sku": "16.04.0-LTS",
51+
"version": "latest"
52+
}
53+
},
54+
"job": {
55+
"type": "Microsoft.Batch/batchAccounts/jobs",
56+
"apiVersion": "2018-12-01",
57+
"properties": {
58+
"id": "[parameters('jobId')]",
59+
"onAllTasksComplete": "terminateJob",
60+
"poolInfo": {
61+
"autoPoolSpecification": {
62+
"autoPoolIdPrefix": "[parameters('poolId')]",
63+
"poolLifetimeOption": "job",
64+
"keepAlive": false,
65+
"pool": {
66+
"vmSize": "[parameters('vmSize')]",
67+
"virtualMachineConfiguration": {
68+
"imageReference": "[variables('osType')]",
69+
"nodeAgentSKUId": "batch.node.ubuntu 16.04"
70+
},
71+
"targetDedicatedNodes": "[parameters('vmCount')]"
72+
}
73+
}
74+
},
75+
"taskFactory": {
76+
"type": "taskPerFile",
77+
"source" : {
78+
"fileGroup" : "[parameters('testData')]"
79+
},
80+
"repeatTask": {
81+
"commandLine": "/bin/bash -c 'cat {fileName}'",
82+
"resourceFiles": [
83+
{
84+
"httpUrl" : "{url}",
85+
"filePath" : "{fileName}"
86+
}
87+
],
88+
"outputFiles": [
89+
{
90+
"filePattern": "**/stdout.txt",
91+
"destination": {
92+
"autoStorage": {
93+
"path": "output-{fileName}",
94+
"fileGroup": "[parameters('outputData')]"
95+
}
96+
},
97+
"uploadOptions": {
98+
"uploadCondition": "TaskSuccess"
99+
}
100+
}
101+
]
102+
},
103+
"mergeTask" : {
104+
"displayName": "myMergeTask",
105+
"commandLine": "/bin/bash -c 'ls'",
106+
"resourceFiles": [
107+
{
108+
"autoStorageContainerName": "fgrp-[parameters('outputData')]"
109+
}
110+
]
111+
}
112+
}
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)