@@ -128,6 +128,21 @@ def parse_rpc_message(data: str) -> Tuple[Dict[str, str], str]:
128128 print ("json decode error" )
129129
130130
131+ def _listen_on_fifo (pipe_name : str , result : List [str ], completed : threading .Event ):
132+ # Open the FIFO for reading
133+ with open (pipe_name ) as fifo :
134+ print ("Waiting for data..." )
135+ while True :
136+ if completed .is_set ():
137+ break # Exit loop if completed event is set
138+ data = fifo .read () # This will block until data is available
139+ if len (data ) == 0 :
140+ # If data is empty, assume EOF
141+ break
142+ print (f"Received: { data } " )
143+ result .append (data )
144+
145+
131146def _listen_on_pipe_new (listener , result : List [str ], completed : threading .Event ):
132147 """Listen on the named pipe or Unix domain socket for JSON data from the server.
133148
@@ -307,14 +322,19 @@ def runner_with_cwd_env(
307322 # if additional environment variables are passed, add them to the environment
308323 if env_add :
309324 env .update (env_add )
310- server = UnixPipeServer (pipe_name )
311- server .start ()
325+ # server = UnixPipeServer(pipe_name)
326+ # server.start()
327+ #################
328+ # Create the FIFO (named pipe) if it doesn't exist
329+ # if not pathlib.Path.exists(pipe_name):
330+ os .mkfifo (pipe_name )
331+ #################
312332
313333 completed = threading .Event ()
314334
315335 result = [] # result is a string array to store the data during threading
316336 t1 : threading .Thread = threading .Thread (
317- target = _listen_on_pipe_new , args = (server , result , completed )
337+ target = _listen_on_fifo , args = (pipe_name , result , completed )
318338 )
319339 t1 .start ()
320340
@@ -364,14 +384,20 @@ def generate_random_pipe_name(prefix=""):
364384
365385 # For Windows, named pipes have a specific naming convention.
366386 if sys .platform == "win32" :
367- return f"\\ \\ .\\ pipe\\ { prefix } -{ random_suffix } -sock "
387+ return f"\\ \\ .\\ pipe\\ { prefix } -{ random_suffix } "
368388
369389 # For Unix-like systems, use either the XDG_RUNTIME_DIR or a temporary directory.
370390 xdg_runtime_dir = os .getenv ("XDG_RUNTIME_DIR" )
371391 if xdg_runtime_dir :
372- return os .path .join (xdg_runtime_dir , f"{ prefix } -{ random_suffix } .sock " ) # noqa: PTH118
392+ return os .path .join (xdg_runtime_dir , f"{ prefix } -{ random_suffix } " ) # noqa: PTH118
373393 else :
374- return os .path .join (tempfile .gettempdir (), f"{ prefix } -{ random_suffix } .sock" ) # noqa: PTH118
394+ return os .path .join (tempfile .gettempdir (), f"{ prefix } -{ random_suffix } " ) # noqa: PTH118
395+
396+
397+ async def create_fifo (pipe_name : str ) -> None :
398+ # Create the FIFO (named pipe) if it doesn't exist
399+ if not pathlib .Path .exists (pipe_name ):
400+ os .mkfifo (pipe_name )
375401
376402
377403class UnixPipeServer :
0 commit comments