From 0ee0ac0e63c5ab040047ef9eae367c1ea7d3cb8d Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Wed, 28 Mar 2018 22:34:03 +0100 Subject: [PATCH 1/3] simple udp proxy --- docker-compose.yml | 10 +++++ proxy/Dockerfile | 11 ++++++ proxy/main.py | 92 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+) create mode 100644 proxy/Dockerfile create mode 100755 proxy/main.py diff --git a/docker-compose.yml b/docker-compose.yml index 63464d3..26d1aec 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,6 +51,16 @@ services: depends_on: - postgres + proxy: + build: proxy + environment: + APP_SIP_HOST: ${APP_SIP_HOST} + APP_SIP_PORT: ${APP_SIP_PORT} + ports: + - 8060:8060/udp + depends_on: + - logs + backend: environment: APP_PG_HOST: postgres diff --git a/proxy/Dockerfile b/proxy/Dockerfile new file mode 100644 index 0000000..c274ef6 --- /dev/null +++ b/proxy/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.6-alpine + +RUN apk --update --no-cache add gcc g++ musl-dev libuv libffi-dev make postgresql-dev \ + && rm -rf /var/cache/apk/* + +ADD ./main.py /home/root/main.py + +ENV IMAGE_NAME mithra_proxy +WORKDIR /home/root +ENV PYTHONUNBUFFERED 1 +ENTRYPOINT ["./main.py"] diff --git a/proxy/main.py b/proxy/main.py new file mode 100755 index 0000000..b1df895 --- /dev/null +++ b/proxy/main.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3.6 +import asyncio +import os +import signal + +SIP_HOST = os.getenv('APP_SIP_HOST') +SIP_PORT = os.getenv('APP_SIP_PORT') +REMOTE_ADDR = SIP_HOST, SIP_PORT + +PROXY_HOST = '0.0.0.0' +PROXY_PORT = SIP_PORT +PROXY_ADDR = PROXY_HOST, PROXY_PORT + + +def print_data(direction, data): + # print(f'{direction} {"=" * 78}\n{data.decode()}\n{"=" * 80}') + print(f'{direction} ', data.decode().split('\n', 1)[0]) + + +class RemoteDatagramProtocol: + def __init__(self, proxy, addr, data): + self.proxy = proxy + self.addr = addr + self.data = data + self.transport = None + + def connection_made(self, transport): + self.transport = transport + print_data('▲', self.data) + self.transport.sendto(self.data) + + def datagram_received(self, data, _): + print_data('▼', data) + self.proxy.transport.sendto(data, self.addr) + + def connection_lost(self, exc): + self.proxy.remotes.pop(self.addr) + + def error_received(self, exc): + print(f'remote error received: {exc}') + + +class ProxyDatagramProtocol: + def __init__(self): + self.remotes = {} + self.transport = None + + def connection_made(self, transport): + self.transport = transport + + def datagram_received(self, data, addr): + if addr in self.remotes: + print_data('▲', data) + self.remotes[addr].transport.sendto(data) + else: + loop = asyncio.get_event_loop() + self.remotes[addr] = RemoteDatagramProtocol(self, addr, data) + loop.create_task( + loop.create_datagram_endpoint(lambda: self.remotes[addr], remote_addr=REMOTE_ADDR) + ) + + def error_received(self, exc): + print(f'proxy error received: {exc}') + + def connection_lost(self, exc): + print(f'proxy connection lost: {exc}') + + +def stop(reason): + print('') # leaves the ^C on it's own line + raise KeyboardInterrupt(f'reason "{reason or "unknown"}"') + + +async def start_datagram_proxy(): + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, stop, 'sigint') + loop.add_signal_handler(signal.SIGTERM, stop, 'sigterm') + return await loop.create_datagram_endpoint(lambda: ProxyDatagramProtocol(), local_addr=PROXY_ADDR) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + print(f'starting proxy {PROXY_ADDR} to {REMOTE_ADDR}') + transport, _ = loop.run_until_complete(start_datagram_proxy()) + print('UPD proxy is running...') + try: + loop.run_forever() + except KeyboardInterrupt as e: + print(f'exiting:', e) + print('UDP proxy closing...') + transport.close() + loop.close() From ae36b3350b644865f1bb0ed2ad7d68bcc0c57035 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Fri, 30 Mar 2018 12:26:22 +0100 Subject: [PATCH 2/3] updating calls modal and adding call_events --- src/Dockerfile.backend | 1 + src/shared/sql/models.sql | 21 +++++++++++++++++++-- src/web/app/patch.py | 38 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/src/Dockerfile.backend b/src/Dockerfile.backend index d0a6be3..6532a13 100644 --- a/src/Dockerfile.backend +++ b/src/Dockerfile.backend @@ -12,6 +12,7 @@ ADD ./backend/main.py /home/root/main.py ADD ./backend/run.py /home/root/run.py ENV IMAGE_NAME mithra_backend +ENV PYTHONUNBUFFERED 1 WORKDIR /home/root HEALTHCHECK --interval=60s --timeout=10s --retries=2 CMD /home/root/run.py check || exit 1 ENTRYPOINT ["./run.py"] diff --git a/src/shared/sql/models.sql b/src/shared/sql/models.sql index c100c23..b8bdc41 100644 --- a/src/shared/sql/models.sql +++ b/src/shared/sql/models.sql @@ -32,11 +32,28 @@ CREATE TABLE people_numbers ( ); CREATE INDEX number_index ON people_numbers USING GIN (number gin_trgm_ops); +CREATE TYPE BOUND AS ENUM ('inbound', 'outbound'); CREATE TABLE calls ( id SERIAL PRIMARY KEY, - number VARCHAR(127) NOT NULL, + call_id VARCHAR(127) NOT NULL, + ext_number VARCHAR(127), + int_number VARCHAR(127), + bound BOUND NOT NULL, + answered BOOLEAN DEFAULT FALSE, + finished BOOLEAN DEFAULT FALSE, person INT REFERENCES people, country VARCHAR(31), - ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + duration INTERVAL NOT NULL, + details JSONB ); +CREATE INDEX call_id ON calls USING btree (call_id); CREATE INDEX call_ts ON calls USING btree (ts); + +CREATE TABLE call_events ( + id SERIAL PRIMARY KEY, + call INT REFERENCES calls, + ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + event VARCHAR(31) NOT NULL, + details JSONB +); diff --git a/src/web/app/patch.py b/src/web/app/patch.py index 26a553b..370ffe6 100644 --- a/src/web/app/patch.py +++ b/src/web/app/patch.py @@ -99,3 +99,41 @@ async def run_logic_sql(conn, settings, **kwargs): run logic.sql code. """ await conn.execute(settings.logic_sql) + + +@patch +async def update_call_model(conn, settings, **kwargs): + """ + add call_events and modify calls + """ + await conn.execute(""" +ALTER TABLE calls ADD call_id VARCHAR(127) NOT NULL DEFAULT '-'; +ALTER TABLE calls ALTER call_id DROP DEFAULT; + +ALTER TABLE calls ADD int_number VARCHAR(127); + +ALTER TABLE calls RENAME number TO ext_number; +ALTER TABLE calls ALTER ext_number DROP NOT NULL; + +CREATE TYPE BOUND AS ENUM ('inbound', 'outbound'); +ALTER TABLE calls ADD bound BOUND NOT NULL DEFAULT 'inbound'; +ALTER TABLE calls ALTER bound DROP DEFAULT; + +ALTER TABLE calls ADD answered BOOLEAN DEFAULT FALSE; + +ALTER TABLE calls ADD finished BOOLEAN DEFAULT FALSE; + +ALTER TABLE calls DROP country; + +ALTER TABLE calls ADD duration INTERVAL NOT NULL DEFAULT '0s'; +ALTER TABLE calls ALTER duration DROP DEFAULT; + +ALTER TABLE calls ADD details JSONB; + +CREATE INDEX call_id ON calls USING btree (call_id); +CREATE TABLE call_events ( + id SERIAL PRIMARY KEY, + call INT REFERENCES calls, + ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + event VARCHAR(31) NOT NULL +);""") From 040b4e231883319be5a7a658414dd834db1c48c3 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Fri, 30 Mar 2018 15:12:00 +0100 Subject: [PATCH 3/3] formalising proxy --- proxy/main.py | 92 ----------- proxy/Dockerfile => src/Dockerfile.proxy | 0 src/proxy/main.py | 192 +++++++++++++++++++++++ src/{backend => proxy}/run.py | 2 +- 4 files changed, 193 insertions(+), 93 deletions(-) delete mode 100755 proxy/main.py rename proxy/Dockerfile => src/Dockerfile.proxy (100%) create mode 100755 src/proxy/main.py rename src/{backend => proxy}/run.py (95%) diff --git a/proxy/main.py b/proxy/main.py deleted file mode 100755 index b1df895..0000000 --- a/proxy/main.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python3.6 -import asyncio -import os -import signal - -SIP_HOST = os.getenv('APP_SIP_HOST') -SIP_PORT = os.getenv('APP_SIP_PORT') -REMOTE_ADDR = SIP_HOST, SIP_PORT - -PROXY_HOST = '0.0.0.0' -PROXY_PORT = SIP_PORT -PROXY_ADDR = PROXY_HOST, PROXY_PORT - - -def print_data(direction, data): - # print(f'{direction} {"=" * 78}\n{data.decode()}\n{"=" * 80}') - print(f'{direction} ', data.decode().split('\n', 1)[0]) - - -class RemoteDatagramProtocol: - def __init__(self, proxy, addr, data): - self.proxy = proxy - self.addr = addr - self.data = data - self.transport = None - - def connection_made(self, transport): - self.transport = transport - print_data('▲', self.data) - self.transport.sendto(self.data) - - def datagram_received(self, data, _): - print_data('▼', data) - self.proxy.transport.sendto(data, self.addr) - - def connection_lost(self, exc): - self.proxy.remotes.pop(self.addr) - - def error_received(self, exc): - print(f'remote error received: {exc}') - - -class ProxyDatagramProtocol: - def __init__(self): - self.remotes = {} - self.transport = None - - def connection_made(self, transport): - self.transport = transport - - def datagram_received(self, data, addr): - if addr in self.remotes: - print_data('▲', data) - self.remotes[addr].transport.sendto(data) - else: - loop = asyncio.get_event_loop() - self.remotes[addr] = RemoteDatagramProtocol(self, addr, data) - loop.create_task( - loop.create_datagram_endpoint(lambda: self.remotes[addr], remote_addr=REMOTE_ADDR) - ) - - def error_received(self, exc): - print(f'proxy error received: {exc}') - - def connection_lost(self, exc): - print(f'proxy connection lost: {exc}') - - -def stop(reason): - print('') # leaves the ^C on it's own line - raise KeyboardInterrupt(f'reason "{reason or "unknown"}"') - - -async def start_datagram_proxy(): - loop = asyncio.get_event_loop() - loop.add_signal_handler(signal.SIGINT, stop, 'sigint') - loop.add_signal_handler(signal.SIGTERM, stop, 'sigterm') - return await loop.create_datagram_endpoint(lambda: ProxyDatagramProtocol(), local_addr=PROXY_ADDR) - - -if __name__ == '__main__': - loop = asyncio.get_event_loop() - print(f'starting proxy {PROXY_ADDR} to {REMOTE_ADDR}') - transport, _ = loop.run_until_complete(start_datagram_proxy()) - print('UPD proxy is running...') - try: - loop.run_forever() - except KeyboardInterrupt as e: - print(f'exiting:', e) - print('UDP proxy closing...') - transport.close() - loop.close() diff --git a/proxy/Dockerfile b/src/Dockerfile.proxy similarity index 100% rename from proxy/Dockerfile rename to src/Dockerfile.proxy diff --git a/src/proxy/main.py b/src/proxy/main.py new file mode 100755 index 0000000..74514ed --- /dev/null +++ b/src/proxy/main.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3.6 +import asyncio +import logging +import signal +from enum import IntEnum + +import asyncpg + +from shared.db import lenient_conn +from shared.settings import PgSettings + +try: + from devtools import debug +except ImportError: + def debug(*args, **kwargs): + pass + +logger = logging.getLogger('mithra.proxy.main') + + +class Settings(PgSettings): + sip_host: str + sip_port: int = 5060 + proxy_host: str = '0.0.0.0' + + cache_dir: str = '/tmp/mithra' + sentinel_file: str = 'sentinel.txt' + + @property + def sip_addr(self): + return self.sip_host, self.sip_port + + @property + def proxy_addr(self): + return self.proxy_host, self.sip_port + + +class Database: + def __init__(self, settings: Settings, loop): + self.settings = settings + self._pg = None + self._loop = loop + self.tasks = [] + + async def init(self): + conn = await lenient_conn(self.settings) + await conn.close() + self._pg = await asyncpg.create_pool(dsn=self.settings.pg_dsn, min_size=2) + + def record_call(self, number, country): + self.tasks.append(self._loop.create_task(self._record_call(number, country))) + + async def _record_call(self, number, country): + number = number.replace(' ', '').upper() + await self._pg.execute('INSERT INTO calls (number, country) VALUES ($1, $2)', number, country) + + async def complete_tasks(self): + if self.tasks: + await asyncio.gather(*self.tasks) + self.tasks = [] + + async def close(self): + await self.complete_tasks() + await self._pg.close() + + +class Directions(IntEnum): + inbound = 1 + outbound = 2 + + +async def record(direction: Directions, data: bytes, db: Database): + # print(f'{direction} {"=" * 78}\n{data.decode()}\n{"=" * 80}') + print(f'{"▼" if direction == Directions.inbound else "▲"} ', data.decode().split('\n', 1)[0]) + + +class RemoteDatagramProtocol: + def __init__(self, proxy, addr, data): + self.proxy = proxy + self.addr = addr + self.init_data = data + self.transport = None + + def connection_made(self, transport): + self.transport = transport + self.proxy.record(Directions.outbound, self.init_data) + self.transport.sendto(self.init_data) + self.init_data = None + + def datagram_received(self, data, _): + self.proxy.record(Directions.inbound, data) + self.proxy.transport.sendto(data, self.addr) + + def connection_lost(self, exc): + if exc: + logger.warning('remote connection lost: %s', exc) + else: + logger.debug('remote connection lost') + self.proxy.remotes.pop(self.addr) + + def error_received(self, exc): + logger.error('remote error received: %s', exc) + + +class ProxyDatagramProtocol: + def __init__(self, settings: Settings, db: Database, loop): + self.settings = settings + self.db = db + self.loop = loop + self.remotes = {} + self.transport = None + + def record(self, direction: Directions, data: bytes): + self.loop.create_task(record(direction, data, self.db)) + + def connection_made(self, transport): + self.transport = transport + + def datagram_received(self, data, addr): + if addr in self.remotes: + self.record(Directions.outbound, data) + self.remotes[addr].transport.sendto(data) + else: + self.remotes[addr] = RemoteDatagramProtocol(self, addr, data) + self.loop.create_task( + self.loop.create_datagram_endpoint(lambda: self.remotes[addr], remote_addr=self.settings.sip_addr) + ) + + def connection_lost(self, exc): + if exc: + logger.warning('proxy connection lost: %s', exc) + else: + logger.debug('proxy connection lost') + + def error_received(self, exc): + logger.error('proxy error received: %s', exc) + + +class GracefulShutdown(RuntimeError): + pass + + +class Proxy: + def __init__(self, settings: Settings, loop): + self.settings = settings + self.loop = loop + self.db = Database(settings, loop) + self.stopping = None + self.task = None + + async def start(self): + await self.db.init() + self.loop.add_signal_handler(signal.SIGINT, self.stop, 'sigint') + self.loop.add_signal_handler(signal.SIGTERM, self.stop, 'sigterm') + self.task = self.loop.create_task(self.main_task()) + + async def main_task(self): + proto = ProxyDatagramProtocol(self.settings, self.db, self.loop) + transport, _ = await self.loop.create_datagram_endpoint(lambda: proto, local_addr=self.settings.proxy_addr) + try: + + while True: + await asyncio.sleep(600) + # save sentinel + + except GracefulShutdown: + pass + finally: + logger.info('stopping reason: "%s"...', self.stopping) + await self.db.complete_tasks() + if transport: + transport.close() + await self.db.close() + + async def run_forever(self): + await self.task + + def stop(self, reason): + print('') # leaves the ^C on it's own line + self.stopping = reason or 'unknown' + raise GracefulShutdown() + + +def main(): + loop = asyncio.get_event_loop() + settings = Settings() + try: + proxy = Proxy(settings, loop) + loop.run_until_complete(proxy.run_forever()) + proxy.task.result() + finally: + loop.close() diff --git a/src/backend/run.py b/src/proxy/run.py similarity index 95% rename from src/backend/run.py rename to src/proxy/run.py index 851e4bb..026a181 100755 --- a/src/backend/run.py +++ b/src/proxy/run.py @@ -13,7 +13,7 @@ from main import Settings, main # NOQA -logger = logging.getLogger('mithra.backend.run') +logger = logging.getLogger('mithra.proxy.run') def check():