Skip to content

Commit ce2f039

Browse files
committed
feat: add common main
1 parent e38d8ca commit ce2f039

File tree

13 files changed

+337
-65
lines changed

13 files changed

+337
-65
lines changed

deploy/dev/tgdb/conf.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@ conf:
77
bots: "/app/deploy/dev/tgdb/bots/bots.txt"
88
userbots: "/app/deploy/dev/tgdb/bots/userbots.txt"
99

10+
horizon:
11+
max_len: 10_000
12+
transaction:
13+
max_age_seconds: 10
14+
15+
message_cache:
16+
max_len: 1_000_000
17+
1018
heap:
1119
chat: -1005010263669
1220
page:
@@ -15,6 +23,7 @@ conf:
1523
relations:
1624
chat: -1005000098156
1725
vacuum:
26+
max_workers: 1
1827
window_delay_seconds: 30
1928
min_message_count: 1
2029

@@ -25,5 +34,6 @@ conf:
2534
timeout_seconds: 0.1
2635

2736
vacuum:
37+
max_workers: 10
2838
window_delay_seconds: 10
2939
min_message_count: 200

src/tgdb/infrastructure/adapters/buffer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
@dataclass(frozen=True, unsafe_hash=False)
1616
class InMemoryBuffer[ValueT](Buffer[ValueT]):
17-
_size_to_overflow: int
18-
_overflow_timeout_seconds: int
17+
_len_to_overflow: int
18+
_overflow_timeout_seconds: int | float
1919
_values: deque[ValueT]
2020
_is_overflowed: Event = field(init=False, default_factory=Event)
2121

@@ -36,11 +36,11 @@ async def __aiter__(self) -> AsyncIterator[Sequence[ValueT]]:
3636

3737
yield tuple(
3838
self._values.popleft()
39-
for _ in range(self._size_to_overflow)
39+
for _ in range(self._len_to_overflow)
4040
)
4141

4242
def _refresh_overflow(self) -> None:
43-
if len(self._values) >= self._size_to_overflow:
43+
if len(self._values) >= self._len_to_overflow:
4444
self._is_overflowed.set()
4545

4646

src/tgdb/infrastructure/adapters/relations.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ async def add(self, relation: Relation) -> None:
4141

4242
@dataclass
4343
class InTelegramReplicableRelations(Relations):
44-
_in_telegram_encoded_relations: InTelegramBytes
44+
_in_tg_encoded_relations: InTelegramBytes
4545
_cached_relations: InMemoryDb[Relation]
4646

4747
async def __aenter__(self) -> Self:
@@ -86,10 +86,10 @@ async def add(self, relation: Relation) -> None:
8686
self._cached_relations.insert(relation)
8787

8888
encoded_cached_relations = pickle.dumps(tuple(self._cached_relations))
89-
await self._in_telegram_encoded_relations.set(encoded_cached_relations)
89+
await self._in_tg_encoded_relations.set(encoded_cached_relations)
9090

9191
async def _loaded_relations(self) -> tuple[Relation, ...]:
92-
encoded_relations = await self._in_telegram_encoded_relations
92+
encoded_relations = await self._in_tg_encoded_relations
9393

9494
if encoded_relations is None:
9595
return tuple()

src/tgdb/infrastructure/adapters/shared_horizon.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
from dataclasses import dataclass
12
from types import TracebackType
23

34
from tgdb.application.common.ports.shared_horizon import SharedHorizon
45
from tgdb.entities.horizon.horizon import Horizon
56

67

8+
@dataclass(frozen=True)
79
class InMemorySharedHorizon(SharedHorizon):
810
_horizon: Horizon
911

src/tgdb/infrastructure/lazy_map.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,23 @@ async def __getitem__(self, key: KeyT) -> ValueT:
3737
return self._output(self._cache_map[key])
3838

3939
value = await self._external_value(key)
40-
self._cache_map[key] = value
41-
42-
if len(self._cache_map) > self._cache_map_max_len:
43-
self._cache_map.pop(next(iter(self._cache_map)))
40+
self._insert_to_cache_map(key, value)
4441

4542
return self._output(value)
4643

4744
def __setitem__(self, key: KeyT, value: ValueT) -> None:
48-
self._cache_map[key] = value
45+
self._insert_to_cache_map(key, value)
4946

5047
def _output(self, value: ExternalValue[ValueT]) -> ValueT:
5148
if isinstance(value, NoExternalValue):
5249
raise KeyError
5350

