-
Notifications
You must be signed in to change notification settings - Fork 126
Circuit breaker changes using pybreaker #705
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 22 commits
37ec282
2504053
ef41f4c
2f54be8
db93974
1f9c4d3
939b548
6c72f86
ac845a5
a602c39
c1b6e25
e3d85f4
9dfb623
e7e8b4b
dab4b38
e1e08b0
b527e7c
2b45814
1193af7
aa459e9
4cb87b1
7cbc4c8
c646335
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,6 +51,7 @@ def __init__( | |
| pool_connections: Optional[int] = None, | ||
| pool_maxsize: Optional[int] = None, | ||
| user_agent: Optional[str] = None, | ||
| telemetry_circuit_breaker_enabled: Optional[bool] = None, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will make it true in next PR post end to end testing |
||
| ): | ||
| self.hostname = hostname | ||
| self.access_token = access_token | ||
|
|
@@ -83,6 +84,7 @@ def __init__( | |
| self.pool_connections = pool_connections or 10 | ||
| self.pool_maxsize = pool_maxsize or 20 | ||
| self.user_agent = user_agent | ||
| self.telemetry_circuit_breaker_enabled = bool(telemetry_circuit_breaker_enabled) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we not add this to build_client_context too? |
||
|
|
||
|
|
||
| def get_effective_azure_login_app_id(hostname) -> str: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -264,7 +264,31 @@ def request_context( | |
| yield response | ||
| except MaxRetryError as e: | ||
| logger.error("HTTP request failed after retries: %s", e) | ||
| raise RequestError(f"HTTP request failed: {e}") | ||
|
|
||
| # Try to extract HTTP status code from the MaxRetryError | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. setting http_code here which will be used by CircuitBreaker to consider regression in telemetry endpoint |
||
| http_code = None | ||
| if ( | ||
| hasattr(e, "reason") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. great to see the defensive approach with urlLib object structure ! we have had several compatibility issue with this library before (I also remember a sev1 once), also can you double check if this is the expected structure once?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (I meant across versions. see internal doc for more context on the type of issues we get) |
||
| and e.reason is not None | ||
| and hasattr(e.reason, "response") | ||
| and e.reason.response is not None | ||
| ): | ||
| # The reason may contain a response object with status | ||
| http_code = getattr(e.reason.response, "status", None) | ||
| elif ( | ||
| hasattr(e, "response") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit : these ~30 lines feels like it would live more happily in a helper |
||
| and e.response is not None | ||
| and hasattr(e.response, "status") | ||
| ): | ||
| # Or the error itself may have a response | ||
| http_code = e.response.status | ||
|
|
||
| context = {} | ||
| if http_code is not None: | ||
| context["http-code"] = http_code | ||
| logger.error("HTTP request failed with status code: %d", http_code) | ||
|
|
||
| raise RequestError(f"HTTP request failed: {e}", context=context) | ||
| except Exception as e: | ||
| logger.error("HTTP request error: %s", e) | ||
| raise RequestError(f"HTTP request error: {e}") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,181 @@ | ||
| """ | ||
| Circuit breaker implementation for telemetry requests. | ||
| This module provides circuit breaker functionality to prevent telemetry failures | ||
| from impacting the main SQL operations. It uses pybreaker library to implement | ||
| the circuit breaker pattern with configurable thresholds and timeouts. | ||
| """ | ||
|
|
||
| import logging | ||
| import threading | ||
| from typing import Dict, Optional, Any | ||
| from dataclasses import dataclass | ||
|
|
||
| import pybreaker | ||
| from pybreaker import CircuitBreaker, CircuitBreakerError, CircuitBreakerListener | ||
|
|
||
| from databricks.sql.exc import TelemetryRateLimitError | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # Circuit Breaker Configuration Constants | ||
| DEFAULT_MINIMUM_CALLS = 20 | ||
| DEFAULT_RESET_TIMEOUT = 30 | ||
| DEFAULT_NAME = "telemetry-circuit-breaker" | ||
|
|
||
| # Circuit Breaker State Constants (used in logging) | ||
| CIRCUIT_BREAKER_STATE_OPEN = "open" | ||
| CIRCUIT_BREAKER_STATE_CLOSED = "closed" | ||
| CIRCUIT_BREAKER_STATE_HALF_OPEN = "half-open" | ||
|
|
||
| # Logging Message Constants | ||
| LOG_CIRCUIT_BREAKER_STATE_CHANGED = "Circuit breaker state changed from %s to %s for %s" | ||
| LOG_CIRCUIT_BREAKER_OPENED = ( | ||
| "Circuit breaker opened for %s - telemetry requests will be blocked" | ||
| ) | ||
| LOG_CIRCUIT_BREAKER_CLOSED = ( | ||
| "Circuit breaker closed for %s - telemetry requests will be allowed" | ||
| ) | ||
| LOG_CIRCUIT_BREAKER_HALF_OPEN = ( | ||
| "Circuit breaker half-open for %s - testing telemetry requests" | ||
| ) | ||
|
|
||
|
|
||
| class CircuitBreakerStateListener(CircuitBreakerListener): | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only used for logging purposed for now |
||
| """Listener for circuit breaker state changes.""" | ||
|
|
||
| def before_call(self, cb: CircuitBreaker, func, *args, **kwargs) -> None: | ||
| """Called before the circuit breaker calls a function.""" | ||
| pass | ||
|
|
||
| def failure(self, cb: CircuitBreaker, exc: BaseException) -> None: | ||
| """Called when a function called by the circuit breaker fails.""" | ||
| pass | ||
|
|
||
| def success(self, cb: CircuitBreaker) -> None: | ||
| """Called when a function called by the circuit breaker succeeds.""" | ||
| pass | ||
|
|
||
| def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None: | ||
| """Called when the circuit breaker state changes.""" | ||
| old_state_name = old_state.name if old_state else "None" | ||
| new_state_name = new_state.name if new_state else "None" | ||
|
|
||
| logger.info( | ||
| LOG_CIRCUIT_BREAKER_STATE_CHANGED, old_state_name, new_state_name, cb.name | ||
| ) | ||
|
|
||
| if new_state_name == CIRCUIT_BREAKER_STATE_OPEN: | ||
| logger.warning(LOG_CIRCUIT_BREAKER_OPENED, cb.name) | ||
| elif new_state_name == CIRCUIT_BREAKER_STATE_CLOSED: | ||
| logger.info(LOG_CIRCUIT_BREAKER_CLOSED, cb.name) | ||
| elif new_state_name == CIRCUIT_BREAKER_STATE_HALF_OPEN: | ||
| logger.info(LOG_CIRCUIT_BREAKER_HALF_OPEN, cb.name) | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class CircuitBreakerConfig: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Single object to store CB config |
||
| """Configuration for circuit breaker behavior. | ||
| This class is immutable to prevent modification of circuit breaker settings. | ||
| All configuration values are set to constants defined at the module level. | ||
| """ | ||
|
|
||
| # Minimum number of calls before circuit can open | ||
| minimum_calls: int = DEFAULT_MINIMUM_CALLS | ||
|
|
||
| # Time to wait before trying to close circuit (in seconds) | ||
| reset_timeout: int = DEFAULT_RESET_TIMEOUT | ||
|
|
||
| # Name for the circuit breaker (for logging) | ||
| name: str = DEFAULT_NAME | ||
|
|
||
|
|
||
| class CircuitBreakerManager: | ||
| """ | ||
| Manages circuit breaker instances for telemetry requests. | ||
| This class provides a singleton pattern to manage circuit breaker instances | ||
| per host, ensuring that telemetry failures don't impact main SQL operations. | ||
| """ | ||
|
|
||
| _instances: Dict[str, CircuitBreaker] = {} | ||
| _lock = threading.RLock() | ||
| _config: Optional[CircuitBreakerConfig] = None | ||
|
|
||
| @classmethod | ||
| def initialize(cls, config: CircuitBreakerConfig) -> None: | ||
| """ | ||
| Initialize the circuit breaker manager with configuration. | ||
| Args: | ||
| config: Circuit breaker configuration | ||
| """ | ||
| with cls._lock: | ||
| cls._config = config | ||
| logger.debug("CircuitBreakerManager initialized with config: %s", config) | ||
|
|
||
| @classmethod | ||
| def get_circuit_breaker(cls, host: str) -> CircuitBreaker: | ||
| """ | ||
| Get or create a circuit breaker instance for the specified host. | ||
| Args: | ||
| host: The hostname for which to get the circuit breaker | ||
| Returns: | ||
| CircuitBreaker instance for the host | ||
| """ | ||
| if not cls._config: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have this inside lock too? |
||
| # Return a no-op circuit breaker if not initialized | ||
| return cls._create_noop_circuit_breaker() | ||
|
|
||
| with cls._lock: | ||
| if host not in cls._instances: | ||
| cls._instances[host] = cls._create_circuit_breaker(host) | ||
| logger.debug("Created circuit breaker for host: %s", host) | ||
|
|
||
| return cls._instances[host] | ||
|
|
||
| @classmethod | ||
| def _create_circuit_breaker(cls, host: str) -> CircuitBreaker: | ||
| """ | ||
| Create a new circuit breaker instance for the specified host. | ||
| Args: | ||
| host: The hostname for the circuit breaker | ||
| Returns: | ||
| New CircuitBreaker instance | ||
| """ | ||
| config = cls._config | ||
| if config is None: | ||
| raise RuntimeError("CircuitBreakerManager not initialized") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we shouldn't be throwing errors at all, I suppose? |
||
|
|
||
| # Create circuit breaker with configuration | ||
| breaker = CircuitBreaker( | ||
| fail_max=config.minimum_calls, # Number of failures before circuit opens | ||
| reset_timeout=config.reset_timeout, | ||
| name=f"{config.name}-{host}", | ||
| ) | ||
|
|
||
| # Add state change listeners for logging | ||
| breaker.add_listener(CircuitBreakerStateListener()) | ||
|
|
||
| return breaker | ||
|
|
||
| @classmethod | ||
| def _create_noop_circuit_breaker(cls) -> CircuitBreaker: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In cases where client config setup failed or used by test cases. |
||
| """ | ||
| Create a no-op circuit breaker that always allows calls. | ||
| Returns: | ||
| CircuitBreaker that never opens | ||
| """ | ||
| # Create a circuit breaker with very high thresholds so it never opens | ||
| breaker = CircuitBreaker( | ||
| fail_max=1000000, # Very high threshold | ||
| reset_timeout=1, # Short reset time | ||
| name="noop-circuit-breaker", | ||
| ) | ||
| return breaker | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vikrantpuppala Can we be sure that adding new library won't break any other client usage?