Skip to content

Commit acf3d97

Browse files
authored
Merge pull request #238 from abalabin-bamfunds/merge-adaptors-2026-01-16
Rewrite Python engine subscription management, minor fixes
2 parents 05fe68f + 7c8695b commit acf3d97

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+314
-72
lines changed

cpp/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ set(CMAKE_POSITION_INDEPENDENT_CODE ON)
1616
if (UNIX)
1717
set(CMAKE_CXX_FLAGS_RELEASE "-g -O3")
1818
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-g -O3 -DNDEBUG")
19-
set(CMAKE_CXX_FLAGS_DEBUG "-g -v")
19+
set(CMAKE_CXX_FLAGS_DEBUG "-g")
2020
set(CMAKE_CXX_FLAGS "-Wall")
2121
# Sanitizers (Clang/GCC only)
2222
if (CMAKE_CXX_COMPILER_ID MATCHES "Clang|GNU")

cpp/src/cpp/types/tsb.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ namespace hgraph {
8383
size_t index{static_cast<size_t>(std::distance(_schema->keys().begin(), it))};
8484
return this->operator[](index);
8585
}
86-
throw std::out_of_range("Key not found in TimeSeriesSchema");
86+
throw std::out_of_range(std::format("Key {} not found in TimeSeriesSchema", key));
8787
}
8888

8989
template<typename T_TS>

cpp/src/cpp/types/tsd.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,7 @@ namespace hgraph
936936
_add_key_value(pv_key.view(), value);
937937
} else {
938938
to_keep.emplace(pv_key.view().clone(), std::make_pair(value, was_valid));
939+
value->make_passive();
939940
}
940941
}
941942
std::swap(_removed_items, to_keep);

hgraph/_impl/_builder/_ts_builder.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,7 @@ class PythonOutputBuilder:
5252

5353
def release_instance(self, item: "PythonTimeSeriesOutput"):
5454
if sys.exc_info()[0] is None:
55-
if len(item._subscribers) != 0:
56-
try:
57-
logger.error(
58-
f"Output instance still has subscribers when released, this is a bug. \n"
59-
f"output belongs to node {item.owning_node}\n"
60-
f"subscriber nodes are {[i.owning_node if isinstance(i, TimeSeries) else i for i in item._subscribers]}\n\n"
61-
f"subscriber inputs are {[i for i in item._subscribers if isinstance(i, TimeSeries)]}\n\n"
62-
f"{item}"
63-
)
64-
except Exception:
65-
...
66-
55+
item._subscribers.assert_empty(item)
6756
item._parent_or_node = None
6857

6958

hgraph/_impl/_operators/_conversion_operators/_date_time_conversion_operators.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from datetime import date, timedelta, datetime
1+
from datetime import date, time, timedelta, datetime
22

33
from hgraph import compute_node, TS, OUT, WiringPort, combine, convert
44

@@ -24,6 +24,11 @@ def convert_datetime_to_date(ts: TS[datetime]) -> TS[date]:
2424
return v.date()
2525

2626

27+
@compute_node(overloads=combine)
28+
def combine_date_and_time(date: TS[date], time: TS[time]) -> TS[datetime]:
29+
return datetime.combine(date.value, time.value)
30+
31+
2732
@compute_node(overloads=combine, requires=lambda m: m[OUT].py_type == TS[date])
2833
def combine_date(year: TS[int], month: TS[int], day: TS[int]) -> TS[date]:
2934
return date(year.value, month.value, day.value)

hgraph/_impl/_operators/_conversion_operators/_to_data_frame_converters.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from dataclasses import asdict
66
from datetime import date, datetime
7-
from typing import Dict, Type, Callable
7+
from typing import Dict, Tuple, Type, Callable
88

99
import polars as pl
1010
from frozendict import frozendict
@@ -324,6 +324,15 @@ def _check_schema(scalar, bundle):
324324
return True
325325

326326

327+
@compute_node(overloads=convert, requires=lambda m: m[OUT].py_type == TS[Frame] or m[OUT].matches_type(TS[Frame[m[COMPOUND_SCALAR].py_type]]))
328+
def convert_tuple_to_frame(
329+
ts: TS[Tuple[COMPOUND_SCALAR]],
330+
_tp: Type[TS[Frame[COMPOUND_SCALAR]]] = DEFAULT[OUT],
331+
_cs: Type[COMPOUND_SCALAR] = AUTO_RESOLVE,
332+
) -> TS[Frame[COMPOUND_SCALAR]]:
333+
return pl.DataFrame(ts.value)
334+
335+
327336
@compute_node(
328337
overloads=combine,
329338
requires=lambda m: _check_schema(m[COMPOUND_SCALAR], m[TS_SCHEMA]),

hgraph/_impl/_operators/_number_operators.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
DivideByZero,
2020
ln,
2121
)
22+
from hgraph._operators._operators import sign
2223
from hgraph._types._scalar_types import NUMBER_2
2324

