Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 118 additions & 2 deletions adsb2influx.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
# -*- coding: utf-8 -*-

# Imports
import os
import signal
import argparse
import logging
import socket
import re
import time
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
Expand Down Expand Up @@ -177,6 +180,7 @@ def __init__(self, host, port):
self.port = port
self.socket = None
self.data = ''
self.connected = False

def connect(self):
log.info('Connecting to SBS host on {}:{}'.format(self.host, self.port))
Expand All @@ -194,9 +198,15 @@ def connect(self):

self.socket.setblocking(False)
self.socket.settimeout(1)
self.connected = True

def disconnect(self):
self.socket.close()
try:
if self.socket:
self.socket.close()
except Exception:
pass
self.connected = False
log.info('Disconnected from SBS host')

def receive(self):
Expand All @@ -223,6 +233,7 @@ class InfluxDB(object):
def __init__(self, url, token, org, bucket):
self.org = org
self.bucket = bucket
self.connected = False

self.client = InfluxDBClient(
url = url,
Expand All @@ -234,19 +245,87 @@ def __init__(self, url, token, org, bucket):
write_options = SYNCHRONOUS
)

# Best-effort check for connection/health.
try:
# prefer ping if available
if hasattr(self.client, "ping"):
try:
self.connected = bool(self.client.ping())
except Exception:
self.connected = True
elif hasattr(self.client, "health"):
try:
health = self.client.health()
status = getattr(health, 'status', None)
self.connected = (status and str(status).lower() in ('pass', 'passive', 'ok', 'healthy'))
except Exception:
self.connected = True
else:
# unknown client version; assume OK for now
self.connected = True
except Exception as e:
log.warning('Could not verify InfluxDB connection: {}'.format(e))
self.connected = False

def is_connected(self):
return self.connected

def write(self, data):
log.debug('Write data to InfluxDB: {}'.format(data))

try:
self.writeapi.write(self.bucket, self.org, data)
log.info('Written {} aircraft to InfluxDB'.format(len(data)))
self.connected = True
except ApiException as e:
log.error('Writing data to InfluxDB failed, status code: {} {}'.format(e.status, e.reason))
self.connected = False
return False
except Exception as e:
log.error('Writing data to InfluxDB failed: {}'.format(e))
self.connected = False
return False

return True


# Health HTTP server
class _HealthHandler(BaseHTTPRequestHandler):
# check_fn will be set as a staticmethod before server start
check_fn = staticmethod(lambda: False)

def do_GET(self):
try:
ok = bool(self.check_fn())
except Exception:
ok = False

if ok:
self.send_response(200)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
self.wfile.write(b'OK')
else:
self.send_response(503)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
self.wfile.write(b'UNHEALTHY')

def log_message(self, format, *args):
# suppress default HTTP logging to avoid noisy output
log.debug("HealthHandler: " + (format % args))


def start_health_server(port, check_fn):
handler = _HealthHandler
handler.check_fn = staticmethod(check_fn)
server = HTTPServer(('0.0.0.0', port), handler)

thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
return server


# Main
def main():
parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -309,6 +388,20 @@ def main():
action = 'store_true',
help = 'set logging to debug level'
)
parser.add_argument(
'--health-enable',
dest = 'health_enable',
action = 'store_true',
default = os.environ.get('HEALTH_ENABLE', 'false').lower() == 'true',
help = 'enable HTTP health endpoint for Docker (default: disabled, or enabled via HEALTH_ENABLE env)'
)
parser.add_argument(
'--health-port',
dest = 'health_port',
type = int,
default = int(os.environ.get('HEALTH_PORT', 8080)),
help = 'Port for HTTP health endpoint (default: 8080, or HEALTH_PORT env)'
)

args = parser.parse_args()

Expand Down Expand Up @@ -338,6 +431,18 @@ def main():
bucket = args.influxdb_bucket
)

# Health server
health_server = None
if args.health_enable:
def _check():
return getattr(sbs, 'connected', False) and getattr(influxdb, 'is_connected', lambda: False)()
try:
health_server = start_health_server(args.health_port, _check)
log.info('Health endpoint listening on 0.0.0.0:{}'.format(args.health_port))
except Exception as e:
log.error('Could not start health server on port {}: {}'.format(args.health_port, e))
health_server = None


measurement = args.influxdb_measurement
send_interval = args.send_interval
Expand Down Expand Up @@ -402,7 +507,18 @@ def main():


# Exit
sbs.disconnect()
try:
sbs.disconnect()
except Exception:
pass

if health_server:
try:
health_server.shutdown()
health_server.server_close()
log.info('Health server shut down')
except Exception as e:
log.warning('Error shutting down health server: {}'.format(e))


if __name__ == "__main__":
Expand Down