#!/usr/bin/env python3
# trading_setup.py
"""Helpers and configuration for a five-pillar ticker ranker.

The pillars and their composite weights are listed in ``WEIGHTS``.
"""
import math
import os
import warnings
from dataclasses import dataclass
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
import ta
import yfinance as yf
from scipy.stats import rankdata

warnings.filterwarnings("ignore", category=FutureWarning)

# ---------------------------
# Config
# ---------------------------
DEFAULT_LOOKBACK_YEARS = 2
MIN_HISTORY_DAYS = 252
# Momentum look-back windows, expressed in rows of the price history.
MOM_WINDOW_DAYS = {"1m": 21, "3m": 63, "6m": 126, "12m": 252}
TECH_EMAS = (20, 50, 200)
# Pillar weights for the composite score; they sum to 1.0.
WEIGHTS = {
    "technical_flow": 0.35,
    "options_sentiment": 0.20,
    "dark_pool": 0.15,
    "momentum": 0.20,
    "growth": 0.10,
}


# ---------------------------
# Utils
# ---------------------------
def winsorize_series(s: pd.Series, p: float = 0.01) -> pd.Series:
    """Clip *s* to its [p, 1 - p] quantile range.

    A series without any non-NaN value is returned unchanged.
    """
    if s.dropna().empty:
        return s
    lo, hi = s.quantile(p), s.quantile(1 - p)
    return s.clip(lower=lo, upper=hi)


def scale_0_100(s: pd.Series) -> pd.Series:
    """Min-max scale *s* onto 0-100 after winsorizing.

    NaN inputs stay NaN in the result, so a missing observation is never
    mistaken for a real score. When the usable values have no spread, either
    originally or after winsorizing collapses the range, every non-NaN entry
    maps to the neutral value 50.
    """
    # Flat middle if no variance; NaN positions are preserved.
    neutral = pd.Series(50.0, index=s.index).where(s.notna())
    if s.dropna().nunique() <= 1:
        return neutral
    s_w = winsorize_series(s)
    lo, hi = s_w.min(), s_w.max()
    if not hi > lo:
        # Winsorizing can clip a lone outlier away and leave hi == lo, which
        # would otherwise turn the whole result into 0/0 = NaN.
        return neutral
    return (100 * (s_w - lo) / (hi - lo)).astype(float)


def pct_change(df: pd.DataFrame, col="Adj Close", window=21) -> float:
    """Return the simple return of *col* over the last *window* rows.

    Returns NaN when *df* has fewer than ``window + 1`` rows. If the default
    "Adj Close" column is absent, "Close" is used instead.
    """
    if col == "Adj Close" and col not in df.columns and "Close" in df.columns:
        col = "Close"
    if len(df) < window + 1:
        return np.nan
    return df[col].iloc[-1] / df[col].iloc[-1 - window] - 1.0


def safe_div(a, b):
    """Return ``a / b``, or NaN when either operand is None or *b* is 0/NaN."""
    if a is None or b is None:
        return np.nan
    if np.isnan(b) or b == 0:
        return np.nan
    return a / b


# ---------------------------
# Data classes
# ---------------------------
@dataclass
class DarkPoolRecord:
    """Dark-pool inputs for one ticker."""

    ticker: str
    dp_net_flow: Optional[float] = None  # raw net flow, scaled against peers
    dp_score: Optional[float] = None  # if already 0-100


@dataclass
class FundamentalsRecord:
    """Year-over-year growth inputs for one ticker."""

    ticker: str
    # Only the rank among peers is used downstream, so the unit
    # (fraction vs. percent) just has to be consistent across tickers.
    revenue_yoy: Optional[float] = None
    eps_yoy: Optional[float] = None


# ---------------------------
# Fetchers
# ---------------------------
def fetch_price_history(ticker: str, years: int = DEFAULT_LOOKBACK_YEARS) -> Optional[pd.DataFrame]:
    """Download *years* of unadjusted price history for *ticker* via yfinance.

    Returns None when the download fails, comes back empty, or holds fewer
    than ``MIN_HISTORY_DAYS // 2`` rows. Failures are reported with a
    RuntimeWarning instead of being raised, so one bad ticker cannot abort a
    multi-ticker run.
    """
    try:
        df = yf.Ticker(ticker).history(period=f"{years}y", auto_adjust=False)
    except Exception as exc:  # best-effort by design: keep going on fetch errors
        warnings.warn(f"price history fetch failed for {ticker}: {exc}", RuntimeWarning, stacklevel=2)
        return None
    if df is None or df.empty or len(df) < MIN_HISTORY_DAYS // 2:
        return None
    return df


