Skip to content

Commit 46e3689

Browse files
committed
add historical data harvester
1 parent 95e3671 commit 46e3689

File tree

15 files changed

+427
-79
lines changed

15 files changed

+427
-79
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ jobs:
8585
- name: Generate coverage report
8686
run: pipx run coverage lcov -o coverage.lcov
8787
- name: Upload partial coverage report
88-
uses: coverallsapp/github-action@master
88+
uses: coverallsapp/github-action@v2
8989
with:
9090
path-to-lcov: coverage.lcov
9191
github-token: ${{ secrets.GITHUB_TOKEN }}
@@ -97,7 +97,7 @@ jobs:
9797
runs-on: ubuntu-latest
9898
steps:
9999
- name: Finalize coverage report
100-
uses: coverallsapp/github-action@master
100+
uses: coverallsapp/github-action@v2
101101
with:
102102
github-token: ${{ secrets.GITHUB_TOKEN }}
103103
parallel-finished: true

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
*.orig
99
*.log
1010
*.pot
11-
__pycache__/*
11+
__pycache__/
1212
.cache/*
1313
.*.swp
1414
*/.ipynb_checkpoints/*

AUTHORS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
# Contributors
22

3-
* Meir Tseitlin [meir@imubit.com](mailto:meir@imubit.com)
3+
* Meir Tseitlin [opensource@imubit.com](mailto:opensource@imubit.com)

setup.cfg

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ package_dir =
3232
install_requires =
3333
dynaconf
3434
pandas
35+
msgpack
36+
zstandard
3537
aiomisc
3638
aiodebug
3739
apscheduler

src/data_agent/abstract_connector.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from abc import ABC, abstractmethod
2+
from enum import IntEnum
23
from functools import wraps
34
from typing import Union
45

@@ -7,7 +8,7 @@
78
from .exceptions import ConnectionNotActive, TagsGroupNotFound
89

910

10-
class SupportedOperation:
11+
class SupportedOperation(IntEnum):
1112
READ_TAG_VALUE = 1
1213
WRITE_TAG_VALUE = 2
1314
READ_TAG_PERIOD = 3
@@ -20,6 +21,12 @@ class SupportedOperation:
2021
DELETE_TAG = 10
2122

2223

24+
class HistDataFormat(IntEnum):
    """Formats in which harvested historical data can be returned/delivered."""

    # FIX: values were written as 1-tuples ``(1,)`` / ``(2,)``; IntEnum
    # happened to coerce them to ints, but the plain form is the intended,
    # less error-prone spelling and matches DICTIONARY below.
    DATAFRAME = 1
    SERIES_LIST = 2
    DICTIONARY = 3
28+
29+
2330
STANDARD_ATTRIBUTES = {
2431
"Name": {"Type": "str", "Name": "Tag Name"},
2532
"Type": {"Type": "str", "Name": "Data Type"},

src/data_agent/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,7 @@ class SafetyErrorManipulateOutsideOfRateBound(Exception):
7676

7777
class DaqJobAlreadyExists(Exception):
7878
pass
79+
80+
81+
class HistoryHarvesterJobAlreadyExists(Exception):
    """Raised when a history-harvester job with the given id is already scheduled."""
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import datetime as dt
2+
import logging
3+
import time as tm
4+
5+
from apscheduler.executors.pool import ThreadPoolExecutor
6+
from apscheduler.schedulers.asyncio import AsyncIOScheduler
7+
8+
from data_agent.exceptions import HistoryHarvesterJobAlreadyExists
9+
from data_agent.msg_packer import encode_dataframe
10+
11+
log = logging.getLogger(__name__)
12+
13+
14+
class HistoryHarvester:
    """Delivers historical tag data to a broker in time-sliced batches.

    A delivery job reads one ``batch_size``-long slice of history from a
    connection, publishes the encoded frame to the broker, then reschedules
    itself for the next slice until ``last_timestamp`` is reached.
    """

    def __init__(self, connection_manager, broker):
        self.connection_manager = connection_manager
        self.broker = broker

        self._thread_pool_executor = ThreadPoolExecutor(max_workers=20)
        self._scheduler = AsyncIOScheduler(thread_pool=self._thread_pool_executor)
        self._scheduler.start()

    def _schedule_batch(
        self,
        job_id,
        conn,
        tags,
        first_timestamp,
        last_timestamp,
        time_frequency,
        batch_size,
        iteration,
    ):
        # Single place that submits a batch run; used both for the initial
        # job and for chaining the next batch from inside a running job
        # (previously this add_job call was duplicated verbatim).
        self._scheduler.add_job(
            func=self._delivery_job_func,
            trigger="date",
            coalesce=True,  # Always run once
            id=job_id,
            max_instances=2,
            replace_existing=True,
            args=[
                job_id,
                conn,
                tags,
                first_timestamp,
                last_timestamp,
                time_frequency,
                batch_size,
                iteration,
            ],
        )

    async def _delivery_job_func(
        self,
        job_id,
        conn,
        tags,
        first_timestamp,
        last_timestamp,
        time_frequency,
        batch_size,
        iteration,
    ):
        """Read one batch of history, publish it, and chain the next batch."""
        try:
            start_time = tm.time()

            next_period_end = min(last_timestamp, first_timestamp + batch_size)

            df = conn.read_tag_values_period(
                tags=tags,
                first_timestamp=first_timestamp,
                last_timestamp=next_period_end,
                time_frequency=time_frequency,
            )

            read_time = tm.time() - start_time

            if df.empty:
                log.warning(
                    f"No data read for job '{job_id}' for period {first_timestamp} - {next_period_end}"
                )

            else:  # Publish data
                headers = {
                    "data_category": "historical",
                    "connection": conn.name,
                    "job_id": job_id,
                    "batch_num": iteration,
                }

                payload = encode_dataframe(df)

                # FIX: removed stray unbalanced ')' after the read time.
                log.debug(
                    f"(#{iteration}): Job {job_id}: "
                    f"Data publish: read time={read_time:.2f}s, {len(df)} samples, "
                    f"period: {first_timestamp} - {next_period_end}"
                )
                self.broker.publish_data(payload, headers=headers)

            if next_period_end < last_timestamp:
                # Reschedule next run, resuming where this batch ended.
                self._schedule_batch(
                    job_id,
                    conn,
                    tags,
                    next_period_end,
                    last_timestamp,
                    time_frequency,
                    batch_size,
                    iteration + 1,
                )

        except Exception as e:
            log.exception(f'Exception in history harvester job "{job_id}" - {e}')

    def create_delivery_job(
        self,
        job_id: str,
        conn_name: str,
        tags: list,
        first_timestamp: dt.datetime,
        last_timestamp: dt.datetime,
        time_frequency: dt.timedelta,
        batch_size: dt.timedelta = None,
        progress_callback=None,
    ):
        """Create a new history-delivery job.

        :param job_id: unique scheduler id for the job.
        :param conn_name: name of the registered connection to read from.
        :param tags: tags to harvest (delivered in alphabetical order).
        :param first_timestamp: start of the requested history period.
        :param last_timestamp: end of the requested history period.
        :param time_frequency: sampling frequency passed to the connector.
        :param batch_size: period covered by a single read/publish cycle;
            defaults to the whole requested period (single batch).
        :param progress_callback: accepted for interface compatibility;
            NOTE(review): currently unused.
        :raises HistoryHarvesterJobAlreadyExists: if ``job_id`` is already
            scheduled.
        """
        # BUG FIX: batch_size defaulted to None, which made
        # `first_timestamp + batch_size` in the job raise TypeError.
        if batch_size is None:
            batch_size = last_timestamp - first_timestamp

        # Order tags alphabetically without mutating the caller's list.
        tags = sorted(tags)

        if self._scheduler.get_job(job_id):
            raise HistoryHarvesterJobAlreadyExists(
                f"History loader Job {job_id} already exists."
            )

        conn = self.connection_manager.connection(conn_name, check_enabled=False)

        self._schedule_batch(
            job_id,
            conn,
            tags,
            first_timestamp,
            last_timestamp,
            time_frequency,
            batch_size,
            0,
        )

src/data_agent/linux/config_default.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ log: # standard logging dictConfig
160160
level: 'DEBUG'
161161
propagate: False
162162

163+
data_agent.history_harvester:
164+
handlers: ['console', 'amqp']
165+
level: 'DEBUG'
166+
propagate: False
167+
163168
data_agent.connection_manager:
164169
handlers: ['console', 'amqp']
165170
level: 'DEBUG'

src/data_agent/msg_packer.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import msgpack
2+
import numpy as np
3+
import pandas as pd
4+
import zstandard as zstd
5+
6+
7+
def encode_dataframe(df, ts_unit="s", zstd_level=10):
    """Pack a time-indexed DataFrame into a zstd-compressed msgpack blob.

    Payload layout (four msgpack items concatenated, then compressed):
      * ExtType 0 - int64 epoch timestamps expressed in ``ts_unit``
      * ExtType 1 - raw bytes of the fixed-dtype (numeric) record array
      * ExtType 2 - msgpack-encoded dict of the object-dtype columns
      * map       - metadata used by ``decode_payload`` to rebuild the frame

    :param df: frame to serialize; assumes a datetime64[ns] index -- TODO confirm
    :param ts_unit: unit for the serialized timestamps ("s", "ms", "us", "ns").
    :param zstd_level: zstandard compression level.
    :return: compressed ``bytes`` payload.
    """
    ns_per_unit = {"s": 1_000_000_000, "ms": 1_000_000, "us": 1_000, "ns": 1}

    # 1) timestamps -> int64 epoch values in the requested unit
    epoch = df.index.view("int64")
    if ts_unit != "ns":
        epoch = (epoch // ns_per_unit[ts_unit]).astype("int64")

    # 2) split object-dtype columns from fixed-dtype (numeric) ones
    obj_cols = df.select_dtypes(include=["object"]).columns.tolist()
    num_cols = [c for c in df.columns if c not in obj_cols]

    # 2a) fixed-dtype columns travel as a single record-array byte blob
    if num_cols:
        records = df[num_cols].to_records(index=False)
        num_blob = records.tobytes()
        num_descr = records.dtype.descr
    else:
        num_blob = b""
        num_descr = []

    # 2b) object-dtype columns travel as plain Python lists
    obj_data = {c: df[c].tolist() for c in obj_cols}

    # 3) metadata -- includes the original column order so decode can restore it
    meta = {
        "ts_unit": ts_unit,
        "num_descr": [list(x) for x in num_descr],
        "num_cols": num_cols,
        "obj_cols": obj_cols,
        "orig_cols": df.columns.tolist(),
        "index_name": df.index.name,
    }

    packer = msgpack.Packer(use_bin_type=True, strict_types=True)
    raw = b"".join(
        [
            packer.pack(msgpack.ExtType(0, epoch.tobytes())),
            packer.pack(msgpack.ExtType(1, num_blob)),
            # Ext code 2 carries the already-msgpacked object data blob:
            packer.pack(msgpack.ExtType(2, msgpack.packb(obj_data, use_bin_type=True))),
            packer.pack(meta),
        ]
    )
    return zstd.ZstdCompressor(level=zstd_level).compress(raw)
53+
54+
55+
def decode_payload(blob):
    """Inverse of ``encode_dataframe``: rebuild the DataFrame from a blob.

    :param blob: compressed ``bytes`` produced by ``encode_dataframe``.
    :return: DataFrame with its datetime index, dtypes, and original column
        order restored.
    """
    # 1) decompress
    raw = zstd.ZstdDecompressor().decompress(blob)

    # 2) ext_hook to pull out our three ExtTypes
    def ext_hook(code, data):
        if code == 0:
            return np.frombuffer(data, dtype="int64")  # timestamps
        if code in (1, 2):
            return data  # numeric / object blobs stay raw bytes
        return msgpack.ExtType(code, data)

    # 3) unpack in the same sequence they were packed
    unpacker = msgpack.Unpacker(ext_hook=ext_hook, raw=False)
    unpacker.feed(raw)
    ts_arr = next(unpacker)
    num_blob = next(unpacker)
    obj_blob = next(unpacker)
    meta = next(unpacker)

    # 4) rebuild timestamps (scale back to nanoseconds)
    factor = {"s": 1_000_000_000, "ms": 1_000_000, "us": 1_000, "ns": 1}[
        meta["ts_unit"]
    ]
    idx = pd.to_datetime(ts_arr * factor)
    idx.name = meta["index_name"]

    # 5) rebuild fixed-dtype DataFrame
    num_cols = meta["num_cols"]
    if num_cols:
        dtype_descr = [tuple(x) for x in meta["num_descr"]]
        rec = np.frombuffer(num_blob, dtype=np.dtype(dtype_descr))
        df_num = pd.DataFrame(rec, columns=num_cols)
    else:
        # BUG FIX: this used to be `pd.DataFrame(index=idx)`. For frames with
        # only object columns, concat(axis=1) then unioned the datetime index
        # with df_obj's RangeIndex, and the final `df.index = idx` failed with
        # a length mismatch. An empty frame lets df_obj's rows win instead.
        df_num = pd.DataFrame()

    # 6) rebuild object-dtype DataFrame
    obj_cols = meta["obj_cols"]
    if obj_cols:
        obj_data = msgpack.unpackb(obj_blob, raw=False)
        df_obj = pd.DataFrame(obj_data)
    else:
        df_obj = pd.DataFrame()

    # 7) combine, restore index, and reorder exactly as the original
    df = pd.concat([df_num, df_obj], axis=1)
    if len(df) != len(idx):
        # No data columns at all: still return a frame carrying the timestamps.
        df = pd.DataFrame(index=pd.RangeIndex(len(idx)))
    df.index = idx

    df = df[meta["orig_cols"]]
    return df

0 commit comments

Comments
 (0)