Skip to content

Commit 34b879a

Browse files
committed
Added fancy color output
1 parent 09e4739 commit 34b879a

File tree

1 file changed

+80
-59
lines changed

1 file changed

+80
-59
lines changed

zypperoni

Lines changed: 80 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ from shlex import quote
2828
import xml.etree.ElementTree as ET
2929

3030
# Constants
31-
ZYPPERONI_VERSION = "0.3.5"
31+
ZYPPERONI_VERSION = "0.3.7"
3232
ZYPPER_PID_FILE = "/run/zypp.pid"
3333
VALID_CMD = ["ref", "force-ref", "in", "in-download", "dup", "dup-download", "inr", "inr-download"]
3434
VALID_OPT = ["--debug", "--help", "--version", "--no-confirm", "--max-jobs"]
@@ -126,6 +126,24 @@ umount_dirs = f"""
126126

127127
################################
128128

129+
# Function to get ANSI colored text
130+
def color(text_type, text):
131+
# rgb color codes
132+
colors = {
133+
"input": (0, 170, 170), # cyan
134+
"success": (0, 170, 0), # green
135+
"info": (85, 85, 255), # bright blue
136+
"warning": (255, 255, 85), # bright yellow
137+
"error": (255, 85, 85), # bright red
138+
"exception": (170, 0, 170), # magenta
139+
}
140+
color = colors.get(text_type)
141+
# color only if running in terminal
142+
if color and sys.stdout.isatty():
143+
return f"\033[38;2;{color[0]};{color[1]};{color[2]}m{text} \033[38;2;255;255;255m"
144+
else:
145+
return text
146+
129147
# Function to query user for yes or no
130148
def query_yes_no(question, default=None):
131149
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
@@ -138,16 +156,16 @@ def query_yes_no(question, default=None):
138156
else:
139157
raise ValueError(f"Invalid default answer: {default!r}")
140158
while True:
141-
sys.stdout.write(question + prompt)
159+
sys.stdout.write(color("input", question + prompt))
142160
choice = input().lower()
143161
if default is not None and choice == "":
144162
return valid[default]
145163
elif choice in valid:
146164
return valid[choice]
147165
else:
148-
sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
166+
sys.stdout.write(color("warning", "Please respond with 'yes' or 'no' (or 'y' or 'n').\n"))
149167

150-
# Function to take exclusive control of future zypper invokations
168+
# Function to take exclusive control of future zypper invocations
151169
def get_zypp_lock():
152170
our_pid = os.getpid()
153171
os.system(f"echo {our_pid} > {ZYPPER_PID_FILE}")
@@ -261,21 +279,21 @@ async def zypper_task(lock, UUID, task_type, task_item, total_items, item_counte
261279
uuid=uuid,
262280
pkg_name=task_item,
263281
)
264-
logging.info(log_messages.get("start"))
282+
logging.info(color("info", log_messages.get("start")))
265283
proc = await asyncio.create_subprocess_shell(
266284
commands,
267285
stdout=asyncio.subprocess.PIPE,
268286
stderr=asyncio.subprocess.PIPE,
269287
)
270288
stdout, stderr = await proc.communicate()
271289
if proc.returncode == 0:
272-
logging.info(log_messages.get("success"))
290+
logging.info(color("success", log_messages.get("success")))
273291
else:
274-
logging.error(f"{log_messages.get('error')}. zypper exit code: {proc.returncode}")
292+
logging.error(color("error", f"{log_messages.get('error')}. zypper exit code: {proc.returncode}"))
275293
if stdout:
276-
logging.debug(f'[zypper output]\n{stdout.decode()}')
294+
logging.debug(f"[zypper output]\n{stdout.decode()}")
277295
if stderr:
278-
logging.debug(f'[zypper error]\n{stderr.decode()}')
296+
logging.debug(f"[zypper error]\n{stderr.decode()}")
279297
async with lock:
280298
UUID.append(uuid)
281299
except asyncio.exceptions.CancelledError:
@@ -318,32 +336,32 @@ async def main_task(num_jobs, task_type, task_items, no_confirm=None):
318336
except asyncio.exceptions.CancelledError:
319337
EXCEPTION_OCCUR = True
320338
logging.debug(log_messages.get("exception"))
321-
logging.info("Cancelling pending tasks...")
339+
logging.info(color("info", "Cancelling pending tasks..."))
322340
for task in asyncio.all_tasks():
323341
task.cancel()
324342
await asyncio.sleep(0.1)
325343
finally:
326344
# cleanup temp mounts
327-
logging.info("Cleaning up temp mounts...")
345+
logging.info(color("info", "Cleaning up temp mounts..."))
328346
unmount(UUID_UNCHANGED.copy())
329347
# cleanup temp dir
330-
logging.info("Cleaning up temp directory...")
348+
logging.info(color("info", "Cleaning up temp directory..."))
331349
recursive_delete(ZYPPERONI_TMP_DIR)
332350
# release zypper exclusive lock
333351
release_zypp_lock()
334352
# perform additional zypper commands (if any) on no exception
335353
if not EXCEPTION_OCCUR:
336354
msg = "Zypperoni has finished its tasks. Handing you over to zypper..."
337355
if task_type == "dup":
338-
logging.info(msg)
356+
logging.info(color("info", msg))
339357
command = f"env ZYPP_SINGLE_RPMTRANS=1 zypper {'--non-interactive' if no_confirm else ''} --no-cd dist-upgrade"
340358
os.system(command)
341359
elif task_type == "in":
342-
logging.info(msg)
360+
logging.info(color("info", msg))
343361
command = f"env ZYPP_SINGLE_RPMTRANS=1 zypper {'--non-interactive' if no_confirm else ''} --no-cd install {' '.join(install_pkgs)}"
344362
os.system(command)
345363
elif task_type == "inr":
346-
logging.info(msg)
364+
logging.info(color("info", msg))
347365
command = f"env ZYPP_SINGLE_RPMTRANS=1 zypper {'--non-interactive' if no_confirm else ''} --no-cd install-new-recommends"
348366
os.system(command)
349367

@@ -424,7 +442,7 @@ logging.basicConfig(
424442

425443
# Bail out if we're not root
426444
if os.getuid() != 0:
427-
logging.error("Bailing out, program must be run with root privileges")
445+
logging.error(color("error", "Bailing out, program must be run with root privileges"))
428446
zypperoni_cleanup()
429447
sys.exit(3)
430448

@@ -434,10 +452,10 @@ programs = ["zypper", "echo", "ps", "sed", "awk", "mkdir", "cat", "dirname", "ba
434452
for program in programs:
435453
out, ret = shell_exec(f"command -v {program}")
436454
if not out:
437-
logging.error(f"Bailing out, missing required dependency {program!r} in PATH ({os.environ.get('PATH')}) " \
455+
msg = f"Bailing out, missing required dependency {program!r} in PATH ({os.environ.get('PATH')}) " \
438456
f"for user {os.environ.get('USER')!r}. The following shell tools " \
439457
f"are required for zypperoni to function: {', '.join(programs)}"
440-
)
458+
logging.error(color("error", msg))
441459
zypperoni_cleanup()
442460
sys.exit(4)
443461

@@ -456,14 +474,14 @@ if os.path.isfile(ZYPPER_PID_FILE):
456474
if pid_program:
457475
msg = f"zypper is already invoked by the application with pid {pid} ({pid_program}).\n" \
458476
"Close this application before trying again."
459-
logging.error(msg)
477+
logging.error(color("error", msg))
460478
zypperoni_cleanup()
461479
sys.exit(5)
462480

463481
# Handle commands: ref, force-ref
464482
if COMMAND in ["ref", "force-ref"]:
465483
# get all enabled repos
466-
logging.info("Getting all enabled repos")
484+
logging.info(color("info", "Getting all enabled repos"))
467485
REPO_ALIAS = []
468486
xml_output, ret = shell_exec("env -i zypper --non-interactive --no-cd --xmlout repos")
469487
logging.debug(xml_output)
@@ -474,24 +492,24 @@ if COMMAND in ["ref", "force-ref"]:
474492
REPO_ALIAS.append(item.attrib["alias"])
475493
logging.debug(f"Enabled repos: {REPO_ALIAS}")
476494
if not REPO_ALIAS:
477-
logging.info("No repos found. Exiting...")
495+
logging.info(color("info", "No repos found. Exiting..."))
478496
zypperoni_cleanup()
479497
sys.exit()
480498
try:
481499
asyncio.run(main_task(MAX_JOBS, COMMAND, REPO_ALIAS))
482500
except asyncio.exceptions.CancelledError:
483501
logging.debug("Received SIGINT for asyncio runner")
484502
except:
485-
logging.exception("Unknown exception for asyncio runner")
503+
logging.exception(color("exception", "Unknown exception for asyncio runner"))
486504

487505
# Handle commands: dup, dup-download
488506
elif COMMAND in ["dup", "dup-download"]:
489507
# get info about dup packages
490-
logging.info("Getting all packages to be downloaded for distribution upgrade")
508+
logging.info(color("info", "Getting all packages to be downloaded for distribution upgrade"))
491509
xml_output, ret = shell_exec("env -i zypper --non-interactive --no-cd --xmlout dist-upgrade --dry-run")
492510
logging.debug(xml_output)
493511
if ret == 0 and xml_output.find("Nothing to do") != -1:
494-
logging.info("Nothing to do. Exiting...")
512+
logging.info(color("info", "Nothing to do. Exiting..."))
495513
zypperoni_cleanup()
496514
sys.exit()
497515
get_zypp_lock()
@@ -501,13 +519,14 @@ elif COMMAND in ["dup", "dup-download"]:
501519
download_size_bytes = float(item.attrib["download-size"])
502520
diff_bytes = float(item.attrib["space-usage-diff"])
503521
num_pkgs = int(item.attrib["packages-to-change"])
504-
logging.info(f"Number of packages to download: {num_pkgs}")
505-
logging.info(f"Total download size: {download_size_bytes/1000**2:.2f} MB")
522+
logging.info(color("info", f"Number of packages to download: {num_pkgs}"))
523+
logging.info(color("info", f"Total download size: {download_size_bytes/1000**2:.2f} MB"))
506524
if COMMAND == "dup":
507-
logging.info(f"Space usage difference after operation: {diff_bytes/1000**2:+.2f} MB")
525+
logging.info(color("info", f"Space usage difference after operation: {diff_bytes/1000**2:+.2f} MB"))
508526
if not num_pkgs:
509-
logging.warning("There are package conflicts that must be manually resolved. See output of:\n" \
510-
"zypper --non-interactive --no-cd dist-upgrade --dry-run")
527+
msg = "There are package conflicts that must be manually resolved. See output of:\n" \
528+
"zypper --non-interactive --no-cd dist-upgrade --dry-run"
529+
logging.warning(color("warning", msg))
511530
zypperoni_cleanup()
512531
sys.exit()
513532
# parse all packages from xml output
@@ -524,22 +543,22 @@ elif COMMAND in ["dup", "dup-download"]:
524543
DUP_PKG = list( set(DUP_PKG) - set(RM_PKG) )
525544
DUP_PKG.sort()
526545
if not DUP_PKG:
527-
logging.info("Nothing to do. Exiting...")
546+
logging.info(color("info", "Nothing to do. Exiting..."))
528547
zypperoni_cleanup()
529548
sys.exit()
530549
# do not download if all packages are already in cache
531550
if COMMAND == "dup-download" and download_size_bytes == 0:
532-
logging.info("Nothing to do. Exiting...")
551+
logging.info(color("info", "Nothing to do. Exiting..."))
533552
zypperoni_cleanup()
534553
sys.exit()
535554
# proceed straight to dup if all packages are in cache
536555
if COMMAND == "dup" and download_size_bytes == 0:
537556
zypperoni_cleanup()
538-
logging.info("Zypperoni has finished its tasks. Handing you over to zypper...")
557+
logging.info(color("info", "Zypperoni has finished its tasks. Handing you over to zypper..."))
539558
command = f"env ZYPP_SINGLE_RPMTRANS=1 zypper {'--non-interactive' if NO_CONFIRM else ''} --no-cd dist-upgrade"
540559
os.system(command)
541560
sys.exit()
542-
logging.info(f"Packages to download: {' '.join(DUP_PKG)}")
561+
logging.info(color("info", f"Packages to download: {' '.join(DUP_PKG)}"))
543562
if not NO_CONFIRM and not query_yes_no("Would you like to continue?", default="yes"):
544563
zypperoni_cleanup()
545564
sys.exit()
@@ -548,16 +567,16 @@ elif COMMAND in ["dup", "dup-download"]:
548567
except asyncio.exceptions.CancelledError:
549568
logging.debug("Received SIGINT for asyncio runner")
550569
except:
551-
logging.exception("Unknown exception for asyncio runner")
570+
logging.exception(color("exception", "Unknown exception for asyncio runner"))
552571

553572
# Handle commands: in, in-download
554573
elif COMMAND in ["in", "in-download"]:
555574
# get info about install packages
556-
logging.info("Getting packages and their dependencies to be downloaded for installation")
575+
logging.info(color("info", "Getting packages and their dependencies to be downloaded for installation"))
557576
xml_output, ret = shell_exec(f"env -i zypper --non-interactive --no-cd --xmlout install --dry-run {' '.join(ARG)}")
558577
logging.debug(xml_output)
559578
if ret == 0 and xml_output.find("Nothing to do") != -1:
560-
logging.info("Nothing to do. Exiting...")
579+
logging.info(color("info", "Nothing to do. Exiting..."))
561580
zypperoni_cleanup()
562581
sys.exit()
563582
get_zypp_lock()
@@ -568,20 +587,21 @@ elif COMMAND in ["in", "in-download"]:
568587
download_size_bytes = float(item.attrib["download-size"])
569588
diff_bytes = float(item.attrib["space-usage-diff"])
570589
num_pkgs = int(item.attrib["packages-to-change"])
571-
logging.info(f"Number of packages to download: {num_pkgs}")
572-
logging.info(f"Total download size: {download_size_bytes/1000**2:.2f} MB")
573-
logging.info(f"Space usage difference after operation: {diff_bytes/1000**2:+.2f} MB")
590+
logging.info(color("info", f"Number of packages to download: {num_pkgs}"))
591+
logging.info(color("info", f"Total download size: {download_size_bytes/1000**2:.2f} MB"))
592+
logging.info(color("info", f"Space usage difference after operation: {diff_bytes/1000**2:+.2f} MB"))
574593
NO_ERR = True
575594
if not num_pkgs:
576-
logging.warning("There are package conflicts that must be manually resolved. See output of:\n" \
577-
"zypper --non-interactive --no-cd dist-upgrade --dry-run")
595+
msg = "There are package conflicts that must be manually resolved. See output of:\n" \
596+
"zypper --non-interactive --no-cd dist-upgrade --dry-run"
597+
logging.warning(color("warning", msg))
578598
zypperoni_cleanup()
579599
sys.exit()
580600
if not NO_ERR:
581601
friendly_output = ""
582602
for item in docroot.iter("message"):
583603
friendly_output += item.text + "\n"
584-
logging.error(f"There was an error processing your request.\n[zypper output]\n{friendly_output.strip()}")
604+
logging.error(color("error", f"There was an error processing your request.\n[zypper output]\n{friendly_output.strip()}"))
585605
zypperoni_cleanup()
586606
sys.exit(6)
587607
# parse all packages from xml output
@@ -598,22 +618,22 @@ elif COMMAND in ["in", "in-download"]:
598618
IN_PKG = list( set(IN_PKG) - set(RM_PKG) )
599619
IN_PKG.sort()
600620
if not IN_PKG:
601-
logging.info("Nothing to do. Exiting...")
621+
logging.info(color("info", "Nothing to do. Exiting..."))
602622
zypperoni_cleanup()
603623
sys.exit()
604624
# do not download if all packages are already in cache
605625
if COMMAND == "in-download" and download_size_bytes == 0:
606-
logging.info("Nothing to do. Exiting...")
626+
logging.info(color("info", "Nothing to do. Exiting..."))
607627
zypperoni_cleanup()
608628
sys.exit()
609629
# proceed straight to install if all packages are in cache
610630
if COMMAND == "in" and download_size_bytes == 0:
611631
zypperoni_cleanup()
612-
logging.info("Zypperoni has finished its tasks. Handing you over to zypper...")
632+
logging.info(color("info", "Zypperoni has finished its tasks. Handing you over to zypper..."))
613633
command = f"env ZYPP_SINGLE_RPMTRANS=1 zypper {'--non-interactive' if NO_CONFIRM else ''} --no-cd install {' '.join(ARG)}"
614634
os.system(command)
615635
sys.exit()
616-
logging.info(f"Packages to download: {' '.join(IN_PKG)}")
636+
logging.info(color("info", f"Packages to download: {' '.join(IN_PKG)}"))
617637
if not NO_CONFIRM and not query_yes_no("Would you like to continue?", default="yes"):
618638
zypperoni_cleanup()
619639
sys.exit()
@@ -622,16 +642,16 @@ elif COMMAND in ["in", "in-download"]:
622642
except asyncio.exceptions.CancelledError:
623643
logging.debug("Received SIGINT for asyncio runner")
624644
except:
625-
logging.exception("Unknown exception for asyncio runner")
645+
logging.exception(color("exception", "Unknown exception for asyncio runner"))
626646

627647
# Handle commands: inr, inr-download
628648
elif COMMAND in ["inr", "inr-download"]:
629649
# get info about recommended install packages
630-
logging.info("Getting new packages and their dependencies to be downloaded for recommended installation")
650+
logging.info(color("info", "Getting new packages and their dependencies to be downloaded for recommended installation"))
631651
xml_output, ret = shell_exec(f"env -i zypper --non-interactive --no-cd --xmlout install-new-recommends --dry-run")
632652
logging.debug(xml_output)
633653
if ret == 0 and xml_output.find("Nothing to do") != -1:
634-
logging.info("Nothing to do. Exiting...")
654+
logging.info(color("info", "Nothing to do. Exiting..."))
635655
zypperoni_cleanup()
636656
sys.exit()
637657
get_zypp_lock()
@@ -641,12 +661,13 @@ elif COMMAND in ["inr", "inr-download"]:
641661
download_size_bytes = float(item.attrib["download-size"])
642662
diff_bytes = float(item.attrib["space-usage-diff"])
643663
num_pkgs = int(item.attrib["packages-to-change"])
644-
logging.info(f"Number of packages to download: {num_pkgs}")
645-
logging.info(f"Total download size: {download_size_bytes/1000**2:.2f} MB")
646-
logging.info(f"Space usage difference after operation: {diff_bytes/1000**2:+.2f} MB")
664+
logging.info(color("info", f"Number of packages to download: {num_pkgs}"))
665+
logging.info(color("info", f"Total download size: {download_size_bytes/1000**2:.2f} MB"))
666+
logging.info(color("info", f"Space usage difference after operation: {diff_bytes/1000**2:+.2f} MB"))
647667
if not num_pkgs:
648-
logging.warning("There are package conflicts that must be manually resolved. See output of:\n" \
649-
"zypper --non-interactive --no-cd dist-upgrade --dry-run")
668+
msg = "There are package conflicts that must be manually resolved. See output of:\n" \
669+
"zypper --non-interactive --no-cd dist-upgrade --dry-run"
670+
logging.warning(color("warning", msg))
650671
zypperoni_cleanup()
651672
sys.exit()
652673
# parse all packages from xml output
@@ -663,22 +684,22 @@ elif COMMAND in ["inr", "inr-download"]:
663684
INR_PKG = list( set(INR_PKG) - set(RM_PKG) )
664685
INR_PKG.sort()
665686
if not INR_PKG:
666-
logging.info("Nothing to do. Exiting...")
687+
logging.info(color("info", "Nothing to do. Exiting..."))
667688
zypperoni_cleanup()
668689
sys.exit()
669690
# do not download if all packages are already in cache
670691
if COMMAND == "inr-download" and download_size_bytes == 0:
671-
logging.info("Nothing to do. Exiting...")
692+
logging.info(color("info", "Nothing to do. Exiting..."))
672693
zypperoni_cleanup()
673694
sys.exit()
674695
# proceed straight to inr if all packages are in cache
675696
if COMMAND == "inr" and download_size_bytes == 0:
676697
zypperoni_cleanup()
677-
logging.info("Zypperoni has finished its tasks. Handing you over to zypper...")
698+
logging.info(color("info", "Zypperoni has finished its tasks. Handing you over to zypper..."))
678699
command = f"env ZYPP_SINGLE_RPMTRANS=1 zypper {'--non-interactive' if NO_CONFIRM else ''} --no-cd install-new-recommends"
679700
os.system(command)
680701
sys.exit()
681-
logging.info(f"Packages to download: {' '.join(INR_PKG)}")
702+
logging.info(color("info", f"Packages to download: {' '.join(INR_PKG)}"))
682703
if not NO_CONFIRM and not query_yes_no("Would you like to continue?", default="yes"):
683704
zypperoni_cleanup()
684705
sys.exit()
@@ -687,4 +708,4 @@ elif COMMAND in ["inr", "inr-download"]:
687708
except asyncio.exceptions.CancelledError:
688709
logging.debug("Received SIGINT for asyncio runner")
689710
except:
690-
logging.exception("Unknown exception for asyncio runner")
711+
logging.exception(color("exception", "Unknown exception for asyncio runner"))

0 commit comments

Comments
 (0)