Skip to content

Commit 42570cf

Browse files
committed
feat(infrastructure): add some adapters for telegram
1 parent 209ebdb commit 42570cf

23 files changed

+810
-249
lines changed
Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
11
from dataclasses import dataclass
22

33
from tgdb.application.ports.heap import Heap
4+
from tgdb.application.ports.message import TransactionCommitMessage
45
from tgdb.application.ports.queque import Queque
5-
from tgdb.entities.transaction import (
6-
TransactionCommit,
7-
TransactionOkCommit,
8-
)
6+
from tgdb.entities.transaction import TransactionOkCommit
97

108

119
@dataclass(frozen=True)
1210
class OutputCommitsToHeap:
1311
heap: Heap
14-
output_commits: Queque[TransactionCommit]
12+
output_commit_messages: Queque[TransactionCommitMessage]
1513

1614
async def __call__(self) -> None:
17-
async for commit in self.output_commits:
18-
if isinstance(commit, TransactionOkCommit):
19-
await self.heap.map(commit.effect)
15+
async for message in self.output_commit_messages:
16+
if not isinstance(message.commit, TransactionOkCommit):
17+
return
18+
19+
if message.is_commit_duplicate:
20+
await self.heap.map_as_duplicate(message.commit.effect)
21+
else:
22+
await self.heap.map(message.commit.effect)
Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,26 @@
11
from dataclasses import dataclass
22

33
from tgdb.application.ports.log import Log
4+
from tgdb.application.ports.message import TransactionCommitMessage
45
from tgdb.application.ports.queque import Queque
56
from tgdb.entities.operator import CommitOperator, StartOperator
6-
from tgdb.entities.transaction import (
7-
TransactionCommit,
8-
TransactionOkCommit,
9-
)
7+
from tgdb.entities.transaction import TransactionOkCommit
108

119

1210
@dataclass(frozen=True)
1311
class OutputCommitsToLog:
1412
log: Log
15-
output_commits: Queque[TransactionCommit]
13+
output_commit_messages: Queque[TransactionCommitMessage]
1614

1715
async def __call__(self) -> None:
18-
async for commit in self.output_commits:
19-
if not isinstance(commit, TransactionOkCommit):
16+
async for message in self.output_commit_messages:
17+
if not isinstance(message.commit, TransactionOkCommit):
2018
continue
2119

22-
start_operator = StartOperator(commit.transaction_id)
20+
start_operator = StartOperator(message.commit.transaction_id)
2321
commit_operator = CommitOperator(
24-
commit.transaction_id, commit.effect
22+
message.commit.transaction_id, message.commit.effect
2523
)
2624

27-
await self.log.push_many((start_operator, commit_operator))
25+
await self.log.push(start_operator)
26+
await self.log.push(commit_operator)

src/tgdb/application/ports/heap.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@
66
class Heap(ABC):
77
@abstractmethod
88
async def map(self, effect: TransactionEffect, /) -> None: ...
9+
10+
@abstractmethod
11+
async def map_as_duplicate(self, effect: TransactionEffect, /) -> None: ...

src/tgdb/application/ports/log.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from abc import ABC, abstractmethod
2-
from collections.abc import Sequence
32

43
from tgdb.entities.logic_time import LogicTime
54
from tgdb.entities.operator import AppliedOperator, Operator
@@ -10,12 +9,7 @@
109

1110
class Log(ABC):
1211
@abstractmethod
13-
async def push_one(self, operator: Operator, /) -> AppliedOperator: ...
14-
15-
@abstractmethod
16-
async def push_many(
17-
self, operators: Sequence[Operator], /
18-
) -> Sequence[AppliedOperator]: ...
12+
async def push(self, operator: Operator, /) -> AppliedOperator: ...
1913

2014
@abstractmethod
2115
async def truncate(self, offset: LogOffset, /) -> None: ...
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from dataclasses import dataclass
2+
3+
from tgdb.entities.transaction import TransactionCommit
4+
5+
6+
@dataclass(frozen=True)
7+
class TransactionCommitMessage:
8+
commit: TransactionCommit
9+
is_commit_duplicate: bool

src/tgdb/application/serialize_transactions.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from tgdb.application.ports.log import Log, LogOffset
44
from tgdb.application.ports.log_iterator import LogIterator
5+
from tgdb.application.ports.message import TransactionCommitMessage
56
from tgdb.application.ports.queque import Queque
67
from tgdb.entities.logic_time import LogicTime
78
from tgdb.entities.operator import AppliedOperator, Operator
@@ -17,7 +18,7 @@ class SerializeTransactions:
1718
log: Log
1819
log_iterator: LogIterator
1920
input_operators: Queque[Operator]
20-
output_commits: Queque[TransactionCommit]
21+
output_commit_messages: Queque[TransactionCommitMessage]
2122

