Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions meta/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from meta.config_tickers import DOW_30_TICKER
from meta.data_processors._base import DataSource
from meta.data_processors._base import IndicatorLib


class DataProcessor:
Expand Down Expand Up @@ -103,11 +104,14 @@ def clean_data(self):
self.dataframe = self.processor.dataframe

def add_technical_indicator(
self, tech_indicator_list: List[str], select_stockstats_talib: int = 0
self,
tech_indicator_list: list[str],
select_stockstats_talib: IndicatorLib = IndicatorLib.STOCKSTATS,
drop_na_timesteps: int = 1,
):
self.tech_indicator_list = tech_indicator_list
self.processor.add_technical_indicator(
tech_indicator_list, select_stockstats_talib
tech_indicator_list, select_stockstats_talib, drop_na_timesteps
)
self.dataframe = self.processor.dataframe

Expand Down Expand Up @@ -151,7 +155,7 @@ def run(
technical_indicator_list: List[str],
if_vix: bool,
cache: bool = False,
select_stockstats_talib: int = 0,
select_stockstats_talib: IndicatorLib = IndicatorLib.STOCKSTATS,
):
if self.time_interval == "1s" and self.data_source != DataSource.binance:
raise ValueError(
Expand Down
163 changes: 104 additions & 59 deletions meta/data_processors/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ class DataSource(Enum):
yahoofinance = "yahoofinance"


class IndicatorLib(Enum):
"""
An enumeration of technical indicator libraries.
"""

STOCKSTATS = "stockstats"
TALIB = "talib"


class _Base:
def __init__(
self,
Expand Down Expand Up @@ -171,7 +180,7 @@ def get_trading_days(self, start: str, end: str) -> List[str]:
def add_technical_indicator(
self,
tech_indicator_list: List[str],
select_stockstats_talib: int = 0,
select_stockstats_talib: IndicatorLib = IndicatorLib.STOCKSTATS,
drop_na_timesteps: int = 1,
):
"""
Expand All @@ -191,65 +200,14 @@ def add_technical_indicator(
self.dataframe.drop(columns=["level_1"], inplace=True)
if "level_0" in self.dataframe.columns and "tic" not in self.dataframe.columns:
self.dataframe.rename(columns={"level_0": "tic"}, inplace=True)
assert select_stockstats_talib in {0, 1}

print("tech_indicator_list: ", tech_indicator_list)
if select_stockstats_talib == 0: # use stockstats
stock = stockstats.StockDataFrame.retype(self.dataframe)
unique_ticker = stock.tic.unique()
for indicator in tech_indicator_list:
print("indicator: ", indicator)
indicator_df = pd.DataFrame()
for i in range(len(unique_ticker)):
try:
temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
temp_indicator = pd.DataFrame(temp_indicator)
temp_indicator["tic"] = unique_ticker[i]
temp_indicator["time"] = self.dataframe[
self.dataframe.tic == unique_ticker[i]
]["time"].to_list()
indicator_df = pd.concat(
[indicator_df, temp_indicator],
axis=0,
join="outer",
ignore_index=True,
)
except Exception as e:
print(e)
if not indicator_df.empty:
self.dataframe = self.dataframe.merge(
indicator_df[["tic", "time", indicator]],
on=["tic", "time"],
how="left",
)
else: # use talib
final_df = pd.DataFrame()
for i in self.dataframe.tic.unique():
tic_df = self.dataframe[self.dataframe.tic == i]
(
tic_df.loc["macd"],
tic_df.loc["macd_signal"],
tic_df.loc["macd_hist"],
) = talib.MACD(
tic_df["close"],
fastperiod=12,
slowperiod=26,
signalperiod=9,
)
tic_df.loc["rsi"] = talib.RSI(tic_df["close"], timeperiod=14)
tic_df.loc["cci"] = talib.CCI(
tic_df["high"],
tic_df["low"],
tic_df["close"],
timeperiod=14,
)
tic_df.loc["dx"] = talib.DX(
tic_df["high"],
tic_df["low"],
tic_df["close"],
timeperiod=14,
)
final_df = pd.concat([final_df, tic_df], axis=0, join="outer")
self.dataframe = final_df
if select_stockstats_talib == IndicatorLib.STOCKSTATS:
self._add_technical_indicator_stockstats(tech_indicator_list)
elif select_stockstats_talib == IndicatorLib.TALIB:
self._add_technical_indicator_talib(tech_indicator_list)
else:
raise ValueError(f"Unknown indicator library: {indicator_lib}")

self.dataframe.sort_values(by=["time", "tic"], inplace=True)
if drop_na_timesteps:
Expand All @@ -259,6 +217,93 @@ def add_technical_indicator(
self.dataframe = self.dataframe[~self.dataframe.time.isin(time_to_drop)]
print("Succesfully add technical indicators")

def _add_technical_indicator_stockstats(self, tech_indicator_list: List[str]):
stock = stockstats.StockDataFrame.retype(self.dataframe.copy())
unique_ticker = stock.tic.unique()
for indicator in tech_indicator_list:
print("indicator: ", indicator)
indicator_df = pd.DataFrame()
for i in range(len(unique_ticker)):
try:
temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
temp_indicator = pd.DataFrame(temp_indicator)
temp_indicator["tic"] = unique_ticker[i]
temp_indicator["time"] = self.dataframe[
self.dataframe.tic == unique_ticker[i]
]["time"].to_list()
indicator_df = pd.concat(
[indicator_df, temp_indicator],
axis=0,
join="outer",
ignore_index=True,
)
except Exception as e:
print(e)
if not indicator_df.empty:
self.dataframe = self.dataframe.merge(
indicator_df[["tic", "time", indicator]],
on=["tic", "time"],
how="left",
)

def _add_technical_indicator_talib(self, tech_indicator_list: List[str]):
df = self.dataframe.copy()
# group by tic to calculate technical indicators
# we need to fillna because talib has problems with nan
df = df.groupby("tic", group_keys=False).apply(
lambda x: self._calculate_talib_indicators(x, tech_indicator_list)
)
self.dataframe = df

def _calculate_talib_indicators(self, df, tech_indicator_list):
# fillna to avoid problems with talib
df = df.ffill()

for indicator in tech_indicator_list:
indicator = indicator.lower()
if indicator == "macd":
df["macd"], _, _ = talib.MACD(
df["close"], fastperiod=12, slowperiod=26, signalperiod=9
)
elif indicator == "macd_signal":
_, df["macd_signal"], _ = talib.MACD(
df["close"], fastperiod=12, slowperiod=26, signalperiod=9
)
elif indicator == "macd_hist":
_, _, df["macd_hist"] = talib.MACD(
df["close"], fastperiod=12, slowperiod=26, signalperiod=9
)
elif indicator == "rsi_30":
df["rsi_30"] = talib.RSI(df["close"], timeperiod=14)
elif indicator == "cci_30":
df["cci_30"] = talib.CCI(
df["high"], df["low"], df["close"], timeperiod=14
)
elif indicator == "dx_30":
df["dx_30"] = talib.DX(
df["high"], df["low"], df["close"], timeperiod=14
)
elif indicator == "boll_ub":
df["boll_ub"], _, _ = talib.BBANDS(
df["close"], timeperiod=20, nbdevup=2, nbdevdn=2, matype=0
)
elif indicator == "boll_mb":
_, df["boll_mb"], _ = talib.BBANDS(
df["close"], timeperiod=20, nbdevup=2, nbdevdn=2, matype=0
)
elif indicator == "boll_lb":
_, _, df["boll_lb"] = talib.BBANDS(
df["close"], timeperiod=20, nbdevup=2, nbdevdn=2, matype=0
)
elif indicator == "close_30_sma":
df["close_30_sma"] = talib.SMA(df["close"], timeperiod=30)
elif indicator == "close_60_sma":
df["close_60_sma"] = talib.SMA(df["close"], timeperiod=60)
else:
# You can add more indicators here
print(f"Indicator {indicator} not supported in TALib integration.")
return df

def add_turbulence(self):
"""
add turbulence index from a precalcualted dataframe
Expand Down
Loading