Skip to content
73 changes: 73 additions & 0 deletions src/_common/StreamHub/ReusableObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
namespace Skender.Stock.Indicators;

/// <summary>
/// Flexible IStreamObserver implementation with customizable callbacks for all lifecycle methods.
/// All callbacks are optional (nullable).
/// </summary>
public class ReusableObserver : IStreamObserver<IReusable>
{
private readonly Func<bool> _isSubscribed;
private readonly Action<Exception>? _onError;
private readonly Action? _onCompleted;
private readonly Action? _onUnsubscribe;
private readonly Action<IReusable, bool, int?>? _onAdd;
private readonly Action<DateTime>? _onRebuild;
private readonly Action<DateTime>? _onPrune;
private readonly Action? _onReinitialize;
private readonly Action? _rebuild;
private readonly Action<DateTime>? _rebuildTimestamp;
private readonly Action<int>? _rebuildIndex;

public bool IsSubscribed => _isSubscribed();
Comment on lines +22 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Clarify the thread-safety guarantee in documentation.

The documentation claims "This property is thread-safe" (line 28), but the property simply delegates to the user-provided _isSubscribed function. Thread-safety depends entirely on the caller's implementation of that delegate, not on this property. This could mislead users into assuming a guarantee that doesn't exist.

📝 Suggested documentation revision
     /// <remarks>
-    /// This property is thread-safe and returns the current subscription state
-    /// by invoking the backing delegate <see cref="_isSubscribed"/>.
+    /// Returns the current subscription state by invoking the backing delegate 
+    /// <see cref="_isSubscribed"/>. Thread-safety depends on the implementation 
+    /// of the provided delegate.
     /// The value is determined by the provider at the time of invocation.
     /// </remarks>
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// <summary>
/// Gets a value indicating whether the observer is currently subscribed to the data provider.
/// </summary>
/// <value>
/// <c>true</c> if the observer is subscribed; otherwise, <c>false</c>.
/// </value>
/// <remarks>
/// This property is thread-safe and returns the current subscription state
/// by invoking the backing delegate <see cref="_isSubscribed"/>.
/// The value is determined by the provider at the time of invocation.
/// </remarks>
public bool IsSubscribed => _isSubscribed();
/// <summary>
/// Gets a value indicating whether the observer is currently subscribed to the data provider.
/// </summary>
/// <value>
/// <c>true</c> if the observer is subscribed; otherwise, <c>false</c>.
/// </value>
/// <remarks>
/// Returns the current subscription state by invoking the backing delegate
/// <see cref="_isSubscribed"/>. Thread-safety depends on the implementation
/// of the provided delegate.
/// The value is determined by the provider at the time of invocation.
/// </remarks>
public bool IsSubscribed => _isSubscribed();
🤖 Prompt for AI Agents
In @src/_common/StreamHub/ReusableObserver.cs around lines 21 - 32, The XML doc
for the IsSubscribed property incorrectly claims thread-safety even though it
simply calls the user-supplied delegate _isSubscribed; update the
summary/remarks to remove the unconditional "thread-safe" guarantee and instead
state that IsSubscribed invokes the backing delegate _isSubscribed and that any
thread-safety guarantees depend on the delegate's implementation (mention
ReusableObserver and the IsSubscribed property so reviewers can find the code).


public ReusableObserver(
Func<bool> isSubscribed,
Action<Exception>? onError = null,
Action? onCompleted = null,
Action? onUnsubscribe = null,
Action<IReusable, bool, int?>? onAdd = null,
Action<DateTime>? onRebuild = null,
Action<DateTime>? onPrune = null,
Action? onReinitialize = null,
Action? rebuild = null,
Action<DateTime>? rebuildTimestamp = null,
Action<int>? rebuildIndex = null)
{
_isSubscribed = isSubscribed;
_onError = onError;
_onCompleted = onCompleted;
_onUnsubscribe = onUnsubscribe;
_onAdd = onAdd;
_onRebuild = onRebuild;
_onPrune = onPrune;
_onReinitialize = onReinitialize;
_rebuild = rebuild;
_rebuildTimestamp = rebuildTimestamp;
_rebuildIndex = rebuildIndex;
}

public void OnError(Exception exception) => _onError?.Invoke(exception);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add XML documentation and validate exception parameter.

Two issues:

  1. Missing XML documentation required per coding guidelines.
  2. The exception parameter is not validated for null. Consider validating user inputs per coding guidelines.
📝 Proposed fix
+    /// <summary>
+    /// Invoked when an error occurs in the stream.
+    /// </summary>
+    /// <param name="exception">The exception that occurred.</param>
-    public void OnError(Exception exception) => _onError?.Invoke(exception);
+    public void OnError(Exception exception)
+    {
+        ArgumentNullException.ThrowIfNull(exception);
+        _onError?.Invoke(exception);
+    }

As per coding guidelines, all public methods require XML documentation.

🤖 Prompt for AI Agents
In @src/_common/StreamHub/ReusableObserver.cs at line 47, Add XML documentation
to the public OnError method describing its purpose and parameter, and validate
the exception parameter for null per coding guidelines: in
ReusableObserver.OnError check if exception is null and throw an
ArgumentNullException for the "exception" parameter before invoking the
delegate, then call _onError?.Invoke(exception); ensure the XML doc includes
summary and param tags to satisfy tooling.


public void OnCompleted() => _onCompleted?.Invoke();

public void Unsubscribe() { _onUnsubscribe?.Invoke(); }

// OnAdd is called when a new item is added to the hub's cache
public void OnAdd(IReusable item, bool notify, int? indexHint) => _onAdd?.Invoke(item, notify, indexHint);

// OnRebuild is called when the hub recalculates from a specific timestamp
public void OnRebuild(DateTime fromTimestamp) => _onRebuild?.Invoke(fromTimestamp);

// OnPrune is called when old items are removed from the hub's cache
public void OnPrune(DateTime toTimestamp) => _onPrune?.Invoke(toTimestamp);

// Reinitialize is called to reset the observer state
public void Reinitialize() { _onReinitialize?.Invoke(); }

// Rebuild methods trigger recalculation
public void Rebuild() => _rebuild?.Invoke();

public void Rebuild(DateTime fromTimestamp) => _rebuildTimestamp?.Invoke(fromTimestamp);

public void Rebuild(int fromIndex) => _rebuildIndex?.Invoke(fromIndex);
}