Skip to content

Commit f33270a

Browse files
committed
#78 Optimize historical data fetching with multi-threading
1 parent 0579244 commit f33270a

File tree

1 file changed

+46
-27
lines changed

1 file changed

+46
-27
lines changed

tksbrokerapi/TKSBrokerAPI.py

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@
103103
import requests
104104
from dateutil.tz import tzlocal
105105
from mako.template import Template # Mako Templates for Python (https://www.makotemplates.org/). Mako is a template library provides simple syntax and maximum performance.
106-
from concurrent.futures import ThreadPoolExecutor
106+
from concurrent.futures import ThreadPoolExecutor, as_completed
107107

108108
# Add the current dir for the local run:
109109
packageDir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
@@ -3037,6 +3037,26 @@ def _InfoStr(data1: dict, data2: dict, data3: dict, data4: dict, cur: str = "")
30373037

30383038
return ops, customStat
30393039

3040+
def _DownloadHistoryBlock(self, blockStart, blockEnd, interval):
3041+
"""
3042+
Internal method for downloading a single history block (used in threads).
3043+
"""
3044+
# REST API for request: https://tinkoff.github.io/investAPI/swagger-ui/#/MarketDataService/MarketDataService_GetCandles
3045+
historyURL = self.server + r"/tinkoff.public.invest.api.contract.v1.MarketDataService/GetCandles"
3046+
3047+
self.body = str({
3048+
"figi": self._figi,
3049+
"from": blockStart.strftime(TKS_DATE_TIME_FORMAT),
3050+
"to": blockEnd.strftime(TKS_DATE_TIME_FORMAT),
3051+
"interval": TKS_CANDLE_INTERVALS[interval][0]
3052+
})
3053+
3054+
if self.moreDebug:
3055+
threadName = threading.current_thread().name
3056+
uLogger.debug(f"Started downloading block [{blockStart}{blockEnd}] in thread [{threadName}]")
3057+
3058+
return blockStart, blockEnd, self.SendAPIRequest(historyURL, reqType="POST", methodName="GetCandles")
3059+
30403060
def History(self, start: str = None, end: str = None, interval: str = "hour", onlyMissing: bool = False, csvSep: str = ",", show: bool = True) -> pd.DataFrame:
30413061
"""
30423062
This method returns last history candles of the current instrument defined by `ticker` or `figi` (FIGI id).
@@ -3133,48 +3153,41 @@ def History(self, start: str = None, end: str = None, interval: str = "hour", on
31333153
uLogger.error("📝 An issue occurred while loading file [{}] — possibly due to incorrect format. It will be rewritten. Message: {}".format(os.path.abspath(self.historyFile), e))
31343154

31353155
responseJSONs = [] # raw history blocks of data
3136-
3156+
blockRanges = []
31373157
blockEnd = dtEnd
3158+
31383159
for item in range(blocks):
31393160
tail = length % TKS_CANDLE_INTERVALS[interval][2] if item + 1 == blocks else TKS_CANDLE_INTERVALS[interval][2]
31403161
blockStart = blockEnd - timedelta(minutes=TKS_CANDLE_INTERVALS[interval][1] * tail)
31413162

3142-
if show and self.moreDebug:
3143-
uLogger.debug("[Block #{}/{}] time period: [{}] UTC - [{}] UTC".format(
3144-
item + 1, blocks, blockStart.strftime(TKS_DATE_TIME_FORMAT), blockEnd.strftime(TKS_DATE_TIME_FORMAT),
3145-
))
3163+
if blockStart != blockEnd:
3164+
blockRanges.append((blockStart, blockEnd))
31463165

3147-
if blockStart == blockEnd:
3148-
if show and self.moreDebug:
3149-
uLogger.debug("Skipped this zero-length block...")
3166+
blockEnd = blockStart
31503167

3151-
else:
3152-
# REST API for request: https://tinkoff.github.io/investAPI/swagger-ui/#/MarketDataService/MarketDataService_GetCandles
3153-
historyURL = self.server + r"/tinkoff.public.invest.api.contract.v1.MarketDataService/GetCandles"
3154-
self.body = str({
3155-
"figi": self._figi,
3156-
"from": blockStart.strftime(TKS_DATE_TIME_FORMAT),
3157-
"to": blockEnd.strftime(TKS_DATE_TIME_FORMAT),
3158-
"interval": TKS_CANDLE_INTERVALS[interval][0]
3159-
})
3160-
responseJSON = self.SendAPIRequest(historyURL, reqType="POST")
3168+
with ThreadPoolExecutor(max_workers=min(CPU_USAGES, len(blockRanges))) as executor:
3169+
futures = [executor.submit(self._DownloadHistoryBlock, bStart, bEnd, interval) for bStart, bEnd in blockRanges]
3170+
3171+
for future in as_completed(futures):
3172+
try:
3173+
bStart, bEnd, responseJSON = future.result()
31613174

3162-
if "code" in responseJSON.keys():
3163-
if show and self.moreDebug:
3164-
uLogger.debug("An issue occurred and block #{}/{} is empty".format(item + 1, blocks))
3175+
if "code" in responseJSON:
3176+
if show and self.moreDebug:
3177+
uLogger.debug(f"Block [{bStart}{bEnd}] returned an error or empty result")
31653178

3166-
else:
3167-
if "candles" in responseJSON.keys():
3179+
elif "candles" in responseJSON:
31683180
if start is not None and (start.lower() == "yesterday" or start == end) and interval == "day" and len(responseJSON["candles"]) > 1:
31693181
responseJSON["candles"] = responseJSON["candles"][:-1] # removes last candle for "yesterday" request
31703182

3171-
responseJSONs = responseJSON["candles"] + responseJSONs # add more old history behind newest dates
3183+
responseJSONs.extend(responseJSON["candles"])
31723184

31733185
else:
31743186
if show and self.moreDebug:
3175-
uLogger.debug("`candles` key not in responseJSON keys! Block #{}/{} is empty".format(item + 1, blocks))
3187+
uLogger.debug(f"`candles` key missing in block [{bStart}{bEnd}]")
31763188

3177-
blockEnd = blockStart
3189+
except Exception as e:
3190+
uLogger.debug(f"❌ Error while downloading history block: {e}")
31783191

31793192
if responseJSONs:
31803193
tempHistory = pd.DataFrame(
@@ -3212,6 +3225,12 @@ def History(self, start: str = None, end: str = None, interval: str = "hour", on
32123225
else:
32133226
history = tempHistory # if no `--only-missing` key then load full data from server
32143227

3228+
# Sorting blocks by date and time:
3229+
history["__dt"] = pd.to_datetime(history["date"] + " " + history["time"], format="%Y.%m.%d %H:%M")
3230+
history.sort_values(by="__dt", inplace=True)
3231+
history.drop(columns=["__dt"], inplace=True)
3232+
history.reset_index(drop=True, inplace=True)
3233+
32153234
if show and self.moreDebug:
32163235
uLogger.debug("Last 3 rows of received history:\n{}".format(pd.DataFrame.to_string(history[["date", "time", "open", "high", "low", "close", "volume"]][-3:], max_cols=20, index=False)))
32173236

0 commit comments

Comments
 (0)