-
Notifications
You must be signed in to change notification settings - Fork 3
Add timeout functionality to IntersectClient. #36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: candidate-0.9.0
Are you sure you want to change the base?
Changes from all commits
9378905
6d0789e
441bafb
1e9a75a
3bf2a94
1ab0e1f
aa2d353
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,7 +13,9 @@ | |
| from __future__ import annotations | ||
|
|
||
| import time | ||
| from typing import TYPE_CHECKING | ||
| from collections import defaultdict | ||
| from threading import Event, Thread | ||
| from typing import TYPE_CHECKING, Any | ||
| from uuid import uuid4 | ||
|
|
||
| from pydantic import ValidationError | ||
|
|
@@ -46,6 +48,7 @@ | |
| from .client_callback_definitions import ( | ||
| INTERSECT_CLIENT_EVENT_CALLBACK_TYPE, | ||
| INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE, | ||
| INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE, | ||
| ) | ||
| from .shared_callback_definitions import IntersectDirectMessageParams | ||
|
|
||
|
|
@@ -70,6 +73,7 @@ def __init__( | |
| config: IntersectClientConfig, | ||
| user_callback: INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE | None = None, | ||
| event_callback: INTERSECT_CLIENT_EVENT_CALLBACK_TYPE | None = None, | ||
| timeout_callback: INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE | None = None, | ||
| ) -> None: | ||
| """The constructor performs almost all validation checks necessary to function in the INTERSECT ecosystem, with the exception of checking connections/credentials to any backing services. | ||
|
|
||
|
|
@@ -78,6 +82,7 @@ def __init__( | |
| user_callback: The callback function you can use to handle response messages from Services. | ||
| If this is left empty, you can only send a single message | ||
| event_callback: The callback function you can use to handle events from any Service. | ||
| timeout_callback: The callback function you can use to handle request timeouts. | ||
| """ | ||
| # this is called here in case a user created the object using "IntersectClientConfig.model_construct()" to skip validation | ||
| config = IntersectClientConfig.model_validate(config) | ||
|
|
@@ -86,6 +91,8 @@ def __init__( | |
| die('user_callback function should be a callable function if defined') | ||
| if event_callback is not None and not callable(event_callback): | ||
| die('event_callback function should be a callable function if defined') | ||
| if timeout_callback is not None and not callable(timeout_callback): | ||
| die('timeout_callback function should be a callable function if defined') | ||
| if not user_callback and not event_callback: | ||
| die('must define at least one of user_callback or event_callback') | ||
| if not user_callback: | ||
|
|
@@ -145,6 +152,10 @@ def __init__( | |
| ) | ||
| self._user_callback = user_callback | ||
| self._event_callback = event_callback | ||
| self._timeout_callback = timeout_callback | ||
| self._pending_requests: defaultdict[str, list[dict[str, Any]]] = defaultdict(list) | ||
| self._stop_timeout_thread = Event() | ||
| self._timeout_thread = Thread(target=self._check_timeouts, daemon=True) | ||
|
|
||
| @final | ||
| def startup(self) -> Self: | ||
|
|
@@ -171,6 +182,8 @@ def startup(self) -> Self: | |
| # and has nothing to do with the Service at all. | ||
| time.sleep(1.0) | ||
|
|
||
| self._timeout_thread.start() | ||
|
|
||
| if self._resend_initial_messages or not self._sent_initial_messages: | ||
| for message in self._initial_messages: | ||
| self._send_userspace_message(message) | ||
|
|
@@ -199,11 +212,31 @@ def shutdown(self, reason: str | None = None) -> Self: | |
| """ | ||
| logger.info(f'Client is shutting down (reason: {reason})') | ||
|
|
||
| self._stop_timeout_thread.set() | ||
| self._timeout_thread.join() | ||
| self._control_plane_manager.disconnect() | ||
|
|
||
| logger.info('Client shutdown complete') | ||
| return self | ||
|
|
||
| def _check_timeouts(self) -> None: | ||
| """Periodically check for timed out requests.""" | ||
| while not self._stop_timeout_thread.is_set(): | ||
| now = time.time() | ||
| for operation_id, requests in list(self._pending_requests.items()): | ||
| for request in requests: | ||
| if now > request['timeout']: | ||
| try: | ||
| request['on_timeout'](operation_id) | ||
| except Exception as e: # noqa: BLE001 | ||
| logger.warning( | ||
| f'Exception from timeout callback for operation {operation_id}:\n{e}' | ||
| ) | ||
| requests.remove(request) | ||
| if not requests: | ||
| del self._pending_requests[operation_id] | ||
| time.sleep(0.1) # Sleep for a short duration | ||
|
|
||
| @final | ||
| def is_connected(self) -> bool: | ||
| """Check if we're currently connected to the INTERSECT brokers. | ||
|
|
@@ -257,6 +290,15 @@ def _handle_userspace_message( | |
| send_os_signal() | ||
| return | ||
|
|
||
| # If not in pending requests, it already timed out, so ignore this response | ||
| if headers.operation_id in self._pending_requests: | ||
| del self._pending_requests[headers.operation_id] | ||
| else: | ||
| logger.debug( | ||
| f'Received response for operation {headers.operation_id} that already timed out, ignoring' | ||
| ) | ||
| return | ||
|
|
||
| # TWO: GET DATA FROM APPROPRIATE DATA STORE AND DESERIALIZE IT | ||
| try: | ||
| request_params = self._data_plane_manager.incoming_message_data_handler( | ||
|
|
@@ -442,3 +484,11 @@ def _send_userspace_message(self, params: IntersectDirectMessageParams) -> None: | |
| self._control_plane_manager.publish_message( | ||
| channel, payload, params.content_type, headers, persist=False | ||
| ) | ||
|
|
||
| if params.timeout is not None and params.on_timeout is not None: | ||
| self._pending_requests[params.operation].append( | ||
| { | ||
| 'timeout': time.time() + params.timeout, | ||
| 'on_timeout': params.on_timeout, | ||
| } | ||
| ) | ||
|
Comment on lines
+488
to
+494
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is another part of the reason I think the E2E tests are failing. If the user doesn't specify a timeout on a pending request, then by default the request should be handled. I also don't think that
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. Fixed and will push once we have a chance to discuss the first issue. |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The E2E tests are failing and I believe it's partially related to this block of code, because the existing E2E tests do not explicitly specify a timeout.
self._pending_requestsis not necessarily the source of truth for requests which haven't timed out.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, you're right. In addition to it being an issue for when a timeout wasn't set, it was an issue for when a client gets restarted. I've changed the logic to account for this. Will push everything after we discuss the above.