Skip to content

Commit d76d5f0

Browse files
committed
feat: add a part of schema versioning
1 parent 21facc3 commit d76d5f0

File tree

12 files changed

+435
-71
lines changed

12 files changed

+435
-71
lines changed

src/tgdb/application/errors/__init__.py

Whitespace-only changes.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
class NotActiveNodeError(Exception): ...
2+
3+
4+
class ActiveNodeError(Exception): ...
5+
6+
7+
class InvalidInputOperatorError(Exception): ...
Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
from dataclasses import dataclass
22

3+
from tgdb.application.errors.common import InvalidInputOperatorError
34
from tgdb.application.ports.operator_serialization import OperatorSerialization
45
from tgdb.application.ports.queque import Queque
56
from tgdb.entities.operator import Operator
67

78

8-
class InputOperatorError(Exception): ...
9-
10-
119
@dataclass(frozen=True)
1210
class InputOperator[SerializedOperatorsT]:
1311
input_operators: Queque[Operator]
@@ -17,14 +15,14 @@ async def __call__(
1715
self, serialized_operator: SerializedOperatorsT
1816
) -> None:
1917
"""
20-
:raises tgdb.application.input_operator.InputOperatorError:
18+
:raises tgdb.application.errors.common.InvalidInputOperatorError:
2119
"""
2220

2321
input_operator = await self.operator_serialization.deserialized(
2422
serialized_operator
2523
)
2624

2725
if input_operator is None:
28-
raise InputOperatorError
26+
raise InvalidInputOperatorError
2927

3028
await self.input_operators.push(input_operator)

src/tgdb/application/ports/heap.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55

66

77
class Heap(ABC):
8+
@abstractmethod
9+
async def rows(self, schema: str, )
10+
811
@abstractmethod
912
async def map(self, effects: Sequence[TransactionEffect], /) -> None: ...
1013

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from abc import ABC, abstractmethod
2+
from contextlib import AbstractAsyncContextManager
3+
4+
from tgdb.entities.transaction_horizon import TransactionHorizon
5+
6+
7+
class SharedHorizon(
8+
AbstractAsyncContextManager[TransactionHorizon | None], ABC
9+
):
10+
@abstractmethod
11+
async def set(self, horizon: TransactionHorizon | None) -> None: ...

src/tgdb/application/start_down.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from dataclasses import dataclass
2+
3+
from tgdb.application.errors.common import NotActiveNodeError
4+
from tgdb.application.ports.shared_horizon import SharedHorizon
5+
6+
7+
@dataclass(frozen=True)
8+
class StartDown:
9+
shared_horizon: SharedHorizon
10+
11+
async def __call__(self) -> None:
12+
"""
13+
:raises tgdb.application.errors.common.NotActiveNodeError:
14+
"""
15+
16+
async with self.shared_horizon as horizon:
17+
if horizon is not None:
18+
raise NotActiveNodeError
19+
20+
await self.shared_horizon.set(None)

src/tgdb/application/start_up.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from dataclasses import dataclass
2+
3+
from tgdb.application.errors.common import ActiveNodeError
4+
from tgdb.application.ports.shared_horizon import SharedHorizon
5+
from tgdb.entities.logic_time import LogicTime
6+
from tgdb.entities.transaction_horizon import create_transaction_horizon
7+
8+
9+
@dataclass(frozen=True)
10+
class StartUp:
11+
shared_horizon: SharedHorizon
12+
13+
async def __call__(
14+
self,
15+
horizon_max_width: LogicTime | None,
16+
horizon_max_height: int | None,
17+
) -> None:
18+
"""
19+
:raises tgdb.application.errors.common.ActiveNodeError:
20+
"""
21+
22+
async with self.shared_horizon as horizon:
23+
if horizon is not None:
24+
raise ActiveNodeError
25+
26+
new_horizon = create_transaction_horizon(
27+
horizon_max_width, horizon_max_height
28+
)
29+
30+
await self.shared_horizon.set(new_horizon)

src/tgdb/entities/assert_.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1-
def not_none[ValueT](value: ValueT | None) -> ValueT:
2-
assert value is not None
3-
return value
1+
def not_none[ValueT](
2+
value: ValueT | None, *, error: Exception | type[Exception] = ValueError
3+
) -> ValueT:
4+
if value is not None:
5+
return value
6+
7+
raise error

src/tgdb/entities/operator.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,46 @@
44

55
from tgdb.entities.logic_time import LogicTime
66
from tgdb.entities.mark import Mark
7-
from tgdb.entities.row import RowEffect
8-
9-
10-
type IntermediateOperator = RowEffect | Mark
7+
from tgdb.entities.row import DeletedRow, MutatedRow, NewRow, RowEffect
8+
from tgdb.entities.transaction import TransactionIsolation
119

1210

1311
@dataclass(frozen=True)
1412
class StartOperator:
1513
transaction_id: UUID
14+
transaction_isolation: TransactionIsolation
15+
16+
17+
type IntermediateOperatorEffect = RowEffect | Mark
1618

1719

1820
@dataclass(frozen=True)
19-
class CommitOperator:
21+
class IntermediateOperator[
22+
EffectT: IntermediateOperatorEffect = IntermediateOperatorEffect
23+
]:
2024
transaction_id: UUID
21-
operators: Sequence[IntermediateOperator]
25+
effect: IntermediateOperatorEffect
2226

2327

2428
@dataclass(frozen=True)
2529
class RollbackOperator:
2630
transaction_id: UUID
2731

2832

29-
type Operator = StartOperator | CommitOperator | RollbackOperator
33+
@dataclass(frozen=True)
34+
class CommitOperator:
35+
transaction_id: UUID
36+
operators: Sequence[
37+
IntermediateOperator[NewRow | MutatedRow | DeletedRow | Mark]
38+
]
39+
40+
41+
type Operator = (
42+
StartOperator
43+
| IntermediateOperator
44+
| RollbackOperator
45+
| CommitOperator
46+
)
3047

3148

3249
@dataclass(frozen=True)

0 commit comments

Comments
 (0)