|
| 1 | +from typing import Dict, List, Optional |
| 2 | + |
| 3 | +try: |
| 4 | + import prettytable |
| 5 | +except ModuleNotFoundError: |
| 6 | + raise ModuleNotFoundError( |
| 7 | + "CLI dependency \"prettytable\" not found. Install python-omemo with CLI dependencies using" |
| 8 | + " \"pip install OMEMO[cli]\"" |
| 9 | + ) |
| 10 | + |
| 11 | +from .backend import Backend |
| 12 | +from .session_manager import SessionManager, UnknownNamespace |
| 13 | +from .types import DeviceList |
| 14 | + |
| 15 | + |
| 16 | +__all__ = [ "debug_encrypt" ] |
| 17 | + |
| 18 | + |
| 19 | +async def debug_encrypt( |
| 20 | + session_manager: SessionManager, |
| 21 | + bare_jid: str, |
| 22 | + backend_priority_order: Optional[List[str]] = None |
| 23 | +) -> None: |
| 24 | + # pylint: disable=protected-access |
| 25 | + # pylint: disable=fixme |
| 26 | + |
| 27 | + print() |
| 28 | + print("*" * 100) |
| 29 | + print(f"Start of health check for {bare_jid}.") |
| 30 | + print() |
| 31 | + backends: List[Backend] = getattr(session_manager, "_SessionManager__backends") |
| 32 | + |
| 33 | + # Print the list of available backends and their priorities |
| 34 | + available_namespaces = [ backend.namespace for backend in backends ] |
| 35 | + |
| 36 | + if backend_priority_order is not None: |
| 37 | + unavailable_namespaces = frozenset(backend_priority_order) - frozenset(available_namespaces) |
| 38 | + if len(unavailable_namespaces) > 0: |
| 39 | + raise UnknownNamespace( |
| 40 | + f"One or more unavailable namespaces were passed in the backend priority order list:" |
| 41 | + f" {unavailable_namespaces}" |
| 42 | + ) |
| 43 | + |
| 44 | + effective_backend_priority_order = \ |
| 45 | + available_namespaces if backend_priority_order is None else backend_priority_order |
| 46 | + |
| 47 | + print("Available backends by priority:") |
| 48 | + for namespace in effective_backend_priority_order: |
| 49 | + print(f"{namespace}") |
| 50 | + |
| 51 | + for namespace in frozenset(available_namespaces) - frozenset(effective_backend_priority_order): |
| 52 | + print(f"(no priority: {namespace})") |
| 53 | + |
| 54 | + print() |
| 55 | + print("Device list access check:") |
| 56 | + |
| 57 | + # Print whether the device list nodes can be accessed |
| 58 | + device_lists: Dict[str, DeviceList] = {} |
| 59 | + for namespace in effective_backend_priority_order: |
| 60 | + try: |
| 61 | + device_list = await session_manager._download_device_list(namespace, bare_jid) |
| 62 | + if len(device_list) == 0: |
| 63 | + print(f"Device list for backend {namespace} doesn't exist or is empty.") |
| 64 | + else: |
| 65 | + print(f"Device list access for backend {namespace} ok.") |
| 66 | + device_lists[namespace] = device_list |
| 67 | + except Exception as e: # pylint: disable=broad-exception-caught |
| 68 | + print(f"Device list download for backend {namespace} failed: {e}") |
| 69 | + |
| 70 | + if len(device_lists) == 0: |
| 71 | + print() |
| 72 | + print(f"No devices found for {bare_jid}, health check failed.") |
| 73 | + return |
| 74 | + |
| 75 | + print() |
| 76 | + print("Gathering device information...") |
| 77 | + table: List[Dict[str, str]] = [] |
| 78 | + for namespace, device_list in device_lists.items(): |
| 79 | + backend = next(backend for backend in backends if backend.namespace == namespace) |
| 80 | + |
| 81 | + await session_manager.refresh_device_list(namespace, bare_jid) |
| 82 | + device_information = await session_manager.get_device_information(bare_jid) |
| 83 | + |
| 84 | + for device_id, _label in device_list.items(): |
| 85 | + info = next((info for info in device_information if info.device_id == device_id), None) |
| 86 | + |
| 87 | + table_row: Dict[str, str] = {} |
| 88 | + |
| 89 | + table_row["device id"] = str(device_id) |
| 90 | + table_row["namespace"] = namespace |
| 91 | + # TODO: Print whether a label is present per-device and whether there is a valid signature for the |
| 92 | + # label |
| 93 | + # TODO: If a device is listed in both the omemo:1 and omemo:2 device lists, check whether the |
| 94 | + # identity key matches |
| 95 | + |
| 96 | + # Print whether the bundle node for each device in the device list can be accessed |
| 97 | + try: |
| 98 | + await session_manager._download_bundle(namespace, bare_jid, device_id) |
| 99 | + table_row["bundle download ok?"] = "yes" |
| 100 | + except Exception as e: # pylint: disable=broad-exception-caught |
| 101 | + table_row["bundle download ok?"] = f"no: {type(e).__name__}" |
| 102 | + |
| 103 | + # Print whether "full" device information exists, i.e. whether the device is included in |
| 104 | + # get_device_information. |
| 105 | + table_row["full info available?"] = "no" if info is None else "yes" |
| 106 | + |
| 107 | + # Print whether a session exists |
| 108 | + session = await backend.load_session(bare_jid, device_id) |
| 109 | + table_row["session exists?"] = "no" if session is None else "yes" |
| 110 | + |
| 111 | + # Print the trust status of each device |
| 112 | + if info is None: |
| 113 | + table_row["trust status"] = "n.a. (full info missing)" |
| 114 | + else: |
| 115 | + trust_level = await session_manager._evaluate_custom_trust_level(info) |
| 116 | + table_row["trust status"] = f"{info.trust_level_name} ({trust_level})" |
| 117 | + |
| 118 | + table.append(table_row) |
| 119 | + |
| 120 | + if len(table) > 0: |
| 121 | + print() |
| 122 | + print("Device information:") |
| 123 | + ptable = prettytable.PrettyTable() |
| 124 | + for field_name in table[0].keys(): |
| 125 | + ptable.add_column(field_name, [ row[field_name] for row in table ]) |
| 126 | + print(ptable) |
| 127 | + |
| 128 | + print() |
| 129 | + print("Health check complete.") |
| 130 | + print("*" * 100) |
0 commit comments