Skip to content

Commit 9b362ca

Browse files
committed
feat(infrastructure): add a part for adapters
1 parent f4f8f63 commit 9b362ca

File tree

19 files changed

+240
-103
lines changed

19 files changed

+240
-103
lines changed

src/tgdb/application/common/ports/channel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@ async def publish(
2626
) -> None: ...
2727

2828
@abstractmethod
29-
async def wait(self, xid: XID) -> Notification: ...
29+
async def wait(self, xid: XID, /) -> Notification: ...

src/tgdb/entities/relation/relation.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,9 @@ class RelationVersionID:
3232
class NotIncrementedRelationVersionError(Exception): ...
3333

3434

35-
class RelationWithoutLastVersionError(Exception): ...
36-
37-
3835
@dataclass
3936
class Relation:
4037
"""
41-
:raises tgdb.entities.relation.relation.RelationWithoutLastVersionError:
4238
:raises tgdb.entities.relation.relation.NotIncrementedRelationVersionError:
4339
"""
4440

@@ -47,9 +43,6 @@ class Relation:
4743
_intermediate_versions: list[DerivativeRelationVersion]
4844

4945
def __post_init__(self) -> None:
50-
if not self._intermediate_versions or not self._inital_version:
51-
raise RelationWithoutLastVersionError
52-
5346
for version, next_version in pairwise(self):
5447
if next(version.number) != next_version.number:
5548
raise NotIncrementedRelationVersionError

src/tgdb/infrastructure/adapters/buffer.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
from typing import Self
88

99
from tgdb.application.common.ports.buffer import Buffer
10-
from tgdb.entities.transaction import TransactionCommit
10+
from tgdb.entities.horizon.transaction import PreparedCommit
1111
from tgdb.infrastructure.pydantic.commit_encoding import (
12-
TransactionCommitListSchema,
12+
PreparedCommitListSchema,
1313
)
1414
from tgdb.infrastructure.telethon.in_telegram_big_text import InTelegramBigText
1515

@@ -47,8 +47,8 @@ def _refresh_overflow(self) -> None:
4747

4848

4949
@dataclass(frozen=True)
50-
class InTelegramReplicableTransactionCommitBuffer(Buffer[TransactionCommit]):
51-
_buffer: Buffer[TransactionCommit]
50+
class InTelegramReplicablePreparedCommitBuffer(Buffer[PreparedCommit]):
51+
_buffer: Buffer[PreparedCommit]
5252
_in_tg_encoded_commits: InTelegramBigText
5353

5454
async def __aenter__(self) -> Self:
@@ -58,7 +58,7 @@ async def __aenter__(self) -> Self:
5858
return self
5959

6060
commit_list_schema = (
61-
TransactionCommitListSchema.model_validate_json(encoded_commits)
61+
PreparedCommitListSchema.model_validate_json(encoded_commits)
6262
)
6363
for commit in commit_list_schema.commits:
6464
await self._buffer.add(commit)
@@ -72,12 +72,12 @@ async def __aexit__(
7272
traceback: TracebackType | None,
7373
) -> None: ...
7474

75-
async def add(self, commit: TransactionCommit, /) -> None:
75+
async def add(self, commit: PreparedCommit, /) -> None:
7676
await self._buffer.add(commit)
7777

78-
async def __aiter__(self) -> AsyncIterator[Sequence[TransactionCommit]]:
78+
async def __aiter__(self) -> AsyncIterator[Sequence[PreparedCommit]]:
7979
async for commits in self._buffer:
80-
commit_list_schema = TransactionCommitListSchema(commits=commits)
80+
commit_list_schema = PreparedCommitListSchema(commits=commits)
8181
encoded_encoded_commits = commit_list_schema.model_dump_json()
8282

8383
await self._in_tg_encoded_commits.set(encoded_encoded_commits)
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from collections.abc import Mapping, Sequence
2+
from dataclasses import dataclass
3+
4+
from tgdb.application.common.ports.channel import Channel, Notification
5+
from tgdb.entities.horizon.horizon import (
6+
InvalidTransactionStateError,
7+
NoTransactionError,
8+
)
9+
from tgdb.entities.horizon.transaction import XID
10+
from tgdb.infrastructure.async_map import AsyncMap
11+
12+
13+
@dataclass(frozen=True)
14+
class AsyncMapChannel(Channel):
15+
_async_map: AsyncMap[
16+
XID, NoTransactionError | InvalidTransactionStateError | None
17+
]
18+
19+
async def publish(
20+
self,
21+
ok_commit_xids: Sequence[XID],
22+
error_commit_map: Mapping[
23+
XID, NoTransactionError | InvalidTransactionStateError
24+
],
25+
) -> None:
26+
for ok_commit_xid in ok_commit_xids:
27+
self._async_map[ok_commit_xid] = None
28+
del self._async_map[ok_commit_xid]
29+
30+
for error_commit_xid, error in error_commit_map.items():
31+
self._async_map[error_commit_xid] = error
32+
del self._async_map[error_commit_xid]
33+
34+
async def wait(self, xid: XID) -> Notification:
35+
notification_error = await self._async_map[xid]
36+
return Notification(notification_error)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from collections.abc import Generator
2+
from dataclasses import dataclass
3+
from time import perf_counter_ns
4+
from typing import Any
5+
6+
from tgdb.application.common.ports.clock import Clock
7+
from tgdb.entities.time.logic_time import LogicTime
8+
9+
10+
@dataclass
11+
class InMemoryClock(Clock):
12+
_time_counter: int = 0
13+
14+
def __await__(self) -> Generator[Any, Any, LogicTime]:
15+
return self._time().__await__()
16+
17+
async def _time(self) -> LogicTime:
18+
self._time_counter += 1
19+
return self._time_counter
20+
21+
22+
@dataclass(frozen=True)
23+
class PerfCounterClock(Clock):
24+
def __await__(self) -> Generator[Any, Any, LogicTime]:
25+
return self._time().__await__()
26+
27+
async def _time(self) -> LogicTime:
28+
return perf_counter_ns()

