diff --git a/blueprints/analyzer.py b/blueprints/analyzer.py index 23d8ade45..89f53f1a8 100644 --- a/blueprints/analyzer.py +++ b/blueprints/analyzer.py @@ -1,7 +1,7 @@ import csv import io import json -import traceback + from datetime import datetime, timedelta import pytz @@ -128,7 +128,7 @@ def get_filtered_requests(start_date=None, end_date=None): return requests except Exception as e: - logger.exception(f"Error getting filtered requests: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error getting filtered requests: {e}") return [] @@ -173,7 +173,7 @@ def generate_csv(requests): return output.getvalue() except Exception as e: - logger.exception(f"Error generating CSV: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error generating CSV: {str(e)}") return "" @@ -216,7 +216,7 @@ def analyzer(): end_date=end_date, ) except Exception as e: - logger.exception(f"Error rendering analyzer: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error rendering analyzer: {str(e)}") flash("Error loading analyzer dashboard", "error") return redirect(url_for("core_bp.home")) @@ -268,7 +268,7 @@ def api_get_data(): {"status": "success", "data": {"stats": stats_transformed, "requests": requests_data}} ) except Exception as e: - logger.exception(f"Error getting analyzer data: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error getting analyzer data: {str(e)}") return jsonify( {"status": "error", "message": f"Error loading analyzer data: {str(e)}"} ), 500 @@ -352,6 +352,6 @@ def export_requests(): ) return output except Exception as e: - logger.exception(f"Error exporting requests: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error exporting requests: {str(e)}") flash("Error exporting requests", "error") return redirect(url_for("analyzer_bp.analyzer")) diff --git a/blueprints/log.py b/blueprints/log.py index baf38cf3b..c14f9437c 100644 --- a/blueprints/log.py +++ b/blueprints/log.py @@ -3,7 +3,7 @@ import csv import io import json -import traceback + from datetime import datetime import pytz @@ -66,7 +66,7 @@ def format_log_entry(log, ist): "created_at": log.created_at.astimezone(ist).strftime("%Y-%m-%d %I:%M:%S %p"), } except Exception as e: - logger.exception(f"Error formatting log {log.id}: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error formatting log {log.id}: {str(e)}") return { "id": log.id, "api_type": log.api_type, @@ -130,7 +130,7 @@ def get_filtered_logs(start_date=None, end_date=None, search_query=None, page=No return logs, total_pages, total_logs except Exception as e: - logger.exception(f"Error in get_filtered_logs: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error in get_filtered_logs: {str(e)}") return [], 1, 0 @@ -203,7 +203,7 @@ def generate_csv(logs): return si.getvalue() except Exception as e: - logger.exception(f"Error generating CSV: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error generating CSV: {str(e)}") raise @@ -243,7 +243,7 @@ def view_logs(): ) except Exception as e: - logger.exception(f"Error in view_logs: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error in view_logs: {str(e)}") return render_template( "logs.html", logs=[], @@ -300,6 +300,5 @@ def export_logs(): ) except Exception as e: - error_msg = f"Error exporting logs: {str(e)}\n{traceback.format_exc()}" - logger.exception(error_msg) - return jsonify({"error": error_msg}), 500 + logger.exception(f"Error exporting logs: {e}") + return jsonify({"error": "An error occurred while exporting logs"}), 500 diff --git a/broker/deltaexchange/api/order_api.py b/broker/deltaexchange/api/order_api.py index b5739c786..79ca0bf0c 100644 --- a/broker/deltaexchange/api/order_api.py +++ b/broker/deltaexchange/api/order_api.py @@ -228,20 +228,61 @@ def get_trade_book(auth): # --------------------------------------------------------------------------- def get_positions(auth): - """Fetch all open margined positions.""" + """ + Fetch all open positions — both derivatives (margined) and spot (wallet). + + Derivatives come from GET /v2/positions/margined. + Spot holdings come from GET /v2/wallet/balances — non-INR assets with + a non-zero balance are synthesised into position-like dicts so they + appear in the OpenAlgo position book alongside derivative positions. + """ + positions = [] + + # 1. Derivative positions (perpetual futures, options) try: result = get_api_response("/v2/positions/margined", auth, method="GET") if result.get("success"): - return result.get("result", []) - logger.warning(f"[DeltaExchange] get_positions unexpected response: {result}") - return [] + positions.extend(result.get("result", [])) + else: + logger.warning(f"[DeltaExchange] get_positions/margined unexpected: {result}") except Exception as e: - logger.error(f"[DeltaExchange] Exception in get_positions: {e}") - return [] + logger.error(f"[DeltaExchange] Exception in get_positions/margined: {e}") + + # 2. Spot holdings from wallet balances + try: + wallet_result = get_api_response("/v2/wallet/balances", auth, method="GET") + if wallet_result.get("success"): + for asset in wallet_result.get("result", []): + if not isinstance(asset, dict): + continue + symbol = asset.get("asset_symbol", "") or asset.get("symbol", "") + # Skip INR (settlement currency) and zero-balance assets + if symbol in ("INR", "USD", "") or not symbol: + continue + balance = float(asset.get("balance", 0) or 0) + blocked = float(asset.get("blocked_margin", 0) or 0) + size = balance - blocked # available spot holding + if size <= 0: + continue + # Synthesise a position-like dict matching /v2/positions/margined structure + spot_symbol = f"{symbol}_INR" + positions.append({ + "product_id": asset.get("asset_id", ""), + "product_symbol": spot_symbol, + "size": size, + "entry_price": "0", # Wallet doesn't track entry price + "realized_pnl": "0", + "unrealized_pnl": "0", + "_is_spot": True, # Internal flag for downstream mapping + }) + except Exception as e: + logger.error(f"[DeltaExchange] Exception fetching spot wallet positions: {e}") + + return positions def get_holdings(auth): - """Delta Exchange is a derivatives-only exchange; equity holdings are not applicable.""" + """Delta Exchange has no equity holdings concept; spot is shown in positions.""" return [] @@ -406,22 +447,22 @@ def place_smartorder_api(data, auth): symbol = data.get("symbol") exchange = data.get("exchange") product = data.get("product") - position_size = int(data.get("position_size", "0")) + position_size = float(data.get("position_size", "0")) - current_position = int( + current_position = float( get_open_position(symbol, exchange, map_product_type(product), auth) ) logger.info( f"[DeltaExchange] SmartOrder: target={position_size} current={current_position}" ) - if position_size == 0 and current_position == 0 and int(data["quantity"]) != 0: + if position_size == 0 and current_position == 0 and float(data["quantity"]) != 0: return place_order_api(data, auth) if position_size == current_position: msg = ( "No OpenPosition Found. Not placing Exit order." - if int(data["quantity"]) == 0 + if float(data["quantity"]) == 0 else "No action needed. Position size matches current position" ) return res, {"status": "success", "message": msg}, None diff --git a/broker/deltaexchange/database/master_contract_db.py b/broker/deltaexchange/database/master_contract_db.py index d8a689e19..03368ffe1 100644 --- a/broker/deltaexchange/database/master_contract_db.py +++ b/broker/deltaexchange/database/master_contract_db.py @@ -253,6 +253,8 @@ def _to_canonical_symbol(delta_symbol: str, instrument_type: str, expiry: str) - "futures": "FUT", "call_options": "CE", "put_options": "PE", + "spot": "SPOT", + "move_options": "MOVE", "interest_rate_swaps": "IRS", "spreads": "SPREAD", "options_combos": "COMBO", @@ -386,7 +388,7 @@ def process_delta_products(products): expiry ← settlement_time (None → "" for perpetuals; ISO string → "DD-MON-YY" for futures/options) strike ← 0.0 (strike is encoded in the symbol for options) - lotsize ← 1 (1 contract; contract_value gives underlying units) + lotsize ← product_specs.min_order_size or 1 (fractional for spot, e.g. 0.0001 BTC) instrumenttype ← contract_type (mapped via CONTRACT_TYPE_MAP) tick_size ← tick_size (string → float) """ @@ -437,16 +439,30 @@ def process_delta_products(products): except (ValueError, TypeError): tick_size = 0.0 - # Extract strike for option contracts from symbol (e.g. C-BTC-80000-280225 -> 80000.0) + # Extract strike price — use the API field directly when available, + # fall back to parsing from the symbol (e.g. C-BTC-80000-280225 -> 80000.0) symbol_str = p.get("symbol", "") strike_val = 0.0 if instrument_type in ("CE", "PE", "TCE", "TPE", "SYNCE", "SYNPE"): - parts_s = symbol_str.split("-") - if len(parts_s) >= 3: - try: - strike_val = float(parts_s[2]) - except (ValueError, TypeError): - strike_val = 0.0 + try: + strike_val = float(p.get("strike_price") or 0) + except (ValueError, TypeError): + strike_val = 0.0 + # Fallback: parse from symbol if API field missing + if strike_val == 0.0: + parts_s = symbol_str.split("-") + if len(parts_s) >= 3: + try: + strike_val = float(parts_s[2]) + except (ValueError, TypeError): + strike_val = 0.0 + + # Lot size: use min_order_size from product_specs (important for spot + # instruments where fractional quantities are allowed, e.g. 0.0001 BTC) + try: + lotsize = float(product_specs.get("min_order_size") or 1) + except (ValueError, TypeError): + lotsize = 1.0 # Build OpenAlgo canonical symbol (exchange = CRYPTO, broker-agnostic format) canonical_symbol = _to_canonical_symbol(symbol_str, instrument_type, expiry) @@ -461,7 +477,7 @@ def process_delta_products(products): "brexchange": "DELTAIN", # Broker identifier (Delta Exchange India) "expiry": expiry, "strike": strike_val, - "lotsize": 1, + "lotsize": lotsize, "instrumenttype": instrument_type, "tick_size": tick_size, "contract_value": float(p.get("contract_value") or 1.0), diff --git a/broker/deltaexchange/mapping/margin_data.py b/broker/deltaexchange/mapping/margin_data.py index ae1cd3dd2..cc406cb56 100644 --- a/broker/deltaexchange/mapping/margin_data.py +++ b/broker/deltaexchange/mapping/margin_data.py @@ -2,6 +2,7 @@ # Mapping OpenAlgo margin positions to Delta Exchange margin_required API format # Delta Exchange endpoint: GET /v2/products/{product_id}/margin_required +from broker.deltaexchange.mapping.transform_data import _order_size from database.token_db import get_token from utils.logging import get_logger @@ -15,7 +16,7 @@ def transform_margin_positions(positions): Each OpenAlgo position is converted to a dict with the fields needed to call GET /v2/products/{product_id}/margin_required: product_id (int) – from token DB (token = product_id on Delta) - size (int) – absolute quantity + size (int|float) – contracts (int) or spot quantity (float) side (str) – "buy" or "sell" order_type (str) – "limit_order" or "market_order" limit_price (str) – required if order_type == "limit_order" @@ -54,7 +55,7 @@ def transform_margin_positions(positions): entry = { "product_id": product_id, - "size": int(pos["quantity"]), + "size": _order_size(pos["quantity"], symbol, exchange), "side": side, "order_type": order_type, } diff --git a/broker/deltaexchange/mapping/order_data.py b/broker/deltaexchange/mapping/order_data.py index ba2cfc202..1041a06e3 100644 --- a/broker/deltaexchange/mapping/order_data.py +++ b/broker/deltaexchange/mapping/order_data.py @@ -1,6 +1,6 @@ import json -from database.token_db import get_symbol, get_symbol_info +from database.token_db import get_oa_symbol, get_symbol, get_symbol_info from utils.logging import get_logger logger = get_logger(__name__) @@ -427,16 +427,26 @@ def map_position_data(position_data): product_id = position.get("product_id", "") product_symbol = position.get("product_symbol", "") + is_spot = position.get("_is_spot", False) - # Resolve symbol from DB; fall back to product_symbol - symbol_from_db = get_symbol(str(product_id), "CRYPTO") if product_id else None + # Resolve symbol from DB; fall back to product_symbol. + # For spot wallet entries, product_id is the asset_id (not a product token), + # so look up by brsymbol (e.g. BTC_INR) instead. + if is_spot: + symbol_from_db = get_oa_symbol(product_symbol, "CRYPTO") + else: + symbol_from_db = get_symbol(str(product_id), "CRYPTO") if product_id else None position["tradingSymbol"] = symbol_from_db or product_symbol position["exchangeSegment"] = "CRYPTO" - position["productType"] = "NRML" + position["productType"] = "CNC" if is_spot else "NRML" # Net quantity: positive = long, negative = short - net_qty = int(position.get("size", 0)) + # Use float() to support fractional spot sizes (e.g. 0.0001 BTC) + try: + net_qty = float(position.get("size", 0)) + except (ValueError, TypeError): + net_qty = 0 position["netQty"] = net_qty # Average entry price diff --git a/broker/deltaexchange/mapping/transform_data.py b/broker/deltaexchange/mapping/transform_data.py index 3abbc4b80..31ba63fe4 100644 --- a/broker/deltaexchange/mapping/transform_data.py +++ b/broker/deltaexchange/mapping/transform_data.py @@ -1,12 +1,32 @@ # Mapping OpenAlgo API Request to Delta Exchange API Parameters # Delta Exchange API docs: https://docs.delta.exchange -from database.token_db import get_br_symbol, get_token +from database.token_db import get_br_symbol, get_symbol_info, get_token from utils.logging import get_logger logger = get_logger(__name__) +def _order_size(quantity, symbol, exchange): + """ + Convert quantity to the correct type for the Delta Exchange size parameter. + - Spot instruments: fractional float (e.g. 0.05 SOL) + - Derivatives (futures/options/perps): integer number of contracts + + Raises ValueError if a fractional quantity is passed for a non-spot instrument. + """ + qty = float(quantity) + info = get_symbol_info(symbol, exchange) + if info and info.instrumenttype == "SPOT": + return qty + if qty != int(qty): + raise ValueError( + f"Fractional quantity ({qty}) not allowed for derivative contracts. " + f"Use whole numbers for {symbol}." + ) + return int(qty) + + def transform_data(data, token): """ Transforms the OpenAlgo API request structure to Delta Exchange POST /v2/orders payload. @@ -38,10 +58,12 @@ def transform_data(data, token): order_type = map_order_type(data["pricetype"]) side = data["action"].lower() # "buy" or "sell" + size = _order_size(data["quantity"], data["symbol"], data["exchange"]) + transformed = { "product_id": int(token), "product_symbol": symbol, - "size": int(data["quantity"]), + "size": size, "side": side, "order_type": order_type, "time_in_force": "gtc", @@ -134,10 +156,12 @@ def transform_modify_order_data(data): else: limit_price = str(data.get("price", "0")) + size = _order_size(data["quantity"], data["symbol"], data["exchange"]) + transformed = { "id": order_id, "product_id": product_id, - "size": int(data["quantity"]), + "size": size, "limit_price": limit_price, } diff --git a/broker/groww/database/master_contract_db.py b/broker/groww/database/master_contract_db.py index 78257fb72..8b3a1084f 100644 --- a/broker/groww/database/master_contract_db.py +++ b/broker/groww/database/master_contract_db.py @@ -66,32 +66,19 @@ def copy_from_dataframe(df): # Filter out data_dict entries with tokens that already exist filtered_data_dict = [row for row in data_dict if row["token"] not in existing_tokens] - # Insert in bulk the filtered records + # Insert in batches to avoid memory spikes and SQLite locking + BATCH_SIZE = 10000 try: - if filtered_data_dict: # Proceed only if there's anything to insert - logger.info(f"Inserting {len(filtered_data_dict)} new records into the database") - # Create a list of SymToken objects from the filtered data - symtoken_objects = [] - for item in filtered_data_dict: - symtoken = SymToken( - symbol=item.get("symbol", ""), - brsymbol=item.get("brsymbol", ""), - name=item.get("name", ""), - exchange=item.get("exchange", ""), - brexchange=item.get("brexchange", ""), - token=item.get("token", ""), - expiry=item.get("expiry", ""), - strike=float(item.get("strike", 0)) if item.get("strike") else 0, - lotsize=int(item.get("lotsize", 0)) if item.get("lotsize") else 0, - instrumenttype=item.get("instrumenttype", ""), - tick_size=float(item.get("tick_size", 0)) if item.get("tick_size") else 0, - ) - symtoken_objects.append(symtoken) - - # Add all objects and commit in one transaction - db_session.add_all(symtoken_objects) + if filtered_data_dict: + total = len(filtered_data_dict) + logger.info(f"Inserting {total} new records into the database in batches of {BATCH_SIZE}") + for i in range(0, total, BATCH_SIZE): + batch = filtered_data_dict[i : i + BATCH_SIZE] + db_session.bulk_insert_mappings(SymToken, batch) + db_session.flush() + logger.info(f"Inserted batch {i // BATCH_SIZE + 1} ({min(i + BATCH_SIZE, total)}/{total} records)") db_session.commit() - logger.info(f"Successfully inserted {len(symtoken_objects)} records into the database") + logger.info(f"Bulk insert completed successfully with {total} new records.") else: logger.info("No new records to insert") except Exception as e: @@ -397,46 +384,6 @@ def find_token_by_symbol(symbol, exchange): return None - # Insert in bulk the filtered records - try: - if filtered_data_dict: # Proceed only if there's anything to insert - # Pre-validate records before insertion - invalid_records = [] - valid_records = [] - - for record in filtered_data_dict: - # Allow indices ("I") even if symbol is missing - if record.get("instrumenttype") == "I": - valid_records.append(record) - else: - # Check if symbol exists and is not empty/null - symbol = record.get("symbol") - if not symbol or pd.isna(symbol) or str(symbol).strip() == "": - invalid_records.append(record) - logger.error(f"Schema validation failed for record: {record}") - logger.info("Symbol is missing, empty, or null") - else: - valid_records.append(record) - - if valid_records: - db_session.bulk_insert_mappings(SymToken, valid_records) - db_session.commit() - logger.info( - f"Bulk insert completed successfully with {len(valid_records)} new records." - ) - - if invalid_records: - logger.error( - f"Warning: {len(invalid_records)} records failed schema validation and were skipped." - ) - else: - logger.info("No new records to insert.") - except Exception as e: - logger.error(f"Error during bulk insert: {e}") - if hasattr(e, "__cause__"): - logger.info(f"Caused by: {e.__cause__}") - db_session.rollback() - def download_groww_instrument_data(output_path): """ @@ -466,22 +413,24 @@ def download_groww_instrument_data(output_path): response.raise_for_status() content = response.text - if "," in content and len(content.splitlines()) > 1: - # Read CSV using pandas - df = pd.read_csv(StringIO(content)) - - # Replace headers if column count matches - if len(df.columns) == len(expected_headers): - df.columns = expected_headers - else: - raise ValueError("Downloaded CSV column count does not match expected headers.") + lines = content.split("\n", 1) + if len(lines) < 2 or "," not in content: + raise ValueError("Downloaded content does not appear to be a valid CSV.") - # Save with new headers - df.to_csv(file_path, index=False) - logger.info(f"Successfully saved instruments CSV to: {file_path}") - return [file_path] + # Verify column count matches and replace header line directly (no pandas parse needed) + original_headers = lines[0].strip().split(",") + if len(original_headers) == len(expected_headers): + new_content = ",".join(expected_headers) + "\n" + lines[1] else: - raise ValueError("Downloaded content does not appear to be a valid CSV.") + raise ValueError( + f"Downloaded CSV column count ({len(original_headers)}) does not match expected ({len(expected_headers)})." + ) + + # Write directly to file + with open(file_path, "w", encoding="utf-8") as f: + f.write(new_content) + logger.info(f"Successfully saved instruments CSV to: {file_path}") + return [file_path] except Exception as e: logger.error(f"Failed to download or process Groww instrument data: {e}") raise @@ -660,17 +609,12 @@ def process_groww_data(path): df_mapped.drop("temp_strike", axis=1, inplace=True) df_mapped["tick_size"] = pd.to_numeric(df_mapped["tick_size"], errors="coerce").fillna(0.05) - # Convert expiry from yyyy-mm-dd to DD-MMM-YY format (to match Zerodha/other brokers) - def convert_expiry_format(expiry_val): - if pd.isna(expiry_val) or expiry_val == "": - return "" - try: - expiry_date = pd.to_datetime(expiry_val) - return expiry_date.strftime("%d-%b-%y").upper() - except Exception: - return expiry_val - - df_mapped["expiry"] = df_mapped["expiry"].apply(convert_expiry_format) + # Convert expiry from yyyy-mm-dd to DD-MMM-YY format (vectorized) + expiry_parsed = pd.to_datetime(df_mapped["expiry"], errors="coerce") + valid_expiry = expiry_parsed.notna() + if valid_expiry.any(): + df_mapped.loc[valid_expiry, "expiry"] = expiry_parsed[valid_expiry].dt.strftime("%d-%b-%y").str.upper() + df_mapped["expiry"] = df_mapped["expiry"].fillna("") # Map instrument types directly from Groww's data # We want CE, PE, FUT values to be preserved as is @@ -741,59 +685,50 @@ def convert_expiry_format(expiry_val): index_mask = (df["instrument_type"] == "IDX") | (df["segment"] == "IDX") df_mapped.loc[index_mask, "instrumenttype"] = "INDEX" - # Format the symbol for F&O (NFO) instruments to match OpenAlgo format - def format_fo_symbol(row): - # Skip non-FNO instruments or those with missing expiry - if row["brexchange"] != "NSE" or pd.isna(row["expiry"]) or row["expiry"] == "": - return row["symbol"] + # Format F&O symbols using vectorized operations (much faster than apply) + # Identify FNO rows with valid expiry + fno_data_mask = ( + (df_mapped["brexchange"] == "NSE") + & (df["segment"] == "FNO") + & df_mapped["expiry"].notna() + & (df_mapped["expiry"] != "") + ) + + if fno_data_mask.any(): + # Parse expiry dates for FNO rows and format as DDMMMYY + fno_expiry = pd.to_datetime(df_mapped.loc[fno_data_mask, "expiry"], format="%d-%b-%y", errors="coerce") + expiry_str = fno_expiry.dt.strftime("%d%b%y").str.upper() - # For segment='FNO', format according to OpenAlgo standard - if "segment" in df.columns and df.loc[row.name, "segment"] == "FNO": - try: - # Format expiry date (assuming yyyy-mm-dd format in input) - expiry_date = pd.to_datetime(row["expiry"]) - expiry_str = expiry_date.strftime("%d%b%y").upper() - - # Get underlying symbol - symbol = ( - row["underlying"] - if "underlying" in row and not pd.isna(row["underlying"]) - else row["symbol"].split("-")[0] - if "-" in row["symbol"] - else row["symbol"] - ) + # Use underlying symbol where available, else trading_symbol + underlying = df_mapped.loc[fno_data_mask, "underlying"] + base_symbol = underlying.where(underlying.notna() & (underlying != ""), df_mapped.loc[fno_data_mask, "symbol"]) - # For futures - if row["instrumenttype"] == "FUT": - return f"{symbol}{expiry_str}FUT" - - # For options - elif row["instrumenttype"] == "PE" or "CE": - # Determine strike price - strike = str(int(row["strike"])) if not pd.isna(row["strike"]) else "0" - - # Determine option type (CE/PE) - option_type = "" - if "instrument_type" in df.columns: - instrument_type = df.loc[row.name, "instrument_type"] - option_type = ( - "CE" - if instrument_type == "CE" - else "PE" - if instrument_type == "PE" - else "" - ) + # Strike as integer string + strike_str = df_mapped.loc[fno_data_mask, "strike"].fillna(0).astype(int).astype(str) + + # Get instrument type from original df + orig_inst_type = df.loc[fno_data_mask.values, "instrument_type"] - if option_type: - return f"{symbol}{expiry_str}{strike}{option_type}" - except Exception as e: - logger.error(f"Error formatting F&O symbol: {e}") + # Build symbols for futures + fut_mask = fno_data_mask & (df_mapped["instrumenttype"] == "FUT") + if fut_mask.any(): + df_mapped.loc[fut_mask, "symbol"] = ( + base_symbol[fut_mask] + expiry_str[fut_mask] + "FUT" + ) - # Return original symbol if formatting fails - return row["symbol"] + # Build symbols for CE options + ce_mask = fno_data_mask & (orig_inst_type.reindex(df_mapped.index, fill_value="") == "CE") + if ce_mask.any(): + df_mapped.loc[ce_mask, "symbol"] = ( + base_symbol[ce_mask] + expiry_str[ce_mask] + strike_str[ce_mask] + "CE" + ) - # Apply F&O symbol formatting - df_mapped["symbol"] = df_mapped.apply(format_fo_symbol, axis=1) + # Build symbols for PE options + pe_mask = fno_data_mask & (orig_inst_type.reindex(df_mapped.index, fill_value="") == "PE") + if pe_mask.any(): + df_mapped.loc[pe_mask, "symbol"] = ( + base_symbol[pe_mask] + expiry_str[pe_mask] + strike_str[pe_mask] + "PE" + ) logger.info(f"Processed {len(df_mapped)} instruments") return df_mapped @@ -802,115 +737,6 @@ def format_fo_symbol(row): logger.error(f"Error processing Groww instrument data: {e}") return pd.DataFrame() - # Map instrument types to OpenAlgo standard types - instrument_type_map = { - "EQUITY": "EQ", - "INDEX": "INDEX", - "FUTURE": "FUT", - "CALL": "OPT", - "PUT": "OPT", - "ETF": "EQ", - "CURRENCY": "CUR", - "COMMODITY": "COM", - } - - # Apply instrument type mapping - all_instruments["instrumenttype"] = ( - all_instruments["instrument_type"].map(instrument_type_map).fillna("EQ") - ) - - # Map exchanges to OpenAlgo standard exchanges - exchange_map = {"NSE": "NSE", "BSE": "BSE", "NFO": "NFO", "MCX": "MCX", "CDS": "CDS"} - - # Apply exchange mapping - all_instruments["exchange"] = ( - all_instruments["brexchange"].map(exchange_map).fillna(all_instruments["brexchange"]) - ) - - # Special handling for indices - # Mark indices based on name patterns or specific flags in the data - index_patterns = ["NIFTY", "SENSEX", "BANKNIFTY", "FINNIFTY", "MIDCPNIFTY"] - - for pattern in index_patterns: - index_mask = all_instruments["symbol"].str.contains(pattern, case=False, na=False) - all_instruments.loc[index_mask, "instrumenttype"] = "INDEX" - all_instruments.loc[index_mask, "exchange"] = "NSE_INDEX" - - # Format specific fields - all_instruments["expiry"] = all_instruments["expiry"].fillna("") - all_instruments["strike"] = pd.to_numeric(all_instruments["strike"].fillna(0), errors="coerce") - all_instruments["lotsize"] = pd.to_numeric( - all_instruments["lotsize"].fillna(1), errors="coerce" - ).astype(int) - all_instruments["tick_size"] = pd.to_numeric( - all_instruments["tick_size"].fillna(0.05), errors="coerce" - ) - - # Ensure brsymbol is not empty - use symbol if needed - all_instruments.loc[ - all_instruments["brsymbol"].isna() | (all_instruments["brsymbol"] == ""), "brsymbol" - ] = all_instruments.loc[ - all_instruments["brsymbol"].isna() | (all_instruments["brsymbol"] == ""), "symbol" - ] - - # For F&O instruments, format the symbol in OpenAlgo format - fo_mask = all_instruments["exchange"] == "NFO" - if fo_mask.any(): - # Format F&O symbols according to OpenAlgo standard - def format_fo_symbol(row): - if pd.isna(row["expiry"]) or row["expiry"] == "": - return row["symbol"] - - # Format expiry date to standard format (e.g., 25MAY23) - try: - from datetime import datetime - - expiry_date = pd.to_datetime(row["expiry"]) - expiry_str = expiry_date.strftime("%d%b%y").upper() - except: - expiry_str = row["expiry"] - - # For futures - if row["instrumenttype"] == "FUT": - return f"{row['symbol']}{expiry_str}FUT" - - # For options - elif row["instrumenttype"] == "OPT": - strike = str(int(row["strike"])) if not pd.isna(row["strike"]) else "0" - option_type = ( - "CE" if "option_type" in row and row["option_type"].upper() == "CE" else "PE" - ) - return f"{row['symbol']}{expiry_str}{strike}{option_type}" - - return row["symbol"] - - all_instruments.loc[fo_mask, "symbol"] = all_instruments[fo_mask].apply( - format_fo_symbol, axis=1 - ) - - # Create final DataFrame with required columns - token_df = pd.DataFrame( - { - "symbol": all_instruments["symbol"], - "brsymbol": all_instruments["brsymbol"], - "name": all_instruments["name"], - "exchange": all_instruments["exchange"], - "brexchange": all_instruments["brexchange"], - "token": all_instruments["token"], - "expiry": all_instruments["expiry"], - "strike": all_instruments["strike"], - "lotsize": all_instruments["lotsize"], - "instrumenttype": all_instruments["instrumenttype"], - "tick_size": all_instruments["tick_size"], - } - ) - - # Remove duplicates - token_df = token_df.drop_duplicates(subset=["symbol", "exchange"], keep="first") - - logger.info(f"Processed {len(token_df)} Groww instruments") - return token_df - def delete_groww_temp_data(output_path): """Delete only Groww-specific temporary files created during instrument data download""" @@ -982,16 +808,15 @@ def master_contract_download(): ) token_df["tick_size"] = pd.to_numeric(token_df["tick_size"], errors="coerce").fillna(0.05) - # Step 5: Add OpenAlgo symbols where needed (converting Groww format to OpenAlgo format) - # Identify rows that need conversion (NFO options and futures) - nfo_options = token_df[ - (token_df["exchange"] == "NFO") & (token_df["instrumenttype"].isin(["CE", "PE"])) - ] - - for idx, row in nfo_options.iterrows(): - # Convert the broker symbol to OpenAlgo format if spaces are detected - if " " in row["brsymbol"]: - token_df.at[idx, "symbol"] = format_groww_to_openalgo_symbol(row["brsymbol"], "NFO") + # Step 5: Add OpenAlgo symbols where needed (vectorized - remove spaces from brsymbol) + # For NFO options with spaces in brsymbol, the OpenAlgo format is just the symbol without spaces + nfo_space_mask = ( + (token_df["exchange"] == "NFO") + & (token_df["instrumenttype"].isin(["CE", "PE"])) + & (token_df["brsymbol"].str.contains(" ", na=False)) + ) + if nfo_space_mask.any(): + token_df.loc[nfo_space_mask, "symbol"] = token_df.loc[nfo_space_mask, "brsymbol"].str.replace(" ", "", regex=False) # Step 6: Insert into database logger.info(f"Inserting {len(token_df)} records into database") diff --git a/broker/groww/streaming/groww_adapter.py b/broker/groww/streaming/groww_adapter.py index fe25822e3..d1441b8ea 100644 --- a/broker/groww/streaming/groww_adapter.py +++ b/broker/groww/streaming/groww_adapter.py @@ -113,7 +113,7 @@ def unsubscribe_all(self) -> dict[str, Any]: failed_list = [] self.logger.info( - f"🧹 Unsubscribing from {len(self.subscriptions)} active subscriptions..." + f"Unsubscribing from {len(self.subscriptions)} active subscriptions..." ) # Create a copy of subscriptions to iterate over @@ -133,7 +133,7 @@ def unsubscribe_all(self) -> dict[str, Any]: unsubscribed_list.append( {"symbol": symbol, "exchange": exchange, "mode": mode} ) - self.logger.debug(f"✅ Unsubscribed: {exchange}:{symbol} mode {mode}") + self.logger.debug(f"Unsubscribed: {exchange}:{symbol} mode {mode}") else: failed_count += 1 failed_list.append( @@ -145,7 +145,7 @@ def unsubscribe_all(self) -> dict[str, Any]: } ) self.logger.warning( - f"❌ Failed to unsubscribe: {exchange}:{symbol} mode {mode}" + f"Failed to unsubscribe: {exchange}:{symbol} mode {mode}" ) except Exception as e: @@ -157,13 +157,12 @@ def unsubscribe_all(self) -> dict[str, Any]: self.subscriptions.clear() self.subscription_keys.clear() - # CRITICAL: Call the disconnect method to properly close everything - self.logger.info("🔌 Calling disconnect() to terminate Groww connection completely...") + self.logger.info("Calling disconnect() to terminate Groww connection...") try: self.disconnect() - self.logger.info("✅ Successfully disconnected from Groww server") + self.logger.info("Successfully disconnected from Groww server") except Exception as e: - self.logger.error(f"❌ Error during disconnect: {e}") + self.logger.error(f"Error during disconnect: {e}") # Force cleanup even if disconnect fails self.running = False self.connected = False @@ -180,10 +179,8 @@ def unsubscribe_all(self) -> dict[str, Any]: self._message_count = 0 self.logger.info( - f"📊 Unsubscribe all complete: {unsubscribed_count} success, {failed_count} failed" + f"Unsubscribe all complete: {unsubscribed_count} success, {failed_count} failed" ) - self.logger.info("✅ All subscriptions cleared and disconnected from Groww server") - self.logger.info("✅ ZMQ resources cleaned up - no more data will be published") return self._create_success_response( f"Unsubscribed from {unsubscribed_count} subscriptions and disconnected from server", @@ -203,7 +200,7 @@ def unsubscribe_all(self) -> dict[str, Any]: def disconnect(self) -> None: """Disconnect from Groww WebSocket with proper cleanup""" - self.logger.info("🔌 Starting Groww adapter disconnect sequence...") + self.logger.info("Starting Groww adapter disconnect sequence...") self.running = False try: @@ -211,7 +208,7 @@ def disconnect(self) -> None: if self.ws_client: try: self.ws_client.disconnect() - self.logger.info("🔗 WebSocket client disconnected") + self.logger.debug("WebSocket client disconnected") except Exception as e: self.logger.error(f"Error disconnecting WebSocket client: {e}") @@ -224,10 +221,10 @@ def disconnect(self) -> None: # Clean up ZeroMQ resources self.cleanup_zmq() - self.logger.info("✅ Groww adapter disconnected and state cleared") + self.logger.info("Groww adapter disconnected and state cleared") except Exception as e: - self.logger.error(f"❌ Error during disconnect: {e}") + self.logger.error(f"Error during disconnect: {e}") # Force cleanup even if there were errors self.connected = False self.ws_client = None @@ -279,7 +276,7 @@ def subscribe( sym = SymToken.query.filter_by(symbol=symbol, exchange=exchange).first() if sym: instrumenttype = sym.instrumenttype - self.logger.info( + self.logger.debug( f"Retrieved instrumenttype: {instrumenttype} for {symbol}.{exchange}" ) except Exception as e: @@ -303,14 +300,8 @@ def subscribe( # Get exchange and segment for Groww groww_exchange, segment = GrowwExchangeMapper.get_exchange_segment(exchange) - # Log token details for debugging F&O if exchange in ["NFO", "BFO"]: - self.logger.info("F&O Subscription Debug:") - self.logger.info(f" Symbol: {symbol}") - self.logger.info(f" Exchange: {exchange} -> Groww: {groww_exchange}") - self.logger.info(f" Segment: {segment}") - self.logger.info(f" Token from DB: {token}") - self.logger.info(f" Brexchange: {brexchange}") + self.logger.debug(f"F&O Subscription: {symbol}, exchange={exchange}->{groww_exchange}, segment={segment}, token={token}") # Generate unique correlation ID correlation_id = f"{symbol}_{exchange}_{mode}" @@ -333,8 +324,8 @@ def subscribe( try: if mode in [1, 2]: # LTP or Quote mode if mode == 2: - self.logger.info( - f"📈 QUOTE subscription for {symbol} - Note: Groww only provides LTP, OHLCV will be 0" + self.logger.debug( + f"QUOTE subscription for {symbol} - Groww only provides LTP, OHLCV will be 0" ) sub_key = self.ws_client.subscribe_ltp( groww_exchange, segment, token, symbol, instrumenttype @@ -342,8 +333,8 @@ def subscribe( elif mode == 3: # Depth mode # Check if this is an index - indices don't have depth data if instrumenttype == "INDEX" or "INDEX" in exchange: - self.logger.warning( - f"⚠️ Indices don't have depth data. Converting to LTP subscription for {symbol}" + self.logger.info( + f"Indices don't have depth data. Converting to LTP for {symbol}" ) # Subscribe to LTP instead for indices sub_key = self.ws_client.subscribe_ltp( @@ -353,38 +344,21 @@ def subscribe( self.subscriptions[correlation_id]["mode"] = 1 # Change to LTP mode else: # Enhanced logging for BSE depth subscriptions - if "BSE" in groww_exchange: - self.logger.info("🔴 Creating BSE DEPTH subscription:") - self.logger.info(f" Exchange: {groww_exchange}") - self.logger.info(f" Segment: {segment}") - self.logger.info(f" Token: {token}") - self.logger.info(f" Symbol: {symbol}") - # Subscribe to depth for non-index instruments sub_key = self.ws_client.subscribe_depth( groww_exchange, segment, token, symbol, instrumenttype ) - if "BSE" in groww_exchange: - self.logger.info(f"🔴 BSE DEPTH subscription key: {sub_key}") - # Store subscription key for unsubscribe self.subscription_keys[correlation_id] = sub_key mode_name = {1: "LTP", 2: "Quote", 3: "Depth"}.get(mode, str(mode)) self.logger.info( - f"✅ Subscribed to {symbol}.{exchange} in {mode_name} mode (key: {sub_key})" + f"Subscribed to {symbol}.{exchange} in {mode_name} mode" ) - # Special logging for LTP subscriptions to debug subscribe all issue - if mode == 1: - self.logger.info( - f"🔥 LTP SUBSCRIPTION CONFIRMED: {exchange}:{symbol} - data should start flowing" - ) - - # Extra logging for F&O if exchange in ["NFO", "BFO"]: - self.logger.info(f"F&O subscription key created: {sub_key}") + self.logger.debug(f"F&O subscription key created: {sub_key}") except Exception as e: self.logger.error(f"Error subscribing to {symbol}.{exchange}: {e}") @@ -467,17 +441,7 @@ def _resubscribe(self, correlation_id: str, sub_info: dict): def _on_data(self, data: dict[str, Any]) -> None: """Callback for market data from WebSocket""" try: - # Enhanced logging for BSE depth data - is_bse_depth = False - if "depth_data" in data and "exchange" in data and "BSE" in data.get("exchange", ""): - is_bse_depth = True - self.logger.info("🔴 BSE DEPTH DATA RECEIVED!") - self.logger.info(f" Depth data: {data.get('depth_data', {})}") - - # Debug log the raw message data to see what we're actually receiving - self.logger.debug( - f"RAW GROWW DATA{' (BSE DEPTH)' if is_bse_depth else ''}: Type: {type(data)}, Data: {data}" - ) + self.logger.debug(f"RAW GROWW DATA: Type: {type(data)}, Data: {data}") # Add data validation to ensure we have the minimum required fields if not isinstance(data, dict): @@ -513,14 +477,13 @@ def _on_data(self, data: dict[str, Any]) -> None: # Convert string numeric to string mode mode = {1: "ltp", 2: "quote", 3: "depth"}.get(int(mode), "ltp") - # Special logging for BSE depth if "BSE" in exchange and mode == "depth": - self.logger.info("🔴 BSE DEPTH: Looking for subscription") + self.logger.debug("BSE DEPTH: Looking for subscription") - self.logger.info( + self.logger.debug( f"Looking for subscription: symbol={symbol_from_data}, exchange={exchange}, mode={mode}" ) - self.logger.info(f"Available subscriptions: {list(self.subscriptions.keys())}") + self.logger.debug(f"Available subscriptions: {list(self.subscriptions.keys())}") # Find matching subscription based on symbol, exchange and mode with self.lock: @@ -557,7 +520,7 @@ def _on_data(self, data: dict[str, Any]) -> None: if is_index_match or is_regular_match: subscription = sub correlation_id = cid - self.logger.info(f"Matched subscription: {cid}") + self.logger.debug(f"Matched subscription: {cid}") break # Try to match based on exchange token from protobuf data @@ -566,7 +529,7 @@ def _on_data(self, data: dict[str, Any]) -> None: segment = data.get("segment", "CASH") exchange = data.get("exchange", "NSE") - self.logger.info( + self.logger.debug( f"Processing message with token: {token}, segment: {segment}, exchange: {exchange}" ) @@ -583,12 +546,7 @@ def _on_data(self, data: dict[str, Any]) -> None: break if not subscription: - # Enhanced logging for BSE depth debugging - if "BSE" in str(data) and "depth" in str(data).lower(): - self.logger.error("🔴 BSE DEPTH DATA RECEIVED BUT NO SUBSCRIPTION FOUND!") - self.logger.error(f" Data: {data}") - self.logger.error(f" Active subscriptions: {self.subscriptions}") - self.logger.warning(f"Received data for unsubscribed token/symbol: {data}") + self.logger.debug(f"Received data for unsubscribed token/symbol: {data}") return # Extract symbol and exchange from subscription @@ -652,15 +610,14 @@ def _on_data(self, data: dict[str, Any]) -> None: if market_data["ltp"] == 0: self.logger.warning(f"⚠️ NO VALID LTP DATA for {symbol}, check data source") else: - self.logger.info(f"📈 LTP recovered for {symbol}: {market_data['ltp']}") + self.logger.debug(f"LTP recovered for {symbol}: {market_data['ltp']}") # Ensure LTP timestamp if "ltt" not in market_data: market_data["ltt"] = int(time.time() * 1000) - # Log LTP data for debugging subscribe all issue - self.logger.info( - f"🔍 LTP MODE: {exchange}:{symbol} = ₹{market_data['ltp']} at {market_data.get('ltt')}" + self.logger.debug( + f"LTP MODE: {exchange}:{symbol} = {market_data['ltp']} at {market_data.get('ltt')}" ) elif actual_mode == 2: # Quote mode @@ -679,9 +636,8 @@ def _on_data(self, data: dict[str, Any]) -> None: ) market_data["ltp"] = float(ltp_value) if ltp_value else 0.0 - # Log Quote data - self.logger.info( - f"🔍 QUOTE MODE: {exchange}:{symbol} = ₹{market_data['ltp']} (Vol: {market_data.get('volume', 0)})" + self.logger.debug( + f"QUOTE MODE: {exchange}:{symbol} = {market_data['ltp']} (Vol: {market_data.get('volume', 0)})" ) elif actual_mode == 3: # Depth mode @@ -694,54 +650,33 @@ def _on_data(self, data: dict[str, Any]) -> None: if "ltp" not in market_data: market_data["ltp"] = 0.0 - # Log Depth data buy_levels = len(market_data["depth"].get("buy", [])) sell_levels = len(market_data["depth"].get("sell", [])) - self.logger.info( - f"🔍 DEPTH MODE: {exchange}:{symbol} = {buy_levels}B/{sell_levels}S levels" + self.logger.debug( + f"DEPTH MODE: {exchange}:{symbol} = {buy_levels}B/{sell_levels}S levels" ) - # Enhanced logging for BSE depth - if "BSE" in exchange and mode == 3: - self.logger.info(f"🔴 Publishing BSE DEPTH data for {symbol}") - if "depth" in market_data: - self.logger.info(f" Buy levels: {len(market_data['depth'].get('buy', []))}") - self.logger.info(f" Sell levels: {len(market_data['depth'].get('sell', []))}") - - # Periodic logging instead of every message (reduces noise) - but more frequent for debugging + # Track message count for periodic logging if not hasattr(self, "_message_count"): self._message_count = 0 self._message_count += 1 - # More frequent logging for debugging LTP issue - if self._message_count <= 20 or self._message_count % 25 == 0: + # Periodic logging - log first message and then every 500th + if self._message_count == 1 or self._message_count % 500 == 0: mode_name = {1: "LTP", 2: "QUOTE", 3: "DEPTH"}[actual_mode] ltp_info = ( - f"LTP: ₹{market_data.get('ltp', 'N/A')}" + f"LTP: {market_data.get('ltp', 'N/A')}" if actual_mode in [1, 2] else f"Depth: {len(market_data.get('depth', {}).get('buy', []))}B/{len(market_data.get('depth', {}).get('sell', []))}S" ) self.logger.info( - f"📈 Publishing #{self._message_count}: {topic} ({mode_name}) -> {ltp_info}" - ) - - # CRITICAL: Always log LTP mode data to debug subscribe all issue - if actual_mode == 1: - self.logger.info( - f"🚨 LTP PUBLISH: {topic} -> ₹{market_data.get('ltp')} (Message #{self._message_count})" + f"Publishing #{self._message_count}: {topic} ({mode_name}) -> {ltp_info}" ) # Publish to ZeroMQ self.publish_market_data(topic, market_data) - # Log successful publication for debugging data flow issues - self.logger.debug(f"✅ ZMQ Published: {topic} with {len(str(market_data))} bytes") - - # Verify publication by checking if we can access the data - if actual_mode == 1 and market_data.get("ltp", 0) > 0: - self.logger.info( - f"✅ LTP DATA VERIFIED: {exchange}:{symbol} = ₹{market_data['ltp']} published successfully" - ) + self.logger.debug(f"ZMQ Published: {topic}") except Exception as e: self.logger.error(f"Error processing market data: {e}", exc_info=True) diff --git a/broker/groww/streaming/groww_nats.py b/broker/groww/streaming/groww_nats.py index d25661efb..a7e464bd7 100644 --- a/broker/groww/streaming/groww_nats.py +++ b/broker/groww/streaming/groww_nats.py @@ -195,7 +195,7 @@ def parse_message( elif self.pending_data.startswith(MSG): # MSG format: MSG [reply-to] <#bytes>\r\n\r\n - logger.info("🔍 Found MSG in data stream") + logger.debug("Found MSG in data stream") end_idx = self.pending_data.find("\r\n") if end_idx == -1: logger.debug("MSG header incomplete, waiting for more data") @@ -267,13 +267,13 @@ def parse_message( self.pending_data = remaining[size + 2 :] # Skip payload and \r\n - logger.info(f"📊 MSG parsed - Subject: {subject}, SID: {sid}, Size: {size}") + logger.debug(f"MSG parsed - Subject: {subject}, SID: {sid}, Size: {size}") # Process the message if sid in self.subscriptions: sub = self.subscriptions[sid] sub.received_msgs += 1 - logger.info(f"✅ Subscription found for SID {sid}: {sub.subject}") + logger.debug(f"Subscription found for SID {sid}: {sub.subject}") messages.append( { diff --git a/broker/groww/streaming/groww_protobuf.py b/broker/groww/streaming/groww_protobuf.py index 53c416357..414858128 100644 --- a/broker/groww/streaming/groww_protobuf.py +++ b/broker/groww/streaming/groww_protobuf.py @@ -206,16 +206,7 @@ def _parse_market_depth(self) -> dict[str, Any]: length = self._read_varint() end_pos = self.position + length - # Log depth message size for BSE vs NSE debugging - total_size = len(self.data) - logger.info( - f"📊 Parsing depth message: inner length={length} bytes, total message={total_size} bytes" - ) - if total_size == 501: - logger.info("🔴 BSE depth message detected (501 bytes)") - elif total_size == 499: - logger.info("✅ NSE depth message detected (499 bytes)") - + logger.debug(f"Parsing depth message: inner length={length} bytes") result = {"timestamp": 0, "buy": [], "sell": []} # Parse depth data fields @@ -323,33 +314,13 @@ def parse_groww_market_data(data: bytes) -> dict[str, Any]: Returns: Parsed market data """ - data_len = len(data) - logger.info(f"Parsing protobuf data: {data_len} bytes") - - # Special logging for BSE data (501 bytes) vs NSE (499 bytes) - if data_len == 501: - logger.info("🔴 Potential BSE message detected (501 bytes)") - elif data_len == 499: - logger.info("✅ Potential NSE message detected (499 bytes)") - - # For BSE depth messages, check if there's an extra field - if data_len == 501: - logger.debug(f"BSE message - Last 10 bytes (hex): {data[-10:].hex()}") - - logger.debug(f"First 50 bytes (hex): {data[:50].hex() if len(data) > 50 else data.hex()}") + logger.debug(f"Parsing protobuf data: {len(data)} bytes") parser = MiniProtobufParser() result = parser.parse_market_data(data) - # Log what was parsed if result: - logger.info(f"Successfully parsed protobuf data: {result.keys()}") - if "ltp_data" in result: - logger.info(f"LTP data found: {result['ltp_data']}") - if "index_data" in result: - logger.info(f"Index data found: {result['index_data']}") - if "depth_data" in result: - logger.info(f"Depth data found: {result['depth_data']}") + logger.debug(f"Parsed protobuf: {result.keys()}") else: logger.warning("No data parsed from protobuf") diff --git a/broker/groww/streaming/nats_websocket.py b/broker/groww/streaming/nats_websocket.py index ecea7e082..5cddc3cc9 100644 --- a/broker/groww/streaming/nats_websocket.py +++ b/broker/groww/streaming/nats_websocket.py @@ -105,12 +105,7 @@ def _send_connect_with_signature(self): jwt=self.socket_token, nkey=nkey, sig=sig ) - # Log CONNECT details for debugging - logger.info("CONNECT details:") - logger.info(f" JWT Token length: {len(self.socket_token) if self.socket_token else 0}") - logger.info(f" Has nkey: {bool(nkey)}") - logger.info(f" Has signature: {bool(sig)}") - logger.debug(f" CONNECT command: {connect_cmd[:200]}...") # Log first 200 chars + logger.debug(f"CONNECT: JWT len={len(self.socket_token) if self.socket_token else 0}, nkey={bool(nkey)}, sig={bool(sig)}") self.ws.send(connect_cmd) logger.info(f"Sent NATS CONNECT with{'out' if not sig else ''} signature") @@ -293,7 +288,7 @@ def periodic_ping(): if self.connected and self.running and self.ws: try: ping_count += 1 - logger.info(f"\U0001f3d3 Sending PING #{ping_count} to check connection...") + logger.debug(f"Sending PING #{ping_count} to check connection...") if self.nats_protocol: self.ws.send(self.nats_protocol.create_ping()) else: @@ -301,7 +296,7 @@ def periodic_ping(): except Exception as e: logger.error(f"Failed to send PING: {e}") break # Exit on error - logger.info("🛑 Ping thread exiting") + logger.debug("Ping thread exiting") threading.Thread(target=periodic_ping, daemon=True).start() @@ -311,9 +306,7 @@ def _process_binary_nats_message(self, data: bytes): # Convert to string to find message boundaries text = data.decode("utf-8", errors="ignore") - # Log the message type for debugging - if len(text) > 0: - logger.debug(f"Binary message text preview: {text[:100]}") + logger.debug(f"Binary message text preview: {text[:100]}") # Ensure NATS protocol handler exists if not self.nats_protocol: @@ -347,14 +340,7 @@ def _process_binary_nats_message(self, data: bytes): sid = parts[2] size = int(parts[-1]) - # Enhanced logging for BSE messages - if "bse" in subject.lower(): - logger.info( - f"🔴 BSE MSG detected - Subject: {subject}, SID: {sid}, Size: {size}" - ) - # Calculate where payload starts in the original binary data - # We need to find where the header ends in the original data header_end_marker = b"\r\n" header_start = data.find(b"MSG") if header_start >= 0: @@ -364,26 +350,24 @@ def _process_binary_nats_message(self, data: bytes): payload_end = payload_start + size if payload_end <= len(data): - # Extract binary payload payload = data[payload_start:payload_end] - # Create MSG dict with binary payload msg = { "type": "MSG", "subject": subject, "sid": sid, "size": size, - "payload": payload, # Keep as bytes + "payload": payload, } - logger.info( - f"📊 Binary MSG parsed - Subject: {subject}, SID: {sid}, Size: {size}" + logger.debug( + f"Binary MSG parsed - Subject: {subject}, SID: {sid}, Size: {size}" ) self._process_nats_message(msg) elif text.startswith("PING") or text.startswith("PONG") or text.startswith("+OK"): # Parse as text for control messages - logger.info(f"Control message received: {text.strip()}") + logger.debug(f"Control message received: {text.strip()}") if self.nats_protocol: messages = self.nats_protocol.parse_message(text) else: @@ -410,54 +394,19 @@ def _on_message(self, ws, message): # Decode to check content msg_text = message.decode("utf-8", errors="ignore") - # Enhanced debugging for ALL messages to find BSE + # Log all per-message details at debug level if "MSG" in msg_text: - # Extract subject from MSG line - if "/ld/eq/" in msg_text: - logger.info(f"📥 Market data message received: {msg_size} bytes") - # Check specifically for exchange and type - if "/ld/eq/nse/price" in msg_text: - logger.info(" ✅ NSE LTP message detected") - elif "/ld/eq/nse/book" in msg_text: - logger.info(" ✅ NSE DEPTH message detected") - elif "/ld/eq/bse/price" in msg_text: - logger.info(" 🔴 BSE LTP message detected!") - elif "/ld/eq/bse/book" in msg_text: - logger.info(" 🔴 BSE DEPTH message detected!") - logger.info(f" First 100 chars: {msg_text[:100]}") - # Also check for any BSE-related content - elif "bse" in msg_text.lower() or "532540" in msg_text: - logger.info(f"⚠️ Possible BSE-related message: {msg_size} bytes") - logger.info(f" Content preview: {msg_text[:200]}") - # Check for F&O content - elif "/ld/fo/" in msg_text or "FNO" in msg_text or "53892" in msg_text: - logger.info(f"📈 Possible F&O message detected: {msg_size} bytes") - logger.info(f" Content preview: {msg_text[:200]}") - if "/ld/fo/nse/book" in msg_text: - logger.info(" ✅ NFO DEPTH message confirmed!") - elif "/ld/fo/nse/price" in msg_text: - logger.info(" ✅ NFO LTP message confirmed!") - # Log ANY message if we're monitoring BSE - elif hasattr(self, "monitoring_bse") and self.monitoring_bse: - logger.info( - f"🔍 Message after BSE sub: {msg_size} bytes, starts with: {msg_text[:30]}" - ) + logger.debug(f"Market data message received: {msg_size} bytes, preview: {msg_text[:80]}") else: - # Log INFO messages specially if msg_text.startswith("INFO"): - logger.info( - f"📥 Received INFO message: {msg_size} bytes (BSE=501, NSE=499 expected)" - ) + logger.info(f"Received INFO message: {msg_size} bytes") else: - logger.info(f"📥 Received BINARY message: {msg_size} bytes") - logger.info( - f" First 50 bytes (hex): {message[:50].hex() if len(message) > 0 else 'empty'}" - ) + logger.debug(f"Received BINARY message: {msg_size} bytes") # Parse binary NATS message directly self._process_binary_nats_message(message) else: - logger.info(f"📥 Received TEXT message: {len(message)} chars") + logger.debug(f"Received TEXT message: {len(message)} chars") # Parse text message if self.nats_protocol: @@ -465,9 +414,7 @@ def _on_message(self, ws, message): else: logger.error("NATS protocol handler not initialized") messages = [] - logger.info(f"Parsed {len(messages)} NATS messages") for msg in messages: - logger.info(f"Processing NATS message type: {msg.get('type')}") self._process_nats_message(msg) except Exception as e: @@ -510,17 +457,16 @@ def _process_nats_message(self, msg: dict[str, Any]): # Respond with PONG if self.nats_protocol: self.ws.send(self.nats_protocol.create_pong()) - logger.info("🏓 Received PING from server, sent PONG") + logger.debug("Received PING from server, sent PONG") else: logger.error("Cannot send PONG - NATS protocol handler not initialized") elif msg_type == "PONG": - logger.info("✅ Received PONG from server - Connection alive") + logger.debug("Received PONG from server - Connection alive") elif msg_type == "MSG": - # Market data message - logger.info( - f"📊 Processing MSG - Subject: {msg.get('subject')}, SID: {msg.get('sid')}, Size: {msg.get('size')} bytes" + logger.debug( + f"Processing MSG - Subject: {msg.get('subject')}, SID: {msg.get('sid')}, Size: {msg.get('size')} bytes" ) self._process_market_data_msg(msg) @@ -553,13 +499,7 @@ def _process_market_data_msg(self, msg: dict[str, Any]): payload = msg.get("payload", b"") sid = msg.get("sid") - # Enhanced logging for BSE - is_bse = "bse" in subject.lower() - - logger.info(f"📈 Market Data MSG Details{' (BSE)' if is_bse else ''}:") - logger.info(f" Subject: {subject}") - logger.info(f" SID: {sid}") - logger.info(f" Payload size: {len(payload)} bytes") + logger.debug(f"Market Data MSG: Subject={subject}, SID={sid}, Payload={len(payload)} bytes") # Ensure payload is bytes if isinstance(payload, str): @@ -570,18 +510,9 @@ def _process_market_data_msg(self, msg: dict[str, Any]): logger.error(f"Unexpected payload type: {type(payload)}") return - # Log payload hex for debugging - if payload: - logger.info( - f" Payload (hex): {payload[:50].hex()}..." - if len(payload) > 50 - else f" Payload (hex): {payload.hex()}" - ) - - # Try to parse as protobuf - logger.info(f"Parsing protobuf data{' for BSE' if is_bse else ''}...") + # Parse protobuf payload market_data = groww_protobuf.parse_groww_market_data(payload) - logger.info(f"✅ Parsed market data{' (BSE)' if is_bse else ''}: {market_data}") + logger.debug(f"Parsed market data: {market_data}") # Find matching subscription found_subscription = False @@ -592,7 +523,7 @@ def _process_market_data_msg(self, msg: dict[str, Any]): sub_sid = self.nats_sids[sub_key] if str(sub_sid) == str(sid): found_subscription = True - logger.info(f"✅ Matched subscription by SID: {sub_key}") + logger.debug(f"Matched subscription by SID: {sub_key}") # Add subscription info to market data market_data["symbol"] = sub_info["symbol"] @@ -614,7 +545,7 @@ def _process_market_data_msg(self, msg: dict[str, Any]): market_data["string_mode"] = sub_info["mode"] market_data["original_exchange"] = sub_info["exchange"] - logger.info(f"🚀 Sending market data to callback: {market_data}") + logger.debug(f"Sending market data to callback: {market_data}") # Call data callback if self.on_data: @@ -638,7 +569,7 @@ def _process_market_data_msg(self, msg: dict[str, Any]): mode_type == "depth" and sub_info["mode"] == "depth" ): found_subscription = True - logger.info(f"✅ Matched subscription by token pattern: {sub_key}") + logger.debug(f"Matched subscription by token pattern: {sub_key}") # Update the SID mapping for future use self.nats_sids[sub_key] = str(sid) @@ -661,16 +592,14 @@ def _process_market_data_msg(self, msg: dict[str, Any]): market_data["string_mode"] = sub_info["mode"] market_data["original_exchange"] = sub_info["exchange"] - logger.info(f"🚀 Sending market data to callback: {market_data}") + logger.debug(f"Sending market data to callback: {market_data}") if self.on_data: self.on_data(market_data) break if not found_subscription: - logger.warning(f"⚠️ No matching subscription found for SID: {sid}") - logger.info(f" Active SIDs: {self.nats_sids}") - logger.info(f" Subject: {subject}") + logger.debug(f"No matching subscription for SID: {sid}, subject: {subject}") except Exception as e: logger.error(f"Error processing market data: {e}", exc_info=True) @@ -709,11 +638,10 @@ def _send_nats_subscription(self, sub_key: str, sub_info: dict): self.nats_sids[sub_key] = sid logger.info(f"Sent NATS SUB for {topic} with SID {sid}") - logger.info(f"Current nats_sids mapping: {self.nats_sids}") + logger.debug(f"Current nats_sids mapping: {self.nats_sids}") - # Send a PING to flush ALL subscriptions (similar to official SDK's flush) - # This ensures the server processes the subscription before continuing - logger.info("Sending PING to flush subscription") + # Send a PING to flush subscription + logger.debug("Sending PING to flush subscription") self.ws.send(self.nats_protocol.create_ping()) # Wait briefly for PONG to ensure subscription is processed @@ -744,14 +672,8 @@ def subscribe_ltp( """ sub_key = f"ltp_{exchange}_{segment}_{token}" - # Enhanced logging for BSE subscriptions if "BSE" in exchange.upper(): - logger.info("🔴 BSE LTP Subscription Request:") - logger.info(f" Exchange: {exchange}") - logger.info(f" Segment: {segment}") - logger.info(f" Token: {token}") - logger.info(f" Symbol: {symbol}") - logger.info(f" InstrumentType: {instrumenttype}") + logger.debug(f"BSE LTP Subscription: exchange={exchange}, segment={segment}, token={token}, symbol={symbol}") # Determine mode based on whether it's an index # IMPORTANT: Only treat as index if exchange contains 'INDEX' @@ -785,23 +707,11 @@ def subscribe_ltp( if self.connected: self._send_nats_subscription(sub_key, self.subscriptions[sub_key]) - # Special logging for BSE subscriptions if "BSE" in exchange.upper(): - logger.info( - f"🔴 BSE subscription sent for {symbol}, waiting for market data MSG..." - ) - logger.info(f" Subscription key: {sub_key}") - logger.info(f" Active SIDs: {list(self.nats_sids.keys())}") - logger.warning("⚠️ NOTE: Monitoring ALL messages after BSE subscription...") - # Set flag to monitor messages - self.monitoring_bse = True + logger.debug(f"BSE subscription sent for {symbol}, key: {sub_key}") - # Special logging for F&O subscriptions if segment.upper() == "FNO": - logger.info(f"📈 F&O LTP subscription sent for {symbol}") - logger.info(f" Exchange: {exchange}, Segment: {segment}") - logger.info(f" Topic subscribed: /ld/fo/{exchange.lower()}/price.{token}") - self.monitoring_fo = True + logger.debug(f"F&O LTP subscription sent for {symbol}, exchange={exchange}, segment={segment}") return sub_key @@ -833,14 +743,8 @@ def subscribe_depth( sub_key = f"depth_{exchange}_{segment}_{token}" - # Enhanced logging for BSE depth subscriptions if "BSE" in exchange.upper(): - logger.info("🔴 BSE DEPTH Subscription Request:") - logger.info(f" Exchange: {exchange}") - logger.info(f" Segment: {segment}") - logger.info(f" Token: {token}") - logger.info(f" Symbol: {symbol}") - logger.info(f" InstrumentType: {instrumenttype}") + logger.debug(f"BSE DEPTH Subscription: exchange={exchange}, segment={segment}, token={token}, symbol={symbol}") # Store subscription info - CRITICAL FIX: Add numeric mode for depth self.subscriptions[sub_key] = { @@ -857,21 +761,11 @@ def subscribe_depth( if self.connected: self._send_nats_subscription(sub_key, self.subscriptions[sub_key]) - # Special logging for BSE depth subscriptions if "BSE" in exchange.upper(): - logger.info( - f"🔴 BSE DEPTH subscription sent for {symbol}, waiting for depth data MSG..." - ) - logger.info(f" Subscription key: {sub_key}") - logger.info(f" Active SIDs: {list(self.nats_sids.keys())}") + logger.debug(f"BSE DEPTH subscription sent for {symbol}, key: {sub_key}") - # Special logging for F&O subscriptions if segment.upper() == "FNO": - logger.info(f"📈 F&O DEPTH subscription sent for {symbol}") - logger.info(f" Exchange: {exchange}, Segment: {segment}") - logger.info(f" Topic subscribed: {self.subscriptions[sub_key]}") - logger.info(" Monitoring for F&O depth messages...") - self.monitoring_fo = True + logger.debug(f"F&O DEPTH subscription sent for {symbol}, exchange={exchange}, segment={segment}") return sub_key @@ -904,12 +798,12 @@ def unsubscribe_all_and_disconnect(self): """ Unsubscribe from all subscriptions and disconnect completely from server """ - logger.info("🧹 Starting complete unsubscribe and disconnect sequence...") + logger.info("Starting complete unsubscribe and disconnect sequence...") # Step 1: Unsubscribe from all active subscriptions unsubscribed_count = 0 if self.subscriptions: - logger.info(f"📤 Unsubscribing from {len(self.subscriptions)} active subscriptions...") + logger.info(f"Unsubscribing from {len(self.subscriptions)} active subscriptions...") for sub_key in list(self.subscriptions.keys()): try: @@ -918,13 +812,12 @@ def unsubscribe_all_and_disconnect(self): except Exception as e: logger.error(f"Error unsubscribing {sub_key}: {e}") - logger.info(f"✅ Unsubscribed from {unsubscribed_count} subscriptions") + logger.info(f"Unsubscribed from {unsubscribed_count} subscriptions") # Step 2: Send additional NATS cleanup commands if self.connected and self.ws and self.nats_protocol: try: - # Send NATS UNSUB for any remaining SIDs - logger.info("🔧 Sending cleanup UNSUB commands to server...") + logger.debug("Sending cleanup UNSUB commands to server...") for i in range(1, 50): # Clear up to 50 possible SIDs try: unsub_cmd = self.nats_protocol.create_unsubscribe(str(i)) @@ -937,18 +830,18 @@ def unsubscribe_all_and_disconnect(self): time.sleep(1) - logger.info("✅ Server cleanup commands sent") + logger.debug("Server cleanup commands sent") except Exception as e: - logger.warning(f"⚠️ Server cleanup warning: {e}") + logger.warning(f"Server cleanup warning: {e}") # Step 3: Disconnect WebSocket self.disconnect() - logger.info("🏁 Complete unsubscribe and disconnect sequence finished") + logger.info("Complete unsubscribe and disconnect sequence finished") def disconnect(self): """Disconnect from WebSocket with enhanced cleanup""" - logger.info("🔌 Disconnecting from Groww WebSocket...") + logger.info("Disconnecting from Groww WebSocket...") # Set disconnect flags first (similar to Angel's approach) self.running = False @@ -958,7 +851,7 @@ def disconnect(self): # Send NATS cleanup commands before closing if still connected if self.ws and self.nats_protocol: try: - logger.info("📡 Sending final UNSUB commands to server...") + logger.debug("Sending final UNSUB commands to server...") # Send UNSUB commands for any remaining subscriptions for sid in list(self.nats_sids.values()): try: @@ -976,20 +869,19 @@ def disconnect(self): if self.ws: try: - logger.info("🔗 Closing WebSocket connection...") - # Force close the WebSocket connection - self.ws.keep_running = False # Tell WebSocketApp to stop + logger.debug("Closing WebSocket connection...") + self.ws.keep_running = False self.ws.close() - logger.info("✅ WebSocket closed") + logger.debug("WebSocket closed") self.ws = None # Clear the WebSocket reference except Exception as e: logger.error(f"Error closing WebSocket: {e}") if self.ws_thread: - logger.info("⏳ Waiting for WebSocket thread to finish...") + logger.debug("Waiting for WebSocket thread to finish...") self.ws_thread.join(timeout=5) if self.ws_thread.is_alive(): - logger.warning("⚠️ WebSocket thread did not finish gracefully") + logger.warning("WebSocket thread did not finish gracefully") # Clear all state for clean reconnection self.connected = False @@ -1003,7 +895,7 @@ def disconnect(self): self.ws = None self.ws_thread = None - logger.info("✅ Groww WebSocket disconnected and all resources cleared") + logger.info("Groww WebSocket disconnected and all resources cleared") self.subscription_id = None logger.info("Disconnected from Groww WebSocket and cleared state") diff --git a/broker/rmoney/streaming/rmoney_adapter.py b/broker/rmoney/streaming/rmoney_adapter.py index 938489495..11016cf84 100644 --- a/broker/rmoney/streaming/rmoney_adapter.py +++ b/broker/rmoney/streaming/rmoney_adapter.py @@ -38,6 +38,7 @@ def __init__(self): self.lock = threading.Lock() self._reconnect_worker_lock = threading.Lock() self._reconnect_worker: threading.Thread | None = None + self._stop_event = threading.Event() # Interruptible sleep for reconnect # Log the ZMQ port being used self.logger.info(f"RMoney XTS adapter initialized with ZMQ port: {self.zmq_port}") @@ -97,6 +98,13 @@ def initialize( self.logger.info(f"Using API Key: {api_key[:10]}... for RMoney XTS connection") + # Close previous client if initialize() is called again + if self.ws_client is not None: + try: + self.ws_client.close() + except Exception: + pass + # Create RMoney XTS WebSocket client with API credentials self.ws_client = RMoneyWebSocketClient( api_key=api_key, @@ -197,6 +205,8 @@ def connect(self) -> None: self.logger.error("WebSocket client not initialized. Call initialize() first.") return + # Reset stop event for fresh connection lifecycle + self._stop_event.clear() self._start_reconnect_worker(trigger="connect") def _start_reconnect_worker(self, trigger: str) -> None: @@ -232,7 +242,9 @@ def _connect_with_retry(self) -> None: self.reconnect_delay * (2**self.reconnect_attempts), self.max_reconnect_delay ) self.logger.error(f"Connection failed: {e}. Retrying in {delay} seconds...") - time.sleep(delay) + # Use event-based wait so disconnect() can interrupt immediately + if self._stop_event.wait(delay): + break # Stop event was set — abort reconnect if self.reconnect_attempts >= self.max_reconnect_attempts: self.logger.error("Max reconnection attempts reached. Giving up.") @@ -244,18 +256,20 @@ def disconnect(self) -> None: # Set running to False to prevent reconnection attempts self.running = False self.reconnect_attempts = self.max_reconnect_attempts # Prevent reconnection attempts + # Wake up any sleeping reconnect worker immediately + self._stop_event.set() self.logger.info( "Set running=False and max reconnect attempts to prevent auto-reconnection" ) - # Disconnect Socket.IO client + # Full teardown of Socket.IO client + HTTP session if hasattr(self, "ws_client") and self.ws_client: try: - self.logger.info("Disconnecting Socket.IO client...") - self.ws_client.disconnect() - self.logger.info("Socket.IO client disconnect call completed") + self.logger.info("Closing Socket.IO client and HTTP session...") + self.ws_client.close() + self.logger.info("Socket.IO client close call completed") except Exception as e: - self.logger.error(f"Error during Socket.IO disconnect: {e}") + self.logger.error(f"Error during Socket.IO close: {e}") else: self.logger.warning("No WebSocket client to disconnect") diff --git a/broker/rmoney/streaming/rmoney_websocket.py b/broker/rmoney/streaming/rmoney_websocket.py index 94f9d7c8b..20a8f31c6 100644 --- a/broker/rmoney/streaming/rmoney_websocket.py +++ b/broker/rmoney/streaming/rmoney_websocket.py @@ -133,6 +133,9 @@ def __init__( self.subscriptions = {} self._binary_packet_seen = False + # Reusable HTTP session for connection pooling (avoids FD churn) + self._http_session = requests.Session() + # Initialize Socket.IO client self._setup_socketio() @@ -255,40 +258,42 @@ def marketdata_login(self) -> bool: self.logger.info(f"[MARKET DATA LOGIN] Attempting login to: {self.login_url}") - response = requests.post( + response = self._http_session.post( self.login_url, json=login_payload, headers=headers, timeout=30 ) - - if response.status_code == 200: - result = response.json() - self.logger.debug(f"[MARKET DATA LOGIN] Response: {result}") - - if result.get("type") == "success": - login_result = result.get("result", {}) - self.market_data_token = login_result.get("token") - self.actual_user_id = login_result.get("userID") - self.app_version = login_result.get("appVersion") - self.expiry_date = login_result.get("application_expiry_date") - - if self.market_data_token and self.actual_user_id: - self.logger.info( - f"[MARKET DATA LOGIN] Success! UserID: {self.actual_user_id}" - ) - return True + try: + if response.status_code == 200: + result = response.json() + self.logger.debug(f"[MARKET DATA LOGIN] Response: {result}") + + if result.get("type") == "success": + login_result = result.get("result", {}) + self.market_data_token = login_result.get("token") + self.actual_user_id = login_result.get("userID") + self.app_version = login_result.get("appVersion") + self.expiry_date = login_result.get("application_expiry_date") + + if self.market_data_token and self.actual_user_id: + self.logger.info( + f"[MARKET DATA LOGIN] Success! UserID: {self.actual_user_id}" + ) + return True + else: + self.logger.error("[MARKET DATA LOGIN] Missing token or userID in response") + return False else: - self.logger.error("[MARKET DATA LOGIN] Missing token or userID in response") + self.logger.error(f"[MARKET DATA LOGIN] API returned error: {result}") return False else: - self.logger.error(f"[MARKET DATA LOGIN] API returned error: {result}") + self.logger.error( + f"[MARKET DATA LOGIN] HTTP Error: {response.status_code}, Response: {response.text}" + ) return False - else: - self.logger.error( - f"[MARKET DATA LOGIN] HTTP Error: {response.status_code}, Response: {response.text}" - ) - return False + finally: + response.close() except requests.exceptions.Timeout: self.logger.error("[MARKET DATA LOGIN] Request timeout") @@ -316,6 +321,14 @@ def connect(self) -> None: self.sio.disconnect() except Exception: pass + # Force-kill Engine.IO transport to release its FD and threads + try: + eio = getattr(self.sio, "eio", None) + if eio and hasattr(eio, "disconnect"): + eio.disconnect(abort=True) + except Exception: + pass + self.sio = None self.connected = False # Create fresh Socket.IO client to avoid stale internal state @@ -365,16 +378,32 @@ def disconnect(self) -> None: self.running = False self.connected = False - try: - if self.sio and self.sio.connected: - self.sio.disconnect() - self.logger.info("[SOCKET.IO] Disconnected successfully") - except Exception as e: - self.logger.warning(f"[SOCKET.IO] Error during disconnect: {e}") + if self.sio: + try: + if self.sio.connected: + self.sio.disconnect() + self.logger.info("[SOCKET.IO] Disconnected successfully") + except Exception as e: + self.logger.warning(f"[SOCKET.IO] Error during disconnect: {e}") + # Force-kill Engine.IO transport to release its FD and threads + try: + eio = getattr(self.sio, "eio", None) + if eio and hasattr(eio, "disconnect"): + eio.disconnect(abort=True) + except Exception: + pass + self.sio = None # Clear subscriptions self.subscriptions.clear() + def close(self) -> None: + """Full teardown: disconnect Socket.IO and release HTTP session.""" + self.disconnect() + if self._http_session: + self._http_session.close() + self.logger.info("[CLEANUP] HTTP session closed") + def subscribe(self, correlation_id: str, mode: int, instruments: List[Dict]) -> None: """ Subscribe to market data using XTS HTTP API. @@ -418,94 +447,99 @@ def subscribe(self, correlation_id: str, mode: int, instruments: List[Dict]) -> f"[SUBSCRIBE] Code: {xts_message_code}, Instruments: {len(instruments)}" ) - response = requests.post( + response = self._http_session.post( self.subscription_url, json=subscription_request, headers=headers, timeout=10, ) + try: + if response.status_code == 200: + result = response.json() + self.logger.debug(f"[SUBSCRIBE] Response: {result}") + if result.get("type") != "success": + error_desc = result.get("description") or result.get("message") or str(result) + self.logger.error(f"[SUBSCRIBE] API error response: {error_desc}") + raise RuntimeError(error_desc) + + # Process initial quote data if available + if "result" in result: + list_quotes = result["result"].get("listQuotes", []) + self.logger.info( + f"[SUBSCRIBE] Initial quote payload count: {len(list_quotes)} for code {xts_message_code}" + ) + for quote_str in list_quotes: + try: + quote_data = ( + json.loads(quote_str) if isinstance(quote_str, str) else quote_str + ) + self.logger.debug(f"[INITIAL QUOTE] {quote_data}") + if isinstance(quote_data, dict) and "MessageCode" not in quote_data: + quote_data["MessageCode"] = xts_message_code + if self.on_data: + self.on_data(self, quote_data) + except json.JSONDecodeError as e: + self.logger.error(f"[INITIAL QUOTE] Parse error: {e}") - if response.status_code == 200: - result = response.json() - self.logger.debug(f"[SUBSCRIBE] Response: {result}") - if result.get("type") != "success": - error_desc = result.get("description") or result.get("message") or str(result) - self.logger.error(f"[SUBSCRIBE] API error response: {error_desc}") - raise RuntimeError(error_desc) - - # Process initial quote data if available - if "result" in result: - list_quotes = result["result"].get("listQuotes", []) self.logger.info( - f"[SUBSCRIBE] Initial quote payload count: {len(list_quotes)} for code {xts_message_code}" + f"[SUBSCRIBE] Success - {len(instruments)} instruments, code {xts_message_code}" ) - for quote_str in list_quotes: - try: - quote_data = ( - json.loads(quote_str) if isinstance(quote_str, str) else quote_str - ) - self.logger.debug(f"[INITIAL QUOTE] {quote_data}") - if isinstance(quote_data, dict) and "MessageCode" not in quote_data: - quote_data["MessageCode"] = xts_message_code - if self.on_data: - self.on_data(self, quote_data) - except json.JSONDecodeError as e: - self.logger.error(f"[INITIAL QUOTE] Parse error: {e}") - - self.logger.info( - f"[SUBSCRIBE] Success - {len(instruments)} instruments, code {xts_message_code}" - ) - else: - error_msg = f"[SUBSCRIBE] Failed - Status: {response.status_code}, Response: {response.text}" - # "Instrument Already Subscribed" is non-fatal (expected after reconnect) - if "Already Subscribed" in response.text or "e-session-0002" in response.text: - self.logger.info(f"[SUBSCRIBE] Instrument already subscribed (non-fatal)") - return - # Handle Invalid Token by re-authenticating and retrying once. - # This happens when data.py refreshes the feed token, which creates - # a new market data session and invalidates our current token. - if "Invalid Token" in response.text or "e-session-0007" in response.text: - self.logger.warning( - "[SUBSCRIBE] Token invalidated (likely by feed token refresh). Re-authenticating..." - ) - if self.marketdata_login(): - # Retry with new token - retry_headers = { - "authorization": self.market_data_token, - "Content-Type": "application/json", - } - retry_response = requests.post( - self.subscription_url, - json=subscription_request, - headers=retry_headers, - timeout=10, + else: + error_msg = f"[SUBSCRIBE] Failed - Status: {response.status_code}, Response: {response.text}" + # "Instrument Already Subscribed" is non-fatal (expected after reconnect) + if "Already Subscribed" in response.text or "e-session-0002" in response.text: + self.logger.info(f"[SUBSCRIBE] Instrument already subscribed (non-fatal)") + return + # Handle Invalid Token by re-authenticating and retrying once. + # This happens when data.py refreshes the feed token, which creates + # a new market data session and invalidates our current token. + if "Invalid Token" in response.text or "e-session-0007" in response.text: + self.logger.warning( + "[SUBSCRIBE] Token invalidated (likely by feed token refresh). Re-authenticating..." ) - if retry_response.status_code == 200: - retry_result = retry_response.json() - if retry_result.get("type") == "success": - self.logger.info( - f"[SUBSCRIBE] Retry succeeded after re-auth - {len(instruments)} instruments" - ) - # Process initial quotes from retry - if "result" in retry_result: - list_quotes = retry_result["result"].get("listQuotes", []) - for quote_str in list_quotes: - try: - quote_data = ( - json.loads(quote_str) - if isinstance(quote_str, str) - else quote_str - ) - if isinstance(quote_data, dict) and "MessageCode" not in quote_data: - quote_data["MessageCode"] = xts_message_code - if self.on_data: - self.on_data(self, quote_data) - except json.JSONDecodeError: - pass - return - self.logger.error("[SUBSCRIBE] Re-auth retry also failed") - self.logger.error(error_msg) - raise RuntimeError(f"Subscribe failed: {response.text}") + if self.marketdata_login(): + # Retry with new token + retry_headers = { + "authorization": self.market_data_token, + "Content-Type": "application/json", + } + retry_response = self._http_session.post( + self.subscription_url, + json=subscription_request, + headers=retry_headers, + timeout=10, + ) + try: + if retry_response.status_code == 200: + retry_result = retry_response.json() + if retry_result.get("type") == "success": + self.logger.info( + f"[SUBSCRIBE] Retry succeeded after re-auth - {len(instruments)} instruments" + ) + # Process initial quotes from retry + if "result" in retry_result: + list_quotes = retry_result["result"].get("listQuotes", []) + for quote_str in list_quotes: + try: + quote_data = ( + json.loads(quote_str) + if isinstance(quote_str, str) + else quote_str + ) + if isinstance(quote_data, dict) and "MessageCode" not in quote_data: + quote_data["MessageCode"] = xts_message_code + if self.on_data: + self.on_data(self, quote_data) + except json.JSONDecodeError: + pass + return + finally: + retry_response.close() + self.logger.error("[SUBSCRIBE] Re-auth retry also failed") + self.logger.error(error_msg) + raise RuntimeError(f"Subscribe failed: {response.text}") + finally: + response.close() except Exception as e: self.logger.error(f"[SUBSCRIBE] Exception: {e}") @@ -551,26 +585,28 @@ def unsubscribe(self, correlation_id: str, mode: int, instruments: List[Dict]) - f"[UNSUBSCRIBE] Code: {xts_message_code}, Instruments: {len(instruments)}" ) - response = requests.put( + response = self._http_session.put( self.subscription_url, json=unsubscription_request, headers=headers, timeout=10, ) - - if response.status_code == 200: - result = response.json() - if result.get("type") == "success": - self.logger.info(f"[UNSUBSCRIBE] Success - {len(instruments)} instruments") - return True + try: + if response.status_code == 200: + result = response.json() + if result.get("type") == "success": + self.logger.info(f"[UNSUBSCRIBE] Success - {len(instruments)} instruments") + return True + else: + self.logger.error(f"[UNSUBSCRIBE] API error response: {result}") + return False else: - self.logger.error(f"[UNSUBSCRIBE] API error response: {result}") + self.logger.error( + f"[UNSUBSCRIBE] Failed - Status: {response.status_code}" + ) return False - else: - self.logger.error( - f"[UNSUBSCRIBE] Failed - Status: {response.status_code}" - ) - return False + finally: + response.close() except Exception as e: self.logger.error(f"[UNSUBSCRIBE] Exception: {e}") diff --git a/database/symbol.py b/database/symbol.py index c86015782..87f056e03 100644 --- a/database/symbol.py +++ b/database/symbol.py @@ -10,6 +10,11 @@ logger = get_logger(__name__) + +def _escape_like(term: str) -> str: + """Escape LIKE wildcard characters to prevent unintended broad matching.""" + return term.replace("%", r"\%").replace("_", r"\_") + DATABASE_URL = os.getenv("DATABASE_URL") # Conditionally create engine based on DB type if DATABASE_URL and "sqlite" in DATABASE_URL: @@ -75,22 +80,23 @@ def enhanced_search_symbols(query: str, exchange: str = None) -> list[SymToken]: # Create conditions for each term all_conditions = [] for term in terms: + safe_term = _escape_like(term) # Number detection for more accurate strike price and token searches try: num_term = float(term) term_conditions = or_( - SymToken.symbol.ilike(f"%{term}%"), - SymToken.brsymbol.ilike(f"%{term}%"), - SymToken.name.ilike(f"%{term}%"), - SymToken.token.ilike(f"%{term}%"), + SymToken.symbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.brsymbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.name.ilike(f"%{safe_term}%", escape="\\"), + SymToken.token.ilike(f"%{safe_term}%", escape="\\"), SymToken.strike == num_term, ) except ValueError: term_conditions = or_( - SymToken.symbol.ilike(f"%{term}%"), - SymToken.brsymbol.ilike(f"%{term}%"), - SymToken.name.ilike(f"%{term}%"), - SymToken.token.ilike(f"%{term}%"), + SymToken.symbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.brsymbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.name.ilike(f"%{safe_term}%", escape="\\"), + SymToken.token.ilike(f"%{safe_term}%", escape="\\"), ) all_conditions.append(term_conditions) @@ -182,21 +188,22 @@ def fno_search_symbols_db( primary_term = terms[0] if terms else None all_conditions = [] for term in terms: + safe_term = _escape_like(term) try: num_term = float(term) term_conditions = or_( - SymToken.symbol.ilike(f"%{term}%"), - SymToken.brsymbol.ilike(f"%{term}%"), - SymToken.name.ilike(f"%{term}%"), - SymToken.token.ilike(f"%{term}%"), + SymToken.symbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.brsymbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.name.ilike(f"%{safe_term}%", escape="\\"), + SymToken.token.ilike(f"%{safe_term}%", escape="\\"), SymToken.strike == num_term, ) except ValueError: term_conditions = or_( - SymToken.symbol.ilike(f"%{term}%"), - SymToken.brsymbol.ilike(f"%{term}%"), - SymToken.name.ilike(f"%{term}%"), - SymToken.token.ilike(f"%{term}%"), + SymToken.symbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.brsymbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.name.ilike(f"%{safe_term}%", escape="\\"), + SymToken.token.ilike(f"%{safe_term}%", escape="\\"), ) all_conditions.append(term_conditions) diff --git a/restx_api/account_schema.py b/restx_api/account_schema.py index f06876b40..2f55125a7 100644 --- a/restx_api/account_schema.py +++ b/restx_api/account_schema.py @@ -2,33 +2,33 @@ class FundsSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class OrderbookSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class TradebookSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class PositionbookSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class HoldingsSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class OrderStatusSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) orderid = fields.Str(required=True) class OpenPositionSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) symbol = fields.Str(required=True) exchange = fields.Str(required=True) @@ -36,20 +36,20 @@ class OpenPositionSchema(Schema): class AnalyzerSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class AnalyzerToggleSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) mode = fields.Bool(required=True) class PingSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class ChartSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class Meta: # Allow unknown fields - chart preferences can have any key-value pairs @@ -57,4 +57,4 @@ class Meta: class PnlSymbolsSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) diff --git a/restx_api/chart_api.py b/restx_api/chart_api.py index 5fe21be61..89d0319de 100644 --- a/restx_api/chart_api.py +++ b/restx_api/chart_api.py @@ -1,5 +1,5 @@ +import json import os -import traceback from flask import jsonify, make_response, request from flask_restx import Namespace, Resource @@ -47,8 +47,7 @@ def get(self): return make_response(jsonify(response_data), status_code) except Exception as e: - logger.error(f"Unexpected error in chart GET endpoint: {e}") - traceback.print_exc() + logger.exception(f"Unexpected error in chart GET endpoint: {e}") return make_response( jsonify({"status": "error", "message": "An unexpected error occurred"}), 500 ) @@ -75,6 +74,26 @@ def post(self): # Extract preferences (all keys except apikey) preferences = {k: v for k, v in data.items() if k != "apikey"} + # Limit payload: max 50 keys, each key max 50 chars, each value max 1MB + if len(preferences) > 50: + return make_response( + jsonify({"status": "error", "message": "Too many preference keys (max 50)"}), 400 + ) + for k, v in preferences.items(): + if len(k) > 50: + return make_response( + jsonify({"status": "error", "message": f"Preference key too long: {k[:20]}... (max 50 chars)"}), 400 + ) + # Check serialized size for all value types (not just strings) + try: + serialized = json.dumps(v) + except (TypeError, ValueError): + serialized = str(v) + if len(serialized) > 1_048_576: + return make_response( + jsonify({"status": "error", "message": f"Preference value too large for key: {k} (max 1MB)"}), 400 + ) + if not preferences: return make_response( jsonify({"status": "error", "message": "No preferences provided to update"}), @@ -89,8 +108,7 @@ def post(self): except ValidationError as err: return make_response(jsonify({"status": "error", "message": err.messages}), 400) except Exception as e: - logger.error(f"Unexpected error in chart POST endpoint: {e}") - traceback.print_exc() + logger.exception(f"Unexpected error in chart POST endpoint: {e}") return make_response( jsonify({"status": "error", "message": "An unexpected error occurred"}), 500 ) diff --git a/restx_api/data_schemas.py b/restx_api/data_schemas.py index 0e4c2e688..9f94c06b7 100644 --- a/restx_api/data_schemas.py +++ b/restx_api/data_schemas.py @@ -38,7 +38,7 @@ def validate_option_offset(data): class QuotesSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) symbol = fields.Str(required=True) # Single symbol exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (e.g., NSE, BSE) @@ -49,14 +49,14 @@ class SymbolExchangePair(Schema): class MultiQuotesSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) symbols = fields.List( fields.Nested(SymbolExchangePair), required=True, validate=validate.Length(min=1) ) class HistorySchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) symbol = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (e.g., NSE, BSE) interval = fields.Str( @@ -101,23 +101,23 @@ class HistorySchema(Schema): class DepthSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) symbol = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (e.g., NSE, BSE) class IntervalsSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class SymbolSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication symbol = fields.Str(required=True) # Symbol code (e.g., RELIANCE) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (e.g., NSE, BSE) class TickerSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) symbol = fields.Str(required=True) # Combined exchange:symbol format interval = fields.Str( required=True, @@ -136,13 +136,13 @@ class TickerSchema(Schema): class SearchSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication query = fields.Str(required=True) # Search query/symbol name exchange = fields.Str(required=False, validate=validate.OneOf(VALID_EXCHANGES)) # Optional exchange filter (e.g., NSE, BSE) class ExpirySchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication symbol = fields.Str(required=True) # Underlying symbol (e.g., NIFTY, BANKNIFTY) exchange = fields.Str( required=True, validate=validate.OneOf(["NFO", "BFO", "MCX", "CDS"]) @@ -153,7 +153,7 @@ class ExpirySchema(Schema): class OptionSymbolSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication strategy = fields.Str( required=False, allow_none=True ) # DEPRECATED: Strategy name (optional, will be removed in future versions) @@ -174,7 +174,7 @@ class OptionSymbolSchema(Schema): class OptionGreeksSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication symbol = fields.Str(required=True) # Option symbol (e.g., NIFTY28NOV2424000CE) exchange = fields.Str( required=True, validate=validate.OneOf(["NFO", "BFO", "CDS", "MCX"]) @@ -197,7 +197,7 @@ class OptionGreeksSchema(Schema): class InstrumentsSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication exchange = fields.Str( required=False, validate=validate.OneOf(VALID_EXCHANGES), @@ -208,7 +208,7 @@ class InstrumentsSchema(Schema): class OptionChainSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication underlying = fields.Str(required=True) # Underlying symbol (e.g., NIFTY, BANKNIFTY, RELIANCE) exchange = fields.Str( required=True, validate=validate.OneOf(VALID_EXCHANGES) @@ -222,14 +222,14 @@ class OptionChainSchema(Schema): class MarketHolidaysSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication year = fields.Int( required=False, validate=validate.Range(min=2020, max=2050) ) # Year to get holidays for (defaults to current year) class MarketTimingsSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication date = fields.Str(required=True) # Date in YYYY-MM-DD format @@ -245,7 +245,7 @@ class OptionSymbolRequest(Schema): class MultiOptionGreeksSchema(Schema): """Schema for batch option greeks requests""" - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication symbols = fields.List( fields.Nested(OptionSymbolRequest), required=True, diff --git a/restx_api/schemas.py b/restx_api/schemas.py index 4b706fe23..3f2551e94 100644 --- a/restx_api/schemas.py +++ b/restx_api/schemas.py @@ -4,13 +4,13 @@ class OrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) action = fields.Str(required=True, validate=validate.OneOf(["BUY", "SELL", "buy", "sell"])) - quantity = fields.Int( - required=True, validate=validate.Range(min=1, error="Quantity must be a positive integer.") + quantity = fields.Float( + required=True, validate=validate.Range(min=0, min_inclusive=False, error="Quantity must be a positive number.") ) pricetype = fields.Str( missing="MARKET", validate=validate.OneOf(["MARKET", "LIMIT", "SL", "SL-M"]) @@ -33,16 +33,16 @@ class OrderSchema(Schema): class SmartOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) action = fields.Str(required=True, validate=validate.OneOf(["BUY", "SELL", "buy", "sell"])) - quantity = fields.Int( + quantity = fields.Float( required=True, - validate=validate.Range(min=0, error="Quantity must be a non-negative integer."), + validate=validate.Range(min=0, error="Quantity must be a non-negative number."), ) - position_size = fields.Int(required=True) + position_size = fields.Float(required=True) pricetype = fields.Str( missing="MARKET", validate=validate.OneOf(["MARKET", "LIMIT", "SL", "SL-M"]) ) @@ -61,7 +61,7 @@ class SmartOrderSchema(Schema): class ModifyOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) @@ -74,8 +74,8 @@ class ModifyOrderSchema(Schema): price = fields.Float( required=True, validate=validate.Range(min=0, error="Price must be a non-negative number.") ) - quantity = fields.Int( - required=True, validate=validate.Range(min=1, error="Quantity must be a positive integer.") + quantity = fields.Float( + required=True, validate=validate.Range(min=0, min_inclusive=False, error="Quantity must be a positive number.") ) disclosed_quantity = fields.Int( required=True, @@ -88,18 +88,18 @@ class ModifyOrderSchema(Schema): class CancelOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) orderid = fields.Str(required=True) class ClosePositionSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) class CancelAllOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) @@ -107,8 +107,8 @@ class BasketOrderItemSchema(Schema): exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) action = fields.Str(required=True, validate=validate.OneOf(["BUY", "SELL", "buy", "sell"])) - quantity = fields.Int( - required=True, validate=validate.Range(min=1, error="Quantity must be a positive integer.") + quantity = fields.Float( + required=True, validate=validate.Range(min=0, min_inclusive=False, error="Quantity must be a positive number.") ) pricetype = fields.Str( missing="MARKET", validate=validate.OneOf(["MARKET", "LIMIT", "SL", "SL-M"]) @@ -128,7 +128,7 @@ class BasketOrderItemSchema(Schema): class BasketOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) orders = fields.List( fields.Nested(BasketOrderItemSchema), required=True @@ -136,14 +136,14 @@ class BasketOrderSchema(Schema): class SplitOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) action = fields.Str(required=True, validate=validate.OneOf(["BUY", "SELL", "buy", "sell"])) - quantity = fields.Int( + quantity = fields.Float( required=True, - validate=validate.Range(min=1, error="Total quantity must be a positive integer."), + validate=validate.Range(min=0, min_inclusive=False, error="Total quantity must be a positive number."), ) # Total quantity to split splitsize = fields.Int( required=True, @@ -167,7 +167,7 @@ class SplitOrderSchema(Schema): class OptionsOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) underlying = fields.Str( required=True @@ -252,7 +252,7 @@ class OptionsMultiOrderLegSchema(Schema): class OptionsMultiOrderSchema(Schema): """Schema for options multi-order with multiple legs sharing common underlying""" - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) underlying = fields.Str(required=True) # Underlying symbol (NIFTY, BANKNIFTY, RELIANCE) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (NSE_INDEX, NSE, BSE_INDEX, BSE) @@ -272,7 +272,7 @@ class OptionsMultiOrderSchema(Schema): class SyntheticFutureSchema(Schema): """Schema for synthetic future calculation""" - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) underlying = fields.Str(required=True) # Underlying symbol (NIFTY, BANKNIFTY, RELIANCE) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (NSE_INDEX, NSE, BSE_INDEX, BSE) expiry_date = fields.Str(required=True) # Expiry date in DDMMMYY format (e.g., 28OCT25) @@ -304,7 +304,7 @@ class MarginCalculatorSchema(Schema): """Schema for margin calculator request""" apikey = fields.Str( - required=True, validate=validate.Length(min=1, error="API key is required.") + required=True, validate=validate.Length(min=1, max=256, error="API key must be between 1 and 256 characters.") ) positions = fields.List( fields.Nested(MarginPositionSchema), diff --git a/restx_api/telegram_bot.py b/restx_api/telegram_bot.py index 8d755456e..419045d5d 100644 --- a/restx_api/telegram_bot.py +++ b/restx_api/telegram_bot.py @@ -160,7 +160,17 @@ def post(self): if "broadcast_enabled" in data: config_update["broadcast_enabled"] = data["broadcast_enabled"] if "rate_limit_per_minute" in data: - config_update["rate_limit_per_minute"] = data["rate_limit_per_minute"] + try: + rate_limit = int(data["rate_limit_per_minute"]) + if not 1 <= rate_limit <= 120: + return make_response( + jsonify({"status": "error", "message": "rate_limit_per_minute must be between 1 and 120"}), 400 + ) + config_update["rate_limit_per_minute"] = rate_limit + except (TypeError, ValueError): + return make_response( + jsonify({"status": "error", "message": "rate_limit_per_minute must be an integer"}), 400 + ) success = update_bot_config(config_update) @@ -391,11 +401,16 @@ def post(self): message = data.get("message") filters = data.get("filters", {}) - if not message: + if not message or not isinstance(message, str): return make_response( jsonify({"status": "error", "message": "Message is required"}), 400 ) + if len(message) > 4096: + return make_response( + jsonify({"status": "error", "message": "Message must not exceed 4096 characters"}), 400 + ) + # Check if broadcast is enabled config = get_bot_config() if not config.get("broadcast_enabled", True): @@ -445,7 +460,12 @@ def post(self): username = data.get("username") message = data.get("message") - priority = data.get("priority", 5) + try: + priority = int(data.get("priority", 5)) + if not 1 <= priority <= 10: + priority = 5 + except (TypeError, ValueError): + priority = 5 if not username or not message: return make_response( @@ -526,8 +546,11 @@ def get(self): jsonify({"status": "error", "message": "Invalid or missing API key"}), 401 ) - # Get days parameter (default 7) - days = int(request.args.get("days", 7)) + # Get days parameter (default 7, max 365) + try: + days = min(max(int(request.args.get("days", 7)), 1), 365) + except (TypeError, ValueError): + days = 7 stats = get_command_stats(days) diff --git a/services/positionbook_service.py b/services/positionbook_service.py index b020052a0..5f03a621a 100644 --- a/services/positionbook_service.py +++ b/services/positionbook_service.py @@ -41,7 +41,7 @@ def format_position_data(position_data): key: value if key.lower() in passthrough_fields else ( - int(value) + (int(value) if value == int(value) else value) if (key.lower() in quantity_fields and isinstance(value, (int, float))) else (format_decimal(value) if isinstance(value, (int, float)) else value) )