|
| 1 | +# WebSocket Streaming - Immediate Fixes Checklist |
| 2 | + |
| 3 | +## 🚀 Quick Fixes for v3.1.0-alpha.2 |
| 4 | + |
| 5 | +This checklist tracks the immediate fixes needed before merging PR #77 or as a fast-follow update. |
| 6 | + |
| 7 | +### 🔴 Critical Fixes (Must Do) |
| 8 | + |
| 9 | +#### Thread Safety |
| 10 | +- [ ] Add `threading.RLock` for connection state |
| 11 | + ```python |
| 12 | + self._state_lock = threading.RLock() |
| 13 | + self._is_connected = False |
| 14 | + ``` |
| 15 | +- [ ] Protect subscription modifications with locks |
| 16 | +- [ ] Add locks for handler list access |
| 17 | +- [ ] Test with concurrent operations |
| 18 | + |
| 19 | +#### Exception Handling |
| 20 | +- [ ] Replace `logger.error` with `logger.exception` in: |
| 21 | + - [ ] Line 119: `_run_forever` |
| 22 | + - [ ] Line 189: `_on_message` (JSONDecodeError) |
| 23 | + - [ ] Line 191: `_on_message` (General Exception) |
| 24 | + - [ ] Line 229: `_handle_reconnect` |
| 25 | + - [ ] Line 269: `_dispatch_message` |
| 26 | + |
| 27 | +#### Message Handling |
| 28 | +- [ ] Add validation for empty/null messages |
| 29 | +- [ ] Handle partial JSON messages |
| 30 | +- [ ] Add try-catch for corrupted data |
| 31 | +- [ ] Log and continue on bad messages (don't crash) |
| 32 | + |
| 33 | +### 🟡 Important Fixes (Should Do) |
| 34 | + |
| 35 | +#### Connection Management |
| 36 | +- [ ] Make connection state checks atomic |
| 37 | +- [ ] Add connection state enum |
| 38 | +- [ ] Prevent multiple simultaneous connections |
| 39 | +- [ ] Clean up resources on disconnect |
| 40 | + |
| 41 | +#### Method Signatures |
| 42 | +- [ ] Fix unused `ws` parameter warnings: |
| 43 | + ```python |
| 44 | + def _on_message(self, _ws: websocket.WebSocketApp, message: str) -> None: |
| 45 | + # Use underscore prefix for unused params |
| 46 | + ``` |
| 47 | + Or add `# noqa: ARG002` comments |
| 48 | + |
| 49 | +#### Memory Management |
| 50 | +- [ ] Add method to remove individual handlers |
| 51 | +- [ ] Clear handlers on disconnect |
| 52 | +- [ ] Add maximum handler limit |
| 53 | +- [ ] Consider weak references for handlers |
| 54 | + |
| 55 | +### 🟢 Nice to Have (Can Do Later) |
| 56 | + |
| 57 | +#### Monitoring |
| 58 | +- [ ] Add connection metrics |
| 59 | +- [ ] Add message counters |
| 60 | +- [ ] Add performance logging |
| 61 | +- [ ] Create health check method |
| 62 | + |
| 63 | +#### Testing |
| 64 | +- [ ] Add test for partial messages |
| 65 | +- [ ] Add test for concurrent subscriptions |
| 66 | +- [ ] Add test for memory leaks |
| 67 | +- [ ] Add test for reconnection during auth |
| 68 | + |
| 69 | +## 📝 Implementation Examples |
| 70 | + |
| 71 | +### Fix 1: Thread Safety |
| 72 | +```python |
| 73 | +class StreamClient: |
| 74 | + def __init__(self, ...): |
| 75 | + # Add locks |
| 76 | + self._state_lock = threading.RLock() |
| 77 | + self._handler_lock = threading.RLock() |
| 78 | + |
| 79 | + @property |
| 80 | + def is_connected(self) -> bool: |
| 81 | + with self._state_lock: |
| 82 | + return self._is_connected |
| 83 | + |
| 84 | + @is_connected.setter |
| 85 | + def is_connected(self, value: bool) -> None: |
| 86 | + with self._state_lock: |
| 87 | + self._is_connected = value |
| 88 | +``` |
| 89 | + |
| 90 | +### Fix 2: Exception Handling |
| 91 | +```python |
| 92 | +# Change all exception handlers |
| 93 | +try: |
| 94 | + # code |
| 95 | +except json.JSONDecodeError as e: |
| 96 | + logger.exception("Failed to parse message") # Not logger.error |
| 97 | + # Handle gracefully, don't crash |
| 98 | +except Exception as e: |
| 99 | + logger.exception("Unexpected error") # Preserves stack trace |
| 100 | +``` |
| 101 | + |
| 102 | +### Fix 3: Message Validation |
| 103 | +```python |
| 104 | +def _on_message(self, _ws, message: str) -> None: |
| 105 | + """Handle incoming WebSocket messages.""" |
| 106 | + if not message: |
| 107 | + logger.debug("Received empty message") |
| 108 | + return |
| 109 | + |
| 110 | + try: |
| 111 | + data = json.loads(message) |
| 112 | + except json.JSONDecodeError: |
| 113 | + logger.exception(f"Invalid JSON received: {message[:100]}...") |
| 114 | + return # Don't crash, just skip |
| 115 | + |
| 116 | + # Ensure it's a list |
| 117 | + if not isinstance(data, list): |
| 118 | + data = [data] |
| 119 | + |
| 120 | + # Process each message... |
| 121 | +``` |
| 122 | + |
| 123 | +### Fix 4: Handler Removal |
| 124 | +```python |
| 125 | +def remove_handler(self, stream_type: StreamType, handler: Callable) -> bool: |
| 126 | + """Remove a specific handler. |
| 127 | +
|
| 128 | + Returns: |
| 129 | + True if handler was removed, False if not found |
| 130 | + """ |
| 131 | + with self._handler_lock: |
| 132 | + if handler in self.handlers[stream_type]: |
| 133 | + self.handlers[stream_type].remove(handler) |
| 134 | + return True |
| 135 | + return False |
| 136 | + |
| 137 | +def clear_all_handlers(self) -> None: |
| 138 | + """Remove all handlers.""" |
| 139 | + with self._handler_lock: |
| 140 | + for stream_type in StreamType: |
| 141 | + self.handlers[stream_type].clear() |
| 142 | +``` |
| 143 | + |
| 144 | +## 🧪 Test Cases to Add |
| 145 | + |
| 146 | +### Thread Safety Tests |
| 147 | +```python |
| 148 | +def test_concurrent_subscriptions(): |
| 149 | + """Test multiple threads subscribing simultaneously.""" |
| 150 | + client = StreamClient(...) |
| 151 | + threads = [] |
| 152 | + |
| 153 | + def subscribe_worker(symbol): |
| 154 | + client.subscribe_quotes(symbol, lambda x: None) |
| 155 | + |
| 156 | + # Create 50 threads |
| 157 | + for i in range(50): |
| 158 | + t = threading.Thread(target=subscribe_worker, args=[f"TEST{i}"]) |
| 159 | + threads.append(t) |
| 160 | + t.start() |
| 161 | + |
| 162 | + # Wait for all |
| 163 | + for t in threads: |
| 164 | + t.join() |
| 165 | + |
| 166 | + # Should have 50 subscriptions, no errors |
| 167 | + assert len(client.subscriptions[StreamType.QUOTES]) == 50 |
| 168 | +``` |
| 169 | + |
| 170 | +### Error Handling Tests |
| 171 | +```python |
| 172 | +def test_corrupted_message_handling(): |
| 173 | + """Test that corrupted messages don't crash the client.""" |
| 174 | + client = StreamClient(...) |
| 175 | + |
| 176 | + # Send various bad messages |
| 177 | + bad_messages = [ |
| 178 | + "", # Empty |
| 179 | + "not json", # Invalid JSON |
| 180 | + '{"partial": ', # Incomplete JSON |
| 181 | + '[]', # Empty array |
| 182 | + 'null', # Null |
| 183 | + ] |
| 184 | + |
| 185 | + for msg in bad_messages: |
| 186 | + # Should not raise exception |
| 187 | + client._on_message(None, msg) |
| 188 | + |
| 189 | + # Client should still be functional |
| 190 | + assert client is not None |
| 191 | +``` |
| 192 | + |
| 193 | +## 📅 Timeline |
| 194 | + |
| 195 | +### Immediate (Before Merge) |
| 196 | +- Thread safety for state management |
| 197 | +- Fix exception handling (logger.exception) |
| 198 | +- Basic message validation |
| 199 | + |
| 200 | +### Fast Follow (v3.1.0-alpha.2) |
| 201 | +- Complete thread safety |
| 202 | +- Handler removal methods |
| 203 | +- Connection state machine |
| 204 | + |
| 205 | +### Future (v3.1.0-stable) |
| 206 | +- Full metrics implementation |
| 207 | +- Advanced error recovery |
| 208 | +- Performance optimizations |
| 209 | + |
| 210 | +## ✅ Verification |
| 211 | + |
| 212 | +### Before Marking Complete |
| 213 | +1. Run existing tests: `pytest tests/test_streaming/` |
| 214 | +2. Check for race conditions: Run concurrent test |
| 215 | +3. Verify logging: Check stack traces are preserved |
| 216 | +4. Memory check: Monitor for leaks |
| 217 | +5. Code review: Get second opinion |
| 218 | + |
| 219 | +## 📊 Progress Tracking |
| 220 | + |
| 221 | +| Category | Total | Fixed | Remaining | |
| 222 | +|----------|-------|-------|-----------| |
| 223 | +| Critical | 12 | 0 | 12 | |
| 224 | +| Important | 9 | 0 | 9 | |
| 225 | +| Nice to Have | 8 | 0 | 8 | |
| 226 | +| **Total** | **29** | **0** | **29** | |
| 227 | + |
| 228 | +--- |
| 229 | + |
| 230 | +**Last Updated**: 2024-01-17 |
| 231 | +**Target Completion**: v3.1.0-alpha.2 |
| 232 | +**Owner**: Development Team |
0 commit comments