@@ -45,6 +45,25 @@ pub(crate) struct Overlapped {
4545 pub bytes_transferred : u32 ,
4646}
4747
48+ impl Default for Overlapped {
49+ fn default ( ) -> Self {
50+ unsafe { std:: mem:: zeroed ( ) }
51+ }
52+ }
53+
54+ impl From < & Overlapped > for Overlapped {
55+ fn from ( val : & Overlapped ) -> Self {
56+ Self {
57+ base : val. base . clone ( ) ,
58+ from_fd : val. from_fd ,
59+ socket : val. socket ,
60+ token : val. token ,
61+ syscall : val. syscall ,
62+ bytes_transferred : val. bytes_transferred ,
63+ }
64+ }
65+ }
66+
4867impl_display_by_debug ! ( Overlapped ) ;
4968
5069#[ repr( C ) ]
@@ -54,11 +73,10 @@ pub(crate) struct Operator<'o> {
5473 iocp : HANDLE ,
5574 entering : AtomicBool ,
5675 handles : DashSet < HANDLE > ,
57- context : DashMap < ( u32 , u32 ) , Overlapped > ,
58- phantom_data : PhantomData < & ' o HANDLE > ,
76+ phantom_data : PhantomData < & ' o Overlapped > ,
5977}
6078
61- impl Operator < ' _ > {
79+ impl < ' o > Operator < ' o > {
6280 pub ( crate ) fn new ( cpu : usize ) -> std:: io:: Result < Self > {
6381 let iocp =
6482 unsafe { CreateIoCompletionPort ( INVALID_HANDLE_VALUE , std:: ptr:: null_mut ( ) , 0 , 0 ) } ;
@@ -70,7 +88,6 @@ impl Operator<'_> {
7088 iocp,
7189 entering : AtomicBool :: new ( false ) ,
7290 handles : DashSet :: default ( ) ,
73- context : DashMap :: default ( ) ,
7491 phantom_data : PhantomData ,
7592 } )
7693 }
@@ -153,16 +170,11 @@ impl Operator<'_> {
153170 }
154171 unsafe { entries. set_len ( recv_count as _ ) } ;
155172 for entry in entries {
156- let parts = unsafe {
157- let union = & ( * entry. lpOverlapped ) . Anonymous . Anonymous ;
158- ( union. Offset , union. OffsetHigh )
159- } ;
160- eprintln ! ( "IOCP got parts:{parts:?}" ) ;
161- if let Some ( ( _, mut overlapped) ) = self . context . remove ( & parts) {
162- overlapped. bytes_transferred = entry. dwNumberOfBytesTransferred ;
163- eprintln ! ( "IOCP got Overlapped:{overlapped}" ) ;
164- cq. push ( overlapped) ;
165- }
173+ let mut overlapped =
174+ Overlapped :: from ( unsafe { & * entry. lpOverlapped . cast :: < Overlapped > ( ) } ) ;
175+ overlapped. bytes_transferred = entry. dwNumberOfBytesTransferred ;
176+ eprintln ! ( "IOCP got Overlapped:{overlapped}" ) ;
177+ cq. push ( overlapped) ;
166178 }
167179 if cq. len ( ) >= want {
168180 break ;
@@ -201,13 +213,7 @@ impl Operator<'_> {
201213 . saturating_add ( 16 )
202214 . try_into ( )
203215 . expect ( "size overflow" ) ;
204- let mut overlapped: Overlapped = std:: mem:: zeroed ( ) ;
205- let parts = (
206- ( user_data as u64 & 0xFFFF_FFFF ) as u32 ,
207- ( user_data as u64 >> 32 ) as u32 ,
208- ) ;
209- overlapped. base . Anonymous . Anonymous . Offset = parts. 0 ;
210- overlapped. base . Anonymous . Anonymous . OffsetHigh = parts. 1 ;
216+ let overlapped: & ' o mut Overlapped = Box :: leak ( Box :: default ( ) ) ;
211217 overlapped. from_fd = fd;
212218 overlapped. socket = socket;
213219 overlapped. token = user_data;
@@ -221,18 +227,14 @@ impl Operator<'_> {
221227 size,
222228 size,
223229 std:: ptr:: null_mut ( ) ,
224- std:: ptr:: from_mut ( & mut overlapped) . cast ( ) ,
230+ std:: ptr:: from_mut ( overlapped) . cast ( ) ,
225231 ) == FALSE
226232 {
227233 if WSA_IO_PENDING == WSAGetLastError ( ) {
228234 break ;
229235 }
230236 }
231- eprintln ! ( "add accept operation parts:{parts:?} Overlapped:{overlapped}" ) ;
232- assert ! (
233- self . context. insert( parts, overlapped) . is_none( ) ,
234- "The previous token was not retrieved in a timely manner"
235- ) ;
237+ eprintln ! ( "add accept operation Overlapped:{overlapped}" ) ;
236238 }
237239 Ok ( ( ) )
238240 }
@@ -247,13 +249,7 @@ impl Operator<'_> {
247249 ) -> std:: io:: Result < ( ) > {
248250 self . add_handle ( fd as HANDLE ) ?;
249251 unsafe {
250- let mut overlapped: Overlapped = std:: mem:: zeroed ( ) ;
251- let parts = (
252- ( user_data as u64 & 0xFFFF_FFFF ) as u32 ,
253- ( user_data as u64 >> 32 ) as u32 ,
254- ) ;
255- overlapped. base . Anonymous . Anonymous . Offset = parts. 0 ;
256- overlapped. base . Anonymous . Anonymous . OffsetHigh = parts. 1 ;
252+ let overlapped: & ' o mut Overlapped = Box :: leak ( Box :: default ( ) ) ;
257253 overlapped. from_fd = fd;
258254 overlapped. token = user_data;
259255 overlapped. syscall = Syscall :: recv;
@@ -267,7 +263,7 @@ impl Operator<'_> {
267263 buf. len ( ) . try_into ( ) . expect ( "len overflow" ) ,
268264 std:: ptr:: null_mut ( ) ,
269265 & mut u32:: try_from ( flags) . expect ( "overflow" ) ,
270- std:: ptr:: from_mut ( & mut overlapped) . cast ( ) ,
266+ std:: ptr:: from_mut ( overlapped) . cast ( ) ,
271267 None ,
272268 ) == SOCKET_ERROR
273269 && WSA_IO_PENDING != WSAGetLastError ( )
@@ -277,11 +273,7 @@ impl Operator<'_> {
277273 "add recv operation failed" ,
278274 ) ) ;
279275 }
280- eprintln ! ( "add recv operation parts:{parts:?} Overlapped:{overlapped}" ) ;
281- assert ! (
282- self . context. insert( parts, overlapped) . is_none( ) ,
283- "The previous token was not retrieved in a timely manner"
284- ) ;
276+ eprintln ! ( "add recv operation Overlapped:{overlapped}" ) ;
285277 }
286278 Ok ( ( ) )
287279 }
@@ -303,13 +295,7 @@ impl Operator<'_> {
303295 ) ;
304296 self . add_handle ( fd as HANDLE ) ?;
305297 unsafe {
306- let mut overlapped: Overlapped = std:: mem:: zeroed ( ) ;
307- let parts = (
308- ( user_data as u64 & 0xFFFF_FFFF ) as u32 ,
309- ( user_data as u64 >> 32 ) as u32 ,
310- ) ;
311- overlapped. base . Anonymous . Anonymous . Offset = parts. 0 ;
312- overlapped. base . Anonymous . Anonymous . OffsetHigh = parts. 1 ;
298+ let overlapped: & ' o mut Overlapped = Box :: leak ( Box :: default ( ) ) ;
313299 overlapped. from_fd = fd;
314300 overlapped. token = user_data;
315301 overlapped. syscall = Syscall :: WSARecv ;
@@ -319,7 +305,7 @@ impl Operator<'_> {
319305 dwbuffercount,
320306 lpnumberofbytesrecvd,
321307 lpflags,
322- std:: ptr:: from_mut ( & mut overlapped) . cast ( ) ,
308+ std:: ptr:: from_mut ( overlapped) . cast ( ) ,
323309 lpcompletionroutine,
324310 ) == SOCKET_ERROR
325311 && WSA_IO_PENDING != WSAGetLastError ( )
@@ -329,11 +315,7 @@ impl Operator<'_> {
329315 "add WSARecv operation failed" ,
330316 ) ) ;
331317 }
332- eprintln ! ( "add WSARecv operation parts:{parts:?} Overlapped:{overlapped}" ) ;
333- assert ! (
334- self . context. insert( parts, overlapped) . is_none( ) ,
335- "The previous token was not retrieved in a timely manner"
336- ) ;
318+ eprintln ! ( "add WSARecv operation Overlapped:{overlapped}" ) ;
337319 }
338320 Ok ( ( ) )
339321 }
@@ -348,13 +330,7 @@ impl Operator<'_> {
348330 ) -> std:: io:: Result < ( ) > {
349331 self . add_handle ( fd as HANDLE ) ?;
350332 unsafe {
351- let mut overlapped: Overlapped = std:: mem:: zeroed ( ) ;
352- let parts = (
353- ( user_data as u64 & 0xFFFF_FFFF ) as u32 ,
354- ( user_data as u64 >> 32 ) as u32 ,
355- ) ;
356- overlapped. base . Anonymous . Anonymous . Offset = parts. 0 ;
357- overlapped. base . Anonymous . Anonymous . OffsetHigh = parts. 1 ;
333+ let overlapped: & ' o mut Overlapped = Box :: leak ( Box :: default ( ) ) ;
358334 overlapped. from_fd = fd;
359335 overlapped. token = user_data;
360336 overlapped. syscall = Syscall :: send;
@@ -368,7 +344,7 @@ impl Operator<'_> {
368344 buf. len ( ) . try_into ( ) . expect ( "len overflow" ) ,
369345 std:: ptr:: null_mut ( ) ,
370346 u32:: try_from ( flags) . expect ( "overflow" ) ,
371- std:: ptr:: from_mut ( & mut overlapped) . cast ( ) ,
347+ std:: ptr:: from_mut ( overlapped) . cast ( ) ,
372348 None ,
373349 ) == SOCKET_ERROR
374350 && WSA_IO_PENDING != WSAGetLastError ( )
@@ -378,11 +354,7 @@ impl Operator<'_> {
378354 "add send operation failed" ,
379355 ) ) ;
380356 }
381- eprintln ! ( "add send operation parts:{parts:?} Overlapped:{overlapped}" ) ;
382- assert ! (
383- self . context. insert( parts, overlapped) . is_none( ) ,
384- "The previous token was not retrieved in a timely manner"
385- ) ;
357+ eprintln ! ( "add send operation Overlapped:{overlapped}" ) ;
386358 }
387359 Ok ( ( ) )
388360 }
@@ -404,13 +376,7 @@ impl Operator<'_> {
404376 ) ;
405377 self . add_handle ( fd as HANDLE ) ?;
406378 unsafe {
407- let mut overlapped: Overlapped = std:: mem:: zeroed ( ) ;
408- let parts = (
409- ( user_data as u64 & 0xFFFF_FFFF ) as u32 ,
410- ( user_data as u64 >> 32 ) as u32 ,
411- ) ;
412- overlapped. base . Anonymous . Anonymous . Offset = parts. 0 ;
413- overlapped. base . Anonymous . Anonymous . OffsetHigh = parts. 1 ;
379+ let overlapped: & ' o mut Overlapped = Box :: leak ( Box :: default ( ) ) ;
414380 overlapped. from_fd = fd;
415381 overlapped. token = user_data;
416382 overlapped. syscall = Syscall :: WSASend ;
@@ -420,7 +386,7 @@ impl Operator<'_> {
420386 dwbuffercount,
421387 lpnumberofbytesrecvd,
422388 dwflags,
423- std:: ptr:: from_mut ( & mut overlapped) . cast ( ) ,
389+ std:: ptr:: from_mut ( overlapped) . cast ( ) ,
424390 lpcompletionroutine,
425391 ) == SOCKET_ERROR
426392 && WSA_IO_PENDING != WSAGetLastError ( )
@@ -430,11 +396,7 @@ impl Operator<'_> {
430396 "add WSASend operation failed" ,
431397 ) ) ;
432398 }
433- eprintln ! ( "add WSASend operation parts:{parts:?} Overlapped:{overlapped}" ) ;
434- assert ! (
435- self . context. insert( parts, overlapped) . is_none( ) ,
436- "The previous token was not retrieved in a timely manner"
437- ) ;
399+ eprintln ! ( "add WSASend operation Overlapped:{overlapped}" ) ;
438400 }
439401 Ok ( ( ) )
440402 }
0 commit comments