|
1 | | -#!/usr/bin/env python3 |
2 | | -"""IRCStream — MediaWiki RecentChanges → IRC gateway. |
| 1 | +"""IRC Server component. |
3 | 2 |
|
4 | | -IRCStream is a simple gateway to the MediaWiki recent changes feed, from the |
5 | | -IRC protocol. It was written mainly for compatibility reasons, as there are a |
6 | | -number of legacy clients in the wild relying on this interface. |
| 3 | +This implements an IRC Server, capable of broadcasting messages to clients |
| 4 | +through a "fake" IRC bot. |
| 5 | +
|
| 6 | +Implements all the relevant bits of the IRC protocol, including a few IRCv3 |
| 7 | +extensions. |
7 | 8 | """ |
8 | 9 |
|
9 | | -# Copyright © Faidon Liambotis |
10 | | -# Copyright © Wikimedia Foundation, Inc. |
11 | | -# |
12 | | -# Licensed under the Apache License, Version 2.0 (the "License"); |
13 | | -# you may not use this file except in compliance with the License. |
14 | | -# You may obtain a copy of the License at |
15 | | -# |
16 | | -# http://www.apache.org/licenses/LICENSE-2.0 |
17 | | -# |
18 | | -# Unless required by applicable law or agreed to in writing, software |
19 | | -# distributed under the License is distributed on an "AS IS" BASIS, |
20 | | -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
21 | | -# See the License for the specific language governing permissions and |
22 | | -# limitations under the License. |
23 | | -# |
| 10 | +# SPDX-FileCopyrightText: Faidon Liambotis |
| 11 | +# SPDX-FileCopyrightText: Wikimedia Foundation |
24 | 12 | # SPDX-License-Identifier: Apache-2.0 |
25 | 13 |
|
26 | 14 | from __future__ import annotations |
27 | 15 |
|
28 | | -__version__ = "0.12.0.dev0" |
29 | | - |
30 | | -import argparse |
31 | 16 | import asyncio |
32 | 17 | import configparser |
33 | 18 | import dataclasses |
34 | 19 | import datetime |
35 | 20 | import enum |
36 | 21 | import errno |
37 | | -import http.server |
38 | | -import logging |
39 | | -import pathlib |
40 | 22 | import re |
41 | 23 | import socket |
42 | | -import sys |
43 | 24 | from collections.abc import Iterable, Sequence |
44 | 25 | from typing import Any |
45 | 26 |
|
46 | 27 | import prometheus_client |
47 | 28 | import structlog |
48 | 29 | from prometheus_client import Counter, Gauge |
49 | 30 |
|
| 31 | +from ._version import __version__ |
| 32 | + |
50 | 33 |
|
51 | 34 | class IRCNumeric(enum.Enum): |
52 | 35 | """Base class for IRC numeric enums.""" |
@@ -833,194 +816,3 @@ async def broadcast(self, target: str, msg: str) -> None: |
833 | 816 | self.log.debug("Unable to broadcast", exc_info=True) |
834 | 817 | continue # ignore exceptions, to catch corner cases |
835 | 818 | self.metrics["messages"].inc() |
836 | | - |
837 | | - |
838 | | -class RC2UDPHandler(asyncio.Protocol): |
839 | | - """A handler implementing the RC2UDP protocol, as used by MediaWiki.""" |
840 | | - |
841 | | - log = structlog.get_logger("ircstream.rc2udp") |
842 | | - |
843 | | - def __init__(self, server: RC2UDPServer) -> None: |
844 | | - self.server = server |
845 | | - self.running_tasks: set[asyncio.Task[Any]] = set() |
846 | | - |
847 | | - def datagram_received(self, data: bytes, _: tuple[str, int]) -> None: |
848 | | - """Receive a new RC2UDP message and broadcast to all clients.""" |
849 | | - try: |
850 | | - decoded = data.decode("utf8") |
851 | | - channel, text = decoded.split("\t", maxsplit=1) |
852 | | - channel = channel.strip() |
853 | | - text = text.lstrip().replace("\r", "").replace("\n", "") |
854 | | - except Exception: # pylint: disable=broad-except |
855 | | - self.server.ircserver.metrics["errors"].labels("rc2udp-parsing").inc() |
856 | | - return |
857 | | - |
858 | | - self.log.debug("Broadcasting message", channel=channel, message=text) |
859 | | - task = asyncio.create_task(self.server.ircserver.broadcast(channel, text)) |
860 | | - self.running_tasks.add(task) |
861 | | - task.add_done_callback(self.running_tasks.discard) |
862 | | - |
863 | | - |
864 | | -class RC2UDPServer: # pylint: disable=too-few-public-methods |
865 | | - """A server implementing the RC2UDP protocol, as used by MediaWiki.""" |
866 | | - |
867 | | - log = structlog.get_logger("ircstream.rc2udp") |
868 | | - |
869 | | - def __init__(self, config: configparser.SectionProxy, ircserver: IRCServer) -> None: |
870 | | - self.ircserver = ircserver |
871 | | - self.address = config.get("listen_address", fallback="::") |
872 | | - self.port = config.getint("listen_port", fallback=9390) |
873 | | - |
874 | | - async def serve(self) -> None: |
875 | | - """Create a new socket, listen to it and serve requests.""" |
876 | | - loop = asyncio.get_running_loop() |
877 | | - local_addr = (self.address, self.port) |
878 | | - transport, _ = await loop.create_datagram_endpoint(lambda: RC2UDPHandler(self), local_addr=local_addr) |
879 | | - local_addr = transport.get_extra_info("sockname")[:2] |
880 | | - self.address, self.port = local_addr # update address/port based on what bind() returned |
881 | | - self.log.info("Listening for RC2UDP broadcast", listen_address=self.address, listen_port=self.port) |
882 | | - |
883 | | - |
884 | | -class PrometheusServer(http.server.ThreadingHTTPServer): |
885 | | - """A Prometheus HTTP server.""" |
886 | | - |
887 | | - log = structlog.get_logger("ircstream.prometheus") |
888 | | - daemon_threads = True |
889 | | - allow_reuse_address = True |
890 | | - |
891 | | - def __init__( |
892 | | - self, |
893 | | - config: configparser.SectionProxy, |
894 | | - registry: prometheus_client.CollectorRegistry = prometheus_client.REGISTRY, |
895 | | - ) -> None: |
896 | | - listen_address = config.get("listen_address", fallback="::") |
897 | | - if ":" in listen_address: |
898 | | - self.address_family = socket.AF_INET6 |
899 | | - listen_port = config.getint("listen_port", fallback=9200) |
900 | | - super().__init__((listen_address, listen_port), prometheus_client.MetricsHandler.factory(registry)) |
901 | | - # update address/port based on what bind() returned |
902 | | - self.address, self.port = str(self.server_address[0]), self.server_address[1] |
903 | | - self.log.info("Listening for Prometheus HTTP", listen_address=self.address, listen_port=self.port) |
904 | | - |
905 | | - def server_bind(self) -> None: |
906 | | - """Bind to an IP address. |
907 | | -
|
908 | | - Override to set an opt to listen to both IPv4/IPv6 on the same socket. |
909 | | - """ |
910 | | - if self.address_family == socket.AF_INET6: |
911 | | - self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) |
912 | | - super().server_bind() |
913 | | - |
914 | | - |
915 | | -def parse_args(argv: Sequence[str] | None) -> argparse.Namespace: |
916 | | - """Parse and return the parsed command line arguments.""" |
917 | | - parser = argparse.ArgumentParser( |
918 | | - prog="ircstream", |
919 | | - description="MediaWiki RecentChanges → IRC gateway", |
920 | | - formatter_class=argparse.ArgumentDefaultsHelpFormatter, |
921 | | - ) |
922 | | - cfg_dflt = pathlib.Path("ircstream.conf") |
923 | | - if not cfg_dflt.exists(): |
924 | | - cfg_dflt = pathlib.Path("/etc/ircstream.conf") |
925 | | - parser.add_argument("--config-file", "-c", type=pathlib.Path, default=cfg_dflt, help="Path to configuration file") |
926 | | - |
927 | | - log_levels = ("DEBUG", "INFO", "WARNING", "ERROR") # no public method to get a list from logging :( |
928 | | - parser.add_argument("--log-level", default="INFO", choices=log_levels, type=str.upper, help="Log level") |
929 | | - log_formats = ("plain", "console", "json") |
930 | | - log_dflt = "console" if sys.stdout.isatty() else "plain" |
931 | | - parser.add_argument("--log-format", default=log_dflt, choices=log_formats, help="Log format") |
932 | | - return parser.parse_args(argv) |
933 | | - |
934 | | - |
935 | | -def configure_logging(log_format: str = "plain") -> None: |
936 | | - """Configure logging parameters.""" |
937 | | - logging.basicConfig(format="%(message)s", level=logging.WARNING) |
938 | | - |
939 | | - processors: list[structlog.types.Processor] = [ |
940 | | - structlog.stdlib.filter_by_level, |
941 | | - structlog.processors.StackInfoRenderer(), |
942 | | - structlog.processors.format_exc_info, |
943 | | - ] |
944 | | - if log_format == "plain": |
945 | | - processors += [structlog.dev.ConsoleRenderer(colors=False)] |
946 | | - elif log_format == "console": |
947 | | - # >= 20.2.0 has this in the default config |
948 | | - processors = [structlog.stdlib.add_log_level] + structlog.get_config()["processors"] |
949 | | - elif log_format == "json": |
950 | | - processors += [ |
951 | | - structlog.stdlib.add_logger_name, # adds a "logger" key |
952 | | - structlog.stdlib.add_log_level, # adds a "level" key (string, not int) |
953 | | - structlog.processors.TimeStamper(fmt="iso"), |
954 | | - structlog.processors.JSONRenderer(sort_keys=True), |
955 | | - ] |
956 | | - else: |
957 | | - raise ValueError(f"Invalid logging format specified: {log_format}") |
958 | | - |
959 | | - structlog.configure( |
960 | | - processors=processors, |
961 | | - logger_factory=structlog.stdlib.LoggerFactory(), |
962 | | - wrapper_class=structlog.stdlib.BoundLogger, |
963 | | - ) |
964 | | - |
965 | | - |
966 | | -async def start_servers(config: configparser.ConfigParser) -> None: |
967 | | - """Start all servers in asyncio tasks or threads and then busy-loop.""" |
968 | | - log = structlog.get_logger("ircstream.main") |
969 | | - loop = asyncio.get_running_loop() |
970 | | - |
971 | | - try: |
972 | | - if "irc" in config: |
973 | | - ircserver = IRCServer(config["irc"]) |
974 | | - irc_coro = ircserver.serve() |
975 | | - irc_task = asyncio.create_task(irc_coro) |
976 | | - else: |
977 | | - log.critical('Invalid configuration, missing section "irc"') |
978 | | - raise SystemExit(-1) |
979 | | - |
980 | | - if "rc2udp" in config: |
981 | | - rc2udp_coro = RC2UDPServer(config["rc2udp"], ircserver).serve() |
982 | | - rc2udp_task = asyncio.create_task(rc2udp_coro) # noqa: F841 pylint: disable=unused-variable |
983 | | - else: |
984 | | - log.warning("RC2UDP is not enabled in the config; server usefulness may be limited") |
985 | | - |
986 | | - if "prometheus" in config: |
987 | | - prom_server = PrometheusServer(config["prometheus"], ircserver.metrics_registry) |
988 | | - prom_server.socket.setblocking(False) |
989 | | - loop.add_reader(prom_server.socket, prom_server.handle_request) |
990 | | - |
991 | | - await asyncio.wait_for(irc_task, timeout=None) # run forever |
992 | | - except OSError as exc: |
993 | | - log.critical(f"System error: {exc.strerror}", errno=errno.errorcode[exc.errno]) |
994 | | - raise SystemExit(-2) from exc |
995 | | - |
996 | | - |
997 | | -def main(argv: Sequence[str] | None = None) -> None: |
998 | | - """Entry point.""" |
999 | | - options = parse_args(argv) |
1000 | | - |
1001 | | - configure_logging(options.log_format) |
1002 | | - # only set e.g. INFO or DEBUG for our own loggers |
1003 | | - structlog.get_logger("ircstream").setLevel(options.log_level) |
1004 | | - log = structlog.get_logger("ircstream.main") |
1005 | | - log.info("Starting IRCStream", config_file=str(options.config_file), version=__version__) |
1006 | | - |
1007 | | - config = configparser.ConfigParser(strict=True) |
1008 | | - try: |
1009 | | - with options.config_file.open(encoding="utf-8") as config_fh: |
1010 | | - config.read_file(config_fh) |
1011 | | - except OSError as exc: |
1012 | | - log.critical(f"Cannot open configuration file: {exc.strerror}", errno=errno.errorcode[exc.errno]) |
1013 | | - raise SystemExit(-1) from exc |
1014 | | - except configparser.Error as exc: |
1015 | | - msg = repr(exc).replace("\n", " ") # configparser exceptions sometimes include newlines |
1016 | | - log.critical(f"Invalid configuration, {msg}") |
1017 | | - raise SystemExit(-1) from exc |
1018 | | - |
1019 | | - try: |
1020 | | - asyncio.run(start_servers(config)) |
1021 | | - except KeyboardInterrupt: |
1022 | | - pass |
1023 | | - |
1024 | | - |
1025 | | -if __name__ == "__main__": |
1026 | | - main() |
0 commit comments