Skip to content

Commit 434bf12

Browse files
committed
feat(application): add a part of operations
1 parent 90f3787 commit 434bf12

25 files changed

+431
-163
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
from asyncio import gather
2+
from dataclasses import dataclass
3+
4+
from tgdb.application.ports.buffer import Buffer
5+
from tgdb.application.ports.channel import Channel
6+
from tgdb.application.ports.clock import Clock
7+
from tgdb.application.ports.operator_encoding import (
8+
DeletedTupleOperator,
9+
MutatedTupleOperator,
10+
NewTupleOperator,
11+
Operator,
12+
OperatorEncoding,
13+
)
14+
from tgdb.application.ports.relations import Relations
15+
from tgdb.application.ports.shared_horizon import SharedHorizon
16+
from tgdb.application.ports.uuids import UUIDs
17+
from tgdb.entities.horizon.claim import Claim
18+
from tgdb.entities.horizon.transaction import XID, Commit, PreparedCommit
19+
from tgdb.entities.relation.tuple_effect import (
20+
DeletedTuple,
21+
InvalidTuple,
22+
MutatedTuple,
23+
NewTuple,
24+
deleted_tuple,
25+
mutated_tuple,
26+
new_tuple,
27+
)
28+
29+
30+
@dataclass(frozen=True)
31+
class CommitTransaction[EncodedOperatorsT]:
32+
uuids: UUIDs
33+
shared_horizon: SharedHorizon
34+
clock: Clock
35+
operator_encoding: OperatorEncoding[EncodedOperatorsT]
36+
relations: Relations
37+
channel: Channel
38+
commit_buffer: Buffer[PreparedCommit]
39+
40+
async def __call__(
41+
self, xid: XID, encoded_operators: EncodedOperatorsT
42+
) -> None:
43+
"""
44+
:raises tgdb.entities.horizon.horizon.NoTransactionError:
45+
:raises tgdb.entities.horizon.horizon.InvalidTransactionStateError:
46+
:raises tgdb.entities.horizon.horizon.InvalidTupleError:
47+
:raises tgdb.entities.horizon.horizon.InvalidEffectsError:
48+
:raises tgdb.entities.horizon.transaction.ConflictError:
49+
:raises tgdb.entities.horizon.transaction.NonSerializableWriteTransactionError:
50+
""" # noqa: E501
51+
52+
operators = await self.operator_encoding.decoded(encoded_operators)
53+
54+
if operators is None:
55+
effects = None
56+
else:
57+
effects = await gather(*map(self._effect, operators))
58+
59+
time = await self.clock
60+
61+
async with self.shared_horizon as horizon:
62+
commit = horizon.commit_transaction(time, xid, effects)
63+
64+
if not isinstance(commit, PreparedCommit):
65+
return
66+
67+
notification, _ = await gather(
68+
self.channel.wait(commit.xid),
69+
self.commit_buffer.add(commit),
70+
)
71+
if notification.error is not None:
72+
raise notification.error from notification.error
73+
74+
async def _effect(
75+
self, operator: Operator | None
76+
) -> NewTuple | MutatedTuple | DeletedTuple | Claim | InvalidTuple:
77+
match operator:
78+
case Claim():
79+
return operator
80+
case DeletedTupleOperator():
81+
return deleted_tuple(operator.tid)
82+
case None:
83+
return InvalidTuple(None, None, None)
84+
case _:
85+
...
86+
87+
if operator.relation_number is None:
88+
relation = None
89+
else:
90+
relation = await self.relations.relation(operator.relation_number)
91+
92+
match operator:
93+
case NewTupleOperator():
94+
tid = await self.uuids.random_uuid()
95+
return new_tuple(tid, operator.scalars, relation)
96+
case MutatedTupleOperator():
97+
return mutated_tuple(operator.tid, operator.scalars, relation)

src/tgdb/application/errors/__init__.py

Whitespace-only changes.

src/tgdb/application/errors/common.py

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/tgdb/application/input_operator.py

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/tgdb/application/output_commits.py

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,46 @@
22
from dataclasses import dataclass
33

