|
| 1 | +from argparse import ArgumentParser, Namespace |
| 2 | +import logging |
| 3 | +import multiprocessing |
| 4 | +import os |
| 5 | +import signal |
| 6 | +from typing import ( |
| 7 | + Any, |
| 8 | + Callable, |
| 9 | + Dict, |
| 10 | + Iterable, |
| 11 | + Type, |
| 12 | +) |
| 13 | + |
| 14 | +from lahja import ( |
| 15 | + EventBus, |
| 16 | + Endpoint, |
| 17 | +) |
| 18 | + |
| 19 | +from eth.db.backends.base import BaseDB |
| 20 | + |
| 21 | +from trinity.db.manager import get_chaindb_manager |
| 22 | +from trinity.exceptions import ( |
| 23 | + AmbigiousFileSystem, |
| 24 | + MissingPath, |
| 25 | +) |
| 26 | +from trinity.initialization import ( |
| 27 | + initialize_data_dir, |
| 28 | + is_data_dir_initialized, |
| 29 | +) |
| 30 | +from trinity.cli_parser import ( |
| 31 | + parser, |
| 32 | + subparser, |
| 33 | +) |
| 34 | +from trinity.config import ( |
| 35 | + TrinityConfig, |
| 36 | +) |
| 37 | +from trinity.constants import ( |
| 38 | + MAINNET_NETWORK_ID, |
| 39 | + MAIN_EVENTBUS_ENDPOINT, |
| 40 | + ROPSTEN_NETWORK_ID, |
| 41 | +) |
| 42 | +from trinity.extensibility import ( |
| 43 | + BaseManagerProcessScope, |
| 44 | + MainAndIsolatedProcessScope, |
| 45 | + PluginManager, |
| 46 | +) |
| 47 | +from trinity.plugins.registry import ( |
| 48 | + get_all_plugins, |
| 49 | +) |
| 50 | +from trinity.utils.ipc import ( |
| 51 | + kill_process_gracefully, |
| 52 | +) |
| 53 | +from trinity.utils.logging import ( |
| 54 | + enable_warnings_by_default, |
| 55 | + setup_log_levels, |
| 56 | + setup_trinity_stderr_logging, |
| 57 | + setup_trinity_file_and_queue_logging, |
| 58 | + with_queued_logging, |
| 59 | +) |
| 60 | +from trinity.utils.mp import ( |
| 61 | + ctx, |
| 62 | +) |
| 63 | +from trinity.utils.profiling import ( |
| 64 | + setup_cprofiler, |
| 65 | +) |
| 66 | +from trinity.utils.version import ( |
| 67 | + construct_trinity_client_identifier, |
| 68 | + is_prerelease, |
| 69 | +) |
| 70 | + |
| 71 | + |
| 72 | +PRECONFIGURED_NETWORKS = {MAINNET_NETWORK_ID, ROPSTEN_NETWORK_ID} |
| 73 | + |
| 74 | + |
| 75 | +TRINITY_HEADER = "\n".join(( |
| 76 | + "\n" |
| 77 | + r" ______ _ _ __ ", |
| 78 | + r" /_ __/____(_)___ (_) /___ __", |
| 79 | + r" / / / ___/ / __ \/ / __/ / / /", |
| 80 | + r" / / / / / / / / / / /_/ /_/ / ", |
| 81 | + r" /_/ /_/ /_/_/ /_/_/\__/\__, / ", |
| 82 | + r" /____/ ", |
| 83 | +)) |
| 84 | + |
| 85 | +TRINITY_AMBIGIOUS_FILESYSTEM_INFO = ( |
| 86 | + "Could not initialize data directory\n\n" |
| 87 | + " One of these conditions must be met:\n" |
| 88 | + " * HOME environment variable set\n" |
| 89 | + " * XDG_TRINITY_ROOT environment variable set\n" |
| 90 | + " * TRINITY_DATA_DIR environment variable set\n" |
| 91 | + " * --data-dir command line argument is passed\n" |
| 92 | + "\n" |
| 93 | + " In case the data directory is outside of the trinity root directory\n" |
| 94 | + " Make sure all paths are pre-initialized as Trinity won't attempt\n" |
| 95 | + " to create directories outside of the trinity root directory\n" |
| 96 | +) |
| 97 | + |
| 98 | + |
| 99 | +BootFn = Callable[[ |
| 100 | + Namespace, |
| 101 | + TrinityConfig, |
| 102 | + Dict[str, Any], |
| 103 | + PluginManager, |
| 104 | + logging.handlers.QueueListener, |
| 105 | + EventBus, |
| 106 | + Endpoint, |
| 107 | + logging.Logger |
| 108 | +], None] |
| 109 | + |
| 110 | + |
| 111 | +def main_entry(trinity_boot: BootFn) -> None: |
| 112 | + event_bus = EventBus(ctx) |
| 113 | + main_endpoint = event_bus.create_endpoint(MAIN_EVENTBUS_ENDPOINT) |
| 114 | + main_endpoint.connect_no_wait() |
| 115 | + |
| 116 | + plugin_manager = setup_plugins( |
| 117 | + MainAndIsolatedProcessScope(event_bus, main_endpoint) |
| 118 | + ) |
| 119 | + plugin_manager.amend_argparser_config(parser, subparser) |
| 120 | + args = parser.parse_args() |
| 121 | + |
| 122 | + if args.network_id not in PRECONFIGURED_NETWORKS: |
| 123 | + raise NotImplementedError( |
| 124 | + f"Unsupported network id: {args.network_id}. Only the ropsten and mainnet " |
| 125 | + "networks are supported." |
| 126 | + ) |
| 127 | + |
| 128 | + has_ambigous_logging_config = ( |
| 129 | + args.log_levels is not None and |
| 130 | + None in args.log_levels and |
| 131 | + args.stderr_log_level is not None |
| 132 | + ) |
| 133 | + if has_ambigous_logging_config: |
| 134 | + parser.error( |
| 135 | + "\n" |
| 136 | + "Ambiguous logging configuration: The logging level for stderr was " |
| 137 | + "configured with both `--stderr-log-level` and `--log-level`. " |
| 138 | + "Please remove one of these flags", |
| 139 | + ) |
| 140 | + |
| 141 | + if is_prerelease(): |
| 142 | + # this modifies the asyncio logger, but will be overridden by any custom settings below |
| 143 | + enable_warnings_by_default() |
| 144 | + |
| 145 | + stderr_logger, formatter, handler_stream = setup_trinity_stderr_logging( |
| 146 | + args.stderr_log_level or (args.log_levels and args.log_levels.get(None)) |
| 147 | + ) |
| 148 | + |
| 149 | + if args.log_levels: |
| 150 | + setup_log_levels(args.log_levels) |
| 151 | + |
| 152 | + try: |
| 153 | + trinity_config = TrinityConfig.from_parser_args(args) |
| 154 | + except AmbigiousFileSystem: |
| 155 | + parser.error(TRINITY_AMBIGIOUS_FILESYSTEM_INFO) |
| 156 | + |
| 157 | + if not is_data_dir_initialized(trinity_config): |
| 158 | + # TODO: this will only work as is for chains with known genesis |
| 159 | + # parameters. Need to flesh out how genesis parameters for custom |
| 160 | + # chains are defined and passed around. |
| 161 | + try: |
| 162 | + initialize_data_dir(trinity_config) |
| 163 | + except AmbigiousFileSystem: |
| 164 | + parser.error(TRINITY_AMBIGIOUS_FILESYSTEM_INFO) |
| 165 | + except MissingPath as e: |
| 166 | + parser.error( |
| 167 | + "\n" |
| 168 | + f"It appears that {e.path} does not exist. " |
| 169 | + "Trinity does not attempt to create directories outside of its root path. " |
| 170 | + "Either manually create the path or ensure you are using a data directory " |
| 171 | + "inside the XDG_TRINITY_ROOT path" |
| 172 | + ) |
| 173 | + |
| 174 | + file_logger, log_queue, listener = setup_trinity_file_and_queue_logging( |
| 175 | + stderr_logger, |
| 176 | + formatter, |
| 177 | + handler_stream, |
| 178 | + trinity_config.logfile_path, |
| 179 | + args.file_log_level, |
| 180 | + ) |
| 181 | + |
| 182 | + display_launch_logs(trinity_config) |
| 183 | + |
| 184 | + # compute the minimum configured log level across all configured loggers. |
| 185 | + min_configured_log_level = min( |
| 186 | + stderr_logger.level, |
| 187 | + file_logger.level, |
| 188 | + *(args.log_levels or {}).values() |
| 189 | + ) |
| 190 | + |
| 191 | + extra_kwargs = { |
| 192 | + 'log_queue': log_queue, |
| 193 | + 'log_level': min_configured_log_level, |
| 194 | + 'profile': args.profile, |
| 195 | + } |
| 196 | + |
| 197 | + # Plugins can provide a subcommand with a `func` which does then control |
| 198 | + # the entire process from here. |
| 199 | + if hasattr(args, 'func'): |
| 200 | + args.func(args, trinity_config) |
| 201 | + else: |
| 202 | + trinity_boot( |
| 203 | + args, |
| 204 | + trinity_config, |
| 205 | + extra_kwargs, |
| 206 | + plugin_manager, |
| 207 | + listener, |
| 208 | + event_bus, |
| 209 | + main_endpoint, |
| 210 | + stderr_logger, |
| 211 | + ) |
| 212 | + |
| 213 | + |
| 214 | +def setup_plugins(scope: BaseManagerProcessScope) -> PluginManager: |
| 215 | + plugin_manager = PluginManager(scope) |
| 216 | + # TODO: most plugins should check if they are in eth1 / eth2 context |
| 217 | + # and only start when appropriate |
| 218 | + plugin_manager.register(get_all_plugins()) |
| 219 | + |
| 220 | + return plugin_manager |
| 221 | + |
| 222 | + |
| 223 | +def display_launch_logs(trinity_config: TrinityConfig) -> None: |
| 224 | + logger = logging.getLogger('trinity') |
| 225 | + logger.info(TRINITY_HEADER) |
| 226 | + logger.info("Started main process (pid=%d)", os.getpid()) |
| 227 | + logger.info(construct_trinity_client_identifier()) |
| 228 | + logger.info("Trinity DEBUG log file is created at %s", str(trinity_config.logfile_path)) |
| 229 | + |
| 230 | + |
| 231 | +@setup_cprofiler('run_database_process') |
| 232 | +@with_queued_logging |
| 233 | +def run_database_process(trinity_config: TrinityConfig, db_class: Type[BaseDB]) -> None: |
| 234 | + with trinity_config.process_id_file('database'): |
| 235 | + base_db = db_class(db_path=trinity_config.database_dir) |
| 236 | + |
| 237 | + manager = get_chaindb_manager(trinity_config, base_db) |
| 238 | + server = manager.get_server() # type: ignore |
| 239 | + |
| 240 | + def _sigint_handler(*args: Any) -> None: |
| 241 | + server.stop_event.set() |
| 242 | + |
| 243 | + signal.signal(signal.SIGINT, _sigint_handler) |
| 244 | + |
| 245 | + try: |
| 246 | + server.serve_forever() |
| 247 | + except SystemExit: |
| 248 | + server.stop_event.set() |
| 249 | + raise |
| 250 | + |
| 251 | + |
| 252 | +def kill_trinity_gracefully(logger: logging.Logger, |
| 253 | + processes: Iterable[multiprocessing.Process], |
| 254 | + plugin_manager: PluginManager, |
| 255 | + main_endpoint: Endpoint, |
| 256 | + event_bus: EventBus, |
| 257 | + reason: str=None) -> None: |
| 258 | + # When a user hits Ctrl+C in the terminal, the SIGINT is sent to all processes in the |
| 259 | + # foreground *process group*, so both our networking and database processes will terminate |
| 260 | + # at the same time and not sequentially as we'd like. That shouldn't be a problem but if |
| 261 | + # we keep getting unhandled BrokenPipeErrors/ConnectionResetErrors like reported in |
| 262 | + # https://github.com/ethereum/py-evm/issues/827, we might want to change the networking |
| 263 | + # process' signal handler to wait until the DB process has terminated before doing its |
| 264 | + # thing. |
| 265 | + # Notice that we still need the kill_process_gracefully() calls here, for when the user |
| 266 | + # simply uses 'kill' to send a signal to the main process, but also because they will |
| 267 | + # perform a non-gracefull shutdown if the process takes too long to terminate. |
| 268 | + |
| 269 | + hint = f"({reason})" if reason else f"" |
| 270 | + logger.info('Shutting down Trinity %s', hint) |
| 271 | + plugin_manager.shutdown_blocking() |
| 272 | + main_endpoint.stop() |
| 273 | + event_bus.stop() |
| 274 | + for process in processes: |
| 275 | + # Our sub-processes will have received a SIGINT already (see comment above), so here we |
| 276 | + # wait 2s for them to finish cleanly, and if they fail we kill them for real. |
| 277 | + process.join(2) |
| 278 | + if process.is_alive(): |
| 279 | + kill_process_gracefully(process, logger) |
| 280 | + logger.info('%s process (pid=%d) terminated', process.name, process.pid) |
| 281 | + |
| 282 | + ArgumentParser().exit(message=f"Trinity shutdown complete {hint}\n") |
0 commit comments