Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
74ccfda
testing add list dialogs api endpoint
acriptis Jun 18, 2020
de1356e
fix query collecting
acriptis Jun 25, 2020
5fb6c70
fix API for dialogs list
acriptis Jun 25, 2020
404af68
fix types
acriptis Jun 25, 2020
a6b520b
fix final page case
acriptis Jun 25, 2020
5661c1d
add doc
acriptis Jul 2, 2020
453f1ee
fix bug with api by dirty hack with renaming telegram_id into externa…
acriptis Jul 2, 2020
d28ae69
add limit to next link in HTTP API for list of dialog ids
acriptis Jul 2, 2020
15d9d9c
extend api with active dialogs filtering
acriptis Jul 2, 2020
55833db
multiple api enhancements
acriptis Jul 2, 2020
8420a7f
Merge pull request #97 from deepmipt/feat/dialogs_list_api
IgnatovFedor Jul 9, 2020
a646bb9
add correct serialization of dates in Utterance.attribute field (#99)
acriptis Jul 30, 2020
f84f102
fix (fix/origin_text_for_bot_utters): add updating origin text (#101)
kudep Jul 30, 2020
b3ed868
fix (hotfix/origin_text): add origin text into add_bot_utterance_last…
kudep Jul 30, 2020
02dc3a7
add ratings output in dialog api (#103)
acriptis Aug 6, 2020
cb78f97
fix: debug/current_load graphics error
IgnatovFedor Aug 21, 2020
2d50c60
fix: correct adding hyps annotations
dilyararimovna Dec 30, 2020
53b018f
fix: import and no log
dilyararimovna Dec 31, 2020
5f3f9d7
fix/add_hyps_annotations_add_sentry: add sentry sending
kudep Jan 20, 2021
f07edf3
fix/add_hyps_annotations_add_sentry: add logger
kudep Jan 20, 2021
afe9010
feat/agent_updates: add service timeout
kudep Mar 1, 2021
5652357
feat/agent_updates: extend log info
kudep Mar 1, 2021
5c1172c
feat/agent_updates: change int to float
kudep Mar 1, 2021
1b688a5
upd/agent_service_timeout: add new logging
kudep Mar 1, 2021
fde06f2
feat/agent_updates: add extra logging for last_chance_service and tim…
kudep Mar 29, 2021
35960a8
feat/agent_updates: fix name of external_id
kudep Mar 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions deeppavlov_agent/cmd_client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import argparse
import asyncio
import os
from logging import getLogger

import sentry_sdk
from aioconsole import ainput

from .setup_agent import setup_agent

logger = getLogger(__name__)

sentry_sdk.init(os.getenv('DP_AGENT_SENTRY_DSN'))


async def message_processor(register_msg):
user_id = await ainput('Provide user id: ')
Expand All @@ -30,6 +37,8 @@ def run_cmd(pipeline_configs, debug):
except KeyboardInterrupt:
pass
except Exception as e:
sentry_sdk.capture_exception(e)
logger.exception(e)
raise e
finally:
future.cancel()
Expand Down
15 changes: 13 additions & 2 deletions deeppavlov_agent/core/connectors.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
import asyncio
from typing import Any, Callable, Dict, List
from collections import defaultdict
from logging import getLogger
import os

import sentry_sdk
import aiohttp

from .transport.base import ServiceGatewayConnectorBase

logger = getLogger(__name__)
sentry_sdk.init(os.getenv('DP_AGENT_SENTRY_DSN'))


class HTTPConnector:
def __init__(self, session: aiohttp.ClientSession, url: str):
def __init__(self, session: aiohttp.ClientSession, url: str, timeout: int):
self.session = session
self.url = url
self.timeout = aiohttp.ClientTimeout(total=timeout)

async def send(self, payload: Dict, callback: Callable):
try:
async with self.session.post(self.url, json=payload['payload']) as resp:
async with self.session.post(self.url, json=payload['payload'], timeout=self.timeout) as resp:
resp.raise_for_status()
response = await resp.json()
await callback(
task_id=payload['task_id'],
response=response[0]
)
except Exception as e:
sentry_sdk.capture_exception(e)
logger.exception(e)
response = e
await callback(
task_id=payload['task_id'],
Expand Down Expand Up @@ -85,6 +94,8 @@ async def send(self, payload: Dict, callback: Callable):
response=best_skill
)
except Exception as e:
sentry_sdk.capture_exception(e)
logger.exception(e)
await callback(
task_id=payload['task_id'],
response=e
Expand Down
15 changes: 14 additions & 1 deletion deeppavlov_agent/core/state_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import deepcopy
from typing import Dict

from datetime import datetime
Expand Down Expand Up @@ -41,7 +42,9 @@ async def add_hypothesis_annotation_batch(self, dialog: Dialog, payload: Dict, l
dialog.utterances[-1].hypotheses[i]['annotations'][label] = {}
else:
for i in range(len(payload["batch"])):
dialog.utterances[-1].hypotheses[i]['annotations'][label] = payload["batch"][i]
new_val = deepcopy(dialog.utterances[-1].hypotheses[i])
new_val['annotations'][label] = payload["batch"][i]
dialog.utterances[-1].hypotheses[i] = new_val

async def add_text(self, dialog: Dialog, payload: str, label: str, **kwargs):
dialog.utterances[-1].text = payload
Expand Down Expand Up @@ -69,6 +72,7 @@ async def add_bot_utterance(self, dialog: Dialog, payload: Dict, label: str, **k
await self.update_bot(dialog.bot, payload)
dialog.add_bot_utterance()
dialog.utterances[-1].text = payload['text']
dialog.utterances[-1].orig_text = payload['text']
dialog.utterances[-1].active_skill = payload['skill_name']
dialog.utterances[-1].confidence = payload['confidence']
dialog.utterances[-1].annotations = payload.get('annotations', {})
Expand All @@ -78,6 +82,7 @@ async def add_bot_utterance_last_chance(self, dialog: Dialog, payload: Dict, lab
if isinstance(dialog.utterances[-1], HumanUtterance):
dialog.add_bot_utterance()
dialog.utterances[-1].text = payload['text']
dialog.utterances[-1].orig_text = payload['text']
dialog.utterances[-1].active_skill = label
dialog.utterances[-1].confidence = 0
dialog.utterances[-1].annotations = payload['annotations']
Expand All @@ -87,6 +92,7 @@ async def add_bot_utterance_last_chance_overwrite(self, dialog: Dialog, payload:
if isinstance(dialog.utterances[-1], HumanUtterance):
dialog.add_bot_utterance()
dialog.utterances[-1].text = payload['text']
dialog.utterances[-1].orig_text = payload['text']
dialog.utterances[-1].active_skill = label
dialog.utterances[-1].confidence = 0
dialog.utterances[-1].annotations = payload['annotations']
Expand All @@ -95,6 +101,7 @@ async def add_bot_utterance_last_chance_overwrite(self, dialog: Dialog, payload:
async def add_failure_bot_utterance(self, dialog: Dialog, payload: Dict, label: str, **kwargs) -> None:
dialog.add_bot_utterance()
dialog.utterances[-1].text = payload
dialog.utterances[-1].orig_text = payload
dialog.utterances[-1].active_skill = label
dialog.utterances[-1].confidence = 0
dialog.utterances[-1].user = dialog.bot.to_dict()
Expand All @@ -108,6 +115,12 @@ async def get_or_create_dialog(self, user_external_id, channel_type, **kwargs):
async def get_dialog_by_id(self, dialog_id):
    """Fetch a dialog via ``Dialog.get_by_id`` using this manager's database handle.

    :param dialog_id: identifier forwarded unchanged to ``Dialog.get_by_id``
    :return: whatever ``Dialog.get_by_id`` returns for that identifier
    """
    found_dialog = await Dialog.get_by_id(self._db, dialog_id)
    return found_dialog

async def get_dialog_by_dialog_id(self, dialog_id):
    """Fetch a dialog via ``Dialog.get_by_dialog_id``, always requesting ``full=True``.

    :param dialog_id: identifier forwarded unchanged to ``Dialog.get_by_dialog_id``
    :return: whatever ``Dialog.get_by_dialog_id`` returns for that identifier
    """
    found_dialog = await Dialog.get_by_dialog_id(self._db, dialog_id, full=True)
    return found_dialog

async def list_dialog_ids(self, *args, **kwargs):
    """Forward all positional and keyword arguments to ``Dialog.list_ids``.

    The manager's database handle is passed as the first argument; everything
    else is handed through untouched.

    :return: the result of ``Dialog.list_ids``
    """
    dialog_ids = await Dialog.list_ids(self._db, *args, **kwargs)
    return dialog_ids

async def get_dialogs_by_user_ext_id(self, user_external_id):
    """Fetch dialogs via ``Dialog.get_many_by_ext_id`` for the given external user id.

    :param user_external_id: identifier forwarded unchanged to ``Dialog.get_many_by_ext_id``
    :return: whatever ``Dialog.get_many_by_ext_id`` returns
    """
    user_dialogs = await Dialog.get_many_by_ext_id(self._db, user_external_id)
    return user_dialogs

Expand Down
40 changes: 38 additions & 2 deletions deeppavlov_agent/core/state_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import pymongo
from bson.objectid import ObjectId
import bson.json_util
import json

from . import STATE_API_VERSION

Expand Down Expand Up @@ -48,14 +50,15 @@ async def prepare_collection(cls, db):
await db[cls.collection_name].create_index('utt_id')

def to_dict(self):
dumped_attrs = json.loads(json.dumps(self.attributes, default=bson.json_util.default))
return {
'utt_id': self.utt_id,
'text': self.text,
'user': self.user,
'annotations': self.annotations,
'hypotheses': self.hypotheses,
'date_time': str(self.date_time),
'attributes': self.attributes
'attributes': dumped_attrs
}

async def save(self, db):
Expand Down Expand Up @@ -123,6 +126,7 @@ async def prepare_collection(cls, db):
await db[cls.collection_name].create_index('utt_id')

def to_dict(self):
dumped_attrs = json.loads(json.dumps(self.attributes, default=bson.json_util.default))
return {
'utt_id': self.utt_id,
'text': self.text,
Expand All @@ -132,7 +136,7 @@ def to_dict(self):
'annotations': self.annotations,
'date_time': str(self.date_time),
'user': self.user,
'attributes': self.attributes,
'attributes': dumped_attrs
}

async def save(self, db):
Expand Down Expand Up @@ -223,7 +227,9 @@ async def prepare_collection(cls, db):
await db[cls.collection_name].create_index('dialog_id')

def to_dict(self):
dumped_attrs = json.loads(json.dumps(self.attributes, default=bson.json_util.default))
return {
'_id': str(self._id),
'dialog_id': self.dialog_id,
'utterances': [i.to_dict() for i in self.utterances],
'human_utterances': [i.to_dict() for i in self.human_utterances],
Expand All @@ -233,6 +239,8 @@ def to_dict(self):
'channel_type': self.channel_type,
'date_start': str(self.date_start),
'date_finish': str(self.date_finish),
'_active': str(self._active),
'attributes': dumped_attrs
}

async def load_external_info(self, db):
Expand Down Expand Up @@ -283,6 +291,31 @@ async def get_all(cls, db):
result.append(dialog)
return result

@classmethod
async def list_ids(cls, db, offset=0, limit=10, **filter_kwargs):
    """
    Request the list of dialog ids for a particular page.

    :param db: database handle; ``db[cls.collection_name].find(...)`` must return
        a cursor supporting ``skip``/``limit`` and async iteration
        (presumably a Motor database -- confirm against the caller)
    :param offset: int, how many matching documents to skip; negative values
        are treated as 0
    :param limit: int, how many ids to retrieve at most; a non-positive limit
        yields an empty page
    :param filter_kwargs: dict which is transmitted to mongo find request to filter dialogs
    :return: list of ``dialog_id`` values converted to ``str``
    """
    # A zero limit means "no limit" for a MongoDB cursor, so handle the
    # empty-page case explicitly instead of delegating it to the server.
    if limit <= 0:
        return []

    # Let the database skip and cap the page instead of pulling every skipped
    # document over the wire and counting it client-side.
    cursor = db[cls.collection_name].find(filter_kwargs)
    cursor = cursor.skip(max(offset, 0)).limit(limit)
    # TODO sorting by -date (from recent to old)
    return [str(document['dialog_id']) async for document in cursor]

@classmethod
async def get_by_id(cls, db, dialog_id):
dialog = await db[cls.collection_name].find_one({'_id': ObjectId(dialog_id)})
Expand Down Expand Up @@ -431,6 +464,9 @@ async def get_or_create(cls, db, external_id):
async def get_by_id(cls, db, id):
    """Load one document by ``_id`` from ``cls.collection_name`` and build an instance.

    :param db: database handle indexed by collection name
    :param id: value matched against the document's ``_id`` field
    :return: ``cls(**document)`` when a non-empty document is found, else ``None``
    """
    record = await db[cls.collection_name].find_one({'_id': id})
    if not record:
        return None

    if 'telegram_id' in record:
        # Move the value stored under 'telegram_id' to 'external_id' before the
        # document is unpacked into the constructor (presumably older records
        # still use the previous field name -- confirm).
        record['external_id'] = record.pop('telegram_id')
    return cls(**record)

Expand Down
13 changes: 11 additions & 2 deletions deeppavlov_agent/core/transport/gateways/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import time
from logging import getLogger
from typing import Dict, List, Optional, Callable
import os

import sentry_sdk
import aio_pika
from aio_pika import Connection, Channel, Exchange, Queue, IncomingMessage, Message

Expand All @@ -24,6 +26,7 @@

logger = getLogger(__name__)

sentry_sdk.init(os.getenv('DP_AGENT_SENTRY_DSN'))

# TODO: add proper RabbitMQ SSL authentication
class RabbitMQTransportBase:
Expand Down Expand Up @@ -61,7 +64,8 @@ async def _connect(self) -> None:

logger.info('RabbitMQ connected')
break
except ConnectionError:
except ConnectionError as e:
sentry_sdk.capture_exception(e)
reconnect_timeout = 5
logger.error(f'RabbitMQ connection error, making another attempt in {reconnect_timeout} secs')
time.sleep(reconnect_timeout)
Expand Down Expand Up @@ -241,6 +245,9 @@ async def _on_message_callback(self, message: IncomingMessage) -> None:

elif self._add_to_buffer_lock.locked():
self._add_to_buffer_lock.release()
except Exception as e:
sentry_sdk.capture_exception(e)
logger.exception(e)
finally:
self._infer_lock.release()

Expand All @@ -264,7 +271,9 @@ async def _process_tasks(self, tasks_batch: List[ServiceTaskMessage]) -> bool:
await asyncio.gather(*results_replies)
logger.debug(f'Processed tasks {str(task_uuids_batch)}')
return True
except asyncio.TimeoutError:
except asyncio.TimeoutError as e:
sentry_sdk.capture_exception(e)
logger.exception(e)
return False

async def _send_results(self, task: ServiceTaskMessage, response: Dict) -> None:
Expand Down
2 changes: 1 addition & 1 deletion deeppavlov_agent/http_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

def app_factory(pipeline_configs=None, debug=None, response_time_limit=None, cors=None):
agent, session, workers = setup_agent(pipeline_configs)
response_logger = LocalResponseLogger(RESPONSE_LOGGER)
response_logger = agent._response_logger
if DEBUG:
output_formatter = DEBUG_OUTPUT_FORMATTER
else:
Expand Down
2 changes: 2 additions & 0 deletions deeppavlov_agent/http_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ async def on_shutdown(app):

app.router.add_post('', handler.handle_api_request)
app.router.add_options('', handler.options)
app.router.add_get('/api/dialogs/', handler.dialog_list)
app.router.add_get('/api/dialogs/{dialog_id}', handler.dialog)

app.router.add_get('/api/user/{user_external_id}', handler.dialogs_by_user)
app.router.add_get('/ping', pages.ping)
app.router.add_options('/ping', pages.options)
Expand Down
42 changes: 39 additions & 3 deletions deeppavlov_agent/http_api/handlers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import urllib.request
from datetime import datetime
from string import hexdigits
from time import time
Expand Down Expand Up @@ -60,12 +61,47 @@ async def handle_api_request(self, request):
async def dialog(self, request):
    """Return a single dialog as a JSON response.

    The ``dialog_id`` path segment must consist of hex digits only. A
    24-character value is looked up with ``state_manager.get_dialog_by_id``;
    a value of any other length is looked up with
    ``state_manager.get_dialog_by_dialog_id``.

    :param request: incoming request; reads ``request.app['agent']`` and
        ``request.match_info['dialog_id']``
    :return: JSON response built from ``dialog_obj.to_dict()``
    :raises web.HTTPBadRequest: the id contains a non-hex character
    :raises web.HTTPNotFound: no dialog was found for the id
    """
    state_manager = request.app['agent'].state_manager
    dialog_id = request.match_info['dialog_id']

    # Reject malformed ids up front so the lookup code below stays flat.
    if not all(c in hexdigits for c in dialog_id):
        raise web.HTTPBadRequest(
            reason='dialog id should be 24-character hex string or 34-char hex string for dialog_id')

    if len(dialog_id) == 24:
        dialog_obj = await state_manager.get_dialog_by_id(dialog_id)
    else:
        dialog_obj = await state_manager.get_dialog_by_dialog_id(dialog_id)

    if not dialog_obj:
        raise web.HTTPNotFound(reason=f'dialog with id {dialog_id} does not exist')

    return web.json_response(dialog_obj.to_dict())

async def dialog_list(self, request):
    """Function to get list of dialog ids as JSON response

    Query parameters read from the request URL:

    * ``offset`` -- integer, defaults to 0, must not be negative
    * ``limit`` -- integer, defaults to 100, must be at least 1
    * ``_active`` -- optional integer flag; when present and non-empty it is
      converted with ``bool(int(...))`` and passed on as a filter

    :param request: incoming request; reads ``request.app['agent']`` and
        ``request.rel_url.query``
    :return: JSON response ``{"dialog_ids": [...], "next": <query string or None>}``;
        ``next`` is ``None`` when fewer than ``limit`` ids were returned
    :raises web.HTTPBadRequest: a parameter is not an integer or is out of range
    """
    state_manager = request.app['agent'].state_manager
    query = request.rel_url.query

    try:
        params = {
            'offset': int(query.get('offset', 0)),
            'limit': int(query.get('limit', 100)),
        }
        _active_raw = query.get('_active', None)
        if _active_raw:
            params["_active"] = bool(int(_active_raw))
    except ValueError:
        # Malformed client input is a client error, not a server failure.
        raise web.HTTPBadRequest(reason='offset, limit and _active should be integers')

    # A zero limit would produce a "next" link that never advances.
    if params['offset'] < 0 or params['limit'] < 1:
        raise web.HTTPBadRequest(reason='offset should be >= 0 and limit should be >= 1')

    list_ids = await state_manager.list_dialog_ids(**params)

    if len(list_ids) < params['limit']:
        # final page or no more items?
        next_offset_link = None
    else:
        next_params = dict(params, offset=params['offset'] + params['limit'])
        if '_active' in next_params:
            # Encode the flag as 0/1: this handler parses it back with int(),
            # which cannot read the "True"/"False" that urlencode emits for bools.
            next_params['_active'] = int(next_params['_active'])
        next_offset_link = "?" + urllib.parse.urlencode(next_params)

    resp_dict = {
        "dialog_ids": list_ids,
        "next": next_offset_link
    }
    return web.json_response(resp_dict)

async def dialogs_by_user(self, request):
state_manager = request.app['agent'].state_manager
Expand Down
2 changes: 1 addition & 1 deletion deeppavlov_agent/parse_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def make_connector(self, name: str, data: Dict):
for url in urllist:
workers.append(QueueListenerBatchifyer(self.get_session(), url, queue, batch_size))
else:
connector = HTTPConnector(self.get_session(), data['url'])
connector = HTTPConnector(self.get_session(), data['url'], timeout=data.get("timeout", 0))

elif data['protocol'] == 'AMQP':
gateway = self.get_gateway()
Expand Down
Loading