Skip to content

Commit 5dbf238

Browse files
committed
feat(infrastructure): add all adapters
1 parent 9b362ca commit 5dbf238

File tree

17 files changed

+500
-304
lines changed

17 files changed

+500
-304
lines changed

src/tgdb/application/common/ports/tuples.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ async def tuples_with_attribute(
1414
relation_number: Number,
1515
attribute_number: Number,
1616
attribute_scalar: Scalar,
17+
/,
1718
) -> Sequence[Tuple]: ...
1819

1920
@abstractmethod

src/tgdb/entities/relation/tuple.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
from collections.abc import Iterator
1+
from collections.abc import Iterator, Sequence
22
from dataclasses import dataclass
3+
from typing import Any, overload
34
from uuid import UUID
45

56
from tgdb.entities.numeration.number import Number
@@ -12,7 +13,7 @@
1213

1314

1415
@dataclass(frozen=True)
15-
class Tuple:
16+
class Tuple(Sequence[Scalar]):
1617
tid: TID
1718
relation_version_id: RelationVersionID
1819
scalars: tuple[Scalar, ...]
@@ -32,6 +33,18 @@ def matches(self, schema: Schema) -> bool:
3233
for scalar, domain in zip(self, schema, strict=True)
3334
)
3435

36+
@overload
37+
def __getitem__(self, index: int, /) -> Scalar: ...
38+
39+
@overload
40+
def __getitem__(self, sclice: "slice[Any, Any, Any]", /) -> Sequence[Scalar]:
41+
...
42+
43+
def __getitem__(
44+
self, key: "int | slice[Any, Any, Any]", /
45+
) -> Scalar | Sequence[Scalar]:
46+
return self.scalars[key]
47+
3548

3649
def tuple_(
3750
*scalars: Scalar,

src/tgdb/infrastructure/adapters/buffer.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import pickle
12
from asyncio import Event, wait_for
23
from collections import deque
34
from collections.abc import AsyncIterator, Sequence
@@ -8,10 +9,7 @@
89

910
from tgdb.application.common.ports.buffer import Buffer
1011
from tgdb.entities.horizon.transaction import PreparedCommit
11-
from tgdb.infrastructure.pydantic.commit_encoding import (
12-
PreparedCommitListSchema,
13-
)
14-
from tgdb.infrastructure.telethon.in_telegram_big_text import InTelegramBigText
12+
from tgdb.infrastructure.telethon.in_telegram_bytes import InTelegramBytes
1513

1614

1715
@dataclass(frozen=True, unsafe_hash=False)
@@ -49,18 +47,23 @@ def _refresh_overflow(self) -> None:
4947
@dataclass(frozen=True)
5048
class InTelegramReplicablePreparedCommitBuffer(Buffer[PreparedCommit]):
5149
_buffer: Buffer[PreparedCommit]
52-
_in_tg_encoded_commits: InTelegramBigText
50+
_in_tg_encoded_commits: InTelegramBytes
5351

5452
async def __aenter__(self) -> Self:
5553
encoded_commits = await self._in_tg_encoded_commits
5654

5755
if encoded_commits is None:
5856
return self
5957

60-
commit_list_schema = (
61-
PreparedCommitListSchema.model_validate_json(encoded_commits)
62-
)
63-
for commit in commit_list_schema.commits:
58+
commits = pickle.loads(encoded_commits)
59+
60+
if not isinstance(commits, list):
61+
raise TypeError(str(commits))
62+
63+
for commit in commits:
64+
if not isinstance(commit, PreparedCommit):
65+
raise TypeError(str(commits))
66+
6467
await self._buffer.add(commit)
6568

6669
return self
@@ -77,9 +80,7 @@ async def add(self, commit: PreparedCommit, /) -> None:
7780

7881
async def __aiter__(self) -> AsyncIterator[Sequence[PreparedCommit]]:
7982
async for commits in self._buffer:
80-
commit_list_schema = PreparedCommitListSchema(commits=commits)
81-
encoded_encoded_commits = commit_list_schema.model_dump_json()
82-
83-
await self._in_tg_encoded_commits.set(encoded_encoded_commits)
83+
encoded_commits = pickle.dumps(list(commits))
84+
await self._in_tg_encoded_commits.set(encoded_commits)
8485

8586
yield commits

src/tgdb/infrastructure/adapters/relations.py

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
from abc import ABC, abstractmethod
2-
from dataclasses import dataclass
1+
import pickle
2+
from dataclasses import dataclass, field
3+
from types import TracebackType
4+
from typing import Self, cast
35

46
from in_memory_db import InMemoryDb
57

@@ -10,6 +12,7 @@
1012
)
1113
from tgdb.entities.numeration.number import Number
1214
from tgdb.entities.relation.relation import Relation
15+
from tgdb.infrastructure.telethon.in_telegram_bytes import InTelegramBytes
1316

