Skip to content

Commit 6f551af

Browse files
committed
test(TransactionHorizon): cover basic cases
1 parent 2da099d commit 6f551af

File tree

15 files changed

+873
-127
lines changed

15 files changed

+873
-127
lines changed

pyproject.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ select = [
6262
"Q", "RET", "SLF", "SLOT", "SIM", "TID252", "TCH", "ARG", "PTH", "ERA", "TRY",
6363
"PERF", "INP", "I", "S", "FAST", "TID", "TCH", "INT"
6464
]
65-
ignore = ["N818", "RUF009", "UP018", "PLR6301", "PLR0913", "PLW0108", "TC006"]
65+
ignore = ["N818", "RUF009", "UP018", "PLR6301", "PLR0913", "PLW0108", "TC006", "S101"]
6666

6767
[tool.ruff.lint.isort]
6868
lines-after-imports = 2
@@ -71,9 +71,8 @@ lines-after-imports = 2
7171
"src/tgdb/entities/*" = ["PLR2004"]
7272
"src/tgdb/application/*" = ["PLR0917"]
7373
"src/tgdb/infrastructure/adapters/*" = ["RUF029"]
74-
"src/tgdb/infrastructure/*" = ["S101"]
7574
"src/tgdb/entities/transaction.py" = ["SLF001"]
76-
"tests/*" = ["S101", "PLR0124", "PLR0917", "S106", "C901"]
75+
"tests/*" = ["PLR0124", "PLR0917", "S106", "C901", "PLR2004"]
7776
"__init__.py" = ["PLC0414"]
7877

7978
[tool.pytest.ini_options]

src/tgdb/application/input_operator.py

Lines changed: 0 additions & 33 deletions
This file was deleted.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from dataclasses import dataclass
2+
3+
from tgdb.application.ports.async_queque import AsyncQueque
4+
from tgdb.application.ports.logic_clock import LogicClock
5+
from tgdb.application.ports.operator_serialization import OperatorSerialization
6+
from tgdb.entities.operator import AppliedOperator, applied_operator
7+
8+
9+
class InputOperatorsError(Exception): ...
10+
11+
12+
@dataclass(frozen=True)
13+
class InputOperators[SerializedOperatorsT]:
14+
clock: LogicClock
15+
input_operators: AsyncQueque[AppliedOperator]
16+
operator_serialization: OperatorSerialization[SerializedOperatorsT]
17+
18+
async def __call__(
19+
self, serialized_operators: SerializedOperatorsT
20+
) -> None:
21+
"""
22+
:raises tgdb.application.input_operator.InputOperatorsError:
23+
"""
24+
25+
input_operators = await self.operator_serialization.deserialized(
26+
serialized_operators
27+
)
28+
29+
if input_operators is None:
30+
raise InputOperatorsError
31+
32+
times = await self.clock.times(len(input_operators))
33+
34+
await self.input_operators.push(*(
35+
applied_operator(input_operator, time)
36+
for time, input_operator in zip(times, input_operators, strict=True)
37+
))

src/tgdb/application/ports/async_queque.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
class AsyncQueque[ValueT](ABC):
66
@abstractmethod
7-
async def push(self, value: ValueT) -> None: ...
7+
async def push(self, *values: ValueT) -> None: ...
88

99
@abstractmethod
1010
async def iter(self) -> AsyncIterator[ValueT]: ...

src/tgdb/application/ports/log.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class Log(ABC):
1111
@abstractmethod
12-
async def push(self, operator: AppliedOperator, /) -> None: ...
12+
async def push(self, *operators: AppliedOperator) -> None: ...
1313

1414
@abstractmethod
1515
async def truncate(self, offset: LogOffset, /) -> None: ...
Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1-
from abc import ABC
2-
from collections.abc import Awaitable
1+
from abc import ABC, abstractmethod
2+
from collections.abc import Sequence
33

44
from tgdb.entities.logic_time import LogicTime
55

66

7-
class LogicClock(ABC, Awaitable[LogicTime]): ...
7+
class LogicClock(ABC):
8+
@abstractmethod
9+
async def times(self, count: int) -> Sequence[LogicTime]: ...
10+
11+
@abstractmethod
12+
async def time(self) -> LogicTime: ...
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
from abc import ABC, abstractmethod
2+
from collections.abc import Sequence
23

34
from tgdb.entities.operator import Operator
45

56

