Skip to content

Commit 4456be6

Browse files
committed
chore(infrastructure): add not completed adapters
1 parent 94a545c commit 4456be6

File tree

10 files changed

+223
-7
lines changed

10 files changed

+223
-7
lines changed

src/tgdb/application/ports/heap.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ async def row(
1818
) -> Row | None: ...
1919

2020
@abstractmethod
21-
async def rows(
21+
def rows(
2222
self,
2323
schema: RowSchema,
2424
attribute_number: int,

src/tgdb/infrastructure/adapters/__init__.py

Whitespace-only changes.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from dataclasses import dataclass
2+
3+
from tgdb.application.ports.commit_serialization import CommitSerialization
4+
from tgdb.entities.transaction import TransactionCommit
5+
6+
7+
@dataclass(frozen=True)
8+
class CommitSerializationToCommit(CommitSerialization[TransactionCommit]):
9+
async def serialized(self, commit: TransactionCommit) -> TransactionCommit:
10+
return commit
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
from collections.abc import AsyncIterator
2+
from dataclasses import dataclass, field
3+
from functools import partial
4+
5+
from tgdb.application.ports.heap import Heap
6+
from tgdb.entities.row import Row, RowAttribute, RowSchema
7+
from tgdb.entities.transaction import TransactionEffect
8+
9+
10+
@dataclass(frozen=True, unsafe_hash=False)
11+
class InMemoryHeap(Heap):
12+
_row_by_row_id: dict[RowAttribute, Row] = field(default_factory=dict)
13+
14+
async def insert(self, row: Row) -> None:
15+
if row.id in self._row_by_row_id:
16+
raise ValueError
17+
18+
self._row_by_row_id[row.id] = row
19+
20+
async def row(
21+
self,
22+
schema: RowSchema,
23+
attribute_number: int,
24+
attribute: RowAttribute | None = None,
25+
) -> Row | None:
26+
for row in self._row_by_row_id.values():
27+
if self._is_valid(schema, attribute_number, attribute, row):
28+
return row
29+
30+
return None
31+
32+
async def rows(
33+
self,
34+
schema: RowSchema,
35+
attribute_number: int,
36+
attribute: RowAttribute | None = None,
37+
) -> AsyncIterator[Row]:
38+
is_valid = partial(self._is_valid, schema, attribute_number, attribute)
39+
rows = tuple(filter(is_valid, self._row_by_row_id.values()))
40+
41+
for row in rows:
42+
yield row
43+
44+
async def update(self, row: Row) -> None:
45+
if row.id not in self._row_by_row_id:
46+
raise ValueError
47+
48+
self._row_by_row_id[row.id] = row
49+
50+
async def delete(self, row: Row) -> None:
51+
del self._row_by_row_id[row.id]
52+
53+
async def map(self, effect: TransactionEffect) -> None:
54+
for new_row in effect.new_values:
55+
await self.insert(new_row)
56+
57+
for mutated_row in effect.mutated_values:
58+
await self.update(mutated_row)
59+
60+
for dead_row in effect.dead_values:
61+
await self.delete(dead_row)
62+
63+
def _is_valid(
64+
self,
65+
schema: RowSchema,
66+
attribute_number: int,
67+
attribute: RowAttribute | None,
68+
row: Row,
69+
) -> bool:
70+
return (
71+
row.schema == schema
72+
and len(row) >= attribute_number + 1
73+
and (
74+
attribute is not None
75+
or row[attribute_number] == attribute
76+
)
77+
)
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from abc import abstractmethod
2+
from collections.abc import AsyncIterator
3+
from dataclasses import dataclass
4+
5+
from tgdb.application.ports.log import LogSlot
6+
from tgdb.entities.logic_time import LogicTime
7+
from tgdb.entities.operator import AppliedOperator
8+
from tgdb.infrastructure.async_queque import AsyncQueque
9+
10+
11+
@dataclass(frozen=True, unsafe_hash=False)
12+
class AsyncQuequeLogSlot(LogSlot):
13+
_queque: AsyncQueque[AppliedOperator]
14+
15+
async def push(self, operator: AppliedOperator, /) -> None:
16+
if self._queque and operator.time <= self._queque[-1].time:
17+
raise ValueError
18+
19+
self._queque.push(operator)
20+
21+
async def __call__(self, *, block: bool) -> AsyncIterator[AppliedOperator]:
22+
if block:
23+
async for operator in self._queque:
24+
yield operator
25+
26+
return
27+
28+
for operator in self._queque:
29+
yield operator
30+
31+
@abstractmethod
32+
async def commit(self, offset: LogicTime) -> None: ...
33+
34+
@abstractmethod
35+
async def offset(self) -> LogicTime | None: ...
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from collections.abc import Generator
2+
from dataclasses import dataclass
3+
4+
5+
@dataclass
6+
class InMemoryLogicClock:
7+
_current_time: int = 0
8+
9+
def __await__(self) -> Generator[None, None, int]:
10+
self._current_time += 1
11+
yield
12+
return self._current_time
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from dataclasses import dataclass
2+
3+
from tgdb.application.ports.operator_serialization import OperatorSerialization
4+
from tgdb.entities.operator import Operator
5+
6+
7+
@dataclass(frozen=True)
8+
class OperatorSerializationToOperator(OperatorSerialization[Operator | None]):
9+
async def deserialized(self, operator: Operator | None) -> Operator | None:
10+
return operator
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from abc import ABC, abstractmethod
2+
from asyncio import Event, Semaphore
3+
from collections.abc import AsyncIterator
4+
from dataclasses import dataclass, field
5+
6+
from tgdb.application.ports.queque import Queque
7+
from tgdb.infrastructure.async_queque import AsyncQueque
8+
9+
10+
@dataclass
11+
class InMemoryQueque[ValueT](Queque[ValueT]):
12+
_queque: AsyncQueque[ValueT]
13+
_sync_reading_completed: Event = field(init=False, default_factory=Event)
14+
15+
async def async_push(self, value: ValueT) -> None:
16+
self._queque.push(value)
17+
18+
async def sync_push(self, value: ValueT) -> None:
19+
self._sync_reading_completed.clear()
20+
self._queque.push(value)
21+
await self._sync_reading_completed.wait()
22+
23+
async def async_(self) -> AsyncIterator[ValueT]:
24+
values = self._async_values()
25+
first_value = await anext(values)
26+
27+
return self._values(first_value, values)
28+
29+
async def _values(
30+
self, first_value: ValueT, values: AsyncIterator[ValueT]
31+
) -> AsyncIterator[ValueT]:
32+
yield first_value
33+
34+
async for value in values:
35+
yield value
36+
37+
async def _async_values(self) -> AsyncIterator[ValueT]:
38+
async for value in self._queque:
39+
yield value
40+
41+
async def sync(self) -> AsyncIterator[ValueT]:
42+
for value in

src/tgdb/infrastructure/async_queque.py

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
from asyncio import Event
22
from collections import deque
3-
from collections.abc import AsyncIterator
3+
from collections.abc import AsyncIterator, Iterator, Sequence
44
from dataclasses import dataclass, field
5+
from typing import Any, overload
6+
7+
8+
# |
9+
# a
10+
# |
11+
# |
512

613

714
@dataclass(frozen=True, unsafe_hash=False)
8-
class AsyncQueque[ValueT]:
15+
class AsyncQueque[ValueT](Sequence[ValueT]):
916
_values: deque[ValueT] = field(default_factory=deque)
1017
_offset_by_event: dict[Event, int] = field(
1118
default_factory=dict, init=False
@@ -14,6 +21,25 @@ class AsyncQueque[ValueT]:
1421
def __len__(self) -> int:
1522
return len(self._values)
1623

24+
def __bool__(self) -> bool:
25+
return bool(self._values)
26+
27+
def __iter__(self) -> Iterator[ValueT]:
28+
return iter(self._values)
29+
30+
@overload
31+
def __getitem__(self, index: int, /) -> ValueT: ...
32+
33+
@overload
34+
def __getitem__(
35+
self, slice_: "slice[Any, Any, Any]", /
36+
) -> Sequence[ValueT]: ...
37+
38+
def __getitem__(
39+
self, value: "int | slice[Any, Any, Any]", /
40+
) -> Sequence[ValueT] | ValueT:
41+
return self._values[value]
42+
1743
def push(self, value: ValueT) -> None:
1844
self._values.append(value)
1945

@@ -22,6 +48,10 @@ def push(self, value: ValueT) -> None:
2248

2349
async def __aiter__(self) -> AsyncIterator[ValueT]:
2450
event = Event()
51+
52+
if self._values:
53+
event.set()
54+
2555
self._offset_by_event[event] = -1
2656

2757
while True:
@@ -34,7 +64,10 @@ async def __aiter__(self) -> AsyncIterator[ValueT]:
3464

3565
yield new_value
3666

37-
def _refresh(self) -> None:
67+
if self._offset_by_event[event] == len(self._values) - 1:
68+
event.clear()
69+
70+
def refresh(self) -> None:
3871
min_offset = min(self._offset_by_event.values())
3972

4073
if min_offset >= 0:

src/tgdb/infrastructure/telethon/in_telegram/value.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@ async def set(self, value: ValueT, /) -> None:
2020
self._encoded(value),
2121
)
2222

23-
def __await__(self) -> Generator[None, None, ValueT]:
24-
return self.get().__await__()
25-
2623
async def get(self) -> ValueT:
2724
pointer_message = await self._pointer_message()
2825

0 commit comments

Comments
 (0)