Skip to content

Commit 824f43b

Browse files
siwachabhiAbhimanyu Siwach
andauthored
chore: remove concurrency checks and simplify thread pool handling (#46)
* chore: remove concurrency checks and simplify thread pool handling * fix gitignore --------- Co-authored-by: Abhimanyu Siwach <[email protected]>
1 parent 5d2fa11 commit 824f43b

File tree

3 files changed

+40
-157
lines changed

3 files changed

+40
-157
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,3 +223,7 @@ config
223223
local_config.py
224224
local_settings.py
225225
.ruff_cache
226+
227+
.bedrock_agentcore.yaml
228+
.dockerignore
229+
Dockerfile

src/bedrock_agentcore/runtime/app.py

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import threading
1212
import time
1313
import uuid
14-
from concurrent.futures import ThreadPoolExecutor
1514
from typing import Any, Callable, Dict, Optional
1615

1716
from starlette.applications import Starlette
@@ -62,8 +61,6 @@ def __init__(self, debug: bool = False):
6261
self._task_counter_lock: threading.Lock = threading.Lock()
6362
self._forced_ping_status: Optional[PingStatus] = None
6463
self._last_status_update_time: float = time.time()
65-
self._invocation_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="invocation")
66-
self._invocation_semaphore = asyncio.Semaphore(2)
6764

6865
routes = [
6966
Route("/invocations", self._handle_invocation, methods=["POST"]),
@@ -346,21 +343,18 @@ def run(self, port: int = 8080, host: Optional[str] = None):
346343
uvicorn.run(self, host=host, port=port)
347344

348345
async def _invoke_handler(self, handler, request_context, takes_context, payload):
349-
if self._invocation_semaphore.locked():
350-
return JSONResponse({"error": "Server busy - maximum concurrent requests reached"}, status_code=503)
346+
try:
347+
args = (payload, request_context) if takes_context else (payload,)
351348

352-
async with self._invocation_semaphore:
353-
try:
354-
args = (payload, request_context) if takes_context else (payload,)
355-
if asyncio.iscoroutinefunction(handler):
356-
return await handler(*args)
357-
else:
358-
loop = asyncio.get_event_loop()
359-
return await loop.run_in_executor(self._invocation_executor, handler, *args)
360-
except Exception as e:
361-
handler_name = getattr(handler, "__name__", "unknown")
362-
self.logger.error("Handler '%s' execution failed: %s: %s", handler_name, type(e).__name__, e)
363-
raise
349+
if asyncio.iscoroutinefunction(handler):
350+
return await handler(*args)
351+
else:
352+
loop = asyncio.get_event_loop()
353+
return await loop.run_in_executor(None, handler, *args)
354+
except Exception as e:
355+
handler_name = getattr(handler, "__name__", "unknown")
356+
self.logger.error("Handler '%s' execution failed: %s: %s", handler_name, type(e).__name__, e)
357+
raise
364358

365359
def _handle_task_action(self, payload: dict) -> Optional[JSONResponse]:
366360
"""Handle task management actions if present in payload."""

tests/bedrock_agentcore/runtime/test_app.py

Lines changed: 25 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@
33
import os
44
import threading
55
import time
6-
from concurrent.futures import ThreadPoolExecutor
76
from datetime import datetime
87
from decimal import Decimal
9-
from unittest.mock import MagicMock, Mock, patch
8+
from unittest.mock import MagicMock, patch
109

1110
import pytest
1211
from starlette.testclient import TestClient
@@ -235,25 +234,19 @@ def handler(payload):
235234

236235

237236
class TestConcurrentInvocations:
238-
"""Test concurrent invocation handling with thread pool and semaphore."""
237+
"""Test concurrent invocation handling simplified without limits."""
239238

240-
def test_thread_pool_initialization(self):
241-
"""Test ThreadPoolExecutor and Semaphore are properly initialized."""
239+
def test_simplified_initialization(self):
240+
"""Test that app initializes without thread pool and semaphore."""
242241
app = BedrockAgentCoreApp()
243242

244-
# Check ThreadPoolExecutor is initialized with correct settings
245-
assert hasattr(app, "_invocation_executor")
246-
assert isinstance(app._invocation_executor, ThreadPoolExecutor)
247-
assert app._invocation_executor._max_workers == 2
248-
249-
# Check Semaphore is initialized with correct limit
250-
assert hasattr(app, "_invocation_semaphore")
251-
assert isinstance(app._invocation_semaphore, asyncio.Semaphore)
252-
assert app._invocation_semaphore._value == 2
243+
# Check ThreadPoolExecutor and Semaphore are NOT initialized
244+
assert not hasattr(app, "_invocation_executor")
245+
assert not hasattr(app, "_invocation_semaphore")
253246

254247
@pytest.mark.asyncio
255-
async def test_concurrent_invocations_within_limit(self):
256-
"""Test that 2 concurrent requests work fine."""
248+
async def test_concurrent_invocations_unlimited(self):
249+
"""Test that multiple concurrent requests work without limits."""
257250
app = BedrockAgentCoreApp()
258251

259252
# Create a slow sync handler
@@ -262,66 +255,26 @@ def handler(payload):
262255
time.sleep(0.1) # Simulate work
263256
return {"id": payload["id"]}
264257

265-
# Mock the executor to track calls
266-
original_executor = app._invocation_executor
267-
mock_executor = Mock(wraps=original_executor)
268-
app._invocation_executor = mock_executor
269-
270258
# Create request context
271259
from bedrock_agentcore.runtime.context import RequestContext
272260

273261
context = RequestContext(session_id=None)
274262

275-
# Start 2 concurrent invocations
263+
# Start 3+ concurrent invocations (no limit)
276264
task1 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 1}))
277265
task2 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 2}))
266+
task3 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 3}))
278267

