-
Notifications
You must be signed in to change notification settings - Fork 28
Expand file tree
/
Copy pathReceiverManager.cs
More file actions
278 lines (234 loc) · 9.77 KB
/
ReceiverManager.cs
File metadata and controls
278 lines (234 loc) · 9.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Iviz.Msgs;
using Iviz.Tools;
using Nito.AsyncEx;
namespace Iviz.Roslib;
/// <summary>
/// Keeps track of the connections between a <see cref="RosSubscriber{TMessage}"/> and the publishers
/// of its topic. For every publisher URI there is at most one <see cref="ReceiverConnector"/>
/// (registered in <c>connectorsByUri</c>) and at most one <see cref="IProtocolReceiver"/>
/// (registered in <c>receiversByUri</c>, created once a connector reports success).
/// </summary>
/// <typeparam name="TMessage">Message type of the subscribed topic.</typeparam>
internal sealed class ReceiverManager<TMessage> where TMessage : IMessage
{
    /// <summary>Initial value of <see cref="TimeoutInMs"/>.</summary>
    const int DefaultTimeoutInMs = 5000;

    // Held by PublisherUpdateRpcAsync and RetryConnection while they modify connectorsByUri.
    readonly AsyncLock mutex = new();
    readonly ConcurrentDictionary<Uri, ReceiverConnector> connectorsByUri = new();
    readonly ConcurrentDictionary<Uri, IProtocolReceiver> receiversByUri = new();
    readonly RosClient client;
    readonly RosSubscriber<TMessage> subscriber;
    readonly TopicInfo topicInfo;
    readonly RosTransportHint transportHint;

    // Publisher URIs from the most recent PublisherUpdateRpcAsync call. The set is replaced as a
    // whole and never mutated afterwards, so GetStates can enumerate whatever instance it read.
    HashSet<Uri> cachedPublisherUris = new();
    bool isPaused;

    /// <summary>Name of the subscribed topic.</summary>
    public string Topic => topicInfo.Topic;

    /// <summary>Message type name of the subscribed topic.</summary>
    public string TopicType => topicInfo.Type;

    /// <summary>Number of registered receivers, whether or not they are currently connected.</summary>
    public int NumConnections => receiversByUri.Count;

    /// <summary>Number of registered receivers that report <c>IsConnected</c>.</summary>
    public int NumActiveConnections => receiversByUri.Count(pair => pair.Value.IsConnected);

    /// <summary>Value forwarded to every <see cref="TcpReceiver{TMessage}"/> created by this manager.</summary>
    public bool RequestNoDelay { get; }

    /// <summary>
    /// Timeout handed to every <see cref="TcpReceiver{TMessage}"/> created after the value is set.
    /// </summary>
    public int TimeoutInMs { get; set; } = DefaultTimeoutInMs;

    /// <summary>
    /// Whether the receivers are paused. Setting the value forwards it to every receiver that is
    /// currently registered; receivers created later start out with the current value.
    /// </summary>
    public bool IsPaused
    {
        get => isPaused;
        set
        {
            isPaused = value;
            foreach (var receiver in receiversByUri.Values)
            {
                receiver.IsPaused = value;
            }
        }
    }

    /// <summary>Creates a manager without any connections.</summary>
    /// <param name="subscriber">Subscriber that receives the messages and change notifications.</param>
    /// <param name="client">Client used to create node clients for the publishers.</param>
    /// <param name="topicInfo">Description of the subscribed topic.</param>
    /// <param name="requestNoDelay">Stored in <see cref="RequestNoDelay"/>.</param>
    /// <param name="transportHint">Transport preference passed to the connectors.</param>
    public ReceiverManager(RosSubscriber<TMessage> subscriber, RosClient client, TopicInfo topicInfo,
        bool requestNoDelay, RosTransportHint transportHint)
    {
        this.subscriber = subscriber;
        this.client = client;
        this.topicInfo = topicInfo;
        this.transportHint = transportHint;
        RequestNoDelay = requestNoDelay;
    }

    /// <summary>Forwards a received message to the owning subscriber.</summary>
    /// <param name="msg">The received message.</param>
    /// <param name="receiver">The receiver the message arrived on.</param>
    internal void MessageCallback(in TMessage msg, IRosReceiver receiver)
    {
        subscriber.MessageCallback(msg, receiver);
    }