5451
return value
52+
53+
def _insert_to_cache_map(
54+
self, key: KeyT, value: ExternalValue[ValueT]
55+
) -> None:
56+
self._cache_map[key] = value
57+
58+
if len(self._cache_map) > self._cache_map_max_len:
59+
self._cache_map.pop(next(iter(self._cache_map)))

src/tgdb/infrastructure/pyyaml/conf.py

Lines changed: 60 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,53 +2,76 @@
22
from pathlib import Path
33

44
import yaml
5+
from pydantic import BaseModel, Field
56

67

7-
@dataclass(frozen=True)
8-
class Conf:
9-
api_id: int
10-
api_hash: str
8+
class APIConf(BaseModel):
9+
id: int
10+
hash: str
1111

12-
clients_bots: str
13-
clients_userbots: str
1412

15-
heap_chat: int
16-
heap_page_fullness: float
13+
class ClientsConf(BaseModel):
14+
bots: Path
15+
userbots: Path
1716

18-
relations_chat: int
19-
relations_vacuum_window_delay_seconds: int
20-
relations_vacuum_min_message_count: int
2117

22-
buffer_chat: int
23-
buffer_overflow_len: int
24-
buffer_overflow_timeout_seconds: float
25-
buffer_vacuum_window_delay_seconds: int
26-
buffer_vacuum_min_message_count: int
18+
class TransactionConf(BaseModel):
19+
max_age_seconds: int = Field(..., alias="max_age_seconds")
2720

28-
@classmethod
29-
def load(cls, path: Path) -> "Conf":
30-
with path.open("r") as file:
31-
data = yaml.safe_load(file)
3221

33-
conf = data["conf"]
22+
class HorizonConf(BaseModel):
23+
max_len: int
24+
transaction: TransactionConf
25+
26+
27+
class MessageCacheConf(BaseModel):
28+
max_len: int
29+
30+
31+
class PageConf(BaseModel):
32+
fullness: float
33+
34+
35+
class HeapConf(BaseModel):
36+
chat: int
37+
page: PageConf
38+
3439

35-
return cls(
36-
api_id=conf["api"]["id"],
37-
api_hash=conf["api"]["hash"],
40+
class VacuumConf(BaseModel):
41+
window_delay_seconds: int | float
42+
min_message_count: int
43+
max_workers: int
3844

39-
clients_bots=conf["clients"]["bots"],
40-
clients_userbots=conf["clients"]["userbots"],
4145

42-
heap_chat=conf["heap"]["chat"],
43-
heap_page_fullness=conf["heap"]["page"]["fullness"],
46+
class RelationsConf(BaseModel):
47+
chat: int
48+
vacuum: VacuumConf
4449

45-
relations_chat=conf["relations"]["chat"],
46-
relations_vacuum_window_delay_seconds=conf["relations"]["vacuum"]["window_delay_seconds"],
47-
relations_vacuum_min_message_count=conf["relations"]["vacuum"]["min_message_count"],
4850

49-
buffer_chat=conf["buffer"]["chat"],
50-
buffer_overflow_len=conf["buffer"]["overflow"]["len"],
51-
buffer_overflow_timeout_seconds=conf["buffer"]["overflow"]["timeout_seconds"],
52-
buffer_vacuum_window_delay_seconds=conf["buffer"]["vacuum"]["window_delay_seconds"],
53-
buffer_vacuum_min_message_count=conf["buffer"]["vacuum"]["min_message_count"],
54-
)
51+
class OverflowConf(BaseModel):
52+
len: int
53+
timeout_seconds: int | float
54+
55+
56+
class BufferConf(BaseModel):
57+
chat: int
58+
overflow: OverflowConf
59+
vacuum: VacuumConf
60+
61+
62+
class Conf(BaseModel):
63+
api: APIConf
64+
clients: ClientsConf
65+
horizon: HorizonConf
66+
message_cache: MessageCacheConf
67+
heap: HeapConf
68+
relations: RelationsConf
69+
buffer: BufferConf
70+
71+
@classmethod
72+
def load(cls, path: Path) -> "Conf":
73+
with path.open() as file:
74+
data = yaml.safe_load(file)
75+
76+
conf = data["conf"]
77+
return Conf(**conf)