279-
# Both should complete successfully
268+
# All should complete successfully
280269
result1 = await task1
281270
result2 = await task2
271+
result3 = await task3
282272

283273
assert result1 == {"id": 1}
284274
assert result2 == {"id": 2}
275+
assert result3 == {"id": 3}
285276

286-
# Verify executor was used for sync handlers
287-
assert mock_executor.submit.call_count >= 2
288-
289-
@pytest.mark.asyncio
290-
async def test_concurrent_invocations_exceed_limit(self):
291-
"""Test that 3rd concurrent request gets 503 response."""
292-
app = BedrockAgentCoreApp()
293-
294-
# Create a slow handler
295-
@app.entrypoint
296-
def handler(payload):
297-
time.sleep(0.5) # Simulate long work
298-
return {"id": payload["id"]}
299-
300-
# Create request context
301-
from bedrock_agentcore.runtime.context import RequestContext
302-
303-
context = RequestContext(session_id=None)
304-
305-
# Start 2 invocations to fill the semaphore
306-
task1 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 1}))
307-
task2 = asyncio.create_task(app._invoke_handler(handler, context, False, {"id": 2}))
308-
309-
# Wait a bit to ensure they've acquired the semaphore
310-
await asyncio.sleep(0.1)
311-
312-
# Third invocation should get 503
313-
result3 = await app._invoke_handler(handler, context, False, {"id": 3})
314-
315-
# Verify it's a JSONResponse with 503 status
316-
from starlette.responses import JSONResponse
317-
318-
assert isinstance(result3, JSONResponse)
319-
assert result3.status_code == 503
320-
assert result3.body == b'{"error":"Server busy - maximum concurrent requests reached"}'
321-
322-
# Clean up the running tasks
323-
await task1
324-
await task2
277+
# Removed: No more 503 responses since we removed concurrency limits
325278

326279
@pytest.mark.asyncio
327280
async def test_async_handler_runs_in_event_loop(self):
@@ -338,10 +291,6 @@ async def handler(payload):
338291
await asyncio.sleep(0.01)
339292
return {"async": True}
340293

341-
# Mock the executor to ensure it's NOT used for async handlers
342-
mock_executor = Mock()
343-
app._invocation_executor = mock_executor
344-
345294
# Create request context
346295
from bedrock_agentcore.runtime.context import RequestContext
347296

@@ -353,12 +302,11 @@ async def handler(payload):
353302
assert result == {"async": True}
354303
# Async handler should run in main thread
355304
assert handler_thread_id == threading.current_thread().ident
356-
# Executor should NOT be used for async handlers
357-
mock_executor.submit.assert_not_called()
305+
# No executor needed for async handlers
358306

