1
1
using System ;
2
- using System . Collections . Concurrent ;
3
- using System . Collections . Generic ;
4
2
using System . IO ;
5
3
using System . Linq ;
6
- using System . Text ;
7
4
using System . Threading ;
8
5
using System . Threading . Tasks ;
9
- using JsonRpc . Server ;
10
6
using JsonRpc . Server . Messages ;
11
7
using Newtonsoft . Json . Linq ;
12
8
13
9
namespace JsonRpc
14
10
{
15
11
public class InputHandler : IInputHandler
16
12
{
17
- private readonly TimeSpan _sleepTime = TimeSpan . FromMilliseconds ( 50 ) ;
18
13
public const char CR = '\r ' ;
19
14
public const char LF = '\n ' ;
20
15
public static char [ ] CRLF = { CR , LF } ;
@@ -28,8 +23,7 @@ public class InputHandler : IInputHandler
28
23
private Thread _inputThread ;
29
24
private readonly IRequestRouter _requestRouter ;
30
25
private readonly IResponseRouter _responseRouter ;
31
- private readonly ConcurrentQueue < ( RequestProcessType type , Func < Task > request ) > _queue ;
32
- private Thread _queueThread ;
26
+ private readonly IScheduler _scheduler ;
33
27
34
28
public InputHandler (
35
29
Stream input ,
@@ -47,31 +41,16 @@ IResponseRouter responseRouter
47
41
_requestProcessIdentifier = requestProcessIdentifier ;
48
42
_requestRouter = requestRouter ;
49
43
_responseRouter = responseRouter ;
50
- _queue = new ConcurrentQueue < ( RequestProcessType type , Func < Task > request ) > ( ) ;
51
44
52
- _inputThread = new Thread ( ProcessInputStream ) { IsBackground = true } ;
53
-
54
- _queueThread = new Thread ( ProcessRequestQueue ) { IsBackground = true } ;
55
- }
56
-
57
- internal InputHandler (
58
- Stream input ,
59
- IOutputHandler outputHandler ,
60
- IReciever reciever ,
61
- IRequestProcessIdentifier requestProcessIdentifier ,
62
- IRequestRouter requestRouter ,
63
- IResponseRouter responseRouter ,
64
- TimeSpan sleepTime
65
- ) : this ( input , outputHandler , reciever , requestProcessIdentifier , requestRouter , responseRouter )
66
- {
67
- _sleepTime = sleepTime ;
68
- }
45
+ _scheduler = new ProcessScheduler ( ) ;
46
+ _inputThread = new Thread ( ProcessInputStream ) { IsBackground = true , Name = "ProcessInputStream" } ;
47
+ }
69
48
70
49
public void Start ( )
71
50
{
72
51
_outputHandler . Start ( ) ;
73
52
_inputThread . Start ( ) ;
74
- _queueThread . Start ( ) ;
53
+ _scheduler . Start ( ) ;
75
54
}
76
55
77
56
private async void ProcessInputStream ( )
@@ -85,33 +64,49 @@ private async void ProcessInputStream()
85
64
86
65
var buffer = new byte [ 300 ] ;
87
66
var current = await _input . ReadAsync ( buffer , 0 , MinBuffer ) ;
67
+ if ( current == 0 ) return ; // no more _input
88
68
while ( current < MinBuffer ||
89
69
buffer [ current - 4 ] != CR || buffer [ current - 3 ] != LF ||
90
70
buffer [ current - 2 ] != CR || buffer [ current - 1 ] != LF )
91
71
{
92
- current += await _input . ReadAsync ( buffer , current , 1 ) ;
72
+ var n = await _input . ReadAsync ( buffer , current , 1 ) ;
73
+ if ( n == 0 ) return ; // no more _input, mitigates endless loop here.
74
+ current += n ;
93
75
}
94
76
95
77
var headersContent = System . Text . Encoding . ASCII . GetString ( buffer , 0 , current ) ;
96
78
var headers = headersContent . Split ( HeaderKeys , StringSplitOptions . RemoveEmptyEntries ) ;
97
79
long length = 0 ;
98
- for ( var i = 0 ; i < headers . Length ; i += 2 )
80
+ for ( var i = 1 ; i < headers . Length ; i += 2 )
99
81
{
100
- var header = headers [ 0 ] ;
101
- var value = headers [ i + 1 ] . Trim ( ) ;
82
+ // starting at i = 1 instead of 0 won't throw, if we have uneven headers' length
83
+ var header = headers [ i - 1 ] ;
84
+ var value = headers [ i ] . Trim ( ) ;
102
85
if ( header . Equals ( "Content-Length" , StringComparison . OrdinalIgnoreCase ) )
103
86
{
104
- length = long . Parse ( value ) ;
87
+ length = 0 ;
88
+ long . TryParse ( value , out length ) ;
105
89
}
106
90
}
107
91
108
- var requestBuffer = new byte [ length ] ;
109
-
110
- await _input . ReadAsync ( requestBuffer , 0 , requestBuffer . Length ) ;
111
-
112
- var payload = System . Text . Encoding . UTF8 . GetString ( requestBuffer ) ;
113
-
114
- HandleRequest ( payload ) ;
92
+ if ( length == 0 || length >= int . MaxValue )
93
+ {
94
+ HandleRequest ( string . Empty ) ;
95
+ }
96
+ else
97
+ {
98
+ var requestBuffer = new byte [ length ] ;
99
+ var received = 0 ;
100
+ while ( received < length )
101
+ {
102
+ var n = await _input . ReadAsync ( requestBuffer , received , requestBuffer . Length - received ) ;
103
+ if ( n == 0 ) return ; // no more _input
104
+ received += n ;
105
+ }
106
+ // TODO sometimes: encoding should be based on the respective header (including the wrong "utf8" value)
107
+ var payload = System . Text . Encoding . UTF8 . GetString ( requestBuffer ) ;
108
+ HandleRequest ( payload ) ;
109
+ }
115
110
}
116
111
}
117
112
@@ -162,24 +157,23 @@ private void HandleRequest(string request)
162
157
{
163
158
if ( item . IsRequest )
164
159
{
165
- _queue . Enqueue ( (
160
+ _scheduler . Add (
166
161
type ,
167
162
async ( ) => {
168
163
var result = await _requestRouter . RouteRequest ( item . Request ) ;
169
-
170
164
_outputHandler . Send ( result . Value ) ;
171
165
}
172
- ) ) ;
166
+ ) ;
173
167
}
174
168
else if ( item . IsNotification )
175
169
{
176
- _queue . Enqueue ( (
170
+ _scheduler . Add (
177
171
type ,
178
172
( ) => {
179
173
_requestRouter . RouteNotification ( item . Notification ) ;
180
174
return Task . CompletedTask ;
181
175
}
182
- ) ) ;
176
+ ) ;
183
177
}
184
178
else if ( item . IsError )
185
179
{
@@ -189,42 +183,12 @@ private void HandleRequest(string request)
189
183
}
190
184
}
191
185
192
- private bool IsNextSerial ( )
193
- {
194
- return _queue . TryPeek ( out var queueResult ) && queueResult . type == RequestProcessType . Serial ;
195
- }
196
-
197
- private async void ProcessRequestQueue ( )
198
- {
199
- while ( true )
200
- {
201
- if ( _queueThread == null ) return ;
202
- var items = new List < Func < Task > > ( ) ;
203
- while ( ! _queue . IsEmpty )
204
- {
205
- if ( IsNextSerial ( ) && items . Count > 0 )
206
- {
207
- break ;
208
- }
209
-
210
- if ( _queue . TryDequeue ( out var queueResult ) )
211
- items . Add ( queueResult . request ) ;
212
- }
213
-
214
- await Task . WhenAll ( items . Select ( x => x ( ) ) ) ;
215
-
216
- if ( _queue . IsEmpty )
217
- {
218
- await Task . Delay ( _sleepTime ) ;
219
- }
220
- }
221
- }
222
186
223
187
public void Dispose ( )
224
188
{
225
189
_outputHandler . Dispose ( ) ;
226
190
_inputThread = null ;
227
- _queueThread = null ;
191
+ _scheduler . Dispose ( ) ;
228
192
}
229
193
}
230
194
}
0 commit comments