|
237 | 237 | "\n", |
238 | 238 | "</div>" |
239 | 239 | ] |
| 240 | + }, |
| 241 | + { |
| 242 | + "cell_type": "markdown", |
| 243 | + "metadata": {}, |
| 244 | + "source": [ |
| 245 | + "## Guide to using ONE in multi-process workflows\n", |
| 246 | + "\n", |
| 247 | + "When using ONE with multiple processes, simultaneous requests to the remote database can lead to connection errors such as `JSONDecodeError` and `HTTPError`.\n", |
| 248 | + "\n", |
| 249 | + "To avoid these errors, it is useful to generate the above-mentioned parquet files containing the cache data. ONE can then be initialized in local mode with the saved cache loaded, ensuring that no further database requests are made during the parallel run, and hence no connection errors occur.\n", |
| 250 | + "\n", |
| 251 | + "Below is an example where all the sessions of a project that contain spike sorting data are first queried, and then the cache is saved.\n", |
| 252 | + "### Example\n"
| 253 | + ] |
| 254 | + }, |
| 255 | + { |
| 256 | + "cell_type": "code", |
| 257 | + "execution_count": null, |
| 258 | + "metadata": {}, |
| 259 | + "outputs": [], |
| 260 | + "source": [ |
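| | + "# Note: itertools.batched requires Python 3.12 or later\n", |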
| 261 | + "from itertools import batched\n", |
| 262 | + "\n", |
| 263 | + "from one.api import ONE\n", |
| 264 | + "from one.converters import datasets2records\n", |
| 265 | + "from one.alf.cache import merge_tables\n", |
| 266 | + "\n", |
| 267 | + "# To generate the cache, we need to use the remote mode of ONE\n", |
| 268 | + "one = ONE()\n", |
| 269 | + "# Get the list of session eids that have the spikes.times.npy dataset\n", |
| 270 | + "eids = one.search(project='u19_proj1_multiareacom', datasets='spikes.times.npy')\n", |
| 271 | + "# Update the datasets table of the cache\n", |
| 272 | + "# Query in batches to limit the number of requests made to the database\n", |
| 273 | + "for batch in batched(map(str, eids), 50):\n", |
| 274 | + " # Fetch the dataset records for this batch of sessions\n", |
| 275 | + " dsets = one.alyx.rest('datasets', 'list', django=f'session__in,{list(batch)}')\n", |
| 276 | + " df = datasets2records(dsets)\n", |
| | + " # Merge the new records into the in-memory cache tables (one._cache)\n", |
| 277 | + " merge_tables(one._cache, datasets=df, origin=one.alyx.base_url)\n", |
| 278 | + "\n", |
| 279 | + "\n", |
| 280 | + "# Provide the directory where the cache tables will be saved\n", |
| 281 | + "one.save_cache(\"multi_area_cache\")"
| 282 | + ] |
| 283 | + }, |
| 284 | + { |
| 285 | + "cell_type": "markdown", |
| 286 | + "metadata": {}, |
| 287 | + "source": [ |
| 288 | + "The above script will save the `datasets.pqt` and `sessions.pqt` files in the directory `multi_area_cache`.\n", |
| 289 | + "\n", |
| 290 | + "You can then initialize ONE in local mode within your parallelized scripts, whether using SLURM, joblib, or multiprocessing." |
| 291 | + ] |
| 292 | + }, |
| 293 | + { |
| 294 | + "cell_type": "code", |
| 295 | + "execution_count": null, |
| 296 | + "metadata": {}, |
| 297 | + "outputs": [], |
| 298 | + "source": [ |
| 300 | + "from one.api import ONE\n", |
| 301 | + "from joblib import Parallel, delayed\n", |
| 302 | + "\n", |
| 303 | + "if __name__ == \"__main__\":\n", |
| | + " # Load the saved cache tables in local mode; no connection to the database is made\n", |
| 304 | + " one = ONE(mode='local', tables_dir=\"/path/to/multi_area_cache\")\n", |
| 305 | + " eid_list = ['eid1', 'eid2', 'eid3'] # placeholder session eids\n", |
| 306 | + " results = Parallel(n_jobs=-1, verbose=10)(delayed(one.list_collections)(eid) for eid in eid_list)\n", |
| 307 | + " print(results)"
| 308 | + ] |
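| | + }, |
| | + { |
| | + "cell_type": "markdown", |
| | + "metadata": {}, |
| | + "source": [ |
| | + "For completeness, below is a minimal sketch of the same workflow using the standard-library `multiprocessing` module; the cache directory and the eids are placeholders. Creating ONE inside each worker avoids having to pickle the ONE instance." |
| | + ] |
| | + }, |
| | + { |
| | + "cell_type": "code", |
| | + "execution_count": null, |
| | + "metadata": {}, |
| | + "outputs": [], |
| | + "source": [ |
| | + "from multiprocessing import Pool\n", |
| | + "\n", |
| | + "from one.api import ONE\n", |
| | + "\n", |
| | + "\n", |
| | + "def list_collections(eid):\n", |
| | + " # Each worker initializes ONE in local mode from the saved cache,\n", |
| | + " # so no requests are made to the remote database\n", |
| | + " one = ONE(mode='local', tables_dir=\"/path/to/multi_area_cache\")\n", |
| | + " return one.list_collections(eid)\n", |
| | + "\n", |
| | + "\n", |
| | + "if __name__ == \"__main__\":\n", |
| | + " eid_list = ['eid1', 'eid2', 'eid3'] # placeholder session eids\n", |
| | + " with Pool(processes=4) as pool:\n", |
| | + " results = pool.map(list_collections, eid_list)\n", |
| | + " print(results)" |
| | + ] |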
240 | 309 | } |
241 | 310 | ], |
242 | 311 | "metadata": { |
|