
Commit f632cf8

Merge pull request #163377 from benchwang/master
Describe logging for new processes. Adjust section titles to show as links on the top of the page.
2 parents 3efbad1 + 330e67c commit f632cf8

1 file changed

articles/machine-learning/how-to-debug-parallel-run-step.md

Lines changed: 69 additions & 29 deletions
@@ -25,17 +25,17 @@ For general tips on troubleshooting a pipeline, see [Troubleshooting machine lea
Your ParallelRunStep runs as a step in ML pipelines. You may want to [test your scripts locally](how-to-debug-visual-studio-code.md#debug-and-troubleshoot-machine-learning-pipelines) as a first step.

## Entry script requirements

The entry script for a `ParallelRunStep` *must contain* a `run()` function and can optionally contain an `init()` function:
- `init()`: Use this function for any costly or common preparation for later processing. For example, use it to load the model into a global object. This function will be called only once at the beginning of the process.
> [!NOTE]
> If your `init` method creates an output directory, specify `parents=True` and `exist_ok=True`. The `init` method is called from each worker process on every node on which the job is running.
- `run(mini_batch)`: The function will run for each `mini_batch` instance.
    - `mini_batch`: `ParallelRunStep` will invoke run method and pass either a list or pandas `DataFrame` as an argument to the method. Each entry in mini_batch will be a file path if input is a `FileDataset` or a pandas `DataFrame` if input is a `TabularDataset`.
    - `response`: The run() method should return a pandas `DataFrame` or an array. For the append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful run of an input element in the input mini-batch. Make sure that enough data is included in the run result to map the input to the run output. Run output is written in the output file and not guaranteed to be in order, so you should use some key in the output to map it to the input.
> [!NOTE]
> One output element is expected for one input element.
```python
%%writefile digit_identification.py
@@ -98,7 +98,7 @@ file_path = os.path.join(script_dir, "<file_name>")
- For `FileDataset`, it's the number of files with a minimum value of `1`. You can combine multiple files into one mini-batch.
- For `TabularDataset`, it's the size of data. Example values are `1024`, `1024KB`, `10MB`, and `1GB`. The recommended value is `1MB`. The mini-batch from `TabularDataset` will never cross file boundaries. For example, if you have .csv files of various sizes, where the smallest file is 100 KB and the largest is 10 MB, and you set `mini_batch_size = 1MB`, then files smaller than 1 MB will be treated as one mini-batch and files larger than 1 MB will be split into multiple mini-batches.
> [!NOTE]
> TabularDatasets backed by SQL cannot be partitioned.
> TabularDatasets from a single parquet file and single row group cannot be partitioned.
- `error_threshold`: The number of record failures for `TabularDataset` and file failures for `FileDataset` that should be ignored during processing. If the error count for the entire input goes above this value, the job will be aborted. The error threshold is for the entire input and not for the individual mini-batches sent to the `run()` method. The range is `[-1, int.max]`. A value of `-1` indicates that all failures during processing are ignored.
@@ -109,18 +109,18 @@ file_path = os.path.join(script_dir, "<file_name>")
- `source_directory`: Paths to folders that contain all files to execute on the compute target (optional).
- `compute_target`: Only `AmlCompute` is supported.
- `node_count`: The number of compute nodes to be used for running the user script.
- `process_count_per_node`: The number of worker processes per node to run the entry script in parallel. For a GPU machine, the default value is 1. For a CPU machine, the default value is the number of cores per node. A worker process will call `run()` repeatedly by passing the mini batch it gets. The total number of worker processes in your job is `process_count_per_node * node_count`, which decides the max number of `run()` to execute in parallel.
- `environment`: The Python environment definition. You can configure it to use an existing Python environment or to set up a temporary environment. The definition is also responsible for setting the required application dependencies (optional).
- `logging_level`: Log verbosity. Values in increasing verbosity are: `WARNING`, `INFO`, and `DEBUG`. (optional; the default value is `INFO`)
- `run_invocation_timeout`: The `run()` method invocation timeout in seconds. (optional; default value is `60`)
- `run_max_try`: Maximum try count of `run()` for a mini-batch. A `run()` fails if an exception is thrown, or if nothing is returned when `run_invocation_timeout` is reached (optional; default value is `3`).

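To make these parameters concrete, here is a minimal sketch of a `ParallelRunConfig` that sets them. The values are illustrative, and `batch_env` and `compute_target` are assumed to have been defined earlier in your pipeline code.

```python
from azureml.pipeline.steps import ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    source_directory=".",
    entry_script="digit_identification.py",  # the entry script described above
    mini_batch_size="1MB",            # for a TabularDataset input
    error_threshold=10,               # abort if more than 10 records/files fail
    output_action="append_row",
    environment=batch_env,            # assumed: an azureml.core.Environment
    compute_target=compute_target,    # assumed: an AmlCompute target
    node_count=2,
    process_count_per_node=2,
    run_invocation_timeout=60,
    run_max_try=3,
)
```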
You can specify `mini_batch_size`, `node_count`, `process_count_per_node`, `logging_level`, `run_invocation_timeout`, and `run_max_try` as `PipelineParameter`, so that when you resubmit a pipeline run, you can fine-tune the parameter values. In this example, you use `PipelineParameter` for `mini_batch_size` and `process_count_per_node`, and you change these values when you resubmit another run.

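As a rough sketch of that pattern (the parameter names are illustrative, and `experiment` and `pipeline` are assumed to be the usual Azure ML `Experiment` and `Pipeline` objects):

```python
from azureml.pipeline.core import PipelineParameter

# Wrap the values you want to tune at resubmission time.
batch_size_param = PipelineParameter(name="batch_size_param", default_value="1MB")
process_count_param = PipelineParameter(name="process_count_param", default_value=2)

# Pass these objects in place of literal values when building ParallelRunConfig, for example
# mini_batch_size=batch_size_param and process_count_per_node=process_count_param.

# Later, resubmit with different values without rebuilding the pipeline.
pipeline_run = experiment.submit(
    pipeline,
    pipeline_parameters={"batch_size_param": "5MB", "process_count_param": 4},
)
```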
#### CUDA devices visibility
For compute targets equipped with GPUs, the environment variable `CUDA_VISIBLE_DEVICES` will be set in worker processes. In AmlCompute, you can find the total number of GPU devices in the environment variable `AZ_BATCHAI_GPU_COUNT_FOUND`, which is set automatically. If you want each worker process to have a dedicated GPU, set `process_count_per_node` equal to the number of GPU devices on a machine. Each worker process will assign a unique index to `CUDA_VISIBLE_DEVICES`. If a worker process stops for any reason, the next started worker process will use the released GPU index.

If the total number of GPU devices is less than `process_count_per_node`, worker processes are assigned GPU indices until all indices have been used.

For example, if there are two GPU devices in total and `process_count_per_node = 4`, process 0 and process 1 will have indices 0 and 1. Processes 2 and 3 won't have the environment variable. For a library using this environment variable for GPU assignment, processes 2 and 3 won't have GPUs and won't try to acquire GPU devices. If process 0 stops, it releases GPU index 0. The next process, process 4, will have GPU index 0 assigned.

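A small sketch of how an entry script might inspect these variables in `init()` (the fallback behavior shown is illustrative):

```python
import os

def init():
    """Check the GPU assignment made by ParallelRunStep for this worker process."""
    total_gpus = os.environ.get("AZ_BATCHAI_GPU_COUNT_FOUND")  # total GPU devices on the node
    assigned = os.environ.get("CUDA_VISIBLE_DEVICES")          # unique index per worker, or unset

    if assigned is None:
        # More worker processes than GPUs: this process got no index, so run on CPU.
        print(f"No GPU index assigned (node reports {total_gpus} GPUs); falling back to CPU.")
    else:
        print(f"Using GPU index {assigned} of {total_gpus}.")
```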
@@ -181,7 +181,7 @@ When you need a full understanding of how each node executed the score script, l

- `~/logs/sys/node/<node_id>/<process_name>.txt`: This file provides detailed info about each mini-batch as it's picked up or completed by a worker. For each mini-batch, this file includes:

    - The IP address and the PID of the worker process.
    - The total number of items, successfully processed items count, and failed item count.
    - The start time, duration, process time, and run method time.

@@ -196,7 +196,7 @@ You can also view the results of periodical checks of the resource usage for eac
- `node_resource_usage.csv`: Resource usage overview of the node.
- `processes_resource_usage.csv`: Resource usage overview of each process.

## How do I log from my user script from a remote context?

ParallelRunStep may run multiple processes on one node, based on `process_count_per_node`. To organize the logs from each process on a node and combine print and log statements, we recommend using the ParallelRunStep logger as shown below. You get a logger from EntryScript and make the logs show up in the **logs/user** folder in the portal.

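The body of the documented example falls outside this diff hunk; a minimal sketch of the pattern, assuming the `EntryScript` logger described here, looks like this:

```python
from azureml_user.parallel_run import EntryScript

def init():
    """Init once in a worker process."""
    logger = EntryScript().logger
    logger.info("init() called; this message shows up under logs/user in the portal.")

def run(mini_batch):
    """Process one mini-batch."""
    logger = EntryScript().logger  # get the same logger again inside run()
    logger.info(f"run() received {len(mini_batch)} items.")
    # ... process the mini-batch ...
    return mini_batch
```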
@@ -222,33 +222,71 @@ def run(mini_batch):
    return mini_batch
```

## Where does the message from Python `logging` sink to?
ParallelRunStep sets a handler on the root logger, which sinks the message to `logs/user/stdout/<node_id>/processNNN.stdout.txt`.

`logging` defaults to `WARNING` level. By default, levels below `WARNING` won't show up, such as `INFO` or `DEBUG`.

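For example, a brief sketch of raising the level so that `INFO` messages reach that handler (the message text is illustrative):

```python
import logging

def init():
    # Raise the root logger level so INFO messages are emitted and picked up by
    # the handler that ParallelRunStep attaches to the root logger.
    logging.getLogger().setLevel(logging.INFO)
    logging.info("This INFO message now lands in logs/user/stdout/<node_id>/processNNN.stdout.txt.")
```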
## How could I write to a file to show up in the portal?
Files in the `logs` folder will be uploaded and show up in the portal.
You can get the folder `logs/user/entry_script_log/<node_id>` as shown below and compose your file path to write:

```python
from pathlib import Path

from azureml_user.parallel_run import EntryScript


def init():
    """Init once in a worker process."""
    entry_script = EntryScript()
    log_dir = Path(entry_script.log_dir)  # logs/user/entry_script_log/<node_id>/.
    log_dir.mkdir(parents=True, exist_ok=True)  # Create the folder if not existing.

    proc_name = entry_script.agent_name  # The process name in pattern "processNNN".
    fil_path = log_dir / f"{proc_name}_<file_name>"  # Use proc_name to avoid conflicts among worker processes.
```

## How to handle logs in new processes?
You can spawn new processes in your entry script with the [`subprocess`](https://docs.python.org/3/library/subprocess.html) module, connect to their input/output/error pipes, and obtain their return codes.

The recommended approach is to use the [`run()`](https://docs.python.org/3/library/subprocess.html#subprocess.run) function with `capture_output=True`. Errors will show up in `logs/user/error/<node_id>/<process_name>.txt`.
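A brief sketch of that recommendation (the command and the error handling are illustrative):

```python
import logging
import subprocess

def run(mini_batch):
    result = subprocess.run(
        ["echo", "hello"],  # replace with your own command and arguments
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        # Raising surfaces the failure in the error logs under logs/user/error/<node_id>/.
        raise RuntimeError(f"Subprocess failed: {result.stderr}")
    logging.info(result.stdout)
    return mini_batch
```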
If you want to use `Popen()`, you should redirect stdout/stderr to files, as in the following example:
```python
from pathlib import Path
from subprocess import Popen

from azureml_user.parallel_run import EntryScript


def init():
    """Show how to redirect stdout/stderr to files in logs/user/entry_script_log/<node_id>/."""
    entry_script = EntryScript()
    proc_name = entry_script.agent_name  # The process name in pattern "processNNN".
    log_dir = Path(entry_script.log_dir)  # logs/user/entry_script_log/<node_id>/.
    log_dir.mkdir(parents=True, exist_ok=True)  # Create the folder if not existing.
    stdout_file = str(log_dir / f"{proc_name}_demo_stdout.txt")
    stderr_file = str(log_dir / f"{proc_name}_demo_stderr.txt")
    proc = Popen(
        ["..."],  # The command and its arguments.
        stdout=open(stdout_file, "w"),
        stderr=open(stderr_file, "w"),
        # ...
    )

```

> [!NOTE]
> A worker process runs "system" code and the entry script code in the same process.
>
> If no `stdout` or `stderr` is specified, a subprocess created with `Popen()` in your entry script inherits the setting of the worker process.
>
> `stdout` will be written to `logs/sys/node/<node_id>/processNNN.stdout.txt` and `stderr` to `logs/sys/node/<node_id>/processNNN.stderr.txt`.

## How do I write a file to the output directory, and then view it in the portal?

You can get the output directory from the `EntryScript` class and write to it. To view the written files, in the step Run view in the Azure Machine Learning portal, select the **Outputs + logs** tab. Select the **Data outputs** link, and then complete the steps that are described in the dialog.

Use `EntryScript` in your entry script as in this example:

@@ -262,14 +300,14 @@ def run(mini_batch):
    (Path(output_dir) / res2).write...
```

## How can I pass a side input, such as a file or file(s) containing a lookup table, to all my workers?

Users can pass reference data to the script by using the `side_inputs` parameter of ParallelRunStep. All datasets provided as side_inputs will be mounted on each worker node. Users can get the location of the mount by passing an argument.

Construct a [Dataset](/python/api/azureml-core/azureml.core.dataset.dataset) containing the reference data, specify a local mount path and register it with your workspace. Pass it to the `side_inputs` parameter of your `ParallelRunStep`. Additionally, you can add its path in the `arguments` section to easily access its mounted path.

> [!NOTE]
> Use FileDatasets only for side_inputs.
```python
local_path = "/tmp/{}".format(str(uuid.uuid4()))
@@ -294,23 +332,23 @@ args, _ = parser.parse_known_args()
labels_path = args.labels_dir
```

## How to use input datasets with service principal authentication?
You can pass input datasets with service principal authentication used in the workspace. Using such a dataset in ParallelRunStep requires the dataset to be registered in order to construct the ParallelRunStep configuration.

```python
service_principal = ServicePrincipalAuthentication(
    tenant_id="***",
    service_principal_id="***",
    service_principal_password="***")

ws = Workspace(
    subscription_id="***",
    resource_group="***",
    workspace_name="***",
    auth=service_principal
)

default_blob_store = ws.get_default_datastore() # or Datastore(ws, '***datastore-name***')
ds = Dataset.File.from_files(default_blob_store, '**path***')
registered_ds = ds.register(ws, '***dataset-name***', create_new_version=True)
```
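As a rough sketch of how the registered dataset is then consumed (the step name is illustrative, and `parallel_run_config` and `output_dir` are assumed to be defined as elsewhere in this article):

```python
from azureml.pipeline.steps import ParallelRunStep

parallel_run_step = ParallelRunStep(
    name="batch-inference-with-sp-auth",        # illustrative step name
    parallel_run_config=parallel_run_config,    # the ParallelRunConfig built earlier
    inputs=[registered_ds.as_named_input("input_ds")],
    output=output_dir,                          # e.g., a PipelineData object
    allow_reuse=True,
)
```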
@@ -351,6 +389,8 @@ ParallelRunStep will start new worker processes in replace of the ones exited ab

* See these [Jupyter notebooks demonstrating Azure Machine Learning pipelines](https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/machine-learning-pipelines)

* See the SDK reference for help with the [azureml-pipeline-steps](/python/api/azureml-pipeline-steps/azureml.pipeline.steps) package.

* View reference [documentation](/python/api/azureml-pipeline-steps/azureml.pipeline.steps.parallelrunconfig) for the ParallelRunConfig class and [documentation](/python/api/azureml-pipeline-steps/azureml.pipeline.steps.parallelrunstep) for the ParallelRunStep class.

* Follow the [advanced tutorial](tutorial-pipeline-batch-scoring-classification.md) on using pipelines with ParallelRunStep. The tutorial shows how to pass another file as a side input.
