-
Notifications
You must be signed in to change notification settings - Fork 11
feat(csharp): implement telemetry client pipeline with circuit breaker protection #305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
665afa6
Define ITelemetryClient interface\n\nTask ID: task-2.1-telemetry-clieβ¦
982c6b4
Implement TelemetryClientManager with reference counting\n\nTask ID: β¦
35ffc75
Address PR review feedback for telemetry client
b9477bb
Implement CircuitBreakerManager\n\nTask ID: task-3.1-circuit-breaker-β¦
fdd171e
Implement CircuitBreakerTelemetryExporter wrapper\n\nTask ID: task-3.β¦
a2f0d32
Implement MetricsAggregator for Activity-based collection\n\nTask ID:β¦
de0c920
Implement DatabricksActivityListener\n\nTask ID: task-4.2-databricks-β¦
9ca9c88
Remove dead code: DatabricksActivityListener, MetricsAggregator, Teleβ¦
e417df7
Implement TelemetryClient coordinating listener/aggregator/exporter\nβ¦
29c7c37
Update TelemetryClient to direct enqueue/flush/export pipeline
d4f1460
address comments
f82e647
address comments
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| /* | ||
| * Copyright (c) 2025 ADBC Drivers Contributors | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| using System; | ||
| using System.Collections.Concurrent; | ||
|
|
||
| namespace AdbcDrivers.Databricks.Telemetry | ||
| { | ||
| /// <summary> | ||
| /// Singleton factory that manages one circuit breaker per host. | ||
| /// Ensures proper isolation by preventing circuit breaker state from being shared across different hosts. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// <para> | ||
| /// When multiple connections are made to different hosts, each host should have its own circuit breaker | ||
| /// to isolate failures. If one host is experiencing issues, connections to other healthy hosts should | ||
| /// not be affected by the same circuit breaker state. | ||
| /// </para> | ||
| /// <para> | ||
| /// This manager maintains a single CircuitBreaker instance per host, shared across all connections | ||
| /// to that host. Each circuit breaker uses default configuration: 5 failures threshold, 1 minute timeout, | ||
| /// and 2 successes to close. | ||
| /// </para> | ||
| /// <para> | ||
| /// Thread Safety: All methods are thread-safe and can be called concurrently from multiple | ||
| /// connections. Uses ConcurrentDictionary for synchronization. | ||
| /// </para> | ||
| /// </remarks> | ||
| internal sealed class CircuitBreakerManager | ||
| { | ||
| private static readonly CircuitBreakerManager s_instance = new CircuitBreakerManager(); | ||
|
|
||
| private readonly ConcurrentDictionary<string, CircuitBreaker> _circuitBreakers = new ConcurrentDictionary<string, CircuitBreaker>(); | ||
|
|
||
| /// <summary> | ||
| /// Private constructor to enforce singleton pattern. | ||
| /// </summary> | ||
| private CircuitBreakerManager() | ||
| { | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the singleton instance of CircuitBreakerManager. | ||
| /// </summary> | ||
| /// <returns>The singleton CircuitBreakerManager instance.</returns> | ||
| public static CircuitBreakerManager GetInstance() => s_instance; | ||
|
|
||
| /// <summary> | ||
| /// Gets or creates a circuit breaker for the specified host. | ||
| /// </summary> | ||
| /// <param name="host">The host identifier (e.g., "databricks-workspace.cloud.databricks.com").</param> | ||
| /// <returns>The circuit breaker for the specified host.</returns> | ||
| /// <remarks> | ||
| /// <para> | ||
| /// Thread Safety: This method is thread-safe. If multiple connections call this method | ||
| /// concurrently for the same host, only one circuit breaker will be created due to | ||
| /// ConcurrentDictionary's atomic GetOrAdd operation. | ||
| /// </para> | ||
| /// <para> | ||
| /// The circuit breaker uses default configuration: | ||
| /// - Failure threshold: 5 consecutive failures | ||
| /// - Timeout: 1 minute (circuit stays open) | ||
| /// - Success threshold: 2 successful calls to close the circuit in half-open state | ||
| /// </para> | ||
| /// </remarks> | ||
| public CircuitBreaker GetCircuitBreaker(string host) | ||
| { | ||
| if (string.IsNullOrWhiteSpace(host)) | ||
| { | ||
| throw new ArgumentException("Host cannot be null or whitespace.", nameof(host)); | ||
| } | ||
|
|
||
| return _circuitBreakers.GetOrAdd(host, _ => new CircuitBreaker()); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Removes the circuit breaker for the specified host. | ||
| /// Called when the last client for a host is released to prevent memory leaks. | ||
| /// </summary> | ||
| /// <param name="host">The host identifier.</param> | ||
| public void RemoveCircuitBreaker(string host) | ||
| { | ||
| _circuitBreakers.TryRemove(host, out _); | ||
| } | ||
| } | ||
| } | ||
203 changes: 203 additions & 0 deletions
203
csharp/src/Telemetry/CircuitBreakerTelemetryExporter.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,203 @@ | ||
| /* | ||
| * Copyright (c) 2025 ADBC Drivers Contributors | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using AdbcDrivers.Databricks.Telemetry.Models; | ||
| using Polly.CircuitBreaker; | ||
|
|
||
| namespace AdbcDrivers.Databricks.Telemetry | ||
| { | ||
| /// <summary> | ||
| /// Wraps ITelemetryExporter with circuit breaker protection to prevent wasting resources | ||
| /// on failing telemetry endpoints. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// <para> | ||
| /// This exporter protects against failing telemetry endpoints (5xx errors, timeouts, network failures). | ||
| /// When the circuit breaker detects too many failures, it opens the circuit and all subsequent | ||
| /// telemetry events are silently dropped (logged at DEBUG level) until the endpoint recovers. | ||
| /// </para> | ||
| /// <para> | ||
| /// Key Behaviors: | ||
| /// - Circuit Closed: Events pass through to inner exporter. Failures are tracked. | ||
| /// - Circuit Open: Events are silently dropped (returns true, logs at DEBUG level). | ||
| /// - Circuit HalfOpen: Test requests are allowed through to check for recovery. | ||
| /// - Per-host isolation: Each host gets its own circuit breaker via CircuitBreakerManager. | ||
| /// </para> | ||
| /// <para> | ||
| /// Thread Safety: This class is thread-safe and can be called concurrently from multiple contexts. | ||
| /// </para> | ||
| /// </remarks> | ||
| internal sealed class CircuitBreakerTelemetryExporter : ITelemetryExporter | ||
| { | ||
| private readonly ITelemetryExporter _innerExporter; | ||
| private readonly CircuitBreaker _circuitBreaker; | ||
| private readonly string _host; | ||
|
|
||
| /// <summary> | ||
| /// Gets the host for this exporter. | ||
| /// </summary> | ||
| internal string Host => _host; | ||
|
|
||
| /// <summary> | ||
| /// Gets the current state of the circuit breaker. | ||
| /// </summary> | ||
| internal CircuitBreakerState State => _circuitBreaker.State; | ||
|
|
||
| /// <summary> | ||
| /// Creates a new CircuitBreakerTelemetryExporter. | ||
| /// </summary> | ||
| /// <param name="innerExporter">The inner telemetry exporter to wrap with circuit breaker protection.</param> | ||
| /// <param name="host">The host identifier for per-host circuit breaker isolation.</param> | ||
| /// <exception cref="ArgumentNullException">Thrown when innerExporter is null.</exception> | ||
| /// <exception cref="ArgumentException">Thrown when host is null, empty, or whitespace.</exception> | ||
| public CircuitBreakerTelemetryExporter(ITelemetryExporter innerExporter, string host) | ||
| { | ||
| _innerExporter = innerExporter ?? throw new ArgumentNullException(nameof(innerExporter)); | ||
|
|
||
| if (string.IsNullOrWhiteSpace(host)) | ||
| { | ||
| throw new ArgumentException("Host cannot be null or whitespace.", nameof(host)); | ||
| } | ||
|
|
||
| _host = host; | ||
| _circuitBreaker = CircuitBreakerManager.GetInstance().GetCircuitBreaker(host); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Export telemetry frontend logs with circuit breaker protection. | ||
| /// </summary> | ||
| /// <param name="logs">The list of telemetry frontend logs to export.</param> | ||
| /// <param name="ct">Cancellation token.</param> | ||
| /// <returns> | ||
| /// True if the export succeeded or was silently dropped (circuit open). | ||
| /// False if the export failed and was tracked by the circuit breaker. | ||
| /// Returns true for empty/null logs since there's nothing to export. | ||
| /// </returns> | ||
| /// <remarks> | ||
| /// <para> | ||
| /// This method never throws exceptions. All errors are caught and traced. | ||
| /// </para> | ||
| /// <para> | ||
| /// When the circuit is open, events are silently dropped and logged at DEBUG level. | ||
| /// This prevents wasting resources on a failing endpoint while waiting for recovery. | ||
| /// </para> | ||
| /// <para> | ||
| /// When the circuit is closed, events are passed through to the inner exporter. | ||
| /// If the inner exporter fails, the failure is tracked by the circuit breaker. | ||
| /// </para> | ||
| /// </remarks> | ||
| public async Task<bool> ExportAsync(IReadOnlyList<TelemetryFrontendLog> logs, CancellationToken ct = default) | ||
| { | ||
| if (logs == null || logs.Count == 0) | ||
| { | ||
| return true; | ||
| } | ||
|
|
||
| try | ||
| { | ||
| // Execute through circuit breaker | ||
| // The circuit breaker will track failures and open if threshold is reached | ||
| // Polly handles the open-circuit case internally by throwing BrokenCircuitException | ||
| bool result = await _circuitBreaker.ExecuteAsync(async () => | ||
| { | ||
| bool success = await _innerExporter.ExportAsync(logs, ct).ConfigureAwait(false); | ||
|
|
||
| // If inner exporter returns false, it means the export failed | ||
| // We need to throw an exception so the circuit breaker can track the failure | ||
| if (!success) | ||
| { | ||
| throw new TelemetryExportException("Inner exporter returned false indicating export failure"); | ||
| } | ||
|
|
||
| return success; | ||
| }).ConfigureAwait(false); | ||
|
|
||
| return result; | ||
| } | ||
| catch (BrokenCircuitException) | ||
| { | ||
| // Circuit is open - silently drop events | ||
| Activity.Current?.AddEvent(new ActivityEvent("telemetry.export.circuit_open", | ||
| tags: new ActivityTagsCollection | ||
| { | ||
| { "host", _host }, | ||
| { "log_count", logs.Count }, | ||
| { "action", "dropped" } | ||
| })); | ||
|
|
||
| // Return true because dropping is expected behavior when circuit is open | ||
| return true; | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| // Cancellation should not impact driver behavior; treat as a no-op. | ||
| Activity.Current?.AddEvent(new ActivityEvent("telemetry.export.canceled", | ||
| tags: new ActivityTagsCollection | ||
| { | ||
| { "host", _host }, | ||
| { "log_count", logs.Count }, | ||
| { "action", "canceled" } | ||
| })); | ||
|
|
||
| return true; | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| // All other exceptions are swallowed per telemetry requirement | ||
| // These are already tracked by the circuit breaker | ||
| Activity.Current?.AddEvent(new ActivityEvent("telemetry.export.circuit_breaker_error", | ||
| tags: new ActivityTagsCollection | ||
| { | ||
| { "host", _host }, | ||
| { "error.message", ex.Message }, | ||
| { "error.type", ex.GetType().Name }, | ||
| { "circuit_state", _circuitBreaker.State.ToString() } | ||
| })); | ||
|
|
||
| return false; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Exception thrown when telemetry export fails. | ||
| /// Used internally by CircuitBreakerTelemetryExporter to signal failures to the circuit breaker. | ||
| /// </summary> | ||
| internal sealed class TelemetryExportException : Exception | ||
| { | ||
| /// <summary> | ||
| /// Creates a new TelemetryExportException. | ||
| /// </summary> | ||
| /// <param name="message">The error message.</param> | ||
| public TelemetryExportException(string message) : base(message) | ||
| { | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Creates a new TelemetryExportException. | ||
| /// </summary> | ||
| /// <param name="message">The error message.</param> | ||
| /// <param name="innerException">The inner exception.</param> | ||
| public TelemetryExportException(string message, Exception innerException) : base(message, innerException) | ||
| { | ||
| } | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.