|
| 1 | +""" |
| 2 | +Side Input Example for pynumaflow-lite. |
| 3 | +
|
| 4 | +This module contains both a SideInput retriever and a Mapper that reads from side inputs. |
| 5 | +The mode is controlled by the MAPPER environment variable: |
| 6 | +- If MAPPER is set to "true", runs as a Mapper that reads side input files |
| 7 | +- Otherwise, runs as a SideInput retriever that broadcasts values |
| 8 | +""" |
| 9 | +import asyncio |
| 10 | +import os |
| 11 | +import signal |
| 12 | +import threading |
| 13 | +from threading import Thread |
| 14 | +import datetime |
| 15 | + |
| 16 | +from pynumaflow_lite import sideinputer, mapper |
| 17 | +from watchfiles import watch |
| 18 | + |
| 19 | + |
| 20 | +class ExampleSideInput(sideinputer.SideInput): |
| 21 | + """ |
| 22 | + A SideInput retriever that broadcasts a timestamp message every time. |
| 23 | + """ |
| 24 | + |
| 25 | + def __init__(self): |
| 26 | + self.counter = 0 |
| 27 | + |
| 28 | + async def retrieve_handler(self) -> sideinputer.Response: |
| 29 | + """ |
| 30 | + This function is called every time the side input is requested. |
| 31 | + """ |
| 32 | + time_now = datetime.datetime.now() |
| 33 | + # val is the value to be broadcasted |
| 34 | + val = f"an example: {str(time_now)}" |
| 35 | + self.counter += 1 |
| 36 | + # broadcast_message() is used to indicate that there is a broadcast |
| 37 | + return sideinputer.Response.broadcast_message(val.encode("utf-8")) |
| 38 | + |
| 39 | + |
| 40 | +class SideInputHandler(mapper.Mapper): |
| 41 | + """ |
| 42 | + A Mapper that reads from side input files and includes the value in its output. |
| 43 | + """ |
| 44 | + |
| 45 | + # variable and lock for thread safety |
| 46 | + data_value = "no_value" |
| 47 | + data_value_lock = threading.Lock() |
| 48 | + |
| 49 | + # Side input file that we are watching |
| 50 | + watched_file = "myticker" |
| 51 | + |
| 52 | + async def handler(self, keys: list[str], datum: mapper.Datum) -> mapper.Messages: |
| 53 | + with self.data_value_lock: |
| 54 | + current_value = self.data_value |
| 55 | + |
| 56 | + messages = mapper.Messages() |
| 57 | + messages.append(mapper.Message(str.encode(current_value))) |
| 58 | + return messages |
| 59 | + |
| 60 | + def file_watcher(self): |
| 61 | + """ |
| 62 | + This function is used to watch the side input directory for changes. |
| 63 | + """ |
| 64 | + path = sideinputer.DIR_PATH |
| 65 | + for changes in watch(path): |
| 66 | + for change in changes: |
| 67 | + change_type, file_path = change |
| 68 | + if file_path.endswith(self.watched_file): |
| 69 | + with self.data_value_lock: |
| 70 | + self.update_data_from_file(file_path) |
| 71 | + |
| 72 | + def init_data_value(self): |
| 73 | + """Read the SIDE INPUT FILE for initial value before starting the server.""" |
| 74 | + path = os.path.join(sideinputer.DIR_PATH, self.watched_file) |
| 75 | + print(f"Initializing side input from: {path}") |
| 76 | + self.update_data_from_file(path) |
| 77 | + |
| 78 | + def update_data_from_file(self, path): |
| 79 | + try: |
| 80 | + with open(path) as file: |
| 81 | + value = file.read().strip() |
| 82 | + self.data_value = value |
| 83 | + print(f"Data value variable set to: {self.data_value}") |
| 84 | + except Exception as e: |
| 85 | + print(f"Error reading file: {e}") |
| 86 | + |
| 87 | + |
| 88 | +# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly. |
| 89 | +signal.signal(signal.SIGINT, signal.default_int_handler) |
| 90 | +try: |
| 91 | + signal.signal(signal.SIGTERM, signal.SIG_DFL) |
| 92 | +except AttributeError: |
| 93 | + pass |
| 94 | + |
| 95 | + |
| 96 | +async def start_sideinput(): |
| 97 | + """Start the SideInput retriever server.""" |
| 98 | + server = sideinputer.SideInputAsyncServer() |
| 99 | + side_input = ExampleSideInput() |
| 100 | + |
| 101 | + loop = asyncio.get_running_loop() |
| 102 | + loop.add_signal_handler(signal.SIGINT, lambda: server.stop()) |
| 103 | + loop.add_signal_handler(signal.SIGTERM, lambda: server.stop()) |
| 104 | + |
| 105 | + try: |
| 106 | + await server.start(side_input) |
| 107 | + print("SideInput server shutting down gracefully...") |
| 108 | + except asyncio.CancelledError: |
| 109 | + server.stop() |
| 110 | + |
| 111 | + |
| 112 | +async def start_mapper(): |
| 113 | + """Start the Mapper server that reads from side inputs.""" |
| 114 | + server = mapper.MapAsyncServer() |
| 115 | + handler = SideInputHandler() |
| 116 | + |
| 117 | + # Initialize the data value from the side input file |
| 118 | + handler.init_data_value() |
| 119 | + |
| 120 | + # Start the file watcher in a background thread |
| 121 | + watcher_thread = Thread(target=handler.file_watcher, daemon=True) |
| 122 | + watcher_thread.start() |
| 123 | + |
| 124 | + loop = asyncio.get_running_loop() |
| 125 | + loop.add_signal_handler(signal.SIGINT, lambda: server.stop()) |
| 126 | + loop.add_signal_handler(signal.SIGTERM, lambda: server.stop()) |
| 127 | + |
| 128 | + try: |
| 129 | + await server.start(handler) |
| 130 | + print("Mapper server shutting down gracefully...") |
| 131 | + except asyncio.CancelledError: |
| 132 | + server.stop() |
| 133 | + |
| 134 | + |
| 135 | +if __name__ == "__main__": |
| 136 | + # Check if we should run as a mapper or side input retriever |
| 137 | + is_mapper = os.environ.get("MAPPER", "").lower() == "true" |
| 138 | + |
| 139 | + if is_mapper: |
| 140 | + print("Starting as Mapper (reading side inputs)...") |
| 141 | + asyncio.run(start_mapper()) |
| 142 | + else: |
| 143 | + print("Starting as SideInput retriever...") |
| 144 | + asyncio.run(start_sideinput()) |
| 145 | + |
0 commit comments