# Dask in Jupyter Notebook in OOD

[Dask](https://docs.dask.org/en/stable/) is a Python library for parallel and distributed computing.
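
As a quick illustration of the programming model, the sketch below builds a small task graph with `dask.delayed` and runs it with `.compute()`. It is a minimal, standalone example and is independent of the OOD workflow described on the rest of this page.

```python
import dask

@dask.delayed
def square(x):
    return x * x

# Nothing is computed yet: this only builds a task graph of 10 squares plus a sum
total = dask.delayed(sum)([square(i) for i in range(10)])

# .compute() executes the graph, running independent tasks in parallel
print(total.compute())  # 285
```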

## Getting Started
You can run Dask in a Jupyter Notebook in OOD by going to [ood.hpc.nyu.edu](http://ood.hpc.nyu.edu) in your browser and selecting `DS-GA.1004 - Jupyter Dask` from the `Interactive Apps` pull-down menu at the top of the page. Once you've used it (or any other interactive app), it will appear on your home screen under the `Recently Used Apps` header.

:::note
Be aware that when you start an app from `Recently Used Apps`, it launches with the same configuration you used previously. If you'd like to configure your Dask session differently, you'll need to select it from the `Interactive Apps` menu.
:::

## Configuration

You can select the Dask version, number of cores, amount of memory, root directory, number of hours, and optional Slurm options.
| 16 | + |

:::warning
If you choose `/home` as your root directory, be careful not to exceed your quota. You can check your current usage with the `myquota` command. Please see our [Storage documentation](../03_storage/01_intro_and_data_management.mdx) for details about your storage options.
:::

## Dask with Jupyter Notebook running in OOD

After you hit the `Launch` button, you'll have to wait for the scheduler to find node(s) for your session to run on.
| 26 | + |
Then you'll have a short wait while Dask itself starts up.<br />
Once that happens, you'll get one last page with links to:
- open a terminal window on the compute node your Dask session is running on
- go to the directory associated with your Session ID, which stores output, configuration, and other files related to your session
| 32 | + |

Please click the `Connect to Jupyter` button and a Jupyter window will open.

## Dask Example

Launch a new Jupyter Dask session with 4 cores and 16GB of memory, and set your root directory to `/scratch`. Start a new notebook, enter the following code in the first cell, and execute it by pressing `Shift` and `Enter` at the same time.
```python
import os
import pandas as pd
import numpy as np
import time

# Create a directory for the large files
output_dir = "tmp/large_data_files"
os.makedirs(output_dir, exist_ok=True)

num_files = 5  # Number of files to create
rows_per_file = 10_000_000  # 10 million rows per file
for i in range(num_files):
    data = {
        'col1': np.random.randint(0, 100, size=rows_per_file),
        'value': np.random.rand(rows_per_file) * 100
    }
    df = pd.DataFrame(data)
    df.to_csv(os.path.join(output_dir, f'data_{i}.csv'), index=False)
print(f"{num_files} large CSV files created in '{output_dir}'.")

import dask.dataframe as dd
from dask.distributed import Client

# Start a Dask client for distributed processing (optional but recommended).
# It also gives you access to the Dask dashboard for monitoring the computation.
# Note: memory_limit is per worker; adjust these values to the resources you requested.
client = Client(n_workers=4, threads_per_worker=2, memory_limit='16GB')
print(client)

# Load multiple CSV files into a Dask DataFrame.
# Dask will automatically partition and parallelize the reading of these files.
# output_dir still points at the directory created above.
dask_df = dd.read_csv(os.path.join(output_dir, 'data_*.csv'))

# Perform a calculation (e.g., calculate the mean of the 'value' column)
# This operation will be parallelized across the available workers
result_dask = dask_df['value'].mean()

# Trigger the computation and measure the time
start_time = time.time()
computed_result_dask = result_dask.compute()
end_time = time.time()

print(f"Dask took {end_time - start_time} seconds to compute the mean across {num_files} files.")
print(f"Result (Dask): {computed_result_dask}")

# Perform the same calculation sequentially with Pandas,
# reading the files one at a time and accumulating a running sum and count
start_time_pandas = time.time()
total_sum = 0
total_count = 0
for i in range(num_files):
    df = pd.read_csv(os.path.join(output_dir, f'data_{i}.csv'))
    total_sum += df['value'].sum()
    total_count += len(df)
computed_result_pandas = total_sum / total_count
end_time_pandas = time.time()

print(f"Pandas took {end_time_pandas - start_time_pandas} seconds to compute the mean across {num_files} files.")
print(f"Result (Pandas): {computed_result_pandas}")
```
You should get output like:
```
5 large CSV files created in 'tmp/large_data_files'.
<Client: 'tcp://127.0.0.1:45511' processes=4 threads=8, memory=59.60 GiB>
Dask took 3.448112726211548 seconds to compute the mean across 5 files.
Result (Dask): 50.010815178612596
Pandas took 9.641847610473633 seconds to compute the mean across 5 files.
Result (Pandas): 50.01081517861258
```
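
Dask finishes roughly three times faster here because the five CSV files are read and aggregated in parallel across the workers, while the Pandas loop processes them one at a time. When you're done, you can open the Dask dashboard to inspect the run and then shut the client down. The sketch below is a minimal cleanup cell; the `shutil.rmtree` call, which deletes the generated CSV files, assumes you no longer need them.

```python
import shutil

# Print the URL of the Dask dashboard (useful for monitoring while computations run)
print(client.dashboard_link)

# Shut down the Dask workers and close the client connection
client.close()

# Optionally remove the temporary CSV files created for this example
shutil.rmtree(output_dir, ignore_errors=True)
```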