    /// <summary>
    /// Reconciles the connections with a new list of publishers: disposes the receivers of publishers
    /// that are no longer listed, starts a connector for every listed publisher that has neither a
    /// receiver nor a live connector, and finally removes dead receivers and connectors.
    /// </summary>
    /// <param name="publisherUris">The complete, current set of publisher URIs for the topic.</param>
    /// <param name="token">Cancels waiting for the lock; also forwarded to the dispose calls.</param>
    public async ValueTask PublisherUpdateRpcAsync(IEnumerable<Uri> publisherUris, CancellationToken token)
    {
        bool numConnectionsHasChanged;

        using (await mutex.LockAsync(token))
        {
            var newPublishers = new HashSet<Uri>(publisherUris);
            cachedPublisherUris = newPublishers;

            // Receivers of vanished publishers are only disposed here; they are unregistered by
            // CleanupAsync below once they report !IsAlive (presumably the case after disposal).
            var toDelete = receiversByUri
                .Where(pair => !newPublishers.Contains(pair.Key))
                .Select(pair => pair.Value)
                .ToList();
            if (toDelete.Count != 0)
            {
                await toDelete.Select(receiver => receiver.DisposeAsync(token).AsTask())
                    .WhenAll()
                    .AwaitNoThrow(this);
            }

            foreach (Uri remoteUri in newPublishers)
            {
                if (receiversByUri.ContainsKey(remoteUri))
                {
                    continue; // already connected
                }

                if (connectorsByUri.TryGetValue(remoteUri, out var oldConnector))
                {
                    if (oldConnector.IsAlive)
                    {
                        continue; // a connection attempt is still in progress
                    }

                    await oldConnector.DisposeAsync(token);
                }

                Logger.LogDebugFormat("{0}: Adding connector for '{1}'", this, remoteUri);
                connectorsByUri[remoteUri] = CreateConnector(remoteUri);
            }

            numConnectionsHasChanged = await CleanupAsync(token);
        }

        // Raised after the lock has been released.
        if (numConnectionsHasChanged)
        {
            subscriber.RaiseNumPublishersChanged();
        }
    }

    /// <summary>
    /// Builds a connector for the given publisher. A UDP topic request is only prepared when the
    /// transport hint is not <see cref="RosTransportHint.OnlyTcp"/>.
    /// </summary>
    ReceiverConnector CreateConnector(Uri remoteUri)
    {
        var udpTopicRequest = transportHint != RosTransportHint.OnlyTcp
            ? UdpReceiver.CreateRequest(client.CallerUri.Host, remoteUri.Host, topicInfo)
            : null;
        var rosNodeClient = client.CreateNodeClient(remoteUri);
        return new ReceiverConnector(rosNodeClient, topicInfo.Topic, transportHint,
            udpTopicRequest, OnConnectionSucceeded);
    }

    /// <summary>
    /// Callback handed to every <see cref="ReceiverConnector"/>. Creates the TCP or UDP receiver
    /// described by <paramref name="response"/> and registers it under its remote URI.
    /// </summary>
    void OnConnectionSucceeded(ReceiverConnector connector, ReceiverConnectorResponse response)
    {
        IProtocolReceiver receiver;
        var (tcpEndpoint, udpResponse, udpClient) = response;

        if (tcpEndpoint is { } endPoint)
        {
            // TCP takes precedence; a UDP client that came along is not needed.
            udpClient?.Dispose();
            receiver =
                new TcpReceiver<TMessage>(this, connector.RemoteUri, endPoint, topicInfo, RequestNoDelay, TimeoutInMs)
                    { IsPaused = IsPaused };
        }
        else if (udpResponse != null && udpClient != null)
        {
            receiver = new UdpReceiver<TMessage>(this, udpResponse, udpClient, connector.RemoteUri, topicInfo);
            // Same as for TCP: a receiver created while the manager is paused must start paused.
            // Assigned through the interface, exactly like the IsPaused setter does.
            receiver.IsPaused = IsPaused;
        }
        else
        {
            // Without a UDP response the client cannot be used; do not leave it open.
            udpClient?.Dispose();
            Logger.LogErrorFormat("{0}: Internal error, neither TCP nor UDP was set when creating connection", this);
            return;
        }

        receiversByUri[receiver.RemoteUri] = receiver;
    }

    /// <summary>
    /// Disposes the connector registered for <paramref name="remoteUri"/> (if any) and starts a new one.
    /// Runs as fire-and-forget with the subscriber's cancellation token; every exception is handled
    /// here, so nothing propagates to the caller.
    /// </summary>
    /// <param name="remoteUri">URI of the publisher to reconnect to.</param>
    public async void RetryConnection(Uri remoteUri)
    {
        var token = subscriber.CancellationToken;
        try
        {
            using (await mutex.LockAsync(token))
            {
                if (connectorsByUri.TryGetValue(remoteUri, out var existingConnector))
                {
                    await existingConnector.DisposeAsync(token);
                }

                token.ThrowIfCancellationRequested();

                connectorsByUri[remoteUri] = CreateConnector(remoteUri);
                Logger.LogDebugFormat("{0}: Adding connector for '{1}' (retry!)", this, remoteUri);
            }
        }
        catch (OperationCanceledException)
        {
            // The subscriber's token was cancelled; there is nothing left to retry.
        }
        catch (Exception e)
        {
            Logger.LogDebugFormat("{0}: Failed to retry connection for uri {1}: {2}", this, remoteUri, e);
        }
    }

