Skip to content

Commit 6de7c70

Browse files
authored
Merge pull request freqtrade#11625 from alisalama/develop
Parallelisation of iterative data downloads for speed improvement
2 parents 74f52b1 + e33363b commit 6de7c70

File tree

9 files changed

+387
-16
lines changed

9 files changed

+387
-16
lines changed

docs/commands/download-data.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ usage: freqtrade download-data [-h] [-v] [--no-color] [--logfile FILE] [-V]
44
[-p PAIRS [PAIRS ...]] [--pairs-file FILE]
55
[--days INT] [--new-pairs-days INT]
66
[--include-inactive-pairs]
7+
[--no-parallel-download]
78
[--timerange TIMERANGE] [--dl-trades]
89
[--convert] [--exchange EXCHANGE]
910
[-t TIMEFRAMES [TIMEFRAMES ...]] [--erase]
@@ -24,6 +25,9 @@ options:
2425
Default: `None`.
2526
--include-inactive-pairs
2627
Also download data from inactive pairs.
28+
--no-parallel-download
29+
Disable parallel startup download. Only use this if
30+
you experience issues.
2731
--timerange TIMERANGE
2832
Specify what timerange of data to use.
2933
--dl-trades Download trades instead of OHLCV data.

freqtrade/commands/arguments.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@
164164
"days",
165165
"new_pairs_days",
166166
"include_inactive",
167+
"no_parallel_download",
167168
"timerange",
168169
"download_trades",
169170
"convert_trades",

