|
| 1 | +""" |
| 2 | +Custom Manager with event publish support. |
| 3 | +
|
| 4 | +This sample demonstrates how to create a completely new, custom manager |
| 5 | +that runs its own background task to publish ad-hoc UDMI events. |
| 6 | +
|
| 7 | +Key Features Demonstrated: |
| 8 | +1. Inheriting from `BaseManager`. |
| 9 | +2. Using `start_periodic_task` for background logic. |
| 10 | +3. Publishing custom events using `publish_event`. |
| 11 | +4. Handling configuration updates for the custom manager. |
| 12 | +""" |
| 13 | + |
| 14 | +import logging |
| 15 | +import sys |
| 16 | +from datetime import datetime |
| 17 | +from datetime import timezone |
| 18 | + |
| 19 | +from udmi.constants import UDMI_VERSION |
| 20 | +from udmi.core.factory import create_device |
| 21 | +from udmi.core.managers import BaseManager |
| 22 | +from udmi.core.managers import SystemManager |
| 23 | +from udmi.schema import Config |
| 24 | +from udmi.schema import EndpointConfiguration |
| 25 | +from udmi.schema import Entry |
| 26 | +from udmi.schema import State |
| 27 | +from udmi.schema import SystemEvents |
| 28 | + |
| 29 | +# --- Configuration Constants --- |
| 30 | +DEVICE_ID = "AHU-1" |
| 31 | +MQTT_HOSTNAME = "localhost" |
| 32 | +MQTT_PORT = 1883 |
| 33 | +BROKER_USERNAME = "pyudmi-device" |
| 34 | +BROKER_PASSWORD = "somesecureword" |
| 35 | + |
| 36 | +LOGGER = logging.getLogger("CustomManagerSample") |
| 37 | + |
| 38 | + |
| 39 | +# --- 1. Define a Custom Manager --- |
| 40 | + |
| 41 | +class HeartbeatManager(BaseManager): |
| 42 | + """ |
| 43 | + A custom manager that logs a 'heartbeat' event periodically. |
| 44 | +
|
| 45 | + This demonstrates adding new, periodic logic to the device. |
| 46 | + """ |
| 47 | + DEFAULT_INTERVAL_SEC = 10 |
| 48 | + |
| 49 | + @property |
| 50 | + def model_field_name(self) -> str: |
| 51 | + # If we wanted to read metadata, we would define the field name here. |
| 52 | + # e.g. return "heartbeat_config" |
| 53 | + return "custom" |
| 54 | + |
| 55 | + def __init__(self, interval_sec=DEFAULT_INTERVAL_SEC): |
| 56 | + super().__init__() |
| 57 | + self.interval_sec = interval_sec |
| 58 | + # We store a reference to the wake event so we can trigger immediate runs |
| 59 | + self._wake_event = None |
| 60 | + |
| 61 | + def start(self) -> None: |
| 62 | + """ |
| 63 | + Lifecycle hook: Called by the Device when `device.run()` is initiated. |
| 64 | + """ |
| 65 | + LOGGER.info("HeartbeatManager starting...") |
| 66 | + |
| 67 | + # Use the BaseManager's helper to run a periodic task. |
| 68 | + # This handles threading, exception catching, and graceful shutdown automatically. |
| 69 | + self._wake_event = self.start_periodic_task( |
| 70 | + interval_getter=lambda: self.interval_sec, |
| 71 | + task=self._send_heartbeat, |
| 72 | + name="HeartbeatTask" |
| 73 | + ) |
| 74 | + |
| 75 | + def _send_heartbeat(self): |
| 76 | + """ |
| 77 | + The task function. Builds and publishes a UDMI SystemEvents message. |
| 78 | + """ |
| 79 | + LOGGER.info(f"Sending heartbeat (Interval: {self.interval_sec}s)...") |
| 80 | + |
| 81 | + # Create a UDMI-compliant log entry |
| 82 | + log_entry = Entry( |
| 83 | + message="Heartbeat OK", |
| 84 | + level=200, # INFO |
| 85 | + timestamp=datetime.now(timezone.utc).isoformat() |
| 86 | + ) |
| 87 | + |
| 88 | + # Create the SystemEvents message |
| 89 | + event = SystemEvents( |
| 90 | + timestamp=datetime.now(timezone.utc).isoformat(), |
| 91 | + version=UDMI_VERSION, |
| 92 | + logentries=[log_entry] |
| 93 | + ) |
| 94 | + |
| 95 | + # Use the BaseManager's publish_event method. |
| 96 | + # This handles topic formatting ("events/system") and JSON serialization. |
| 97 | + self.publish_event(event, "system") |
| 98 | + |
| 99 | + # --- Handling Config Updates --- |
| 100 | + |
| 101 | + def handle_config(self, config: Config) -> None: |
| 102 | + """ |
| 103 | + Optional: React to configuration changes. |
| 104 | + Here we demonstrate checking a custom field in the 'system' config |
| 105 | + to dynamically change the heartbeat rate. |
| 106 | + """ |
| 107 | + # In a real scenario, you might have your own config block (e.g. config.heartbeat) |
| 108 | + # For this demo, we'll check system.metrics_rate_sec as a proxy, |
| 109 | + # or just log that we saw a config. |
| 110 | + if config.system and config.system.metrics_rate_sec: |
| 111 | + new_rate = config.system.metrics_rate_sec |
| 112 | + if new_rate != self.interval_sec: |
| 113 | + LOGGER.info(f"Updating heartbeat interval: {self.interval_sec} -> {new_rate}") |
| 114 | + self.interval_sec = new_rate |
| 115 | + # Wake up the thread immediately to apply the new rate |
| 116 | + if self._wake_event: |
| 117 | + self._wake_event.set() |
| 118 | + |
| 119 | + # --- Required Abstract Methods --- |
| 120 | + |
| 121 | + def handle_command(self, command_name: str, payload: dict) -> None: |
| 122 | + """This manager doesn't handle any commands.""" |
| 123 | + pass |
| 124 | + |
| 125 | + def update_state(self, state: State) -> None: |
| 126 | + """This manager doesn't contribute to the device state.""" |
| 127 | + pass |
| 128 | + |
| 129 | + |
| 130 | +# --- End Custom Manager --- |
| 131 | + |
| 132 | + |
| 133 | +if __name__ == "__main__": |
| 134 | + logging.basicConfig(level=logging.INFO, |
| 135 | + format='%(asctime)s - %(levelname)s - %(message)s') |
| 136 | + |
| 137 | + try: |
| 138 | + # Standard setup for a local broker |
| 139 | + topic_prefix = "/r/ZZ-TRI-FECTA/d/" |
| 140 | + client_id = f"{topic_prefix}{DEVICE_ID}" |
| 141 | + |
| 142 | + endpoint_config = EndpointConfiguration.from_dict({ |
| 143 | + "client_id": client_id, |
| 144 | + "hostname": MQTT_HOSTNAME, |
| 145 | + "port": MQTT_PORT, |
| 146 | + "topic_prefix": topic_prefix, |
| 147 | + "auth_provider": { |
| 148 | + "basic": { |
| 149 | + "username": BROKER_USERNAME, |
| 150 | + "password": BROKER_PASSWORD |
| 151 | + } |
| 152 | + } |
| 153 | + }) |
| 154 | + |
| 155 | + LOGGER.info("Creating device with SystemManager + HeartbeatManager...") |
| 156 | + |
| 157 | + # --- 2. Create Manager List --- |
| 158 | + # We explicitly list the managers we want. |
| 159 | + # We include SystemManager for basic device health/lifecycle. |
| 160 | + # We include our HeartbeatManager for the custom logic. |
| 161 | + custom_managers_list = [ |
| 162 | + SystemManager(), |
| 163 | + HeartbeatManager(interval_sec=5) |
| 164 | + ] |
| 165 | + |
| 166 | + # 3. Create the Device |
| 167 | + device = create_device( |
| 168 | + endpoint_config=endpoint_config, |
| 169 | + managers=custom_managers_list, |
| 170 | + ) |
| 171 | + |
| 172 | + # 4. Start the device |
| 173 | + # This will start the device's main loop, which in turn calls |
| 174 | + # `start()` on all registered managers. |
| 175 | + device.run() |
| 176 | + |
| 177 | + except KeyboardInterrupt: |
| 178 | + LOGGER.info("Stopped by user.") |
| 179 | + except Exception as e: |
| 180 | + LOGGER.error(f"A critical error occurred: {e}", exc_info=True) |
| 181 | + sys.exit(1) |
| 182 | + |
| 183 | + LOGGER.info("Device shut down gracefully. Exiting.") |
0 commit comments