    /// <summary>
    /// Looks for a registered receiver whose endpoint equals <paramref name="endPoint"/> and that
    /// implements <see cref="ILoopbackReceiver{TMessage}"/>.
    /// </summary>
    /// <param name="endPoint">Endpoint to search for.</param>
    /// <param name="receiver">The matching receiver, or null if none was found.</param>
    /// <returns>True if <paramref name="receiver"/> was set to a non-null value.</returns>
    public bool TryGetLoopbackReceiver(Endpoint endPoint, out ILoopbackReceiver<TMessage>? receiver)
    {
        // FirstOrDefault yields a default pair (null Value) when nothing matches.
        IProtocolReceiver? newReceiver =
            receiversByUri.FirstOrDefault(pair => endPoint.Equals(pair.Value.Endpoint)).Value;
        receiver = newReceiver as ILoopbackReceiver<TMessage>;
        return receiver != null;
    }

    /// <summary>
    /// Unregisters and disposes every receiver and connector that reports <c>!IsAlive</c>.
    /// </summary>
    /// <returns>True if at least one receiver or connector was removed.</returns>
    async ValueTask<bool> CleanupAsync(CancellationToken token)
    {
        var receiversToDelete = receiversByUri.Values.Where(receiver => !receiver.IsAlive).ToList();
        // Materialized so that removal, logging and disposal run exactly once per receiver,
        // however often the awaiting helper enumerates the task collection.
        var deleteTasks = receiversToDelete.Select(receiver =>
        {
            receiversByUri.TryRemove(receiver.RemoteUri, out _);
            Logger.LogDebugFormat("{0}: Removing connection with uri '{1}' - dead x_x: {2}", this,
                receiver.RemoteUri, receiver.ErrorDescription);
            return receiver.DisposeAsync(token).AsTask();
        }).ToList();
        await deleteTasks.WhenAll().AwaitNoThrow(this);

        var connectorsToDelete = connectorsByUri.Values.Where(connector => !connector.IsAlive).ToList();
        var connectorTasks = connectorsToDelete.Select(connector =>
        {
            connectorsByUri.TryRemove(connector.RemoteUri, out _);
            Logger.LogDebugFormat("{0}: Removing connector with uri '{1}' - dead x_x: {2}", this,
                connector.RemoteUri, connector.ErrorDescription);
            return connector.DisposeAsync(token).AsTask();
        }).ToList();
        await connectorTasks.WhenAll().AwaitNoThrow(this);

        return receiversToDelete.Count != 0 || connectorsToDelete.Count != 0;
    }

    /// <summary>Synchronous counterpart of <see cref="StopAsync"/>; waits until it has finished.</summary>
    public void Stop()
    {
        TaskUtils.Run(() => StopAsync(default).AsTask()).WaitNoThrow(this);
    }

    /// <summary>
    /// Unregisters all receivers and connectors and then disposes them, receivers first.
    /// </summary>
    /// <param name="token">Forwarded to the dispose calls.</param>
    public async ValueTask StopAsync(CancellationToken token)
    {
        // Take snapshots and clear the maps before disposing anything.
        var receivers = receiversByUri.Values.ToArray();
        receiversByUri.Clear();
        var connectors = connectorsByUri.Values.ToArray();
        connectorsByUri.Clear();

        await receivers.Select(receiver => receiver.DisposeAsync(token).AsTask()).WhenAll().AwaitNoThrow(this);
        await connectors.Select(connector => connector.DisposeAsync(token).AsTask()).WhenAll().AwaitNoThrow(this);
    }

    /// <summary>
    /// Returns one state per publisher URI of the most recent publisher update. For each URI the
    /// state comes from, in order of preference: a live receiver; a live connector (when the
    /// receiver is dead); the dead receiver; the connector (when there is no receiver); otherwise
    /// an <see cref="UninitializedReceiverState"/>.
    /// </summary>
    public SubscriberReceiverState[] GetStates()
    {
        // Work on snapshots so that the three collections stay consistent during the loop.
        var publisherUris = cachedPublisherUris;
        var receivers = new Dictionary<Uri, IProtocolReceiver>(receiversByUri);
        var connectors = new Dictionary<Uri, ReceiverConnector>(connectorsByUri);

        var states = new SubscriberReceiverState[publisherUris.Count];
        int index = 0;

        foreach (Uri uri in publisherUris)
        {
            if (receivers.TryGetValue(uri, out var receiver))
            {
                // A dead receiver yields to a connector that is still working on a reconnection.
                if (!receiver.IsAlive && connectors.TryGetValue(uri, out var retrying) && retrying.IsAlive)
                {
                    states[index++] = retrying.State;
                }
                else
                {
                    states[index++] = receiver.State;
                }
            }
            else if (connectors.TryGetValue(uri, out var pending))
            {
                states[index++] = pending.State;
            }
            else
            {
                states[index++] = new UninitializedReceiverState(uri);
            }
        }

        return states;
    }

    public override string ToString() => $"[{nameof(ReceiverManager<TMessage>)} '{Topic}']";
}