diff --git a/backtesting/backtesting.py b/backtesting/backtesting.py
index 83ba8513..977798d2 100644
--- a/backtesting/backtesting.py
+++ b/backtesting/backtesting.py
@@ -8,12 +8,9 @@
 from __future__ import annotations
 
-import multiprocessing as mp
-import os
 import sys
 import warnings
 from abc import ABCMeta, abstractmethod
-from concurrent.futures import ProcessPoolExecutor, as_completed
 from copy import copy
 from functools import lru_cache, partial
 from itertools import chain, product, repeat
@@ -23,6 +20,7 @@
 import numpy as np
 import pandas as pd
+from joblib import Parallel, delayed
 from numpy.random import default_rng
 
 try:
@@ -1495,41 +1493,15 @@ def _optimize_grid() -> Union[pd.Series, Tuple[pd.Series, pd.Series]]:
                                    [p.values() for p in param_combos],
                                    names=next(iter(param_combos)).keys()))
 
-            def _batch(seq):
-                n = np.clip(int(len(seq) // (os.cpu_count() or 1)), 1, 300)
-                for i in range(0, len(seq), n):
-                    yield seq[i:i + n]
-
-            # Save necessary objects into "global" state; pass into concurrent executor
-            # (and thus pickle) nothing but two numbers; receive nothing but numbers.
-            # With start method "fork", children processes will inherit parent address space
-            # in a copy-on-write manner, achieving better performance/RAM benefit.
-            backtest_uuid = np.random.random()
-            param_batches = list(_batch(param_combos))
-            Backtest._mp_backtests[backtest_uuid] = (self, param_batches, maximize)
-            try:
-                # If multiprocessing start method is 'fork' (i.e. on POSIX), use
-                # a pool of processes to compute results in parallel.
-                # Otherwise (i.e. on Windos), sequential computation will be "faster".
-                if mp.get_start_method(allow_none=False) == 'fork':
-                    with ProcessPoolExecutor() as executor:
-                        futures = [executor.submit(Backtest._mp_task, backtest_uuid, i)
-                                   for i in range(len(param_batches))]
-                        for future in _tqdm(as_completed(futures), total=len(futures),
-                                            desc='Backtest.optimize'):
-                            batch_index, values = future.result()
-                            for value, params in zip(values, param_batches[batch_index]):
-                                heatmap[tuple(params.values())] = value
-                else:
-                    if os.name == 'posix':
-                        warnings.warn("For multiprocessing support in `Backtest.optimize()` "
-                                      "set multiprocessing start method to 'fork'.")
-                    for batch_index in _tqdm(range(len(param_batches))):
-                        _, values = Backtest._mp_task(backtest_uuid, batch_index)
-                        for value, params in zip(values, param_batches[batch_index]):
-                            heatmap[tuple(params.values())] = value
-            finally:
-                del Backtest._mp_backtests[backtest_uuid]
+            with Parallel(prefer='threads', require='sharedmem', max_nbytes='50M',
+                          n_jobs=-2, return_as='generator') as parallel:
+                results = _tqdm(
+                    parallel(delayed(self._mp_task)(self, params, maximize=maximize)
+                             for params in param_combos),
+                    total=len(param_combos),
+                    desc='Backtest.optimize')
+                for value, params in zip(results, param_combos):
+                    heatmap[tuple(params.values())] = value
 
             if pd.isnull(heatmap).all():
                 # No trade was made in any of the runs. Just make a random
@@ -1578,7 +1550,7 @@ def memoized_run(tup):
                 stats = self.run(**dict(tup))
                 return -maximize(stats)
 
-            progress = iter(_tqdm(repeat(None), total=max_tries, leave=False, desc='Backtest.optimize'))
+            progress = iter(_tqdm(repeat(None), total=max_tries, desc='Backtest.optimize'))
             _names = tuple(kwargs.keys())
 
             def objective_function(x):
@@ -1623,11 +1595,9 @@ def cons(x):
         return output
 
     @staticmethod
-    def _mp_task(backtest_uuid, batch_index):
-        bt, param_batches, maximize_func = Backtest._mp_backtests[backtest_uuid]
-        return batch_index, [maximize_func(stats) if stats['# Trades'] else np.nan
-                             for stats in (bt.run(**params)
-                                           for params in param_batches[batch_index])]
+    def _mp_task(bt, params, *, maximize):
+        stats = bt.run(**params)
+        return maximize(stats) if stats['# Trades'] else np.nan
 
     _mp_backtests: Dict[float, Tuple['Backtest', List, Callable]] = {}
 
diff --git a/backtesting/test/_test.py b/backtesting/test/_test.py
index 53596d4b..55f5ba48 100644
--- a/backtesting/test/_test.py
+++ b/backtesting/test/_test.py
@@ -621,18 +621,6 @@ def test_max_tries(self):
                                          **OPT_PARAMS)
                 self.assertEqual(len(heatmap), 6)
 
-    def test_multiprocessing_windows_spawn(self):
-        df = GOOG.iloc[:100]
-        kw = {'fast': [10]}
-
-        stats1 = Backtest(df, SmaCross).optimize(**kw)
-        with patch('multiprocessing.get_start_method', lambda **_: 'spawn'):
-            with self.assertWarns(UserWarning) as cm:
-                stats2 = Backtest(df, SmaCross).optimize(**kw)
-
-        self.assertIn('multiprocessing support', cm.warning.args[0])
-        assert stats1.filter(chars := tuple('[^_]')).equals(stats2.filter(chars)), (stats1, stats2)
-
     def test_optimize_invalid_param(self):
         bt = Backtest(GOOG.iloc[:100], SmaCross)
         self.assertRaises(AttributeError, bt.optimize, foo=range(3))
@@ -648,7 +636,7 @@ def test_optimize_speed(self):
         start = time.process_time()
         bt.optimize(fast=(2, 5, 7), slow=[10, 15, 20, 30])
         end = time.process_time()
-        self.assertLess(end - start, .2)
+        self.assertLess(end - start, 1)
 
 
 class TestPlot(TestCase):
diff --git a/setup.py b/setup.py
index 666bb22c..e331f5b3 100644
--- a/setup.py
+++ b/setup.py
@@ -34,6 +34,7 @@
         'numpy >= 1.17.0',
         'pandas >= 0.25.0, != 0.25.0',
         'bokeh >= 1.4.0, != 3.0.*',
+        'joblib',
    ],
    extras_require={
        'doc': [
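
For reference, a minimal self-contained sketch of the joblib pattern the new
_optimize_grid relies on: threads over shared memory (so the Backtest instance
is never pickled) and return_as='generator' so results stream back one
parameter combination at a time. run_backtest, maximize, score, and the
parameter grid below are hypothetical stand-ins, not part of this patch;
return_as requires joblib >= 1.3.

    from joblib import Parallel, delayed

    # Hypothetical stand-ins for Backtest.run() and the `maximize` metric.
    def run_backtest(**params):
        return {'# Trades': 1, 'Score': params['fast'] / params['slow']}

    def maximize(stats):
        return stats['Score']

    def score(params):
        # Same shape as the new Backtest._mp_task: run once, score, NaN if no trades.
        stats = run_backtest(**params)
        return maximize(stats) if stats['# Trades'] else float('nan')

    param_combos = [{'fast': f, 'slow': s} for f in (5, 10) for s in (20, 30)]

    # require='sharedmem' forces the in-process threading backend (nothing is
    # pickled); return_as='generator' yields results lazily in submission order,
    # which is what lets a progress bar advance per combination.
    with Parallel(prefer='threads', require='sharedmem',
                  n_jobs=-2, return_as='generator') as parallel:
        results = parallel(delayed(score)(p) for p in param_combos)
        heatmap = {tuple(p.values()): value
                   for value, p in zip(results, param_combos)}

    print(heatmap)  # e.g. {(5, 20): 0.25, (5, 30): ..., (10, 20): ..., (10, 30): ...}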