src/tgdb/infrastructure/adapters/logic_clock.py

Lines changed: 0 additions & 13 deletions
This file was deleted.
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
from abc import ABC, abstractmethod
2+
from dataclasses import dataclass
3+
4+
from in_memory_db import InMemoryDb
5+
6+
from tgdb.application.common.ports.relations import (
7+
NoRelationError,
8+
NotUniqueRelationNumberError,
9+
Relations,
10+
)
11+
from tgdb.entities.numeration.number import Number
12+
from tgdb.entities.relation.relation import Relation
13+
14+
15+
@dataclass(frozen=True)
16+
class InMemoryRelations(Relations):
17+
_db: InMemoryDb[Relation]
18+
19+
async def relation(self, relation_number: Number) -> Relation:
20+
"""
21+
:raises tgdb.application.common.ports.relations.NoRelationError:
22+
"""
23+
24+
relation = self._db.select_one(
25+
lambda it: it.number() == relation_number
26+
)
27+
28+
if relation is None:
29+
raise NoRelationError(relation_number)
30+
31+
async def add(self, relation: Relation) -> None:
32+
"""
33+
:raises tgdb.application.common.ports.relations.NotUniqueRelationNumberError:
34+
""" # noqa: E501
35+
36+
selected_relation = self._db.select_one(
37+
lambda it: it.number() == relation.number()
38+
)
39+
40+
if selected_relation is not None:
41+
raise NotUniqueRelationNumberError(relation.number())
42+
43+
self._db.insert(relation)
44+
45+
46+
@dataclass(frozen=True)
47+
class InMemoryRelations(Relations):
48+
_db: InMemoryDb[Relation]
49+
50+
async def relation(self, relation_number: Number) -> Relation:
51+
"""
52+
:raises tgdb.application.common.ports.relations.NoRelationError:
53+
"""
54+
55+
relation = self._db.select_one(
56+
lambda it: it.number() == relation_number
57+
)
58+
59+
if relation is None:
60+
raise NoRelationError(relation_number)
61+
62+
async def add(self, relation: Relation) -> None:
63+
"""
64+
:raises tgdb.application.common.ports.relations.NotUniqueRelationNumberError:
65+
""" # noqa: E501
66+
67+
selected_relation = self._db.select_one(
68+
lambda it: it.number() == relation.number()
69+
)
70+
71+
if selected_relation is not None:
72+
raise NotUniqueRelationNumberError(relation.number())
73+
74+
self._db.insert(relation)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from types import TracebackType
2+
3+
from tgdb.application.common.ports.shared_horizon import SharedHorizon
4+
from tgdb.entities.horizon.horizon import Horizon
5+
6+
7+
class InMemorySharedHorizon(SharedHorizon):
8+
_horizon: Horizon
9+
10+
async def __aenter__(self) -> Horizon:
11+
return self._horizon
12+
13+
async def __aexit__(
14+
self,
15+
exc_type: type[BaseException] | None,
16+
exc_value: BaseException | None,
17+
traceback: TracebackType | None,
18+
/,
19+
) -> None: ...

src/tgdb/infrastructure/adapters/heap.py renamed to src/tgdb/infrastructure/adapters/tuples.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44

55
from in_memory_db import InMemoryDb
66

7-
from tgdb.application.common.ports.heap import Heap
8-
from tgdb.entities.message import Message
7+
from tgdb.application.common.ports.tuples import Tuples
98
from tgdb.entities.row import (
109
DeletedRow,
1110
MutatedRow,
@@ -20,7 +19,7 @@
2019

2120

2221
@dataclass(frozen=True, unsafe_hash=False)
23-
class InMemoryHeap(Heap):
22+
class InMemoryTuples(Tuples):
2423
_db: InMemoryDb[Row]
2524

2625
async def map(self, effects: Sequence[TransactionEffect]) -> None:
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from dataclasses import dataclass, field
2+
from uuid import UUID, uuid4
3+
4+
from tgdb.application.common.ports.uuids import UUIDs
5+
6+
7+
@dataclass(frozen=True)
8+
class UUIDs4(UUIDs):
9+
async def random_uuid(self) -> UUID:
10+
return uuid4()
11+
12+
13+
@dataclass
14+
class MonotonicUUIDs(UUIDs):
15+
_counter: int = field(init=False, default=0)
16+
17+
async def random_uuid(self) -> UUID:
18+
next_uuid = UUID(int=self._counter)
19+
self._counter += 1
20+
21+
return next_uuid

0 commit comments

Comments
 (0)