From 2ce1f0b1454e6f33ae23347ecca6fbd3b547866d Mon Sep 17 00:00:00 2001 From: kalaivani Date: Thu, 5 Mar 2026 14:38:37 +0530 Subject: [PATCH 1/8] fix: patch 5 medium-severity findings from deep security audit M1: Escape LIKE wildcards in symbol search to prevent broad matching - Add _escape_like() helper for % and _ characters in database/symbol.py M2: Remove traceback disclosure from API error responses - Replace traceback.format_exc() with generic messages in log.py, analyzer.py - Remove unused traceback import from chart_api.py M4: Add input validation for Telegram bot endpoints - rate_limit_per_minute: int, 1-120 range - broadcast message: max 4096 chars - notification priority: int, 1-10 range - stats days: int, 1-365 range M5: Add apikey length constraints (max 256) across all 39 schema fields - schemas.py, data_schemas.py, account_schema.py M6: Restrict ChartSchema payload to prevent storage exhaustion - Max 50 keys, key max 50 chars, value max 1MB in chart_api.py Note: M3 (Windows resource limits) and M7 (Redis rate limiting) are infrastructure-level changes deferred for separate implementation. Co-Authored-By: Claude Opus 4.6 --- blueprints/analyzer.py | 12 ++++++------ blueprints/log.py | 15 +++++++------- database/symbol.py | 39 ++++++++++++++++++++++--------------- restx_api/account_schema.py | 24 +++++++++++------------ restx_api/chart_api.py | 22 ++++++++++++++++----- restx_api/data_schemas.py | 32 +++++++++++++++--------------- restx_api/schemas.py | 24 +++++++++++------------ restx_api/telegram_bot.py | 33 ++++++++++++++++++++++++++----- 8 files changed, 121 insertions(+), 80 deletions(-) diff --git a/blueprints/analyzer.py b/blueprints/analyzer.py index 23d8ade45..89f53f1a8 100644 --- a/blueprints/analyzer.py +++ b/blueprints/analyzer.py @@ -1,7 +1,7 @@ import csv import io import json -import traceback + from datetime import datetime, timedelta import pytz @@ -128,7 +128,7 @@ def get_filtered_requests(start_date=None, end_date=None): return requests except Exception as e: - logger.exception(f"Error getting filtered requests: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error getting filtered requests: {e}") return [] @@ -173,7 +173,7 @@ def generate_csv(requests): return output.getvalue() except Exception as e: - logger.exception(f"Error generating CSV: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error generating CSV: {str(e)}") return "" @@ -216,7 +216,7 @@ def analyzer(): end_date=end_date, ) except Exception as e: - logger.exception(f"Error rendering analyzer: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error rendering analyzer: {str(e)}") flash("Error loading analyzer dashboard", "error") return redirect(url_for("core_bp.home")) @@ -268,7 +268,7 @@ def api_get_data(): {"status": "success", "data": {"stats": stats_transformed, "requests": requests_data}} ) except Exception as e: - logger.exception(f"Error getting analyzer data: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error getting analyzer data: {str(e)}") return jsonify( {"status": "error", "message": f"Error loading analyzer data: {str(e)}"} ), 500 @@ -352,6 +352,6 @@ def export_requests(): ) return output except Exception as e: - logger.exception(f"Error exporting requests: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error exporting requests: {str(e)}") flash("Error exporting requests", "error") return redirect(url_for("analyzer_bp.analyzer")) diff --git a/blueprints/log.py b/blueprints/log.py index baf38cf3b..c14f9437c 100644 --- a/blueprints/log.py +++ b/blueprints/log.py @@ -3,7 +3,7 @@ import csv import io import json -import traceback + from datetime import datetime import pytz @@ -66,7 +66,7 @@ def format_log_entry(log, ist): "created_at": log.created_at.astimezone(ist).strftime("%Y-%m-%d %I:%M:%S %p"), } except Exception as e: - logger.exception(f"Error formatting log {log.id}: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error formatting log {log.id}: {str(e)}") return { "id": log.id, "api_type": log.api_type, @@ -130,7 +130,7 @@ def get_filtered_logs(start_date=None, end_date=None, search_query=None, page=No return logs, total_pages, total_logs except Exception as e: - logger.exception(f"Error in get_filtered_logs: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error in get_filtered_logs: {str(e)}") return [], 1, 0 @@ -203,7 +203,7 @@ def generate_csv(logs): return si.getvalue() except Exception as e: - logger.exception(f"Error generating CSV: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error generating CSV: {str(e)}") raise @@ -243,7 +243,7 @@ def view_logs(): ) except Exception as e: - logger.exception(f"Error in view_logs: {str(e)}\n{traceback.format_exc()}") + logger.exception(f"Error in view_logs: {str(e)}") return render_template( "logs.html", logs=[], @@ -300,6 +300,5 @@ def export_logs(): ) except Exception as e: - error_msg = f"Error exporting logs: {str(e)}\n{traceback.format_exc()}" - logger.exception(error_msg) - return jsonify({"error": error_msg}), 500 + logger.exception(f"Error exporting logs: {e}") + return jsonify({"error": "An error occurred while exporting logs"}), 500 diff --git a/database/symbol.py b/database/symbol.py index c86015782..c5ea15c5b 100644 --- a/database/symbol.py +++ b/database/symbol.py @@ -10,6 +10,11 @@ logger = get_logger(__name__) + +def _escape_like(term: str) -> str: + """Escape LIKE wildcard characters to prevent unintended broad matching.""" + return term.replace("%", r"\%").replace("_", r"\_") + DATABASE_URL = os.getenv("DATABASE_URL") # Conditionally create engine based on DB type if DATABASE_URL and "sqlite" in DATABASE_URL: @@ -75,22 +80,23 @@ def enhanced_search_symbols(query: str, exchange: str = None) -> list[SymToken]: # Create conditions for each term all_conditions = [] for term in terms: + safe_term = _escape_like(term) # Number detection for more accurate strike price and token searches try: num_term = float(term) term_conditions = or_( - SymToken.symbol.ilike(f"%{term}%"), - SymToken.brsymbol.ilike(f"%{term}%"), - SymToken.name.ilike(f"%{term}%"), - SymToken.token.ilike(f"%{term}%"), + SymToken.symbol.ilike(f"%{safe_term}%"), + SymToken.brsymbol.ilike(f"%{safe_term}%"), + SymToken.name.ilike(f"%{safe_term}%"), + SymToken.token.ilike(f"%{safe_term}%"), SymToken.strike == num_term, ) except ValueError: term_conditions = or_( - SymToken.symbol.ilike(f"%{term}%"), - SymToken.brsymbol.ilike(f"%{term}%"), - SymToken.name.ilike(f"%{term}%"), - SymToken.token.ilike(f"%{term}%"), + SymToken.symbol.ilike(f"%{safe_term}%"), + SymToken.brsymbol.ilike(f"%{safe_term}%"), + SymToken.name.ilike(f"%{safe_term}%"), + SymToken.token.ilike(f"%{safe_term}%"), ) all_conditions.append(term_conditions) @@ -182,21 +188,22 @@ def fno_search_symbols_db( primary_term = terms[0] if terms else None all_conditions = [] for term in terms: + safe_term = _escape_like(term) try: num_term = float(term) term_conditions = or_( - SymToken.symbol.ilike(f"%{term}%"), - SymToken.brsymbol.ilike(f"%{term}%"), - SymToken.name.ilike(f"%{term}%"), - SymToken.token.ilike(f"%{term}%"), + SymToken.symbol.ilike(f"%{safe_term}%"), + SymToken.brsymbol.ilike(f"%{safe_term}%"), + SymToken.name.ilike(f"%{safe_term}%"), + SymToken.token.ilike(f"%{safe_term}%"), SymToken.strike == num_term, ) except ValueError: term_conditions = or_( - SymToken.symbol.ilike(f"%{term}%"), - SymToken.brsymbol.ilike(f"%{term}%"), - SymToken.name.ilike(f"%{term}%"), - SymToken.token.ilike(f"%{term}%"), + SymToken.symbol.ilike(f"%{safe_term}%"), + SymToken.brsymbol.ilike(f"%{safe_term}%"), + SymToken.name.ilike(f"%{safe_term}%"), + SymToken.token.ilike(f"%{safe_term}%"), ) all_conditions.append(term_conditions) diff --git a/restx_api/account_schema.py b/restx_api/account_schema.py index f06876b40..2f55125a7 100644 --- a/restx_api/account_schema.py +++ b/restx_api/account_schema.py @@ -2,33 +2,33 @@ class FundsSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class OrderbookSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class TradebookSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class PositionbookSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class HoldingsSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class OrderStatusSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) orderid = fields.Str(required=True) class OpenPositionSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) symbol = fields.Str(required=True) exchange = fields.Str(required=True) @@ -36,20 +36,20 @@ class OpenPositionSchema(Schema): class AnalyzerSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class AnalyzerToggleSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) mode = fields.Bool(required=True) class PingSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class ChartSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class Meta: # Allow unknown fields - chart preferences can have any key-value pairs @@ -57,4 +57,4 @@ class Meta: class PnlSymbolsSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) diff --git a/restx_api/chart_api.py b/restx_api/chart_api.py index 5fe21be61..c116a18b5 100644 --- a/restx_api/chart_api.py +++ b/restx_api/chart_api.py @@ -1,5 +1,4 @@ import os -import traceback from flask import jsonify, make_response, request from flask_restx import Namespace, Resource @@ -47,8 +46,7 @@ def get(self): return make_response(jsonify(response_data), status_code) except Exception as e: - logger.error(f"Unexpected error in chart GET endpoint: {e}") - traceback.print_exc() + logger.exception(f"Unexpected error in chart GET endpoint: {e}") return make_response( jsonify({"status": "error", "message": "An unexpected error occurred"}), 500 ) @@ -75,6 +73,21 @@ def post(self): # Extract preferences (all keys except apikey) preferences = {k: v for k, v in data.items() if k != "apikey"} + # Limit payload: max 50 keys, each key max 50 chars, each value max 1MB + if len(preferences) > 50: + return make_response( + jsonify({"status": "error", "message": "Too many preference keys (max 50)"}), 400 + ) + for k, v in preferences.items(): + if len(k) > 50: + return make_response( + jsonify({"status": "error", "message": f"Preference key too long: {k[:20]}... (max 50 chars)"}), 400 + ) + if isinstance(v, str) and len(v) > 1_048_576: + return make_response( + jsonify({"status": "error", "message": f"Preference value too large for key: {k} (max 1MB)"}), 400 + ) + if not preferences: return make_response( jsonify({"status": "error", "message": "No preferences provided to update"}), @@ -89,8 +102,7 @@ def post(self): except ValidationError as err: return make_response(jsonify({"status": "error", "message": err.messages}), 400) except Exception as e: - logger.error(f"Unexpected error in chart POST endpoint: {e}") - traceback.print_exc() + logger.exception(f"Unexpected error in chart POST endpoint: {e}") return make_response( jsonify({"status": "error", "message": "An unexpected error occurred"}), 500 ) diff --git a/restx_api/data_schemas.py b/restx_api/data_schemas.py index 0e4c2e688..9f94c06b7 100644 --- a/restx_api/data_schemas.py +++ b/restx_api/data_schemas.py @@ -38,7 +38,7 @@ def validate_option_offset(data): class QuotesSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) symbol = fields.Str(required=True) # Single symbol exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (e.g., NSE, BSE) @@ -49,14 +49,14 @@ class SymbolExchangePair(Schema): class MultiQuotesSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) symbols = fields.List( fields.Nested(SymbolExchangePair), required=True, validate=validate.Length(min=1) ) class HistorySchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) symbol = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (e.g., NSE, BSE) interval = fields.Str( @@ -101,23 +101,23 @@ class HistorySchema(Schema): class DepthSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) symbol = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (e.g., NSE, BSE) class IntervalsSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) class SymbolSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication symbol = fields.Str(required=True) # Symbol code (e.g., RELIANCE) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (e.g., NSE, BSE) class TickerSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) symbol = fields.Str(required=True) # Combined exchange:symbol format interval = fields.Str( required=True, @@ -136,13 +136,13 @@ class TickerSchema(Schema): class SearchSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication query = fields.Str(required=True) # Search query/symbol name exchange = fields.Str(required=False, validate=validate.OneOf(VALID_EXCHANGES)) # Optional exchange filter (e.g., NSE, BSE) class ExpirySchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication symbol = fields.Str(required=True) # Underlying symbol (e.g., NIFTY, BANKNIFTY) exchange = fields.Str( required=True, validate=validate.OneOf(["NFO", "BFO", "MCX", "CDS"]) @@ -153,7 +153,7 @@ class ExpirySchema(Schema): class OptionSymbolSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication strategy = fields.Str( required=False, allow_none=True ) # DEPRECATED: Strategy name (optional, will be removed in future versions) @@ -174,7 +174,7 @@ class OptionSymbolSchema(Schema): class OptionGreeksSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication symbol = fields.Str(required=True) # Option symbol (e.g., NIFTY28NOV2424000CE) exchange = fields.Str( required=True, validate=validate.OneOf(["NFO", "BFO", "CDS", "MCX"]) @@ -197,7 +197,7 @@ class OptionGreeksSchema(Schema): class InstrumentsSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication exchange = fields.Str( required=False, validate=validate.OneOf(VALID_EXCHANGES), @@ -208,7 +208,7 @@ class InstrumentsSchema(Schema): class OptionChainSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication underlying = fields.Str(required=True) # Underlying symbol (e.g., NIFTY, BANKNIFTY, RELIANCE) exchange = fields.Str( required=True, validate=validate.OneOf(VALID_EXCHANGES) @@ -222,14 +222,14 @@ class OptionChainSchema(Schema): class MarketHolidaysSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication year = fields.Int( required=False, validate=validate.Range(min=2020, max=2050) ) # Year to get holidays for (defaults to current year) class MarketTimingsSchema(Schema): - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication date = fields.Str(required=True) # Date in YYYY-MM-DD format @@ -245,7 +245,7 @@ class OptionSymbolRequest(Schema): class MultiOptionGreeksSchema(Schema): """Schema for batch option greeks requests""" - apikey = fields.Str(required=True) # API Key for authentication + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) # API Key for authentication symbols = fields.List( fields.Nested(OptionSymbolRequest), required=True, diff --git a/restx_api/schemas.py b/restx_api/schemas.py index 4b706fe23..e3a7f87b2 100644 --- a/restx_api/schemas.py +++ b/restx_api/schemas.py @@ -4,7 +4,7 @@ class OrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) @@ -33,7 +33,7 @@ class OrderSchema(Schema): class SmartOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) @@ -61,7 +61,7 @@ class SmartOrderSchema(Schema): class ModifyOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) @@ -88,18 +88,18 @@ class ModifyOrderSchema(Schema): class CancelOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) orderid = fields.Str(required=True) class ClosePositionSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) class CancelAllOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) @@ -128,7 +128,7 @@ class BasketOrderItemSchema(Schema): class BasketOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) orders = fields.List( fields.Nested(BasketOrderItemSchema), required=True @@ -136,7 +136,7 @@ class BasketOrderSchema(Schema): class SplitOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) @@ -167,7 +167,7 @@ class SplitOrderSchema(Schema): class OptionsOrderSchema(Schema): - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) underlying = fields.Str( required=True @@ -252,7 +252,7 @@ class OptionsMultiOrderLegSchema(Schema): class OptionsMultiOrderSchema(Schema): """Schema for options multi-order with multiple legs sharing common underlying""" - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) strategy = fields.Str(required=True) underlying = fields.Str(required=True) # Underlying symbol (NIFTY, BANKNIFTY, RELIANCE) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (NSE_INDEX, NSE, BSE_INDEX, BSE) @@ -272,7 +272,7 @@ class OptionsMultiOrderSchema(Schema): class SyntheticFutureSchema(Schema): """Schema for synthetic future calculation""" - apikey = fields.Str(required=True) + apikey = fields.Str(required=True, validate=validate.Length(min=1, max=256)) underlying = fields.Str(required=True) # Underlying symbol (NIFTY, BANKNIFTY, RELIANCE) exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) # Exchange (NSE_INDEX, NSE, BSE_INDEX, BSE) expiry_date = fields.Str(required=True) # Expiry date in DDMMMYY format (e.g., 28OCT25) @@ -304,7 +304,7 @@ class MarginCalculatorSchema(Schema): """Schema for margin calculator request""" apikey = fields.Str( - required=True, validate=validate.Length(min=1, error="API key is required.") + required=True, validate=validate.Length(min=1, max=256, error="API key is required.") ) positions = fields.List( fields.Nested(MarginPositionSchema), diff --git a/restx_api/telegram_bot.py b/restx_api/telegram_bot.py index 8d755456e..419045d5d 100644 --- a/restx_api/telegram_bot.py +++ b/restx_api/telegram_bot.py @@ -160,7 +160,17 @@ def post(self): if "broadcast_enabled" in data: config_update["broadcast_enabled"] = data["broadcast_enabled"] if "rate_limit_per_minute" in data: - config_update["rate_limit_per_minute"] = data["rate_limit_per_minute"] + try: + rate_limit = int(data["rate_limit_per_minute"]) + if not 1 <= rate_limit <= 120: + return make_response( + jsonify({"status": "error", "message": "rate_limit_per_minute must be between 1 and 120"}), 400 + ) + config_update["rate_limit_per_minute"] = rate_limit + except (TypeError, ValueError): + return make_response( + jsonify({"status": "error", "message": "rate_limit_per_minute must be an integer"}), 400 + ) success = update_bot_config(config_update) @@ -391,11 +401,16 @@ def post(self): message = data.get("message") filters = data.get("filters", {}) - if not message: + if not message or not isinstance(message, str): return make_response( jsonify({"status": "error", "message": "Message is required"}), 400 ) + if len(message) > 4096: + return make_response( + jsonify({"status": "error", "message": "Message must not exceed 4096 characters"}), 400 + ) + # Check if broadcast is enabled config = get_bot_config() if not config.get("broadcast_enabled", True): @@ -445,7 +460,12 @@ def post(self): username = data.get("username") message = data.get("message") - priority = data.get("priority", 5) + try: + priority = int(data.get("priority", 5)) + if not 1 <= priority <= 10: + priority = 5 + except (TypeError, ValueError): + priority = 5 if not username or not message: return make_response( @@ -526,8 +546,11 @@ def get(self): jsonify({"status": "error", "message": "Invalid or missing API key"}), 401 ) - # Get days parameter (default 7) - days = int(request.args.get("days", 7)) + # Get days parameter (default 7, max 365) + try: + days = min(max(int(request.args.get("days", 7)), 1), 365) + except (TypeError, ValueError): + days = 7 stats = get_command_stats(days) From 59ff01e5dc4b18c0ed54b13f986754b08fa3a3df Mon Sep 17 00:00:00 2001 From: kalaivani Date: Mon, 9 Mar 2026 14:49:35 +0530 Subject: [PATCH 2/8] Groww: Optimize master contract download and normalize streaming logs - Replace slow df.apply() and iterrows() with vectorized pandas operations - Use bulk_insert_mappings with 10K batch inserts instead of ORM object loop - Master contract download reduced from 5+ minutes to ~22 seconds - Normalize all per-tick WebSocket logging from INFO to DEBUG - Add periodic summary logging (1 info line per 500 ticks) - Remove emojis from log messages and ~150 lines of dead code Co-Authored-By: Claude Opus 4.6 --- broker/groww/database/master_contract_db.py | 338 +++++--------------- broker/groww/streaming/groww_adapter.py | 143 +++------ broker/groww/streaming/groww_nats.py | 6 +- broker/groww/streaming/groww_protobuf.py | 35 +- broker/groww/streaming/nats_websocket.py | 206 +++--------- 5 files changed, 175 insertions(+), 553 deletions(-) diff --git a/broker/groww/database/master_contract_db.py b/broker/groww/database/master_contract_db.py index 78257fb72..bdcb6b60e 100644 --- a/broker/groww/database/master_contract_db.py +++ b/broker/groww/database/master_contract_db.py @@ -66,32 +66,18 @@ def copy_from_dataframe(df): # Filter out data_dict entries with tokens that already exist filtered_data_dict = [row for row in data_dict if row["token"] not in existing_tokens] - # Insert in bulk the filtered records + # Insert in batches to avoid memory spikes and SQLite locking + BATCH_SIZE = 10000 try: - if filtered_data_dict: # Proceed only if there's anything to insert - logger.info(f"Inserting {len(filtered_data_dict)} new records into the database") - # Create a list of SymToken objects from the filtered data - symtoken_objects = [] - for item in filtered_data_dict: - symtoken = SymToken( - symbol=item.get("symbol", ""), - brsymbol=item.get("brsymbol", ""), - name=item.get("name", ""), - exchange=item.get("exchange", ""), - brexchange=item.get("brexchange", ""), - token=item.get("token", ""), - expiry=item.get("expiry", ""), - strike=float(item.get("strike", 0)) if item.get("strike") else 0, - lotsize=int(item.get("lotsize", 0)) if item.get("lotsize") else 0, - instrumenttype=item.get("instrumenttype", ""), - tick_size=float(item.get("tick_size", 0)) if item.get("tick_size") else 0, - ) - symtoken_objects.append(symtoken) - - # Add all objects and commit in one transaction - db_session.add_all(symtoken_objects) - db_session.commit() - logger.info(f"Successfully inserted {len(symtoken_objects)} records into the database") + if filtered_data_dict: + total = len(filtered_data_dict) + logger.info(f"Inserting {total} new records into the database in batches of {BATCH_SIZE}") + for i in range(0, total, BATCH_SIZE): + batch = filtered_data_dict[i : i + BATCH_SIZE] + db_session.bulk_insert_mappings(SymToken, batch) + db_session.commit() + logger.info(f"Inserted batch {i // BATCH_SIZE + 1} ({min(i + BATCH_SIZE, total)}/{total} records)") + logger.info(f"Bulk insert completed successfully with {total} new records.") else: logger.info("No new records to insert") except Exception as e: @@ -397,46 +383,6 @@ def find_token_by_symbol(symbol, exchange): return None - # Insert in bulk the filtered records - try: - if filtered_data_dict: # Proceed only if there's anything to insert - # Pre-validate records before insertion - invalid_records = [] - valid_records = [] - - for record in filtered_data_dict: - # Allow indices ("I") even if symbol is missing - if record.get("instrumenttype") == "I": - valid_records.append(record) - else: - # Check if symbol exists and is not empty/null - symbol = record.get("symbol") - if not symbol or pd.isna(symbol) or str(symbol).strip() == "": - invalid_records.append(record) - logger.error(f"Schema validation failed for record: {record}") - logger.info("Symbol is missing, empty, or null") - else: - valid_records.append(record) - - if valid_records: - db_session.bulk_insert_mappings(SymToken, valid_records) - db_session.commit() - logger.info( - f"Bulk insert completed successfully with {len(valid_records)} new records." - ) - - if invalid_records: - logger.error( - f"Warning: {len(invalid_records)} records failed schema validation and were skipped." - ) - else: - logger.info("No new records to insert.") - except Exception as e: - logger.error(f"Error during bulk insert: {e}") - if hasattr(e, "__cause__"): - logger.info(f"Caused by: {e.__cause__}") - db_session.rollback() - def download_groww_instrument_data(output_path): """ @@ -466,22 +412,24 @@ def download_groww_instrument_data(output_path): response.raise_for_status() content = response.text - if "," in content and len(content.splitlines()) > 1: - # Read CSV using pandas - df = pd.read_csv(StringIO(content)) - - # Replace headers if column count matches - if len(df.columns) == len(expected_headers): - df.columns = expected_headers - else: - raise ValueError("Downloaded CSV column count does not match expected headers.") + lines = content.split("\n", 1) + if len(lines) < 2 or "," not in content: + raise ValueError("Downloaded content does not appear to be a valid CSV.") - # Save with new headers - df.to_csv(file_path, index=False) - logger.info(f"Successfully saved instruments CSV to: {file_path}") - return [file_path] + # Verify column count matches and replace header line directly (no pandas parse needed) + original_headers = lines[0].strip().split(",") + if len(original_headers) == len(expected_headers): + new_content = ",".join(expected_headers) + "\n" + lines[1] else: - raise ValueError("Downloaded content does not appear to be a valid CSV.") + raise ValueError( + f"Downloaded CSV column count ({len(original_headers)}) does not match expected ({len(expected_headers)})." + ) + + # Write directly to file + with open(file_path, "w", encoding="utf-8") as f: + f.write(new_content) + logger.info(f"Successfully saved instruments CSV to: {file_path}") + return [file_path] except Exception as e: logger.error(f"Failed to download or process Groww instrument data: {e}") raise @@ -660,17 +608,12 @@ def process_groww_data(path): df_mapped.drop("temp_strike", axis=1, inplace=True) df_mapped["tick_size"] = pd.to_numeric(df_mapped["tick_size"], errors="coerce").fillna(0.05) - # Convert expiry from yyyy-mm-dd to DD-MMM-YY format (to match Zerodha/other brokers) - def convert_expiry_format(expiry_val): - if pd.isna(expiry_val) or expiry_val == "": - return "" - try: - expiry_date = pd.to_datetime(expiry_val) - return expiry_date.strftime("%d-%b-%y").upper() - except Exception: - return expiry_val - - df_mapped["expiry"] = df_mapped["expiry"].apply(convert_expiry_format) + # Convert expiry from yyyy-mm-dd to DD-MMM-YY format (vectorized) + expiry_parsed = pd.to_datetime(df_mapped["expiry"], errors="coerce") + valid_expiry = expiry_parsed.notna() + if valid_expiry.any(): + df_mapped.loc[valid_expiry, "expiry"] = expiry_parsed[valid_expiry].dt.strftime("%d-%b-%y").str.upper() + df_mapped["expiry"] = df_mapped["expiry"].fillna("") # Map instrument types directly from Groww's data # We want CE, PE, FUT values to be preserved as is @@ -741,59 +684,50 @@ def convert_expiry_format(expiry_val): index_mask = (df["instrument_type"] == "IDX") | (df["segment"] == "IDX") df_mapped.loc[index_mask, "instrumenttype"] = "INDEX" - # Format the symbol for F&O (NFO) instruments to match OpenAlgo format - def format_fo_symbol(row): - # Skip non-FNO instruments or those with missing expiry - if row["brexchange"] != "NSE" or pd.isna(row["expiry"]) or row["expiry"] == "": - return row["symbol"] + # Format F&O symbols using vectorized operations (much faster than apply) + # Identify FNO rows with valid expiry + fno_data_mask = ( + (df_mapped["brexchange"] == "NSE") + & (df["segment"] == "FNO") + & df_mapped["expiry"].notna() + & (df_mapped["expiry"] != "") + ) - # For segment='FNO', format according to OpenAlgo standard - if "segment" in df.columns and df.loc[row.name, "segment"] == "FNO": - try: - # Format expiry date (assuming yyyy-mm-dd format in input) - expiry_date = pd.to_datetime(row["expiry"]) - expiry_str = expiry_date.strftime("%d%b%y").upper() - - # Get underlying symbol - symbol = ( - row["underlying"] - if "underlying" in row and not pd.isna(row["underlying"]) - else row["symbol"].split("-")[0] - if "-" in row["symbol"] - else row["symbol"] - ) + if fno_data_mask.any(): + # Parse expiry dates for FNO rows and format as DDMMMYY + fno_expiry = pd.to_datetime(df_mapped.loc[fno_data_mask, "expiry"], format="%d-%b-%y", errors="coerce") + expiry_str = fno_expiry.dt.strftime("%d%b%y").str.upper() - # For futures - if row["instrumenttype"] == "FUT": - return f"{symbol}{expiry_str}FUT" - - # For options - elif row["instrumenttype"] == "PE" or "CE": - # Determine strike price - strike = str(int(row["strike"])) if not pd.isna(row["strike"]) else "0" - - # Determine option type (CE/PE) - option_type = "" - if "instrument_type" in df.columns: - instrument_type = df.loc[row.name, "instrument_type"] - option_type = ( - "CE" - if instrument_type == "CE" - else "PE" - if instrument_type == "PE" - else "" - ) + # Use underlying symbol where available, else trading_symbol + underlying = df_mapped.loc[fno_data_mask, "underlying"] + base_symbol = underlying.where(underlying.notna() & (underlying != ""), df_mapped.loc[fno_data_mask, "symbol"]) - if option_type: - return f"{symbol}{expiry_str}{strike}{option_type}" - except Exception as e: - logger.error(f"Error formatting F&O symbol: {e}") + # Strike as integer string + strike_str = df_mapped.loc[fno_data_mask, "strike"].fillna(0).astype(int).astype(str) - # Return original symbol if formatting fails - return row["symbol"] + # Get instrument type from original df + orig_inst_type = df.loc[fno_data_mask.values, "instrument_type"] - # Apply F&O symbol formatting - df_mapped["symbol"] = df_mapped.apply(format_fo_symbol, axis=1) + # Build symbols for futures + fut_mask = fno_data_mask & (df_mapped["instrumenttype"] == "FUT") + if fut_mask.any(): + df_mapped.loc[fut_mask, "symbol"] = ( + base_symbol[fut_mask] + expiry_str[fut_mask] + "FUT" + ) + + # Build symbols for CE options + ce_mask = fno_data_mask & (orig_inst_type.reindex(df_mapped.index, fill_value="") == "CE") + if ce_mask.any(): + df_mapped.loc[ce_mask, "symbol"] = ( + base_symbol[ce_mask] + expiry_str[ce_mask] + strike_str[ce_mask] + "CE" + ) + + # Build symbols for PE options + pe_mask = fno_data_mask & (orig_inst_type.reindex(df_mapped.index, fill_value="") == "PE") + if pe_mask.any(): + df_mapped.loc[pe_mask, "symbol"] = ( + base_symbol[pe_mask] + expiry_str[pe_mask] + strike_str[pe_mask] + "PE" + ) logger.info(f"Processed {len(df_mapped)} instruments") return df_mapped @@ -802,115 +736,6 @@ def format_fo_symbol(row): logger.error(f"Error processing Groww instrument data: {e}") return pd.DataFrame() - # Map instrument types to OpenAlgo standard types - instrument_type_map = { - "EQUITY": "EQ", - "INDEX": "INDEX", - "FUTURE": "FUT", - "CALL": "OPT", - "PUT": "OPT", - "ETF": "EQ", - "CURRENCY": "CUR", - "COMMODITY": "COM", - } - - # Apply instrument type mapping - all_instruments["instrumenttype"] = ( - all_instruments["instrument_type"].map(instrument_type_map).fillna("EQ") - ) - - # Map exchanges to OpenAlgo standard exchanges - exchange_map = {"NSE": "NSE", "BSE": "BSE", "NFO": "NFO", "MCX": "MCX", "CDS": "CDS"} - - # Apply exchange mapping - all_instruments["exchange"] = ( - all_instruments["brexchange"].map(exchange_map).fillna(all_instruments["brexchange"]) - ) - - # Special handling for indices - # Mark indices based on name patterns or specific flags in the data - index_patterns = ["NIFTY", "SENSEX", "BANKNIFTY", "FINNIFTY", "MIDCPNIFTY"] - - for pattern in index_patterns: - index_mask = all_instruments["symbol"].str.contains(pattern, case=False, na=False) - all_instruments.loc[index_mask, "instrumenttype"] = "INDEX" - all_instruments.loc[index_mask, "exchange"] = "NSE_INDEX" - - # Format specific fields - all_instruments["expiry"] = all_instruments["expiry"].fillna("") - all_instruments["strike"] = pd.to_numeric(all_instruments["strike"].fillna(0), errors="coerce") - all_instruments["lotsize"] = pd.to_numeric( - all_instruments["lotsize"].fillna(1), errors="coerce" - ).astype(int) - all_instruments["tick_size"] = pd.to_numeric( - all_instruments["tick_size"].fillna(0.05), errors="coerce" - ) - - # Ensure brsymbol is not empty - use symbol if needed - all_instruments.loc[ - all_instruments["brsymbol"].isna() | (all_instruments["brsymbol"] == ""), "brsymbol" - ] = all_instruments.loc[ - all_instruments["brsymbol"].isna() | (all_instruments["brsymbol"] == ""), "symbol" - ] - - # For F&O instruments, format the symbol in OpenAlgo format - fo_mask = all_instruments["exchange"] == "NFO" - if fo_mask.any(): - # Format F&O symbols according to OpenAlgo standard - def format_fo_symbol(row): - if pd.isna(row["expiry"]) or row["expiry"] == "": - return row["symbol"] - - # Format expiry date to standard format (e.g., 25MAY23) - try: - from datetime import datetime - - expiry_date = pd.to_datetime(row["expiry"]) - expiry_str = expiry_date.strftime("%d%b%y").upper() - except: - expiry_str = row["expiry"] - - # For futures - if row["instrumenttype"] == "FUT": - return f"{row['symbol']}{expiry_str}FUT" - - # For options - elif row["instrumenttype"] == "OPT": - strike = str(int(row["strike"])) if not pd.isna(row["strike"]) else "0" - option_type = ( - "CE" if "option_type" in row and row["option_type"].upper() == "CE" else "PE" - ) - return f"{row['symbol']}{expiry_str}{strike}{option_type}" - - return row["symbol"] - - all_instruments.loc[fo_mask, "symbol"] = all_instruments[fo_mask].apply( - format_fo_symbol, axis=1 - ) - - # Create final DataFrame with required columns - token_df = pd.DataFrame( - { - "symbol": all_instruments["symbol"], - "brsymbol": all_instruments["brsymbol"], - "name": all_instruments["name"], - "exchange": all_instruments["exchange"], - "brexchange": all_instruments["brexchange"], - "token": all_instruments["token"], - "expiry": all_instruments["expiry"], - "strike": all_instruments["strike"], - "lotsize": all_instruments["lotsize"], - "instrumenttype": all_instruments["instrumenttype"], - "tick_size": all_instruments["tick_size"], - } - ) - - # Remove duplicates - token_df = token_df.drop_duplicates(subset=["symbol", "exchange"], keep="first") - - logger.info(f"Processed {len(token_df)} Groww instruments") - return token_df - def delete_groww_temp_data(output_path): """Delete only Groww-specific temporary files created during instrument data download""" @@ -982,16 +807,15 @@ def master_contract_download(): ) token_df["tick_size"] = pd.to_numeric(token_df["tick_size"], errors="coerce").fillna(0.05) - # Step 5: Add OpenAlgo symbols where needed (converting Groww format to OpenAlgo format) - # Identify rows that need conversion (NFO options and futures) - nfo_options = token_df[ - (token_df["exchange"] == "NFO") & (token_df["instrumenttype"].isin(["CE", "PE"])) - ] - - for idx, row in nfo_options.iterrows(): - # Convert the broker symbol to OpenAlgo format if spaces are detected - if " " in row["brsymbol"]: - token_df.at[idx, "symbol"] = format_groww_to_openalgo_symbol(row["brsymbol"], "NFO") + # Step 5: Add OpenAlgo symbols where needed (vectorized - remove spaces from brsymbol) + # For NFO options with spaces in brsymbol, the OpenAlgo format is just the symbol without spaces + nfo_space_mask = ( + (token_df["exchange"] == "NFO") + & (token_df["instrumenttype"].isin(["CE", "PE"])) + & (token_df["brsymbol"].str.contains(" ", na=False)) + ) + if nfo_space_mask.any(): + token_df.loc[nfo_space_mask, "symbol"] = token_df.loc[nfo_space_mask, "brsymbol"].str.replace(" ", "", regex=False) # Step 6: Insert into database logger.info(f"Inserting {len(token_df)} records into database") diff --git a/broker/groww/streaming/groww_adapter.py b/broker/groww/streaming/groww_adapter.py index fe25822e3..d1441b8ea 100644 --- a/broker/groww/streaming/groww_adapter.py +++ b/broker/groww/streaming/groww_adapter.py @@ -113,7 +113,7 @@ def unsubscribe_all(self) -> dict[str, Any]: failed_list = [] self.logger.info( - f"๐Ÿงน Unsubscribing from {len(self.subscriptions)} active subscriptions..." + f"Unsubscribing from {len(self.subscriptions)} active subscriptions..." ) # Create a copy of subscriptions to iterate over @@ -133,7 +133,7 @@ def unsubscribe_all(self) -> dict[str, Any]: unsubscribed_list.append( {"symbol": symbol, "exchange": exchange, "mode": mode} ) - self.logger.debug(f"โœ… Unsubscribed: {exchange}:{symbol} mode {mode}") + self.logger.debug(f"Unsubscribed: {exchange}:{symbol} mode {mode}") else: failed_count += 1 failed_list.append( @@ -145,7 +145,7 @@ def unsubscribe_all(self) -> dict[str, Any]: } ) self.logger.warning( - f"โŒ Failed to unsubscribe: {exchange}:{symbol} mode {mode}" + f"Failed to unsubscribe: {exchange}:{symbol} mode {mode}" ) except Exception as e: @@ -157,13 +157,12 @@ def unsubscribe_all(self) -> dict[str, Any]: self.subscriptions.clear() self.subscription_keys.clear() - # CRITICAL: Call the disconnect method to properly close everything - self.logger.info("๐Ÿ”Œ Calling disconnect() to terminate Groww connection completely...") + self.logger.info("Calling disconnect() to terminate Groww connection...") try: self.disconnect() - self.logger.info("โœ… Successfully disconnected from Groww server") + self.logger.info("Successfully disconnected from Groww server") except Exception as e: - self.logger.error(f"โŒ Error during disconnect: {e}") + self.logger.error(f"Error during disconnect: {e}") # Force cleanup even if disconnect fails self.running = False self.connected = False @@ -180,10 +179,8 @@ def unsubscribe_all(self) -> dict[str, Any]: self._message_count = 0 self.logger.info( - f"๐Ÿ“Š Unsubscribe all complete: {unsubscribed_count} success, {failed_count} failed" + f"Unsubscribe all complete: {unsubscribed_count} success, {failed_count} failed" ) - self.logger.info("โœ… All subscriptions cleared and disconnected from Groww server") - self.logger.info("โœ… ZMQ resources cleaned up - no more data will be published") return self._create_success_response( f"Unsubscribed from {unsubscribed_count} subscriptions and disconnected from server", @@ -203,7 +200,7 @@ def unsubscribe_all(self) -> dict[str, Any]: def disconnect(self) -> None: """Disconnect from Groww WebSocket with proper cleanup""" - self.logger.info("๐Ÿ”Œ Starting Groww adapter disconnect sequence...") + self.logger.info("Starting Groww adapter disconnect sequence...") self.running = False try: @@ -211,7 +208,7 @@ def disconnect(self) -> None: if self.ws_client: try: self.ws_client.disconnect() - self.logger.info("๐Ÿ”— WebSocket client disconnected") + self.logger.debug("WebSocket client disconnected") except Exception as e: self.logger.error(f"Error disconnecting WebSocket client: {e}") @@ -224,10 +221,10 @@ def disconnect(self) -> None: # Clean up ZeroMQ resources self.cleanup_zmq() - self.logger.info("โœ… Groww adapter disconnected and state cleared") + self.logger.info("Groww adapter disconnected and state cleared") except Exception as e: - self.logger.error(f"โŒ Error during disconnect: {e}") + self.logger.error(f"Error during disconnect: {e}") # Force cleanup even if there were errors self.connected = False self.ws_client = None @@ -279,7 +276,7 @@ def subscribe( sym = SymToken.query.filter_by(symbol=symbol, exchange=exchange).first() if sym: instrumenttype = sym.instrumenttype - self.logger.info( + self.logger.debug( f"Retrieved instrumenttype: {instrumenttype} for {symbol}.{exchange}" ) except Exception as e: @@ -303,14 +300,8 @@ def subscribe( # Get exchange and segment for Groww groww_exchange, segment = GrowwExchangeMapper.get_exchange_segment(exchange) - # Log token details for debugging F&O if exchange in ["NFO", "BFO"]: - self.logger.info("F&O Subscription Debug:") - self.logger.info(f" Symbol: {symbol}") - self.logger.info(f" Exchange: {exchange} -> Groww: {groww_exchange}") - self.logger.info(f" Segment: {segment}") - self.logger.info(f" Token from DB: {token}") - self.logger.info(f" Brexchange: {brexchange}") + self.logger.debug(f"F&O Subscription: {symbol}, exchange={exchange}->{groww_exchange}, segment={segment}, token={token}") # Generate unique correlation ID correlation_id = f"{symbol}_{exchange}_{mode}" @@ -333,8 +324,8 @@ def subscribe( try: if mode in [1, 2]: # LTP or Quote mode if mode == 2: - self.logger.info( - f"๐Ÿ“ˆ QUOTE subscription for {symbol} - Note: Groww only provides LTP, OHLCV will be 0" + self.logger.debug( + f"QUOTE subscription for {symbol} - Groww only provides LTP, OHLCV will be 0" ) sub_key = self.ws_client.subscribe_ltp( groww_exchange, segment, token, symbol, instrumenttype @@ -342,8 +333,8 @@ def subscribe( elif mode == 3: # Depth mode # Check if this is an index - indices don't have depth data if instrumenttype == "INDEX" or "INDEX" in exchange: - self.logger.warning( - f"โš ๏ธ Indices don't have depth data. Converting to LTP subscription for {symbol}" + self.logger.info( + f"Indices don't have depth data. Converting to LTP for {symbol}" ) # Subscribe to LTP instead for indices sub_key = self.ws_client.subscribe_ltp( @@ -353,38 +344,21 @@ def subscribe( self.subscriptions[correlation_id]["mode"] = 1 # Change to LTP mode else: # Enhanced logging for BSE depth subscriptions - if "BSE" in groww_exchange: - self.logger.info("๐Ÿ”ด Creating BSE DEPTH subscription:") - self.logger.info(f" Exchange: {groww_exchange}") - self.logger.info(f" Segment: {segment}") - self.logger.info(f" Token: {token}") - self.logger.info(f" Symbol: {symbol}") - # Subscribe to depth for non-index instruments sub_key = self.ws_client.subscribe_depth( groww_exchange, segment, token, symbol, instrumenttype ) - if "BSE" in groww_exchange: - self.logger.info(f"๐Ÿ”ด BSE DEPTH subscription key: {sub_key}") - # Store subscription key for unsubscribe self.subscription_keys[correlation_id] = sub_key mode_name = {1: "LTP", 2: "Quote", 3: "Depth"}.get(mode, str(mode)) self.logger.info( - f"โœ… Subscribed to {symbol}.{exchange} in {mode_name} mode (key: {sub_key})" + f"Subscribed to {symbol}.{exchange} in {mode_name} mode" ) - # Special logging for LTP subscriptions to debug subscribe all issue - if mode == 1: - self.logger.info( - f"๐Ÿ”ฅ LTP SUBSCRIPTION CONFIRMED: {exchange}:{symbol} - data should start flowing" - ) - - # Extra logging for F&O if exchange in ["NFO", "BFO"]: - self.logger.info(f"F&O subscription key created: {sub_key}") + self.logger.debug(f"F&O subscription key created: {sub_key}") except Exception as e: self.logger.error(f"Error subscribing to {symbol}.{exchange}: {e}") @@ -467,17 +441,7 @@ def _resubscribe(self, correlation_id: str, sub_info: dict): def _on_data(self, data: dict[str, Any]) -> None: """Callback for market data from WebSocket""" try: - # Enhanced logging for BSE depth data - is_bse_depth = False - if "depth_data" in data and "exchange" in data and "BSE" in data.get("exchange", ""): - is_bse_depth = True - self.logger.info("๐Ÿ”ด BSE DEPTH DATA RECEIVED!") - self.logger.info(f" Depth data: {data.get('depth_data', {})}") - - # Debug log the raw message data to see what we're actually receiving - self.logger.debug( - f"RAW GROWW DATA{' (BSE DEPTH)' if is_bse_depth else ''}: Type: {type(data)}, Data: {data}" - ) + self.logger.debug(f"RAW GROWW DATA: Type: {type(data)}, Data: {data}") # Add data validation to ensure we have the minimum required fields if not isinstance(data, dict): @@ -513,14 +477,13 @@ def _on_data(self, data: dict[str, Any]) -> None: # Convert string numeric to string mode mode = {1: "ltp", 2: "quote", 3: "depth"}.get(int(mode), "ltp") - # Special logging for BSE depth if "BSE" in exchange and mode == "depth": - self.logger.info("๐Ÿ”ด BSE DEPTH: Looking for subscription") + self.logger.debug("BSE DEPTH: Looking for subscription") - self.logger.info( + self.logger.debug( f"Looking for subscription: symbol={symbol_from_data}, exchange={exchange}, mode={mode}" ) - self.logger.info(f"Available subscriptions: {list(self.subscriptions.keys())}") + self.logger.debug(f"Available subscriptions: {list(self.subscriptions.keys())}") # Find matching subscription based on symbol, exchange and mode with self.lock: @@ -557,7 +520,7 @@ def _on_data(self, data: dict[str, Any]) -> None: if is_index_match or is_regular_match: subscription = sub correlation_id = cid - self.logger.info(f"Matched subscription: {cid}") + self.logger.debug(f"Matched subscription: {cid}") break # Try to match based on exchange token from protobuf data @@ -566,7 +529,7 @@ def _on_data(self, data: dict[str, Any]) -> None: segment = data.get("segment", "CASH") exchange = data.get("exchange", "NSE") - self.logger.info( + self.logger.debug( f"Processing message with token: {token}, segment: {segment}, exchange: {exchange}" ) @@ -583,12 +546,7 @@ def _on_data(self, data: dict[str, Any]) -> None: break if not subscription: - # Enhanced logging for BSE depth debugging - if "BSE" in str(data) and "depth" in str(data).lower(): - self.logger.error("๐Ÿ”ด BSE DEPTH DATA RECEIVED BUT NO SUBSCRIPTION FOUND!") - self.logger.error(f" Data: {data}") - self.logger.error(f" Active subscriptions: {self.subscriptions}") - self.logger.warning(f"Received data for unsubscribed token/symbol: {data}") + self.logger.debug(f"Received data for unsubscribed token/symbol: {data}") return # Extract symbol and exchange from subscription @@ -652,15 +610,14 @@ def _on_data(self, data: dict[str, Any]) -> None: if market_data["ltp"] == 0: self.logger.warning(f"โš ๏ธ NO VALID LTP DATA for {symbol}, check data source") else: - self.logger.info(f"๐Ÿ“ˆ LTP recovered for {symbol}: {market_data['ltp']}") + self.logger.debug(f"LTP recovered for {symbol}: {market_data['ltp']}") # Ensure LTP timestamp if "ltt" not in market_data: market_data["ltt"] = int(time.time() * 1000) - # Log LTP data for debugging subscribe all issue - self.logger.info( - f"๐Ÿ” LTP MODE: {exchange}:{symbol} = โ‚น{market_data['ltp']} at {market_data.get('ltt')}" + self.logger.debug( + f"LTP MODE: {exchange}:{symbol} = {market_data['ltp']} at {market_data.get('ltt')}" ) elif actual_mode == 2: # Quote mode @@ -679,9 +636,8 @@ def _on_data(self, data: dict[str, Any]) -> None: ) market_data["ltp"] = float(ltp_value) if ltp_value else 0.0 - # Log Quote data - self.logger.info( - f"๐Ÿ” QUOTE MODE: {exchange}:{symbol} = โ‚น{market_data['ltp']} (Vol: {market_data.get('volume', 0)})" + self.logger.debug( + f"QUOTE MODE: {exchange}:{symbol} = {market_data['ltp']} (Vol: {market_data.get('volume', 0)})" ) elif actual_mode == 3: # Depth mode @@ -694,54 +650,33 @@ def _on_data(self, data: dict[str, Any]) -> None: if "ltp" not in market_data: market_data["ltp"] = 0.0 - # Log Depth data buy_levels = len(market_data["depth"].get("buy", [])) sell_levels = len(market_data["depth"].get("sell", [])) - self.logger.info( - f"๐Ÿ” DEPTH MODE: {exchange}:{symbol} = {buy_levels}B/{sell_levels}S levels" + self.logger.debug( + f"DEPTH MODE: {exchange}:{symbol} = {buy_levels}B/{sell_levels}S levels" ) - # Enhanced logging for BSE depth - if "BSE" in exchange and mode == 3: - self.logger.info(f"๐Ÿ”ด Publishing BSE DEPTH data for {symbol}") - if "depth" in market_data: - self.logger.info(f" Buy levels: {len(market_data['depth'].get('buy', []))}") - self.logger.info(f" Sell levels: {len(market_data['depth'].get('sell', []))}") - - # Periodic logging instead of every message (reduces noise) - but more frequent for debugging + # Track message count for periodic logging if not hasattr(self, "_message_count"): self._message_count = 0 self._message_count += 1 - # More frequent logging for debugging LTP issue - if self._message_count <= 20 or self._message_count % 25 == 0: + # Periodic logging - log first message and then every 500th + if self._message_count == 1 or self._message_count % 500 == 0: mode_name = {1: "LTP", 2: "QUOTE", 3: "DEPTH"}[actual_mode] ltp_info = ( - f"LTP: โ‚น{market_data.get('ltp', 'N/A')}" + f"LTP: {market_data.get('ltp', 'N/A')}" if actual_mode in [1, 2] else f"Depth: {len(market_data.get('depth', {}).get('buy', []))}B/{len(market_data.get('depth', {}).get('sell', []))}S" ) self.logger.info( - f"๐Ÿ“ˆ Publishing #{self._message_count}: {topic} ({mode_name}) -> {ltp_info}" - ) - - # CRITICAL: Always log LTP mode data to debug subscribe all issue - if actual_mode == 1: - self.logger.info( - f"๐Ÿšจ LTP PUBLISH: {topic} -> โ‚น{market_data.get('ltp')} (Message #{self._message_count})" + f"Publishing #{self._message_count}: {topic} ({mode_name}) -> {ltp_info}" ) # Publish to ZeroMQ self.publish_market_data(topic, market_data) - # Log successful publication for debugging data flow issues - self.logger.debug(f"โœ… ZMQ Published: {topic} with {len(str(market_data))} bytes") - - # Verify publication by checking if we can access the data - if actual_mode == 1 and market_data.get("ltp", 0) > 0: - self.logger.info( - f"โœ… LTP DATA VERIFIED: {exchange}:{symbol} = โ‚น{market_data['ltp']} published successfully" - ) + self.logger.debug(f"ZMQ Published: {topic}") except Exception as e: self.logger.error(f"Error processing market data: {e}", exc_info=True) diff --git a/broker/groww/streaming/groww_nats.py b/broker/groww/streaming/groww_nats.py index d25661efb..a7e464bd7 100644 --- a/broker/groww/streaming/groww_nats.py +++ b/broker/groww/streaming/groww_nats.py @@ -195,7 +195,7 @@ def parse_message( elif self.pending_data.startswith(MSG): # MSG format: MSG [reply-to] <#bytes>\r\n\r\n - logger.info("๐Ÿ” Found MSG in data stream") + logger.debug("Found MSG in data stream") end_idx = self.pending_data.find("\r\n") if end_idx == -1: logger.debug("MSG header incomplete, waiting for more data") @@ -267,13 +267,13 @@ def parse_message( self.pending_data = remaining[size + 2 :] # Skip payload and \r\n - logger.info(f"๐Ÿ“Š MSG parsed - Subject: {subject}, SID: {sid}, Size: {size}") + logger.debug(f"MSG parsed - Subject: {subject}, SID: {sid}, Size: {size}") # Process the message if sid in self.subscriptions: sub = self.subscriptions[sid] sub.received_msgs += 1 - logger.info(f"โœ… Subscription found for SID {sid}: {sub.subject}") + logger.debug(f"Subscription found for SID {sid}: {sub.subject}") messages.append( { diff --git a/broker/groww/streaming/groww_protobuf.py b/broker/groww/streaming/groww_protobuf.py index 53c416357..414858128 100644 --- a/broker/groww/streaming/groww_protobuf.py +++ b/broker/groww/streaming/groww_protobuf.py @@ -206,16 +206,7 @@ def _parse_market_depth(self) -> dict[str, Any]: length = self._read_varint() end_pos = self.position + length - # Log depth message size for BSE vs NSE debugging - total_size = len(self.data) - logger.info( - f"๐Ÿ“Š Parsing depth message: inner length={length} bytes, total message={total_size} bytes" - ) - if total_size == 501: - logger.info("๐Ÿ”ด BSE depth message detected (501 bytes)") - elif total_size == 499: - logger.info("โœ… NSE depth message detected (499 bytes)") - + logger.debug(f"Parsing depth message: inner length={length} bytes") result = {"timestamp": 0, "buy": [], "sell": []} # Parse depth data fields @@ -323,33 +314,13 @@ def parse_groww_market_data(data: bytes) -> dict[str, Any]: Returns: Parsed market data """ - data_len = len(data) - logger.info(f"Parsing protobuf data: {data_len} bytes") - - # Special logging for BSE data (501 bytes) vs NSE (499 bytes) - if data_len == 501: - logger.info("๐Ÿ”ด Potential BSE message detected (501 bytes)") - elif data_len == 499: - logger.info("โœ… Potential NSE message detected (499 bytes)") - - # For BSE depth messages, check if there's an extra field - if data_len == 501: - logger.debug(f"BSE message - Last 10 bytes (hex): {data[-10:].hex()}") - - logger.debug(f"First 50 bytes (hex): {data[:50].hex() if len(data) > 50 else data.hex()}") + logger.debug(f"Parsing protobuf data: {len(data)} bytes") parser = MiniProtobufParser() result = parser.parse_market_data(data) - # Log what was parsed if result: - logger.info(f"Successfully parsed protobuf data: {result.keys()}") - if "ltp_data" in result: - logger.info(f"LTP data found: {result['ltp_data']}") - if "index_data" in result: - logger.info(f"Index data found: {result['index_data']}") - if "depth_data" in result: - logger.info(f"Depth data found: {result['depth_data']}") + logger.debug(f"Parsed protobuf: {result.keys()}") else: logger.warning("No data parsed from protobuf") diff --git a/broker/groww/streaming/nats_websocket.py b/broker/groww/streaming/nats_websocket.py index ecea7e082..5cddc3cc9 100644 --- a/broker/groww/streaming/nats_websocket.py +++ b/broker/groww/streaming/nats_websocket.py @@ -105,12 +105,7 @@ def _send_connect_with_signature(self): jwt=self.socket_token, nkey=nkey, sig=sig ) - # Log CONNECT details for debugging - logger.info("CONNECT details:") - logger.info(f" JWT Token length: {len(self.socket_token) if self.socket_token else 0}") - logger.info(f" Has nkey: {bool(nkey)}") - logger.info(f" Has signature: {bool(sig)}") - logger.debug(f" CONNECT command: {connect_cmd[:200]}...") # Log first 200 chars + logger.debug(f"CONNECT: JWT len={len(self.socket_token) if self.socket_token else 0}, nkey={bool(nkey)}, sig={bool(sig)}") self.ws.send(connect_cmd) logger.info(f"Sent NATS CONNECT with{'out' if not sig else ''} signature") @@ -293,7 +288,7 @@ def periodic_ping(): if self.connected and self.running and self.ws: try: ping_count += 1 - logger.info(f"\U0001f3d3 Sending PING #{ping_count} to check connection...") + logger.debug(f"Sending PING #{ping_count} to check connection...") if self.nats_protocol: self.ws.send(self.nats_protocol.create_ping()) else: @@ -301,7 +296,7 @@ def periodic_ping(): except Exception as e: logger.error(f"Failed to send PING: {e}") break # Exit on error - logger.info("๐Ÿ›‘ Ping thread exiting") + logger.debug("Ping thread exiting") threading.Thread(target=periodic_ping, daemon=True).start() @@ -311,9 +306,7 @@ def _process_binary_nats_message(self, data: bytes): # Convert to string to find message boundaries text = data.decode("utf-8", errors="ignore") - # Log the message type for debugging - if len(text) > 0: - logger.debug(f"Binary message text preview: {text[:100]}") + logger.debug(f"Binary message text preview: {text[:100]}") # Ensure NATS protocol handler exists if not self.nats_protocol: @@ -347,14 +340,7 @@ def _process_binary_nats_message(self, data: bytes): sid = parts[2] size = int(parts[-1]) - # Enhanced logging for BSE messages - if "bse" in subject.lower(): - logger.info( - f"๐Ÿ”ด BSE MSG detected - Subject: {subject}, SID: {sid}, Size: {size}" - ) - # Calculate where payload starts in the original binary data - # We need to find where the header ends in the original data header_end_marker = b"\r\n" header_start = data.find(b"MSG") if header_start >= 0: @@ -364,26 +350,24 @@ def _process_binary_nats_message(self, data: bytes): payload_end = payload_start + size if payload_end <= len(data): - # Extract binary payload payload = data[payload_start:payload_end] - # Create MSG dict with binary payload msg = { "type": "MSG", "subject": subject, "sid": sid, "size": size, - "payload": payload, # Keep as bytes + "payload": payload, } - logger.info( - f"๐Ÿ“Š Binary MSG parsed - Subject: {subject}, SID: {sid}, Size: {size}" + logger.debug( + f"Binary MSG parsed - Subject: {subject}, SID: {sid}, Size: {size}" ) self._process_nats_message(msg) elif text.startswith("PING") or text.startswith("PONG") or text.startswith("+OK"): # Parse as text for control messages - logger.info(f"Control message received: {text.strip()}") + logger.debug(f"Control message received: {text.strip()}") if self.nats_protocol: messages = self.nats_protocol.parse_message(text) else: @@ -410,54 +394,19 @@ def _on_message(self, ws, message): # Decode to check content msg_text = message.decode("utf-8", errors="ignore") - # Enhanced debugging for ALL messages to find BSE + # Log all per-message details at debug level if "MSG" in msg_text: - # Extract subject from MSG line - if "/ld/eq/" in msg_text: - logger.info(f"๐Ÿ“ฅ Market data message received: {msg_size} bytes") - # Check specifically for exchange and type - if "/ld/eq/nse/price" in msg_text: - logger.info(" โœ… NSE LTP message detected") - elif "/ld/eq/nse/book" in msg_text: - logger.info(" โœ… NSE DEPTH message detected") - elif "/ld/eq/bse/price" in msg_text: - logger.info(" ๐Ÿ”ด BSE LTP message detected!") - elif "/ld/eq/bse/book" in msg_text: - logger.info(" ๐Ÿ”ด BSE DEPTH message detected!") - logger.info(f" First 100 chars: {msg_text[:100]}") - # Also check for any BSE-related content - elif "bse" in msg_text.lower() or "532540" in msg_text: - logger.info(f"โš ๏ธ Possible BSE-related message: {msg_size} bytes") - logger.info(f" Content preview: {msg_text[:200]}") - # Check for F&O content - elif "/ld/fo/" in msg_text or "FNO" in msg_text or "53892" in msg_text: - logger.info(f"๐Ÿ“ˆ Possible F&O message detected: {msg_size} bytes") - logger.info(f" Content preview: {msg_text[:200]}") - if "/ld/fo/nse/book" in msg_text: - logger.info(" โœ… NFO DEPTH message confirmed!") - elif "/ld/fo/nse/price" in msg_text: - logger.info(" โœ… NFO LTP message confirmed!") - # Log ANY message if we're monitoring BSE - elif hasattr(self, "monitoring_bse") and self.monitoring_bse: - logger.info( - f"๐Ÿ” Message after BSE sub: {msg_size} bytes, starts with: {msg_text[:30]}" - ) + logger.debug(f"Market data message received: {msg_size} bytes, preview: {msg_text[:80]}") else: - # Log INFO messages specially if msg_text.startswith("INFO"): - logger.info( - f"๐Ÿ“ฅ Received INFO message: {msg_size} bytes (BSE=501, NSE=499 expected)" - ) + logger.info(f"Received INFO message: {msg_size} bytes") else: - logger.info(f"๐Ÿ“ฅ Received BINARY message: {msg_size} bytes") - logger.info( - f" First 50 bytes (hex): {message[:50].hex() if len(message) > 0 else 'empty'}" - ) + logger.debug(f"Received BINARY message: {msg_size} bytes") # Parse binary NATS message directly self._process_binary_nats_message(message) else: - logger.info(f"๐Ÿ“ฅ Received TEXT message: {len(message)} chars") + logger.debug(f"Received TEXT message: {len(message)} chars") # Parse text message if self.nats_protocol: @@ -465,9 +414,7 @@ def _on_message(self, ws, message): else: logger.error("NATS protocol handler not initialized") messages = [] - logger.info(f"Parsed {len(messages)} NATS messages") for msg in messages: - logger.info(f"Processing NATS message type: {msg.get('type')}") self._process_nats_message(msg) except Exception as e: @@ -510,17 +457,16 @@ def _process_nats_message(self, msg: dict[str, Any]): # Respond with PONG if self.nats_protocol: self.ws.send(self.nats_protocol.create_pong()) - logger.info("๐Ÿ“ Received PING from server, sent PONG") + logger.debug("Received PING from server, sent PONG") else: logger.error("Cannot send PONG - NATS protocol handler not initialized") elif msg_type == "PONG": - logger.info("โœ… Received PONG from server - Connection alive") + logger.debug("Received PONG from server - Connection alive") elif msg_type == "MSG": - # Market data message - logger.info( - f"๐Ÿ“Š Processing MSG - Subject: {msg.get('subject')}, SID: {msg.get('sid')}, Size: {msg.get('size')} bytes" + logger.debug( + f"Processing MSG - Subject: {msg.get('subject')}, SID: {msg.get('sid')}, Size: {msg.get('size')} bytes" ) self._process_market_data_msg(msg) @@ -553,13 +499,7 @@ def _process_market_data_msg(self, msg: dict[str, Any]): payload = msg.get("payload", b"") sid = msg.get("sid") - # Enhanced logging for BSE - is_bse = "bse" in subject.lower() - - logger.info(f"๐Ÿ“ˆ Market Data MSG Details{' (BSE)' if is_bse else ''}:") - logger.info(f" Subject: {subject}") - logger.info(f" SID: {sid}") - logger.info(f" Payload size: {len(payload)} bytes") + logger.debug(f"Market Data MSG: Subject={subject}, SID={sid}, Payload={len(payload)} bytes") # Ensure payload is bytes if isinstance(payload, str): @@ -570,18 +510,9 @@ def _process_market_data_msg(self, msg: dict[str, Any]): logger.error(f"Unexpected payload type: {type(payload)}") return - # Log payload hex for debugging - if payload: - logger.info( - f" Payload (hex): {payload[:50].hex()}..." - if len(payload) > 50 - else f" Payload (hex): {payload.hex()}" - ) - - # Try to parse as protobuf - logger.info(f"Parsing protobuf data{' for BSE' if is_bse else ''}...") + # Parse protobuf payload market_data = groww_protobuf.parse_groww_market_data(payload) - logger.info(f"โœ… Parsed market data{' (BSE)' if is_bse else ''}: {market_data}") + logger.debug(f"Parsed market data: {market_data}") # Find matching subscription found_subscription = False @@ -592,7 +523,7 @@ def _process_market_data_msg(self, msg: dict[str, Any]): sub_sid = self.nats_sids[sub_key] if str(sub_sid) == str(sid): found_subscription = True - logger.info(f"โœ… Matched subscription by SID: {sub_key}") + logger.debug(f"Matched subscription by SID: {sub_key}") # Add subscription info to market data market_data["symbol"] = sub_info["symbol"] @@ -614,7 +545,7 @@ def _process_market_data_msg(self, msg: dict[str, Any]): market_data["string_mode"] = sub_info["mode"] market_data["original_exchange"] = sub_info["exchange"] - logger.info(f"๐Ÿš€ Sending market data to callback: {market_data}") + logger.debug(f"Sending market data to callback: {market_data}") # Call data callback if self.on_data: @@ -638,7 +569,7 @@ def _process_market_data_msg(self, msg: dict[str, Any]): mode_type == "depth" and sub_info["mode"] == "depth" ): found_subscription = True - logger.info(f"โœ… Matched subscription by token pattern: {sub_key}") + logger.debug(f"Matched subscription by token pattern: {sub_key}") # Update the SID mapping for future use self.nats_sids[sub_key] = str(sid) @@ -661,16 +592,14 @@ def _process_market_data_msg(self, msg: dict[str, Any]): market_data["string_mode"] = sub_info["mode"] market_data["original_exchange"] = sub_info["exchange"] - logger.info(f"๐Ÿš€ Sending market data to callback: {market_data}") + logger.debug(f"Sending market data to callback: {market_data}") if self.on_data: self.on_data(market_data) break if not found_subscription: - logger.warning(f"โš ๏ธ No matching subscription found for SID: {sid}") - logger.info(f" Active SIDs: {self.nats_sids}") - logger.info(f" Subject: {subject}") + logger.debug(f"No matching subscription for SID: {sid}, subject: {subject}") except Exception as e: logger.error(f"Error processing market data: {e}", exc_info=True) @@ -709,11 +638,10 @@ def _send_nats_subscription(self, sub_key: str, sub_info: dict): self.nats_sids[sub_key] = sid logger.info(f"Sent NATS SUB for {topic} with SID {sid}") - logger.info(f"Current nats_sids mapping: {self.nats_sids}") + logger.debug(f"Current nats_sids mapping: {self.nats_sids}") - # Send a PING to flush ALL subscriptions (similar to official SDK's flush) - # This ensures the server processes the subscription before continuing - logger.info("Sending PING to flush subscription") + # Send a PING to flush subscription + logger.debug("Sending PING to flush subscription") self.ws.send(self.nats_protocol.create_ping()) # Wait briefly for PONG to ensure subscription is processed @@ -744,14 +672,8 @@ def subscribe_ltp( """ sub_key = f"ltp_{exchange}_{segment}_{token}" - # Enhanced logging for BSE subscriptions if "BSE" in exchange.upper(): - logger.info("๐Ÿ”ด BSE LTP Subscription Request:") - logger.info(f" Exchange: {exchange}") - logger.info(f" Segment: {segment}") - logger.info(f" Token: {token}") - logger.info(f" Symbol: {symbol}") - logger.info(f" InstrumentType: {instrumenttype}") + logger.debug(f"BSE LTP Subscription: exchange={exchange}, segment={segment}, token={token}, symbol={symbol}") # Determine mode based on whether it's an index # IMPORTANT: Only treat as index if exchange contains 'INDEX' @@ -785,23 +707,11 @@ def subscribe_ltp( if self.connected: self._send_nats_subscription(sub_key, self.subscriptions[sub_key]) - # Special logging for BSE subscriptions if "BSE" in exchange.upper(): - logger.info( - f"๐Ÿ”ด BSE subscription sent for {symbol}, waiting for market data MSG..." - ) - logger.info(f" Subscription key: {sub_key}") - logger.info(f" Active SIDs: {list(self.nats_sids.keys())}") - logger.warning("โš ๏ธ NOTE: Monitoring ALL messages after BSE subscription...") - # Set flag to monitor messages - self.monitoring_bse = True + logger.debug(f"BSE subscription sent for {symbol}, key: {sub_key}") - # Special logging for F&O subscriptions if segment.upper() == "FNO": - logger.info(f"๐Ÿ“ˆ F&O LTP subscription sent for {symbol}") - logger.info(f" Exchange: {exchange}, Segment: {segment}") - logger.info(f" Topic subscribed: /ld/fo/{exchange.lower()}/price.{token}") - self.monitoring_fo = True + logger.debug(f"F&O LTP subscription sent for {symbol}, exchange={exchange}, segment={segment}") return sub_key @@ -833,14 +743,8 @@ def subscribe_depth( sub_key = f"depth_{exchange}_{segment}_{token}" - # Enhanced logging for BSE depth subscriptions if "BSE" in exchange.upper(): - logger.info("๐Ÿ”ด BSE DEPTH Subscription Request:") - logger.info(f" Exchange: {exchange}") - logger.info(f" Segment: {segment}") - logger.info(f" Token: {token}") - logger.info(f" Symbol: {symbol}") - logger.info(f" InstrumentType: {instrumenttype}") + logger.debug(f"BSE DEPTH Subscription: exchange={exchange}, segment={segment}, token={token}, symbol={symbol}") # Store subscription info - CRITICAL FIX: Add numeric mode for depth self.subscriptions[sub_key] = { @@ -857,21 +761,11 @@ def subscribe_depth( if self.connected: self._send_nats_subscription(sub_key, self.subscriptions[sub_key]) - # Special logging for BSE depth subscriptions if "BSE" in exchange.upper(): - logger.info( - f"๐Ÿ”ด BSE DEPTH subscription sent for {symbol}, waiting for depth data MSG..." - ) - logger.info(f" Subscription key: {sub_key}") - logger.info(f" Active SIDs: {list(self.nats_sids.keys())}") + logger.debug(f"BSE DEPTH subscription sent for {symbol}, key: {sub_key}") - # Special logging for F&O subscriptions if segment.upper() == "FNO": - logger.info(f"๐Ÿ“ˆ F&O DEPTH subscription sent for {symbol}") - logger.info(f" Exchange: {exchange}, Segment: {segment}") - logger.info(f" Topic subscribed: {self.subscriptions[sub_key]}") - logger.info(" Monitoring for F&O depth messages...") - self.monitoring_fo = True + logger.debug(f"F&O DEPTH subscription sent for {symbol}, exchange={exchange}, segment={segment}") return sub_key @@ -904,12 +798,12 @@ def unsubscribe_all_and_disconnect(self): """ Unsubscribe from all subscriptions and disconnect completely from server """ - logger.info("๐Ÿงน Starting complete unsubscribe and disconnect sequence...") + logger.info("Starting complete unsubscribe and disconnect sequence...") # Step 1: Unsubscribe from all active subscriptions unsubscribed_count = 0 if self.subscriptions: - logger.info(f"๐Ÿ“ค Unsubscribing from {len(self.subscriptions)} active subscriptions...") + logger.info(f"Unsubscribing from {len(self.subscriptions)} active subscriptions...") for sub_key in list(self.subscriptions.keys()): try: @@ -918,13 +812,12 @@ def unsubscribe_all_and_disconnect(self): except Exception as e: logger.error(f"Error unsubscribing {sub_key}: {e}") - logger.info(f"โœ… Unsubscribed from {unsubscribed_count} subscriptions") + logger.info(f"Unsubscribed from {unsubscribed_count} subscriptions") # Step 2: Send additional NATS cleanup commands if self.connected and self.ws and self.nats_protocol: try: - # Send NATS UNSUB for any remaining SIDs - logger.info("๐Ÿ”ง Sending cleanup UNSUB commands to server...") + logger.debug("Sending cleanup UNSUB commands to server...") for i in range(1, 50): # Clear up to 50 possible SIDs try: unsub_cmd = self.nats_protocol.create_unsubscribe(str(i)) @@ -937,18 +830,18 @@ def unsubscribe_all_and_disconnect(self): time.sleep(1) - logger.info("โœ… Server cleanup commands sent") + logger.debug("Server cleanup commands sent") except Exception as e: - logger.warning(f"โš ๏ธ Server cleanup warning: {e}") + logger.warning(f"Server cleanup warning: {e}") # Step 3: Disconnect WebSocket self.disconnect() - logger.info("๐Ÿ Complete unsubscribe and disconnect sequence finished") + logger.info("Complete unsubscribe and disconnect sequence finished") def disconnect(self): """Disconnect from WebSocket with enhanced cleanup""" - logger.info("๐Ÿ”Œ Disconnecting from Groww WebSocket...") + logger.info("Disconnecting from Groww WebSocket...") # Set disconnect flags first (similar to Angel's approach) self.running = False @@ -958,7 +851,7 @@ def disconnect(self): # Send NATS cleanup commands before closing if still connected if self.ws and self.nats_protocol: try: - logger.info("๐Ÿ“ก Sending final UNSUB commands to server...") + logger.debug("Sending final UNSUB commands to server...") # Send UNSUB commands for any remaining subscriptions for sid in list(self.nats_sids.values()): try: @@ -976,20 +869,19 @@ def disconnect(self): if self.ws: try: - logger.info("๐Ÿ”— Closing WebSocket connection...") - # Force close the WebSocket connection - self.ws.keep_running = False # Tell WebSocketApp to stop + logger.debug("Closing WebSocket connection...") + self.ws.keep_running = False self.ws.close() - logger.info("โœ… WebSocket closed") + logger.debug("WebSocket closed") self.ws = None # Clear the WebSocket reference except Exception as e: logger.error(f"Error closing WebSocket: {e}") if self.ws_thread: - logger.info("โณ Waiting for WebSocket thread to finish...") + logger.debug("Waiting for WebSocket thread to finish...") self.ws_thread.join(timeout=5) if self.ws_thread.is_alive(): - logger.warning("โš ๏ธ WebSocket thread did not finish gracefully") + logger.warning("WebSocket thread did not finish gracefully") # Clear all state for clean reconnection self.connected = False @@ -1003,7 +895,7 @@ def disconnect(self): self.ws = None self.ws_thread = None - logger.info("โœ… Groww WebSocket disconnected and all resources cleared") + logger.info("Groww WebSocket disconnected and all resources cleared") self.subscription_id = None logger.info("Disconnected from Groww WebSocket and cleared state") From e2f0d64ec46aef1b3315e764f29b78ebd39316f9 Mon Sep 17 00:00:00 2001 From: kalaivani Date: Mon, 9 Mar 2026 14:59:38 +0530 Subject: [PATCH 3/8] Issues Raised by Cubic Dev In PR#1087 - Fixed - chart_api: Validate serialized size for all value types, not just strings - symbol.py: Add escape="\\" to all ilike() calls so wildcard escaping works - schemas: Fix incorrect error message on MarginCalculatorSchema apikey field Co-Authored-By: Claude Opus 4.6 --- database/symbol.py | 32 ++++++++++++++++---------------- restx_api/chart_api.py | 8 +++++++- restx_api/schemas.py | 2 +- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/database/symbol.py b/database/symbol.py index c5ea15c5b..87f056e03 100644 --- a/database/symbol.py +++ b/database/symbol.py @@ -85,18 +85,18 @@ def enhanced_search_symbols(query: str, exchange: str = None) -> list[SymToken]: try: num_term = float(term) term_conditions = or_( - SymToken.symbol.ilike(f"%{safe_term}%"), - SymToken.brsymbol.ilike(f"%{safe_term}%"), - SymToken.name.ilike(f"%{safe_term}%"), - SymToken.token.ilike(f"%{safe_term}%"), + SymToken.symbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.brsymbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.name.ilike(f"%{safe_term}%", escape="\\"), + SymToken.token.ilike(f"%{safe_term}%", escape="\\"), SymToken.strike == num_term, ) except ValueError: term_conditions = or_( - SymToken.symbol.ilike(f"%{safe_term}%"), - SymToken.brsymbol.ilike(f"%{safe_term}%"), - SymToken.name.ilike(f"%{safe_term}%"), - SymToken.token.ilike(f"%{safe_term}%"), + SymToken.symbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.brsymbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.name.ilike(f"%{safe_term}%", escape="\\"), + SymToken.token.ilike(f"%{safe_term}%", escape="\\"), ) all_conditions.append(term_conditions) @@ -192,18 +192,18 @@ def fno_search_symbols_db( try: num_term = float(term) term_conditions = or_( - SymToken.symbol.ilike(f"%{safe_term}%"), - SymToken.brsymbol.ilike(f"%{safe_term}%"), - SymToken.name.ilike(f"%{safe_term}%"), - SymToken.token.ilike(f"%{safe_term}%"), + SymToken.symbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.brsymbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.name.ilike(f"%{safe_term}%", escape="\\"), + SymToken.token.ilike(f"%{safe_term}%", escape="\\"), SymToken.strike == num_term, ) except ValueError: term_conditions = or_( - SymToken.symbol.ilike(f"%{safe_term}%"), - SymToken.brsymbol.ilike(f"%{safe_term}%"), - SymToken.name.ilike(f"%{safe_term}%"), - SymToken.token.ilike(f"%{safe_term}%"), + SymToken.symbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.brsymbol.ilike(f"%{safe_term}%", escape="\\"), + SymToken.name.ilike(f"%{safe_term}%", escape="\\"), + SymToken.token.ilike(f"%{safe_term}%", escape="\\"), ) all_conditions.append(term_conditions) diff --git a/restx_api/chart_api.py b/restx_api/chart_api.py index c116a18b5..89d0319de 100644 --- a/restx_api/chart_api.py +++ b/restx_api/chart_api.py @@ -1,3 +1,4 @@ +import json import os from flask import jsonify, make_response, request @@ -83,7 +84,12 @@ def post(self): return make_response( jsonify({"status": "error", "message": f"Preference key too long: {k[:20]}... (max 50 chars)"}), 400 ) - if isinstance(v, str) and len(v) > 1_048_576: + # Check serialized size for all value types (not just strings) + try: + serialized = json.dumps(v) + except (TypeError, ValueError): + serialized = str(v) + if len(serialized) > 1_048_576: return make_response( jsonify({"status": "error", "message": f"Preference value too large for key: {k} (max 1MB)"}), 400 ) diff --git a/restx_api/schemas.py b/restx_api/schemas.py index e3a7f87b2..49d7ad8fa 100644 --- a/restx_api/schemas.py +++ b/restx_api/schemas.py @@ -304,7 +304,7 @@ class MarginCalculatorSchema(Schema): """Schema for margin calculator request""" apikey = fields.Str( - required=True, validate=validate.Length(min=1, max=256, error="API key is required.") + required=True, validate=validate.Length(min=1, max=256, error="API key must be between 1 and 256 characters.") ) positions = fields.List( fields.Nested(MarginPositionSchema), From 595aad3eb4dc34e4f9269eab921fca59ddc0e068 Mon Sep 17 00:00:00 2001 From: kalaivani Date: Mon, 9 Mar 2026 15:22:33 +0530 Subject: [PATCH 4/8] Groww: Make bulk insert atomic to prevent partial data on failure Use flush() per batch and single commit() after loop so all batches can be rolled back if any fails mid-way. Co-Authored-By: Claude Opus 4.6 --- broker/groww/database/master_contract_db.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/broker/groww/database/master_contract_db.py b/broker/groww/database/master_contract_db.py index bdcb6b60e..8b3a1084f 100644 --- a/broker/groww/database/master_contract_db.py +++ b/broker/groww/database/master_contract_db.py @@ -75,8 +75,9 @@ def copy_from_dataframe(df): for i in range(0, total, BATCH_SIZE): batch = filtered_data_dict[i : i + BATCH_SIZE] db_session.bulk_insert_mappings(SymToken, batch) - db_session.commit() + db_session.flush() logger.info(f"Inserted batch {i // BATCH_SIZE + 1} ({min(i + BATCH_SIZE, total)}/{total} records)") + db_session.commit() logger.info(f"Bulk insert completed successfully with {total} new records.") else: logger.info("No new records to insert") From c1e78e44622b731732197c85ea6f836bea843c3a Mon Sep 17 00:00:00 2001 From: kalaivani Date: Mon, 9 Mar 2026 15:25:34 +0530 Subject: [PATCH 5/8] Support fractional quantities for crypto spot trading on Delta Exchange Crypto spot instruments (BTC_INR, ETH_INR, SOL_INR, XRP_INR) require fractional order sizes (e.g. 0.0001 BTC). Schema quantity fields changed from Int to Float to allow fractional values through API validation. Delta Exchange transform layer uses instrument type lookup to send float for spot and int for derivatives. Also adds spot/move_options to contract type map and uses min_order_size for lotsize. Co-Authored-By: Claude Opus 4.6 --- broker/deltaexchange/api/order_api.py | 8 ++--- .../database/master_contract_db.py | 34 ++++++++++++++----- .../deltaexchange/mapping/transform_data.py | 23 +++++++++++-- restx_api/schemas.py | 26 +++++++------- 4 files changed, 62 insertions(+), 29 deletions(-) diff --git a/broker/deltaexchange/api/order_api.py b/broker/deltaexchange/api/order_api.py index b5739c786..a604530a6 100644 --- a/broker/deltaexchange/api/order_api.py +++ b/broker/deltaexchange/api/order_api.py @@ -406,22 +406,22 @@ def place_smartorder_api(data, auth): symbol = data.get("symbol") exchange = data.get("exchange") product = data.get("product") - position_size = int(data.get("position_size", "0")) + position_size = float(data.get("position_size", "0")) - current_position = int( + current_position = float( get_open_position(symbol, exchange, map_product_type(product), auth) ) logger.info( f"[DeltaExchange] SmartOrder: target={position_size} current={current_position}" ) - if position_size == 0 and current_position == 0 and int(data["quantity"]) != 0: + if position_size == 0 and current_position == 0 and float(data["quantity"]) != 0: return place_order_api(data, auth) if position_size == current_position: msg = ( "No OpenPosition Found. Not placing Exit order." - if int(data["quantity"]) == 0 + if float(data["quantity"]) == 0 else "No action needed. Position size matches current position" ) return res, {"status": "success", "message": msg}, None diff --git a/broker/deltaexchange/database/master_contract_db.py b/broker/deltaexchange/database/master_contract_db.py index d8a689e19..03368ffe1 100644 --- a/broker/deltaexchange/database/master_contract_db.py +++ b/broker/deltaexchange/database/master_contract_db.py @@ -253,6 +253,8 @@ def _to_canonical_symbol(delta_symbol: str, instrument_type: str, expiry: str) - "futures": "FUT", "call_options": "CE", "put_options": "PE", + "spot": "SPOT", + "move_options": "MOVE", "interest_rate_swaps": "IRS", "spreads": "SPREAD", "options_combos": "COMBO", @@ -386,7 +388,7 @@ def process_delta_products(products): expiry โ† settlement_time (None โ†’ "" for perpetuals; ISO string โ†’ "DD-MON-YY" for futures/options) strike โ† 0.0 (strike is encoded in the symbol for options) - lotsize โ† 1 (1 contract; contract_value gives underlying units) + lotsize โ† product_specs.min_order_size or 1 (fractional for spot, e.g. 0.0001 BTC) instrumenttype โ† contract_type (mapped via CONTRACT_TYPE_MAP) tick_size โ† tick_size (string โ†’ float) """ @@ -437,16 +439,30 @@ def process_delta_products(products): except (ValueError, TypeError): tick_size = 0.0 - # Extract strike for option contracts from symbol (e.g. C-BTC-80000-280225 -> 80000.0) + # Extract strike price โ€” use the API field directly when available, + # fall back to parsing from the symbol (e.g. C-BTC-80000-280225 -> 80000.0) symbol_str = p.get("symbol", "") strike_val = 0.0 if instrument_type in ("CE", "PE", "TCE", "TPE", "SYNCE", "SYNPE"): - parts_s = symbol_str.split("-") - if len(parts_s) >= 3: - try: - strike_val = float(parts_s[2]) - except (ValueError, TypeError): - strike_val = 0.0 + try: + strike_val = float(p.get("strike_price") or 0) + except (ValueError, TypeError): + strike_val = 0.0 + # Fallback: parse from symbol if API field missing + if strike_val == 0.0: + parts_s = symbol_str.split("-") + if len(parts_s) >= 3: + try: + strike_val = float(parts_s[2]) + except (ValueError, TypeError): + strike_val = 0.0 + + # Lot size: use min_order_size from product_specs (important for spot + # instruments where fractional quantities are allowed, e.g. 0.0001 BTC) + try: + lotsize = float(product_specs.get("min_order_size") or 1) + except (ValueError, TypeError): + lotsize = 1.0 # Build OpenAlgo canonical symbol (exchange = CRYPTO, broker-agnostic format) canonical_symbol = _to_canonical_symbol(symbol_str, instrument_type, expiry) @@ -461,7 +477,7 @@ def process_delta_products(products): "brexchange": "DELTAIN", # Broker identifier (Delta Exchange India) "expiry": expiry, "strike": strike_val, - "lotsize": 1, + "lotsize": lotsize, "instrumenttype": instrument_type, "tick_size": tick_size, "contract_value": float(p.get("contract_value") or 1.0), diff --git a/broker/deltaexchange/mapping/transform_data.py b/broker/deltaexchange/mapping/transform_data.py index 3abbc4b80..0a068586d 100644 --- a/broker/deltaexchange/mapping/transform_data.py +++ b/broker/deltaexchange/mapping/transform_data.py @@ -1,12 +1,25 @@ ๏ปฟ# Mapping OpenAlgo API Request to Delta Exchange API Parameters # Delta Exchange API docs: https://docs.delta.exchange -from database.token_db import get_br_symbol, get_token +from database.token_db import get_br_symbol, get_symbol_info, get_token from utils.logging import get_logger logger = get_logger(__name__) +def _order_size(quantity, symbol, exchange): + """ + Convert quantity to the correct type for the Delta Exchange size parameter. + - Spot instruments: fractional float (e.g. 0.05 SOL) + - Derivatives (futures/options/perps): integer number of contracts + """ + qty = float(quantity) + info = get_symbol_info(symbol, exchange) + if info and info.instrumenttype == "SPOT": + return qty + return int(qty) + + def transform_data(data, token): """ Transforms the OpenAlgo API request structure to Delta Exchange POST /v2/orders payload. @@ -38,10 +51,12 @@ def transform_data(data, token): order_type = map_order_type(data["pricetype"]) side = data["action"].lower() # "buy" or "sell" + size = _order_size(data["quantity"], data["symbol"], data["exchange"]) + transformed = { "product_id": int(token), "product_symbol": symbol, - "size": int(data["quantity"]), + "size": size, "side": side, "order_type": order_type, "time_in_force": "gtc", @@ -134,10 +149,12 @@ def transform_modify_order_data(data): else: limit_price = str(data.get("price", "0")) + size = _order_size(data["quantity"], data["symbol"], data["exchange"]) + transformed = { "id": order_id, "product_id": product_id, - "size": int(data["quantity"]), + "size": size, "limit_price": limit_price, } diff --git a/restx_api/schemas.py b/restx_api/schemas.py index 49d7ad8fa..39ef2d678 100644 --- a/restx_api/schemas.py +++ b/restx_api/schemas.py @@ -9,8 +9,8 @@ class OrderSchema(Schema): exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) action = fields.Str(required=True, validate=validate.OneOf(["BUY", "SELL", "buy", "sell"])) - quantity = fields.Int( - required=True, validate=validate.Range(min=1, error="Quantity must be a positive integer.") + quantity = fields.Float( + required=True, validate=validate.Range(min=0, min_inclusive=False, error="Quantity must be a positive number.") ) pricetype = fields.Str( missing="MARKET", validate=validate.OneOf(["MARKET", "LIMIT", "SL", "SL-M"]) @@ -38,11 +38,11 @@ class SmartOrderSchema(Schema): exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) action = fields.Str(required=True, validate=validate.OneOf(["BUY", "SELL", "buy", "sell"])) - quantity = fields.Int( + quantity = fields.Float( required=True, - validate=validate.Range(min=0, error="Quantity must be a non-negative integer."), + validate=validate.Range(min=0, error="Quantity must be a non-negative number."), ) - position_size = fields.Int(required=True) + position_size = fields.Float(required=True) pricetype = fields.Str( missing="MARKET", validate=validate.OneOf(["MARKET", "LIMIT", "SL", "SL-M"]) ) @@ -74,8 +74,8 @@ class ModifyOrderSchema(Schema): price = fields.Float( required=True, validate=validate.Range(min=0, error="Price must be a non-negative number.") ) - quantity = fields.Int( - required=True, validate=validate.Range(min=1, error="Quantity must be a positive integer.") + quantity = fields.Float( + required=True, validate=validate.Range(min=0, min_inclusive=False, error="Quantity must be a positive number.") ) disclosed_quantity = fields.Int( required=True, @@ -107,8 +107,8 @@ class BasketOrderItemSchema(Schema): exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) action = fields.Str(required=True, validate=validate.OneOf(["BUY", "SELL", "buy", "sell"])) - quantity = fields.Int( - required=True, validate=validate.Range(min=1, error="Quantity must be a positive integer.") + quantity = fields.Float( + required=True, validate=validate.Range(min=0, min_inclusive=False, error="Quantity must be a positive number.") ) pricetype = fields.Str( missing="MARKET", validate=validate.OneOf(["MARKET", "LIMIT", "SL", "SL-M"]) @@ -141,13 +141,13 @@ class SplitOrderSchema(Schema): exchange = fields.Str(required=True, validate=validate.OneOf(VALID_EXCHANGES)) symbol = fields.Str(required=True) action = fields.Str(required=True, validate=validate.OneOf(["BUY", "SELL", "buy", "sell"])) - quantity = fields.Int( + quantity = fields.Float( required=True, - validate=validate.Range(min=1, error="Total quantity must be a positive integer."), + validate=validate.Range(min=0, min_inclusive=False, error="Total quantity must be a positive number."), ) # Total quantity to split - splitsize = fields.Int( + splitsize = fields.Float( required=True, - validate=validate.Range(min=1, error="Split size must be a positive integer."), + validate=validate.Range(min=0, min_inclusive=False, error="Split size must be a positive number."), ) # Size of each split pricetype = fields.Str( missing="MARKET", validate=validate.OneOf(["MARKET", "LIMIT", "SL", "SL-M"]) From 5c0bb4d7143a378984263ca7c4831a61a436dcf4 Mon Sep 17 00:00:00 2001 From: kalaivani Date: Mon, 9 Mar 2026 16:33:14 +0530 Subject: [PATCH 6/8] Fix position book for crypto spot and address PR review feedback - Spot positions now fetched from wallet/balances (not in /positions/margined) - float() instead of int() for position size to support fractional spot qty - Reject fractional quantities for derivatives with clear error (not silent truncation) - Revert splitsize to Int (split orders don't need fractional sizes) - Position formatter preserves fractional quantities instead of int() casting Co-Authored-By: Claude Opus 4.6 --- broker/deltaexchange/api/order_api.py | 55 ++++++++++++++++--- broker/deltaexchange/mapping/order_data.py | 20 +++++-- .../deltaexchange/mapping/transform_data.py | 7 +++ restx_api/schemas.py | 4 +- services/positionbook_service.py | 2 +- 5 files changed, 73 insertions(+), 15 deletions(-) diff --git a/broker/deltaexchange/api/order_api.py b/broker/deltaexchange/api/order_api.py index a604530a6..79ca0bf0c 100644 --- a/broker/deltaexchange/api/order_api.py +++ b/broker/deltaexchange/api/order_api.py @@ -228,20 +228,61 @@ def get_trade_book(auth): # --------------------------------------------------------------------------- def get_positions(auth): - """Fetch all open margined positions.""" + """ + Fetch all open positions โ€” both derivatives (margined) and spot (wallet). + + Derivatives come from GET /v2/positions/margined. + Spot holdings come from GET /v2/wallet/balances โ€” non-INR assets with + a non-zero balance are synthesised into position-like dicts so they + appear in the OpenAlgo position book alongside derivative positions. + """ + positions = [] + + # 1. Derivative positions (perpetual futures, options) try: result = get_api_response("/v2/positions/margined", auth, method="GET") if result.get("success"): - return result.get("result", []) - logger.warning(f"[DeltaExchange] get_positions unexpected response: {result}") - return [] + positions.extend(result.get("result", [])) + else: + logger.warning(f"[DeltaExchange] get_positions/margined unexpected: {result}") except Exception as e: - logger.error(f"[DeltaExchange] Exception in get_positions: {e}") - return [] + logger.error(f"[DeltaExchange] Exception in get_positions/margined: {e}") + + # 2. Spot holdings from wallet balances + try: + wallet_result = get_api_response("/v2/wallet/balances", auth, method="GET") + if wallet_result.get("success"): + for asset in wallet_result.get("result", []): + if not isinstance(asset, dict): + continue + symbol = asset.get("asset_symbol", "") or asset.get("symbol", "") + # Skip INR (settlement currency) and zero-balance assets + if symbol in ("INR", "USD", "") or not symbol: + continue + balance = float(asset.get("balance", 0) or 0) + blocked = float(asset.get("blocked_margin", 0) or 0) + size = balance - blocked # available spot holding + if size <= 0: + continue + # Synthesise a position-like dict matching /v2/positions/margined structure + spot_symbol = f"{symbol}_INR" + positions.append({ + "product_id": asset.get("asset_id", ""), + "product_symbol": spot_symbol, + "size": size, + "entry_price": "0", # Wallet doesn't track entry price + "realized_pnl": "0", + "unrealized_pnl": "0", + "_is_spot": True, # Internal flag for downstream mapping + }) + except Exception as e: + logger.error(f"[DeltaExchange] Exception fetching spot wallet positions: {e}") + + return positions def get_holdings(auth): - """Delta Exchange is a derivatives-only exchange; equity holdings are not applicable.""" + """Delta Exchange has no equity holdings concept; spot is shown in positions.""" return [] diff --git a/broker/deltaexchange/mapping/order_data.py b/broker/deltaexchange/mapping/order_data.py index ba2cfc202..1041a06e3 100644 --- a/broker/deltaexchange/mapping/order_data.py +++ b/broker/deltaexchange/mapping/order_data.py @@ -1,6 +1,6 @@ import json -from database.token_db import get_symbol, get_symbol_info +from database.token_db import get_oa_symbol, get_symbol, get_symbol_info from utils.logging import get_logger logger = get_logger(__name__) @@ -427,16 +427,26 @@ def map_position_data(position_data): product_id = position.get("product_id", "") product_symbol = position.get("product_symbol", "") + is_spot = position.get("_is_spot", False) - # Resolve symbol from DB; fall back to product_symbol - symbol_from_db = get_symbol(str(product_id), "CRYPTO") if product_id else None + # Resolve symbol from DB; fall back to product_symbol. + # For spot wallet entries, product_id is the asset_id (not a product token), + # so look up by brsymbol (e.g. BTC_INR) instead. + if is_spot: + symbol_from_db = get_oa_symbol(product_symbol, "CRYPTO") + else: + symbol_from_db = get_symbol(str(product_id), "CRYPTO") if product_id else None position["tradingSymbol"] = symbol_from_db or product_symbol position["exchangeSegment"] = "CRYPTO" - position["productType"] = "NRML" + position["productType"] = "CNC" if is_spot else "NRML" # Net quantity: positive = long, negative = short - net_qty = int(position.get("size", 0)) + # Use float() to support fractional spot sizes (e.g. 0.0001 BTC) + try: + net_qty = float(position.get("size", 0)) + except (ValueError, TypeError): + net_qty = 0 position["netQty"] = net_qty # Average entry price diff --git a/broker/deltaexchange/mapping/transform_data.py b/broker/deltaexchange/mapping/transform_data.py index 0a068586d..31ba63fe4 100644 --- a/broker/deltaexchange/mapping/transform_data.py +++ b/broker/deltaexchange/mapping/transform_data.py @@ -12,11 +12,18 @@ def _order_size(quantity, symbol, exchange): Convert quantity to the correct type for the Delta Exchange size parameter. - Spot instruments: fractional float (e.g. 0.05 SOL) - Derivatives (futures/options/perps): integer number of contracts + + Raises ValueError if a fractional quantity is passed for a non-spot instrument. """ qty = float(quantity) info = get_symbol_info(symbol, exchange) if info and info.instrumenttype == "SPOT": return qty + if qty != int(qty): + raise ValueError( + f"Fractional quantity ({qty}) not allowed for derivative contracts. " + f"Use whole numbers for {symbol}." + ) return int(qty) diff --git a/restx_api/schemas.py b/restx_api/schemas.py index 39ef2d678..3f2551e94 100644 --- a/restx_api/schemas.py +++ b/restx_api/schemas.py @@ -145,9 +145,9 @@ class SplitOrderSchema(Schema): required=True, validate=validate.Range(min=0, min_inclusive=False, error="Total quantity must be a positive number."), ) # Total quantity to split - splitsize = fields.Float( + splitsize = fields.Int( required=True, - validate=validate.Range(min=0, min_inclusive=False, error="Split size must be a positive number."), + validate=validate.Range(min=1, error="Split size must be a positive integer."), ) # Size of each split pricetype = fields.Str( missing="MARKET", validate=validate.OneOf(["MARKET", "LIMIT", "SL", "SL-M"]) diff --git a/services/positionbook_service.py b/services/positionbook_service.py index b020052a0..5f03a621a 100644 --- a/services/positionbook_service.py +++ b/services/positionbook_service.py @@ -41,7 +41,7 @@ def format_position_data(position_data): key: value if key.lower() in passthrough_fields else ( - int(value) + (int(value) if value == int(value) else value) if (key.lower() in quantity_fields and isinstance(value, (int, float))) else (format_decimal(value) if isinstance(value, (int, float)) else value) ) From 8081281ec8cf23ffb5be0a4a48e52ac61f7e86fe Mon Sep 17 00:00:00 2001 From: kalaivani Date: Mon, 9 Mar 2026 16:53:12 +0530 Subject: [PATCH 7/8] Fix fractional spot quantity truncation in Delta Exchange margin calculation int() in margin_data.py silently truncated sub-1 spot quantities (e.g. 0.0001 BTC) to 0, causing them to be skipped. Use _order_size() to preserve fractional spot sizes while keeping integer enforcement for derivatives. Co-Authored-By: Claude Opus 4.6 --- broker/deltaexchange/mapping/margin_data.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/broker/deltaexchange/mapping/margin_data.py b/broker/deltaexchange/mapping/margin_data.py index ae1cd3dd2..cc406cb56 100644 --- a/broker/deltaexchange/mapping/margin_data.py +++ b/broker/deltaexchange/mapping/margin_data.py @@ -2,6 +2,7 @@ # Mapping OpenAlgo margin positions to Delta Exchange margin_required API format # Delta Exchange endpoint: GET /v2/products/{product_id}/margin_required +from broker.deltaexchange.mapping.transform_data import _order_size from database.token_db import get_token from utils.logging import get_logger @@ -15,7 +16,7 @@ def transform_margin_positions(positions): Each OpenAlgo position is converted to a dict with the fields needed to call GET /v2/products/{product_id}/margin_required: product_id (int) โ€“ from token DB (token = product_id on Delta) - size (int) โ€“ absolute quantity + size (int|float) โ€“ contracts (int) or spot quantity (float) side (str) โ€“ "buy" or "sell" order_type (str) โ€“ "limit_order" or "market_order" limit_price (str) โ€“ required if order_type == "limit_order" @@ -54,7 +55,7 @@ def transform_margin_positions(positions): entry = { "product_id": product_id, - "size": int(pos["quantity"]), + "size": _order_size(pos["quantity"], symbol, exchange), "side": side, "order_type": order_type, } From 75cf1c12f9006d4517da43ed603b350d5e1147de Mon Sep 17 00:00:00 2001 From: kalaivani Date: Mon, 9 Mar 2026 17:10:43 +0530 Subject: [PATCH 8/8] Fix file descriptor leaks in RMoney XTS WebSocket streaming - Add requests.Session for HTTP connection pooling instead of bare requests calls - Close all HTTP responses explicitly via try/finally to prevent FD leaks - Add close() method for full teardown of Socket.IO client + HTTP session - Force-kill Engine.IO transport on disconnect/reconnect to release FDs and threads - Use threading.Event for interruptible reconnect sleep so disconnect() returns instantly - Close previous ws_client on re-initialize to prevent orphaned resources Co-Authored-By: Claude Opus 4.6 --- broker/rmoney/streaming/rmoney_adapter.py | 26 +- broker/rmoney/streaming/rmoney_websocket.py | 282 +++++++++++--------- 2 files changed, 179 insertions(+), 129 deletions(-) diff --git a/broker/rmoney/streaming/rmoney_adapter.py b/broker/rmoney/streaming/rmoney_adapter.py index 938489495..11016cf84 100644 --- a/broker/rmoney/streaming/rmoney_adapter.py +++ b/broker/rmoney/streaming/rmoney_adapter.py @@ -38,6 +38,7 @@ def __init__(self): self.lock = threading.Lock() self._reconnect_worker_lock = threading.Lock() self._reconnect_worker: threading.Thread | None = None + self._stop_event = threading.Event() # Interruptible sleep for reconnect # Log the ZMQ port being used self.logger.info(f"RMoney XTS adapter initialized with ZMQ port: {self.zmq_port}") @@ -97,6 +98,13 @@ def initialize( self.logger.info(f"Using API Key: {api_key[:10]}... for RMoney XTS connection") + # Close previous client if initialize() is called again + if self.ws_client is not None: + try: + self.ws_client.close() + except Exception: + pass + # Create RMoney XTS WebSocket client with API credentials self.ws_client = RMoneyWebSocketClient( api_key=api_key, @@ -197,6 +205,8 @@ def connect(self) -> None: self.logger.error("WebSocket client not initialized. Call initialize() first.") return + # Reset stop event for fresh connection lifecycle + self._stop_event.clear() self._start_reconnect_worker(trigger="connect") def _start_reconnect_worker(self, trigger: str) -> None: @@ -232,7 +242,9 @@ def _connect_with_retry(self) -> None: self.reconnect_delay * (2**self.reconnect_attempts), self.max_reconnect_delay ) self.logger.error(f"Connection failed: {e}. Retrying in {delay} seconds...") - time.sleep(delay) + # Use event-based wait so disconnect() can interrupt immediately + if self._stop_event.wait(delay): + break # Stop event was set โ€” abort reconnect if self.reconnect_attempts >= self.max_reconnect_attempts: self.logger.error("Max reconnection attempts reached. Giving up.") @@ -244,18 +256,20 @@ def disconnect(self) -> None: # Set running to False to prevent reconnection attempts self.running = False self.reconnect_attempts = self.max_reconnect_attempts # Prevent reconnection attempts + # Wake up any sleeping reconnect worker immediately + self._stop_event.set() self.logger.info( "Set running=False and max reconnect attempts to prevent auto-reconnection" ) - # Disconnect Socket.IO client + # Full teardown of Socket.IO client + HTTP session if hasattr(self, "ws_client") and self.ws_client: try: - self.logger.info("Disconnecting Socket.IO client...") - self.ws_client.disconnect() - self.logger.info("Socket.IO client disconnect call completed") + self.logger.info("Closing Socket.IO client and HTTP session...") + self.ws_client.close() + self.logger.info("Socket.IO client close call completed") except Exception as e: - self.logger.error(f"Error during Socket.IO disconnect: {e}") + self.logger.error(f"Error during Socket.IO close: {e}") else: self.logger.warning("No WebSocket client to disconnect") diff --git a/broker/rmoney/streaming/rmoney_websocket.py b/broker/rmoney/streaming/rmoney_websocket.py index 94f9d7c8b..20a8f31c6 100644 --- a/broker/rmoney/streaming/rmoney_websocket.py +++ b/broker/rmoney/streaming/rmoney_websocket.py @@ -133,6 +133,9 @@ def __init__( self.subscriptions = {} self._binary_packet_seen = False + # Reusable HTTP session for connection pooling (avoids FD churn) + self._http_session = requests.Session() + # Initialize Socket.IO client self._setup_socketio() @@ -255,40 +258,42 @@ def marketdata_login(self) -> bool: self.logger.info(f"[MARKET DATA LOGIN] Attempting login to: {self.login_url}") - response = requests.post( + response = self._http_session.post( self.login_url, json=login_payload, headers=headers, timeout=30 ) - - if response.status_code == 200: - result = response.json() - self.logger.debug(f"[MARKET DATA LOGIN] Response: {result}") - - if result.get("type") == "success": - login_result = result.get("result", {}) - self.market_data_token = login_result.get("token") - self.actual_user_id = login_result.get("userID") - self.app_version = login_result.get("appVersion") - self.expiry_date = login_result.get("application_expiry_date") - - if self.market_data_token and self.actual_user_id: - self.logger.info( - f"[MARKET DATA LOGIN] Success! UserID: {self.actual_user_id}" - ) - return True + try: + if response.status_code == 200: + result = response.json() + self.logger.debug(f"[MARKET DATA LOGIN] Response: {result}") + + if result.get("type") == "success": + login_result = result.get("result", {}) + self.market_data_token = login_result.get("token") + self.actual_user_id = login_result.get("userID") + self.app_version = login_result.get("appVersion") + self.expiry_date = login_result.get("application_expiry_date") + + if self.market_data_token and self.actual_user_id: + self.logger.info( + f"[MARKET DATA LOGIN] Success! UserID: {self.actual_user_id}" + ) + return True + else: + self.logger.error("[MARKET DATA LOGIN] Missing token or userID in response") + return False else: - self.logger.error("[MARKET DATA LOGIN] Missing token or userID in response") + self.logger.error(f"[MARKET DATA LOGIN] API returned error: {result}") return False else: - self.logger.error(f"[MARKET DATA LOGIN] API returned error: {result}") + self.logger.error( + f"[MARKET DATA LOGIN] HTTP Error: {response.status_code}, Response: {response.text}" + ) return False - else: - self.logger.error( - f"[MARKET DATA LOGIN] HTTP Error: {response.status_code}, Response: {response.text}" - ) - return False + finally: + response.close() except requests.exceptions.Timeout: self.logger.error("[MARKET DATA LOGIN] Request timeout") @@ -316,6 +321,14 @@ def connect(self) -> None: self.sio.disconnect() except Exception: pass + # Force-kill Engine.IO transport to release its FD and threads + try: + eio = getattr(self.sio, "eio", None) + if eio and hasattr(eio, "disconnect"): + eio.disconnect(abort=True) + except Exception: + pass + self.sio = None self.connected = False # Create fresh Socket.IO client to avoid stale internal state @@ -365,16 +378,32 @@ def disconnect(self) -> None: self.running = False self.connected = False - try: - if self.sio and self.sio.connected: - self.sio.disconnect() - self.logger.info("[SOCKET.IO] Disconnected successfully") - except Exception as e: - self.logger.warning(f"[SOCKET.IO] Error during disconnect: {e}") + if self.sio: + try: + if self.sio.connected: + self.sio.disconnect() + self.logger.info("[SOCKET.IO] Disconnected successfully") + except Exception as e: + self.logger.warning(f"[SOCKET.IO] Error during disconnect: {e}") + # Force-kill Engine.IO transport to release its FD and threads + try: + eio = getattr(self.sio, "eio", None) + if eio and hasattr(eio, "disconnect"): + eio.disconnect(abort=True) + except Exception: + pass + self.sio = None # Clear subscriptions self.subscriptions.clear() + def close(self) -> None: + """Full teardown: disconnect Socket.IO and release HTTP session.""" + self.disconnect() + if self._http_session: + self._http_session.close() + self.logger.info("[CLEANUP] HTTP session closed") + def subscribe(self, correlation_id: str, mode: int, instruments: List[Dict]) -> None: """ Subscribe to market data using XTS HTTP API. @@ -418,94 +447,99 @@ def subscribe(self, correlation_id: str, mode: int, instruments: List[Dict]) -> f"[SUBSCRIBE] Code: {xts_message_code}, Instruments: {len(instruments)}" ) - response = requests.post( + response = self._http_session.post( self.subscription_url, json=subscription_request, headers=headers, timeout=10, ) + try: + if response.status_code == 200: + result = response.json() + self.logger.debug(f"[SUBSCRIBE] Response: {result}") + if result.get("type") != "success": + error_desc = result.get("description") or result.get("message") or str(result) + self.logger.error(f"[SUBSCRIBE] API error response: {error_desc}") + raise RuntimeError(error_desc) + + # Process initial quote data if available + if "result" in result: + list_quotes = result["result"].get("listQuotes", []) + self.logger.info( + f"[SUBSCRIBE] Initial quote payload count: {len(list_quotes)} for code {xts_message_code}" + ) + for quote_str in list_quotes: + try: + quote_data = ( + json.loads(quote_str) if isinstance(quote_str, str) else quote_str + ) + self.logger.debug(f"[INITIAL QUOTE] {quote_data}") + if isinstance(quote_data, dict) and "MessageCode" not in quote_data: + quote_data["MessageCode"] = xts_message_code + if self.on_data: + self.on_data(self, quote_data) + except json.JSONDecodeError as e: + self.logger.error(f"[INITIAL QUOTE] Parse error: {e}") - if response.status_code == 200: - result = response.json() - self.logger.debug(f"[SUBSCRIBE] Response: {result}") - if result.get("type") != "success": - error_desc = result.get("description") or result.get("message") or str(result) - self.logger.error(f"[SUBSCRIBE] API error response: {error_desc}") - raise RuntimeError(error_desc) - - # Process initial quote data if available - if "result" in result: - list_quotes = result["result"].get("listQuotes", []) self.logger.info( - f"[SUBSCRIBE] Initial quote payload count: {len(list_quotes)} for code {xts_message_code}" + f"[SUBSCRIBE] Success - {len(instruments)} instruments, code {xts_message_code}" ) - for quote_str in list_quotes: - try: - quote_data = ( - json.loads(quote_str) if isinstance(quote_str, str) else quote_str - ) - self.logger.debug(f"[INITIAL QUOTE] {quote_data}") - if isinstance(quote_data, dict) and "MessageCode" not in quote_data: - quote_data["MessageCode"] = xts_message_code - if self.on_data: - self.on_data(self, quote_data) - except json.JSONDecodeError as e: - self.logger.error(f"[INITIAL QUOTE] Parse error: {e}") - - self.logger.info( - f"[SUBSCRIBE] Success - {len(instruments)} instruments, code {xts_message_code}" - ) - else: - error_msg = f"[SUBSCRIBE] Failed - Status: {response.status_code}, Response: {response.text}" - # "Instrument Already Subscribed" is non-fatal (expected after reconnect) - if "Already Subscribed" in response.text or "e-session-0002" in response.text: - self.logger.info(f"[SUBSCRIBE] Instrument already subscribed (non-fatal)") - return - # Handle Invalid Token by re-authenticating and retrying once. - # This happens when data.py refreshes the feed token, which creates - # a new market data session and invalidates our current token. - if "Invalid Token" in response.text or "e-session-0007" in response.text: - self.logger.warning( - "[SUBSCRIBE] Token invalidated (likely by feed token refresh). Re-authenticating..." - ) - if self.marketdata_login(): - # Retry with new token - retry_headers = { - "authorization": self.market_data_token, - "Content-Type": "application/json", - } - retry_response = requests.post( - self.subscription_url, - json=subscription_request, - headers=retry_headers, - timeout=10, + else: + error_msg = f"[SUBSCRIBE] Failed - Status: {response.status_code}, Response: {response.text}" + # "Instrument Already Subscribed" is non-fatal (expected after reconnect) + if "Already Subscribed" in response.text or "e-session-0002" in response.text: + self.logger.info(f"[SUBSCRIBE] Instrument already subscribed (non-fatal)") + return + # Handle Invalid Token by re-authenticating and retrying once. + # This happens when data.py refreshes the feed token, which creates + # a new market data session and invalidates our current token. + if "Invalid Token" in response.text or "e-session-0007" in response.text: + self.logger.warning( + "[SUBSCRIBE] Token invalidated (likely by feed token refresh). Re-authenticating..." ) - if retry_response.status_code == 200: - retry_result = retry_response.json() - if retry_result.get("type") == "success": - self.logger.info( - f"[SUBSCRIBE] Retry succeeded after re-auth - {len(instruments)} instruments" - ) - # Process initial quotes from retry - if "result" in retry_result: - list_quotes = retry_result["result"].get("listQuotes", []) - for quote_str in list_quotes: - try: - quote_data = ( - json.loads(quote_str) - if isinstance(quote_str, str) - else quote_str - ) - if isinstance(quote_data, dict) and "MessageCode" not in quote_data: - quote_data["MessageCode"] = xts_message_code - if self.on_data: - self.on_data(self, quote_data) - except json.JSONDecodeError: - pass - return - self.logger.error("[SUBSCRIBE] Re-auth retry also failed") - self.logger.error(error_msg) - raise RuntimeError(f"Subscribe failed: {response.text}") + if self.marketdata_login(): + # Retry with new token + retry_headers = { + "authorization": self.market_data_token, + "Content-Type": "application/json", + } + retry_response = self._http_session.post( + self.subscription_url, + json=subscription_request, + headers=retry_headers, + timeout=10, + ) + try: + if retry_response.status_code == 200: + retry_result = retry_response.json() + if retry_result.get("type") == "success": + self.logger.info( + f"[SUBSCRIBE] Retry succeeded after re-auth - {len(instruments)} instruments" + ) + # Process initial quotes from retry + if "result" in retry_result: + list_quotes = retry_result["result"].get("listQuotes", []) + for quote_str in list_quotes: + try: + quote_data = ( + json.loads(quote_str) + if isinstance(quote_str, str) + else quote_str + ) + if isinstance(quote_data, dict) and "MessageCode" not in quote_data: + quote_data["MessageCode"] = xts_message_code + if self.on_data: + self.on_data(self, quote_data) + except json.JSONDecodeError: + pass + return + finally: + retry_response.close() + self.logger.error("[SUBSCRIBE] Re-auth retry also failed") + self.logger.error(error_msg) + raise RuntimeError(f"Subscribe failed: {response.text}") + finally: + response.close() except Exception as e: self.logger.error(f"[SUBSCRIBE] Exception: {e}") @@ -551,26 +585,28 @@ def unsubscribe(self, correlation_id: str, mode: int, instruments: List[Dict]) - f"[UNSUBSCRIBE] Code: {xts_message_code}, Instruments: {len(instruments)}" ) - response = requests.put( + response = self._http_session.put( self.subscription_url, json=unsubscription_request, headers=headers, timeout=10, ) - - if response.status_code == 200: - result = response.json() - if result.get("type") == "success": - self.logger.info(f"[UNSUBSCRIBE] Success - {len(instruments)} instruments") - return True + try: + if response.status_code == 200: + result = response.json() + if result.get("type") == "success": + self.logger.info(f"[UNSUBSCRIBE] Success - {len(instruments)} instruments") + return True + else: + self.logger.error(f"[UNSUBSCRIBE] API error response: {result}") + return False else: - self.logger.error(f"[UNSUBSCRIBE] API error response: {result}") + self.logger.error( + f"[UNSUBSCRIBE] Failed - Status: {response.status_code}" + ) return False - else: - self.logger.error( - f"[UNSUBSCRIBE] Failed - Status: {response.status_code}" - ) - return False + finally: + response.close() except Exception as e: self.logger.error(f"[UNSUBSCRIBE] Exception: {e}")