77import datetime
88import functools
99import hashlib
10+ import io
1011import ipaddress
1112import json
1213import logging
@@ -121,7 +122,6 @@ def collect_user_input(fields: list[str]) -> dict[str, str]:
121122 if value := input (f"{ CONSOLE .MARGIN } { field .capitalize ()!s: >20} : " ):
122123 res [field ] = value
123124 except KeyboardInterrupt :
124- print ("" ) # Moving the cursor back to the start
125125 raise AbortAction
126126 finally :
127127 CONSOLE .space ()
@@ -162,44 +162,56 @@ def get_installer_version():
162162 return "N/A"
163163
164164
165- class StreamIterator :
166- def __init__ (self , proc , stream , file_path ):
167- self .proc = proc
168- self .stream = stream
169- self .file_path = file_path
170- self .file = None
171- self .bytes_written = 0
172-
173- def __iter__ (self ):
174- return self
175-
176- def __next__ (self ):
177- for return_anyway in (False , True ):
178- # We poll the process status before consuming the stream to make sure the StopIteration condition
179- # is not vulnerable to a race condition.
180- ret = self .proc .poll ()
181- line = self .stream .readline ()
182- if line :
183- if not self .file :
184- self .file = open (self .file_path , "wb" )
185- self .file .write (line )
186- self .bytes_written += len (line )
187- return line
188- if ret is not None and not line :
189- raise StopIteration
190- if not return_anyway :
191- time .sleep (0.1 )
192- return line
165+ @contextlib .contextmanager
166+ def stream_iterator (
167+ proc : subprocess .Popen , stdout_file_path : pathlib .Path , stderr_file_path : pathlib .Path , timeout : float = 1.0
168+ ):
169+ stdout_buffer = io .TextIOWrapper (io .BytesIO ())
170+ stderr_buffer = io .TextIOWrapper (io .BytesIO ())
193171
194- def __enter__ (self ):
195- return self
172+ def _iter ():
173+ proc_exited = False
174+ stdout_read_pos = 0
175+ stderr_read_pos = 0
176+ while not proc_exited :
177+ try :
178+ stdout_partial , stderr_partial = proc .communicate (timeout = timeout )
179+ except subprocess .TimeoutExpired as exc :
180+ stdout_partial = exc .output
181+ stderr_partial = exc .stderr
182+ else :
183+ proc_exited = True
196184
197- def __exit__ (self , exc_type , exc_val , exc_tb ):
198- for _ in iter (self ):
185+ if stdout_partial is not None :
186+ stdout_buffer .buffer .seek (0 )
187+ stdout_buffer .buffer .write (stdout_partial )
188+ if stderr_partial is not None :
189+ stderr_buffer .buffer .seek (0 )
190+ stderr_buffer .buffer .write (stderr_partial )
191+
192+ while True :
193+ stdout_buffer .seek (stdout_read_pos )
194+ stdout_line = stdout_buffer .readline ()
195+ stdout_read_pos = stdout_buffer .tell ()
196+ stderr_buffer .seek (stderr_read_pos )
197+ stderr_line = stderr_buffer .readline ()
198+ stderr_read_pos = stderr_buffer .tell ()
199+
200+ if not (stdout_line or stderr_line ):
201+ break
202+
203+ yield stdout_line .strip (), stderr_line .strip ()
204+
205+ iterator = _iter ()
206+ try :
207+ yield iterator
208+ finally :
209+ for _ in iterator :
199210 pass
200- if self .file :
201- self .file .close ()
202- return False
211+ if stdout_buffer .buffer .tell ():
212+ stdout_file_path .write_bytes (stdout_buffer .buffer .getvalue ())
213+ if stderr_buffer .buffer .tell ():
214+ stderr_file_path .write_bytes (stderr_buffer .buffer .getvalue ())
203215
204216
205217#
@@ -616,7 +628,7 @@ def run_cmd_retries(self, *cmd, timeout, retries, raise_on_non_zero=True, env=No
616628 cmd_fail_exception = None
617629 while retries > 0 :
618630 try :
619- with self .start_cmd (* cmd , raise_on_non_zero = raise_on_non_zero , env = env , ** popen_args ) as (proc , * _ ):
631+ with self .start_cmd (* cmd , raise_on_non_zero = raise_on_non_zero , env = env , ** popen_args ) as (proc , _ ):
620632 try :
621633 proc .wait (timeout = timeout )
622634 except subprocess .TimeoutExpired as e :
@@ -648,28 +660,29 @@ def run_cmd(
648660 env = None ,
649661 ** popen_args ,
650662 ):
651- with self .start_cmd (* cmd , raise_on_non_zero = raise_on_non_zero , env = env , ** popen_args ) as (proc , stdout , stderr ):
663+ with self .start_cmd (* cmd , raise_on_non_zero = raise_on_non_zero , env = env , ** popen_args ) as (proc , stream_iter ):
652664 if input :
653665 proc .stdin .write (input )
654- proc .stdin .close ()
655666
656667 if echo :
657- for line in stdout :
658- if line :
659- CONSOLE .msg (line .decode ().strip ())
660- elif capture_text :
661- return b"" .join (stdout ).decode ()
662- elif capture_json :
663- try :
664- return json .loads (b"" .join (stdout ).decode ())
665- except json .JSONDecodeError :
666- LOG .warning ("Error decoding JSON from stdout" )
667- return {}
668+ for stdout_line , _ in stream_iter :
669+ if stdout_line :
670+ CONSOLE .msg (stdout_line .strip ())
671+ elif capture_text or capture_json :
672+ text = "" .join ([stdout_line for stdout_line , _ in stream_iter ])
673+ if capture_text :
674+ return text
675+ else :
676+ try :
677+ return json .loads (text )
678+ except json .JSONDecodeError :
679+ LOG .warning ("Error decoding JSON from stdout" )
680+ return
668681 elif capture_json_lines :
669682 json_lines = []
670- for idx , output_line in enumerate (stdout ):
683+ for idx , ( stdout_line , _ ) in enumerate (stream_iter ):
671684 try :
672- json_lines .append (json .loads (output_line . decode () ))
685+ json_lines .append (json .loads (stdout_line ))
673686 except json .JSONDecodeError :
674687 LOG .warning (f"Error decoding JSON from stdout line #{ idx } " )
675688 return json_lines
@@ -693,6 +706,8 @@ def start_cmd(self, *cmd, raise_on_non_zero=True, env=None, **popen_args):
693706 stderr = subprocess .PIPE ,
694707 stdin = subprocess .PIPE ,
695708 env = env ,
709+ # Always using text=False to match the BytesIO used for buffering the output
710+ text = False ,
696711 ** popen_args ,
697712 )
698713 except FileNotFoundError as e :
@@ -701,35 +716,34 @@ def start_cmd(self, *cmd, raise_on_non_zero=True, env=None, **popen_args):
701716
702717 slug_cmd = re .sub (r"[^a-zA-Z]+" , "-" , cmd_str )[:100 ].strip ("-" )
703718
704- def get_stream_iterator ( stream_name ):
705- file_name = f"{ self ._cmd_idx :04d} -{ stream_name } -{ slug_cmd } .txt"
706- file_path = self . session_folder . joinpath ( file_name )
707- return StreamIterator ( proc , getattr ( proc , stream_name ), file_path )
719+ stdout_path , stderr_path = [
720+ self . session_folder . joinpath ( f"{ self ._cmd_idx :04d} -{ stream_name } -{ slug_cmd } .txt" )
721+ for stream_name in ( "stdout" , "stderr" )
722+ ]
708723
709724 try :
710- with (
711- get_stream_iterator ("stdout" ) as stdout_iter ,
712- get_stream_iterator ("stderr" ) as stderr_iter ,
713- ):
714- try :
715- yield proc , stdout_iter , stderr_iter
716- finally :
717- proc .wait ()
725+ with stream_iterator (proc , stdout_path , stderr_path ) as stream_iter :
726+ yield proc , stream_iter
727+
718728 if raise_on_non_zero and proc .returncode != 0 :
719729 raise CommandFailed
720730 # We capture and raise CommandFailed to allow the client code to raise an empty CommandFailed exception
721731 # but still get a contextualized exception at the end
722732 except CommandFailed as e :
723733 raise CommandFailed (self ._cmd_idx , cmd_str , proc .returncode ) from e .__cause__
734+ # Cleaning the current line in case the command is interrupted
735+ except KeyboardInterrupt :
736+ print ()
737+ raise
724738 finally :
725739 elapsed = time .time () - started
726740 LOG .info (
727- "Command [%04d] returned [%d ] in [%.3f] seconds. [%d] bytes in STDOUT, [%d] bytes in STDERR" ,
741+ "Command [%04d] returned [%s ] in [%.3f] seconds. [%d] bytes in STDOUT, [%d] bytes in STDERR" ,
728742 self ._cmd_idx ,
729743 proc .returncode ,
730744 elapsed ,
731- stdout_iter . bytes_written ,
732- stderr_iter . bytes_written ,
745+ stdout_path . stat (). st_size if stdout_path . exists () else 0 ,
746+ stderr_path . stat (). st_size if stderr_path . exists () else 0 ,
733747 )
734748
735749
@@ -1068,7 +1082,7 @@ def pre_execute(self, action, args):
10681082 capture_json = True ,
10691083 raise_on_non_zero = False ,
10701084 )
1071- if "Name" in env_json :
1085+ if env_json and "Name" in env_json :
10721086 CONSOLE .msg (
10731087 "Found a minikube profile with the same name. If a previous attempt to run this installer failed,"
10741088 )
@@ -1455,27 +1469,26 @@ def get_parser(self, sub_parsers):
14551469 def execute (self , args ):
14561470 CONSOLE .title ("Expose Observability ports" )
14571471
1458- try :
1459- with self .start_cmd (
1460- "minikube" ,
1461- "kubectl" ,
1462- "--profile" ,
1463- args .profile ,
1464- "--" ,
1465- "--namespace" ,
1466- args .namespace ,
1467- "--address" ,
1468- "0.0.0.0" ,
1469- "port-forward" ,
1470- "service/observability-ui" ,
1471- f"{ args .port } :http" ,
1472- raise_on_non_zero = False ,
1473- ) as (proc , stdout , stderr ):
1474- for output in stdout :
1475- if output :
1476- break
1477-
1478- if proc .poll () is None :
1472+ with self .start_cmd (
1473+ "minikube" ,
1474+ "kubectl" ,
1475+ "--profile" ,
1476+ args .profile ,
1477+ "--" ,
1478+ "--namespace" ,
1479+ args .namespace ,
1480+ "--address" ,
1481+ "0.0.0.0" ,
1482+ "port-forward" ,
1483+ "service/observability-ui" ,
1484+ f"{ args .port } :http" ,
1485+ raise_on_non_zero = False ,
1486+ ) as (proc , stream_iter ):
1487+ succeeded = False
1488+ for stdout_line , stderr_line in stream_iter :
1489+ if stderr_line :
1490+ CONSOLE .msg (stderr_line )
1491+ if stdout_line and proc .poll () is None :
14791492 url = f"http://localhost:{ args .port } "
14801493 for service , label in SERVICES_LABELS .items ():
14811494 CONSOLE .msg (f"{ label :>20} : { SERVICES_URLS [service ].format (url )} " )
@@ -1495,33 +1508,26 @@ def execute(self, args):
14951508 file .write (json .dumps (json_config ))
14961509 except Exception :
14971510 LOG .exception (f"Unable to update { DEMO_CONFIG_FILE } file with exposed port" )
1498- else :
1499- for output in stderr :
1500- if output :
1501- CONSOLE .msg (output .decode ().strip ())
1502- raise CommandFailed
15031511
15041512 try :
15051513 while proc .poll () is None :
15061514 time .sleep (10 )
15071515 except KeyboardInterrupt :
1508- # The empty print forces the terminal cursor to move to the first column
1516+ proc . terminate ()
15091517 print ()
1518+ succeeded = True
15101519
1511- proc .terminate ()
1512-
1520+ if succeeded :
15131521 CONSOLE .msg ("The services are no longer exposed." )
1514-
1515- except Exception as e :
1516- LOG .exception ("Something went wrong exposing the services ports" )
1522+ else :
15171523 CONSOLE .space ()
15181524 CONSOLE .msg ("The platform could not have its ports exposed." )
15191525 CONSOLE .msg (
15201526 f"Verify if the platform is running and installer has permission to listen at the port { args .port } ."
15211527 )
15221528 CONSOLE .space ()
15231529 CONSOLE .msg (f"If port { args .port } is in use, use the command option --port to specify an alternate value." )
1524- raise AbortAction from e
1530+ raise AbortAction
15251531
15261532
15271533class ObsDeleteAction (Action ):
@@ -1558,7 +1564,7 @@ class DemoContainerAction(Action):
15581564 requirements = [REQ_DOCKER , REQ_DOCKER_DAEMON ]
15591565
15601566 def run_dk_demo_container (self , command : str ):
1561- with self .start_cmd (
1567+ self .run_cmd (
15621568 "docker" ,
15631569 "run" ,
15641570 "--rm" ,
@@ -1572,14 +1578,8 @@ def run_dk_demo_container(self, command: str):
15721578 "host.docker.internal:host-gateway" ,
15731579 DEMO_IMAGE ,
15741580 command ,
1575- ) as (proc , stdout , stderr ):
1576- try :
1577- for line in stdout :
1578- if line :
1579- CONSOLE .msg (line .decode ().strip ())
1580- except KeyboardInterrupt :
1581- print ("" )
1582- proc .terminate ()
1581+ echo = True ,
1582+ )
15831583
15841584
15851585class ObsRunDemoAction (DemoContainerAction ):
@@ -1593,6 +1593,7 @@ def execute(self, args):
15931593 CONSOLE .title ("Demo FAILED" )
15941594 CONSOLE .space ()
15951595 CONSOLE .msg (f"To retry the demo, first run `python3 { INSTALLER_NAME } { args .prod } delete-demo`" )
1596+ raise AbortAction
15961597 else :
15971598 CONSOLE .title ("Demo SUCCEEDED" )
15981599
0 commit comments