|
| 1 | +--- |
| 2 | +title: '`ezpq`: an easy parallel queueing system.' |
| 3 | +output: |
| 4 | + github_document: |
| 5 | + toc: true |
| 6 | + toc_depth: 3 |
| 7 | +--- |
| 8 | + |
| 9 | +```{r setup, include=FALSE} |
| 10 | +library(knitr) |
| 11 | +library(reticulate) |
| 12 | +use_python('~/envs/mypy36/bin/python', required = TRUE) |
| 13 | +knitr::opts_chunk$set(echo=FALSE, warning=FALSE) |
| 14 | +py_config() |
| 15 | +``` |
| 16 | + |
| 17 | +```{python} |
| 18 | +import os |
| 19 | +import sys |
| 20 | +import pprint |
| 21 | +
|
| 22 | +print = pprint.pprint |
| 23 | +
|
| 24 | +if os.path.exists('./ezpq/__init__.py') and sys.path[0] != os.getcwd(): |
| 25 | + sys.path.insert(0, os.getcwd()) |
| 26 | +import ezpq |
| 27 | +``` |
| 28 | + |
| 29 | +Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.donaldmellenbruch.com/project/ezpq/). |
| 30 | + |
| 31 | +## Overview |
| 32 | + |
| 33 | +`ezpq` implements a parallel queueing system consisting of: |
| 34 | + |
| 35 | +1. a priority "waiting" queue in. |
| 36 | +2. a lookup table of "working" jobs. |
| 37 | +3. a priority "completed" queue out. |
| 38 | + |
| 39 | +The queueing system uses `multiprocessing.Process` by default and can also run jobs with `threading.Thread`. |
| 40 | + |
| 41 | + |
| 42 | + |
| 43 | +## Features |
| 44 | + |
| 45 | +* Simple interface; pure Python. |
| 46 | +* No required dependencies outside of standard library. |
| 47 | +* Optional integration with [`tqdm`](https://github.com/tqdm/tqdm) progress bars. |
| 48 | +* Compatible with Python 2 & 3. |
| 49 | +* Cross platform with MacOS, Linux, and Windows. |
| 50 | +* Data remains in-memory. |
| 51 | +* Priority Queueing, both in and out and within lanes. |
| 52 | +* Synchronous lanes allow dependent jobs to execute in the desired order. |
| 53 | +* Easily switch from processes to threads. |
| 54 | +* Automatic handling of output. |
| 55 | +* Rich job details, easily viewed as pandas dataframe. |
| 56 | +* Built-in logging to CSV. |
| 57 | +* Customizable visualizations of queue operations. |
| 58 | + |
| 59 | +## How to get it |
| 60 | + |
| 61 | +Install from [PyPI](https://pypi.org/project/ezpq/) with: |
| 62 | + |
| 63 | +```python |
| 64 | +pip install ezpq |
| 65 | +``` |
| 66 | + |
| 67 | +Optional packages: |
| 68 | + |
| 69 | +```python |
| 70 | +pip install pandas # required for plots |
| 71 | +pip install plotnine # required for plots |
| 72 | +pip install tqdm # required for progress bars |
| 73 | +``` |
| 74 | + |
| 75 | +## Quickstart |
| 76 | + |
| 77 | +Suppose you wanted to speed up the following code, which runs 60 operations that take anywhere from 0s to 2s. With an average job time of ~1s, this operation should take ~60s. |
| 78 | + |
| 79 | +```{python, echo=TRUE} |
| 80 | +import time |
| 81 | +import random |
| 82 | +
|
| 83 | +def random_sleep(x): |
| 84 | + random.seed(x) |
| 85 | + n = random.uniform(0.5, 1.5) |
| 86 | + time.sleep(n) |
| 87 | + return n |
| 88 | +``` |
| 89 | + |
| 90 | +```{python, echo=TRUE} |
| 91 | +start = time.time() |
| 92 | +
|
| 93 | +output = [random_sleep(x) for x in range(60)] |
| 94 | +
|
| 95 | +end = time.time() |
| 96 | +
|
| 97 | +print('> Runtime: ' + str(end - start)) |
| 98 | +``` |
| 99 | + |
| 100 | +Here is the function ran in parallel with an `ezpq` Queue of 6 workers. Thus, the runtime of the above operation will be reduced from ~60s to ~10s. |
| 101 | + |
| 102 | +```{python, echo=TRUE} |
| 103 | +start = time.time() |
| 104 | +``` |
| 105 | + |
| 106 | +```{python, echo=TRUE} |
| 107 | +with ezpq.Queue(6) as Q: |
| 108 | + output = Q.map(random_sleep, range(60)) |
| 109 | +``` |
| 110 | + |
| 111 | +```{python, echo=TRUE} |
| 112 | +end = time.time() |
| 113 | +print('> Runtime: ' + str(end - start)) |
| 114 | +``` |
| 115 | + |
| 116 | +Here is the same scenario, using the `@ezpq.Queue` decorator. |
| 117 | + |
| 118 | +```{python, echo=TRUE} |
| 119 | +@ezpq.Queue(6) |
| 120 | +def random_sleep(x): |
| 121 | + random.seed(x) |
| 122 | + n = random.uniform(0.5, 1.5) |
| 123 | + time.sleep(n) |
| 124 | + return n |
| 125 | +
|
| 126 | +output = random_sleep(iterable=range(60)) |
| 127 | +``` |
| 128 | + |
| 129 | +```{python} |
| 130 | +# redefine for future functions |
| 131 | +def random_sleep(x): |
| 132 | + random.seed(x) |
| 133 | + n = random.uniform(0.5, 1.5) |
| 134 | + time.sleep(n) |
| 135 | + return n |
| 136 | +``` |
| 137 | + |
| 138 | +While `map()` and the decorator are useful for quick-n-simple parallization, the essential functions of an `ezpq` Queue include `put()`, `wait()`, and `get()` (or `collect()`). |
| 139 | + |
| 140 | +```{python, echo=TRUE} |
| 141 | +with ezpq.Queue(6) as Q: |
| 142 | + for x in range(60): |
| 143 | + Q.put(random_sleep, args=x) |
| 144 | + Q.wait() |
| 145 | + output = Q.collect() |
| 146 | +``` |
| 147 | + |
| 148 | +The output is a list of dicts containing verbose information about each job, along with its output, and exit code. |
| 149 | + |
| 150 | +```{python, echo=TRUE} |
| 151 | +print( output[0] ) |
| 152 | +``` |
| 153 | + |
| 154 | +Easily convert output to a `pandas` dataframe: |
| 155 | + |
| 156 | +```{python, echo=TRUE} |
| 157 | +import pandas as pd |
| 158 | +
|
| 159 | +df = pd.DataFrame(output) |
| 160 | +
|
| 161 | +print( df.head()[['id', 'output', 'runtime', 'exitcode']] ) |
| 162 | +``` |
| 163 | + |
| 164 | +Use `ezpq.Plot` to generate a Gannt chart of the job timings. |
| 165 | + |
| 166 | +```{python, echo=TRUE} |
| 167 | +plt = ezpq.Plot(output).build(show_legend=False) |
| 168 | +plt.save('docs/imgs/quickstart.png') |
| 169 | +``` |
| 170 | + |
| 171 | + |
| 172 | + |
| 173 | +## `ezpq.Queue` |
| 174 | + |
| 175 | +The `Queue` class implements the queueing system, which is itself a 3-part system composed of the: |
| 176 | + |
| 177 | +1. waiting queue |
| 178 | +2. working table |
| 179 | +3. completed queue |
| 180 | + |
| 181 | +```{python} |
| 182 | +print(help(ezpq.Queue.__init__)) |
| 183 | +``` |
| 184 | + |
| 185 | +## `ezpq.Job` |
| 186 | + |
| 187 | +A `ezpq` job defines the function to run. It is passed to an `ezpq` queue with a call to `submit()`. |
| 188 | + |
| 189 | +```{python} |
| 190 | +print(help(ezpq.Job.__init__)) |
| 191 | +``` |
| 192 | + |
| 193 | +```{python, echo=TRUE} |
| 194 | +with ezpq.Queue(6) as Q: |
| 195 | + for x in range(60): |
| 196 | + priority = x % 2 # give even numbers higher priority. |
| 197 | + job = ezpq.Job(random_sleep, args=x, priority=priority) |
| 198 | + Q.submit(job) |
| 199 | + Q.wait() |
| 200 | + output = Q.collect() |
| 201 | +``` |
| 202 | + |
| 203 | +```{python} |
| 204 | +plt = ezpq.Plot(output).build(color_by='priority', |
| 205 | + color_pal=['blue', 'green']) |
| 206 | +plt.save('docs/imgs/submit.png') |
| 207 | +``` |
| 208 | + |
| 209 | + |
| 210 | + |
| 211 | + |
| 212 | +### `put` |
| 213 | + |
| 214 | +The `put` method creates a job and submits it to an `ezpq` queue. All of its arguments are passed to `ezpq.Job()`. |
| 215 | + |
| 216 | +```{python, echo=TRUE} |
| 217 | +with ezpq.Queue(6) as Q: |
| 218 | + for x in range(60): |
| 219 | + Q.put(random_sleep, args=x) |
| 220 | + Q.wait() |
| 221 | + output = Q.collect() |
| 222 | +``` |
| 223 | + |
| 224 | +### `size` |
| 225 | + |
| 226 | +`size()` returns a count of all items across all three queue components. It accepts three boolean parameters, `waiting`, `working`, and `completed`. If all of these are `False` (default), all jobs are counted. If any combination of these is `True`, only the corresponding queue(s) will be counted. For example: |
| 227 | + |
| 228 | +```{python, echo=TRUE} |
| 229 | +def print_sizes(Q): |
| 230 | + msg = 'Total: {0}; Waiting: {1}; Working: {2}; Completed: {3}'.format( |
| 231 | + Q.size(), |
| 232 | + Q.size(waiting=True), |
| 233 | + Q.size(working=True), |
| 234 | + Q.size(completed=True) |
| 235 | + ) |
| 236 | + print(msg) |
| 237 | +``` |
| 238 | + |
| 239 | +```{python, echo=TRUE} |
| 240 | +with ezpq.Queue(6) as Q: |
| 241 | + # enqueue jobs |
| 242 | + for x in range(60): |
| 243 | + Q.put(random_sleep, x) |
| 244 | +
|
| 245 | + # repeatedly print sizes until complete. |
| 246 | + while Q.has_work(): |
| 247 | + print_sizes(Q) |
| 248 | + time.sleep(1) |
| 249 | +
|
| 250 | + print_sizes(Q) |
| 251 | +``` |
| 252 | + |
| 253 | +### `wait` |
| 254 | + |
| 255 | +The `wait()` method will block execution until all jobs complete. It also accepts a `timeout` parameter, given in seconds. The return value is the count of jobs that did not complete. Thus, a return value greater than 0 indicates the timeout was exceeded. The parameter `poll` can be used to adjust how frequently (in seconds) the operation checks for completed jobs (default=0.1). |
| 256 | + |
| 257 | +New in v0.2.0, include `show_progress=True` to show a progress bar while waiting. This is equivalent to a call to `waitpb()`. |
| 258 | + |
| 259 | + |
| 260 | + |
| 261 | + |
| 262 | +### `get` |
| 263 | + |
| 264 | +`get()` retrieves and deletes ("pop") the highest priority job from the completed queue, if one is available. If the completed queue is empty, `get()` returns `None`. However, `get()` will wait for a completed job if the `poll` frequency is greater than 0. If the timeout is exceeded, `None` is returned. |
| 265 | + |
| 266 | +```{python, echo=TRUE} |
| 267 | +with ezpq.Queue(6) as Q: |
| 268 | + n_inputs = 60 |
| 269 | + output = [None] * n_inputs |
| 270 | + # enqueue jobs |
| 271 | + for x in range(n_inputs): |
| 272 | + Q.put(random_sleep, args=x) |
| 273 | + |
| 274 | + # repeatedly `get()` queue is empty. |
| 275 | + for i in range(n_inputs): |
| 276 | + output[i] = Q.get(poll=0.1) |
| 277 | +``` |
| 278 | + |
| 279 | +### `collect` |
| 280 | + |
| 281 | +`collect()` is similar to `get()`, but it will return a list of *all* completed jobs and clear the completed queue. It does not support the `poll` or `timeout` parameters, but you can call `wait()` before `collect()` if desired. |
| 282 | + |
| 283 | +```{python, echo=TRUE} |
| 284 | +with ezpq.Queue(6) as Q: |
| 285 | + # enqueue jobs |
| 286 | + for x in range(60): |
| 287 | + Q.put(random_sleep, x) |
| 288 | +
|
| 289 | + # wait and collect all jobs |
| 290 | + print('Queue size before: {0}'.format(Q.size())) |
| 291 | +
|
| 292 | + Q.wait() |
| 293 | + output = Q.collect() |
| 294 | +
|
| 295 | + print('Queue size after: {0}'.format(Q.size())) |
| 296 | + print('Output size: {0}'.format(len(output))) |
| 297 | +``` |
| 298 | + |
| 299 | +### `map` |
| 300 | + |
| 301 | +`map` encapsulates the logic of `put`, `wait`, and `collect` in one call. Include `show_progress=True` to get output `tqdm` progress bar. |
| 302 | + |
| 303 | + |
| 304 | + |
| 305 | +### `dispose` |
| 306 | + |
| 307 | +The queueing operations performed by `ezpq.Queue` are performed on a periodic basis. By default, the `poll` parameter for a Queue is `0.1` seconds. This "pulse" thread will continue firing until the Queue is disposed of. |
| 308 | + |
| 309 | +In the previous examples, use of the context manager (`with ezpq.Queue() as Q:`) results in automatic disposal. If not using the context manager (or decorator), clean up after yourself with `dispose()`. |
| 310 | + |
| 311 | +```{python} |
| 312 | +Q = ezpq.Queue(6) |
| 313 | +
|
| 314 | +Q.map(random_sleep, range(60)) |
| 315 | +
|
| 316 | +Q.dispose() |
| 317 | +``` |
| 318 | + |
| 319 | + |
| 320 | +## Synchronous Lanes |
| 321 | + |
| 322 | +When you have jobs that are dependent upon another, you can use "lanes" to execute them in sequence. All that is required is an arbitrary lane name/id passed to the `lane` parameter of `put`. Empty lanes are automatically removed. |
| 323 | + |
| 324 | + |
| 325 | + |
| 326 | +In the above graphic, notice how same-colored bars never overlap. These bars represent jobs that are in the same lane, which executed synchronously. |
| 327 | + |
| 328 | +## `ezpq.Plot` |
| 329 | + |
| 330 | +The `Plot` class is used to visualize the wait, start, and end times for each job that entered the queueing system. The class is initialized with a list of dicts; exactly what is returned from a call to `collect()` or `map()`. |
| 331 | + |
| 332 | +Arguments given to `build()` control various aspects of the plot, from coloring, to faceting, |
| 333 | + |
| 334 | +```{python} |
| 335 | +print(help(ezpq.Plot.build)) |
| 336 | +``` |
| 337 | + |
| 338 | +```{python, echo=TRUE} |
| 339 | +with ezpq.Queue(6) as Q: |
| 340 | + for x in range(60): |
| 341 | + lane = x % 5 |
| 342 | + Q.put(random_sleep, x, timeout=1, lane=lane) |
| 343 | + Q.wait() |
| 344 | + output = Q.collect() |
| 345 | +``` |
| 346 | + |
| 347 | +```{python, echo=TRUE} |
| 348 | +plt = ezpq.Plot(output).build(facet_by='lane', show_legend=False) |
| 349 | +plt.save('docs/imgs/lanes2.png') |
| 350 | +``` |
| 351 | + |
| 352 | + |
| 353 | + |
| 354 | +Each horizontal bar represents an independent job id. The start of the gray bar indicates when the job entered the queuing system. The start of the colored bar indicates when the job started running, and when it ended. The gray bar that follows (if any) reflects how long it took for the queue operations to recognize the finished job, join the job data with its output, remove it from the working table, and place it in the completed queue. |
| 355 | + |
| 356 | +## More Examples |
| 357 | + |
| 358 | +Many more examples can be found in [docs/examples.ipynb](//github.com/dm3ll3n/ezpq/blob/master/docs/examples.ipynb). |
0 commit comments