diff --git a/meta/data_processor.py b/meta/data_processor.py index 077c0945..9c2036a0 100644 --- a/meta/data_processor.py +++ b/meta/data_processor.py @@ -7,6 +7,7 @@ from meta.config_tickers import DOW_30_TICKER from meta.data_processors._base import DataSource +from meta.data_processors._base import IndicatorLib class DataProcessor: @@ -103,11 +104,14 @@ def clean_data(self): self.dataframe = self.processor.dataframe def add_technical_indicator( - self, tech_indicator_list: List[str], select_stockstats_talib: int = 0 + self, + tech_indicator_list: list[str], + select_stockstats_talib: IndicatorLib = IndicatorLib.STOCKSTATS, + drop_na_timesteps: int = 1, ): self.tech_indicator_list = tech_indicator_list self.processor.add_technical_indicator( - tech_indicator_list, select_stockstats_talib + tech_indicator_list, select_stockstats_talib, drop_na_timesteps ) self.dataframe = self.processor.dataframe @@ -151,7 +155,7 @@ def run( technical_indicator_list: List[str], if_vix: bool, cache: bool = False, - select_stockstats_talib: int = 0, + select_stockstats_talib: IndicatorLib = IndicatorLib.STOCKSTATS, ): if self.time_interval == "1s" and self.data_source != DataSource.binance: raise ValueError( diff --git a/meta/data_processors/_base.py b/meta/data_processors/_base.py index ae1638f9..539bf10a 100644 --- a/meta/data_processors/_base.py +++ b/meta/data_processors/_base.py @@ -47,6 +47,15 @@ class DataSource(Enum): yahoofinance = "yahoofinance" +class IndicatorLib(Enum): + """ + An enumeration of technical indicator libraries. + """ + + STOCKSTATS = "stockstats" + TALIB = "talib" + + class _Base: def __init__( self, @@ -171,7 +180,7 @@ def get_trading_days(self, start: str, end: str) -> List[str]: def add_technical_indicator( self, tech_indicator_list: List[str], - select_stockstats_talib: int = 0, + select_stockstats_talib: IndicatorLib = IndicatorLib.STOCKSTATS, drop_na_timesteps: int = 1, ): """ @@ -191,65 +200,14 @@ def add_technical_indicator( self.dataframe.drop(columns=["level_1"], inplace=True) if "level_0" in self.dataframe.columns and "tic" not in self.dataframe.columns: self.dataframe.rename(columns={"level_0": "tic"}, inplace=True) - assert select_stockstats_talib in {0, 1} + print("tech_indicator_list: ", tech_indicator_list) - if select_stockstats_talib == 0: # use stockstats - stock = stockstats.StockDataFrame.retype(self.dataframe) - unique_ticker = stock.tic.unique() - for indicator in tech_indicator_list: - print("indicator: ", indicator) - indicator_df = pd.DataFrame() - for i in range(len(unique_ticker)): - try: - temp_indicator = stock[stock.tic == unique_ticker[i]][indicator] - temp_indicator = pd.DataFrame(temp_indicator) - temp_indicator["tic"] = unique_ticker[i] - temp_indicator["time"] = self.dataframe[ - self.dataframe.tic == unique_ticker[i] - ]["time"].to_list() - indicator_df = pd.concat( - [indicator_df, temp_indicator], - axis=0, - join="outer", - ignore_index=True, - ) - except Exception as e: - print(e) - if not indicator_df.empty: - self.dataframe = self.dataframe.merge( - indicator_df[["tic", "time", indicator]], - on=["tic", "time"], - how="left", - ) - else: # use talib - final_df = pd.DataFrame() - for i in self.dataframe.tic.unique(): - tic_df = self.dataframe[self.dataframe.tic == i] - ( - tic_df.loc["macd"], - tic_df.loc["macd_signal"], - tic_df.loc["macd_hist"], - ) = talib.MACD( - tic_df["close"], - fastperiod=12, - slowperiod=26, - signalperiod=9, - ) - tic_df.loc["rsi"] = talib.RSI(tic_df["close"], timeperiod=14) - tic_df.loc["cci"] = talib.CCI( - tic_df["high"], - tic_df["low"], - tic_df["close"], - timeperiod=14, - ) - tic_df.loc["dx"] = talib.DX( - tic_df["high"], - tic_df["low"], - tic_df["close"], - timeperiod=14, - ) - final_df = pd.concat([final_df, tic_df], axis=0, join="outer") - self.dataframe = final_df + if select_stockstats_talib == IndicatorLib.STOCKSTATS: + self._add_technical_indicator_stockstats(tech_indicator_list) + elif select_stockstats_talib == IndicatorLib.TALIB: + self._add_technical_indicator_talib(tech_indicator_list) + else: + raise ValueError(f"Unknown indicator library: {indicator_lib}") self.dataframe.sort_values(by=["time", "tic"], inplace=True) if drop_na_timesteps: @@ -259,6 +217,93 @@ def add_technical_indicator( self.dataframe = self.dataframe[~self.dataframe.time.isin(time_to_drop)] print("Succesfully add technical indicators") + def _add_technical_indicator_stockstats(self, tech_indicator_list: List[str]): + stock = stockstats.StockDataFrame.retype(self.dataframe.copy()) + unique_ticker = stock.tic.unique() + for indicator in tech_indicator_list: + print("indicator: ", indicator) + indicator_df = pd.DataFrame() + for i in range(len(unique_ticker)): + try: + temp_indicator = stock[stock.tic == unique_ticker[i]][indicator] + temp_indicator = pd.DataFrame(temp_indicator) + temp_indicator["tic"] = unique_ticker[i] + temp_indicator["time"] = self.dataframe[ + self.dataframe.tic == unique_ticker[i] + ]["time"].to_list() + indicator_df = pd.concat( + [indicator_df, temp_indicator], + axis=0, + join="outer", + ignore_index=True, + ) + except Exception as e: + print(e) + if not indicator_df.empty: + self.dataframe = self.dataframe.merge( + indicator_df[["tic", "time", indicator]], + on=["tic", "time"], + how="left", + ) + + def _add_technical_indicator_talib(self, tech_indicator_list: List[str]): + df = self.dataframe.copy() + # group by tic to calculate technical indicators + # we need to fillna because talib has problems with nan + df = df.groupby("tic", group_keys=False).apply( + lambda x: self._calculate_talib_indicators(x, tech_indicator_list) + ) + self.dataframe = df + + def _calculate_talib_indicators(self, df, tech_indicator_list): + # fillna to avoid problems with talib + df = df.ffill() + + for indicator in tech_indicator_list: + indicator = indicator.lower() + if indicator == "macd": + df["macd"], _, _ = talib.MACD( + df["close"], fastperiod=12, slowperiod=26, signalperiod=9 + ) + elif indicator == "macd_signal": + _, df["macd_signal"], _ = talib.MACD( + df["close"], fastperiod=12, slowperiod=26, signalperiod=9 + ) + elif indicator == "macd_hist": + _, _, df["macd_hist"] = talib.MACD( + df["close"], fastperiod=12, slowperiod=26, signalperiod=9 + ) + elif indicator == "rsi_30": + df["rsi_30"] = talib.RSI(df["close"], timeperiod=14) + elif indicator == "cci_30": + df["cci_30"] = talib.CCI( + df["high"], df["low"], df["close"], timeperiod=14 + ) + elif indicator == "dx_30": + df["dx_30"] = talib.DX( + df["high"], df["low"], df["close"], timeperiod=14 + ) + elif indicator == "boll_ub": + df["boll_ub"], _, _ = talib.BBANDS( + df["close"], timeperiod=20, nbdevup=2, nbdevdn=2, matype=0 + ) + elif indicator == "boll_mb": + _, df["boll_mb"], _ = talib.BBANDS( + df["close"], timeperiod=20, nbdevup=2, nbdevdn=2, matype=0 + ) + elif indicator == "boll_lb": + _, _, df["boll_lb"] = talib.BBANDS( + df["close"], timeperiod=20, nbdevup=2, nbdevdn=2, matype=0 + ) + elif indicator == "close_30_sma": + df["close_30_sma"] = talib.SMA(df["close"], timeperiod=30) + elif indicator == "close_60_sma": + df["close_60_sma"] = talib.SMA(df["close"], timeperiod=60) + else: + # You can add more indicators here + print(f"Indicator {indicator} not supported in TALib integration.") + return df + def add_turbulence(self): """ add turbulence index from a precalcualted dataframe