def fetch_option_chain_put_call_ratio(ticker: str) -> Optional[float]:
    """
    Basic net options sentiment via Put/Call volume ratio (lower = bullish).
    Returns PCR; we'll map to a 0-100 bullish score later.

    Returns None when no expirations are listed, either side of the chain is
    empty, the call volume is zero (ratio undefined), or the fetch fails
    (reported with a RuntimeWarning).
    """
    try:
        tk = yf.Ticker(ticker)
        expirations = tk.options
        if not expirations:
            return None
        # Use the first listed expiration (presumably the nearest one;
        # confirm the ordering against yfinance).
        chain = tk.option_chain(expirations[0])
        calls, puts = chain.calls, chain.puts
        if calls is None or puts is None or calls.empty or puts.empty:
            return None
        # sum() already skips NaN volumes.
        call_vol = calls["volume"].sum(skipna=True)
        put_vol = puts["volume"].sum(skipna=True)
    except Exception as exc:  # best-effort by design: keep going on fetch errors
        warnings.warn(f"option chain fetch failed for {ticker}: {exc}", RuntimeWarning, stacklevel=2)
        return None
    pcr = safe_div(put_vol, call_vol)
    # Honor the Optional[float] contract: an undefined ratio is None, not NaN.
    if pcr is None or np.isnan(pcr):
        return None
    return float(pcr)


