Skip to content

Commit 003fe8e

Browse files
committed
Improved documentation; renamed eq, neq, etc. to EQ, NEQ, etc. (Python API only — Chatterlang names are unchanged)
1 parent 0f9dc24 commit 003fe8e

File tree

7 files changed

+240
-189
lines changed

7 files changed

+240
-189
lines changed

src/talkpipe/pipe/fork.py

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
"""Fork segments: split a stream into parallel branches.
2+
3+
ForkSegment distributes items across multiple downstream pipelines using
4+
threads and queues. Supports round-robin (one item per branch) or broadcast
5+
(all items to all branches).
6+
"""
17
from typing import List, Iterator, Iterable, Any
28
import logging
39
from queue import Queue
@@ -7,42 +13,50 @@
713

814
logger = logging.getLogger(__name__)
915

16+
# Sentinel to signal end of stream to branch consumers
17+
_poison_pill = object()
18+
19+
1020
class ForkMode(Enum):
1121
"""Distribution modes for fork segments."""
22+
1223
ROUND_ROBIN = "round_robin" # Distribute items across branches
1324
BROADCAST = "broadcast" # Send all items to all branches
1425

15-
_poison_pill = object()
1626

1727
def _poison_filter(queue: Queue) -> Iterator[Any]:
18-
"""Filter that adds a poison pill to the end of the input."""
28+
"""Iterator over queue items until _poison_pill is seen."""
1929
while True:
2030
item = queue.get()
2131
if item is _poison_pill:
2232
break
2333
yield item
2434

35+
2536
class ForkSegment(AbstractSegment):
26-
"""A segment that forks the input stream into multiple downstream pipelines,
27-
processing them in parallel using threads.
28-
"""
29-
30-
def __init__(self,
31-
branches: List[AbstractSegment],
32-
mode: ForkMode = ForkMode.BROADCAST,
33-
max_queue_size: int = 100,
34-
num_threads: int = None):
35-
"""Initialize the fork segment."""
36-
# Set process_metadata=True so metadata flows into branches
37-
super().__init__(process_metadata=True)
37+
"""Forks the input stream into multiple downstream pipelines in parallel."""
38+
39+
def __init__(
40+
self,
41+
branches: List[AbstractSegment],
42+
mode: ForkMode = ForkMode.BROADCAST,
43+
max_queue_size: int = 100,
44+
num_threads: int = None,
45+
):
46+
super().__init__(process_metadata=True) # Metadata flows into branches
3847
self.branches = branches
3948
self.mode = mode
4049
self.max_queue_size = max_queue_size
4150
self.num_threads = num_threads or len(branches)
4251