src/tgdb/infrastructure/telethon/client_pool.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ def loaded_client_pool_from_farm_file(
7272
with farm_file_path.open() as farm_file:
7373
return TelegramClientPool(deque(
7474
TelegramClient(
75-
StringSession(session_token), app_api_id, app_api_hash
75+
StringSession(session_token),
76+
app_api_id,
77+
app_api_hash,
78+
entity_cache=None,
7679
)
7780
for session_token in farm_file
7881
if session_token

src/tgdb/infrastructure/telethon/in_telegram_heap.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from collections.abc import Sequence
22
from dataclasses import dataclass
3-
from typing import cast
3+
from typing import ClassVar, cast
44

55
from telethon.hints import TotalList
66

@@ -29,8 +29,19 @@ class InTelegramHeap:
2929
_encoded_tuple_max_len: int
3030
_message_map: LazyMessageMap
3131

32+
_page_len: ClassVar = 4000
33+
34+
@staticmethod
35+
def encoded_tuple_max_len(page_fullness: float) -> int:
36+
page_fullness = min(0, page_fullness)
37+
page_fullness = max(page_fullness, 1)
38+
39+
return int(page_fullness * InTelegramHeap._page_len)
40+
3241
def __post_init__(self) -> None:
33-
assert_(self._encoded_tuple_max_len < 4000, ValueError) # noqa: PLR2004
42+
assert_(
43+
self._encoded_tuple_max_len <= InTelegramHeap._page_len, ValueError
44+
)
3445

3546
def tuple_max_len(self) -> int:
3647
return self._encoded_tuple_max_len
@@ -82,24 +93,24 @@ async def tuples_with_attribute(
8293
)
8394

8495
async def insert_idempotently(self, tuple: Tuple) -> None:
85-
message_ = await self._message_map[tuple.tid]
96+
message_ = await self._message_map[self._heap_id, tuple.tid]
8697

8798
if message_ is not None:
8899
return
89100

90101
new_message = await self._pool_to_insert().send_message(
91102
self._heap_id, HeapTupleEncoding.encoded_tuple(tuple)
92103
)
93-
self._message_map[tuple.tid] = new_message
104+
self._message_map[self._heap_id, tuple.tid] = new_message
94105

95106
async def insert(self, tuple: Tuple) -> None:
96107
new_message = await self._pool_to_insert().send_message(
97108
self._heap_id, HeapTupleEncoding.encoded_tuple(tuple)
98109
)
99-
self._message_map[tuple.tid] = new_message
110+
self._message_map[self._heap_id, tuple.tid] = new_message
100111

101112
async def update(self, tuple: Tuple) -> None:
102-
message = await self._message_map[tuple.tid]
113+
message = await self._message_map[self._heap_id, tuple.tid]
103114

104115
if message is None:
105116
return
@@ -109,7 +120,7 @@ async def update(self, tuple: Tuple) -> None:
109120
)
110121

111122
async def delete_tuple_with_tid(self, tid: TID) -> None:
112-
message = await self._message_map[tid]
123+
message = await self._message_map[self._heap_id, tid]
113124

114125
if message is None:
115126
return

src/tgdb/infrastructure/telethon/lazy_message_map.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,18 @@
99
from tgdb.infrastructure.telethon.client_pool import TelegramClientPool
1010

1111

12-
type LazyMessageMap = LazyMap[TID, Message]
12+
type ChatID = int
13+
type LazyMessageMap = LazyMap[tuple[ChatID, TID], Message]
1314

1415

1516
def lazy_message_map(
16-
chat_id: int, pool: TelegramClientPool, computed_map_max_len: int
17+
pool: TelegramClientPool, cache_map_max_len: int
1718
) -> LazyMessageMap:
18-
async def tuple_message(tid: TID) -> ExternalValue[Message]:
19+
async def tuple_message(
20+
chat_id_and_tid: tuple[ChatID, TID]
21+
) -> ExternalValue[Message]:
22+
chat_id, tid = chat_id_and_tid
23+
1924
search = HeapTupleEncoding.id_of_encoded_tuple_with_tid(tid)
2025

2126
messages = cast(TotalList, await pool().get_messages(
@@ -27,4 +32,4 @@ async def tuple_message(tid: TID) -> ExternalValue[Message]:
2732

2833
return cast(Message, messages[0])
2934

30-
return LazyMap(computed_map_max_len, tuple_message)
35+
return LazyMap(cache_map_max_len, tuple_message)

src/tgdb/main/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)