11using Microsoft . Extensions . Caching . Distributed ;
2+ using Microsoft . Extensions . Logging ;
3+ using Microsoft . Extensions . Logging . Abstractions ;
24using ModelContextProtocol . Protocol ;
35using System . Net . ServerSentEvents ;
46using System . Runtime . CompilerServices ;
@@ -20,28 +22,32 @@ namespace ModelContextProtocol.Server;
2022/// to be only one writer per stream. Readers may be created from separate processes.
2123/// </para>
2224/// </remarks>
23- public sealed class DistributedCacheEventStreamStore : ISseEventStreamStore
25+ public sealed partial class DistributedCacheEventStreamStore : ISseEventStreamStore
2426{
2527 private readonly IDistributedCache _cache ;
2628 private readonly DistributedCacheEventStreamStoreOptions _options ;
29+ private readonly ILogger _logger ;
2730
2831 /// <summary>
2932 /// Initializes a new instance of the <see cref="DistributedCacheEventStreamStore"/> class.
3033 /// </summary>
3134 /// <param name="cache">The distributed cache to use for storage.</param>
3235 /// <param name="options">Optional configuration options for the store.</param>
33- public DistributedCacheEventStreamStore ( IDistributedCache cache , DistributedCacheEventStreamStoreOptions ? options = null )
36+ /// <param name="logger">Optional logger for diagnostic output.</param>
37+ public DistributedCacheEventStreamStore ( IDistributedCache cache , DistributedCacheEventStreamStoreOptions ? options = null , ILogger < DistributedCacheEventStreamStore > ? logger = null )
3438 {
3539 Throw . IfNull ( cache ) ;
3640 _cache = cache ;
3741 _options = options ?? new ( ) ;
42+ _logger = logger ?? NullLogger < DistributedCacheEventStreamStore > . Instance ;
3843 }
3944
4045 /// <inheritdoc />
4146 public ValueTask < ISseEventStreamWriter > CreateStreamAsync ( SseEventStreamOptions options , CancellationToken cancellationToken = default )
4247 {
4348 Throw . IfNull ( options ) ;
44- var writer = new DistributedCacheEventStreamWriter ( _cache , options . SessionId , options . StreamId , options . Mode , _options ) ;
49+ LogStreamCreated ( options . SessionId , options . StreamId , options . Mode ) ;
50+ var writer = new DistributedCacheEventStreamWriter ( _cache , options . SessionId , options . StreamId , options . Mode , _options , _logger ) ;
4551 return new ValueTask < ISseEventStreamWriter > ( writer ) ;
4652 }
4753
@@ -53,6 +59,7 @@ public ValueTask<ISseEventStreamWriter> CreateStreamAsync(SseEventStreamOptions
5359 // Parse the event ID to get session, stream, and sequence information
5460 if ( ! DistributedCacheEventIdFormatter . TryParse ( lastEventId , out var sessionId , out var streamId , out var sequence ) )
5561 {
62+ LogEventIdParsingFailed ( lastEventId ) ;
5663 return null ;
5764 }
5865
@@ -61,17 +68,20 @@ public ValueTask<ISseEventStreamWriter> CreateStreamAsync(SseEventStreamOptions
6168 var metadataBytes = await _cache . GetAsync ( metadataKey , cancellationToken ) . ConfigureAwait ( false ) ;
6269 if ( metadataBytes is null )
6370 {
71+ LogStreamMetadataNotFound ( sessionId , streamId ) ;
6472 return null ;
6573 }
6674
6775 var metadata = JsonSerializer . Deserialize ( metadataBytes , McpJsonUtilities . JsonContext . Default . StreamMetadata ) ;
6876 if ( metadata is null )
6977 {
78+ LogStreamMetadataDeserializationFailed ( sessionId , streamId ) ;
7079 return null ;
7180 }
7281
7382 var startSequence = sequence + 1 ;
74- return new DistributedCacheEventStreamReader ( _cache , sessionId , streamId , startSequence , metadata , _options ) ;
83+ LogStreamReaderCreated ( sessionId , streamId , startSequence , metadata . LastSequence ) ;
84+ return new DistributedCacheEventStreamReader ( _cache , sessionId , streamId , startSequence , metadata , _options , _logger ) ;
7585 }
7686
7787 /// <summary>
@@ -111,13 +121,14 @@ internal sealed class StoredEvent
111121 public JsonRpcMessage ? Data { get ; set ; }
112122 }
113123
114- private sealed class DistributedCacheEventStreamWriter : ISseEventStreamWriter
124+ private sealed partial class DistributedCacheEventStreamWriter : ISseEventStreamWriter
115125 {
116126 private readonly IDistributedCache _cache ;
117127 private readonly string _sessionId ;
118128 private readonly string _streamId ;
119129 private SseEventStreamMode _mode ;
120130 private readonly DistributedCacheEventStreamStoreOptions _options ;
131+ private readonly ILogger _logger ;
121132 private long _sequence ;
122133 private bool _disposed ;
123134
@@ -126,17 +137,20 @@ public DistributedCacheEventStreamWriter(
126137 string sessionId ,
127138 string streamId ,
128139 SseEventStreamMode mode ,
129- DistributedCacheEventStreamStoreOptions options )
140+ DistributedCacheEventStreamStoreOptions options ,
141+ ILogger logger )
130142 {
131143 _cache = cache ;
132144 _sessionId = sessionId ;
133145 _streamId = streamId ;
134146 _mode = mode ;
135147 _options = options ;
148+ _logger = logger ;
136149 }
137150
138151 public async ValueTask SetModeAsync ( SseEventStreamMode mode , CancellationToken cancellationToken = default )
139152 {
153+ LogStreamModeChanged ( _sessionId , _streamId , mode ) ;
140154 _mode = mode ;
141155 await UpdateMetadataAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
142156 }
@@ -146,6 +160,7 @@ public async ValueTask SetModeAsync(SseEventStreamMode mode, CancellationToken c
146160 // Skip if already has an event ID
147161 if ( sseItem . EventId is not null )
148162 {
163+ LogEventAlreadyHasId ( _sessionId , _streamId , sseItem . EventId ) ;
149164 return sseItem ;
150165 }
151166
@@ -174,6 +189,7 @@ public async ValueTask SetModeAsync(SseEventStreamMode mode, CancellationToken c
174189 // Update metadata with the latest sequence
175190 await UpdateMetadataAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
176191
192+ LogEventWritten ( _sessionId , _streamId , eventId , sequence ) ;
177193 return newItem ;
178194 }
179195
@@ -207,30 +223,46 @@ public async ValueTask DisposeAsync()
207223
208224 // Mark the stream as completed in the metadata
209225 await UpdateMetadataAsync ( CancellationToken . None ) . ConfigureAwait ( false ) ;
226+ LogStreamWriterDisposed ( _sessionId , _streamId , Interlocked . Read ( ref _sequence ) ) ;
210227 }
228+
229+ [ LoggerMessage ( Level = LogLevel . Debug , Message = "Stream mode changed for session '{SessionId}', stream '{StreamId}' to {Mode}." ) ]
230+ private partial void LogStreamModeChanged ( string sessionId , string streamId , SseEventStreamMode mode ) ;
231+
232+ [ LoggerMessage ( Level = LogLevel . Trace , Message = "Event already has ID '{EventId}' for session '{SessionId}', stream '{StreamId}'. Skipping ID generation." ) ]
233+ private partial void LogEventAlreadyHasId ( string sessionId , string streamId , string eventId ) ;
234+
235+ [ LoggerMessage ( Level = LogLevel . Debug , Message = "Event written to session '{SessionId}', stream '{StreamId}' with ID '{EventId}' (sequence {Sequence})." ) ]
236+ private partial void LogEventWritten ( string sessionId , string streamId , string eventId , long sequence ) ;
237+
238+ [ LoggerMessage ( Level = LogLevel . Information , Message = "Stream writer disposed for session '{SessionId}', stream '{StreamId}'. Total events written: {TotalEvents}." ) ]
239+ private partial void LogStreamWriterDisposed ( string sessionId , string streamId , long totalEvents ) ;
211240 }
212241
213- private sealed class DistributedCacheEventStreamReader : ISseEventStreamReader
242+ private sealed partial class DistributedCacheEventStreamReader : ISseEventStreamReader
214243 {
215244 private readonly IDistributedCache _cache ;
216245 private readonly long _startSequence ;
217246 private readonly StreamMetadata _initialMetadata ;
218247 private readonly DistributedCacheEventStreamStoreOptions _options ;
248+ private readonly ILogger _logger ;
219249
220250 public DistributedCacheEventStreamReader (
221251 IDistributedCache cache ,
222252 string sessionId ,
223253 string streamId ,
224254 long startSequence ,
225255 StreamMetadata initialMetadata ,
226- DistributedCacheEventStreamStoreOptions options )
256+ DistributedCacheEventStreamStoreOptions options ,
257+ ILogger logger )
227258 {
228259 _cache = cache ;
229260 SessionId = sessionId ;
230261 StreamId = streamId ;
231262 _startSequence = startSequence ;
232263 _initialMetadata = initialMetadata ;
233264 _options = options ;
265+ _logger = logger ;
234266 }
235267
236268 public string SessionId { get ; }
@@ -246,6 +278,8 @@ public DistributedCacheEventStreamReader(
246278 var isCompleted = _initialMetadata . IsCompleted ;
247279 var mode = _initialMetadata . Mode ;
248280
281+ LogReadingEventsStarted ( SessionId , StreamId , _startSequence , lastSequence ) ;
282+
249283 while ( ! cancellationToken . IsCancellationRequested )
250284 {
251285 // Read all available events from currentSequence + 1 to lastSequence
@@ -261,6 +295,7 @@ public DistributedCacheEventStreamReader(
261295 var storedEvent = JsonSerializer . Deserialize ( eventBytes , McpJsonUtilities . JsonContext . Default . StoredEvent ) ;
262296 if ( storedEvent is not null )
263297 {
298+ LogEventRead ( SessionId , StreamId , eventId , currentSequence ) ;
264299 yield return new SseItem < JsonRpcMessage ? > ( storedEvent . Data , storedEvent . EventType )
265300 {
266301 EventId = storedEvent . EventId ,
@@ -271,16 +306,19 @@ public DistributedCacheEventStreamReader(
271306 // If in polling mode, stop after returning currently available events
272307 if ( mode == SseEventStreamMode . Polling )
273308 {
309+ LogReadingEventsCompletedPolling ( SessionId , StreamId , currentSequence - 1 ) ;
274310 yield break ;
275311 }
276312
277313 // If the stream is completed and we've read all events, stop
278314 if ( isCompleted )
279315 {
316+ LogReadingEventsCompletedStreamEnded ( SessionId , StreamId , currentSequence - 1 ) ;
280317 yield break ;
281318 }
282319
283320 // Wait before polling again for new events
321+ LogWaitingForNewEvents ( SessionId , StreamId , _options . PollingInterval ) ;
284322 await Task . Delay ( _options . PollingInterval , cancellationToken ) . ConfigureAwait ( false ) ;
285323
286324 // Refresh metadata to get the latest sequence and completion status
@@ -296,5 +334,35 @@ public DistributedCacheEventStreamReader(
296334 mode = currentMetadata . Mode ;
297335 }
298336 }
337+
338+ [ LoggerMessage ( Level = LogLevel . Debug , Message = "Starting to read events for session '{SessionId}', stream '{StreamId}' from sequence {StartSequence} to {LastSequence}." ) ]
339+ private partial void LogReadingEventsStarted ( string sessionId , string streamId , long startSequence , long lastSequence ) ;
340+
341+ [ LoggerMessage ( Level = LogLevel . Trace , Message = "Event read from session '{SessionId}', stream '{StreamId}' with ID '{EventId}' (sequence {Sequence})." ) ]
342+ private partial void LogEventRead ( string sessionId , string streamId , string eventId , long sequence ) ;
343+
344+ [ LoggerMessage ( Level = LogLevel . Debug , Message = "Reading events completed for session '{SessionId}', stream '{StreamId}' in polling mode. Last sequence read: {LastSequence}." ) ]
345+ private partial void LogReadingEventsCompletedPolling ( string sessionId , string streamId , long lastSequence ) ;
346+
347+ [ LoggerMessage ( Level = LogLevel . Debug , Message = "Reading events completed for session '{SessionId}', stream '{StreamId}' as stream has ended. Last sequence read: {LastSequence}." ) ]
348+ private partial void LogReadingEventsCompletedStreamEnded ( string sessionId , string streamId , long lastSequence ) ;
349+
350+ [ LoggerMessage ( Level = LogLevel . Trace , Message = "Waiting for new events on session '{SessionId}', stream '{StreamId}'. Polling interval: {PollingInterval}." ) ]
351+ private partial void LogWaitingForNewEvents ( string sessionId , string streamId , TimeSpan pollingInterval ) ;
299352 }
353+
354+ [ LoggerMessage ( Level = LogLevel . Information , Message = "Stream created for session '{SessionId}', stream '{StreamId}' with mode {Mode}." ) ]
355+ private partial void LogStreamCreated ( string sessionId , string streamId , SseEventStreamMode mode ) ;
356+
357+ [ LoggerMessage ( Level = LogLevel . Debug , Message = "Stream reader created for session '{SessionId}', stream '{StreamId}' starting at sequence {StartSequence}. Last available sequence: {LastSequence}." ) ]
358+ private partial void LogStreamReaderCreated ( string sessionId , string streamId , long startSequence , long lastSequence ) ;
359+
360+ [ LoggerMessage ( Level = LogLevel . Warning , Message = "Failed to parse event ID '{EventId}'. Unable to create stream reader." ) ]
361+ private partial void LogEventIdParsingFailed ( string eventId ) ;
362+
363+ [ LoggerMessage ( Level = LogLevel . Debug , Message = "Stream metadata not found for session '{SessionId}', stream '{StreamId}'." ) ]
364+ private partial void LogStreamMetadataNotFound ( string sessionId , string streamId ) ;
365+
366+ [ LoggerMessage ( Level = LogLevel . Warning , Message = "Failed to deserialize stream metadata for session '{SessionId}', stream '{StreamId}'." ) ]
367+ private partial void LogStreamMetadataDeserializationFailed ( string sessionId , string streamId ) ;
300368}
0 commit comments