Skip to content

Commit f5218ff

Browse files
committed
Add job_info API
1 parent 1b051bb commit f5218ff

File tree

4 files changed

+82
-2
lines changed

4 files changed

+82
-2
lines changed

src/data_agent/api.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,15 @@ def remove_job(self, job_id: str):
366366
"""
367367
self._scheduler.remove_job(job_id)
368368

369+
@traceapi
def job_info(self, job_id: str):
    """Return detailed information about a scheduled job.

    Thin API wrapper that delegates to the scheduler's ``job_info``.

    :param job_id: Identifier of the job to query.
    :return: Dict describing the job's configuration and runtime state
        (connection name, tags, scan interval, cache flag, iteration
        counter, and last success/exception timestamps).
    """
    return self._scheduler.job_info(job_id)
377+
369378
@traceapi
370379
def list_job_tags(self, job_id: str):
371380
"""Return list of tags in the job

src/data_agent/daq_scheduler.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import datetime as dt
12
import json
23
import logging
34
import time
4-
from datetime import timedelta
5+
from datetime import timedelta, timezone
56

67
from apscheduler.schedulers.asyncio import AsyncIOScheduler
78
from apscheduler.triggers import interval
@@ -97,8 +98,15 @@ async def _job_func(self, job_id, conn, broker, tags, from_cache, refresh_rate_m
9798
f'Data publish (read time={read_time:.2f}s): {", ".join(to_publish[:120])}'
9899
)
99100
broker.publish_data(msg, headers={"job_id": job_id})
101+
self._job_state[job_id]["last_successful_timestamp"] = str(
102+
dt.datetime.now(timezone.utc).timestamp()
103+
)
100104

101105
except Exception as e:
106+
self._job_state[job_id]["last_exception_timestamp"] = str(
107+
dt.datetime.now(timezone.utc).timestamp()
108+
)
109+
self._job_state[job_id]["last_exception"] = str(e)
102110
log.exception(f'Exception in job "{job_id}" - {e}')
103111

104112
def list_jobs(self, conn_name=None):
@@ -206,7 +214,12 @@ def _create_scan_job(self, job_id, conn_name, tags, seconds, from_cache):
206214
args=[job_id, conn, self._broker_conn, tags, from_cache, refresh_rate_ms],
207215
)
208216

209-
self._job_state[job_id] = {"iter_counter": 0}
217+
self._job_state[job_id] = {
218+
"iter_counter": 0,
219+
"last_successful_timestamp": None,
220+
"last_exception": None,
221+
"last_exception_timestamp": None,
222+
}
210223

211224
return job
212225

@@ -225,6 +238,24 @@ def remove_job(self, job_id, persist=True):
225238
if persist:
226239
self._config.remove(f"{DAQ_CONFIG_KEY}.{j}")
227240

241+
def job_info(self, job_id):
    """Return configuration and runtime state of a scheduled job.

    Scan-job args are laid out positionally as
    ``[job_id, conn, broker_conn, tags, from_cache, refresh_rate_ms]``
    (see ``_create_scan_job``), hence the index-based access below.

    :param job_id: identifier of the job to describe
    :return: dict with the job's static configuration plus runtime
        counters and last success/exception timestamps
    """
    job = self.get_job(job_id)
    # Snapshot the runtime-state record once instead of re-looking it up
    # per field.
    state = self._job_state[job_id]
    info = {
        "job_id": job_id,
        "conn_name": job.args[1].name,
        "tags": job.args[3],
        "seconds": int(job.args[5] / 1000),
        "from_cache": job.args[4],
        "total_iterations": state["iter_counter"],
        "last_successful_timestamp": state["last_successful_timestamp"],
        "last_exception": state["last_exception"],
        "last_exception_timestamp": state["last_exception_timestamp"],
    }
    return info
258+
228259
def list_tags(self, job_id):
229260
job = self.get_job(job_id)
230261
return job.args[3]

tests/test_api_daq_job.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,29 @@ async def test_job_create_modify(rpc_client, rpc_server, data_queue):
3939
await rpc_client.proxy.create_job(
4040
job_id=job1_id, conn_name=conn_name, tags=tags1, seconds=1
4141
)
42+
43+
assert await rpc_client.proxy.job_info(job_id=job1_id) == {
44+
"job_id": "job1",
45+
"conn_name": "test1",
46+
"tags": ["Random.Real8", "Random.String"],
47+
"seconds": 1,
48+
"from_cache": True,
49+
"total_iterations": 0,
50+
"last_successful_timestamp": None,
51+
"last_exception": None,
52+
"last_exception_timestamp": None,
53+
}
54+
4255
await rpc_client.proxy.create_job(
4356
job_id=job2_id, conn_name=conn_name, tags=tags2, seconds=1
4457
)
4558
assert await rpc_client.proxy.list_jobs() == [job1_id, job2_id]
4659

4760
await asyncio.sleep(1.5)
4861

62+
info = await rpc_client.proxy.job_info(job_id=job1_id)
63+
assert info["total_iterations"] == 1
64+
4965
# Receive 2 messages (1 should be from 1st job and another from 2nd)
5066
for i in range(2):
5167
incoming_message = await data_queue.get(timeout=5)

tests/test_daq_scheduler.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,18 @@ async def test_job_lifecycle(
4646
)
4747
assert scheduler.get_job(job1_id).trigger.interval.seconds == 2
4848

49+
assert scheduler.job_info(job1_id) == {
50+
"job_id": "job1",
51+
"conn_name": "fake_conn",
52+
"tags": ["Random.String"],
53+
"seconds": 2,
54+
"from_cache": True,
55+
"total_iterations": 0,
56+
"last_successful_timestamp": None,
57+
"last_exception": None,
58+
"last_exception_timestamp": None,
59+
}
60+
4961
# Recreate with 1 second and extra tag
5062
scheduler.create_scan_job(
5163
job_id=job1_id,
@@ -56,6 +68,18 @@ async def test_job_lifecycle(
5668
)
5769
assert scheduler.get_job(job1_id).trigger.interval.seconds == 1
5870

71+
assert scheduler.job_info(job1_id) == {
72+
"job_id": "job1",
73+
"conn_name": "fake_conn",
74+
"tags": ["Random.Real8", "Random.String"],
75+
"seconds": 1,
76+
"from_cache": True,
77+
"total_iterations": 0,
78+
"last_successful_timestamp": None,
79+
"last_exception": None,
80+
"last_exception_timestamp": None,
81+
}
82+
5983
# Job2
6084
scheduler.create_scan_job(
6185
job_id=job2_id, conn_name=conn_name, tags=tags2, seconds=1, from_cache=False

0 commit comments

Comments
 (0)