Skip to content

Commit b553fe6

Browse files
authored
Merge pull request #26 from imubit/add-history-harvester
add historical data harvester
2 parents 95e3671 + 32e5645 commit b553fe6

17 files changed

+443
-87
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ jobs:
8585
- name: Generate coverage report
8686
run: pipx run coverage lcov -o coverage.lcov
8787
- name: Upload partial coverage report
88-
uses: coverallsapp/github-action@master
88+
uses: coverallsapp/github-action@v2
8989
with:
9090
path-to-lcov: coverage.lcov
9191
github-token: ${{ secrets.GITHUB_TOKEN }}
@@ -97,7 +97,7 @@ jobs:
9797
runs-on: ubuntu-latest
9898
steps:
9999
- name: Finalize coverage report
100-
uses: coverallsapp/github-action@master
100+
uses: coverallsapp/github-action@v2
101101
with:
102102
github-token: ${{ secrets.GITHUB_TOKEN }}
103103
parallel-finished: true

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
*.orig
99
*.log
1010
*.pot
11-
__pycache__/*
11+
__pycache__/
1212
.cache/*
1313
.*.swp
1414
*/.ipynb_checkpoints/*

AUTHORS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
# Contributors
22

3-
* Meir Tseitlin [meir@imubit.com](mailto:meir@imubit.com)
3+
* Meir Tseitlin [opensource@imubit.com](mailto:opensource@imubit.com)

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
# Data Agent
77

88
Python package for accessing real-time and historical data on industrial historians and control systems.
9-
Different historian protocols and APIs are implemented through standalone plugins.
9+
Different historian protocols and APIs are implemented through standalone plugins:
1010

11-
*THIS PACKAGE IS USELESS WITHOUT EXTERNAL PLUGINS IMPLEMENTING TARGET SYSTEM CUSTOM DATA ACCESS PROTOCOLS*
11+
* https://github.com/imubit/data-agent-osisoft-pi
12+
* https://github.com/imubit/data-agent-aspen-ip21
1213

1314
## Description
1415

setup.cfg

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ package_dir =
3232
install_requires =
3333
dynaconf
3434
pandas
35+
msgpack
36+
zstandard
3537
aiomisc
3638
aiodebug
3739
apscheduler

src/data_agent/abstract_connector.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from abc import ABC, abstractmethod
2+
from enum import IntEnum
23
from functools import wraps
34
from typing import Union
45

@@ -7,7 +8,7 @@
78
from .exceptions import ConnectionNotActive, TagsGroupNotFound
89

910

10-
class SupportedOperation:
11+
class SupportedOperation(IntEnum):
1112
READ_TAG_VALUE = 1
1213
WRITE_TAG_VALUE = 2
1314
READ_TAG_PERIOD = 3
@@ -20,6 +21,12 @@ class SupportedOperation:
2021
DELETE_TAG = 10
2122

2223

24+
class HistDataFormat(IntEnum):
    """Formats in which harvested historical data can be delivered to consumers."""

    # NOTE: previous values were written as `(1,)` / `(2,)` (a formatter
    # artifact). That only worked because Enum passes a tuple value as
    # constructor args to the int mixin; plain ints are the intended form
    # and keep the same member values (1 and 2).
    DATAFRAME = 1
    SERIES_LIST = 2
    DICTIONARY = 3
28+
29+
2330
STANDARD_ATTRIBUTES = {
2431
"Name": {"Type": "str", "Name": "Tag Name"},
2532
"Type": {"Type": "str", "Name": "Data Type"},

src/data_agent/daq_scheduler.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import logging
23
import time
34
from datetime import timedelta
@@ -80,12 +81,13 @@ async def _job_func(self, job_id, conn, broker, tags, from_cache, refresh_rate_m
8081

8182
read_time = time.time() - start_time
8283

83-
msg = {
84-
"job_id": job_id,
84+
payload = {
8585
"sample_id": self._job_state[job_id]["iter_counter"],
8686
"data": tag_values,
8787
}
8888

89+
msg = json.dumps(payload, sort_keys=True, default=str).encode()
90+
8991
# Publish
9092
to_publish = [f'{t}={tag_values[t]["Value"]}' for t in tag_values]
9193
self._total_iterations_counter += 1

src/data_agent/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,7 @@ class SafetyErrorManipulateOutsideOfRateBound(Exception):
7676

7777
class DaqJobAlreadyExists(Exception):
7878
pass
79+
80+
81+
class HistoryHarvesterJobAlreadyExists(Exception):
    """Raised when a history harvester job with the given id is already scheduled."""
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import datetime as dt
2+
import logging
3+
import time as tm
4+
5+
from apscheduler.executors.pool import ThreadPoolExecutor
6+
from apscheduler.schedulers.asyncio import AsyncIOScheduler
7+
8+
from data_agent.exceptions import HistoryHarvesterJobAlreadyExists
9+
from data_agent.msg_packer import encode_dataframe
10+
11+
log = logging.getLogger(__name__)
12+
13+
14+
class HistoryHarvester:
    """Reads historical tag data in time-bounded batches and publishes each
    batch to the broker.

    A delivery job reads one batch per scheduler run and re-schedules itself
    (same job id, ``trigger="date"``) until the whole requested period
    ``[first_timestamp, last_timestamp]`` has been covered.
    """

    def __init__(self, connection_manager, broker):
        self.connection_manager = connection_manager
        self.broker = broker

        self._thread_pool_executor = ThreadPoolExecutor(max_workers=20)
        self._scheduler = AsyncIOScheduler(thread_pool=self._thread_pool_executor)
        self._scheduler.start()

    async def _delivery_job_func(
        self,
        job_id,
        conn,
        tags,
        first_timestamp,
        last_timestamp,
        time_frequency,
        batch_size,
        iteration,
    ):
        """Read one batch of history, publish it, and reschedule the next batch.

        Exceptions are logged rather than propagated so a failing batch does
        not kill the scheduler's executor thread.
        """
        try:
            start_time = tm.time()

            # Clamp so the final batch never overshoots the requested period.
            next_period_end = min(last_timestamp, first_timestamp + batch_size)

            df = conn.read_tag_values_period(
                tags=tags,
                first_timestamp=first_timestamp,
                last_timestamp=next_period_end,
                time_frequency=time_frequency,
            )

            read_time = tm.time() - start_time

            if df.empty:
                log.warning(
                    f"No data read for job '{job_id}' for period {first_timestamp} - {next_period_end}"
                )

            else:  # Publish data
                headers = {
                    "data_category": "historical",
                    "connection": conn.name,
                    "job_id": job_id,
                    "batch_num": iteration,
                }

                payload = encode_dataframe(df)

                log.debug(
                    f"(#{iteration}): Job {job_id}: "
                    f"Data publish: read time={read_time:.2f}s), {len(df)} samples, "
                    f"period: {first_timestamp} - {next_period_end}"
                )
                self.broker.publish_data(payload, headers=headers)

            if next_period_end < last_timestamp:
                # Reschedule next run, continuing from where this batch ended.
                self._scheduler.add_job(
                    func=self._delivery_job_func,
                    trigger="date",
                    # next_run_time=dt.datetime.now(),
                    coalesce=True,  # Always run once
                    id=job_id,
                    max_instances=2,
                    replace_existing=True,
                    args=[
                        job_id,
                        conn,
                        tags,
                        next_period_end,
                        last_timestamp,
                        time_frequency,
                        batch_size,
                        iteration + 1,
                    ],
                )

        except Exception as e:
            log.exception(f'Exception in history harvester job "{job_id}" - {e}')

    def create_delivery_job(
        self,
        job_id: str,
        conn_name: str,
        tags: list,
        first_timestamp: dt.datetime,
        last_timestamp: dt.datetime,
        time_frequency: dt.timedelta,
        batch_size: dt.timedelta = None,
        progress_callback=None,
    ):
        """Schedule a new batched historical delivery job.

        :param job_id: unique id of the job within the scheduler.
        :param conn_name: name of a registered connection to read from.
        :param tags: tags to harvest (sorted in place for determinism).
        :param first_timestamp: start of the requested period.
        :param last_timestamp: end of the requested period.
        :param time_frequency: sampling frequency passed to the connector.
        :param batch_size: time span read per batch; ``None`` means the whole
            period is read in a single batch.
        :param progress_callback: reserved for progress reporting
            (currently unused — TODO wire up).
        :raises HistoryHarvesterJobAlreadyExists: if ``job_id`` is already scheduled.
        """
        # order tags alphabetically
        tags.sort()

        existing_job = self._scheduler.get_job(job_id)
        if existing_job:
            raise HistoryHarvesterJobAlreadyExists(
                f"History loader Job {job_id} already exists."
            )

        # BUGFIX: the documented default (batch_size=None) previously reached
        # `first_timestamp + batch_size` in _delivery_job_func and raised
        # TypeError. Default to the whole requested period (one batch).
        if batch_size is None:
            batch_size = last_timestamp - first_timestamp

        conn = self.connection_manager.connection(conn_name, check_enabled=False)

        self._scheduler.add_job(
            func=self._delivery_job_func,
            trigger="date",
            # next_run_time=dt.datetime.now(),
            coalesce=True,  # Always run once
            id=job_id,
            max_instances=2,
            replace_existing=True,
            args=[
                job_id,
                conn,
                tags,
                first_timestamp,
                last_timestamp,
                time_frequency,
                batch_size,
                0,
            ],
        )

src/data_agent/linux/config_default.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ log: # standard logging dictConfig
160160
level: 'DEBUG'
161161
propagate: False
162162

163+
data_agent.history_harvester:
164+
handlers: ['console', 'amqp']
165+
level: 'DEBUG'
166+
propagate: False
167+
163168
data_agent.connection_manager:
164169
handlers: ['console', 'amqp']
165170
level: 'DEBUG'

0 commit comments

Comments
 (0)