359307
@pytest.mark.asyncio
360308
async def test_sync_handler_runs_in_thread_pool(self):
361-
"""Test sync handlers run in thread pool."""
309+
"""Test sync handlers run in default executor, not main event loop."""
362310
app = BedrockAgentCoreApp()
363311

364312
# Track which thread the handler runs in
@@ -379,36 +327,14 @@ def handler(payload):
379327
result = await app._invoke_handler(handler, context, False, {})
380328

381329
assert result == {"sync": True}
382-
# Sync handler should NOT run in main thread
330+
# Sync handler should NOT run in main thread (uses default executor)
383331
assert handler_thread_id != threading.current_thread().ident
384332

385-
@pytest.mark.asyncio
386-
async def test_semaphore_release_after_completion(self):
387-
"""Test semaphore is properly released after request completion."""
388-
app = BedrockAgentCoreApp()
389-
390-
@app.entrypoint
391-
def handler(payload):
392-
return {"result": "ok"}
393-
394-
# Create request context
395-
from bedrock_agentcore.runtime.context import RequestContext
396-
397-
context = RequestContext(session_id=None)
398-
399-
# Check initial semaphore value
400-
assert app._invocation_semaphore._value == 2
401-
402-
# Make a request
403-
result = await app._invoke_handler(handler, context, False, {})
404-
assert result == {"result": "ok"}
405-
406-
# Semaphore should be released
407-
assert app._invocation_semaphore._value == 2
333+
# Removed: No semaphore to test
408334

409335
@pytest.mark.asyncio
410-
async def test_handler_exception_releases_semaphore(self):
411-
"""Test semaphore is released even when handler fails."""
336+
async def test_handler_exception_propagates(self):
337+
"""Test handler exceptions are properly propagated."""
412338
app = BedrockAgentCoreApp()
413339

414340
@app.entrypoint
@@ -420,16 +346,10 @@ def handler(payload):
420346

421347
context = RequestContext(session_id=None)
422348

423-
# Check initial semaphore value
424-
assert app._invocation_semaphore._value == 2
425-
426-
# Make a request that will fail
349+
# Exception should propagate
427350
with pytest.raises(ValueError, match="Test error"):
428351
await app._invoke_handler(handler, context, False, {})
429352

430-
# Semaphore should still be released
431-
assert app._invocation_semaphore._value == 2
432-
433353
def test_no_thread_leak_on_repeated_requests(self):
434354
"""Test that repeated requests don't leak threads."""
435355
app = BedrockAgentCoreApp()
@@ -450,46 +370,11 @@ def handler(payload):
450370
assert response.json() == {"id": i}
451371

452372
# Thread count should not have increased significantly
453-
# Allow for some variance but no leak
373+
# Allow for some variance but no leak (uses default executor)
454374
final_thread_count = threading.active_count()
455-
assert final_thread_count <= initial_thread_count + 2 # Thread pool has max 2 threads
456-
457-
@pytest.mark.asyncio
458-
async def test_server_busy_error_format(self):
459-
"""Test 503 response has correct error message format."""
460-
app = BedrockAgentCoreApp()
461-
462-
# Fill the semaphore
463-
await app._invocation_semaphore.acquire()
464-
await app._invocation_semaphore.acquire()
465-
466-
@app.entrypoint
467-
def handler(payload):
468-
return {"ok": True}
469-
470-
# Create request context
471-
from bedrock_agentcore.runtime.context import RequestContext
472-
473-
context = RequestContext(session_id=None)
474-
475-
# Try to invoke when semaphore is full
476-
result = await app._invoke_handler(handler, context, False, {})
477-
478-
# Check response format
479-
from starlette.responses import JSONResponse
480-
481-
assert isinstance(result, JSONResponse)
482-
assert result.status_code == 503
483-
484-
# Parse the JSON body
485-
import json
486-
487-
body = json.loads(result.body)
488-
assert body == {"error": "Server busy - maximum concurrent requests reached"}
375+
assert final_thread_count <= initial_thread_count + 10 # Default executor may create more threads
489376

490-
# Release semaphore
491-
app._invocation_semaphore.release()
492-
app._invocation_semaphore.release()
377+
# Removed: No more server busy errors
493378

494379
def test_ping_endpoint_remains_sync(self):
495380
"""Test that ping endpoint is not async."""

0 commit comments

Comments
 (0)