Skip to content

Commit fe040c8

Browse files
authored
Dispose all subscriptions when connection broken (#677)
1 parent 7017816 commit fe040c8

File tree

8 files changed

+75
-22
lines changed

8 files changed

+75
-22
lines changed

src/Transports.Subscriptions.Abstractions/IServerOperations.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace GraphQL.Server.Transports.Subscriptions.Abstractions
44
{
5-
public interface IServerOperations
5+
public interface IServerOperations //todo: inherit IDisposable
66
{
77
Task Terminate();
88

src/Transports.Subscriptions.Abstractions/ISubscriptionManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace GraphQL.Server.Transports.Subscriptions.Abstractions
66
/// <summary>
77
/// Manages operation execution and manages created subscriptions
88
/// </summary>
9-
public interface ISubscriptionManager : IEnumerable<Subscription>
9+
public interface ISubscriptionManager : IEnumerable<Subscription> //todo: add IDisposable
1010
{
1111
/// <summary>
1212
/// Execute operation and subscribe if subscription

src/Transports.Subscriptions.Abstractions/Subscription.cs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#nullable enable
2+
13
using System;
24
using System.Linq;
35
using System.Reactive.Linq;
@@ -10,18 +12,18 @@ namespace GraphQL.Server.Transports.Subscriptions.Abstractions
1012
/// <summary>
1113
/// Internal observer of the subscription
1214
/// </summary>
13-
public class Subscription : IObserver<ExecutionResult>
15+
public class Subscription : IObserver<ExecutionResult>, IDisposable
1416
{
15-
private readonly Action<Subscription> _completed;
17+
private Action<Subscription>? _completed;
1618
private readonly ILogger<Subscription> _logger;
17-
private readonly IWriterPipeline _writer;
18-
private IDisposable _unsubscribe;
19+
private IWriterPipeline? _writer;
20+
private IDisposable? _unsubscribe;
1921

2022
public Subscription(string id,
2123
OperationMessagePayload payload,
2224
SubscriptionExecutionResult result,
2325
IWriterPipeline writer,
24-
Action<Subscription> completed,
26+
Action<Subscription>? completed,
2527
ILogger<Subscription> logger)
2628
{
2729
_writer = writer;
@@ -40,22 +42,25 @@ public Subscription(string id,
4042
public void OnCompleted()
4143
{
4244
_logger.LogDebug("Subscription: {subscriptionId} completing", Id);
43-
_writer.Post(new OperationMessage
45+
_writer?.Post(new OperationMessage
4446
{
4547
Type = MessageType.GQL_COMPLETE,
4648
Id = Id
4749
});
4850

4951
_completed?.Invoke(this);
5052
_unsubscribe?.Dispose();
53+
_completed = null;
54+
_writer = null;
55+
_unsubscribe = null;
5156
}
5257

5358
public void OnError(Exception error) => throw new NotImplementedException();
5459

5560
public void OnNext(ExecutionResult value)
5661
{
5762
_logger.LogDebug("Subscription: {subscriptionId} got data", Id);
58-
_writer.Post(new OperationMessage
63+
_writer?.Post(new OperationMessage
5964
{
6065
Type = MessageType.GQL_DATA,
6166
Id = Id,
@@ -66,19 +71,31 @@ public void OnNext(ExecutionResult value)
6671
public Task UnsubscribeAsync()
6772
{
6873
_logger.LogDebug("Subscription: {subscriptionId} unsubscribing", Id);
69-
_unsubscribe.Dispose();
70-
return _writer.SendAsync(new OperationMessage
74+
_unsubscribe?.Dispose();
75+
var writer = _writer;
76+
_writer = null;
77+
_unsubscribe = null;
78+
_completed = null;
79+
return writer?.SendAsync(new OperationMessage
7180
{
7281
Type = MessageType.GQL_COMPLETE,
7382
Id = Id
74-
});
83+
}) ?? Task.CompletedTask;
7584
}
7685

7786
private void Subscribe(SubscriptionExecutionResult result)
7887
{
79-
var stream = result.Streams.Values.Single();
88+
var stream = result.Streams!.Values.Single();
8089
_unsubscribe = stream.Synchronize().Subscribe(this);
8190
_logger.LogDebug("Subscription: {subscriptionId} subscribed", Id);
8291
}
92+
93+
public virtual void Dispose()
94+
{
95+
_unsubscribe?.Dispose();
96+
_unsubscribe = null;
97+
_writer = null;
98+
_completed = null;
99+
}
83100
}
84101
}

src/Transports.Subscriptions.Abstractions/SubscriptionManager.cs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@
1212
namespace GraphQL.Server.Transports.Subscriptions.Abstractions
1313
{
1414
/// <inheritdoc />
15-
public class SubscriptionManager : ISubscriptionManager
15+
public class SubscriptionManager : ISubscriptionManager, IDisposable
1616
{
1717
private readonly IGraphQLExecuter _executer;
1818

1919
private readonly ILogger<SubscriptionManager> _logger;
2020
private readonly ILoggerFactory _loggerFactory;
21+
private volatile bool _disposed;
2122

2223
private readonly ConcurrentDictionary<string, Subscription> _subscriptions =
2324
new ConcurrentDictionary<string, Subscription>();
@@ -45,13 +46,18 @@ public async Task SubscribeOrExecuteAsync(
4546
throw new ArgumentNullException(nameof(payload));
4647
if (context == null)
4748
throw new ArgumentNullException(nameof(context));
49+
if (_disposed)
50+
throw new ObjectDisposedException(nameof(SubscriptionManager));
4851

4952
var subscription = await ExecuteAsync(id, payload, context).ConfigureAwait(false);
5053

5154
if (subscription == null)
5255
return;
5356

54-
_subscriptions[id] = subscription;
57+
if (_disposed)
58+
subscription.Dispose();
59+
else
60+
_subscriptions[id] = subscription;
5561
}
5662

5763
/// <inheritdoc />
@@ -140,5 +146,28 @@ await writer.SendAsync(new OperationMessage
140146

141147
return null;
142148
}
149+
150+
public virtual void Dispose()
151+
{
152+
_disposed = true;
153+
while (_subscriptions.Count > 0)
154+
{
155+
var subscriptions = _subscriptions.ToArray();
156+
foreach (var subscriptionPair in subscriptions)
157+
{
158+
if (_subscriptions.TryRemove(subscriptionPair.Key, out var subscription))
159+
{
160+
try
161+
{
162+
subscription.Dispose();
163+
}
164+
catch (Exception ex)
165+
{
166+
_logger.LogError($"Failed to dispose subscription '{subscriptionPair.Key}': ${ex}");
167+
}
168+
}
169+
}
170+
}
171+
}
143172
}
144173
}

src/Transports.Subscriptions.Abstractions/SubscriptionServer.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Collections.Generic;
23
using System.Text;
34
using System.Threading.Tasks;
@@ -10,7 +11,7 @@ namespace GraphQL.Server.Transports.Subscriptions.Abstractions
1011
/// Subscription server
1112
/// Acts as a message pump reading, handling and writing messages
1213
/// </summary>
13-
public class SubscriptionServer : IServerOperations
14+
public class SubscriptionServer : IServerOperations, IDisposable
1415
{
1516
private readonly ILogger<SubscriptionServer> _logger;
1617
private readonly IEnumerable<IOperationMessageListener> _messageListeners;
@@ -136,5 +137,7 @@ private void LinkToTransportWriter()
136137
TransportWriter = Transport.Writer;
137138
_logger.LogDebug("Writer pipeline created");
138139
}
140+
141+
public virtual void Dispose() => (Subscriptions as IDisposable)?.Dispose();
139142
}
140143
}

src/Transports.Subscriptions.WebSockets/WebSocketConnection.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ public virtual async Task Connect()
2424
await _transport.CloseAsync();
2525
}
2626

27-
public void Dispose()
27+
public virtual void Dispose()
2828
{
29+
_server.Dispose();
2930
_transport.Dispose();
3031
}
3132
}

tests/ApiApprovalTests/GraphQL.Server.Transports.Subscriptions.Abstractions.approved.txt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,31 +96,34 @@ namespace GraphQL.Server.Transports.Subscriptions.Abstractions
9696
public System.Threading.Tasks.Task BeforeHandleAsync(GraphQL.Server.Transports.Subscriptions.Abstractions.MessageHandlingContext context) { }
9797
public System.Threading.Tasks.Task HandleAsync(GraphQL.Server.Transports.Subscriptions.Abstractions.MessageHandlingContext context) { }
9898
}
99-
public class Subscription : System.IObserver<GraphQL.ExecutionResult>
99+
public class Subscription : System.IDisposable, System.IObserver<GraphQL.ExecutionResult>
100100
{
101-
public Subscription(string id, GraphQL.Server.Transports.Subscriptions.Abstractions.OperationMessagePayload payload, GraphQL.Subscription.SubscriptionExecutionResult result, GraphQL.Server.Transports.Subscriptions.Abstractions.IWriterPipeline writer, System.Action<GraphQL.Server.Transports.Subscriptions.Abstractions.Subscription> completed, Microsoft.Extensions.Logging.ILogger<GraphQL.Server.Transports.Subscriptions.Abstractions.Subscription> logger) { }
101+
public Subscription(string id, GraphQL.Server.Transports.Subscriptions.Abstractions.OperationMessagePayload payload, GraphQL.Subscription.SubscriptionExecutionResult result, GraphQL.Server.Transports.Subscriptions.Abstractions.IWriterPipeline writer, System.Action<GraphQL.Server.Transports.Subscriptions.Abstractions.Subscription>? completed, Microsoft.Extensions.Logging.ILogger<GraphQL.Server.Transports.Subscriptions.Abstractions.Subscription> logger) { }
102102
public string Id { get; }
103103
public GraphQL.Server.Transports.Subscriptions.Abstractions.OperationMessagePayload OriginalPayload { get; }
104+
public virtual void Dispose() { }
104105
public void OnCompleted() { }
105106
public void OnError(System.Exception error) { }
106107
public void OnNext(GraphQL.ExecutionResult value) { }
107108
public System.Threading.Tasks.Task UnsubscribeAsync() { }
108109
}
109-
public class SubscriptionManager : GraphQL.Server.Transports.Subscriptions.Abstractions.ISubscriptionManager, System.Collections.Generic.IEnumerable<GraphQL.Server.Transports.Subscriptions.Abstractions.Subscription>, System.Collections.IEnumerable
110+
public class SubscriptionManager : GraphQL.Server.Transports.Subscriptions.Abstractions.ISubscriptionManager, System.Collections.Generic.IEnumerable<GraphQL.Server.Transports.Subscriptions.Abstractions.Subscription>, System.Collections.IEnumerable, System.IDisposable
110111
{
111112
public SubscriptionManager(GraphQL.Server.IGraphQLExecuter executer, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory) { }
112113
public GraphQL.Server.Transports.Subscriptions.Abstractions.Subscription this[string id] { get; }
114+
public virtual void Dispose() { }
113115
public System.Collections.Generic.IEnumerator<GraphQL.Server.Transports.Subscriptions.Abstractions.Subscription> GetEnumerator() { }
114116
public System.Threading.Tasks.Task SubscribeOrExecuteAsync(string id, GraphQL.Server.Transports.Subscriptions.Abstractions.OperationMessagePayload payload, GraphQL.Server.Transports.Subscriptions.Abstractions.MessageHandlingContext context) { }
115117
public System.Threading.Tasks.Task UnsubscribeAsync(string id) { }
116118
}
117-
public class SubscriptionServer : GraphQL.Server.Transports.Subscriptions.Abstractions.IServerOperations
119+
public class SubscriptionServer : GraphQL.Server.Transports.Subscriptions.Abstractions.IServerOperations, System.IDisposable
118120
{
119121
public SubscriptionServer(GraphQL.Server.Transports.Subscriptions.Abstractions.IMessageTransport transport, GraphQL.Server.Transports.Subscriptions.Abstractions.ISubscriptionManager subscriptions, System.Collections.Generic.IEnumerable<GraphQL.Server.Transports.Subscriptions.Abstractions.IOperationMessageListener> messageListeners, Microsoft.Extensions.Logging.ILogger<GraphQL.Server.Transports.Subscriptions.Abstractions.SubscriptionServer> logger) { }
120122
public GraphQL.Server.Transports.Subscriptions.Abstractions.ISubscriptionManager Subscriptions { get; }
121123
public GraphQL.Server.Transports.Subscriptions.Abstractions.IMessageTransport Transport { get; }
122124
public GraphQL.Server.Transports.Subscriptions.Abstractions.IReaderPipeline TransportReader { get; set; }
123125
public GraphQL.Server.Transports.Subscriptions.Abstractions.IWriterPipeline TransportWriter { get; set; }
126+
public virtual void Dispose() { }
124127
public System.Threading.Tasks.Task OnConnect() { }
125128
public System.Threading.Tasks.Task OnDisconnect() { }
126129
public System.Threading.Tasks.Task Terminate() { }

tests/ApiApprovalTests/GraphQL.Server.Transports.Subscriptions.WebSockets.approved.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ namespace GraphQL.Server.Transports.WebSockets
2424
{
2525
public WebSocketConnection(GraphQL.Server.Transports.WebSockets.WebSocketTransport transport, GraphQL.Server.Transports.Subscriptions.Abstractions.SubscriptionServer subscriptionServer) { }
2626
public virtual System.Threading.Tasks.Task Connect() { }
27-
public void Dispose() { }
27+
public virtual void Dispose() { }
2828
}
2929
public class WebSocketConnectionFactory<TSchema> : GraphQL.Server.Transports.WebSockets.IWebSocketConnectionFactory<TSchema>
3030
where TSchema : GraphQL.Types.ISchema

0 commit comments

Comments
 (0)