@@ -61,7 +61,7 @@ class PositionManager:
6161 - Portfolio-level position management
6262 - Automated P&L calculation and risk metrics
6363 - Position sizing and risk management tools
64- - Event-driven position update notifications
64+ - Event-driven position updates (closures detected from type=0/size=0)
6565 - Thread-safe operations for concurrent access
6666
6767 Example Usage:
@@ -171,15 +171,14 @@ def _setup_realtime_callbacks(self):
171171 if not self .realtime_client :
172172 return
173173
174- # Register for position events
174+ # Register for position events (closures are detected from position updates)
175175 self .realtime_client .add_callback ("position_update" , self ._on_position_update )
176- self .realtime_client .add_callback ("position_closed" , self ._on_position_closed )
177176 self .realtime_client .add_callback ("account_update" , self ._on_account_update )
178177
179178 self .logger .info ("🔄 Real-time position callbacks registered" )
180179
181180 def _on_position_update (self , data : dict ):
182- """Handle real-time position updates."""
181+ """Handle real-time position updates and detect position closures ."""
183182 try :
184183 with self .position_lock :
185184 if isinstance (data , list ):
@@ -193,32 +192,12 @@ def _on_position_update(self, data: dict):
193192 except Exception as e :
194193 self .logger .error (f"Error processing position update: { e } " )
195194
196- def _on_position_closed (self , data : dict ):
197- """Handle real-time position closure notifications."""
198- try :
199- data = data .get ("data" , {})
200- if not data :
201- self .logger .error (f"No position data found in { data } " )
202- return
203-
204- contract_id = data .get ("contractId" )
205- if contract_id :
206- with self .position_lock :
207- if contract_id in self .tracked_positions :
208- del self .tracked_positions [contract_id ]
209- self .logger .info (f"📊 Position closed: { contract_id } " )
210-
211- self ._trigger_callbacks ("position_closed" , data )
212-
213- except Exception as e :
214- self .logger .error (f"Error processing position closure: { e } " )
215-
216195 def _on_account_update (self , data : dict ):
217196 """Handle account-level updates that may affect positions."""
218197 self ._trigger_callbacks ("account_update" , data )
219198
220199 def _process_position_data (self , position_data : dict ):
221- """Process individual position data update."""
200+ """Process individual position data update and detect position closures ."""
222201 try :
223202 position_data = position_data .get ("data" , {})
224203
@@ -227,25 +206,41 @@ def _process_position_data(self, position_data: dict):
227206 self .logger .error (f"No contract ID found in { position_data } " )
228207 return
229208
230- # Create or update position
231- position = Position (** position_data )
209+ # Check if this is a position closure (type=0 and/or size=0)
210+ position_type = position_data .get ("type" , - 1 )
211+ position_size = position_data .get ("size" , - 1 )
212+ is_position_closed = position_type == 0 or position_size == 0
213+
214+ # Get the old position before updating
232215 old_position = self .tracked_positions .get (contract_id )
233216
234- self .tracked_positions [contract_id ] = position
217+ if is_position_closed :
218+ # Position is closed - remove from tracking and trigger closure callbacks
219+ if contract_id in self .tracked_positions :
220+ del self .tracked_positions [contract_id ]
221+ self .logger .info (f"📊 Position closed: { contract_id } " )
222+ self .stats ["positions_closed" ] += 1
235223
236- # Track position history
237- self .position_history [contract_id ].append (
238- {
239- "timestamp" : datetime .now (),
240- "position" : position_data .copy (),
241- "size_change" : 0
242- if not old_position
243- else position .size - old_position .size ,
244- }
245- )
224+ # Trigger position_closed callbacks with the closure data
225+ self ._trigger_callbacks ("position_closed" , {"data" : position_data })
226+ else :
227+ # Position is open/updated - create or update position
228+ position = Position (** position_data )
229+ self .tracked_positions [contract_id ] = position
230+
231+ # Track position history
232+ self .position_history [contract_id ].append (
233+ {
234+ "timestamp" : datetime .now (),
235+ "position" : position_data .copy (),
236+ "size_change" : 0
237+ if not old_position
238+ else position .size - old_position .size ,
239+ }
240+ )
246241
247- # Check alerts
248- self ._check_position_alerts (contract_id , position , old_position )
242+ # Check alerts
243+ self ._check_position_alerts (contract_id , position , old_position )
249244
250245 except Exception as e :
251246 self .logger .error (f"Error processing position data: { e } " )
0 commit comments