43-
def process_branch(self, branch_id: int, branch: AbstractSegment,
44-
input_queue: Queue, output_queue: Queue):
45-
"""Process a single branch of the fork."""
52+
def process_branch(
53+
self,
54+
branch_id: int,
55+
branch: AbstractSegment,
56+
input_queue: Queue,
57+
output_queue: Queue,
58+
):
59+
"""Run one branch: consume from input_queue, emit (branch_id, item) to output_queue."""
4660
try:
4761
if isinstance(branch, AbstractSegment):
4862
iter = branch(_poison_filter(input_queue))
@@ -51,19 +65,19 @@ def process_branch(self, branch_id: int, branch: AbstractSegment,
5165

5266
for item in iter:
5367
output_queue.put((branch_id, item))
54-
68+
5569
except Exception as e:
5670
logger.error(f"Error in fork branch {branch_id}: {e}")
5771
raise
5872
finally:
59-
output_queue.put((branch_id, None)) # Signal branch completion
73+
output_queue.put((branch_id, None)) # Sentinel: branch finished
6074
input_queue.task_done()
6175

6276
def transform(self, input_iter: Iterable[Any]) -> Iterator[Any]:
63-
"""Transform input by distributing it across multiple branches."""
77+
"""Distribute input to branches, collect results as they complete."""
6478
input_queues = [Queue(maxsize=self.max_queue_size) for _ in self.branches]
6579
output_queue = Queue()
66-
80+
6781
with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
6882
# Submit branch processing tasks
6983
futures = [
@@ -89,7 +103,7 @@ def transform(self, input_iter: Iterable[Any]) -> Iterator[Any]:
89103
for queue in input_queues:
90104
queue.put(_poison_pill)
91105

92-
# Yield results as they become available
106+
# Drain output_queue; result=None is branch completion sentinel
93107
active_branches = len(self.branches)
94108
while active_branches > 0:
95109
branch_id, result = output_queue.get()
@@ -105,10 +119,11 @@ def transform(self, input_iter: Iterable[Any]) -> Iterator[Any]:
105119
for future in futures:
106120
future.cancel()
107121

108-
# Helper function to create a fork
109-
def fork(*branches: AbstractSegment,
110-
mode: ForkMode = ForkMode.ROUND_ROBIN,
111-
max_queue_size: int = 100,
112-
num_threads: int = None) -> ForkSegment:
113-
"""Create a fork segment with the given branches."""
122+
def fork(
123+
*branches: AbstractSegment,
124+
mode: ForkMode = ForkMode.ROUND_ROBIN,
125+
max_queue_size: int = 100,
126+
num_threads: int = None,
127+
) -> ForkSegment:
128+
"""Create a ForkSegment with the given branches."""
114129
return ForkSegment(list(branches), mode, max_queue_size, num_threads)

src/talkpipe/pipe/math.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
"""Math operations for pipe."""
1+
"""Math operations for pipe: random numbers, ranges, scaling, and comparison filters.
22
3+
Provides sources (randomInts, range) and segments (scale, eq, neq, gt, gte, lt, lte)
4+
for numeric pipelines.
5+
"""
36
from typing import Iterable, Union, Callable, Any, Annotated
47
from numpy import random
58
from talkpipe.pipe import core
@@ -27,7 +30,7 @@ def scale(
2730
yield x * multiplier
2831

2932
@registry.register_source(name="range")
30-
@core.source(lower=0, upper=10)
33+
@core.source(lower=0, upper=10)
3134
def arange(
3235
lower: Annotated[int, "Lower bound of the range (inclusive)"],
3336
upper: Annotated[int, "Upper bound of the range (exclusive)"]
@@ -42,27 +45,29 @@ def arange(
4245

4346

4447
class AbstractComparisonFilter(core.AbstractSegment):
45-
"""Abstract base class for comparison segments."""
48+
"""Base for comparison segments: filter items where field value op threshold."""
4649

47-
def __init__(self,
48-
field: Annotated[str, "Field/property to compare"],
49-
n: Annotated[Any, "Value to compare against"],
50-
comparator: Callable[[Any, Any], bool]):
50+
def __init__(
51+
self,
52+
field: Annotated[str, "Field/property to compare"],
53+
n: Annotated[Any, "Value to compare against"],
54+
comparator: Callable[[Any, Any], bool],
55+
):
5156
super().__init__()
5257
self.field = field
5358
self.n = n
5459
self.comparator = comparator
5560

5661
def transform(self, items: Iterable) -> Iterable:
57-
"""Filter items based on the comparison."""
62+
"""Yield items whose field value satisfies the comparator."""
5863
for item in items:
5964
value = extract_property(item, self.field, fail_on_missing=True)
6065
if self.comparator(value, self.n):
6166
yield item
6267

6368

6469
def _make_comparison_segment(name: str, op: Callable[[Any, Any], bool], docstring: str):
65-
"""Factory for comparison segments (eq, neq, gt, gte, lt, lte)."""
70+
"""Factory: create a registered comparison segment with given op and docstring."""
6671
@registry.register_segment(name=name)
6772
class ComparisonSegment(AbstractComparisonFilter):
6873
__doc__ = docstring
@@ -75,21 +80,16 @@ def __init__(self,
7580
return ComparisonSegment
7681

7782

78-
# TODO: rename to EQ in 0.5.0
79-
eq = _make_comparison_segment("eq", lambda x, y: x == y,
83+
# Comparison segments: filter by field value vs threshold
84+
EQ = _make_comparison_segment("eq", lambda x, y: x == y,
8085
"Filter items where a specified field's value equals a number.")
81-
# TODO: rename to NEQ in 0.5.0
82-
neq = _make_comparison_segment("neq", lambda x, y: x != y,
86+
NEQ = _make_comparison_segment("neq", lambda x, y: x != y,
8387
"Filter items where a specified field's value does not equal a number.")
84-
# TODO: rename to GT in 0.5.0
85-
gt = _make_comparison_segment("gt", lambda x, y: x > y,
88+
GT = _make_comparison_segment("gt", lambda x, y: x > y,
8689
"Filter items where a specified field's value is greater than a number.")
87-
# TODO: rename to GTE in 0.5.0
88-
gte = _make_comparison_segment("gte", lambda x, y: x >= y,
90+
GTE = _make_comparison_segment("gte", lambda x, y: x >= y,
8991
"Filter items where a specified field's value is greater than or equal to a number.")
90-
# TODO: rename to LT in 0.5.0
91-
lt = _make_comparison_segment("lt", lambda x, y: x < y,
92+
LT = _make_comparison_segment("lt", lambda x, y: x < y,
9293
"Filters items based on a field value being less than a specified number.")
93-
# TODO: rename to LTE in 0.5.0
94-
lte = _make_comparison_segment("lte", lambda x, y: x <= y,
94+
LTE = _make_comparison_segment("lte", lambda x, y: x <= y,
9595
"Filter items where a specified field's value is less than or equal to a number.")

src/talkpipe/util/collections.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
"""Collection utilities: adaptive buffers and expiring key-value stores.
2+
3+
Provides AdaptiveBuffer for rate-aware batching and ExpiringDict for
4+
in-memory caches with optional TTL and persistence.
5+
"""
16
import json
27
import logging
38
import os
@@ -6,8 +11,15 @@
611

712
logger = logging.getLogger(__name__)
813

14+
915
class AdaptiveBuffer:
10-
"""Buffer that adapts its flush size based on item arrival rate."""
16+
"""Buffer that adapts its flush size based on item arrival rate.
17+
18+
When items arrive quickly (intervals <= fast_interval), flushes at max_size
19+
for efficiency. When items arrive slowly (intervals >= slow_interval),
20+
flushes at min_size for responsiveness. Uses EMA of inter-arrival intervals
21+
to interpolate target size between these extremes.
22+
"""
1123

1224
def __init__(
1325
self,
@@ -18,6 +30,7 @@ def __init__(
1830
smoothing=0.2,
1931
time_func=time.time,
2032
):
33+
"""Initialize the buffer with size and timing parameters."""
2134
if min_size < 1:
2235
raise ValueError("min_size must be >= 1")
2336
if max_size < min_size:
@@ -42,6 +55,7 @@ def __init__(
4255
self._target_size = min_size
4356

4457
def append(self, item):
58+
"""Add an item; returns flushed batch if target size reached, else None."""
4559
now = self.time_func()
4660
if self._last_append_time is not None:
4761
interval = max(0.0, now - self._last_append_time)
@@ -55,6 +69,7 @@ def append(self, item):
5569
return None
5670

5771
def extend(self, items):
72+
"""Add multiple items; returns list of any flushed batches."""
5873
flushed = []
5974
for item in items:
6075
batch = self.append(item)
@@ -63,6 +78,7 @@ def extend(self, items):
6378
return flushed
6479

6580
def flush(self):
81+
"""Force flush and return all buffered items, or None if empty."""
6682
if not self._buffer:
6783
return None
6884
items = self._buffer
@@ -73,20 +89,23 @@ def __len__(self):
7389
return len(self._buffer)
7490

7591
def _update_interval(self, interval):
92+
"""Update EMA of inter-arrival interval using smoothing factor."""
7693
if self._ema_interval is None:
7794
self._ema_interval = interval
7895
else:
7996
alpha = self.smoothing
8097
self._ema_interval = (alpha * interval) + ((1 - alpha) * self._ema_interval)
8198

8299
def _compute_target_size(self):
100+
"""Compute target flush size from EMA interval (min_size to max_size)."""
83101
if self._ema_interval is None:
84102
return self.min_size
85103
if self._ema_interval <= self.fast_interval:
86104
return self.max_size
87105
if self._ema_interval >= self.slow_interval:
88106
return self.min_size
89107

108+
# Linear interpolation: faster arrivals -> larger target
90109
ratio = (self.slow_interval - self._ema_interval) / (
91110
self.slow_interval - self.fast_interval
92111
)
@@ -95,7 +114,14 @@ def _compute_target_size(self):
95114

96115

97116
class ExpiringDict(UserDict):
117+
"""Dict-like store with per-key TTL and optional JSON persistence.
118+
119+
Keys expire after their TTL (seconds). If filename is set, the dict is
120+
saved to disk on every mutation and loaded on init.
121+
"""
122+
98123
def __init__(self, filename=None, default_ttl=None):
124+
"""Initialize with optional persistence path and default TTL in seconds."""
99125
super().__init__()
100126
self.default_ttl = default_ttl
101127
self.filename = filename
@@ -105,6 +131,7 @@ def __init__(self, filename=None, default_ttl=None):
105131
self._load()
106132

107133
def __setitem__(self, key, value, ttl=None):
134+
"""Set key to value; ttl overrides default_ttl if provided."""
108135
self.data[key] = value
109136

110137
if ttl is None:
@@ -117,6 +144,7 @@ def __setitem__(self, key, value, ttl=None):
117144
self._save() # Save when a key is set
118145

119146
def __delitem__(self, key):
147+
"""Delete key and its expiry entry."""
120148
super().__delitem__(key)
121149
if key in self.expiry:
122150
del self.expiry[key]
@@ -125,6 +153,7 @@ def __delitem__(self, key):
125153
self._save() # Save when a key is deleted
126154

127155
def __getitem__(self, key):
156+
"""Get value; raises KeyError if missing or expired."""
128157
self._clean_expired()
129158
if key in self.expiry and time.time() > self.expiry[key]:
130159
del self.data[key]
@@ -135,6 +164,7 @@ def __getitem__(self, key):
135164
return self.data[key]
136165

137166
def set_with_ttl(self, key, value, ttl):
167+
"""Convenience method to set a key with explicit TTL."""
138168
self.__setitem__(key, value, ttl)
139169

140170
def clear(self):
@@ -165,7 +195,7 @@ def popitem(self):
165195
return key, value
166196

167197
def _clean_expired(self):
168-
"""Remove all expired keys"""
198+
"""Remove all expired keys from data and expiry maps."""
169199
now = time.time()
170200
expired = [k for k, exp in self.expiry.items() if now > exp]
171201
if expired: # Only save if something was expired
@@ -197,6 +227,7 @@ def __contains__(self, key):
197227
return key in self.data
198228

199229
def _save(self):
230+
"""Persist data and expiry to JSON file via atomic write."""
200231
if self.filename:
201232
# Use .tmp file and atomic rename for safety
202233
tmp_filename = str(self.filename) + '.tmp'
@@ -214,6 +245,7 @@ def _save(self):
214245
raise
215246

216247
def _load(self):
248+
"""Load data and expiry from JSON file; on failure, start empty."""
217249
try:
218250
if os.path.exists(self.filename):
219251
with open(self.filename, 'r') as f:

0 commit comments

Comments
 (0)