diff --git a/blueprints/brlogin.py b/blueprints/brlogin.py index 92fbe43e2..aeff0fa54 100644 --- a/blueprints/brlogin.py +++ b/blueprints/brlogin.py @@ -135,51 +135,30 @@ def broker_callback(broker, para=None): ) elif broker == "aliceblue": - if request.method == "GET": - # Redirect to React TOTP page - return redirect("/broker/aliceblue/totp") - - elif request.method == "POST": - logger.info("Aliceblue Login Flow initiated") - userid = request.form.get("userid") - # Step 1: Get encryption key - # Use the shared httpx client with connection pooling - from utils.httpx_client import get_httpx_client - - client = get_httpx_client() - - # AliceBlue API expects only userId in the encryption key request - # Do not include API key in this initial request - payload = {"userId": userid} - headers = {"Content-Type": "application/json"} - try: - # Get encryption key - url = "https://ant.aliceblueonline.com/rest/AliceBlueAPIService/api/customer/getAPIEncpkey" - response = client.post(url, json=payload, headers=headers) - response.raise_for_status() - data_dict = response.json() - logger.debug(f"Aliceblue response data: {data_dict}") - - # Check if we successfully got the encryption key - if data_dict.get("stat") == "Ok" and data_dict.get("encKey"): - enc_key = data_dict["encKey"] - # Step 2: Authenticate with encryption key - auth_token, error_message = auth_function(userid, enc_key) - - if auth_token: - return handle_auth_success(auth_token, session["user"], broker) - else: - return handle_auth_failure(error_message, forward_url="broker.html") - else: - # Failed to get encryption key - error_msg = data_dict.get("emsg", "Failed to get encryption key") - return handle_auth_failure( - f"Failed to get encryption key: {error_msg}", forward_url="broker.html" - ) - except Exception as e: - return jsonify( - {"status": "error", "message": f"Authentication error: {str(e)}"} - ), 500 + # New OAuth redirect flow: + # 1. GET without authCode → redirect to AliceBlue login page with appcode + # 2. GET with authCode + userId (callback) → authenticate and get session + authCode = request.args.get("authCode") + userId = request.args.get("userId") + + if authCode and userId: + # Callback from AliceBlue with authorization code + logger.info(f"AliceBlue OAuth callback received for user {userId}") + auth_token, client_id, error_message = auth_function(userId, authCode) + user_id = client_id or userId # clientId from API response, fallback to OAuth userId + feed_token = None # AliceBlue doesn't use a separate feed token + forward_url = "broker.html" + else: + # Initial visit — redirect to AliceBlue login page + logger.info("Redirecting to AliceBlue login page") + appcode = os.environ.get("BROKER_API_KEY") + if not appcode: + return handle_auth_failure( + "BROKER_API_KEY (appCode) not configured in environment", + forward_url="broker.html", + ) + aliceblue_login_url = f"https://ant.aliceblueonline.com/?appcode={appcode}" + return redirect(aliceblue_login_url) elif broker == "fivepaisaxts": code = "fivepaisaxts" @@ -771,6 +750,9 @@ def broker_callback(broker, para=None): auth_token = f"{auth_token}" # For brokers that have user_id and feed_token from authenticate_broker + if broker in ["angel", "aliceblue", "compositedge", "pocketful", "definedge", "dhan"]: + # For Compositedge, handle missing session user + if broker == "compositedge" and "user" not in session: if broker in ["angel", "compositedge", "pocketful", "definedge", "dhan", "rmoney"]: # For OAuth brokers, handle missing session user if broker in ("compositedge", "rmoney") and "user" not in session: diff --git a/broker/aliceblue/api/alicebluewebsocket.py b/broker/aliceblue/api/alicebluewebsocket.py index 684cd3541..8a15a1738 100644 --- a/broker/aliceblue/api/alicebluewebsocket.py +++ b/broker/aliceblue/api/alicebluewebsocket.py @@ -26,8 +26,8 @@ class AliceBlueWebSocket: PRIMARY_URL = "wss://ws1.aliceblueonline.com/NorenWS/" ALTERNATE_URL = "wss://ws2.aliceblueonline.com/NorenWS/" - # REST API base URL for WebSocket session management - BASE_URL = "https://ant.aliceblueonline.com/rest/AliceBlueAPIService/api/" + # REST API base URL for WebSocket session management (V2 API) + BASE_URL = "https://a3.aliceblueonline.com/" # Maximum reconnection attempts MAX_RECONNECT_ATTEMPTS = 5 @@ -61,7 +61,7 @@ def __init__(self, user_id: str, session_id: str): def _get_auth_header(self) -> dict: """Get authorization header for REST API calls.""" return { - "Authorization": f"Bearer {self.user_id.upper()} {self.session_id}", + "Authorization": f"Bearer {self.session_id}", "Content-Type": "application/json", } @@ -74,9 +74,8 @@ def _invalidate_socket_session(self) -> bool: bool: True if successful, False otherwise """ try: - # Try both endpoint variants (createWsSession and createSocketSess) - url = self.BASE_URL + "ws/invalidateWsSession" - payload = {"loginType": "API"} + url = self.BASE_URL + "open-api/od/v1/profile/invalidateWsSess" + payload = {"source": "API", "userId": self.user_id} with httpx.Client() as client: response = client.post( @@ -105,9 +104,8 @@ def _create_socket_session(self) -> bool: bool: True if successful, False otherwise """ try: - # Use the same endpoint as aliceblue_client.py: ws/createWsSession - url = self.BASE_URL + "ws/createWsSession" - payload = {"loginType": "API"} + url = self.BASE_URL + "open-api/od/v1/profile/createWsSess" + payload = {"source": "API", "userId": self.user_id} with httpx.Client() as client: response = client.post( @@ -119,7 +117,7 @@ def _create_socket_session(self) -> bool: logger.info(f"Create socket session response: {data}") # Check for success - if data.get("stat") == "Ok": + if data.get("status") == "Ok": logger.info("WebSocket session created successfully") return True else: @@ -366,10 +364,10 @@ def _process_tick_data(self, data): else: # Fallback to broker symbol from AliceBlue data symbol = data.get("ts", f"TOKEN_{token}") - logger.warning( - f"✗ Using broker symbol: {symbol} for {subscription_key} (subscription not found)" + logger.debug( + f"Using broker symbol: {symbol} for {subscription_key} (subscription not found)" ) - logger.warning(f"Available subscriptions: {list(self.subscriptions.keys())}") + logger.debug(f"Available subscriptions: {list(self.subscriptions.keys())}") # Use consistent key format for data storage: exchange:token key = f"{exchange}:{token}" @@ -396,6 +394,10 @@ def _process_tick_data(self, data): "prev_open_interest": int(float(data.get("poi", 0))) if data.get("poi") else 0, "total_buy_quantity": int(data.get("tbq", 0)), "total_sell_quantity": int(data.get("tsq", 0)), + "bid": float(data.get("bp1", 0)), + "ask": float(data.get("sp1", 0)), + "bid_qty": int(data.get("bq1", 0)), + "ask_qty": int(data.get("sq1", 0)), "symbol": symbol, # Use OpenAlgo symbol from subscription "broker_symbol": data.get("ts", ""), # Keep broker symbol for reference "timestamp": datetime.now().isoformat(), @@ -502,10 +504,10 @@ def _process_depth_data(self, data): else: # Fallback to broker symbol from AliceBlue data symbol = data.get("ts", f"TOKEN_{token}") - logger.warning( - f"✗ Using broker symbol: {symbol} for {subscription_key} (subscription not found)" + logger.debug( + f"Using broker symbol: {symbol} for {subscription_key} (subscription not found)" ) - logger.warning(f"Available subscriptions: {list(self.subscriptions.keys())}") + logger.debug(f"Available subscriptions: {list(self.subscriptions.keys())}") # Use consistent key format for data storage: exchange:token key = f"{exchange}:{token}" @@ -543,6 +545,12 @@ def _process_depth_data(self, data): "token": token, "bids": bids, "asks": asks, + "open": float(data.get("o", 0)), + "high": float(data.get("h", 0)), + "low": float(data.get("l", 0)), + "close": float(data.get("c", 0)), + "volume": int(data.get("v", 0)), + "last_trade_quantity": int(data.get("ltq", 0)), "total_buy_quantity": int(data.get("tbq", 0)), "total_sell_quantity": int(data.get("tsq", 0)), "ltp": float(data.get("lp", 0)), @@ -576,6 +584,16 @@ def _process_depth_data(self, data): # Update specific fields if they exist in the feed if "lp" in data: depth["ltp"] = float(data.get("lp", 0)) + if "o" in data: + depth["open"] = float(data.get("o", 0)) + if "h" in data: + depth["high"] = float(data.get("h", 0)) + if "l" in data: + depth["low"] = float(data.get("l", 0)) + if "c" in data: + depth["close"] = float(data.get("c", 0)) + if "v" in data: + depth["volume"] = int(data.get("v", 0)) if "pc" in data: depth["percent_change"] = float(data.get("pc", 0)) if "ft" in data: diff --git a/broker/aliceblue/api/auth_api.py b/broker/aliceblue/api/auth_api.py index 1ff07f58f..078983916 100644 --- a/broker/aliceblue/api/auth_api.py +++ b/broker/aliceblue/api/auth_api.py @@ -1,4 +1,3 @@ -import base64 import hashlib import json import os @@ -11,40 +10,51 @@ logger = get_logger(__name__) -def authenticate_broker(userid, encKey): +def authenticate_broker(userid, authCode): + """ + Authenticate with AliceBlue using the new V2 vendor API. + + Returns: + Tuple of (userSession, clientId, error_message) + + Flow: + 1. Compute SHA-256 checksum of: userId + authCode + apiSecret + 2. POST {"checkSum": checksum} to /open-api/od/v1/vendor/getUserDetails + 3. Return the userSession from the response + + Environment variables: + BROKER_API_KEY = App Code (appCode) + BROKER_API_SECRET = API Secret (apiSecret) + """ try: # Fetching the necessary credentials from environment variables - BROKER_API_KEY = os.environ.get("BROKER_API_KEY") + # BROKER_API_KEY = appCode (used for the login redirect, not needed here) + # BROKER_API_SECRET = apiSecret (used to build the checksum) BROKER_API_SECRET = os.environ.get("BROKER_API_SECRET") - if not BROKER_API_SECRET or not BROKER_API_KEY: - logger.error("API keys not found in environment variables") - return None, "API keys not set in environment variables" + if not BROKER_API_SECRET: + logger.error("BROKER_API_SECRET not found in environment variables") + return None, None, "API secret not set in environment variables" logger.debug(f"Authenticating with AliceBlue for user {userid}") - # Proper AliceBlue API authentication flow according to V2 API docs: # Step 1: Get the shared httpx client with connection pooling client = get_httpx_client() - # Step 2: Generate SHA-256 hash using the combination of User ID + API Key + Encryption Key - # This is the pattern specified in their API docs + # Step 2: Generate SHA-256 checksum = hash(userId + authCode + apiSecret) logger.debug("Generating checksum for authentication") - # The AliceBlue API V2 documentation specifies: User ID + API Key + Encryption Key - # This is the official order specified in their documentation - checksum_input = f"{userid}{BROKER_API_SECRET}{encKey}" - logger.debug("Checksum input pattern: userId + apiSecret + encKey") + checksum_input = f"{userid}{authCode}{BROKER_API_SECRET}" + logger.debug("Checksum input pattern: userId + authCode + apiSecret") checksum = hashlib.sha256(checksum_input.encode()).hexdigest() - # Step 3: Prepare request payload with exact parameters matching their API documentation - payload = {"userId": userid, "userData": checksum, "source": "WEB"} + # Step 3: Prepare request payload matching the new API documentation + payload = {"checkSum": checksum} - # Set the headers exactly as expected by their API headers = {"Content-Type": "application/json", "Accept": "application/json"} - # Step 4: Make the API request to get session ID - logger.debug("Making getUserSID request to AliceBlue API") - url = "https://ant.aliceblueonline.com/rest/AliceBlueAPIService/api/customer/getUserSID" + # Step 4: POST to the new vendor getUserDetails endpoint + logger.debug("Making getUserDetails request to AliceBlue API") + url = "https://ant.aliceblueonline.com/open-api/od/v1/vendor/getUserDetails" response = client.post(url, json=payload, headers=headers) logger.debug(f"AliceBlue API response status: {response.status_code}") @@ -53,57 +63,37 @@ def authenticate_broker(userid, encKey): # Log full response for debugging logger.info(f"AliceBlue API response: {json.dumps(data_dict, indent=2)}") - # Extract the session ID from the response - # Handle all possible response formats from AliceBlue API + # --- Parse the response --- + + # Success case: stat == "Ok" and userSession is present + if data_dict.get("stat") == "Ok" and data_dict.get("userSession"): + client_id = data_dict.get("clientId") + logger.info(f"Authentication successful for user {userid} (clientId={client_id})") + return data_dict["userSession"], client_id, None - # Case 1: Check if response has sessionID field (typical success case) - if data_dict.get("sessionID"): - logger.info(f"Authentication successful for user {userid}") - return data_dict.get("sessionID"), None + # Error case: stat == "Not_ok" with an error message + if data_dict.get("stat") == "Not_ok": + error_msg = data_dict.get("emsg", "Unknown error occurred") + logger.error(f"API returned Not_ok: {error_msg}") + return None, None, f"API error: {error_msg}" - # Case 2: Check for specific error messages and handle them + # Fallback: check for emsg in any other shape of response if "emsg" in data_dict and data_dict["emsg"]: error_msg = data_dict["emsg"] - # Special case handling for common errors - if "User does not login" in error_msg: - logger.error(f"User not logged in: {error_msg}") - return ( - None, - "User is not logged in. Please login to the AliceBlue platform first and then try again.", - ) - elif "Invalid Input" in error_msg: - logger.error(f"Invalid input error: {error_msg}") - return None, "Invalid input. Please check your user ID and API credentials." - else: - logger.error(f"API error: {error_msg}") - return None, f"API error: {error_msg}" - - # Case 3: Handle status field - if data_dict.get("stat") == "Not ok": - error_msg = data_dict.get("emsg", "Unknown error occurred") - logger.error(f"API returned Not ok status: {error_msg}") - return None, f"API error: {error_msg}" - - # Case 4: Try to find any field that might contain the session token - for field_name in ["sessionID", "session_id", "sessionId", "token"]: - if field_name in data_dict and data_dict[field_name]: - session_id = data_dict[field_name] - logger.info(f"Found session ID in field {field_name}") - return session_id, None - - # Case 5: If we got this far, we couldn't find a session ID - logger.error(f"Couldn't extract session ID from response: {data}") + logger.error(f"API error: {error_msg}") + return None, None, f"API error: {error_msg}" + + # If we got here, we couldn't find a session token + logger.error(f"Couldn't extract userSession from response: {data_dict}") return ( None, - "Failed to extract session ID from response. Please check API credentials and try again.", + None, + "Failed to extract session from response. Please check API credentials and try again.", ) except json.JSONDecodeError: - # Handle invalid JSON response - return None, "Invalid response format from AliceBlue API." + return None, None, "Invalid response format from AliceBlue API." except httpx.HTTPError as e: - # Handle HTTPX connection errors - return None, f"HTTP connection error: {str(e)}" + return None, None, f"HTTP connection error: {str(e)}" except Exception as e: - # General exception handling - return None, f"An exception occurred: {str(e)}" + return None, None, f"An exception occurred: {str(e)}" diff --git a/broker/aliceblue/api/data.py b/broker/aliceblue/api/data.py index a91c29fe8..e292ea746 100644 --- a/broker/aliceblue/api/data.py +++ b/broker/aliceblue/api/data.py @@ -2,63 +2,24 @@ import os import threading import time -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple, Union import httpx import pandas as pd -from requests.exceptions import HTTPError, Timeout -from utils.logging import get_logger - -logger = get_logger(__name__) - - -# Mock token map for testing -ALICEBLUE_TOKEN_MAP = { - ("NSE", "YESBANK"): "5998089", - ("NSE", "RELIANCE"): "2885634", - ("NSE", "INFY"): "1594561", - ("BSE", "RELIANCE"): "500325", - ("NSE", "NIFTY50"): "26000", - ("NSE", "BANKNIFTY"): "26009", -} - - -# Token retrieval function -def get_token(symbol, exchange): - """Get token for a symbol and exchange""" - try: - # First try the local token map - key = (exchange, symbol) - if key in ALICEBLUE_TOKEN_MAP: - return ALICEBLUE_TOKEN_MAP[key] - - # Could also try to import the database function - # but we'll just return None for now if not in our map - return None - except Exception as e: - logger.error(f"Error getting token: {str(e)}") - return None - - -from datetime import timedelta - -from database.auth_db import db_session -from database.symbol import SymToken +from database.auth_db import Auth from database.token_db import get_br_symbol, get_brexchange, get_oa_symbol, get_token from utils.httpx_client import get_httpx_client +from utils.logging import get_logger from .alicebluewebsocket import AliceBlueWebSocket -# AliceBlue API URLs -BASE_URL = "https://ant.aliceblueonline.com/rest/AliceBlueAPIService/api/" -SCRIP_DETAILS_URL = BASE_URL + "ScripDetails/getScripQuoteDetails" -HISTORICAL_API_URL = BASE_URL + "chart/history" +logger = get_logger(__name__) -# Global websocket instance for reuse -_web_socket = None -_web_socket_lock = threading.Lock() +# AliceBlue V3 API URLs +BASE_URL = "https://a3.aliceblueonline.com/" +HISTORICAL_API_URL = BASE_URL + "open-api/od/ChartAPIService/api/chart/history" class BrokerData: @@ -67,54 +28,30 @@ class BrokerData: Handles market data operations including quotes, market depth, and historical data. """ - def _auto_detect_exchange(self, symbol: str) -> str: - """ - Auto-detect exchange for a symbol by looking up its instrumenttype in database - Returns the appropriate exchange based on instrumenttype - """ - try: - # Query database for the symbol - with db_session() as session: - # First try to find any matching symbol - results = session.query(SymToken).filter(SymToken.symbol == symbol).all() - - if results: - for result in results: - # Check instrumenttype to determine exchange - if result.instrumenttype: - instrument_type = result.instrumenttype.upper() - # If instrumenttype contains INDEX, use it as exchange - if "INDEX" in instrument_type: - # instrumenttype like NSE_INDEX, BSE_INDEX, MCX_INDEX - return result.instrumenttype - else: - # For other types, use the exchange field - return result.exchange - - # If no instrumenttype, return the exchange of first match - return results[0].exchange - - # If not found, make educated guess based on symbol pattern - if symbol.endswith("FUT"): - return "NFO" - elif symbol.endswith("CE") or symbol.endswith("PE"): - return "NFO" - elif "USDINR" in symbol.upper() or "EURINR" in symbol.upper(): - return "CDS" - else: - return "NSE" # Default to NSE - - except Exception as e: - logger.error(f"Error in auto-detecting exchange: {str(e)}") - return "NSE" # Default fallback + # Timeframes that require resampling from 1-minute data + _RESAMPLE_TIMEFRAMES = { + "3m": 3, + "5m": 5, + "10m": 10, + "15m": 15, + "30m": 30, + "1h": 60, + } def __init__(self, auth_token=None): self.token_mapping = {} self.session_id = auth_token # Store the session ID from authentication - # AliceBlue only supports 1-minute and daily data + # AliceBlue natively supports 1-minute and daily data. + # Other intraday timeframes are resampled from 1-minute data. self.timeframe_map = { - "1m": "1", # 1-minute data - "D": "D", # Daily data + "1m": "1", + "3m": "1", # resampled from 1m + "5m": "1", # resampled from 1m + "10m": "1", # resampled from 1m + "15m": "1", # resampled from 1m + "30m": "1", # resampled from 1m + "1h": "1", # resampled from 1m + "D": "D", # V3 API uses 'D' for daily (docs text says '1D' but API rejects it) } def get_websocket(self, force_new=False): @@ -140,14 +77,17 @@ def get_websocket(self, force_new=False): # Clean up any existing connection if hasattr(self, "_websocket") and self._websocket: try: - self._websocket.close() + self._websocket.disconnect() except Exception as e: logger.warning(f"Error closing existing WebSocket: {str(e)}") - # Get user ID from environment variable or fallback - user_id = os.environ.get("BROKER_API_KEY", "") + # Get user ID (clientId) from the auth database + # This is the numeric clientId (e.g., '1614986') stored during login, + # NOT the appCode from BROKER_API_KEY + auth_obj = Auth.query.filter_by(broker='aliceblue', is_revoked=False).first() + user_id = auth_obj.user_id if auth_obj else None if not user_id: - logger.error("Missing API secret (user ID) for AliceBlue WebSocket") + logger.error("Missing user_id (clientId) for AliceBlue WebSocket. Please re-login.") return None # Create new websocket connection @@ -173,233 +113,125 @@ def get_websocket(self, force_new=False): logger.error(f"Error creating WebSocket: {str(e)}") return None - def get_quotes(self, symbol_list, timeout: int = 5) -> list[dict[str, Any]]: + @staticmethod + def _normalize_token(token) -> str: + """Normalize token to integer string (e.g. 3045.0 -> '3045').""" + try: + return str(int(float(token))) + except (ValueError, TypeError): + return str(token) + + def _map_exchange(self, exchange: str) -> str: + """Map OpenAlgo exchange codes to AliceBlue API exchange codes.""" + exchange_map = { + "NSE_INDEX": "NSE", + "BSE_INDEX": "BSE", + "MCX_INDEX": "MCX", + } + return exchange_map.get(exchange, exchange) + + def _try_fetch_quote_via_ws(self, api_exchange: str, token: str, br_symbol: str, symbol: str, exchange: str) -> dict | None: + """Attempt a single WebSocket quote fetch. Returns quote dict or None.""" + try: + websocket = self.get_websocket() + if not websocket or not websocket.is_connected: + logger.warning("WebSocket not connected, reconnecting...") + websocket = self.get_websocket(force_new=True) + + if not websocket or not websocket.is_connected: + logger.error("WebSocket connection unavailable") + return None + + class Instrument: + def __init__(self, exchange, token, symbol=None): + self.exchange = exchange + self.token = token + self.symbol = symbol + + instrument = Instrument(exchange=api_exchange, token=token, symbol=br_symbol) + instruments = [instrument] + + logger.info(f"Subscribing to {api_exchange}:{symbol} with token {token}") + success = websocket.subscribe(instruments, is_depth=False) + + if not success: + logger.warning(f"Subscribe failed for {symbol} on {exchange}") + return None + + # Wait for data to arrive + time.sleep(2.0) + + quote = websocket.get_quote(api_exchange, token) + + # Unsubscribe after getting the data + websocket.unsubscribe(instruments, is_depth=False) + + return quote + + except Exception as e: + logger.warning(f"WebSocket quote attempt failed for {symbol}: {e}") + return None + + def get_quotes(self, symbol: str, exchange: str) -> dict: """ - Get real-time quotes for a list of symbols using the WebSocket connection. - Falls back to REST API if WebSocket is not available. + Get real-time quotes for given symbol with retry logic. Args: - symbol_list: List of symbols or a single symbol dictionary with exchange and symbol - timeout (int): Timeout in seconds + symbol: Trading symbol (e.g., 'RELIANCE', 'NIFTY') + exchange: Exchange (e.g., NSE, BSE, NFO, NSE_INDEX, BSE_INDEX) Returns: - List[Dict[str, Any]]: List of quote data for each symbol + dict: Quote data in OpenAlgo standard format """ - logger.info(f"Original symbol_list: {symbol_list}") + MAX_RETRIES = 2 # Total attempts (1 original + 1 retry) - # Special case for Bruno API format with single symbol - # Special case for OpenAlgo standard format: direct quote request via Bruno - if isinstance(symbol_list, dict): - try: - # Extract symbol and exchange - symbol = symbol_list.get("symbol") or symbol_list.get("SYMBOL") - exchange = symbol_list.get("exchange") or symbol_list.get("EXCHANGE") - - if symbol and exchange: - logger.info(f"Processing single symbol request: {symbol} on {exchange}") - # Convert to a list with a single item to use the standard flow - symbol_list = [{"symbol": symbol, "exchange": exchange}] - else: - logger.error("Missing symbol or exchange in request") - return { - "status": "error", - "data": [], - "message": "Missing symbol or exchange in request", - } - except Exception as e: - logger.error(f"Error processing single symbol request: {str(e)}") - return { - "status": "error", - "data": [], - "message": f"Error processing request: {str(e)}", - } - - # Handle plain string (like just "YESBANK" or "NIFTY") - elif isinstance(symbol_list, str): - symbol = symbol_list.strip() - - # Use the helper function to auto-detect exchange based on database lookup - exchange = self._auto_detect_exchange(symbol) - - logger.info( - f"Processing string symbol: {symbol} on {exchange} (auto-detected from database)" - ) - symbol_list = [{"symbol": symbol, "exchange": exchange}] - - # For simple case, let's create mock data for testing - # In a production system, you'd get this from the broker API - quote_data = [] - - for sym in symbol_list: - # If it's a simple dict with symbol and exchange - if isinstance(sym, dict) and "symbol" in sym and "exchange" in sym: - symbol = sym["symbol"] - exchange = sym["exchange"] - - # Get token for this symbol - token = get_token(symbol, exchange) - - if token: - # Get WebSocket connection or create a new one - websocket = self.get_websocket() - - if not websocket or not websocket.is_connected: - logger.warning("WebSocket not connected, reconnecting...") - websocket = self.get_websocket(force_new=True) - - if websocket and websocket.is_connected: - # Get broker symbol if different - br_symbol = get_br_symbol(symbol, exchange) or symbol - - # Convert exchange for AliceBlue API (same as Angel) - if exchange == "NSE_INDEX": - exchange = "NSE" - elif exchange == "BSE_INDEX": - exchange = "BSE" - elif exchange == "MCX_INDEX": - exchange = "MCX" - - # Create instrument for subscription - class Instrument: - def __init__(self, exchange, token, symbol=None): - self.exchange = exchange - self.token = token - self.symbol = symbol - - # Use converted exchange for websocket subscription - instrument = Instrument(exchange=exchange, token=token, symbol=br_symbol) - instruments = [instrument] - - # Subscribe to this instrument - logger.info(f"Subscribing to {exchange}:{symbol} with token {token}") - success = websocket.subscribe(instruments) - - if success: - # Wait longer for data to arrive, especially for first subscription - logger.info(f"Waiting for WebSocket data for {exchange}:{symbol}") - time.sleep(2.0) # Increased wait time - - # Retrieve quote from WebSocket using converted exchange - logger.debug(f"Attempting to retrieve quote for {exchange}:{token}") - quote = websocket.get_quote(exchange, token) - logger.debug(f"Quote retrieval result: {quote is not None}") - - if quote: - # Format the response according to OpenAlgo standard format - quote_item = { - "symbol": symbol, - "exchange": exchange, - "token": token, - "ltp": float(quote.get("ltp", 0)), - "open": float(quote.get("open", 0)), - "high": float(quote.get("high", 0)), - "low": float(quote.get("low", 0)), - "close": float(quote.get("close", 0)), - "prev_close": float( - quote.get("close", 0) - ), # Using close as prev_close - "change": float(quote.get("change", 0)), - "change_percent": float(quote.get("change_percent", 0)), - "volume": int(quote.get("volume", 0)), - "oi": int(quote.get("open_interest", 0)), - "bid": float(quote.get("bid", 0)), - "ask": float(quote.get("ask", 0)), - "timestamp": datetime.now().isoformat(), - } - - # Add market depth if available - if "depth" in quote: - quote_item["depth"] = quote["depth"] - - quote_data.append(quote_item) - logger.debug( - f"Retrieved real-time quote for {symbol} on {exchange}" - ) - - # Unsubscribe after getting the data to stop continuous streaming - logger.info( - f"Unsubscribing from {exchange}:{symbol} after retrieving quote" - ) - websocket.unsubscribe(instruments, is_depth=False) - else: - logger.warning(f"No quote data received for {symbol} on {exchange}") - # Unsubscribe even if no data received to clean up subscription - logger.info( - f"Unsubscribing from {exchange}:{symbol} due to no quote data" - ) - websocket.unsubscribe(instruments, is_depth=False) - # Create fallback data with zeros - quote_item = { - "symbol": symbol, - "exchange": exchange, - "token": token, - "ltp": 0.0, - "open": 0.0, - "high": 0.0, - "low": 0.0, - "close": 0.0, - "change": 0.0, - "change_percent": 0.0, - "volume": 0, - "oi": 0, - "timestamp": datetime.now().isoformat(), - } - quote_data.append(quote_item) - else: - logger.error(f"Failed to subscribe to {symbol} on {exchange}") - # No need to unsubscribe if subscription failed - # Create error data - quote_item = { - "symbol": symbol, - "exchange": exchange, - "token": token, - "error": "Failed to subscribe to the instrument", - "timestamp": datetime.now().isoformat(), - } - quote_data.append(quote_item) - else: - logger.error("WebSocket connection unavailable") - quote_item = { - "symbol": symbol, - "exchange": exchange, - "token": token, - "error": "WebSocket connection unavailable", - "timestamp": datetime.now().isoformat(), - } - quote_data.append(quote_item) - else: - logger.error(f"Could not find token for {symbol} on {exchange}") + try: + br_symbol = get_br_symbol(symbol, exchange) or symbol + token = self._normalize_token(get_token(symbol, exchange)) - # Return data directly (service layer will wrap it) - # If there's no data, return empty response - if not quote_data: - return {} + if not token: + raise Exception(f"Token not found for {symbol} on {exchange}") + + api_exchange = self._map_exchange(exchange) + + # Attempt quote fetch with retry + quote = None + for attempt in range(1, MAX_RETRIES + 1): + quote = self._try_fetch_quote_via_ws(api_exchange, token, br_symbol, symbol, exchange) + if quote: + break + if attempt < MAX_RETRIES: + logger.info(f"Retrying quote fetch for {symbol} (attempt {attempt + 1}/{MAX_RETRIES})") + # Force a fresh WebSocket connection on retry + try: + if hasattr(self, "_websocket") and self._websocket: + self._websocket.disconnect() + except Exception: + pass + self._websocket = None + time.sleep(1.0) - # For single symbol request (most common case), return in simplified format - if len(quote_data) == 1: - # Extract the first and only quote - quote = quote_data[0] + if not quote: + raise Exception(f"No quote data received for {symbol} on {exchange} after {MAX_RETRIES} attempts") - # Return the data directly without wrapping return { - "ltp": quote.get("ltp", 0), - "oi": quote.get("oi", 0), - "open": quote.get("open", 0), - "high": quote.get("high", 0), - "low": quote.get("low", 0), - "prev_close": quote.get("prev_close", 0) or quote.get("close", 0), - "volume": quote.get("volume", 0), - "bid": quote.get("bid", 0), - "ask": quote.get("ask", 0), + "bid": float(quote.get("bid", 0)), + "ask": float(quote.get("ask", 0)), + "open": float(quote.get("open", 0)), + "high": float(quote.get("high", 0)), + "low": float(quote.get("low", 0)), + "ltp": float(quote.get("ltp", 0)), + "prev_close": float(quote.get("close", 0)), + "volume": int(quote.get("volume", 0)), + "oi": int(quote.get("open_interest", 0)), } - # For multiple symbols, return the full list - return quote_data + except Exception as e: + raise Exception(f"Error fetching quotes: {str(e)}") def get_multiquotes(self, symbols: list) -> list: """ - Get real-time quotes for multiple symbols using WebSocket - AliceBlue WebSocket supports subscribing to multiple instruments + Get real-time quotes for multiple symbols using WebSocket. Args: symbols: List of dicts with 'symbol' and 'exchange' keys @@ -409,12 +241,10 @@ def get_multiquotes(self, symbols: list) -> list: [{'symbol': 'SBIN', 'exchange': 'NSE', 'data': {...}}, ...] """ try: - # AliceBlue WebSocket can handle multiple instruments - # Using batch size of 100 for practical response times BATCH_SIZE = 100 if len(symbols) > BATCH_SIZE: - logger.debug(f"Processing {len(symbols)} symbols in batches of {BATCH_SIZE}") + logger.info(f"Processing {len(symbols)} symbols in batches of {BATCH_SIZE}") all_results = [] for i in range(0, len(symbols), BATCH_SIZE): @@ -422,11 +252,10 @@ def get_multiquotes(self, symbols: list) -> list: logger.info( f"Processing batch {i // BATCH_SIZE + 1}: symbols {i + 1} to {min(i + BATCH_SIZE, len(symbols))}" ) - batch_results = self._process_multiquotes_batch(batch) all_results.extend(batch_results) - logger.debug(f"Successfully processed {len(all_results)} quotes") + logger.info(f"Successfully processed {len(all_results)} quotes") return all_results else: return self._process_multiquotes_batch(symbols) @@ -437,7 +266,8 @@ def get_multiquotes(self, symbols: list) -> list: def _process_multiquotes_batch(self, symbols: list) -> list: """ - Process a batch of symbols using WebSocket subscription + Process a batch of symbols using WebSocket subscription. + Args: symbols: List of dicts with 'symbol' and 'exchange' keys Returns: @@ -446,11 +276,10 @@ def _process_multiquotes_batch(self, symbols: list) -> list: results = [] skipped_symbols = [] instruments = [] - symbol_map = {} # Map token to original symbol/exchange + symbol_map = {} # Map api_exchange:token -> original info # Get WebSocket connection websocket = self.get_websocket() - if not websocket or not websocket.is_connected: logger.warning("WebSocket not connected, reconnecting...") websocket = self.get_websocket(force_new=True) @@ -459,42 +288,32 @@ def _process_multiquotes_batch(self, symbols: list) -> list: logger.error("Could not establish WebSocket connection") raise ConnectionError("WebSocket connection unavailable") - # Create Instrument class for subscription class Instrument: def __init__(self, exchange, token, symbol=None): self.exchange = exchange self.token = token self.symbol = symbol - # Step 1: Prepare all instruments + # Prepare all instruments for item in symbols: symbol = item["symbol"] exchange = item["exchange"] - token = get_token(symbol, exchange) - if not token: + raw_token = get_token(symbol, exchange) + if not raw_token: logger.warning(f"Skipping symbol {symbol} on {exchange}: could not resolve token") skipped_symbols.append( {"symbol": symbol, "exchange": exchange, "error": "Could not resolve token"} ) continue - # Get broker symbol + token = self._normalize_token(raw_token) br_symbol = get_br_symbol(symbol, exchange) or symbol - - # Map exchange for AliceBlue API - api_exchange = exchange - if exchange == "NSE_INDEX": - api_exchange = "NSE" - elif exchange == "BSE_INDEX": - api_exchange = "BSE" - elif exchange == "MCX_INDEX": - api_exchange = "MCX" + api_exchange = self._map_exchange(exchange) instrument = Instrument(exchange=api_exchange, token=token, symbol=br_symbol) instruments.append(instrument) - # Store mapping for response processing symbol_map[f"{api_exchange}:{token}"] = { "symbol": symbol, "exchange": exchange, @@ -505,34 +324,40 @@ def __init__(self, exchange, token, symbol=None): logger.warning("No valid symbols to fetch quotes for") return skipped_symbols - # Step 2: Subscribe to all instruments at once + # Subscribe to all instruments at once with retry logger.info(f"Subscribing to {len(instruments)} symbols via WebSocket") - success = websocket.subscribe(instruments) + success = websocket.subscribe(instruments, is_depth=False) + + if not success: + # Retry once with a fresh connection + logger.warning("First subscription attempt failed, retrying with fresh connection...") + websocket = self.get_websocket(force_new=True) + if websocket and websocket.is_connected: + success = websocket.subscribe(instruments, is_depth=False) if not success: - logger.error("Failed to send subscription request") + logger.error("Failed to send subscription request after retry") for key, info in symbol_map.items(): results.append( - { - "symbol": info["symbol"], - "exchange": info["exchange"], - "error": "Subscription failed", - } + {"symbol": info["symbol"], "exchange": info["exchange"], "error": "Subscription failed"} ) return skipped_symbols + results - # Step 3: Wait for data to arrive - wait_time = min(max(len(instruments) * 0.05, 2), 10) # Between 2-10 seconds - logger.debug(f"Waiting {wait_time:.1f}s for quote data...") + # Wait for data to arrive — use higher cap for large batches + # (Vol Surface / OI Profile can request 60+ symbols at once) + wait_time = min(max(len(instruments) * 0.08, 2), 20) + logger.debug(f"Waiting {wait_time:.1f}s for quote data ({len(instruments)} instruments)...") time.sleep(wait_time) - # Step 4: Collect results from WebSocket + # Collect results from WebSocket — first pass + received_keys = set() + missing_keys = [] for key, info in symbol_map.items(): api_exchange, token = key.split(":") - quote = websocket.get_quote(api_exchange, token) if quote: + received_keys.add(key) results.append( { "symbol": info["symbol"], @@ -551,572 +376,152 @@ def __init__(self, exchange, token, symbol=None): } ) else: - results.append( - { - "symbol": info["symbol"], - "exchange": info["exchange"], - "error": "No data received", - } - ) + missing_keys.append(key) - # Step 5: Unsubscribe after getting data - logger.info(f"Unsubscribing from {len(instruments)} symbols") - websocket.unsubscribe(instruments, is_depth=False) - - logger.info( - f"Retrieved quotes for {len([r for r in results if 'data' in r])}/{len(symbol_map)} symbols" - ) - return skipped_symbols + results - - # Support various input formats - if not hasattr(symbol_list, "__iter__"): - logger.error(f"symbol_list must be iterable, got {type(symbol_list)}") - return [] - - for sym in symbol_list: - try: - # Case 1: Dictionary with exchange and token - if isinstance(sym, dict) and "exchange" in sym and "token" in sym: - normalized_symbols.append( - { - "exchange": sym["exchange"], - "token": sym["token"], - "symbol": sym.get("symbol", ""), - } - ) + # Retry pass for symbols that didn't return data on first attempt + if missing_keys: + logger.info(f"{len(missing_keys)}/{len(symbol_map)} symbols missing after first pass, retrying...") + time.sleep(3.0) # Extra wait for stragglers - # Case 2: Dictionary with exchange and symbol but no token (like from Bruno API request) - elif ( - isinstance(sym, dict) - and "exchange" in sym - and "symbol" in sym - and "token" not in sym - ): - try: - exchange = sym["exchange"] - symbol_str = sym["symbol"] - # Get token from database - token = get_token(symbol_str, exchange) - normalized_symbols.append( - {"exchange": exchange, "token": token, "symbol": symbol_str} - ) - logger.info(f"Retrieved token {token} for {exchange}:{symbol_str}") - except Exception as e: - logger.error(f"Could not get token for {exchange}:{symbol_str}: {str(e)}") + for key in missing_keys: + api_exchange, token = key.split(":") + info = symbol_map[key] + quote = websocket.get_quote(api_exchange, token) - # Case 3: Object with expected attributes - elif hasattr(sym, "exchange") and hasattr(sym, "token"): - normalized_symbols.append( + if quote: + results.append( { - "exchange": sym.exchange, - "token": sym.token, - "symbol": getattr(sym, "symbol", ""), + "symbol": info["symbol"], + "exchange": info["exchange"], + "data": { + "bid": float(quote.get("bid", 0)), + "ask": float(quote.get("ask", 0)), + "open": float(quote.get("open", 0)), + "high": float(quote.get("high", 0)), + "low": float(quote.get("low", 0)), + "ltp": float(quote.get("ltp", 0)), + "prev_close": float(quote.get("close", 0)), + "volume": int(quote.get("volume", 0)), + "oi": int(quote.get("open_interest", 0)), + }, } ) - - # Case 4: Single string with format "exchange:symbol" - elif isinstance(sym, str) and ":" in sym: - parts = sym.split(":", 1) - if len(parts) == 2: - exchange = parts[0] - symbol_str = parts[1] - try: - # Try to get token from database - token = get_token(symbol_str, exchange) - normalized_symbols.append( - {"exchange": exchange, "token": token, "symbol": symbol_str} - ) - except Exception as e: - logger.error(f"Could not get token for {sym}: {str(e)}") - - # Case 5: Simple string symbol (like 'YESBANK') - elif isinstance(sym, str) and ":" not in sym: - symbol_str = sym.strip() - - # Handle different formats - if len(symbol_str.split()) > 1: - # It might be "NSE YESBANK" format - parts = symbol_str.split() - exchange, symbol_str = parts[0], parts[1] - else: - # Default to NSE for Indian symbols if no exchange specified - exchange = "NSE" - - logger.info(f"Processing symbol: {symbol_str} on {exchange}") - - try: - # Try to get token from database - token = get_token(symbol_str, exchange) - if token: - normalized_symbols.append( - {"exchange": exchange, "token": token, "symbol": symbol_str} - ) - logger.info( - f"Successfully normalized {symbol_str} on {exchange} with token {token}" - ) - else: - logger.error(f"Could not get token for {symbol_str} on {exchange}") - except Exception as e: - logger.error( - f"Could not get token for {symbol_str} on {exchange}: {str(e)}" - ) - - # Case 6: Could not parse else: - logger.warning(f"Could not parse symbol format: {type(sym)} - {sym}") - except Exception as e: - logger.error(f"Error processing symbol {sym}: {str(e)}") - - logger.info(f"Normalized {len(normalized_symbols)} symbols") - - results = [] - - # First, try using WebSocket for faster data retrieval - websocket = self.get_websocket() - - # Check if the websocket is connected - if websocket and hasattr(websocket, "is_connected") and websocket.is_connected: - try: - # Prepare instruments for subscription - instruments = [] - for symbol in normalized_symbols: - # Create a simple object with exchange and token attributes - class Instrument: - def __init__(self, exchange, token, symbol=None): - self.exchange = exchange - self.token = token - self.symbol = symbol - - # Always get token from database to ensure we have correct token format - try: - # Get the token from database - token = get_token(symbol["symbol"], symbol["exchange"]) - if token: - logger.info( - f"Retrieved token {token} for {symbol['exchange']}:{symbol['symbol']}" - ) - instruments.append( - Instrument( - exchange=symbol["exchange"], - token=token, - symbol=symbol["symbol"], - ) - ) - else: - # Fall back to token in symbol dict if present - if "token" in symbol and symbol["token"]: - logger.info( - f"Using provided token {symbol['token']} for {symbol['exchange']}:{symbol['symbol']}" - ) - instruments.append( - Instrument( - exchange=symbol["exchange"], - token=symbol["token"], - symbol=symbol["symbol"], - ) - ) - else: - logger.error( - f"Could not find token for {symbol['symbol']} on {symbol['exchange']}" - ) - except Exception as e: - logger.error( - f"Error getting token for {symbol['symbol']} on {symbol['exchange']}: {str(e)}" - ) - continue - - # Skip if no valid instruments - if not instruments: - logger.warning("No valid instruments to subscribe") - return [] - - # Subscribe to the instruments - websocket.subscribe(instruments) - - # Wait for data to arrive - time.sleep(1) # Wait a bit for data to arrive - - # Collect quote data from WebSocket - for i, instrument in enumerate(instruments): - if i >= len(symbol_list): - break - - exchange = instrument.exchange - token = instrument.token - symbol_name = getattr(instrument, "symbol", "") - - quote = websocket.get_quote(exchange, token) - - if quote: - # Format the quote to match the expected structure - formatted_quote = { - "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "exchange": exchange, - "symbol": symbol_name, - "ltp": quote.get("ltp", 0), - "close": quote.get("close", 0), - "open": quote.get("open", 0), - "high": quote.get("high", 0), - "low": quote.get("low", 0), - "volume": quote.get("volume", 0), - "bid": quote.get("bid", 0), # Best bid may not be available - "ask": quote.get("ask", 0), # Best ask may not be available - "total_buy_qty": quote.get("total_buy_quantity", 0), - "total_sell_qty": quote.get("total_sell_quantity", 0), - "open_interest": quote.get("open_interest", 0), - "average_price": quote.get("average_trade_price", 0), - "token": token, - } - results.append(formatted_quote) - else: - logger.warning(f"No WebSocket quote data for {exchange}:{token}") - # Add to results with empty/default values - results.append( - { - "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "exchange": exchange, - "symbol": symbol_name, - "ltp": 0, - "close": 0, - "open": 0, - "high": 0, - "low": 0, - "volume": 0, - "bid": 0, - "ask": 0, - "total_buy_qty": 0, - "total_sell_qty": 0, - "open_interest": 0, - "average_price": 0, - "token": token, - } - ) - - # If we got at least some data, return it - if any(r.get("ltp", 0) > 0 for r in results): - return results - - # Otherwise, fall back to REST API - logger.warning("No valid quote data from WebSocket, falling back to REST API") - - except Exception as e: - logger.error(f"Error getting quotes via WebSocket: {str(e)}") - # Continue to fallback REST API method - - # Fallback: Use REST API for quotes - try: - logger.info("Using REST API for quotes as WebSocket fallback") - client = get_httpx_client() - - # Get user_id from environment variables and session_id from class instance - user_id = os.environ.get("BROKER_API_SECRET") - session_id = self.session_id - - if not user_id or not session_id: - logger.error( - f"Missing credentials for REST API - user_id: {'Yes' if user_id else 'No'}, session_id: {'Yes' if session_id else 'No'}" - ) - return results # Return whatever we have so far - - # Make REST API calls for each symbol - results = [] - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {user_id} {session_id}", - } - - for symbol in normalized_symbols: - # Handle different possible formats of the symbol - if isinstance(symbol, dict): - exchange = symbol.get("exchange") - token = symbol.get("token") - symbol_name = symbol.get("symbol", "") - elif hasattr(symbol, "exchange") and hasattr(symbol, "token"): - exchange = symbol.exchange - token = symbol.token - symbol_name = getattr(symbol, "symbol", "") - else: - logger.error(f"Unsupported symbol format in REST fallback: {symbol}") - continue - - # Skip if we don't have both exchange and token - if not exchange or not token: - logger.warning( - f"Missing exchange or token in symbol for REST fallback: {symbol}" - ) - continue - - payload = {"exch": exchange, "symbol": token} - - try: - response = client.post( - SCRIP_DETAILS_URL, headers=headers, json=payload, timeout=timeout - ) - response.raise_for_status() - data = response.json() - - # Format the response to match our expected structure - quote = { - "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "exchange": exchange, - "symbol": symbol_name, - "ltp": float(data.get("ltp", 0)), - "close": float(data.get("close", 0)), - "open": float(data.get("open", 0)), - "high": float(data.get("high", 0)), - "low": float(data.get("low", 0)), - "volume": int(data.get("volume", 0)), - "bid": float(data.get("bp", 0)), # Best bid price - "ask": float(data.get("sp", 0)), # Best ask price - "total_buy_qty": int(data.get("tbq", 0)), - "total_sell_qty": int(data.get("tsq", 0)), - "open_interest": int(data.get("oi", 0)), - "average_price": float(data.get("ap", 0)), - "token": token, - } - results.append(quote) - - except (HTTPError, Timeout) as e: - logger.error(f"Error fetching quote for {exchange}:{token}: {str(e)}") - # Add empty quote to maintain order results.append( - { - "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "exchange": exchange, - "symbol": symbol_name, - "ltp": 0, - "close": 0, - "open": 0, - "high": 0, - "low": 0, - "volume": 0, - "bid": 0, - "ask": 0, - "total_buy_qty": 0, - "total_sell_qty": 0, - "open_interest": 0, - "average_price": 0, - "token": token, - } + {"symbol": info["symbol"], "exchange": info["exchange"], "error": "No data received"} ) - continue - except Exception as e: - logger.error(f"Error in REST API fallback for quotes: {str(e)}") + # Unsubscribe after getting data + logger.info(f"Unsubscribing from {len(instruments)} symbols") + websocket.unsubscribe(instruments, is_depth=False) - return results + received_count = len([r for r in results if 'data' in r]) + logger.info( + f"Retrieved quotes for {received_count}/{len(symbol_map)} symbols" + ) + return skipped_symbols + results - def get_depth(self, symbol_list, timeout: int = 5): + def get_depth(self, symbol: str, exchange: str) -> dict: """ - Get market depth data for a list of symbols using the WebSocket connection. - This is a wrapper for get_market_depth to maintain API compatibility. + Get market depth for given symbol. Args: - symbol_list: List of symbols, single symbol dict with exchange and symbol, or a single symbol string - timeout (int): Timeout in seconds + symbol: Trading symbol (e.g., 'RELIANCE', 'SBIN') + exchange: Exchange (e.g., NSE, BSE, NFO, NSE_INDEX, BSE_INDEX) Returns: - Dict with market depth data in the OpenAlgo standard format + dict: Market depth data in OpenAlgo standard format """ - return self.get_market_depth(symbol_list, timeout) + try: + # Convert symbol to broker format and get token + br_symbol = get_br_symbol(symbol, exchange) or symbol + token = self._normalize_token(get_token(symbol, exchange)) - def get_market_depth(self, symbol_list, timeout: int = 5): - """ - Get market depth data for a list of symbols using the WebSocket connection. + if not token: + raise Exception(f"Token not found for {symbol} on {exchange}") - Args: - symbol_list: List of symbols, single symbol dict with exchange and symbol, or a single symbol string - timeout (int): Timeout in seconds + # Map exchange for AliceBlue WebSocket API + api_exchange = self._map_exchange(exchange) - Returns: - Dict with market depth data in the OpenAlgo standard format - """ - logger.info(f"Getting market depth for: {symbol_list}") + # Get WebSocket connection + websocket = self.get_websocket() + if not websocket or not websocket.is_connected: + logger.warning("WebSocket not connected, reconnecting...") + websocket = self.get_websocket(force_new=True) - # Standardize input format - # Handle dictionary input (single symbol case) - if isinstance(symbol_list, dict): - try: - # Extract symbol and exchange - symbol = symbol_list.get("symbol") or symbol_list.get("SYMBOL") - exchange = symbol_list.get("exchange") or symbol_list.get("EXCHANGE") - - if symbol and exchange: - logger.info(f"Processing single symbol depth request: {symbol} on {exchange}") - # Convert to a list with a single item to use the standard flow - symbol_list = [{"symbol": symbol, "exchange": exchange}] - else: - logger.error("Missing symbol or exchange in request") - return { - "status": "error", - "data": {}, - "message": "Missing symbol or exchange in request", - } - except Exception as e: - logger.error(f"Error processing single symbol depth request: {str(e)}") - return { - "status": "error", - "data": {}, - "message": f"Error processing depth request: {str(e)}", - } + if not websocket or not websocket.is_connected: + raise Exception("WebSocket connection unavailable") - # Handle plain string (like just "YESBANK" or "NIFTY") - elif isinstance(symbol_list, str): - symbol = symbol_list.strip() + # Create instrument for subscription + class Instrument: + def __init__(self, exchange, token, symbol=None): + self.exchange = exchange + self.token = token + self.symbol = symbol - # Use the helper function to auto-detect exchange based on database lookup - exchange = self._auto_detect_exchange(symbol) + instrument = Instrument(exchange=api_exchange, token=token, symbol=br_symbol) - logger.info( - f"Processing string symbol depth: {symbol} on {exchange} (auto-detected from database)" - ) - symbol_list = [{"symbol": symbol, "exchange": exchange}] + # Subscribe to depth data (is_depth=True sends t='d') + logger.info(f"Subscribing to depth for {api_exchange}:{symbol} with token {token}") + success = websocket.subscribe([instrument], is_depth=True) - # For simple case, prepare the instruments for WebSocket subscription - depth_data = [] + if not success: + raise Exception(f"Failed to subscribe to depth for {symbol} on {exchange}") - # Get WebSocket connection - websocket = self.get_websocket() + # Wait for depth data to arrive + time.sleep(2.0) - if not websocket or not websocket.is_connected: - logger.warning("WebSocket not connected, reconnecting...") - websocket = self.get_websocket(force_new=True) + # Retrieve depth from WebSocket + depth = websocket.get_market_depth(api_exchange, token) - if not websocket or not websocket.is_connected: - logger.error("Could not establish WebSocket connection for market depth") - return {"status": "error", "data": {}, "message": "WebSocket connection unavailable"} - - # Process each symbol - for sym in symbol_list: - # If it's a simple dict with symbol and exchange - if isinstance(sym, dict) and "symbol" in sym and "exchange" in sym: - symbol = sym["symbol"] - exchange = sym["exchange"] - - # Get token for this symbol - token = get_token(symbol, exchange) - - if token: - # Get broker symbol if different - br_symbol = get_br_symbol(symbol, exchange) or symbol - - # Convert exchange for AliceBlue API (same as Angel) - if exchange == "NSE_INDEX": - exchange = "NSE" - elif exchange == "BSE_INDEX": - exchange = "BSE" - elif exchange == "MCX_INDEX": - exchange = "MCX" - - # Create instrument for subscription - class Instrument: - def __init__(self, exchange, token, symbol=None): - self.exchange = exchange - self.token = token - self.symbol = symbol - - # Use converted exchange for websocket subscription - instrument = Instrument(exchange=exchange, token=token, symbol=br_symbol) - - # Subscribe to market depth - logger.info( - f"Subscribing to market depth for {exchange}:{symbol} with token {token}" - ) + # Unsubscribe after getting the data + websocket.unsubscribe([instrument], is_depth=True) - # Use the depth subscription (t='d') - success = websocket.subscribe([instrument], is_depth=True) - - if success: - # Wait longer for depth data to arrive - logger.info(f"Waiting for WebSocket depth data for {exchange}:{symbol}") - time.sleep(2.0) # Increased wait time for depth data - - # Retrieve depth from WebSocket using converted exchange - depth = websocket.get_market_depth(exchange, token) - - if depth: - # Create a normalized depth structure in the OpenAlgo format - item = { - "symbol": symbol, - "exchange": exchange, - "token": token, - "timestamp": datetime.now().isoformat(), - "total_buy_qty": depth.get("total_buy_quantity", 0), - "total_sell_qty": depth.get("total_sell_quantity", 0), - "ltp": depth.get("ltp", 0), - "oi": depth.get("open_interest", 0), - "depth": {"buy": [], "sell": []}, - } - - # Format the buy orders - bids = depth.get("bids", []) - for bid in bids: - item["depth"]["buy"].append( - { - "price": bid.get("price", 0), - "quantity": bid.get("quantity", 0), - "orders": bid.get("orders", 0), - } - ) - - # Format the sell orders - asks = depth.get("asks", []) - for ask in asks: - item["depth"]["sell"].append( - { - "price": ask.get("price", 0), - "quantity": ask.get("quantity", 0), - "orders": ask.get("orders", 0), - } - ) - - depth_data.append(item) - logger.debug(f"Retrieved market depth for {symbol} on {exchange}") - - # Unsubscribe after getting the data to stop continuous streaming - logger.info( - f"Unsubscribing from depth for {exchange}:{symbol} after retrieving data" - ) - websocket.unsubscribe([instrument], is_depth=True) - else: - logger.warning(f"No market depth received for {symbol} on {exchange}") - # Also unsubscribe even if no data received to clean up subscription - logger.info( - f"Unsubscribing from depth for {exchange}:{symbol} due to no data" - ) - websocket.unsubscribe([instrument], is_depth=True) - else: - logger.error( - f"Failed to subscribe to market depth for {symbol} on {exchange}" - ) - else: - logger.error(f"Could not find token for {symbol} on {exchange}") - else: - logger.warning(f"Unsupported symbol format for market depth: {sym}") + if not depth: + raise Exception(f"No market depth received for {symbol} on {exchange}") - # Return data directly (service layer will wrap it) - # If there's no data, return empty response - if not depth_data: - return {} + # Format bids and asks with exactly 5 entries each (matching Angel format) + bids = [] + asks = [] - # For single symbol request (most common case), return in simplified format - if len(depth_data) == 1: - # Extract the first and only depth item - depth_item = depth_data[0] + raw_bids = depth.get("bids", []) + for i in range(5): + if i < len(raw_bids): + bids.append({ + "price": raw_bids[i].get("price", 0), + "quantity": raw_bids[i].get("quantity", 0), + }) + else: + bids.append({"price": 0, "quantity": 0}) + + raw_asks = depth.get("asks", []) + for i in range(5): + if i < len(raw_asks): + asks.append({ + "price": raw_asks[i].get("price", 0), + "quantity": raw_asks[i].get("quantity", 0), + }) + else: + asks.append({"price": 0, "quantity": 0}) - # Return the data directly without wrapping + # Return in OpenAlgo standard format (matching Angel broker) return { - "symbol": depth_item.get("symbol", ""), - "exchange": depth_item.get("exchange", ""), - "ltp": depth_item.get("ltp", 0), - "oi": depth_item.get("oi", 0), - "total_buy_qty": depth_item.get("total_buy_qty", 0), - "total_sell_qty": depth_item.get("total_sell_qty", 0), - "depth": depth_item.get("depth", {"buy": [], "sell": []}), + "bids": bids, + "asks": asks, + "high": depth.get("high", 0) if "high" in depth else 0, + "low": depth.get("low", 0) if "low" in depth else 0, + "ltp": depth.get("ltp", 0), + "ltq": depth.get("last_trade_quantity", 0), + "open": depth.get("open", 0) if "open" in depth else 0, + "prev_close": depth.get("close", 0) if "close" in depth else 0, + "volume": depth.get("volume", 0) if "volume" in depth else 0, + "oi": depth.get("open_interest", 0), + "totalbuyqty": depth.get("total_buy_quantity", 0), + "totalsellqty": depth.get("total_sell_quantity", 0), } - # For multiple symbols, return the full list - return depth_data + except Exception as e: + raise Exception(f"Error fetching market depth: {str(e)}") def get_history( self, symbol: str, exchange: str, timeframe: str, start_date: str, end_date: str @@ -1145,6 +550,13 @@ def get_history( logger.error(f"Token not found for {symbol} on {exchange}") return pd.DataFrame() + # CRITICAL: get_token() returns float-like values (e.g. '3045.0') + # AliceBlue API requires clean integer tokens (e.g. '3045') + try: + token = str(int(float(token))) + except (ValueError, TypeError): + token = str(token) # fallback to string as-is + logger.debug(f"Found token {token} for {symbol}:{exchange}") # Convert exchange for AliceBlue API (same as Angel) @@ -1170,41 +582,34 @@ def get_history( if timeframe not in self.timeframe_map: supported = list(self.timeframe_map.keys()) logger.error( - f"Unsupported timeframe: {timeframe}. AliceBlue only supports: {', '.join(supported)}" + f"Unsupported timeframe: {timeframe}. AliceBlue supports: {', '.join(supported)}" ) return pd.DataFrame() - # Get the AliceBlue resolution format - aliceblue_timeframe = self.timeframe_map[timeframe] + # Determine whether we need to resample from 1-minute data + needs_resample = timeframe in self._RESAMPLE_TIMEFRAMES - # Get credentials - AliceBlue historical API uses user_id in Bearer token - from utils.config import get_broker_api_key, get_broker_api_secret + # Get the AliceBlue resolution format (always "1" for intraday, "D" for daily) + aliceblue_timeframe = self.timeframe_map[timeframe] - # IMPORTANT: AliceBlue historical API uses user_id (BROKER_API_KEY), not client_id! - # This is different from other APIs which use BROKER_API_SECRET - user_id = get_broker_api_key() # This should be '1412368' in your case - auth_token = self.session_id # This is the session token from login + # V3 API auth: Bearer {session_token} + auth_token = self.session_id - if not user_id or not auth_token: - logger.error( - f"Missing credentials for historical data - user_id: {'Yes' if user_id else 'No'}, auth_token: {'Yes' if auth_token else 'No'}" - ) + if not auth_token: + logger.error("Missing session token for historical data") return pd.DataFrame() - # Historical API uses different auth format: Bearer {user_id} {session_token} headers = { - "Authorization": f"Bearer {user_id} {auth_token}", + "Authorization": f"Bearer {auth_token}", "Content-Type": "application/json", } # Alternative: Try adding session token to payload as some historical APIs expect it # payload['sessionId'] = session_id - # For indices, append ::index to the exchange - exchange_str = f"{exchange}::index" if exchange.endswith("IDX") else exchange - # Convert timestamps to milliseconds as required by AliceBlue API - # Format: Unix timestamp in milliseconds (like 1660128489000) + # Convert timestamps to milliseconds as required by AliceBlue V3 API + # V3 docs example: "from": "1660128489000" (13-digit milliseconds) import time from datetime import datetime @@ -1240,23 +645,25 @@ def convert_to_unix_ms(timestamp, is_end_date=False): dt = datetime.strptime(timestamp, "%Y-%m-%d") if is_end_date: # Set to end of day (23:59:59) for end dates - dt = dt.replace(hour=23, minute=59, second=59, microsecond=999999) + dt = dt.replace(hour=23, minute=59, second=59) else: - # For intraday data, set to market open (09:15:00) for start dates - # This ensures we get full day data from market open - dt = dt.replace(hour=9, minute=15, second=0, microsecond=0) + # For daily data, start at midnight (00:00:00) + # For intraday data, start at market open (09:15:00) + if aliceblue_timeframe == "D": + dt = dt.replace(hour=0, minute=0, second=0) + else: + dt = dt.replace(hour=9, minute=15, second=0) # Localize to IST timezone (AliceBlue expects IST timestamps) dt_ist = ist.localize(dt) - # Convert to Unix timestamp in seconds, then to milliseconds + # Convert to Unix timestamp in milliseconds result = str(int(dt_ist.timestamp() * 1000)) logger.debug(f"Converted '{timestamp}' to {result} (Date: {dt_ist})") return result except (ValueError, Exception) as e: logger.error(f"Error parsing timestamp string '{timestamp}': {e}") logger.error(f"Timestamp type: {type(timestamp)}, value: {repr(timestamp)}") - # Fallback to current time - THIS SHOULD NOT HAPPEN logger.error( "WARNING: Falling back to current time - this is likely a bug!" ) @@ -1275,74 +682,78 @@ def convert_to_unix_ms(timestamp, is_end_date=False): # Fallback to current time return str(int(time.time() * 1000)) - start_ms = convert_to_unix_ms(start_date, is_end_date=False) - end_ms = convert_to_unix_ms(end_date, is_end_date=True) + start_ts = convert_to_unix_ms(start_date, is_end_date=False) + end_ts = convert_to_unix_ms(end_date, is_end_date=True) # Log the conversion for debugging - logger.info( - f"Date conversion - Start: {start_date} -> {start_ms}, End: {end_date} -> {end_ms}" + logger.debug( + f"Date conversion - Start: {start_date} -> {start_ts}, End: {end_date} -> {end_ts}" ) # Validate that dates are not in the future current_time_ms = int(time.time() * 1000) - if int(start_ms) > current_time_ms: + if int(start_ts) > current_time_ms: logger.error( f"Start date {start_date} is in the future. Historical data is only available for past dates." ) return pd.DataFrame() # If end date is in future, cap it to current time - if int(end_ms) > current_time_ms: + if int(end_ts) > current_time_ms: logger.warning(f"End date {end_date} is in the future. Capping to current time.") - end_ms = str(current_time_ms) + end_ts = str(current_time_ms) # Ensure start and end times are different and valid - if start_ms == end_ms: + if start_ts == end_ts: logger.warning( - f"Start and end timestamps are the same: {start_ms}. Adjusting end time." + f"Start and end timestamps are the same: {start_ts}. Adjusting end time." ) # If they're the same, add one day to the end time - end_ms = str(int(end_ms) + 86400000) # Add 24 hours in milliseconds + end_ts = str(int(end_ts) + 86400000) # Add 24 hours in milliseconds # For intraday data, ensure minimum time range if timeframe != "D": - time_diff_ms = int(end_ms) - int(start_ms) + time_diff_ms = int(end_ts) - int(start_ts) min_range_ms = 3600000 # Minimum 1 hour for intraday data if time_diff_ms < min_range_ms: logger.warning( f"Time range too small ({time_diff_ms}ms). Extending to minimum 1 hour for intraday data." ) - end_ms = str(int(start_ms) + min_range_ms) + end_ts = str(int(start_ts) + min_range_ms) - # Prepare request payload according to AliceBlue API docs + # Prepare request payload according to AliceBlue V3 API docs payload = { "token": str(token), # Token should be the instrument token "exchange": exchange, # Exchange should be NSE, NFO, etc. - "from": start_ms, - "to": end_ms, + "from": start_ts, + "to": end_ts, "resolution": aliceblue_timeframe, } - # Debug logging - logger.debug("Making historical data request:") - logger.debug(f"URL: {HISTORICAL_API_URL}") - logger.debug(f"Headers: {headers}") - logger.debug(f"Payload: {payload}") + logger.debug(f"Historical API request: {symbol}:{exchange} res={aliceblue_timeframe} token={token}") # Make request to historical API client = get_httpx_client() - response = client.post(HISTORICAL_API_URL, headers=headers, json=payload, timeout=10) - response.raise_for_status() - data = response.json() + try: + response = client.post(HISTORICAL_API_URL, headers=headers, json=payload, timeout=15) + response.raise_for_status() + data = response.json() + except httpx.HTTPStatusError as http_err: + logger.error(f"HTTP Error: {http_err}") + logger.error(f"Response body: {http_err.response.text[:500]}") + return pd.DataFrame() + except Exception as req_err: + logger.error(f"Request failed: {type(req_err).__name__}: {req_err}") + return pd.DataFrame() # Check if response contains valid data - if data.get("stat") == "Not_Ok" or "result" not in data: + if str(data.get("stat", "")).lower() in ["not_ok", "not ok"] or "result" not in data: error_msg = data.get("emsg", "Unknown error") logger.error(f"Error in historical data response: {error_msg}") # Provide more helpful error messages based on the error - if "No data available" in error_msg: + if "No data available" in error_msg or "market time" in error_msg.lower() or "Session" in error_msg: if exchange in ["MCX", "NFO", "CDS"]: logger.error( f"No data available. For {exchange}, AliceBlue only provides data for current expiry contracts." @@ -1356,8 +767,18 @@ def convert_to_unix_ms(timestamp, is_end_date=False): ) else: logger.error(f"No historical data available for {symbol} on {exchange}.") + + # Check if we're during market hours (AliceBlue limitation) + from datetime import time as dtime + import pytz as _pytz + _ist = _pytz.timezone("Asia/Kolkata") + _now = datetime.now(_ist) + _market_open = dtime(8, 0) + _market_close = dtime(17, 30) + if _now.weekday() < 5 and _market_open <= _now.time() <= _market_close: logger.error( - "This could be due to: 1) Symbol not traded in the date range, 2) Invalid symbol, or 3) Data not available during market hours (available from 5:30 PM to 8 AM on weekdays)" + "AliceBlue historical data API is only available from 5:30 PM to 8 AM on weekdays " + "and fully on weekends/holidays. This request was made during market hours." ) return pd.DataFrame() @@ -1385,41 +806,33 @@ def convert_to_unix_ms(timestamp, is_end_date=False): logger.error("Missing required columns in historical data response") return pd.DataFrame() - # Log the first few rows of raw data to debug - logger.info( - f"First 3 rows of historical data from AliceBlue: {df.head(3).to_dict('records')}" - ) - logger.info(f"Total rows received: {len(df)}") + logger.debug(f"Received {len(df)} rows from AliceBlue for {symbol}:{exchange}") # Convert time column to datetime # AliceBlue returns time as string in format 'YYYY-MM-DD HH:MM:SS' df["timestamp"] = pd.to_datetime(df["timestamp"]) - # Handle different timeframes + # Handle different timeframes for timestamp conversion if timeframe == "D": - # For daily data, normalize to date only (no time component) - # Set time to midnight to represent the date + # For daily data, normalize to date only then add IST offset + # Match Angel's approach: naive datetime + 5:30, no tz_localize df["timestamp"] = df["timestamp"].dt.normalize() - - # Add IST offset (5:30 hours) for proper Unix timestamp conversion - # This ensures the date is correctly represented df["timestamp"] = df["timestamp"] + pd.Timedelta(hours=5, minutes=30) + + # Convert directly to Unix epoch (naive → treated as UTC by pandas) + df["timestamp"] = df["timestamp"].astype("int64") // 10**9 else: # For intraday data, adjust timestamps to represent the start of the candle # AliceBlue provides end-of-candle timestamps (XX:XX:59), we need start (XX:XX:00) df["timestamp"] = df["timestamp"].dt.floor("min") - # AliceBlue timestamps are in IST - need to localize them - import pytz - - ist = pytz.timezone("Asia/Kolkata") - - # Localize to IST (AliceBlue provides IST timestamps without timezone info) - df["timestamp"] = df["timestamp"].dt.tz_localize(ist) + # AliceBlue timestamps are in IST - localize them for correct epoch conversion + import pytz + ist = pytz.timezone("Asia/Kolkata") + df["timestamp"] = df["timestamp"].dt.tz_localize(ist) - # Convert timestamp to Unix epoch (seconds since 1970) - # This will correctly handle the IST timezone - df["timestamp"] = df["timestamp"].astype("int64") // 10**9 + # Convert to Unix epoch (seconds since 1970) + df["timestamp"] = df["timestamp"].astype("int64") // 10**9 # Ensure numeric columns are properly typed numeric_columns = ["open", "high", "low", "close", "volume"] @@ -1432,65 +845,44 @@ def convert_to_unix_ms(timestamp, is_end_date=False): .reset_index(drop=True) ) - # Add OI column with zeros (AliceBlue doesn't provide OI in historical data) + # Add OI column with zeros — AliceBlue's historical API does NOT return OI. + # This means OI Profile's "Daily OI Change" will show current OI as the + # full change amount (since previous day OI always = 0). df["oi"] = 0 - # For intraday data, ensure we have data from market open (9:15 AM) - if timeframe != "D" and not df.empty: - from datetime import datetime, time, timedelta - - import pytz - - ist = pytz.timezone("Asia/Kolkata") - - # Get the date from the first timestamp - first_timestamp = pd.to_datetime(df["timestamp"].iloc[0], unit="s") - first_timestamp = first_timestamp.tz_localize("UTC").tz_convert(ist) - - # Create market open time for that date - market_date = first_timestamp.date() - market_open = ist.localize(datetime.combine(market_date, time(9, 15))) - market_open_ts = int(market_open.timestamp()) - - # If first data point is after 9:15 AM, pad with data from 9:15 AM - if df["timestamp"].iloc[0] > market_open_ts: - logger.info( - "Padding data from market open (9:15 AM) to first available data point" - ) - - # Get the first available price as reference - first_price = df["open"].iloc[0] - - # Create timestamps from 9:15 AM to first data point (1-minute intervals) - current_ts = market_open_ts - padding_data = [] - - while current_ts < df["timestamp"].iloc[0]: - padding_data.append( - { - "timestamp": current_ts, - "open": first_price, - "high": first_price, - "low": first_price, - "close": first_price, - "volume": 0, - "oi": 0, - } - ) - current_ts += 60 # Add 1 minute - - if padding_data: - # Create DataFrame from padding data - padding_df = pd.DataFrame(padding_data) - # Concatenate with original data - df = pd.concat([padding_df, df], ignore_index=True) - # Re-sort by timestamp - df = df.sort_values("timestamp").reset_index(drop=True) - logger.info(f"Added {len(padding_data)} data points from market open") - # Return columns in the order matching Angel broker format df = df[["close", "high", "low", "open", "timestamp", "volume", "oi"]] + # Resample to requested timeframe if needed + if needs_resample: + resample_minutes = self._RESAMPLE_TIMEFRAMES[timeframe] + logger.info(f"Resampling 1m data to {timeframe} ({resample_minutes}m intervals)") + try: + # Convert timestamp back to datetime for resampling + import pytz as _pytz2 + _ist2 = _pytz2.timezone("Asia/Kolkata") + df["dt"] = pd.to_datetime(df["timestamp"], unit="s", utc=True).dt.tz_convert(_ist2) + df = df.set_index("dt") + + resampled = df.resample(f"{resample_minutes}min", label="left", closed="left").agg( + { + "open": "first", + "high": "max", + "low": "min", + "close": "last", + "volume": "sum", + "oi": "last", + } + ).dropna(subset=["open"]) + + # Convert back to unix timestamps + resampled["timestamp"] = resampled.index.astype("int64") // 10**9 + resampled = resampled.reset_index(drop=True) + df = resampled[["close", "high", "low", "open", "timestamp", "volume", "oi"]] + logger.info(f"Resampled to {len(df)} candles at {timeframe}") + except Exception as resample_err: + logger.error(f"Resampling to {timeframe} failed: {resample_err}. Returning 1m data.") + return df except Exception as e: diff --git a/broker/aliceblue/api/funds.py b/broker/aliceblue/api/funds.py index 4624e457e..c504c5ce9 100644 --- a/broker/aliceblue/api/funds.py +++ b/broker/aliceblue/api/funds.py @@ -1,7 +1,6 @@ # api/funds.py import json -import os import httpx @@ -11,8 +10,52 @@ logger = get_logger(__name__) +def _get_realized_pnl(client, headers): + """Fetch positions and sum up realizedPnl from all positions.""" + try: + positions_url = "https://a3.aliceblueonline.com/open-api/od/v1/positions" + response = client.get(positions_url, headers=headers) + response.raise_for_status() + + positions_data = response.json() + + if positions_data.get("status") != "Ok": + logger.warning( + f"Error fetching positions for PnL: {positions_data.get('message', 'Unknown error')}" + ) + return 0.0, 0.0 + + positions = positions_data.get("result", []) + if not positions: + return 0.0, 0.0 + + total_realized_pnl = 0.0 + total_unrealized_pnl = 0.0 + + for position in positions: + # Sum realized PnL from all positions + total_realized_pnl += float(position.get("realizedPnl", 0) or 0) + + # Calculate unrealized PnL for open positions + net_qty = int(position.get("netQuantity", 0) or 0) + if net_qty != 0: + buy_value = float(position.get("dayBuyValue", 0) or 0) + sell_value = float(position.get("daySellValue", 0) or 0) + # Unrealized = sell_value - buy_value - realized_pnl (for the position) + # Or more simply: net open value based on avg price vs current price + # Using the standard formula: total sell value - total buy value - realized pnl + unrealized = sell_value - buy_value - float(position.get("realizedPnl", 0) or 0) + total_unrealized_pnl += unrealized + + return total_realized_pnl, total_unrealized_pnl + + except Exception as e: + logger.warning(f"Failed to fetch positions for PnL calculation: {str(e)}") + return 0.0, 0.0 + + def get_margin_data(auth_token): - """Fetch margin data from Alice Blue's API using the provided auth token and shared connection pooling.""" + """Fetch margin/funds data from Alice Blue's V2 API using the provided auth token and shared connection pooling.""" # Initialize processed data dictionary processed_margin_data = { "availablecash": "0.00", @@ -26,51 +69,56 @@ def get_margin_data(auth_token): # Get the shared httpx client with connection pooling client = get_httpx_client() - url = "https://ant.aliceblueonline.com/rest/AliceBlueAPIService/api/limits/getRmsLimits" + url = "https://a3.aliceblueonline.com/open-api/od/v1/limits/" + # V2 API uses just the auth_token (JWT) in the Bearer header headers = { "Authorization": f"Bearer {auth_token}", + "Content-Type": "application/json", } # Make the API request using the shared client - logger.debug("Making getRmsLimits request to AliceBlue API") response = client.get(url, headers=headers) response.raise_for_status() margin_data = response.json() - logger.debug(f"Funds Details: {json.dumps(margin_data, indent=2)}") - - # Process the margin data - for item in margin_data: - if item.get("stat") == "Not_Ok": - # Log the error or return an empty dictionary to indicate failure - logger.error(f"Error fetching margin data: {item.get('emsg', 'Unknown error')}") - return {} - - # Accumulate values - processed_margin_data["availablecash"] = "{:.2f}".format(float(item.get("net", 0))) - processed_margin_data["collateral"] = "{:.2f}".format( - float(item.get("collateralvalue", 0)) - ) - processed_margin_data["m2munrealized"] = "{:.2f}".format( - float(item.get("unrealizedMtomPrsnt", 0)) - ) - processed_margin_data["m2mrealized"] = "{:.2f}".format( - float(item.get("realizedMtomPrsnt", 0)) - ) - processed_margin_data["utiliseddebits"] = "{:.2f}".format( - float(item.get("cncMarginUsed", 0)) - ) + + # Check for API-level errors in the new response format + if margin_data.get("status") != "Ok": + error_msg = margin_data.get("message", "Unknown error") + logger.error(f"Error fetching margin data: {error_msg}") + return {} + + # Process the result array from the V2 API response + results = margin_data.get("result", []) + if not results: + logger.warning("No margin data returned from AliceBlue API") + return processed_margin_data + + item = results[0] + + # Fetch realized & unrealized PnL from positions API + realized_pnl, unrealized_pnl = _get_realized_pnl(client, headers) + + # Map V2 API fields to OpenAlgo format + processed_margin_data["availablecash"] = "{:.2f}".format( + float(item.get("tradingLimit", 0)) + ) + processed_margin_data["collateral"] = "{:.2f}".format( + float(item.get("collateralMargin", 0)) + ) + processed_margin_data["m2munrealized"] = "{:.2f}".format(unrealized_pnl) + processed_margin_data["m2mrealized"] = "{:.2f}".format(realized_pnl) + processed_margin_data["utiliseddebits"] = "{:.2f}".format( + float(item.get("utilizedMargin", 0)) + ) return processed_margin_data except KeyError as e: - # Return an empty dictionary in case of unexpected data structure logger.error(f"KeyError while processing margin data: {str(e)}") return {} except httpx.HTTPError as e: - # Handle HTTPX connection errors logger.error(f"HTTP connection error: {str(e)}") return {} except Exception as e: - # General exception handling logger.error(f"An exception occurred while fetching margin data: {str(e)}") return {} diff --git a/broker/aliceblue/api/order_api.py b/broker/aliceblue/api/order_api.py index 03300a64b..34ea0e83f 100644 --- a/broker/aliceblue/api/order_api.py +++ b/broker/aliceblue/api/order_api.py @@ -1,35 +1,41 @@ import json import os -import urllib.parse import httpx +from broker.aliceblue.mapping.order_data import ( + normalize_holding, + normalize_order, + normalize_position, + normalize_trade, +) from broker.aliceblue.mapping.transform_data import ( map_product_type, reverse_map_product_type, transform_data, transform_modify_order_data, ) -from database.auth_db import get_auth_token -from database.token_db import get_br_symbol, get_oa_symbol -from utils.config import get_broker_api_key, get_broker_api_secret +from database.token_db import get_br_symbol, get_oa_symbol, get_token from utils.httpx_client import get_httpx_client from utils.logging import get_logger logger = get_logger(__name__) +# AliceBlue V2 API base URL +BASE_URL = "https://a3.aliceblueonline.com" + + +# ─── API request helper ────────────────────────────────────────────────────── + def get_api_response(endpoint, auth, method="GET", payload=None): - """Make API requests to AliceBlue API using shared connection pooling.""" + """Make API requests to AliceBlue V2 API using shared connection pooling.""" try: - # Get the shared httpx client with connection pooling client = get_httpx_client() - - AUTH_TOKEN = auth - url = f"https://ant.aliceblueonline.com{endpoint}" + url = f"{BASE_URL}{endpoint}" headers = { - "Authorization": f"Bearer {get_broker_api_secret()} {AUTH_TOKEN}", + "Authorization": f"Bearer {auth}", "Content-Type": "application/json", } @@ -61,68 +67,127 @@ def get_api_response(endpoint, auth, method="GET", payload=None): except httpx.HTTPError as e: logger.error(f"HTTP error during API request: {str(e)}") - return {"stat": "Not_Ok", "emsg": f"HTTP error: {str(e)}"} + return {"status": "Error", "message": f"HTTP error: {str(e)}"} except json.JSONDecodeError as e: logger.error(f"JSON decode error: {str(e)}") - return {"stat": "Not_Ok", "emsg": f"Invalid JSON response: {str(e)}"} + return {"status": "Error", "message": f"Invalid JSON response: {str(e)}"} except Exception as e: logger.error(f"Error during API request: {str(e)}") - return {"stat": "Not_Ok", "emsg": f"General error: {str(e)}"} + return {"status": "Error", "message": f"General error: {str(e)}"} +def _extract_result(response_data): + """Extract result list from V2 API response, handling errors.""" + if isinstance(response_data, dict): + if response_data.get("status") == "Ok": + return response_data.get("result", []) + else: + msg = response_data.get("message", "Unknown error") + logger.error(f"API error: {msg}") + return None + return response_data # fallback: return as-is if not a dict + + +# ─── Order book / Trade book / Positions / Holdings ────────────────────────── + def get_order_book(auth): - return get_api_response("/rest/AliceBlueAPIService/api/placeOrder/fetchOrderBook", auth) + """Fetch order book from V2 API and normalize to old field names.""" + response = get_api_response("/open-api/od/v1/orders/book", auth) + result = _extract_result(response) + + if result is None: + # V2 API returns error message when there are no orders + # Treat "Failed to retrieve" as empty, not an error + msg = response.get("message", "") + if "Failed to retrieve" in msg or "No orders" in msg.lower(): + logger.info(f"No orders found: {msg}") + return [] + return {"stat": "Not_Ok", "emsg": msg or "Failed to fetch order book"} + + if not result: + return [] + + # Normalize each order to old field names + return [normalize_order(order) for order in result] def get_trade_book(auth): - response = get_api_response("/rest/AliceBlueAPIService/api/placeOrder/fetchTradeBook", auth) + """Fetch trade book from V2 API and normalize to old field names.""" + response = get_api_response("/open-api/od/v1/orders/trades", auth) + result = _extract_result(response) - # Log the raw tradebook response from AliceBlue API logger.info(f"AliceBlue tradebook API response type: {type(response)}") - if response: - if isinstance(response, list) and len(response) > 0: - logger.info(f"First trade from AliceBlue API: {response[0]}") - elif isinstance(response, dict): - logger.info(f"AliceBlue API returned dict with keys: {list(response.keys())}") - if response.get("stat") == "Ok": - logger.info("Success response, checking data field...") - return response + if result is None: + # V2 API returns error message when there are no trades + # Treat "No trades found" as empty, not an error + msg = response.get("message", "") + if "No trades" in msg or "not found" in msg.lower(): + logger.info(f"No trades found: {msg}") + return [] + return {"stat": "Not_Ok", "emsg": msg or "Failed to fetch trade book"} + + if not result: + return [] + + # Normalize each trade to old field names + return [normalize_trade(trade) for trade in result] def get_positions(auth): - payload = json.dumps({"ret": "NET"}) + """Fetch positions from V2 API and normalize to old field names.""" + response = get_api_response("/open-api/od/v1/positions", auth) + result = _extract_result(response) - return get_api_response( - "/rest/AliceBlueAPIService/api/positionAndHoldings/positionBook", - auth, - "POST", - payload=payload, - ) + if result is None: + # V2 API returns error message when there are no positions + msg = response.get("message", "") + if "No position" in msg or "not found" in msg.lower() or "Failed to retrieve" in msg: + logger.info(f"No positions found: {msg}") + return [] + return {"stat": "Not_Ok", "emsg": msg or "Failed to fetch positions"} + + if not result: + return [] + + # Normalize each position to old field names + return [normalize_position(pos) for pos in result] def get_holdings(auth): - return get_api_response("/rest/AliceBlueAPIService/api/positionAndHoldings/holdings", auth) + """Fetch holdings from V2 API and normalize to old field names.""" + response = get_api_response("/open-api/od/v1/holdings/CNC", auth) + result = _extract_result(response) + + if result is None: + # V2 API returns error message when there are no holdings + msg = response.get("message", "") + if "No holding" in msg or "not found" in msg.lower() or "Failed to retrieve" in msg: + logger.info(f"No holdings found: {msg}") + return [] + return {"stat": "Not_Ok", "emsg": msg or "Failed to fetch holdings"} + + if not result: + return [] + return [normalize_holding(h) for h in result] + + +# ─── Open position lookup ──────────────────────────────────────────────────── def get_open_position(tradingsymbol, exchange, product, auth): - # Convert Trading Symbol from OpenAlgo Format to Broker Format Before Search in OpenPosition + """Get net quantity for a specific symbol/exchange/product.""" + # Convert Trading Symbol from OpenAlgo Format to Broker Format Before Search tradingsymbol = get_br_symbol(tradingsymbol, exchange) position_data = get_positions(auth) if isinstance(position_data, dict): - if position_data["stat"] == "Not_Ok": - # Handle the case where there is an error in the data - # For example, you might want to display an error message to the user - # or pass an empty list or dictionary to the template. - logger.info(f"Error fetching order data: {position_data['emsg']}") + if position_data.get("stat") == "Not_Ok": + logger.info(f"Error fetching position data: {position_data.get('emsg')}") position_data = {} - else: - position_data = position_data net_qty = "0" - # logger.info(f"{positions_data['data']['net']}") if position_data: for position in position_data: @@ -133,68 +198,73 @@ def get_open_position(tradingsymbol, exchange, product, auth): ): net_qty = position.get("Netqty", "0") logger.info(f"Net Quantity {net_qty}") - break # Assuming you need the first match + break return net_qty +# ─── Place order ────────────────────────────────────────────────────────────── + def place_order_api(data, auth): - """Place an order using the AliceBlue API with shared connection pooling.""" + """Place an order using the AliceBlue V2 API.""" try: - # Get the shared httpx client client = get_httpx_client() - AUTH_TOKEN = auth - newdata = transform_data(data) + # Build V2 API payload via transform_data + payload_item = transform_data(data) + payload = [payload_item] - # Prepare headers and payload headers = { - "Authorization": f"Bearer {get_broker_api_secret()} {AUTH_TOKEN}", + "Authorization": f"Bearer {auth}", "Content-Type": "application/json", } - payload = [newdata] logger.debug(f"Place order payload: {json.dumps(payload, indent=2)}") - # Make the API request - url = "https://ant.aliceblueonline.com/rest/AliceBlueAPIService/api/placeOrder/executePlaceOrder" + url = f"{BASE_URL}/open-api/od/v1/orders/placeorder" response = client.post(url, json=payload, headers=headers) response.raise_for_status() response_data = response.json() logger.debug(f"Place order response: {json.dumps(response_data, indent=2)}") - # Process the response - response_data = response_data[0] - logger.info(f"Place order response: {response_data}") - if response_data["stat"] == "Ok": - orderid = response_data["NOrdNo"] + # Process the V2 API response + orderid = None + if response_data.get("status") == "Ok": + results = response_data.get("result", []) + if results and len(results) > 0: + result_item = results[0] + # Check for per-result error (AliceBlue may return top-level Ok but result-level error) + result_status = result_item.get("status", "") + if result_status and result_status != "Ok" and result_item.get("brokerOrderId", "") == "": + error_msg = result_item.get("message", "Unknown error in result") + logger.error(f"Order placement failed (result error {result_status}): {error_msg}") + else: + orderid = result_item.get("brokerOrderId") + logger.info(f"Order placed successfully: {orderid}") else: - # Extract error message if present - error_msg = response_data.get("emsg", "No error message provided by API") + error_msg = response_data.get("message", "No error message provided by API") logger.error(f"Order placement failed: {error_msg}") - logger.error(f"Order placement error: {error_msg}") - orderid = None - # Add status attribute to response object to match what PlaceOrder endpoint expects + # Add status attribute for compatibility response.status = response.status_code return response, response_data, orderid except httpx.HTTPError as e: logger.error(f"HTTP error during place order: {str(e)}") - response_data = {"stat": "Not_Ok", "emsg": f"HTTP error: {str(e)}"} - # Create a simple object with status attribute set to 500 + response_data = {"status": "Error", "message": f"HTTP error: {str(e)}"} response = type("", (), {"status": 500, "status_code": 500})() return response, response_data, None except Exception as e: logger.error(f"Error during place order: {str(e)}") - response_data = {"stat": "Not_Ok", "emsg": f"General error: {str(e)}"} - # Create a simple object with status attribute set to 500 + response_data = {"status": "Error", "message": f"General error: {str(e)}"} response = type("", (), {"status": 500, "status_code": 500})() return response, response_data, None +# ─── Smart order ────────────────────────────────────────────────────────────── + def place_smartorder_api(data, auth): AUTH_TOKEN = auth @@ -209,7 +279,7 @@ def place_smartorder_api(data, auth): # Get current open position for the symbol current_position = int( - get_open_position(symbol, exchange, map_product_type(product), AUTH_TOKEN) + get_open_position(symbol, exchange, reverse_map_product_type(map_product_type(product)), AUTH_TOKEN) ) logger.info(f"position_size : {position_size}") @@ -219,17 +289,11 @@ def place_smartorder_api(data, auth): action = None quantity = 0 - # If both position_size and current_position are 0, do nothing # If both position_size and current_position are 0, do nothing if position_size == 0 and current_position == 0 and int(data["quantity"]) != 0: action = data["action"] quantity = data["quantity"] - # logger.info(f"action : {action}") - # logger.info(f"Quantity : {quantity}") res, response, orderid = place_order_api(data, AUTH_TOKEN) - # logger.info(f"{res}") - # logger.info(f"{response}") - return res, response, orderid elif position_size == current_position: @@ -244,7 +308,7 @@ def place_smartorder_api(data, auth): "message": "No action needed. Position size matches current position", } orderid = None - return res, response, orderid # res remains None as no API call was mad + return res, response, orderid if position_size == 0 and current_position > 0: action = "SELL" @@ -259,11 +323,9 @@ def place_smartorder_api(data, auth): if position_size > current_position: action = "BUY" quantity = position_size - current_position - # logger.info(f"smart buy quantity : {quantity}") elif position_size < current_position: action = "SELL" quantity = current_position - position_size - # logger.info(f"smart sell quantity : {quantity}") if action: # Prepare data for placing the order @@ -271,31 +333,24 @@ def place_smartorder_api(data, auth): order_data["action"] = action order_data["quantity"] = str(quantity) - # logger.info(f"{order_data}") # Place the order res, response, orderid = place_order_api(order_data, AUTH_TOKEN) - # logger.info(f"{res}") - # logger.info(f"{response}") return res, response, orderid +# ─── Close all positions ────────────────────────────────────────────────────── + def close_all_positions(current_api_key, auth): AUTH_TOKEN = auth # Fetch the current open positions positions_response = get_positions(AUTH_TOKEN) if isinstance(positions_response, dict): - if positions_response["stat"] == "Not_Ok": - # Handle the case where there is an error in the data - # For example, you might want to display an error message to the user - # or pass an empty list or dictionary to the template. - logger.info(f"Error fetching order data: {positions_response['emsg']}") + if positions_response.get("stat") == "Not_Ok": + logger.info(f"Error fetching position data: {positions_response.get('emsg')}") positions_response = {} - else: - positions_response = positions_response - # logger.info(f"{positions_response}") # Check if the positions data is null or empty if positions_response is None or not positions_response: return {"message": "No Open Positions Found"}, 200 @@ -332,56 +387,44 @@ def close_all_positions(current_api_key, auth): logger.info(f"{api_response}") - # Note: Ensure place_order_api handles any errors and logs accordingly - return {"status": "success", "message": "All Open Positions SquaredOff"}, 200 +# ─── Cancel order ───────────────────────────────────────────────────────────── + def cancel_order(orderid, auth): - """Cancel an order using the AliceBlue API with shared connection pooling.""" + """Cancel an order using the AliceBlue V2 API.""" try: - # Get the shared httpx client client = get_httpx_client() - AUTH_TOKEN = auth - order_book_response = get_order_book(AUTH_TOKEN) - - # Find the order details - Trading_symbol = "" - Exchange = "" - orders = order_book_response - for order in orders: - if order.get("Nstordno") == orderid: - Trading_symbol = order.get("Trsym") - Exchange = order.get("Exchange") - - # Prepare headers and payload headers = { - "Authorization": f"Bearer {get_broker_api_secret()} {AUTH_TOKEN}", + "Authorization": f"Bearer {auth}", "Content-Type": "application/json", } - payload = {"exch": Exchange, "nestOrderNumber": orderid, "trading_symbol": Trading_symbol} + # V2 API only needs brokerOrderId to cancel + payload = {"brokerOrderId": str(orderid)} logger.debug(f"Cancel order payload: {json.dumps(payload, indent=2)}") - # Make the API request - url = "https://ant.aliceblueonline.com/rest/AliceBlueAPIService/api/placeOrder/cancelOrder" + url = f"{BASE_URL}/open-api/od/v1/orders/cancel" response = client.post(url, json=payload, headers=headers) response.raise_for_status() response_data = response.json() logger.debug(f"Cancel order response: {json.dumps(response_data, indent=2)}") - # Check if the request was successful - if response_data.get("stat") == "Ok": - # Return a success response - return {"status": "success", "orderid": response_data["nestOrderNumber"]}, 200 + # Check V2 API response + if response_data.get("status") == "Ok": + results = response_data.get("result", []) + cancelled_id = orderid + if results and len(results) > 0: + cancelled_id = results[0].get("brokerOrderId", orderid) + return {"status": "success", "orderid": cancelled_id}, 200 else: - # Return an error response return { "status": "error", - "message": response_data.get("emsg", "Failed to cancel order"), + "message": response_data.get("message", "Failed to cancel order"), }, response.status_code except httpx.HTTPError as e: @@ -392,38 +435,41 @@ def cancel_order(orderid, auth): return {"status": "error", "message": f"General error: {str(e)}"}, 500 +# ─── Modify order ───────────────────────────────────────────────────────────── + def modify_order(data, auth): - """Modify an order using the AliceBlue API with shared connection pooling.""" + """Modify an order using the AliceBlue V2 API.""" try: - # Get the shared httpx client client = get_httpx_client() - AUTH_TOKEN = auth - newdata = transform_modify_order_data(data) + # Build V2 API modify payload via transform_modify_order_data + payload = transform_modify_order_data(data) - # Prepare headers headers = { - "Authorization": f"Bearer {get_broker_api_secret()} {AUTH_TOKEN}", + "Authorization": f"Bearer {auth}", "Content-Type": "application/json", } - logger.debug(f"Modify order payload: {json.dumps(newdata, indent=2)}") + logger.debug(f"Modify order payload: {json.dumps(payload, indent=2)}") - # Make the API request - url = "https://ant.aliceblueonline.com/rest/AliceBlueAPIService/api/placeOrder/modifyOrder" - response = client.post(url, json=newdata, headers=headers) + url = f"{BASE_URL}/open-api/od/v1/orders/modify" + response = client.post(url, json=payload, headers=headers) response.raise_for_status() response_data = response.json() logger.debug(f"Modify order response: {json.dumps(response_data, indent=2)}") - # Process the response - if response_data.get("stat") == "Ok": - return {"status": "success", "orderid": response_data["nestOrderNumber"]}, 200 + # Process V2 API response + if response_data.get("status") == "Ok": + results = response_data.get("result", []) + modified_id = data.get("orderid") + if results and len(results) > 0: + modified_id = results[0].get("brokerOrderId", modified_id) + return {"status": "success", "orderid": modified_id}, 200 else: return { "status": "error", - "message": response_data.get("emsg", "Failed to modify order"), + "message": response_data.get("message", "Failed to modify order"), }, response.status_code except httpx.HTTPError as e: @@ -434,18 +480,20 @@ def modify_order(data, auth): return {"status": "error", "message": f"General error: {str(e)}"}, 500 +# ─── Cancel all orders ──────────────────────────────────────────────────────── + def cancel_all_orders_api(data, auth): AUTH_TOKEN = auth - # Get the order book + # Get the order book (already normalized to old field names) order_book_response = get_order_book(AUTH_TOKEN) - # logger.info(f"{order_book_response}") + if isinstance(order_book_response, dict): - if order_book_response["stat"] == "Not_Ok": - return [], [] # Return empty lists indicating failure to retrieve the order book + if order_book_response.get("stat") == "Not_Ok": + return [], [] - # Filter orders that are in 'open' or 'trigger_pending' state + # Filter orders that are in 'open' or 'trigger pending' state orders_to_cancel = [ - order for order in order_book_response if order["Status"] in ["open", "trigger pending"] + order for order in order_book_response if order.get("Status") in ["open", "trigger pending"] ] logger.info(f"{orders_to_cancel}") canceled_orders = [] diff --git a/broker/aliceblue/database/master_contract_db.py b/broker/aliceblue/database/master_contract_db.py index f8906a15b..83079cff8 100644 --- a/broker/aliceblue/database/master_contract_db.py +++ b/broker/aliceblue/database/master_contract_db.py @@ -140,14 +140,14 @@ def download_csv_aliceblue_data(output_path): logger.info("Downloading Master Contract CSV Files") # URLs of the CSV files to be downloaded csv_urls = { - "CDS": "https://v2api.aliceblueonline.com/restpy/static/contract_master/CDS.csv", - "NFO": "https://v2api.aliceblueonline.com/restpy/static/contract_master/NFO.csv", - "NSE": "https://v2api.aliceblueonline.com/restpy/static/contract_master/NSE.csv", - "BSE": "https://v2api.aliceblueonline.com/restpy/static/contract_master/BSE.csv", - "BFO": "https://v2api.aliceblueonline.com/restpy/static/contract_master/BFO.csv", - "BCD": "https://v2api.aliceblueonline.com/restpy/static/contract_master/BCD.csv", - "MCX": "https://v2api.aliceblueonline.com/restpy/static/contract_master/MCX.csv", - "INDICES": "https://v2api.aliceblueonline.com/restpy/static/contract_master/INDICES.csv", + "CDS": "https://v2api.aliceblueonline.com/restpy/static/contract_master/V2/CDS.csv", + "NFO": "https://v2api.aliceblueonline.com/restpy/static/contract_master/V2/NFO.csv", + "NSE": "https://v2api.aliceblueonline.com/restpy/static/contract_master/V2/NSE.csv", + "BSE": "https://v2api.aliceblueonline.com/restpy/static/contract_master/V2/BSE.csv", + "BFO": "https://v2api.aliceblueonline.com/restpy/static/contract_master/V2/BFO.csv", + "BCD": "https://v2api.aliceblueonline.com/restpy/static/contract_master/V2/BCD.csv", + "MCX": "https://v2api.aliceblueonline.com/restpy/static/contract_master/V2/MCX.csv", + "INDICES": "https://v2api.aliceblueonline.com/restpy/static/contract_master/V2/INDICES.csv", } # Get the shared httpx client with connection pooling @@ -598,15 +598,49 @@ def process_aliceblue_indices_csv(path): {"NSE": "NSE_INDEX", "BSE": "BSE_INDEX", "MCX": "MCX_INDEX"} ) token_df["tick_size"] = 0.01 + # Step 1: Remove all spaces from symbol names (handles most NSE index mappings automatically) + token_df["symbol"] = token_df["symbol"].str.replace(" ", "", regex=False) + + # Step 2: Apply only the special mappings that involve actual renaming (not just space removal) token_df["symbol"] = token_df["symbol"].replace( { - "NIFTY 50": "NIFTY", - "NIFTY NEXT 50": "NIFTYNXT50", - "NIFTY FIN SERVICE": "FINNIFTY", - "NIFTY BANK": "BANKNIFTY", - "NIFTY MIDCAP SELECT": "MIDCPNIFTY", - "INDIA VIX": "INDIAVIX", + # NSE Index Symbols requiring renaming (not just space removal) + "NIFTY50": "NIFTY", + "NIFTYNEXT50": "NIFTYNXT50", + "NIFTYFINSERVICE": "FINNIFTY", + "NIFTYBANK": "BANKNIFTY", + "NIFTYMIDCAPSELECT": "MIDCPNIFTY", + # BSE Index Symbols (AliceBlue -> OpenAlgo) "SNSX50": "SENSEX50", + "SNXT50": "BSESENSEXNEXT50", + "MID150": "BSE150MIDCAPINDEX", + "LMI250": "BSE250LARGEMIDCAPINDEX", + "MSL400": "BSE400MIDSMALLCAPINDEX", + "AUTO": "BSEAUTO", + "BSE CG": "BSECAPITALGOODS", + "CARBON": "BSECARBONEX", + "BSE CD": "BSECONSUMERDURABLES", + "CPSE": "BSECPSE", + "ENERGY": "BSEENERGY", + "BSEFMC": "BSEFASTMOVINGCONSUMERGOODS", + "FIN": "BSEFINANCIALSERVICES", + "GREENX": "BSEGREENEX", + "BSE HC": "BSEHEALTHCARE", + "INFRA": "BSEINDIAINFRASTRUCTUREINDEX", + "INDSTR": "BSEINDUSTRIALS", + "BSE IT": "BSEINFORMATIONTECHNOLOGY", + "LRGCAP": "BSELARGECAP", + "METAL": "BSEMETAL", + "MIDCAP": "BSEMIDCAP", + "MIDSEL": "BSEMIDCAPSELECTINDEX", + "OILGAS": "BSEOIL&GAS", + "POWER": "BSEPOWER", + "REALTY": "BSEREALTY", + "SMLCAP": "BSESMALLCAP", + "SMLSEL": "BSESMALLCAPSELECTINDEX", + "SMEIPO": "BSESMEIPO", + "TECK": "BSETECK", + "TELCOM": "BSETELECOM", } ) diff --git a/broker/aliceblue/mapping/order_data.py b/broker/aliceblue/mapping/order_data.py index 51749c936..4885bf616 100644 --- a/broker/aliceblue/mapping/order_data.py +++ b/broker/aliceblue/mapping/order_data.py @@ -1,12 +1,141 @@ import json from datetime import date +from broker.aliceblue.mapping.transform_data import reverse_map_product_type from database.token_db import get_oa_symbol, get_symbol from utils.logging import get_logger logger = get_logger(__name__) +# ─── V2 API response normalization ─────────────────────────────────────────── +# These map AliceBlue V2 API response fields to the old field names that the +# downstream transform/map functions in this file expect. + + +def normalize_order(order): + """Normalize a V2 order-book entry to old field names.""" + # Map transactionType BUY/SELL → B/S + trantype_map = {"BUY": "B", "SELL": "S"} + trantype = trantype_map.get(order.get("transactionType", ""), order.get("transactionType", "")) + + # Map orderType MARKET/LIMIT/SL/SLM → MKT/L/SL/SL-M + prctype_map = {"MARKET": "MKT", "LIMIT": "L", "SL": "SL", "SLM": "SL-M"} + prctype = prctype_map.get(order.get("orderType", ""), order.get("orderType", "")) + + # Map orderStatus to lowercase for downstream compatibility + status_raw = order.get("orderStatus", "") + status = status_raw.lower() if status_raw else "" + + return { + # Core identifiers + "Nstordno": order.get("brokerOrderId", ""), + "Exchange": order.get("exchange", ""), + "Trsym": order.get("formattedInstrumentName", order.get("tradingSymbol", "")), + "Tsym": order.get("formattedInstrumentName", order.get("tradingSymbol", "")), + # Transaction + "Trantype": trantype, + "transactionType": order.get("transactionType", ""), + # Quantities + "Qty": str(order.get("quantity", 0)), + "Fillshares": str(order.get("filledQuantity", 0)), + "cancelledQuantity": str(order.get("cancelledQuantity", 0)), + "pendingQuantity": str(order.get("pendingQuantity", 0)), + # Prices + "Prc": str(order.get("price", 0)), + "Trgprc": str(order.get("slTriggerPrice", 0)), + "AvgPrice": str(order.get("averageTradedPrice", 0)), + # Order metadata + "Prctype": prctype, + "orderType": order.get("orderType", ""), + "Pcode": reverse_map_product_type(order.get("product", "")), + "Status": status, + "orderStatus": status_raw, + "orderentrytime": order.get("orderTime", ""), + "rejectionReason": order.get("rejectionReason", ""), + "orderTag": order.get("orderTag", ""), + # Complexity & validity + "orderComplexity": order.get("orderComplexity", ""), + "validity": order.get("validity", ""), + # Exchange-level + "exchangeOrderId": order.get("exchangeOrderId", ""), + "formattedInstrumentName": order.get("formattedInstrumentName", ""), + "instrumentId": order.get("instrumentId", ""), + } + + +def normalize_trade(trade): + """Normalize a V2 trade-book entry to old field names.""" + trantype_map = {"BUY": "B", "SELL": "S"} + trantype = trantype_map.get(trade.get("transactionType", ""), trade.get("transactionType", "")) + + prctype_map = {"MARKET": "MKT", "LIMIT": "L", "SL": "SL", "SLM": "SL-M"} + prctype = prctype_map.get(trade.get("orderType", ""), trade.get("orderType", "")) + + return { + "Nstordno": trade.get("brokerOrderId", ""), + "Exchange": trade.get("exchange", ""), + "Tsym": trade.get("formattedInstrumentName", trade.get("tradingSymbol", "")), + "Trsym": trade.get("formattedInstrumentName", trade.get("tradingSymbol", "")), + "Trantype": trantype, + "transactionType": trade.get("transactionType", ""), + "Qty": str(trade.get("filledQuantity", 0)), + "AvgPrice": str(trade.get("tradedPrice", 0)), + "Prc": str(trade.get("tradedPrice", 0)), + "Pcode": reverse_map_product_type(trade.get("product", "")), + "Prctype": prctype, + "Filltime": trade.get("fillTimestamp", trade.get("orderTime", "")), + "orderTag": trade.get("orderTag", ""), + "exchangeOrderId": trade.get("exchangeOrderId", ""), + "exchangeTradeId": trade.get("exchangeTradeId", ""), + } + + +def normalize_position(position): + """Normalize a V2 position entry to old field names.""" + return { + "Tsym": position.get("tradingSymbol", position.get("formattedInstrumentName", "")), + "Exchange": position.get("exchange", ""), + "Pcode": reverse_map_product_type(position.get("product", "")), + "Netqty": str(position.get("netQuantity", 0)), + "Buyavgprc": str(position.get("dayBuyPrice", position.get("netAveragePrice", 0))), + "Sellavgprc": str(position.get("daySellPrice", 0)), + "Bqty": str(position.get("buyQuantity", position.get("dayBuyQuantity", 0))), + "Sqty": str(position.get("sellQuantity", position.get("daySellQuantity", 0))), + "LTP": str(position.get("ltp", position.get("previousDayClose", 0))), + "MtoM": str(position.get("mtm", position.get("markToMarket", 0))), + "unrealisedprofitloss": str(position.get("unrealisedPnl", position.get("unrealisedProfitLoss", 0))), + "realisedprofitloss": str(position.get("realizedPnl", position.get("realisedPnl", 0))), + "instrumentId": position.get("instrumentId", ""), + } + + +def normalize_holding(h): + """Normalize a V2 holdings entry to old field names.""" + nse_symbol = h.get("nseTradingSymbol", "") + bse_symbol = h.get("bseTradingSymbol", "") + # Use NSE symbol as primary, fallback to BSE + primary_symbol = nse_symbol or bse_symbol + # Determine exchange based on which symbol is available + exchange = "NSE" if nse_symbol else ("BSE" if bse_symbol else "NSE") + + return { + "Nsetsym": nse_symbol, + "Bsetsym": bse_symbol, + "ExchSeg1": exchange, + "Exchange": exchange, + "Holdqty": str(h.get("dpQuantity", h.get("totalQuantity", 0))), + "HUqty": str(h.get("t1Quantity", 0)), + "Ltp": str(h.get("ltp", 0)), + "Price": str(h.get("averageTradedPrice", h.get("investedPrice", 0))), + "Pcode": "CNC", + "Symbol": primary_symbol, + "instrumentId": h.get("nseInstrumentId", h.get("bseInstrumentId", "")), + "isin": h.get("isin", ""), + "sellableQty": str(h.get("sellableQty", 0)), + } + + def map_order_data(order_data): """ Processes and modifies a list of order dictionaries based on specific conditions. @@ -188,7 +317,14 @@ def map_trade_data(trade_data): # Check if a symbol was found; if so, update the trading_symbol in the current trade if symbol: - trade["Tsym"] = get_oa_symbol(brsymbol=symbol, exchange=exchange) + mapped_symbol = get_oa_symbol(brsymbol=symbol, exchange=exchange) + # Keep original symbol if mapping returns None (e.g., NFO symbols) + if mapped_symbol is not None: + trade["Tsym"] = mapped_symbol + else: + logger.warning( + f"Symbol mapping returned None for {symbol} on {exchange}. Keeping original symbol." + ) else: logger.info( f"{symbol} and exchange {exchange} not found. Keeping original trading symbol." @@ -220,6 +356,27 @@ def transform_tradebook_data(tradebook_data): else: action = trantype + # Parse Filltime which can be: + # - Full datetime: '04-03-2026 15:21:01' (DD-MM-YYYY HH:MM:SS) + # - Time only: '15:21:01' (HH:MM:SS) + filltime_raw = trade.get("Filltime", "") + timestamp = "" + if filltime_raw: + try: + from datetime import datetime as _dt + # Try full datetime format: DD-MM-YYYY HH:MM:SS + parsed = _dt.strptime(filltime_raw, "%d-%m-%Y %H:%M:%S") + timestamp = parsed.strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + try: + # Try time-only format: HH:MM:SS (prepend today's date) + parsed = _dt.strptime(filltime_raw, "%H:%M:%S") + timestamp = f"{date.today()} {filltime_raw}" + except ValueError: + # Fallback: use as-is + logger.warning(f"Could not parse Filltime: {filltime_raw}") + timestamp = filltime_raw + transformed_trade = { "symbol": trade.get("Tsym"), "exchange": trade.get("Exchange", ""), @@ -229,7 +386,7 @@ def transform_tradebook_data(tradebook_data): "average_price": average_price, "trade_value": quantity * average_price, "orderid": trade.get("Nstordno", ""), - "timestamp": f"{date.today()} {trade.get('Filltime', '')}" if trade.get("Filltime") else "", + "timestamp": timestamp, } transformed_data.append(transformed_trade) return transformed_data diff --git a/broker/aliceblue/mapping/transform_data.py b/broker/aliceblue/mapping/transform_data.py index f5c683a39..7bb308501 100644 --- a/broker/aliceblue/mapping/transform_data.py +++ b/broker/aliceblue/mapping/transform_data.py @@ -1,77 +1,106 @@ # Mapping OpenAlgo API Request https://openalgo.in/docs -# Mapping Zerodha Broking Parameters https://kite.trade/docs/connect/v3/ +# Mapping AliceBlue V2 API Parameters from database.token_db import get_br_symbol, get_token -def transform_data(data): - """ - Transforms the new API request structure to the current expected structure. - """ - symbol = get_br_symbol(data["symbol"], data["exchange"]) - token = get_token(data["symbol"], data["exchange"]) +# ─── Product / Order type mappings (OpenAlgo ↔ AliceBlue V2) ────────────────── - # Basic mapping - transformed = { - "complexty": "regular", - "discqty": data.get("disclosed_quantity", "0"), - "exch": data["exchange"], - "pCode": data["product"], - "prctyp": map_order_type(data["pricetype"]), - "price": data.get("price", "0"), - "qty": data["quantity"], - "ret": "DAY", - "symbol_id": token, - "trading_symbol": symbol, - "transtype": data["action"].upper(), - "trigPrice": data.get("trigger_price", "0"), - "orderTag": "openalgo", - } - return transformed +def map_product_type(product): + """Map OpenAlgo product type to AliceBlue V2 product type.""" + mapping = { + "CNC": "LONGTERM", + "NRML": "NRML", + "MIS": "INTRADAY", + } + return mapping.get(product, "INTRADAY") -def transform_modify_order_data(data): - return { - "discqty": int(data.get("disclosed_quantity", 0)), - "exch": data.get("exchange"), - "filledQuantity": 0, - "nestOrderNumber": data.get("orderid"), - "prctyp": map_order_type(data.get("pricetype")), - "price": float(data.get("price")), - "qty": int(data.get("quantity")), - "trading_symbol": get_br_symbol(data.get("symbol"), data.get("exchange")), - "trigPrice": data.get("trigger_price", "0"), - "transtype": data.get("action").upper(), - "pCode": data.get("product"), +def reverse_map_product_type(product): + """Map AliceBlue V2 product type back to OpenAlgo product type.""" + mapping = { + "LONGTERM": "CNC", + "NRML": "NRML", + "INTRADAY": "MIS", + "MTF": "CNC", + "CNC": "CNC", + "MIS": "MIS", + "DELIVERY": "CNC", } + return mapping.get(product, "MIS") def map_order_type(pricetype): - """ - Maps the new pricetype to the existing order type. - """ - order_type_mapping = {"MARKET": "MKT", "LIMIT": "L", "SL": "SL", "SL-M": "SL-M"} - return order_type_mapping.get(pricetype, "MKT") # Default to MARKET if not found + """Map OpenAlgo price type to AliceBlue V2 order type.""" + mapping = { + "MARKET": "MARKET", + "LIMIT": "LIMIT", + "SL": "SL", + "SL-M": "SLM", + } + return mapping.get(pricetype, "MARKET") -def map_product_type(product): +def reverse_map_order_type(order_type): + """Map AliceBlue V2 order type back to OpenAlgo price type.""" + mapping = { + "MARKET": "MARKET", + "LIMIT": "LIMIT", + "SL": "SL", + "SLM": "SL-M", + } + return mapping.get(order_type, "MARKET") + + +# ─── Payload builders (OpenAlgo request → AliceBlue V2 API payload) ────────── + + +def transform_data(data): """ - Maps the new product type to the existing product type. + Transform an OpenAlgo place-order request into an AliceBlue V2 API payload item. """ - product_type_mapping = { - "CNC": "CNC", - "NRML": "NRML", - "MIS": "MIS", + symbol = get_br_symbol(data["symbol"], data["exchange"]) + token = get_token(data["symbol"], data["exchange"]) + + return { + "exchange": data["exchange"], + "instrumentId": str(int(float(token))), + "transactionType": data["action"].upper(), + "quantity": int(data["quantity"]), + "product": map_product_type(data.get("product", "MIS")), + "orderComplexity": "REGULAR", + "orderType": map_order_type(data.get("pricetype", "MARKET")), + "validity": "DAY", + "price": str(data.get("price", "0")), + "slLegPrice": "", + "targetLegPrice": "", + "slTriggerPrice": str(data.get("trigger_price", "0")), + "disclosedQuantity": str(data.get("disclosed_quantity", "")), + "marketProtectionPercent": "", + "deviceId": "", + "trailingSlAmount": "", + "apiOrderSource": "", + "algoId": "", + "orderTag": "openalgo", } - return product_type_mapping.get(product, "MIS") # Default to INTRADAY if not found -def reverse_map_product_type(product): +def transform_modify_order_data(data): """ - Reverse maps the broker product type to the OpenAlgo product type, considering the exchange. + Transform an OpenAlgo modify-order request into an AliceBlue V2 API modify payload. """ - # Exchange to OpenAlgo product type mapping for 'D' - exchange_mapping = {"MKT": "MARKET", "L": "LIMIT", "SL": "SL", "SL-M": "SL-M"} - - return exchange_mapping.get(product) + return { + "brokerOrderId": str(data.get("orderid")), + "quantity": int(data.get("quantity", 0)), + "orderType": map_order_type(data.get("pricetype", "LIMIT")), + "slTriggerPrice": str(data.get("trigger_price", "0")), + "price": str(data.get("price", "0")), + "slLegPrice": "", + "trailingSlAmount": "", + "targetLegPrice": "", + "validity": "DAY", + "disclosedQuantity": str(data.get("disclosed_quantity", "0")), + "marketProtection": "", + "deviceId": "", + }