77import datetime
88import functools
99import hashlib
10+ import io
1011import ipaddress
1112import json
1213import logging
@@ -162,44 +163,55 @@ def get_installer_version():
162163 return "N/A"
163164
164165
165- class StreamIterator :
166- def __init__ (self , proc , stream , file_path ):
167- self .proc = proc
168- self .stream = stream
169- self .file_path = file_path
170- self .file = None
171- self .bytes_written = 0
172-
173- def __iter__ (self ):
174- return self
175-
176- def __next__ (self ):
177- for return_anyway in (False , True ):
178- # We poll the process status before consuming the stream to make sure the StopIteration condition
179- # is not vulnerable to a race condition.
180- ret = self .proc .poll ()
181- line = self .stream .readline ()
182- if line :
183- if not self .file :
184- self .file = open (self .file_path , "wb" )
185- self .file .write (line )
186- self .bytes_written += len (line )
187- return line
188- if ret is not None and not line :
189- raise StopIteration
190- if not return_anyway :
191- time .sleep (0.1 )
192- return line
166+ @contextlib .contextmanager
167+ def stream_iterator (proc : subprocess .Popen , stream_name : str , file_path : pathlib .Path , timeout : float = 1.0 ):
168+ comm_index , exc_attr = {
169+ "stdout" : (0 , "output" ),
170+ "stderr" : (1 , "stderr" ),
171+ }[stream_name ]
172+ buffer = io .TextIOWrapper (io .BytesIO ())
193173
194- def __enter__ (self ):
195- return self
174+ def _iter ():
175+ proc_exited = False
176+ read_pos = 0
177+ while not proc_exited :
178+ try :
179+ partial = proc .communicate (timeout = timeout )[comm_index ]
180+ except subprocess .TimeoutExpired as exc :
181+ partial = getattr (exc , exc_attr )
182+ else :
183+ proc_exited = True
196184
197- def __exit__ (self , exc_type , exc_val , exc_tb ):
198- for _ in iter (self ):
185+ if partial is not None :
186+ buffer .buffer .seek (0 )
187+ buffer .buffer .write (partial )
188+
189+ buffer .seek (read_pos )
190+ while True :
191+ try :
192+ line = buffer .readline ()
193+ # When some unicode char is incomplete, we skip yielding
194+ except UnicodeDecodeError :
195+ break
196+
197+ # When the line is empty we skip yielding
198+ # When the line is incomplete and the process is still running, we skip yielding
199+ if not line or (not line .endswith (os .linesep ) and not proc_exited ):
200+ break
201+
202+ yield line .strip (os .linesep )
203+
204+ read_pos = buffer .tell ()
205+
206+ iterator = _iter ()
207+ try :
208+ yield iterator
209+ finally :
210+ # Making sure all output was consumed before writing the buffer to the file
211+ for _ in iterator :
199212 pass
200- if self .file :
201- self .file .close ()
202- return False
213+ if buffer .buffer .tell ():
214+ file_path .write_bytes (buffer .buffer .getvalue ())
203215
204216
205217#
@@ -651,25 +663,24 @@ def run_cmd(
651663 with self .start_cmd (* cmd , raise_on_non_zero = raise_on_non_zero , env = env , ** popen_args ) as (proc , stdout , stderr ):
652664 if input :
653665 proc .stdin .write (input )
654- proc .stdin .close ()
655666
656667 if echo :
657668 for line in stdout :
658669 if line :
659- CONSOLE .msg (line . decode (). strip () )
670+ CONSOLE .msg (line )
660671 elif capture_text :
661- return b" " .join (stdout ). decode ( )
672+ return " \n " .join (stdout )
662673 elif capture_json :
663674 try :
664- return json .loads (b "" .join (stdout ). decode ( ))
675+ return json .loads ("" .join (stdout ))
665676 except json .JSONDecodeError :
666677 LOG .warning ("Error decoding JSON from stdout" )
667678 return {}
668679 elif capture_json_lines :
669680 json_lines = []
670681 for idx , output_line in enumerate (stdout ):
671682 try :
672- json_lines .append (json .loads (output_line . decode () ))
683+ json_lines .append (json .loads (output_line ))
673684 except json .JSONDecodeError :
674685 LOG .warning (f"Error decoding JSON from stdout line #{ idx } " )
675686 return json_lines
@@ -701,15 +712,15 @@ def start_cmd(self, *cmd, raise_on_non_zero=True, env=None, **popen_args):
701712
702713 slug_cmd = re .sub (r"[^a-zA-Z]+" , "-" , cmd_str )[:100 ].strip ("-" )
703714
704- def get_stream_iterator ( stream_name ):
705- file_name = f"{ self ._cmd_idx :04d} -{ stream_name } -{ slug_cmd } .txt"
706- file_path = self . session_folder . joinpath ( file_name )
707- return StreamIterator ( proc , getattr ( proc , stream_name ), file_path )
715+ stdout_path , stderr_path = [
716+ self . session_folder . joinpath ( f"{ self ._cmd_idx :04d} -{ stream_name } -{ slug_cmd } .txt" )
717+ for stream_name in ( "stdout" , "stderr" )
718+ ]
708719
709720 try :
710721 with (
711- get_stream_iterator ( "stdout" ) as stdout_iter ,
712- get_stream_iterator ( "stderr" ) as stderr_iter ,
722+ stream_iterator ( proc , "stdout" , stdout_path ) as stdout_iter ,
723+ stream_iterator ( proc , "stderr" , stderr_path ) as stderr_iter ,
713724 ):
714725 try :
715726 yield proc , stdout_iter , stderr_iter
@@ -724,12 +735,12 @@ def get_stream_iterator(stream_name):
724735 finally :
725736 elapsed = time .time () - started
726737 LOG .info (
727- "Command [%04d] returned [%d ] in [%.3f] seconds. [%d] bytes in STDOUT, [%d] bytes in STDERR" ,
738+ "Command [%04d] returned [%s ] in [%.3f] seconds. [%d] bytes in STDOUT, [%d] bytes in STDERR" ,
728739 self ._cmd_idx ,
729740 proc .returncode ,
730741 elapsed ,
731- stdout_iter . bytes_written ,
732- stderr_iter . bytes_written ,
742+ stdout_path . stat (). st_size if stdout_path . exists () else 0 ,
743+ stderr_path . stat (). st_size if stderr_path . exists () else 0 ,
733744 )
734745
735746
@@ -1455,73 +1466,73 @@ def get_parser(self, sub_parsers):
14551466 def execute (self , args ):
14561467 CONSOLE .title ("Expose Observability ports" )
14571468
1458- try :
1459- with self .start_cmd (
1460- "minikube" ,
1461- "kubectl" ,
1462- "--profile" ,
1463- args .profile ,
1464- "--" ,
1465- "--namespace" ,
1466- args .namespace ,
1467- "--address" ,
1468- "0.0.0.0" ,
1469- "port-forward" ,
1470- "service/observability-ui" ,
1471- f"{ args .port } :http" ,
1472- raise_on_non_zero = False ,
1473- ) as (proc , stdout , stderr ):
1474- for output in stdout :
1475- if output :
1476- break
1477-
1478- if proc .poll () is None :
1479- url = f"http://localhost:{ args .port } "
1480- for service , label in SERVICES_LABELS .items ():
1481- CONSOLE .msg (f"{ label :>20} : { SERVICES_URLS [service ].format (url )} " )
1482- CONSOLE .space ()
1483- CONSOLE .msg ("Listening on all interfaces (0.0.0.0)" )
1484- CONSOLE .msg ("Keep this process running while using the above URLs" )
1485- CONSOLE .msg ("Press Ctrl + C to stop exposing the ports" )
1469+ success = False
1470+ with self .start_cmd (
1471+ "minikube" ,
1472+ "kubectl" ,
1473+ "--profile" ,
1474+ args .profile ,
1475+ "--" ,
1476+ "--namespace" ,
1477+ args .namespace ,
1478+ "--address" ,
1479+ "0.0.0.0" ,
1480+ "port-forward" ,
1481+ "service/observability-ui" ,
1482+ f"{ args .port } :http" ,
1483+ raise_on_non_zero = False ,
1484+ ) as (proc , stdout , stderr ):
1485+ for output in stdout :
1486+ if output :
1487+ break
14861488
1487- try :
1488- with open (self .data_folder / DEMO_CONFIG_FILE , "r" ) as file :
1489- json_config = json .load (file )
1490- json_config ["api_host" ] = BASE_API_URL_TPL .format (
1491- f"http://host.docker.internal:{ args .port } "
1492- )
1493-
1494- with open (self .data_folder / DEMO_CONFIG_FILE , "w" ) as file :
1495- file .write (json .dumps (json_config ))
1496- except Exception :
1497- LOG .exception (f"Unable to update { DEMO_CONFIG_FILE } file with exposed port" )
1498- else :
1499- for output in stderr :
1500- if output :
1501- CONSOLE .msg (output .decode ().strip ())
1502- raise CommandFailed
1489+ if proc .poll () is None :
1490+ url = f"http://localhost:{ args .port } "
1491+ for service , label in SERVICES_LABELS .items ():
1492+ CONSOLE .msg (f"{ label :>20} : { SERVICES_URLS [service ].format (url )} " )
1493+ CONSOLE .space ()
1494+ CONSOLE .msg ("Listening on all interfaces (0.0.0.0)" )
1495+ CONSOLE .msg ("Keep this process running while using the above URLs" )
1496+ CONSOLE .msg ("Press Ctrl + C to stop exposing the ports" )
15031497
15041498 try :
1505- while proc .poll () is None :
1506- time .sleep (10 )
1507- except KeyboardInterrupt :
1508- # The empty print forces the terminal cursor to move to the first column
1509- print ()
1499+ with open (self .data_folder / DEMO_CONFIG_FILE , "r" ) as file :
1500+ json_config = json .load (file )
1501+ json_config ["api_host" ] = BASE_API_URL_TPL .format (f"http://host.docker.internal:{ args .port } " )
1502+
1503+ with open (self .data_folder / DEMO_CONFIG_FILE , "w" ) as file :
1504+ file .write (json .dumps (json_config ))
1505+ except Exception :
1506+ LOG .exception (f"Unable to update { DEMO_CONFIG_FILE } file with exposed port" )
15101507
1511- proc .terminate ()
1508+ while True :
1509+ try :
1510+ proc .wait (10 )
1511+ except subprocess .TimeoutExpired :
1512+ continue
1513+ except KeyboardInterrupt :
1514+ # The empty print forces the terminal cursor to move to the first column
1515+ print ()
1516+ proc .terminate ()
1517+ success = True
1518+ break
1519+ else :
1520+ break
15121521
1522+ if success :
15131523 CONSOLE .msg ("The services are no longer exposed." )
1524+ else :
1525+ for output in stderr :
1526+ CONSOLE .msg (output )
15141527
1515- except Exception as e :
1516- LOG .exception ("Something went wrong exposing the services ports" )
15171528 CONSOLE .space ()
15181529 CONSOLE .msg ("The platform could not have its ports exposed." )
15191530 CONSOLE .msg (
15201531 f"Verify if the platform is running and installer has permission to listen at the port { args .port } ."
15211532 )
15221533 CONSOLE .space ()
15231534 CONSOLE .msg (f"If port { args .port } is in use, use the command option --port to specify an alternate value." )
1524- raise AbortAction from e
1535+ raise AbortAction
15251536
15261537
15271538class ObsDeleteAction (Action ):
@@ -1558,7 +1569,7 @@ class DemoContainerAction(Action):
15581569 requirements = [REQ_DOCKER , REQ_DOCKER_DAEMON ]
15591570
15601571 def run_dk_demo_container (self , command : str ):
1561- with self .start_cmd (
1572+ self .run_cmd (
15621573 "docker" ,
15631574 "run" ,
15641575 "--rm" ,
@@ -1572,14 +1583,8 @@ def run_dk_demo_container(self, command: str):
15721583 "host.docker.internal:host-gateway" ,
15731584 DEMO_IMAGE ,
15741585 command ,
1575- ) as (proc , stdout , stderr ):
1576- try :
1577- for line in stdout :
1578- if line :
1579- CONSOLE .msg (line .decode ().strip ())
1580- except KeyboardInterrupt :
1581- print ("" )
1582- proc .terminate ()
1586+ echo = True ,
1587+ )
15831588
15841589
15851590class ObsRunDemoAction (DemoContainerAction ):
0 commit comments