44
from tgdb.application.ports.buffer import Buffer
5-
from tgdb.application.ports.notifying import Notifying
5+
from tgdb.application.ports.channel import Channel
6+
from tgdb.application.ports.clock import Clock
67
from tgdb.application.ports.queque import Queque
78
from tgdb.application.ports.shared_horizon import SharedHorizon
8-
from tgdb.entities.horizon.transaction import PreparedCommit
9+
from tgdb.entities.horizon.horizon import (
10+
InvalidTransactionStateError,
11+
NoTransactionError,
12+
)
13+
from tgdb.entities.horizon.transaction import XID, PreparedCommit
914

1015

1116
@dataclass(frozen=True)
1217
class OutputCommits:
1318
commit_buffer: Buffer[PreparedCommit]
14-
notifying: Notifying[Sequence[TransactionCommit]]
15-
output_commits: Queque[Sequence[TransactionCommit]]
19+
channel: Channel
20+
output_commits: Queque[Sequence[PreparedCommit]]
1621
shared_horizon: SharedHorizon
22+
clock: Clock
1723

1824
async def __call__(self) -> None:
1925
async for prepared_commits in self.commit_buffer:
20-
await self.notifying.publish(prepared_commits)
21-
22-
ok_prepared_commits = tuple(
23-
commit for commit in prepared_commits
24-
if isinstance(commit, TransactionCommit)
25-
)
26-
27-
await self.output_commits.push(ok_prepared_commits)
26+
await self.output_commits.push(prepared_commits)
2827
await self.output_commits.sync()
2928

29+
ok_commit_xids = list[XID]()
30+
error_commit_map = dict[
31+
XID, NoTransactionError | InvalidTransactionStateError
32+
]()
33+
3034
async with self.shared_horizon as horizon:
31-
for ok_prepared_commit in ok_prepared_commits:
32-
horizon.complete(ok_prepared_commit)
35+
for prepared_commit in prepared_commits:
36+
time = await self.clock
37+
38+
try:
39+
horizon.complete_commit(time, prepared_commit.xid)
40+
except (
41+
NoTransactionError, InvalidTransactionStateError
42+
) as error:
43+
error_commit_map[prepared_commit.xid] = error
44+
else:
45+
ok_commit_xids.append(prepared_commit.xid)
46+
47+
await self.channel.publish(ok_commit_xids, error_commit_map)

src/tgdb/application/output_commits_to_heap.py

Lines changed: 0 additions & 18 deletions
This file was deleted.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from abc import ABC, abstractmethod
2+
from collections.abc import Mapping, Sequence
3+
from dataclasses import dataclass
4+
5+
from tgdb.entities.horizon.horizon import (
6+
InvalidTransactionStateError,
7+
NoTransactionError,
8+
)
9+
from tgdb.entities.horizon.transaction import XID
10+
11+
12+
@dataclass(frozen=True)
13+
class Notification:
14+
error: NoTransactionError | InvalidTransactionStateError | None
15+
16+
17+
class Channel(ABC):
18+
@abstractmethod
19+
async def publish(
20+
self,
21+
ok_commit_xids: Sequence[XID],
22+
error_commit_map: Mapping[
23+
XID, NoTransactionError | InvalidTransactionStateError
24+
],
25+
/,
26+
) -> None: ...
27+
28+
@abstractmethod
29+
async def wait(self, xid: XID) -> Notification: ...
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
from abc import ABC
12
from collections.abc import Awaitable
23

34
from tgdb.entities.time.logic_time import LogicTime
45

56

6-
class Clock(Awaitable[LogicTime]): ...
7+
class Clock(ABC, Awaitable[LogicTime]): ...

src/tgdb/application/ports/effect_encoding.py

Lines changed: 0 additions & 19 deletions
This file was deleted.

src/tgdb/application/ports/heap.py

Lines changed: 0 additions & 17 deletions
This file was deleted.

0 commit comments

Comments
 (0)