2
2
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
3
4
4
using System ;
5
+ using System . Collections . Generic ;
6
+ using System . Diagnostics ;
5
7
using System . Runtime . InteropServices ;
8
+ using System . Threading ;
9
+
10
+ using size_t = System . IntPtr ;
11
+ using libc = Interop . libc ;
12
+ using libcurl = Interop . libcurl ;
13
+ using PollFlags = Interop . libc . PollFlags ;
6
14
7
15
internal static partial class Interop
8
16
{
@@ -30,16 +38,48 @@ public static void DisposeAndClearHandle(ref SafeCurlHandle curlHandle)
30
38
31
39
protected override bool ReleaseHandle ( )
32
40
{
33
- Interop . libcurl . curl_easy_cleanup ( this . handle ) ;
41
+ libcurl . curl_easy_cleanup ( this . handle ) ;
34
42
return true ;
35
43
}
36
44
}
37
45
38
46
internal sealed class SafeCurlMultiHandle : SafeHandle
39
47
{
48
+ private bool _pollCancelled = true ;
49
+ private readonly int [ ] _specialFds = new int [ 2 ] ;
50
+ private readonly HashSet < int > _fdSet = new HashSet < int > ( ) ;
51
+ private int _requestCount = 0 ;
52
+ private Timer _timer ;
53
+
54
+ internal bool PollCancelled
55
+ {
56
+ get { return _pollCancelled ; }
57
+ set { _pollCancelled = value ; }
58
+ }
59
+
60
+ internal int RequestCount
61
+ {
62
+ get { return _requestCount ; }
63
+ set { _requestCount = value ; }
64
+ }
65
+
66
+ internal Timer Timer
67
+ {
68
+ get { return _timer ; }
69
+ set { _timer = value ; }
70
+ }
71
+
72
+
40
73
public SafeCurlMultiHandle ( )
41
74
: base ( IntPtr . Zero , true )
42
75
{
76
+ unsafe
77
+ {
78
+ fixed( int * fds = _specialFds )
79
+ {
80
+ while ( Interop . CheckIo ( libc . pipe ( fds ) ) ) ;
81
+ }
82
+ }
43
83
}
44
84
45
85
public override bool IsInvalid
@@ -56,11 +96,169 @@ public static void DisposeAndClearHandle(ref SafeCurlMultiHandle curlHandle)
56
96
}
57
97
}
58
98
99
+ internal void PollFds ( List < libc . pollfd > readyFds )
100
+ {
101
+ int count ;
102
+ libc . pollfd [ ] pollFds ;
103
+
104
+ readyFds . Clear ( ) ;
105
+
106
+ lock ( this )
107
+ {
108
+ // TODO: Avoid the allocation when count is in 100s
109
+ count = _fdSet . Count + 1 ;
110
+ pollFds = new libc . pollfd [ count ] ;
111
+
112
+ // Always include special fd in the poll set. This is used to
113
+ // return from the poll in case any fds have been added or
114
+ // removed to the set of fds being polled. This prevents starvation
115
+ // in case current set of fds have no activity but the new fd
116
+ // is ready for a read/write. The special fd is the read end of a pipe
117
+ // Whenever an fd is added/removed in _fdSet, a write happens to the
118
+ // write end of the pipe thus causing the poll to return.
119
+ pollFds [ 0 ] . fd = _specialFds [ libc . ReadEndOfPipe ] ;
120
+ pollFds [ 0 ] . events = PollFlags . POLLIN | PollFlags . POLLOUT ;
121
+ int i = 1 ;
122
+ foreach ( int fd in _fdSet )
123
+ {
124
+ pollFds [ i ] . fd = fd ;
125
+ pollFds [ i ] . events = PollFlags . POLLIN | PollFlags . POLLOUT ;
126
+ i ++ ;
127
+ }
128
+ }
129
+
130
+ unsafe
131
+ {
132
+ fixed ( libc . pollfd * fds = pollFds )
133
+ {
134
+ int numFds = libc . poll ( fds , ( uint ) count , - 1 ) ;
135
+ if ( numFds <= 0 )
136
+ {
137
+ Debug . Assert ( numFds != 0 ) ; // Since timeout is infinite
138
+
139
+ // TODO: How to handle errors?
140
+ throw new InvalidOperationException ( "Poll failure: " + Marshal . GetLastWin32Error ( ) ) ;
141
+ }
142
+
143
+ lock ( this )
144
+ {
145
+ if ( 0 == _requestCount )
146
+ {
147
+ return ;
148
+ }
149
+ }
150
+
151
+ // Check for any fdset changes
152
+ if ( fds [ 0 ] . revents != 0 )
153
+ {
154
+ if ( ReadSpecialFd ( fds [ 0 ] . revents ) < 0 )
155
+ {
156
+ // TODO: How to handle errors?
157
+ throw new InvalidOperationException ( "Cannot read data: " + Marshal . GetLastWin32Error ( ) ) ;
158
+ }
159
+ numFds -- ;
160
+ }
161
+
162
+ // Now check for events on the remaining fds
163
+ for ( int i = 1 ; i < count && numFds > 0 ; i ++ )
164
+ {
165
+ if ( fds [ i ] . revents == 0 )
166
+ {
167
+ continue ;
168
+ }
169
+ readyFds . Add ( fds [ i ] ) ;
170
+ numFds -- ;
171
+ }
172
+ }
173
+ }
174
+ }
175
+
176
+ internal void SignalFdSetChange ( int fd , bool isRemove )
177
+ {
178
+ Debug . Assert ( Monitor . IsEntered ( this ) ) ;
179
+
180
+ bool changed = isRemove ? _fdSet . Remove ( fd ) : _fdSet . Add ( fd ) ;
181
+ if ( ! changed )
182
+ {
183
+ return ;
184
+ }
185
+
186
+ unsafe
187
+ {
188
+ // Write to special fd
189
+ byte * dummyBytes = stackalloc byte [ 1 ] ;
190
+ if ( ( int ) libc . write ( _specialFds [ libc . WriteEndOfPipe ] , dummyBytes , ( size_t ) 1 ) <= 0 )
191
+ {
192
+ // TODO: How to handle errors?
193
+ throw new InvalidOperationException ( "Cannot write data: " + Marshal . GetLastWin32Error ( ) ) ;
194
+ }
195
+ }
196
+ }
197
+
198
+ protected override void Dispose ( bool disposing )
199
+ {
200
+ if ( disposing )
201
+ {
202
+ if ( null != _timer )
203
+ {
204
+ _timer . Dispose ( ) ;
205
+ }
206
+ }
207
+ base . Dispose ( disposing ) ;
208
+ }
209
+
59
210
protected override bool ReleaseHandle ( )
60
211
{
61
- Interop . libcurl . curl_multi_cleanup ( this . handle ) ;
212
+ Debug . Assert ( 0 == _fdSet . Count ) ;
213
+ Debug . Assert ( 0 == _requestCount ) ;
214
+ Debug . Assert ( _pollCancelled ) ;
215
+
216
+ libc . close ( _specialFds [ libc . ReadEndOfPipe ] ) ;
217
+ libc . close ( _specialFds [ libc . WriteEndOfPipe ] ) ;
218
+ libcurl . curl_multi_cleanup ( this . handle ) ;
219
+
62
220
return true ;
63
221
}
222
+
223
+ private int ReadSpecialFd ( PollFlags revents )
224
+ {
225
+ PollFlags badEvents = PollFlags . POLLERR | PollFlags . POLLHUP | PollFlags . POLLNVAL ;
226
+ if ( ( revents & badEvents ) != 0 )
227
+ {
228
+ return - 1 ;
229
+ }
230
+ int pipeReadFd = _specialFds [ libc . ReadEndOfPipe ] ;
231
+ int bytesRead = 0 ;
232
+ unsafe
233
+ {
234
+ do
235
+ {
236
+ // Read available data from the pipe
237
+ int bufferLength = 1024 ;
238
+ byte * dummyBytes = stackalloc byte [ bufferLength ] ;
239
+ int numBytes = ( int ) libc . read ( pipeReadFd , dummyBytes , ( size_t ) bufferLength ) ;
240
+ if ( numBytes <= 0 )
241
+ {
242
+ return - 1 ;
243
+ }
244
+ bytesRead += numBytes ;
245
+
246
+ // Check if more data is available
247
+ PollFlags outFlags ;
248
+ int retVal = libc . poll ( pipeReadFd , PollFlags . POLLIN , 0 , out outFlags ) ;
249
+ if ( retVal < 0 )
250
+ {
251
+ return - 1 ;
252
+ }
253
+ else if ( 0 == retVal )
254
+ {
255
+ break ;
256
+ }
257
+ }
258
+ while ( true ) ;
259
+ }
260
+ return bytesRead ;
261
+ }
64
262
}
65
263
66
264
internal sealed class SafeCurlSlistHandle : SafeHandle
@@ -91,7 +289,7 @@ public static void DisposeAndClearHandle(ref SafeCurlSlistHandle curlHandle)
91
289
92
290
protected override bool ReleaseHandle ( )
93
291
{
94
- Interop . libcurl . curl_slist_free_all ( this . handle ) ;
292
+ libcurl . curl_slist_free_all ( this . handle ) ;
95
293
return true ;
96
294
}
97
295
}
0 commit comments