11import pickle
2- import select
32import fcntl
43import os
54import struct
5+ from collections import defaultdict
66from contextlib import contextmanager
7+ from functools import partial
8+ from asyncio import new_event_loop
79from io import BytesIO
810
9- from madbg .utils import loop_in_thread , opposite_dict
11+ from .utils import run_thread , opposite_dict
1012
1113MESSAGE_LENGTH_FMT = 'I'
1214
@@ -28,41 +30,51 @@ def blocking_read(fd, n):
2830 return io .getvalue ()
2931
3032
31- def pipe_once (pipe_dict ):
32- # If read fails, write will fail. But if write fail, we might be still able to read.
33- # TODO: use wakeup fd instead of 0 timeout polling (os.pipe? eventfd?)
34- # TODO: can use splice or ebpf
35- if not pipe_dict :
36- return
37- reverse_pipe_dict = opposite_dict (pipe_dict )
38- for read_fd in select .select (list (pipe_dict ), [], [], 0 )[0 ]:
39- write_fd = pipe_dict [read_fd ]
33+ class Piping :
34+ def __init__ (self , pipe_dict ):
35+ self .buffers = defaultdict (bytes )
36+ self .loop = new_event_loop ()
37+ for src_fd , dest_fd in pipe_dict .items ():
38+ self .loop .add_reader (src_fd , partial (self ._read , src_fd , dest_fd ))
39+ self .loop .add_writer (dest_fd , partial (self ._write , dest_fd ))
40+ self .readers_to_writers = dict (pipe_dict )
41+ self .writers_to_readers = opposite_dict (pipe_dict )
42+
43+ def _remove_writer (self , writer_fd ):
44+ self .loop .remove_writer (writer_fd )
45+ for reader_fd in self .writers_to_readers .pop (writer_fd ):
46+ self .readers_to_writers .pop (reader_fd )
47+
48+ def _remove_reader (self , reader_fd ):
49+ # remove all writers that im the last to write to, remove all that write to me, if nothing left stop loop
50+ self .loop .remove_reader (reader_fd )
51+ writer_fd = self .readers_to_writers .pop (reader_fd )
52+ writer_readers = self .writers_to_readers [writer_fd ]
53+ writer_readers .remove (reader_fd )
54+ if not writer_fd :
55+ self ._remove_writer (writer_fd )
56+
57+ def _read (self , src_fd , dest_fd ):
4058 try :
41- data = os .read (read_fd , 1024 )
42- if not data :
43- raise OSError ('EOF' )
59+ data = os .read (src_fd , 1024 )
4460 except OSError :
45- pipe_dict . pop ( read_fd , None )
46- for writing_fd in reverse_pipe_dict [ read_fd ] :
47- pipe_dict . pop ( writing_fd , None )
61+ data = ''
62+ if data :
63+ self . buffers [ dest_fd ] += data
4864 else :
49- try :
50- os .write (write_fd , data )
51- except OSError :
52- pipe_dict .pop (read_fd , None )
53-
54-
55- @contextmanager
56- def pipe_in_background (pipe_dict ):
57- pipe_dict = dict (pipe_dict )
58- with loop_in_thread (pipe_once , pipe_dict ):
59- yield
60-
61-
62- def pipe_until_closed (pipe_dict ):
63- pipe_dict = dict (pipe_dict )
64- while pipe_dict :
65- pipe_once (pipe_dict )
65+ self ._remove_reader (src_fd )
66+ if src_fd in self .writers_to_readers :
67+ self ._remove_writer (src_fd )
68+ if not self .readers_to_writers :
69+ self .loop .stop ()
70+
71+ def _write (self , dest_fd ):
72+ buffer = self .buffers [dest_fd ]
73+ if buffer :
74+ self .buffers [dest_fd ] = buffer [os .write (dest_fd , buffer ):]
75+
76+ def run (self ):
77+ self .loop .run_forever ()
6678
6779
6880def send_message (sock , obj ):
0 commit comments