
Commit 0315a68

bashandbone, claude, and Copilot authored
feat: implement connection pooling for supported providers (#194)
* docs: add connection pooling implementation plan

  Create detailed implementation plan for HTTP client connection pooling based on the client-pooling-analysis document. The plan outlines:
  - Phase 1: Infrastructure (HttpClientPool class, config settings, state integration)
  - Phase 2: Provider integration (Voyage AI, Cohere, Qdrant considerations)
  - Testing strategy and rollback plan
  - Provider-specific connection pool settings

  This addresses connection overhead, potential exhaustion during high-load indexing, and Qdrant timeout issues (httpcore.ReadError).

* feat: implement HTTP client connection pooling for providers

  Add centralized HTTP client pooling infrastructure to improve connection reuse across Voyage AI and Cohere providers during high-load operations.

  Key changes:
  - Create HttpClientPool class with singleton pattern for application-wide connection reuse (src/codeweaver/common/http_pool.py)
  - Integrate http_pool into CodeWeaverState with proper cleanup on shutdown
  - Update ProviderRegistry to inject pooled httpx clients for Voyage/Cohere
  - Add provider-specific settings (50 max connections, 90s read timeout)
  - Add comprehensive unit tests for HttpClientPool

  This addresses:
  - Connection overhead during batch embedding operations
  - Potential connection exhaustion during large indexing jobs
  - Improved reliability with HTTP/2 support for better multiplexing

  The implementation includes graceful fallbacks - if pooling fails, providers will create their own clients as before.

* refactor: address PR #194 review suggestions for HTTP connection pooling

  - Add thread safety via asyncio.Lock for coroutine-safe client creation
  - Make reset_http_pool() async to properly close clients before reset
  - Add reset_http_pool_sync() for testing fixtures without cleanup
  - Unify dual singleton pattern (remove redundant module-level instance)
  - Change INFO logging to DEBUG for client creation (reduce noise)
  - Narrow exception types (httpx.HTTPError, OSError) instead of Exception
  - Merge duplicate VOYAGE/COHERE conditionals using _POOLED_HTTP_PROVIDERS
  - Use get_client_sync() in provider registry for sync context
  - Add comprehensive tests:
    - Test override settings are actually applied to clients
    - Test aclose() error handling (graceful degradation)
    - Test PoolTimeouts immutability
    - Test concurrent get_client calls (thread safety)
    - Test _get_pooled_httpx_client for various providers
  - Update implementation plan status to Implemented
  - Export reset_http_pool_sync in common/__init__.py

* feat: expand HTTP connection pooling to OpenAI-compatible and Mistral providers

  Extend connection pooling support to 11 providers:
  - OpenAI-compatible (8): OpenAI, Azure, Fireworks, Groq, Together, Ollama, Cerebras, Heroku
  - Voyage and Cohere (existing)
  - Mistral (new)

  Changes:
  - OpenAI factory: Accept http_client parameter for AsyncOpenAI
  - Mistral provider: Accept httpx_client mapped to async_client
  - Provider registry: Convert _POOLED_HTTP_PROVIDERS to dict mapping provider -> param name
  - Registry _instantiate_client: Use provider-specific parameter names
  - Tests: Add tests for OpenAI, Mistral pooling; update provider mapping tests
  - Docs: Update implementation plan with full provider table

  Provider parameter mapping:
  - httpx_client: Voyage, Cohere, Mistral
  - http_client: OpenAI, Azure, Fireworks, Groq, Together, Ollama, Cerebras, Heroku

  Not supported (no httpx injection):
  - Bedrock (uses boto3)
  - HuggingFace (global factory pattern)
  - Google GenAI (args only, not full client)

* fix: add thread safety to singleton and sync client creation

  Address Copilot review suggestions from PR #194:
  1. get_instance() race condition: Add double-checked locking with threading.Lock to prevent multiple singleton instances when called concurrently from different threads
  2. get_client_sync() race condition: Add thread-safe double-checked locking pattern using _sync_lock to prevent duplicate client creation when called concurrently during provider initialization
  3. Remove unused 'patch' import from test_http_pool.py

  Thread safety guarantees are now documented in the module docstring:
  - Singleton: threading.Lock with double-checked locking
  - Async clients: asyncio.Lock for coroutine safety
  - Sync clients: threading.Lock for thread safety

* fix: resolve test failures for HTTP connection pooling

  - Mark _POOLED_HTTP_PROVIDERS as ClassVar to prevent Pydantic treating it as a private attribute
  - Fix import path: use codeweaver.providers.provider for ProviderKind instead of non-existent codeweaver.providers.kinds
  - Remove _limits assertions from tests since httpx.AsyncClient doesn't expose limits directly (timeout assertions are sufficient)

* Apply suggestions from code review

  Co-authored-by: Copilot <[email protected]>
  Signed-off-by: Adam Poulemanos <[email protected]>

* Apply suggestions from code review

  Co-authored-by: Copilot <[email protected]>
  Signed-off-by: Adam Poulemanos <[email protected]>

* Apply suggestions from code review

  Co-authored-by: Copilot <[email protected]>
  Signed-off-by: Adam Poulemanos <[email protected]>

* docs: document client caching behavior for HTTP pool

  Add Note section to get_client() and get_client_sync() docstrings clarifying that clients are cached by name only, not by override parameters. Subsequent calls with different overrides will return the originally cached client.

  This is intentional behavior for connection pooling - each provider should use consistent settings, which ProviderRegistry ensures.

* fix: address Copilot review suggestions for HTTP pool

  1. asyncio.Lock event loop binding issue:
     - Changed _async_lock to be lazily created on first use
     - asyncio.Lock is bound to the event loop it's created in, so lazy creation ensures it's bound to the correct loop
     - Renamed _lock to _async_lock for clarity
  2. Race condition in fast-path checks:
     - Changed if-check to try-except pattern in get_client() and get_client_sync() to handle client removal between check and return
     - Prevents KeyError if close_client/close_all removes client mid-check
  3. Sync/async mixing warning:
     - Added Warning section to get_client_sync() docstring explaining that sync and async methods should not be mixed
  4. Walrus operator clarity:
     - Replaced walrus operator with explicit get() and None check in provider.py for better readability

---------

Signed-off-by: Adam Poulemanos <[email protected]>
Co-authored-by: Claude <[email protected]>
Co-authored-by: Copilot <[email protected]>
1 parent 043d49a commit 0315a68

8 files changed: +1345 -1 lines changed
Lines changed: 285 additions & 0 deletions
@@ -0,0 +1,285 @@
<!--
SPDX-FileCopyrightText: 2025 Knitli Inc.
SPDX-FileContributor: Claude AI Assistant

SPDX-License-Identifier: MIT OR Apache-2.0
-->

# Connection Pooling Implementation Plan

**Status**: ✅ Implemented
**Priority**: High
**Branch**: `claude/plan-connection-pooling-011nqd3xSBSn6a1FQdjnf8Qg`
**Based on**: `claudedocs/client-pooling-analysis.md`

---

## Overview

This document outlines the immediate implementation plan for HTTP connection pooling across CodeWeaver's HTTP-based providers. The implementation will be phased, starting with core infrastructure and progressing to provider integration.

## Phase 1: Infrastructure (Core Implementation)

### Task 1.1: Create HttpClientPool Class

**File**: `src/codeweaver/common/http_pool.py`

Create a centralized HTTP client pool manager with:
- Singleton pattern for application-wide access
- Configurable connection limits and timeouts
- Per-provider client management
- HTTP/2 support for better multiplexing
- Proper async cleanup

**Key Components**:
```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolLimits:
    max_connections: int = 100
    max_keepalive_connections: int = 20
    keepalive_expiry: float = 5.0


@dataclass(frozen=True)
class PoolTimeouts:
    connect: float = 10.0
    read: float = 60.0
    write: float = 10.0
    pool: float = 5.0


class HttpClientPool:
    # Singleton instance
    # Per-provider client dict
    # get_client(name, **overrides) method
    # close_all() cleanup method
    ...
```

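The outline above leaves the pool body open. As a minimal sketch of the singleton and per-name cache it describes, the following uses only the standard library. The `client_factory` hook and its `dict` default are assumptions made here so the example runs without `httpx`; a real pool would presumably construct `httpx.AsyncClient` instances instead.

```python
import threading
from typing import Any, Callable


class HttpClientPool:
    """Name-keyed client cache behind a process-wide singleton (illustrative)."""

    _instance: "HttpClientPool | None" = None
    _instance_lock = threading.Lock()

    def __init__(self, client_factory: Callable[..., Any]) -> None:
        # client_factory is a hypothetical hook, not part of the plan above.
        self._client_factory = client_factory
        self._clients: dict = {}
        self._lock = threading.Lock()

    @classmethod
    def get_instance(cls, client_factory: Callable[..., Any] = dict) -> "HttpClientPool":
        # Double-checked locking: take the lock only while no instance exists.
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    cls._instance = cls(client_factory)
        return cls._instance

    def get_client(self, name: str, **overrides: Any) -> Any:
        # Fast path: return the cached client without taking the lock.
        try:
            return self._clients[name]
        except KeyError:
            pass
        with self._lock:
            # Clients are cached by name only; overrides apply on first creation.
            if name not in self._clients:
                self._clients[name] = self._client_factory(**overrides)
            return self._clients[name]
```

With the `dict` stand-in, `get_client("voyage", max_connections=50)` returns the overrides themselves, which makes the caching rule easy to see: a later call with different overrides still returns the first client.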
### Task 1.2: Add Configuration Settings

**File**: `src/codeweaver/config/settings.py`

Add `HttpPoolSettings` TypedDict for configuration:
```python
from typing import TypedDict


class HttpPoolSettings(TypedDict, total=False):
    max_connections: int
    max_keepalive_connections: int
    keepalive_expiry: float
    connect_timeout: float
    read_timeout: float
    write_timeout: float
    pool_timeout: float
    enable_http2: bool
```

73+
### Task 1.3: Integrate into CodeWeaverState
74+
75+
**File**: `src/codeweaver/server/server.py`
76+
77+
Add `http_pool` field to `CodeWeaverState`:
78+
```python
79+
http_pool: Annotated[
80+
HttpClientPool | None,
81+
Field(
82+
default=None,
83+
description="Shared HTTP client pool for provider connections",
84+
exclude=True,
85+
),
86+
] = None
87+
```
88+
89+
Initialize in `_initialize_cw_state()` function.
90+
91+
### Task 1.4: Add Cleanup to Lifespan
92+
93+
**File**: `src/codeweaver/server/server.py`
94+
95+
Update `_cleanup_state()` to close HTTP pools:
96+
```python
97+
# Close HTTP client pools
98+
if state.http_pool:
99+
try:
100+
await state.http_pool.close_all()
101+
except Exception:
102+
_logger.exception("Error closing HTTP client pools")
103+
```
104+
105+
---
106+
107+
## Phase 2: Provider Integration
108+
109+
### Task 2.1: Update Voyage AI Provider
110+
111+
**File**: `src/codeweaver/providers/embedding/providers/voyage.py`
112+
113+
The Voyage AI `AsyncClient` accepts an `httpx_client` parameter. We'll modify initialization to use the pooled client.
114+
115+
**Current Flow**:
116+
```python
117+
client = AsyncClient(api_key=api_key)
118+
```
119+
120+
**New Flow**:
121+
```python
122+
def _get_pooled_httpx_client() -> httpx.AsyncClient | None:
123+
"""Get pooled HTTP client for Voyage AI."""
124+
try:
125+
from codeweaver.server.server import get_state
126+
state = get_state()
127+
if state.http_pool:
128+
return state.http_pool.get_client(
129+
'voyage',
130+
max_connections=50,
131+
read_timeout=90.0,
132+
)
133+
except Exception:
134+
pass # Fallback to default client
135+
return None
136+
137+
# In provider initialization
138+
httpx_client = _get_pooled_httpx_client()
139+
client = AsyncClient(api_key=api_key, httpx_client=httpx_client)
140+
```
141+
142+
### Task 2.2: Update Cohere Provider
143+
144+
**File**: `src/codeweaver/providers/embedding/providers/cohere.py`
145+
146+
The Cohere `AsyncClientV2` accepts an `httpx_client` parameter in `client_options`.
147+
148+
**Current Flow** (line 123):
149+
```python
150+
known_client_options = {
151+
"api_key", "base_url", "timeout", "max_retries", "httpx_client",
152+
}
153+
```
154+
155+
**New Flow**: Add pooled client to options:
156+
```python
157+
def _get_pooled_httpx_client() -> httpx.AsyncClient | None:
158+
"""Get pooled HTTP client for Cohere."""
159+
try:
160+
from codeweaver.server.server import get_state
161+
state = get_state()
162+
if state.http_pool:
163+
return state.http_pool.get_client(
164+
'cohere',
165+
max_connections=50,
166+
read_timeout=90.0,
167+
)
168+
except Exception:
169+
pass
170+
return None
171+
172+
# In __init__ before creating client:
173+
if not client_options.get("httpx_client"):
174+
client_options["httpx_client"] = _get_pooled_httpx_client()
175+
```
176+
177+
### Task 2.3: Qdrant Considerations
178+
179+
**File**: `src/codeweaver/providers/vector_stores/qdrant_base.py`
180+
181+
The `qdrant_client` library uses httpx internally but doesn't expose an `httpx_client` parameter. We have two options:
182+
183+
1. **Use timeout configuration** (recommended for now):
184+
- Configure longer timeouts via `qdrant_client` parameters
185+
- This addresses the httpcore.ReadError issues
186+
187+
2. **Future enhancement**:
188+
- Investigate if newer qdrant_client versions expose httpx configuration
189+
- Consider contributing upstream if needed
190+
191+
---
192+
193+
## Implementation Order
194+
195+
```
196+
1. Create http_pool.py module
197+
2. Add HttpPoolSettings to config
198+
3. Add http_pool to CodeWeaverState
199+
4. Add cleanup to _cleanup_state()
200+
5. Update Voyage AI provider
201+
6. Update Cohere provider
202+
7. Add unit tests
203+
8. Integration testing
204+
```
205+
206+
---
207+
208+
## Provider-Specific Settings
209+
210+
| Provider | max_connections | read_timeout | keepalive_expiry | HTTP/2 | Parameter Name |
211+
|----------|----------------|--------------|------------------|--------|----------------|
212+
| Voyage | 50 | 90s | 5s | Yes | `httpx_client` |
213+
| Cohere | 50 | 90s | 5s | Yes | `httpx_client` |
214+
| OpenAI | 50 | 90s | 5s | Yes | `http_client` |
215+
| Azure | 50 | 90s | 5s | Yes | `http_client` |
216+
| Fireworks| 50 | 90s | 5s | Yes | `http_client` |
217+
| Groq | 50 | 90s | 5s | Yes | `http_client` |
218+
| Together | 50 | 90s | 5s | Yes | `http_client` |
219+
| Ollama | 50 | 90s | 5s | Yes | `http_client` |
220+
| Cerebras | 50 | 90s | 5s | Yes | `http_client` |
221+
| Heroku | 50 | 90s | 5s | Yes | `http_client` |
222+
| Mistral | 50 | 90s | 5s | Yes | `httpx_client` |
223+
| Qdrant | N/A | N/A | N/A | N/A | Not supported |
224+
225+
**Note**: Qdrant uses httpx internally but doesn't expose a client injection parameter.
226+
227+
---
228+
## Testing Strategy

### Unit Tests
- `test_http_pool_singleton()` - Verify singleton behavior
- `test_client_reuse()` - Verify client caching
- `test_cleanup()` - Verify proper cleanup
- `test_provider_overrides()` - Verify per-provider settings

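As a rough illustration of how `test_client_reuse()` could also cover concurrent callers, the sketch below tests a tiny stand-in pool rather than the real `HttpClientPool`. `MiniPool`, its `created` counter, and the lazily created `asyncio.Lock` are assumptions made so the example is self-contained; the real tests would presumably target the actual class with an async test runner.

```python
import asyncio
from typing import Any, Optional


class MiniPool:
    """Hypothetical stand-in for HttpClientPool, just enough to test against."""

    def __init__(self) -> None:
        self._clients: dict = {}
        # Created lazily so the lock belongs to the loop that actually uses it.
        self._async_lock: Optional[asyncio.Lock] = None
        self.created = 0

    async def get_client(self, name: str) -> Any:
        if self._async_lock is None:
            self._async_lock = asyncio.Lock()
        async with self._async_lock:
            if name not in self._clients:
                # Yield to the event loop; without the lock, callers would race here.
                await asyncio.sleep(0)
                self.created += 1
                self._clients[name] = object()
            return self._clients[name]


async def test_client_reuse() -> None:
    pool = MiniPool()
    # Twenty concurrent requests for the same provider name.
    clients = await asyncio.gather(*(pool.get_client("voyage") for _ in range(20)))
    assert pool.created == 1
    assert all(client is clients[0] for client in clients)
```

Running `asyncio.run(test_client_reuse())` exercises the check directly; the `created` counter is what distinguishes true reuse from twenty clients that merely compare equal.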
### Integration Tests
- Test Voyage AI with pooled client
- Test Cohere with pooled client
- Verify connection reuse (check logs)

### Manual Verification
- Run `cw index` on a large repo
- Monitor for httpcore.ReadError (should be eliminated)
- Check connection reuse in debug logs

---

## Rollback Plan

The implementation includes fallbacks:
1. If `get_state()` fails, providers fall back to default clients
2. If `http_pool` is None, providers create their own clients
3. No breaking changes to existing interfaces

---

## Success Metrics

1. **No httpcore.ReadError** during indexing operations
2. **Reduced connection overhead** (visible in debug logs)
3. **All existing tests pass** without modification
4. **Memory usage stable** during long indexing operations

---

## Files to Create/Modify

| File | Action | Description |
|------|--------|-------------|
| `src/codeweaver/common/http_pool.py` | **Create** | HTTP client pool manager |
| `src/codeweaver/config/settings.py` | Modify | Add HttpPoolSettings |
| `src/codeweaver/server/server.py` | Modify | Add http_pool to state, cleanup |
| `src/codeweaver/providers/embedding/providers/voyage.py` | Modify | Use pooled client |
| `src/codeweaver/providers/embedding/providers/cohere.py` | Modify | Use pooled client |
| `tests/unit/common/test_http_pool.py` | **Create** | Unit tests |

---

## Notes

- The implementation prioritizes backward compatibility
- Fallbacks ensure the system works without pooling if needed
- HTTP/2 is enabled by default for better multiplexing on modern APIs
- Qdrant integration may require upstream changes to qdrant_client

src/codeweaver/common/__init__.py

Lines changed: 20 additions & 0 deletions
@@ -12,6 +12,14 @@
 if TYPE_CHECKING:
     # Import everything for IDE and type checker support
     # These imports are never executed at runtime, only during type checking
+    from codeweaver.common.http_pool import (
+        HttpClientPool,
+        PoolLimits,
+        PoolTimeouts,
+        get_http_pool,
+        reset_http_pool,
+        reset_http_pool_sync,
+    )
     from codeweaver.common.logging import log_to_client_or_fallback, setup_logger
     from codeweaver.common.statistics import (
         FileStatistics,
@@ -103,13 +111,16 @@
 _dynamic_imports: MappingProxyType[str, tuple[str, str]] = MappingProxyType({
     "CallHookTimingDict": (__spec__.parent, "types"),
     "FileStatistics": (__spec__.parent, "statistics"),
+    "HttpClientPool": (__spec__.parent, "http_pool"),
     "HttpRequestsDict": (__spec__.parent, "types"),
     "Identifier": (__spec__.parent, "statistics"),
     "LazyImport": (__spec__.parent, "utils"),
     "McpComponentRequests": (__spec__.parent, "types"),
     "McpComponentTimingDict": (__spec__.parent, "types"),
     "McpOperationRequests": (__spec__.parent, "types"),
     "McpTimingDict": (__spec__.parent, "types"),
+    "PoolLimits": (__spec__.parent, "http_pool"),
+    "PoolTimeouts": (__spec__.parent, "http_pool"),
     "ResourceUri": (__spec__.parent, "types"),
     "SessionStatistics": (__spec__.parent, "statistics"),
     "TimingStatistics": (__spec__.parent, "statistics"),
@@ -140,6 +151,7 @@
     "get_function_signature": (__spec__.parent, "utils"),
     "get_git_branch": (__spec__.parent, "utils"),
     "get_git_revision": (__spec__.parent, "utils"),
+    "get_http_pool": (__spec__.parent, "http_pool"),
     "get_optimal_workers": (__spec__.parent, "utils"),
     "get_possible_env_vars": (__spec__.parent, "utils"),
     "get_project_path": (__spec__.parent, "utils"),
@@ -169,6 +181,8 @@
     "low_priority": (__spec__.parent, "utils"),
     "normalize_ext": (__spec__.parent, "utils"),
     "record_timed_http_request": (__spec__.parent, "statistics"),
+    "reset_http_pool": (__spec__.parent, "http_pool"),
+    "reset_http_pool_sync": (__spec__.parent, "http_pool"),
     "rpartial": (__spec__.parent, "utils"),
     "sanitize_unicode": (__spec__.parent, "utils"),
     "set_relative_path": (__spec__.parent, "utils"),
@@ -203,13 +217,16 @@ def __getattr__(name: str) -> object:
     "CODEWEAVER_PREFIX",
     "CallHookTimingDict",
     "FileStatistics",
+    "HttpClientPool",
     "HttpRequestsDict",
     "Identifier",
     "LazyImport",
     "McpComponentRequests",
     "McpComponentTimingDict",
     "McpOperationRequests",
     "McpTimingDict",
+    "PoolLimits",
+    "PoolTimeouts",
     "ResourceUri",
     "SessionStatistics",
     "TimingStatistics",
@@ -240,6 +257,7 @@ def __getattr__(name: str) -> object:
     "get_function_signature",
     "get_git_branch",
     "get_git_revision",
+    "get_http_pool",
     "get_optimal_workers",
     "get_possible_env_vars",
     "get_project_path",
@@ -269,6 +287,8 @@ def __getattr__(name: str) -> object:
     "low_priority",
     "normalize_ext",
     "record_timed_http_request",
+    "reset_http_pool",
+    "reset_http_pool_sync",
     "rpartial",
     "sanitize_unicode",
     "set_relative_path",
