1
1
using NetworkTables . Logging ;
2
2
using System ;
3
- using System . Collections . Generic ;
4
- using System . Linq ;
5
3
using System . Threading ;
6
4
using System . Threading . Tasks ;
7
- using NetworkTables . Extensions ;
8
- using NetworkTables . TcpSockets ;
9
5
using System . Net . Sockets ;
10
6
using System . IO ;
11
- using NetworkTables . Streams ;
12
7
using System . Text ;
13
8
using static NetworkTables . Logging . Logger ;
14
9
using System . Net ;
10
+ using Nito . AsyncEx ;
15
11
using Nito . AsyncEx . Synchronous ;
16
12
17
13
namespace NetworkTables
@@ -33,171 +29,218 @@ public static DsClient Instance
33
29
}
34
30
}
35
31
36
- public void Dispose ( )
32
+ //private IServerOverridable m_serverOverridable;
33
+
34
+ public DsClient ( /*IServerOverridable serverOverridable*/ )
37
35
{
38
- Stop ( ) ;
36
+ //m_serverOverridable = serverOverridable ;
39
37
}
40
38
41
- public void Start ( int port )
39
+ private int m_port ;
40
+
41
+ private Task m_task ;
42
+ private CancellationTokenSource m_tokenSource ;
43
+
44
+ public void Dispose ( )
42
45
{
43
- lock ( m_mutex )
44
- {
45
- m_port = port ;
46
- if ( m_task == null )
47
- {
48
- m_active = true ;
49
- m_task = Task . Factory . StartNew ( TaskMain , TaskCreationOptions . LongRunning ) ;
50
- }
51
- }
46
+ Stop ( ) ;
52
47
}
53
48
54
49
public void Stop ( )
55
50
{
56
- m_active = false ;
51
+ m_tokenSource ? . Cancel ( ) ;
57
52
m_task ? . WaitAndUnwrapException ( ) ;
58
53
m_task = null ;
54
+ m_tokenSource = null ;
59
55
}
60
56
61
- private DsClient ( )
57
+ public void Start ( int port )
62
58
{
63
-
59
+ Interlocked . Exchange ( ref m_port , port ) ;
60
+ if ( m_task == null )
61
+ {
62
+ if ( m_tokenSource == null || m_tokenSource . IsCancellationRequested )
63
+ {
64
+ m_tokenSource = new CancellationTokenSource ( ) ;
65
+ }
66
+ m_task = Task . Factory . StartNew ( ThreadMain , m_tokenSource . Token , TaskCreationOptions . LongRunning ) ;
67
+ }
64
68
}
65
69
66
- private int m_port ;
67
-
68
- private Task m_task ;
69
-
70
- private bool m_active ;
71
-
72
- private readonly object m_mutex = new object ( ) ;
73
- private readonly AutoResetEvent m_cond = new AutoResetEvent ( false ) ;
74
-
75
- NtTcpClient m_client ;
76
-
77
- private void TaskMain ( )
70
+ public void ThreadMain ( object token )
78
71
{
79
- uint oldIp = 0 ;
80
- Logger logger = new Logger ( ) ; // To silence log messages
81
- logger . SetLogger ( null ) ;
82
72
83
- while ( m_active )
73
+ if ( token is CancellationToken )
84
74
{
85
- int port ;
86
- bool lockEntered = false ;
87
75
try
88
76
{
89
- Monitor . Enter ( m_mutex , ref lockEntered ) ;
90
- m_cond . WaitTimeout ( m_mutex , ref lockEntered , TimeSpan . FromMilliseconds ( 500 ) , ( ) => ! m_active ) ;
91
- port = m_port ;
77
+ AsyncContext . Run ( async ( ) =>
78
+ {
79
+ await ThreadMainAsync ( ( CancellationToken ) token ) ;
80
+ } ) ;
92
81
}
93
- finally
82
+ catch ( OperationCanceledException )
94
83
{
95
- if ( lockEntered ) Monitor . Exit ( m_mutex ) ;
84
+ // Ignore operation cancelled
96
85
}
86
+ }
87
+ }
97
88
98
- if ( ! m_active ) goto done ;
99
-
100
- m_client = TcpConnector . Connect ( "127.0.0.1" , 1742 , logger , 1 ) ;
101
- if ( ! m_active ) goto done ;
102
- if ( m_client == null ) continue ;
103
-
104
- Logger . Debug3 ( Logger . Instance , "connected to DS" ) ;
105
- Stream stream = m_client . GetStream ( ) ;
106
- while ( m_active && stream . CanRead )
89
+ public async Task ThreadMainAsync ( CancellationToken token )
90
+ {
91
+ uint oldIp = 0 ;
92
+ try
93
+ {
94
+ while ( ! token . IsCancellationRequested )
107
95
{
108
- StringBuilder json = new StringBuilder ( 128 ) ;
109
- byte ch = 0 ;
110
- do
111
- {
112
- bool success = stream . ReceiveByte ( out ch ) ;
113
- if ( ! success ) break ;
114
- if ( ! m_active ) goto done ;
115
- } while ( ch != ( byte ) '{' ) ;
116
- json . Append ( '{' ) ;
117
-
118
- if ( ! stream . CanRead )
119
- {
120
- m_client = null ;
121
- break ;
122
- }
96
+ int port ;
123
97
124
- // Read characters until }
125
- do
98
+ using ( TcpClient client = new TcpClient ( ) )
126
99
{
127
- bool success = stream . ReceiveByte ( out ch ) ;
128
- if ( ! success ) break ;
129
- if ( ! m_active ) goto done ;
130
- json . Append ( ( char ) ch ) ;
131
- } while ( ch != ( byte ) '}' ) ;
100
+ Task connection = client . ConnectAsync ( "127.0.0.1" , 1742 ) ;
101
+ Task delayTask = Task . Delay ( 2000 , token ) ;
132
102
133
- if ( ! stream . CanRead )
134
- {
135
- m_client = null ;
136
- break ;
137
- }
103
+ try
104
+ {
105
+ var finished = await Task . WhenAny ( connection , delayTask ) ;
106
+ if ( finished == delayTask )
107
+ {
108
+ // Timed Out
109
+ continue ;
110
+ }
111
+ if ( ! finished . IsCompleted || finished . IsFaulted || finished . IsCanceled )
112
+ {
113
+ continue ;
114
+ }
115
+ }
138
116
139
- string jsonString = json . ToString ( ) ;
140
- Debug3 ( Logger . Instance , $ "json={ jsonString } ") ;
141
-
142
- // Look for "robotIP":12345, and get 12345 portion
143
- int pos = jsonString . IndexOf ( "\" robotIP\" " ) ;
144
- if ( pos < 0 ) continue ; // could not find?
145
- pos += 9 ;
146
- pos = jsonString . IndexOf ( ':' , pos ) ;
147
- if ( pos < 0 ) continue ; // could not find?
148
- // Find first not of
149
- int endpos = - 1 ;
150
- for ( int i = pos + 1 ; i < jsonString . Length ; i ++ )
151
- {
152
- if ( jsonString [ i ] < '0' || jsonString [ i ] > '9' )
117
+ catch ( OperationCanceledException )
153
118
{
154
- endpos = i ;
155
- break ;
119
+ goto done ;
156
120
}
157
- }
158
- string ipString = jsonString . Substring ( pos + 1 , endpos - ( pos + 1 ) ) ;
159
- Debug3 ( Logger . Instance , $ "found robotIP={ ipString } ") ;
160
121
161
- // Parse into number
162
- uint ip ;
163
- if ( ! uint . TryParse ( ipString , out ip ) ) continue ;
122
+ if ( token . IsCancellationRequested )
123
+ {
124
+ goto done ;
125
+ }
164
126
165
- if ( BitConverter . IsLittleEndian )
166
- {
167
- ip = ( uint ) IPAddress . NetworkToHostOrder ( ( int ) ip ) ;
168
- }
127
+ Logger . Debug3 ( Logger . Instance , "Connected to DS" ) ;
128
+ Stream stream = client . GetStream ( ) ;
169
129
170
- // If 0 clear the override
171
- if ( ip == 0 )
172
- {
173
- Dispatcher . Instance . ClearServerOverride ( ) ;
174
- oldIp = 0 ;
175
- continue ;
130
+ while ( ! token . IsCancellationRequested && stream . CanRead )
131
+ {
132
+ StringBuilder json = new StringBuilder ( ) ;
133
+ int chars = 0 ;
134
+ byte [ ] ch = new byte [ 1 ] ;
135
+ do
136
+ {
137
+ chars = 0 ;
138
+ try
139
+ {
140
+ chars = await stream . ReadAsync ( ch , 0 , 1 , token ) ;
141
+ }
142
+ catch ( OperationCanceledException )
143
+ {
144
+ goto done ;
145
+ }
146
+ if ( chars != 1 ) break ;
147
+ if ( token . IsCancellationRequested ) goto done ;
148
+ } while ( ch [ 0 ] != ( byte ) '{' ) ;
149
+ json . Append ( '{' ) ;
150
+
151
+ if ( ! stream . CanRead || ! client . Connected || chars != 1 )
152
+ {
153
+ break ;
154
+ }
155
+
156
+ do
157
+ {
158
+ chars = 0 ;
159
+ try
160
+ {
161
+ chars = await stream . ReadAsync ( ch , 0 , 1 , token ) ;
162
+ if ( chars != 1 ) break ;
163
+ if ( token . IsCancellationRequested ) goto done ;
164
+ json . Append ( ( char ) ch [ 0 ] ) ;
165
+ }
166
+ catch ( OperationCanceledException )
167
+ {
168
+ goto done ;
169
+ }
170
+ } while ( ch [ 0 ] != ( byte ) '}' ) ;
171
+
172
+ if ( ! stream . CanRead || ! client . Connected || chars != 1 )
173
+ {
174
+ break ;
175
+ }
176
+
177
+ string jsonString = json . ToString ( ) ;
178
+ Debug3 ( Logger . Instance , $ "json={ jsonString } ") ;
179
+
180
+ // Look for "robotIP":12345, and get 12345 portion
181
+ int pos = jsonString . IndexOf ( "\" robotIP\" " ) ;
182
+ if ( pos < 0 ) continue ; // could not find?
183
+ pos += 9 ;
184
+ pos = jsonString . IndexOf ( ':' , pos ) ;
185
+ if ( pos < 0 ) continue ; // could not find?
186
+ // Find first not of
187
+ int endpos = - 1 ;
188
+ for ( int i = pos + 1 ; i < jsonString . Length ; i ++ )
189
+ {
190
+ if ( jsonString [ i ] < '0' || jsonString [ i ] > '9' )
191
+ {
192
+ endpos = i ;
193
+ break ;
194
+ }
195
+ }
196
+ string ipString = jsonString . Substring ( pos + 1 , endpos - ( pos + 1 ) ) ;
197
+ Debug3 ( Logger . Instance , $ "found robotIP={ ipString } ") ;
198
+
199
+ // Parse into number
200
+ uint ip ;
201
+ if ( ! uint . TryParse ( ipString , out ip ) ) continue ;
202
+
203
+ if ( BitConverter . IsLittleEndian )
204
+ {
205
+ ip = ( uint ) IPAddress . NetworkToHostOrder ( ( int ) ip ) ;
206
+ }
207
+
208
+ if ( ip == 0 )
209
+ {
210
+ //m_serverOverridable.ClearServerOverride();
211
+ Dispatcher . Instance . ClearServerOverride ( ) ;
212
+ oldIp = 0 ;
213
+ continue ;
214
+ }
215
+
216
+ if ( ip == oldIp ) continue ;
217
+ oldIp = ip ;
218
+
219
+ json . Clear ( ) ;
220
+
221
+ IPAddress address = new IPAddress ( oldIp ) ;
222
+ Info ( Logger . Instance , $ "client: DS overriding server IP to { address . ToString ( ) } ") ;
223
+ port = Interlocked . CompareExchange ( ref m_port , 0 , 0 ) ;
224
+ //m_serverOverridable.SetServerOverride(address, port);
225
+ Dispatcher . Instance . SetServerOverride ( address , port ) ;
226
+ }
176
227
}
177
228
178
- // If unchanged, don't reconnect
179
- if ( ip == oldIp ) continue ;
180
- oldIp = ip ;
181
-
182
- json . Clear ( ) ;
183
-
184
- IPAddress address = new IPAddress ( oldIp ) ;
185
- Info ( Logger . Instance , $ "client: DS overriding server IP to { address . ToString ( ) } ") ;
186
- Dispatcher . Instance . SetServerOverride ( address . ToString ( ) , port ) ;
187
-
188
-
229
+ Dispatcher . Instance . ClearServerOverride ( ) ;
230
+ //m_serverOverridable.ClearServerOverride();
231
+ oldIp = 0 ;
189
232
}
190
-
191
- Dispatcher . Instance . ClearServerOverride ( ) ;
192
- oldIp = 0 ;
193
-
194
233
}
195
-
196
- done :
234
+ catch ( ObjectDisposedException )
235
+ {
236
+ }
237
+ catch ( NullReferenceException )
197
238
{
198
- Dispatcher . Instance . ClearServerOverride ( ) ;
199
239
}
200
240
241
+ done :
242
+ Dispatcher . Instance . ClearServerOverride ( ) ;
243
+ //m_serverOverridable.ClearServerOverride();
201
244
}
202
245
}
203
246
}
0 commit comments