freqtrade/commands/cli_options.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,11 @@ def __init__(self, *args, **kwargs):
454454
help="Also download data from inactive pairs.",
455455
action="store_true",
456456
),
457+
"no_parallel_download": Arg(
458+
"--no-parallel-download",
459+
help="Disable parallel startup download. Only use this if you experience issues.",
460+
action="store_true",
461+
),
457462
"new_pairs_days": Arg(
458463
"--new-pairs-days",
459464
help="Download data of new pairs for given number of days. Default: `%(default)s`.",

freqtrade/configuration/configuration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ def _process_plot_options(self, config: Config) -> None:
391391
("timeframes", "timeframes --timeframes: {}"),
392392
("days", "Detected --days: {}"),
393393
("include_inactive", "Detected --include-inactive-pairs: {}"),
394+
("no_parallel_download", "Detected --no-parallel-download: {}"),
394395
("download_trades", "Detected --dl-trades: {}"),
395396
("convert_trades", "Detected --convert: {} - Converting Trade data to OHCV {}"),
396397
("dataformat_ohlcv", 'Using "{}" to store OHLCV data.'),

freqtrade/data/history/history_utils.py

Lines changed: 107 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,14 @@
66
from pandas import DataFrame, concat
77

88
from freqtrade.configuration import TimeRange
9-
from freqtrade.constants import DATETIME_PRINT_FORMAT, DL_DATA_TIMEFRAMES, DOCS_LINK, Config
9+
from freqtrade.constants import (
10+
DATETIME_PRINT_FORMAT,
11+
DL_DATA_TIMEFRAMES,
12+
DOCS_LINK,
13+
Config,
14+
ListPairsWithTimeframes,
15+
PairWithTimeframe,
16+
)
1017
from freqtrade.data.converter import (
1118
clean_ohlcv_dataframe,
1219
convert_trades_to_ohlcv,
@@ -17,6 +24,7 @@
1724
from freqtrade.enums import CandleType, TradingMode
1825
from freqtrade.exceptions import OperationalException
1926
from freqtrade.exchange import Exchange
27+
from freqtrade.exchange.exchange_utils import date_minus_candles
2028
from freqtrade.plugins.pairlist.pairlist_helpers import dynamic_expand_pairlist
2129
from freqtrade.util import dt_now, dt_ts, format_ms_time, format_ms_time_det
2230
from freqtrade.util.migrations import migrate_data
@@ -226,6 +234,7 @@ def _download_pair_history(
226234
candle_type: CandleType,
227235
erase: bool = False,
228236
prepend: bool = False,
237+
pair_candles: DataFrame | None = None,
229238
) -> bool:
230239
"""
231240
Download latest candles from the exchange for the pair and timeframe passed in parameters
@@ -238,6 +247,7 @@ def _download_pair_history(
238247
:param timerange: range of time to download
239248
:param candle_type: Any of the enum CandleType (must match trading mode!)
240249
:param erase: Erase existing data
250+
:param pair_candles: Optional with "1 call" pair candles.
241251
:return: bool with success state
242252
"""
243253
data_handler = get_datahandler(datadir, data_handler=data_handler)
@@ -271,21 +281,40 @@ def _download_pair_history(
271281
"Current End: %s",
272282
f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}" if not data.empty else "None",
273283
)
274-
275-
# Default since_ms to 30 days if nothing is given
276-
new_dataframe = exchange.get_historic_ohlcv(
277-
pair=pair,
278-
timeframe=timeframe,
279-
since_ms=(
280-
since_ms
281-
if since_ms
282-
else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000
283-
),
284-
is_new_pair=data.empty,
285-
candle_type=candle_type,
286-
until_ms=until_ms if until_ms else None,
284+
# used to check if the passed in pair_candles (parallel downloaded) covers since_ms.
285+
# If we need more data, we have to fall back to the standard method.
286+
pair_candles_since_ms = (
287+
dt_ts(pair_candles.iloc[0]["date"])
288+
if pair_candles is not None and len(pair_candles.index) > 0
289+
else 0
287290
)
288-
logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.")
291+
if (
292+
pair_candles is None
293+
or len(pair_candles.index) == 0
294+
or data.empty
295+
or prepend is True
296+
or erase is True
297+
or pair_candles_since_ms > (since_ms if since_ms else 0)
298+
):
299+
new_dataframe = exchange.get_historic_ohlcv(
300+
pair=pair,
301+
timeframe=timeframe,
302+
since_ms=(
303+
since_ms
304+
if since_ms
305+
else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000
306+
),
307+
is_new_pair=data.empty,
308+
candle_type=candle_type,
309+
until_ms=until_ms if until_ms else None,
310+
)
311+
logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.")
312+
else:
313+
new_dataframe = pair_candles
314+
logger.info(
315+
f"Downloaded data for {pair} with length {len(new_dataframe)}. Parallel Method."
316+
)
317+
289318
if data.empty:
290319
data = new_dataframe
291320
else:
@@ -330,6 +359,7 @@ def refresh_backtest_ohlcv_data(
330359
data_format: str | None = None,
331360
prepend: bool = False,
332361
progress_tracker: CustomProgress | None = None,
362+
no_parallel_download: bool = False,
333363
) -> list[str]:
334364
"""
335365
Refresh stored ohlcv data for backtesting and hyperopt operations.
@@ -339,6 +369,7 @@ def refresh_backtest_ohlcv_data(
339369
progress_tracker = retrieve_progress_tracker(progress_tracker)
340370

341371
pairs_not_available = []
372+
fast_candles: dict[PairWithTimeframe, DataFrame] = {}
342373
data_handler = get_datahandler(datadir, data_format)
343374
candle_type = CandleType.get_default(trading_mode)
344375
with progress_tracker as progress:
@@ -355,6 +386,30 @@ def refresh_backtest_ohlcv_data(
355386
logger.info(f"Skipping pair {pair}...")
356387
continue
357388
for timeframe in timeframes:
389+
# Get fast candles via parallel method on first loop through per timeframe
390+
# and candle type. Downloads all the pairs in the list and stores them.
391+
if (
392+
not no_parallel_download
393+
and exchange.get_option("download_data_parallel_quick", True)
394+
and (
395+
((pair, timeframe, candle_type) not in fast_candles)
396+
and (erase is False)
397+
and (prepend is False)
398+
)
399+
):
400+
fast_candles.update(
401+
_download_all_pairs_history_parallel(
402+
exchange=exchange,
403+
pairs=pairs,
404+
timeframe=timeframe,
405+
candle_type=candle_type,
406+
timerange=timerange,
407+
)
408+
)
409+
410+
# get the already downloaded pair candles if they exist
411+
pair_candles = fast_candles.pop((pair, timeframe, candle_type), None)
412+
358413
progress.update(timeframe_task, description=f"Timeframe {timeframe}")
359414
logger.debug(f"Downloading pair {pair}, {candle_type}, interval {timeframe}.")
360415
_download_pair_history(
@@ -368,6 +423,7 @@ def refresh_backtest_ohlcv_data(
368423
candle_type=candle_type,
369424
erase=erase,
370425
prepend=prepend,
426+
pair_candles=pair_candles, # optional pass of dataframe of parallel candles
371427
)
372428
progress.update(timeframe_task, advance=1)
373429
if trading_mode == "futures":
@@ -404,6 +460,41 @@ def refresh_backtest_ohlcv_data(
404460
return pairs_not_available
405461

406462

463+
def _download_all_pairs_history_parallel(
    exchange: Exchange,
    pairs: list[str],
    timeframe: str,
    candle_type: CandleType,
    timerange: TimeRange | None = None,
) -> dict[PairWithTimeframe, DataFrame]:
    """
    Allows to use the faster parallel async download method for many coins
    but only if the data is short enough to be retrieved in one call.
    Used by freqtrade download-data subcommand.
    :param exchange: Exchange object used to query the candle limit and to download candles
    :param pairs: List of pairs to download
    :param timeframe: Timeframe (e.g. "5m") to download for every pair
    :param candle_type: Any of the enum CandleType (must match trading mode!)
    :param timerange: Optional timerange. Only a start of type "date" is used as the
        download start; otherwise the start stays at 0.
    :return: Candle pairs with timeframes. Empty dict if the parallel download was not used.
    """
    candles: dict[PairWithTimeframe, DataFrame] = {}

    # Start of the requested range in milliseconds (timerange.startts is multiplied by 1000).
    since = 0
    if timerange and timerange.starttype == "date":
        since = timerange.startts * 1000

    candle_limit = exchange.ohlcv_candle_limit(timeframe, candle_type)
    # Threshold computed from `candle_limit` candles of this timeframe.
    # NOTE(review): presumably the earliest timestamp one call can still reach - confirm
    # against date_minus_candles.
    one_call_min_time_dt = dt_ts(date_minus_candles(timeframe, candle_limit))

    # check if we can get all candles in one go, if so then we can download them in parallel
    if since > one_call_min_time_dt:
        logger.info(
            f"Downloading parallel candles for {timeframe} for all pairs "
            f"since {format_ms_time(since)}"
        )
        needed_pairs: ListPairsWithTimeframes = [
            (pair, timeframe, candle_type) for pair in pairs
        ]
        candles = exchange.refresh_latest_ohlcv(needed_pairs, since_ms=since, cache=False)

    return candles
496+
497+
407498
def _download_trades_history(
408499
exchange: Exchange,
409500
pair: str,
@@ -702,6 +793,7 @@ def download_data(
702793
trading_mode=config.get("trading_mode", "spot"),
703794
prepend=config.get("prepend_data", False),
704795
progress_tracker=progress_tracker,
796+
no_parallel_download=config.get("no_parallel_download", False),
705797
)
706798
finally:
707799
if pairs_not_available:

freqtrade/exchange/exchange.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ class Exchange:
138138
"ohlcv_has_history": True, # Some exchanges (Kraken) don't provide history via ohlcv
139139
"ohlcv_partial_candle": True,
140140
"ohlcv_require_since": False,
141+
"download_data_parallel_quick": True,
141142
"always_require_api_keys": False, # purge API keys for Dry-run. Must default to false.
142143
# Check https://github.com/ccxt/ccxt/issues/10767 for removal of ohlcv_volume_currency
143144
"ohlcv_volume_currency": "base", # "base" or "quote"

freqtrade/exchange/exchange_types.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ class FtHas(TypedDict, total=False):
2828
ohlcv_volume_currency: str
2929
ohlcv_candle_limit_per_timeframe: dict[str, int]
3030
always_require_api_keys: bool
31+
# allow disabling of parallel download-data for specific exchanges
32+
download_data_parallel_quick: bool
3133
# Tickers
3234
tickers_have_quoteVolume: bool
3335
tickers_have_percentage: bool

freqtrade/exchange/hyperliquid.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class Hyperliquid(Exchange):
2828
"stoploss_on_exchange": False,
2929
"exchange_has_overrides": {"fetchTrades": False},
3030
"marketOrderRequiresPrice": True,
31+
"download_data_parallel_quick": False,
3132
"ws_enabled": True,
3233
}
3334
_ft_has_futures: FtHas = {

0 commit comments

Comments
 (0)