def _optional_float(value) -> Optional[float]:
    """Convert a CSV cell to float; missing or non-numeric cells become None."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return None if math.isnan(number) else number


def _load_records_csv(path: str, record_cls, fields: List[str]) -> dict:
    """Read *path* and build one *record_cls* per row, keyed by upper-cased ticker.

    A missing path yields ``{}``. Columns from *fields* that the CSV lacks are
    filled with None. Rows with a blank ticker are skipped. When a ticker
    repeats, the last row wins.
    """
    if not path or not os.path.exists(path):
        return {}
    df = pd.read_csv(path)
    present = [name for name in fields if name in df.columns]
    out = {}
    for _, row in df.iterrows():
        if pd.isna(row["ticker"]):
            continue  # str(nan).upper() would otherwise create the key "NAN"
        symbol = str(row["ticker"]).strip().upper()
        values = {name: None for name in fields}
        values.update({name: _optional_float(row[name]) for name in present})
        out[symbol] = record_cls(ticker=symbol, **values)
    return out


def load_dark_pool_csv(path: str) -> Dict[str, DarkPoolRecord]:
    """Load dark-pool records from a CSV with columns ticker[, dp_net_flow][, dp_score]."""
    return _load_records_csv(path, DarkPoolRecord, ["dp_net_flow", "dp_score"])


def load_fundamentals_csv(path: str) -> Dict[str, FundamentalsRecord]:
    """Load growth records from a CSV with columns ticker[, revenue_yoy][, eps_yoy]."""
    return _load_records_csv(path, FundamentalsRecord, ["revenue_yoy", "eps_yoy"])


# ---------------------------
# Scoring pillars
# ---------------------------
def _has_value(x) -> bool:
    """True when *x* is neither None nor NaN."""
    return x is not None and not np.isnan(x)


def _clamp(value: float, lo: float, hi: float) -> float:
    """Clamp *value* to [lo, hi], passing NaN through.

    The builtin ``max(lo, min(hi, nan))`` evaluates to *hi*, which would
    silently turn a missing score into a perfect one.
    """
    if np.isnan(value):
        return np.nan
    return float(max(lo, min(hi, value)))


def _round_or_nan(value, digits: int) -> float:
    """Round *value* to *digits* places; None/NaN become NaN."""
    return round(value, digits) if _has_value(value) else np.nan


def technical_flow_score(df: pd.DataFrame) -> float:
    """
    0-100 score from EMAs stack, RSI, MACD, ADX, Volume trend.

    The component caps are 40 + 30 + 15 + 15 + 10 = 110, so the sum is
    clamped to 100. A component whose indicator is NaN contributes its neutral
    midpoint (0 for volume). Returns NaN, with a RuntimeWarning, when the
    indicator computation raises (e.g. a required column is missing).
    """
    try:
        close = (df["Adj Close"] if "Adj Close" in df else df["Close"]).rename("close")
        high, low, vol = df["High"], df["Low"], df["Volume"]

        ema20, ema50, ema200 = (close.ewm(span=span).mean().iloc[-1] for span in TECH_EMAS)
        rsi_now = ta.momentum.RSIIndicator(close, window=14).rsi().iloc[-1]
        macd = ta.trend.MACD(close).macd_diff()
        adx_now = ta.trend.ADXIndicator(high, low, close).adx().iloc[-1]

        # Trend stack (0-40): one third of the points per correctly ordered EMA pair.
        aligned = sum(int(flag) for flag in (ema20 > ema50, ema50 > ema200, ema20 > ema200))
        stack_score = aligned / 3.0 * 40.0

        # RSI mapped linearly from 30 -> 0 to 70 -> 30, capped at both ends.
        rsi_component = 15.0 if np.isnan(rsi_now) else _clamp((rsi_now - 30) / 40 * 30, 0.0, 30.0)

        # MACD histogram latest vs distribution (0-15)
        macd_scaled = scale_0_100(macd).iloc[-1] if macd.notna().any() else 50.0
        if np.isnan(macd_scaled):
            macd_scaled = 50.0  # latest bar has no MACD value: stay neutral
        macd_component = macd_scaled * 0.15

        # ADX trend strength (25+ is trending) -> 0-15
        adx_component = 7.5 if np.isnan(adx_now) else _clamp((adx_now - 15) / 20 * 15, 0.0, 15.0)

        # Volume trend (20d vs 200d) -> 0-10
        vol20 = vol.rolling(20).mean().iloc[-1]
        vol200 = vol.rolling(200).mean().iloc[-1]
        vol_ratio = safe_div(vol20, vol200)
        if _has_value(vol_ratio):
            vol_component = _clamp((vol_ratio - 0.8) / 0.7 * 10, 0.0, 10.0)
        else:
            vol_component = 0.0

        score = stack_score + rsi_component + macd_component + adx_component + vol_component
        return _clamp(score, 0.0, 100.0)
    except Exception as exc:  # best-effort by design: one bad frame must not abort the run
        warnings.warn(f"technical_flow_score failed: {exc}", RuntimeWarning, stacklevel=2)
        return np.nan


def options_sentiment_score(pcr: Optional[float]) -> float:
    """
    Map Put/Call ratio to bullish 0-100.
    Typical PCR ranges: ~0.5-1.5. Lower => more bullish.

    Returns NaN when *pcr* is None or NaN.
    """
    if pcr is None or np.isnan(pcr):
        return np.nan
    # Clip to [0.3, 2.0] then invert linearly:
    # 0.3 -> 100, 1.0 -> ~59, 2.0 -> 0
    pcr_c = max(0.3, min(2.0, pcr))
    score = (2.0 - pcr_c) / (2.0 - 0.3) * 100.0
    return _clamp(score, 0.0, 100.0)


def momentum_scores(df: pd.DataFrame) -> Dict[str, float]:
    """Return per-window returns (``mom_<label>``) plus ``mom_composite_raw``.

    The composite weights 1m/3m/6m/12m at 0.10/0.30/0.25/0.35 and is NaN as
    soon as any window lacks enough history.
    """
    out = {f"mom_{label}": pct_change(df, window=win) for label, win in MOM_WINDOW_DAYS.items()}
    # Composite momentum (skip 1m if you prefer medium-term)
    out["mom_composite_raw"] = (
        0.1 * out.get("mom_1m", np.nan)
        + 0.3 * out.get("mom_3m", np.nan)
        + 0.25 * out.get("mom_6m", np.nan)
        + 0.35 * out.get("mom_12m", np.nan)
    )
    return out


def growth_score(revenue_yoy: Optional[float], eps_yoy: Optional[float], peers_df: pd.DataFrame) -> float:
    """
    Rank/percentile within peers for revenue & EPS YoY -> average -> 0-100.
    If missing, returns NaN.

    Peer cells that are missing or non-numeric are dropped before ranking: a
    single NaN handed to ``rankdata`` can turn every rank into NaN.
    """
    if revenue_yoy is None or eps_yoy is None or np.isnan(revenue_yoy) or np.isnan(eps_yoy):
        return np.nan

    def _percentile(column: str, value: float) -> float:
        peers = pd.to_numeric(peers_df[column], errors="coerce").dropna().to_numpy(dtype=float)
        # Append this point last so its rank is the final element.
        # NOTE(review): analyze_tickers builds peers_df from every fundamentals
        # row, so the ticker's own value is usually already present and is
        # counted twice here; left as-is to keep existing scores stable.
        pool = np.append(peers, float(value))
        return rankdata(pool, method="average")[-1] / len(pool) * 100

    return float(np.nanmean([_percentile("revenue_yoy", revenue_yoy), _percentile("eps_yoy", eps_yoy)]))


def dark_pool_score(rec: Optional[DarkPoolRecord], peer_records: List[DarkPoolRecord]) -> float:
    """Return a 0-100 dark-pool score for *rec*, or NaN when it cannot be derived.

    A usable ``dp_score`` wins and is only clamped. Otherwise ``dp_net_flow``
    is min-max scaled against the winsorized peer flows, giving 50 when the
    peers show no spread.
    """
    if rec is None:
        return np.nan
    if _has_value(rec.dp_score):
        return _clamp(rec.dp_score, 0.0, 100.0)
    # derive from dp_net_flow vs peers
    if not _has_value(rec.dp_net_flow):
        return np.nan
    flows = pd.Series([r.dp_net_flow for r in peer_records if _has_value(r.dp_net_flow)])
    if flows.empty:
        return np.nan
    flows = winsorize_series(flows)
    lo, hi = flows.min(), flows.max()
    scaled = (rec.dp_net_flow - lo) / (hi - lo) * 100 if hi > lo else 50.0
    return _clamp(scaled, 0.0, 100.0)


# ---------------------------
# Orchestrator
# ---------------------------
_OUTPUT_COLUMNS = [
    "ticker", "technical_flow", "options_pcr", "options_sentiment", "dark_pool",
    "mom_1m", "mom_3m", "mom_6m", "mom_12m", "momentum", "growth", "composite", "signal",
]


def _signal_for(composite: float) -> str:
    """Map a composite score to its label; a missing composite is "N/A"."""
    if not _has_value(composite):
        return "N/A"  # no pillar was available: do not masquerade as "Avoid"
    if composite >= 80:
        return "Strong Buy"
    if composite >= 65:
        return "Buy"
    if composite >= 50:
        return "Watch"
    return "Avoid"


def analyze_tickers(
    tickers: List[str],
    dark_pool_csv: Optional[str] = None,
    fundamentals_csv: Optional[str] = None
) -> pd.DataFrame:
    """Score every ticker on the five pillars and rank by composite.

    Args:
        tickers: Symbols to analyze; upper-cased and de-duplicated (first
            occurrence wins).
        dark_pool_csv: Optional CSV path (ticker, dp_net_flow, dp_score).
        fundamentals_csv: Optional CSV path (ticker, revenue_yoy, eps_yoy).

    Returns:
        One row per ticker, sorted by ``composite`` descending with missing
        composites last. Pillars that are NaN are left out and the remaining
        weights re-normalized. The ``mom_*`` columns are in percent. An empty
        *tickers* list yields an empty frame that still has every column.
    """
    tickers = list(dict.fromkeys(t.upper() for t in tickers))
    dp_map = load_dark_pool_csv(dark_pool_csv) if dark_pool_csv else {}
    f_map = load_fundamentals_csv(fundamentals_csv) if fundamentals_csv else {}

    # Pre-collect peer lists for scaling DP/Growth
    dp_peers = list(dp_map.values())
    growth_peers = pd.DataFrame(
        [
            {"ticker": k, "revenue_yoy": v.revenue_yoy, "eps_yoy": v.eps_yoy}
            for k, v in f_map.items()
            if _has_value(v.revenue_yoy) and _has_value(v.eps_yoy)
        ],
        columns=["ticker", "revenue_yoy", "eps_yoy"],
    )

    # Pass 1: price history and momentum parts, computed once per ticker
    cache_price = {t: fetch_price_history(t) for t in tickers}
    mom_parts = {t: (momentum_scores(df) if df is not None else {}) for t, df in cache_price.items()}

    # Scale momentum cross-sectionally
    raw_moms = pd.Series(
        {t: parts.get("mom_composite_raw", np.nan) for t, parts in mom_parts.items()},
        dtype=float,
    )
    mom_scaled = scale_0_100(raw_moms)

    # Pass 2: pillar scores and composite
    rows = []
    for t in tickers:
        df = cache_price.get(t)
        tech = technical_flow_score(df) if df is not None else np.nan
        pcr = fetch_option_chain_put_call_ratio(t)
        opt = options_sentiment_score(pcr)
        dp_sc = dark_pool_score(dp_map.get(t), dp_peers)

        # A ticker without raw momentum must not inherit a scaled value.
        mom = mom_scaled.get(t, np.nan) if _has_value(raw_moms.get(t, np.nan)) else np.nan

        # growth
        f_rec = f_map.get(t, FundamentalsRecord(ticker=t, revenue_yoy=np.nan, eps_yoy=np.nan))
        if growth_peers.empty:
            gr_sc = np.nan
        else:
            gr_sc = growth_score(f_rec.revenue_yoy, f_rec.eps_yoy, growth_peers)

        # final composite (handle NaNs by redistributing weights over available pillars)
        components = {
            "technical_flow": tech,
            "options_sentiment": opt,
            "dark_pool": dp_sc,
            "momentum": mom,
            "growth": gr_sc,
        }
        avail = {k: v for k, v in components.items() if _has_value(v)}
        if avail:
            w_sum = sum(WEIGHTS[k] for k in avail)
            composite = sum(v * (WEIGHTS[k] / w_sum) for k, v in avail.items())
        else:
            composite = np.nan

        parts = mom_parts[t]
        rows.append({
            "ticker": t,
            "technical_flow": _round_or_nan(tech, 2),
            "options_pcr": _round_or_nan(pcr, 3),
            "options_sentiment": _round_or_nan(opt, 2),
            "dark_pool": _round_or_nan(dp_sc, 2),
            # momentum components for visibility
            "mom_1m": parts.get("mom_1m", np.nan),
            "mom_3m": parts.get("mom_3m", np.nan),
            "mom_6m": parts.get("mom_6m", np.nan),
            "mom_12m": parts.get("mom_12m", np.nan),
            "momentum": _round_or_nan(mom, 2),
            "growth": _round_or_nan(gr_sc, 2),
            "composite": _round_or_nan(composite, 2),
            "signal": _signal_for(composite),
        })

    out = pd.DataFrame(rows, columns=_OUTPUT_COLUMNS)
    if out.empty:
        return out
    # tidy momentum pct columns
    for c in ["mom_1m", "mom_3m", "mom_6m", "mom_12m"]:
        out[c] = (out[c] * 100).round(2)
    return out.sort_values("composite", ascending=False, na_position="last").reset_index(drop=True)


# ---------------------------
# CLI
# ---------------------------
def main(argv: Optional[List[str]] = None) -> None:
    """Parse CLI arguments, print the ranking and save it as CSV."""
    import argparse

    parser = argparse.ArgumentParser(description="Net Options Sentiment + Dark Pool + Technical Flow + Momentum + Growth ranker")
    parser.add_argument("--tickers", nargs="+", required=True, help="e.g., AAPL MSFT NVDA")
    parser.add_argument("--dark_pool_csv", type=str, default=None, help="Optional path to dark pool CSV (ticker,dp_net_flow,dp_score)")
    parser.add_argument("--fundamentals_csv", type=str, default=None, help="Optional path to fundamentals CSV (ticker,revenue_yoy,eps_yoy)")
    parser.add_argument("--out_csv", type=str, default="trading_setup_output.csv", help="Output CSV path")
    args = parser.parse_args(argv)

    df = analyze_tickers(args.tickers, args.dark_pool_csv, args.fundamentals_csv)
    pd.set_option("display.max_columns", None)
    print(df.to_string(index=False))
    df.to_csv(args.out_csv, index=False)
    print(f"\nSaved: {args.out_csv}")


if __name__ == "__main__":
    main()