1417

1518
@dataclass(frozen=True)
@@ -24,10 +27,11 @@ async def relation(self, relation_number: Number) -> Relation:
2427
relation = self._db.select_one(
2528
lambda it: it.number() == relation_number
2629
)
27-
2830
if relation is None:
2931
raise NoRelationError(relation_number)
3032

33+
return relation
34+
3135
async def add(self, relation: Relation) -> None:
3236
"""
3337
:raises tgdb.application.common.ports.relations.NotUniqueRelationNumberError:
@@ -43,32 +47,78 @@ async def add(self, relation: Relation) -> None:
4347
self._db.insert(relation)
4448

4549

46-
@dataclass(frozen=True)
47-
class InMemoryRelations(Relations):
48-
_db: InMemoryDb[Relation]
50+
@dataclass
51+
class InTelegramReplicableRelations(Relations):
52+
_in_telegram_encoded_relations: InTelegramBytes
53+
_cached_relations: InMemoryDb[Relation] | None = field(
54+
init=False, default=None
55+
)
56+
57+
async def __aenter__(self) -> Self:
58+
loaded_relations = await self._loaded_relations()
59+
self._cached_relations = loaded_relations
60+
61+
return self
62+
63+
async def __aexit__(
64+
self,
65+
error_type: type[BaseException] | None,
66+
error: BaseException | None,
67+
traceback: TracebackType | None,
68+
) -> None: ...
4969

5070
async def relation(self, relation_number: Number) -> Relation:
5171
"""
5272
:raises tgdb.application.common.ports.relations.NoRelationError:
5373
"""
5474

55-
relation = self._db.select_one(
75+
if self._cached_relations is None:
76+
raise ValueError
77+
78+
relation = self._cached_relations.select_one(
5679
lambda it: it.number() == relation_number
5780
)
5881

5982
if relation is None:
6083
raise NoRelationError(relation_number)
6184

85+
return relation
86+
6287
async def add(self, relation: Relation) -> None:
6388
"""
6489
:raises tgdb.application.common.ports.relations.NotUniqueRelationNumberError:
6590
""" # noqa: E501
6691

67-
selected_relation = self._db.select_one(
92+
if self._cached_relations is None:
93+
raise ValueError
94+
95+
selected_relation = self._cached_relations.select_one(
6896
lambda it: it.number() == relation.number()
6997
)
7098

7199
if selected_relation is not None:
72100
raise NotUniqueRelationNumberError(relation.number())
73101

74-
self._db.insert(relation)
102+
self._cached_relations.insert(relation)
103+
104+
encoded_cached_relations = pickle.dumps(self._cached_relations)
105+
await self._in_telegram_encoded_relations.set(encoded_cached_relations)
106+
107+
async def _loaded_relations(self) -> InMemoryDb[Relation]:
108+
encoded_relations = await self._in_telegram_encoded_relations
109+
110+
if encoded_relations is None:
111+
return InMemoryDb()
112+
113+
relations = pickle.loads(encoded_relations)
114+
115+
if relations is None:
116+
return InMemoryDb()
117+
if not isinstance(relations, InMemoryDb):
118+
raise TypeError(relations)
119+
120+
for relation in relations:
121+
if not isinstance(relation, Relation):
122+
raise TypeError(relation)
123+
124+
return cast(InMemoryDb[Relation], relations)

0 commit comments

Comments
 (0)