Skip to content

Commit 16291fe

Browse files
authored
Merge pull request #27 from imubit/fix-da-broker-svc
Fix da broker svc
2 parents b553fe6 + df0dd6e commit 16291fe

File tree

11 files changed

+113
-24
lines changed

11 files changed

+113
-24
lines changed

setup.cfg

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,6 @@ install_requires =
4141
amqp-fabric
4242
pywin32 >= 1.0;platform_system=='Windows'
4343

44-
# The usage of test_requires is discouraged, see `Dependency Management` docs
45-
tests_require =
46-
pytest
47-
pytest-cov
48-
pytest-asyncio
49-
aiomisc-pytest
50-
coverage
51-
5244
# Require a specific Python version, e.g. Python 2.7 or >= 3.4
5345
# python_requires = >=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*
5446

@@ -66,7 +58,7 @@ testing =
6658
setuptools
6759
pytest
6860
pytest-cov
69-
pytest-asyncio
61+
pytest-asyncio==0.26.0
7062
aiomisc-pytest
7163

7264
[options.entry_points]

src/data_agent/api.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,13 +321,13 @@ def write_manipulated_tags(
321321

322322
# ================== JOBS ===================
323323
@traceapi
324-
def list_jobs(self):
324+
def list_jobs(self, conn_name=None):
325325
"""List all running DAQ jobs
326326
327327
:return:
328328
"""
329329

330-
return self._scheduler.list_jobs()
330+
return self._scheduler.list_jobs(conn_name=conn_name)
331331

332332
@traceapi
333333
def create_job(
@@ -366,6 +366,15 @@ def remove_job(self, job_id: str):
366366
"""
367367
self._scheduler.remove_job(job_id)
368368

369+
@traceapi
370+
def job_info(self, job_id: str):
371+
"""Return list of tags in the job
372+
373+
:return:
374+
"""
375+
376+
return self._scheduler.job_info(job_id)
377+
369378
@traceapi
370379
def list_job_tags(self, job_id: str):
371380
"""Return list of tags in the job

src/data_agent/broker/agent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ class BrokerAgent:
2222
_data_exchanger = None
2323
_scheduler = None
2424

25-
async def init(self, loop, is_service=False, enable_persistance=True):
26-
self._config = ConfigManager(loop=loop, enable_persistance=enable_persistance)
25+
async def init(self, loop, is_service=False, enable_persistence=True):
26+
self._config = ConfigManager(loop=loop, enable_persistence=enable_persistence)
2727

2828
service_config = self._config.get("service")
2929
broker_config = self._config.get("broker")

src/data_agent/connection_manager.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ class ConnectionManager:
5050
def __init__(self, config, extra_connectors=None):
5151
self._config = config
5252
self._connections_map = {}
53+
54+
log.info("Enumerating connection plugins....")
55+
5356
self._connector_classes = {
5457
entry.name: entry.load() for entry in self.list_plugins()
5558
}
@@ -59,6 +62,8 @@ def __init__(self, config, extra_connectors=None):
5962
self._connector_classes[conn] = extra_connectors[conn]
6063

6164
# Recreate connections from config
65+
log.info("Connecting to pre-configured target systems....")
66+
6267
connections = self._config.connections
6368
for conn in connections:
6469
self._create_connection(
@@ -68,9 +73,10 @@ def __init__(self, config, extra_connectors=None):
6873
)
6974
if connections[conn]["enabled"]:
7075
try:
76+
log.info(f"Enabling connection to '{conn}'...")
7177
self.enable_connection(conn)
7278
except Exception as e:
73-
log.error(f"Error enabling connection {conn} - {e}. ")
79+
log.error(f"Error enabling connection '{conn}' - {e}. ")
7480
# self.disable_connection(conn)
7581

7682
log.info(

src/data_agent/connectors/fake_connector.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@ def list_connection_fields():
4545
def target_info(_):
4646
return {"Name": "absolute-fake", "Endpoints": []}
4747

48-
def __init__(self, conn_name="fake_client", **kwargs):
48+
def __init__(self, conn_name="fake_client", arg_example="val", **kwargs):
4949
super(FakeConnector, self).__init__(conn_name)
5050
self._connected = False
51+
self._arg_example = arg_example
5152
self._tags = {
5253
"Static": {
5354
"Float": {
@@ -99,7 +100,10 @@ def disconnect(self):
99100
self._groups = {}
100101

101102
def connection_info(self):
102-
pass
103+
return {
104+
"OneLiner": f"[{self.TYPE}] 'absolute-fake'",
105+
"ArgExample": self._arg_example,
106+
}
103107

104108
@active_connection
105109
def list_tags(

src/data_agent/daq_scheduler.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import datetime as dt
12
import json
23
import logging
34
import time
4-
from datetime import timedelta
5+
from datetime import timedelta, timezone
56

67
from apscheduler.schedulers.asyncio import AsyncIOScheduler
78
from apscheduler.triggers import interval
@@ -97,8 +98,15 @@ async def _job_func(self, job_id, conn, broker, tags, from_cache, refresh_rate_m
9798
f'Data publish (read time={read_time:.2f}s): {", ".join(to_publish[:120])}'
9899
)
99100
broker.publish_data(msg, headers={"job_id": job_id})
101+
self._job_state[job_id]["last_successful_timestamp"] = str(
102+
dt.datetime.now(timezone.utc).timestamp()
103+
)
100104

101105
except Exception as e:
106+
self._job_state[job_id]["last_exception_timestamp"] = str(
107+
dt.datetime.now(timezone.utc).timestamp()
108+
)
109+
self._job_state[job_id]["last_exception"] = str(e)
102110
log.exception(f'Exception in job "{job_id}" - {e}')
103111

104112
def list_jobs(self, conn_name=None):
@@ -206,7 +214,12 @@ def _create_scan_job(self, job_id, conn_name, tags, seconds, from_cache):
206214
args=[job_id, conn, self._broker_conn, tags, from_cache, refresh_rate_ms],
207215
)
208216

209-
self._job_state[job_id] = {"iter_counter": 0}
217+
self._job_state[job_id] = {
218+
"iter_counter": 0,
219+
"last_successful_timestamp": None,
220+
"last_exception": None,
221+
"last_exception_timestamp": None,
222+
}
210223

211224
return job
212225

@@ -225,6 +238,24 @@ def remove_job(self, job_id, persist=True):
225238
if persist:
226239
self._config.remove(f"{DAQ_CONFIG_KEY}.{j}")
227240

241+
def job_info(self, job_id):
242+
job = self.get_job(job_id)
243+
return {
244+
"job_id": job_id,
245+
"conn_name": job.args[1].name,
246+
"tags": job.args[3],
247+
"seconds": int(job.args[5] / 1000),
248+
"from_cache": job.args[4],
249+
"total_iterations": self._job_state[job_id]["iter_counter"],
250+
"last_successful_timestamp": self._job_state[job_id][
251+
"last_successful_timestamp"
252+
],
253+
"last_exception": self._job_state[job_id]["last_exception"],
254+
"last_exception_timestamp": self._job_state[job_id][
255+
"last_exception_timestamp"
256+
],
257+
}
258+
228259
def list_tags(self, job_id):
229260
job = self.get_job(job_id)
230261
return job.args[3]

src/data_agent/win32/config_default.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ log: # standard logging dictConfig
7272
# level: 'WARNING',
7373
# propagate: False
7474

75-
# amqp_fabric.amq_broker_connector:
76-
# handlers: ['console', 'ntevent', 'file', 'err_file']
77-
# level: 'INFO'
78-
# propagate: True
75+
amqp_fabric.amq_broker_connector:
76+
handlers: ['console', 'ntevent', 'file', 'err_file']
77+
level: 'INFO'
78+
propagate: True
7979

8080
asyncio:
8181
handlers: ['console', 'err_file']

tests/test_api_daq_job.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,29 @@ async def test_job_create_modify(rpc_client, rpc_server, data_queue):
3939
await rpc_client.proxy.create_job(
4040
job_id=job1_id, conn_name=conn_name, tags=tags1, seconds=1
4141
)
42+
43+
assert await rpc_client.proxy.job_info(job_id=job1_id) == {
44+
"job_id": "job1",
45+
"conn_name": "test1",
46+
"tags": ["Random.Real8", "Random.String"],
47+
"seconds": 1,
48+
"from_cache": True,
49+
"total_iterations": 0,
50+
"last_successful_timestamp": None,
51+
"last_exception": None,
52+
"last_exception_timestamp": None,
53+
}
54+
4255
await rpc_client.proxy.create_job(
4356
job_id=job2_id, conn_name=conn_name, tags=tags2, seconds=1
4457
)
4558
assert await rpc_client.proxy.list_jobs() == [job1_id, job2_id]
4659

4760
await asyncio.sleep(1.5)
4861

62+
info = await rpc_client.proxy.job_info(job_id=job1_id)
63+
assert info["total_iterations"] == 1
64+
4965
# Receive 2 messages (1 should be from 1st job and another from 2nd)
5066
for i in range(2):
5167
incoming_message = await data_queue.get(timeout=5)

tests/test_api_target_connection.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ async def test_lifecycle(rpc_client, rpc_server):
1818

1919
# Create
2020
assert await rpc_client.proxy.list_connections() == []
21-
await rpc_client.proxy.create_connection(conn_name=conn_name, conn_type="fake")
21+
await rpc_client.proxy.create_connection(
22+
conn_name=conn_name, conn_type="fake", arg_example="me"
23+
)
2224
assert await rpc_client.proxy.list_connections() == [
2325
{
2426
"name": "test1",
@@ -65,6 +67,9 @@ async def test_lifecycle(rpc_client, rpc_server):
6567
}
6668
]
6769

70+
info = await rpc_client.proxy.connection_info(conn_name=conn_name)
71+
assert info == {"OneLiner": "[fake] 'absolute-fake'", "ArgExample": "me"}
72+
6873
# Properties
6974
properties = await rpc_client.proxy.read_tag_attributes(
7075
conn_name=conn_name, tags=["Random.String", "Random.Real8"]

tests/test_connection_manager.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ def test_connection_lifecycle(connection_manager):
2424

2525
# Create
2626
assert connection_manager.list_connections() == []
27-
connection_manager.create_connection(conn_name=conn_name, conn_type="fake")
27+
connection_manager.create_connection(
28+
conn_name=conn_name, conn_type="fake", arg_example="me"
29+
)
2830
assert connection_manager.list_connections() == [
2931
{
3032
"name": "test1",

0 commit comments

Comments
 (0)