2223
async def __call__(
2324
self,
@@ -37,14 +38,19 @@ async def __call__(
3738
input_operators = aiter(self.input_operators)
3839

3940
async for operator in self.log_iterator.finite():
40-
await self._output_operator(operator, horizon)
41+
await self._output_operator(operator, horizon, is_duplicate=True)
4142

4243
async for operator in input_operators:
43-
applied_operator = await self.log.push_one(operator)
44-
await self._output_operator(applied_operator, horizon)
44+
applied_operator = await self.log.push(operator)
45+
await self._output_operator(
46+
applied_operator, horizon, is_duplicate=False
47+
)
4548

4649
async def _output_operator(
47-
self, operator: AppliedOperator, horizon: TransactionHorizon
50+
self,
51+
operator: AppliedOperator,
52+
horizon: TransactionHorizon,
53+
is_duplicate: bool,
4854
) -> None:
4955
transaction_commit = horizon.add(operator)
5056

@@ -54,10 +60,14 @@ async def _output_operator(
5460
)
5561

5662
if transaction_commit:
57-
await self.output_commits.push(transaction_commit)
63+
await self.output_commit_messages.push(
64+
TransactionCommitMessage(
65+
transaction_commit, is_commit_duplicate=is_duplicate
66+
)
67+
)
5868

5969
if need_to_commit_offset:
60-
await self.output_commits.sync()
70+
await self.output_commit_messages.sync()
6171
await self.log_iterator.commit(offset_to_commit)
6272

6373
def _safe_offset_to_commit(

src/tgdb/entities/mark.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@
44

55
@dataclass(frozen=True)
66
class Mark:
7-
mark_id: UUID
7+
id: UUID
88
key: str

src/tgdb/entities/row.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,24 @@ def row(*attrs: RowAttribute, schema: Schema = "__undefined__") -> Row:
4949
class NewRow:
5050
row: Row
5151

52+
@property
53+
def row_id(self) -> RowAttribute:
54+
return self.row.id
55+
5256

5357
@dataclass(frozen=True)
5458
class MutatedRow:
5559
row: Row
5660
message: Message | None
5761

62+
@property
63+
def row_id(self) -> RowAttribute:
64+
return self.row.id
65+
5866

5967
@dataclass(frozen=True)
6068
class DeletedRow:
61-
row: Row
69+
row_id: RowAttribute
6270
message: Message | None
6371

6472

Lines changed: 76 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,66 @@
11
from asyncio import gather
2-
from dataclasses import dataclass, field
3-
from itertools import groupby
2+
from dataclasses import dataclass
3+
4+
from in_memory_db import InMemoryDb
45

56
from tgdb.application.ports.heap import Heap
67
from tgdb.entities.message import Message
7-
from tgdb.entities.row import DeletedRow, MutatedRow, NewRow, Row, RowEffect
8+
from tgdb.entities.row import (
9+
DeletedRow,
10+
MutatedRow,
11+
NewRow,
12+
Row,
13+
RowEffect,
14+
)
815
from tgdb.entities.transaction import TransactionEffect
9-
from tgdb.infrastructure.row_encoding import encoded_row
16+
from tgdb.infrastructure.heap_row_encoding import encoded_heap_row
1017
from tgdb.infrastructure.telethon.client_pool import TelegramClientPool
1118
from tgdb.infrastructure.telethon.lazy_message_map import LazyMessageMap
1219
from tgdb.infrastructure.telethon.mapping import message
1320

1421

1522
@dataclass(frozen=True, unsafe_hash=False)
1623
class InMemoryHeap(Heap):
17-
_rows: set[Row] = field(default_factory=set)
24+
_db: InMemoryDb[Row]
1825

1926
async def map(self, effect: TransactionEffect) -> None:
20-
for row_operator_effect in effect:
21-
match row_operator_effect, row_operator_effect.row in self._rows:
22-
case NewRow(row) | MutatedRow(row), _:
23-
self._rows.add(row)
24-
case DeletedRow(row), True:
25-
self._rows.remove(row)
27+
for row_effect in effect:
28+
prevous_row = self._db.select_one(
29+
lambda row: row.id == row_effect.row_id
30+
)
31+
32+
match row_effect, prevous_row:
33+
case DeletedRow(), Row():
34+
self._db.remove(prevous_row)
35+
36+
case NewRow(new_row), _:
37+
self._db.insert(new_row)
38+
39+
case MutatedRow(new_row), Row():
40+
self._db.remove(prevous_row)
41+
self._db.insert(new_row)
42+
43+
case _: ...
44+
45+
async def map_as_duplicate(self, effect: TransactionEffect) -> None:
46+
for row_effect in effect:
47+
prevous_row = self._db.select_one(
48+
lambda row: row.id == row_effect.row_id
49+
)
50+
51+
match row_effect, prevous_row:
52+
case DeletedRow(row_id), Row():
53+
self._db.remove(prevous_row)
54+
55+
case NewRow(new_row) | MutatedRow(new_row), Row():
56+
self._db.remove(prevous_row)
57+
self._db.insert(new_row)
58+
2659
case _: ...
2760

2861

2962
@dataclass(frozen=True)
30-
class TelethonHeap(Heap):
63+
class InTelegramHeap(Heap):
3164
_pool_to_insert: TelegramClientPool
3265
_pool_to_select: TelegramClientPool
3366
_pool_to_edit: TelegramClientPool
@@ -36,34 +69,41 @@ class TelethonHeap(Heap):
3669
_message_map: LazyMessageMap
3770

3871
async def map(self, transaction_effect: TransactionEffect) -> None:
39-
row_effects_by_row_id = groupby(
40-
transaction_effect, key=lambda it: it.row.id
41-
)
42-
4372
await gather(*(
44-
self._map_row_effects(tuple(row_effects))
45-
for _, row_effects in row_effects_by_row_id
73+
self._map_row_effect(row_effect, is_duplicate=False)
74+
for row_effect in transaction_effect
4675
))
4776

48-
async def _map_row_effects(self, effect_part: TransactionEffect) -> None:
49-
await gather(*map(self._map_row_effect, effect_part))
77+
async def map_as_duplicate(
78+
self, transaction_effect: TransactionEffect
79+
) -> None:
80+
await gather(*(
81+
self._map_row_effect(row_effect, is_duplicate=True)
82+
for row_effect in transaction_effect
83+
))
5084

51-
async def _map_row_effect(self, row_effect: RowEffect) -> None:
85+
async def _map_row_effect(
86+
self, row_effect: RowEffect, *, is_duplicate: bool
87+
) -> None:
5288
match row_effect:
5389
case NewRow():
54-
await self._insert(row_effect)
90+
await self._insert(row_effect, is_duplicate)
5591
case MutatedRow():
5692
await self._update(row_effect)
5793
case DeletedRow():
5894
await self._delete(row_effect)
5995

60-
async def _insert(self, row_effect: NewRow) -> None:
61-
telethon_message = await self._pool_to_insert().send_message(
62-
self._heap_id, encoded_row(row_effect.row)
63-
)
64-
self._message_map[row_effect.row.schema, row_effect.row.id] = (
65-
message(telethon_message)
96+
async def _insert(self, row_effect: NewRow, is_duplicate: bool) -> None:
97+
if is_duplicate:
98+
message_ = await self._message_map[row_effect.row.id]
99+
100+
if message_ is not None:
101+
return
102+
103+
tg_new_message = await self._pool_to_insert().send_message(
104+
self._heap_id, encoded_heap_row(row_effect.row)
66105
)
106+
self._message_map[row_effect.row.id] = message(tg_new_message)
67107

68108
async def _update(self, row_effect: MutatedRow) -> None:
69109
message = await self._message(row_effect)
@@ -72,7 +112,7 @@ async def _update(self, row_effect: MutatedRow) -> None:
72112
return
73113

74114
await self._pool_to_edit(message.author_id).edit_message(
75-
self._heap_id, message.id, encoded_row(row_effect.row)
115+
self._heap_id, message.id, encoded_heap_row(row_effect.row)
76116
)
77117

78118
async def _delete(self, row_effect: DeletedRow) -> None:
@@ -91,4 +131,10 @@ async def _message(
91131
if row_effect.message is not None:
92132
return row_effect.message
93133

94-
return await self._message_map[row_effect.row.schema, row_effect.row.id]
134+
match row_effect:
135+
case MutatedRow():
136+
row_id = row_effect.row.id
137+
case DeletedRow():
138+
row_id = row_effect.row_id
139+
140+
return await self._message_map[row_id]

0 commit comments

Comments
 (0)