6-
class OperatorSerialization[SerializedOperatorT](ABC):
7+
class OperatorSerialization[SerializedOperatorsT](ABC):
78
@abstractmethod
89
async def deserialized(
9-
self, operator: SerializedOperatorT, /
10-
) -> Operator | None: ...
10+
self, operators: SerializedOperatorsT, /
11+
) -> Sequence[Operator] | None: ...

src/tgdb/application/ports/sync_queque.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
class SyncQueque[ValueT](ABC):
66
@abstractmethod
7-
async def push(self, value: ValueT) -> None: ...
7+
async def push(self, *values: ValueT) -> None: ...
88

99
@abstractmethod
1010
async def sync(self) -> None: ...

src/tgdb/entities/row.py

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -11,53 +11,9 @@
1111
type RowAttribute = bool | int | str | datetime | UUID | StrEnum
1212

1313

14-
@dataclass(frozen=True)
15-
class RowSchema(Sequence[type[RowAttribute]]):
16-
name: str
17-
id_type: type[RowAttribute]
18-
body_types: tuple[type[RowAttribute], ...] = tuple()
19-
20-
def __iter__(self) -> Iterator[type[RowAttribute]]:
21-
yield self.id_type
22-
yield from self.body_types
23-
24-
@overload
25-
def __getitem__(self, index: int, /) -> type[RowAttribute]: ...
26-
27-
@overload
28-
def __getitem__(
29-
self, slice_: "slice[Any, Any, Any]", /
30-
) -> Sequence[type[RowAttribute]]: ...
31-
32-
def __getitem__(
33-
self, value: "int | slice[Any, Any, Any]", /
34-
) -> Sequence[type[RowAttribute]] | type[RowAttribute]:
35-
return tuple(self)[value]
36-
37-
def __len__(self) -> int:
38-
return len(self.body_types) + 1
39-
40-
41-
class RowSchemaError(Exception):
42-
def __init__(self, schema: RowSchema) -> None:
43-
self.schema = schema
44-
super().__init__()
45-
46-
4714
@dataclass(frozen=True)
4815
class Row(IdentifiedValue[RowAttribute], Sequence[RowAttribute]):
4916
body: tuple[RowAttribute, ...]
50-
schema: RowSchema
51-
52-
def __post_init__(self) -> None:
53-
for attribute_and_type in zip(self, self.schema, strict=False):
54-
if len(attribute_and_type) != 2:
55-
raise RowSchemaError(self.schema)
56-
57-
attribute, type = attribute_and_type
58-
59-
if not isinstance(attribute, type):
60-
raise RowSchemaError(self.schema)
6117

6218
def __iter__(self) -> Iterator[RowAttribute]:
6319
yield self.id
@@ -80,4 +36,8 @@ def __len__(self) -> int:
8036
return len(self.body) + 1
8137

8238

39+
def row(*attrs: RowAttribute) -> Row:
40+
return Row(attrs[0], attrs[1:])
41+
42+
8343
type RowEffect = New[Row] | Mutated[Row] | Dead[Row]

src/tgdb/entities/transaction.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from uuid import UUID
66

77
from effect import Effect, IdentifiedValueSet
8+
from effect.state_transition import InvalidStateTransitionError
89

910
from tgdb.entities.logic_time import LogicTime
1011
from tgdb.entities.row import Row, RowEffect
@@ -32,7 +33,7 @@ class TransactionOkCommit:
3233
@dataclass(frozen=True)
3334
class TransactionFailedCommit:
3435
transaction_id: UUID
35-
conflict: TransactionConflict
36+
conflict: TransactionConflict | None
3637

3738

3839
type TransactionCommit = TransactionOkCommit | TransactionFailedCommit
@@ -55,6 +56,10 @@ def is_readonly(self) -> bool:
5556
return not any(self._effect) and not self._uniqueness_marks
5657

5758
def consider(self, effect: RowEffect) -> None:
59+
"""
60+
:raises effect.state_transition.InvalidStateTransitionError:
61+
"""
62+
5863
self._effect &= effect
5964

6065
def add_uniqueness_mark(
@@ -68,9 +73,6 @@ def add_viewed_row_mark(
6873
self._viewed_row_marks.add(viewed_row_mark)
6974

7075
def beginning(self) -> LogicTime | None:
71-
if self._beginning is None:
72-
return None
73-
7476
return self._beginning
7577

7678
def rollback(self) -> None:

0 commit comments

Comments
 (0)