diff --git a/adsb2influx.py b/adsb2influx.py index 5735747..329e82d 100644 --- a/adsb2influx.py +++ b/adsb2influx.py @@ -2,12 +2,15 @@ # -*- coding: utf-8 -*- # Imports +import os import signal import argparse import logging import socket import re import time +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer from influxdb_client import InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS @@ -177,6 +180,7 @@ def __init__(self, host, port): self.port = port self.socket = None self.data = '' + self.connected = False def connect(self): log.info('Connecting to SBS host on {}:{}'.format(self.host, self.port)) @@ -194,9 +198,15 @@ def connect(self): self.socket.setblocking(False) self.socket.settimeout(1) + self.connected = True def disconnect(self): - self.socket.close() + try: + if self.socket: + self.socket.close() + except Exception: + pass + self.connected = False log.info('Disconnected from SBS host') def receive(self): @@ -223,6 +233,7 @@ class InfluxDB(object): def __init__(self, url, token, org, bucket): self.org = org self.bucket = bucket + self.connected = False self.client = InfluxDBClient( url = url, @@ -234,19 +245,87 @@ def __init__(self, url, token, org, bucket): write_options = SYNCHRONOUS ) + # Best-effort check for connection/health. + try: + # prefer ping if available + if hasattr(self.client, "ping"): + try: + self.connected = bool(self.client.ping()) + except Exception: + self.connected = True + elif hasattr(self.client, "health"): + try: + health = self.client.health() + status = getattr(health, 'status', None) + self.connected = (status and str(status).lower() in ('pass', 'passive', 'ok', 'healthy')) + except Exception: + self.connected = True + else: + # unknown client version; assume OK for now + self.connected = True + except Exception as e: + log.warning('Could not verify InfluxDB connection: {}'.format(e)) + self.connected = False + + def is_connected(self): + return self.connected + def write(self, data): log.debug('Write data to InfluxDB: {}'.format(data)) try: self.writeapi.write(self.bucket, self.org, data) log.info('Written {} aircraft to InfluxDB'.format(len(data))) + self.connected = True except ApiException as e: log.error('Writing data to InfluxDB failed, status code: {} {}'.format(e.status, e.reason)) + self.connected = False + return False + except Exception as e: + log.error('Writing data to InfluxDB failed: {}'.format(e)) + self.connected = False return False return True +# Health HTTP server +class _HealthHandler(BaseHTTPRequestHandler): + # check_fn will be set as a staticmethod before server start + check_fn = staticmethod(lambda: False) + + def do_GET(self): + try: + ok = bool(self.check_fn()) + except Exception: + ok = False + + if ok: + self.send_response(200) + self.send_header('Content-Type', 'text/plain') + self.end_headers() + self.wfile.write(b'OK') + else: + self.send_response(503) + self.send_header('Content-Type', 'text/plain') + self.end_headers() + self.wfile.write(b'UNHEALTHY') + + def log_message(self, format, *args): + # suppress default HTTP logging to avoid noisy output + log.debug("HealthHandler: " + (format % args)) + + +def start_health_server(port, check_fn): + handler = _HealthHandler + handler.check_fn = staticmethod(check_fn) + server = HTTPServer(('0.0.0.0', port), handler) + + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + return server + + # Main def main(): parser = argparse.ArgumentParser( @@ -309,6 +388,20 @@ def main(): action = 'store_true', help = 'set logging to debug level' ) + parser.add_argument( + '--health-enable', + dest = 'health_enable', + action = 'store_true', + default = os.environ.get('HEALTH_ENABLE', 'false').lower() == 'true', + help = 'enable HTTP health endpoint for Docker (default: disabled, or enabled via HEALTH_ENABLE env)' + ) + parser.add_argument( + '--health-port', + dest = 'health_port', + type = int, + default = int(os.environ.get('HEALTH_PORT', 8080)), + help = 'Port for HTTP health endpoint (default: 8080, or HEALTH_PORT env)' + ) args = parser.parse_args() @@ -338,6 +431,18 @@ def main(): bucket = args.influxdb_bucket ) + # Health server + health_server = None + if args.health_enable: + def _check(): + return getattr(sbs, 'connected', False) and getattr(influxdb, 'is_connected', lambda: False)() + try: + health_server = start_health_server(args.health_port, _check) + log.info('Health endpoint listening on 0.0.0.0:{}'.format(args.health_port)) + except Exception as e: + log.error('Could not start health server on port {}: {}'.format(args.health_port, e)) + health_server = None + measurement = args.influxdb_measurement send_interval = args.send_interval @@ -402,7 +507,18 @@ def main(): # Exit - sbs.disconnect() + try: + sbs.disconnect() + except Exception: + pass + + if health_server: + try: + health_server.shutdown() + health_server.server_close() + log.info('Health server shut down') + except Exception as e: + log.warning('Error shutting down health server: {}'.format(e)) if __name__ == "__main__":