Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ services:
depends_on:
- postgres

proxy:
build: proxy
environment:
APP_SIP_HOST: ${APP_SIP_HOST}
APP_SIP_PORT: ${APP_SIP_PORT}
ports:
- 8060:8060/udp
depends_on:
- logs

backend:
environment:
APP_PG_HOST: postgres
Expand Down
1 change: 1 addition & 0 deletions src/Dockerfile.backend
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ ADD ./backend/main.py /home/root/main.py
ADD ./backend/run.py /home/root/run.py

ENV IMAGE_NAME mithra_backend
ENV PYTHONUNBUFFERED 1
WORKDIR /home/root
HEALTHCHECK --interval=60s --timeout=10s --retries=2 CMD /home/root/run.py check || exit 1
ENTRYPOINT ["./run.py"]
11 changes: 11 additions & 0 deletions src/Dockerfile.proxy
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM python:3.6-alpine

RUN apk --update --no-cache add gcc g++ musl-dev libuv libffi-dev make postgresql-dev \
&& rm -rf /var/cache/apk/*

ADD ./main.py /home/root/main.py

ENV IMAGE_NAME mithra_proxy
WORKDIR /home/root
ENV PYTHONUNBUFFERED 1
ENTRYPOINT ["./main.py"]
192 changes: 192 additions & 0 deletions src/proxy/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
#!/usr/bin/env python3.6
import asyncio
import logging
import signal
from enum import IntEnum

import asyncpg

from shared.db import lenient_conn
from shared.settings import PgSettings

try:
from devtools import debug
except ImportError:
def debug(*args, **kwargs):
pass

logger = logging.getLogger('mithra.proxy.main')


class Settings(PgSettings):
sip_host: str
sip_port: int = 5060
proxy_host: str = '0.0.0.0'

cache_dir: str = '/tmp/mithra'
sentinel_file: str = 'sentinel.txt'

@property
def sip_addr(self):
return self.sip_host, self.sip_port

@property
def proxy_addr(self):
return self.proxy_host, self.sip_port


class Database:
def __init__(self, settings: Settings, loop):
self.settings = settings
self._pg = None
self._loop = loop
self.tasks = []

async def init(self):
conn = await lenient_conn(self.settings)
await conn.close()
self._pg = await asyncpg.create_pool(dsn=self.settings.pg_dsn, min_size=2)

def record_call(self, number, country):
self.tasks.append(self._loop.create_task(self._record_call(number, country)))

async def _record_call(self, number, country):
number = number.replace(' ', '').upper()
await self._pg.execute('INSERT INTO calls (number, country) VALUES ($1, $2)', number, country)

async def complete_tasks(self):
if self.tasks:
await asyncio.gather(*self.tasks)
self.tasks = []

async def close(self):
await self.complete_tasks()
await self._pg.close()


class Directions(IntEnum):
inbound = 1
outbound = 2


async def record(direction: Directions, data: bytes, db: Database):
# print(f'{direction} {"=" * 78}\n{data.decode()}\n{"=" * 80}')
print(f'{"▼" if direction == Directions.inbound else "▲"} ', data.decode().split('\n', 1)[0])


class RemoteDatagramProtocol:
def __init__(self, proxy, addr, data):
self.proxy = proxy
self.addr = addr
self.init_data = data
self.transport = None

def connection_made(self, transport):
self.transport = transport
self.proxy.record(Directions.outbound, self.init_data)
self.transport.sendto(self.init_data)
self.init_data = None

def datagram_received(self, data, _):
self.proxy.record(Directions.inbound, data)
self.proxy.transport.sendto(data, self.addr)

def connection_lost(self, exc):
if exc:
logger.warning('remote connection lost: %s', exc)
else:
logger.debug('remote connection lost')
self.proxy.remotes.pop(self.addr)

def error_received(self, exc):
logger.error('remote error received: %s', exc)


class ProxyDatagramProtocol:
def __init__(self, settings: Settings, db: Database, loop):
self.settings = settings
self.db = db
self.loop = loop
self.remotes = {}
self.transport = None

def record(self, direction: Directions, data: bytes):
self.loop.create_task(record(direction, data, self.db))

def connection_made(self, transport):
self.transport = transport

def datagram_received(self, data, addr):
if addr in self.remotes:
self.record(Directions.outbound, data)
self.remotes[addr].transport.sendto(data)
else:
self.remotes[addr] = RemoteDatagramProtocol(self, addr, data)
self.loop.create_task(
self.loop.create_datagram_endpoint(lambda: self.remotes[addr], remote_addr=self.settings.sip_addr)
)

def connection_lost(self, exc):
if exc:
logger.warning('proxy connection lost: %s', exc)
else:
logger.debug('proxy connection lost')

def error_received(self, exc):
logger.error('proxy error received: %s', exc)


class GracefulShutdown(RuntimeError):
pass


class Proxy:
def __init__(self, settings: Settings, loop):
self.settings = settings
self.loop = loop
self.db = Database(settings, loop)
self.stopping = None
self.task = None

async def start(self):
await self.db.init()
self.loop.add_signal_handler(signal.SIGINT, self.stop, 'sigint')
self.loop.add_signal_handler(signal.SIGTERM, self.stop, 'sigterm')
self.task = self.loop.create_task(self.main_task())

async def main_task(self):
proto = ProxyDatagramProtocol(self.settings, self.db, self.loop)
transport, _ = await self.loop.create_datagram_endpoint(lambda: proto, local_addr=self.settings.proxy_addr)
try:

while True:
await asyncio.sleep(600)
# save sentinel

except GracefulShutdown:
pass
finally:
logger.info('stopping reason: "%s"...', self.stopping)
await self.db.complete_tasks()
if transport:
transport.close()
await self.db.close()

async def run_forever(self):
await self.task

def stop(self, reason):
print('') # leaves the ^C on it's own line
self.stopping = reason or 'unknown'
raise GracefulShutdown()


def main():
loop = asyncio.get_event_loop()
settings = Settings()
try:
proxy = Proxy(settings, loop)
loop.run_until_complete(proxy.run_forever())
proxy.task.result()
finally:
loop.close()
2 changes: 1 addition & 1 deletion src/backend/run.py → src/proxy/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from main import Settings, main # NOQA


logger = logging.getLogger('mithra.backend.run')
logger = logging.getLogger('mithra.proxy.run')


def check():
Expand Down
21 changes: 19 additions & 2 deletions src/shared/sql/models.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,28 @@ CREATE TABLE people_numbers (
);
CREATE INDEX number_index ON people_numbers USING GIN (number gin_trgm_ops);

CREATE TYPE BOUND AS ENUM ('inbound', 'outbound');
CREATE TABLE calls (
id SERIAL PRIMARY KEY,
number VARCHAR(127) NOT NULL,
call_id VARCHAR(127) NOT NULL,
ext_number VARCHAR(127),
int_number VARCHAR(127),
bound BOUND NOT NULL,
answered BOOLEAN DEFAULT FALSE,
finished BOOLEAN DEFAULT FALSE,
person INT REFERENCES people,
country VARCHAR(31),
ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
duration INTERVAL NOT NULL,
details JSONB
);
CREATE INDEX call_id ON calls USING btree (call_id);
CREATE INDEX call_ts ON calls USING btree (ts);

CREATE TABLE call_events (
id SERIAL PRIMARY KEY,
call INT REFERENCES calls,
ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
event VARCHAR(31) NOT NULL,
details JSONB
);
38 changes: 38 additions & 0 deletions src/web/app/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,41 @@ async def run_logic_sql(conn, settings, **kwargs):
run logic.sql code.
"""
await conn.execute(settings.logic_sql)


@patch
async def update_call_model(conn, settings, **kwargs):
"""
add call_events and modify calls
"""
await conn.execute("""
ALTER TABLE calls ADD call_id VARCHAR(127) NOT NULL DEFAULT '-';
ALTER TABLE calls ALTER call_id DROP DEFAULT;

ALTER TABLE calls ADD int_number VARCHAR(127);

ALTER TABLE calls RENAME number TO ext_number;
ALTER TABLE calls ALTER ext_number DROP NOT NULL;

CREATE TYPE BOUND AS ENUM ('inbound', 'outbound');
ALTER TABLE calls ADD bound BOUND NOT NULL DEFAULT 'inbound';
ALTER TABLE calls ALTER bound DROP DEFAULT;

ALTER TABLE calls ADD answered BOOLEAN DEFAULT FALSE;

ALTER TABLE calls ADD finished BOOLEAN DEFAULT FALSE;

ALTER TABLE calls DROP country;

ALTER TABLE calls ADD duration INTERVAL NOT NULL DEFAULT '0s';
ALTER TABLE calls ALTER duration DROP DEFAULT;

ALTER TABLE calls ADD details JSONB;

CREATE INDEX call_id ON calls USING btree (call_id);
CREATE TABLE call_events (
id SERIAL PRIMARY KEY,
call INT REFERENCES calls,
ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
event VARCHAR(31) NOT NULL
);""")