2425
__all__ = tuple()
@@ -278,3 +279,19 @@ def eq_float_float(lhs: TS[float], rhs: TS[float], epsilon: TS[float] = EPSILON)
278279
@compute_node(overloads=ln)
279280
def ln_impl(ts: TS[float]) -> TS[float]:
280281
return math.log(ts.value)
282+
283+
284+
@compute_node(overloads=sign)
285+
def sign_impl(ts: TS[int]) -> TS[int]:
286+
if ts.value < 0:
287+
return -1
288+
else:
289+
return 1
290+
291+
292+
@compute_node(overloads=sign)
293+
def sign_float_impl(ts: TS[float]) -> TS[float]:
294+
if ts.value < 0:
295+
return -1.0
296+
else:
297+
return 1.0

hgraph/_impl/_operators/_stream_operators.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,29 @@ def lag_timedelta_start(_state: STATE[LagState]):
118118
_state.buffer = deque[SCALAR]()
119119

120120

121+
@compute_node(overloads=lag)
122+
def lag_timedelta_ts(
123+
ts: TIME_SERIES_TYPE,
124+
period: TS[timedelta],
125+
on_wall_clock: bool = False,
126+
_scheduler: SCHEDULER = None,
127+
_state: STATE[LagState] = None,
128+
) -> TIME_SERIES_TYPE:
129+
# Uses the scheduler to keep track of when to deliver the values recorded in the buffer.
130+
buffer: deque[SCALAR] = _state.buffer
131+
if ts.modified:
132+
buffer.append(ts.delta_value)
133+
_scheduler.schedule(period.value, f"{ts.last_modified_time}", on_wall_clock=on_wall_clock)
134+
135+
if _scheduler.is_scheduled_now:
136+
return buffer.popleft()
137+
138+
139+
@lag_timedelta_ts.start
140+
def lag_timedelta_ts_start(_state: STATE[LagState]):
141+
_state.buffer = deque[SCALAR]()
142+
143+
121144
@generator(overloads=schedule)
122145
def schedule_scalar(delay: timedelta, initial_delay: bool = True, max_ticks: int = sys.maxsize) -> TS[bool]:
123146
initial_timedelta = delay if initial_delay else timedelta()

hgraph/_impl/_operators/_time_series_properties.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
valid,
1010
last_modified_date,
1111
last_modified_time,
12+
last_modified_wall_clock_time,
1213
modified,
1314
SCHEDULER,
1415
TS_OUT,
@@ -62,6 +63,11 @@ def last_modified_time_impl(ts: SIGNAL) -> TS[datetime]:
6263
return ts.last_modified_time
6364

6465

66+
@compute_node(overloads=last_modified_wall_clock_time)
67+
def last_modified_wall_clock_time_time_impl(ts: SIGNAL, _clock: EvaluationClock = None) -> TS[datetime]:
68+
return _clock.now
69+
70+
6571
@compute_node(overloads=last_modified_date)
6672
def last_modified_date_impl(ts: SIGNAL) -> TS[date]:
6773
return ts.last_modified_time.date()

hgraph/_impl/_operators/_tuple_operators.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,19 @@ def index_of_tuple(ts: TS[tuple[SCALAR, ...]], item: TS[SCALAR]) -> TS[int]:
140140
return -1
141141

142142

143+
@compute_node(overloads=add_, valid=lambda m, __strict__: ("lhs", "rhs") if __strict__ else tuple())
144+
def add_tuple_tuple(lhs: TS[Tuple[SCALAR, ...]], rhs: TS[Tuple[SCALAR, ...]], __strict__: bool = True) -> TS[Tuple[SCALAR, ...]]:
145+
"""Adds the tuples together."""
146+
if lhs.valid:
147+
if rhs.valid:
148+
return lhs.value + rhs.value
149+
else:
150+
return lhs.value
151+
elif rhs.valid:
152+
# If lhs is not valid, rhs must be valid as we were activated by something ticking.
153+
return rhs.value
154+
155+
143156
@compute_node(overloads=add_, valid=tuple())
144157
def add_tuple_scalar(lhs: TS[Tuple[SCALAR, ...]], rhs: TS[SCALAR]) -> TS[Tuple[SCALAR, ...]]:
145158
"""Adds the element to the tuple."""

0 commit comments

Comments
 (0)