Skip to content

Commit fbeed7f

Browse files
committed
feat: make it better, faster, stronger
1 parent d76d5f0 commit fbeed7f

24 files changed

+647
-324
lines changed

src/tgdb/application/input_operator.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
11
from dataclasses import dataclass
22

3-
from tgdb.application.errors.common import InvalidInputOperatorError
3+
from tgdb.application.errors.common import (
4+
InvalidInputOperatorError,
5+
NotActiveNodeError,
6+
)
7+
from tgdb.application.ports.buffer import Buffer
8+
from tgdb.application.ports.logic_clock import LogicClock
49
from tgdb.application.ports.operator_serialization import OperatorSerialization
5-
from tgdb.application.ports.queque import Queque
6-
from tgdb.entities.operator import Operator
10+
from tgdb.application.ports.shared_horizon import SharedHorizon
11+
from tgdb.entities.operator import AppliedOperator
12+
from tgdb.entities.transaction import TransactionPreparedCommit
713

814

915
@dataclass(frozen=True)
1016
class InputOperator[SerializedOperatorsT]:
11-
input_operators: Queque[Operator]
17+
clock: LogicClock
1218
operator_serialization: OperatorSerialization[SerializedOperatorsT]
19+
commit_buffer: Buffer[TransactionPreparedCommit]
20+
shared_horizon: SharedHorizon
1321

1422
async def __call__(
1523
self, serialized_operator: SerializedOperatorsT
@@ -25,4 +33,11 @@ async def __call__(
2533
if input_operator is None:
2634
raise InvalidInputOperatorError
2735

28-
await self.input_operators.push(input_operator)
36+
async with self.shared_horizon as horizon:
37+
time = await self.clock.time()
38+
applied_input_operator = AppliedOperator(input_operator, time)
39+
40+
transaction_commit = horizon.add(applied_input_operator)
41+
42+
if transaction_commit:
43+
await self.commit_buffer.add(transaction_commit)

src/tgdb/application/output_commits.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,34 @@
22
from dataclasses import dataclass
33

44
from tgdb.application.ports.buffer import Buffer
5+
from tgdb.application.ports.notifying import Notifying
56
from tgdb.application.ports.queque import Queque
6-
from tgdb.entities.transaction import TransactionCommit
7+
from tgdb.application.ports.shared_horizon import SharedHorizon
8+
from tgdb.entities.transaction import (
9+
TransactionOkPreparedCommit,
10+
TransactionPreparedCommit,
11+
)
712

813

914
@dataclass(frozen=True)
1015
class OutputCommits:
11-
commit_buffer: Buffer[TransactionCommit]
12-
output_commits: Queque[Sequence[TransactionCommit]]
16+
commit_buffer: Buffer[TransactionPreparedCommit]
17+
notifying: Notifying[Sequence[TransactionPreparedCommit]]
18+
output_commits: Queque[Sequence[TransactionOkPreparedCommit]]
19+
shared_horizon: SharedHorizon
1320

1421
async def __call__(self) -> None:
15-
async for commits in self.commit_buffer:
16-
await self.output_commits.push(commits)
22+
async for prepared_commits in self.commit_buffer:
23+
await self.notifying.publish(prepared_commits)
24+
25+
ok_prepared_commits = tuple(
26+
commit for commit in prepared_commits
27+
if isinstance(commit, TransactionOkPreparedCommit)
28+
)
29+
30+
await self.output_commits.push(ok_prepared_commits)
1731
await self.output_commits.sync()
32+
33+
async with self.shared_horizon as horizon:
34+
for ok_prepared_commit in ok_prepared_commits:
35+
horizon.complete(ok_prepared_commit)

src/tgdb/application/output_commits_to_clients.py

Lines changed: 0 additions & 19 deletions
This file was deleted.

src/tgdb/application/output_commits_to_heap.py

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,16 @@
33

44
from tgdb.application.ports.heap import Heap
55
from tgdb.application.ports.queque import Queque
6-
from tgdb.entities.transaction import TransactionCommit, TransactionOkCommit
6+
from tgdb.entities.transaction import TransactionOkPreparedCommit
77

88

99
@dataclass(frozen=True)
1010
class OutputCommitsToHeap:
1111
heap: Heap
12-
output_commits: Queque[Sequence[TransactionCommit]]
12+
output_commits: Queque[Sequence[TransactionOkPreparedCommit]]
1313

1414
async def __call__(self) -> None:
15-
is_previous_map_partial = True
16-
1715
async for commits in self.output_commits:
18-
effects = tuple(
19-
commit.effect
20-
for commit in commits
21-
if isinstance(commit, TransactionOkCommit)
22-
)
16+
effects = tuple(commit.effect for commit in commits)
2317

24-
if is_previous_map_partial:
25-
await self.heap.map_as_duplicate(effects)
26-
is_previous_map_partial = False
27-
else:
28-
await self.heap.map(effects)
18+
await self.heap.map(effects)

src/tgdb/application/ports/heap.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,3 @@ async def rows(self, schema: str, )
1010

1111
@abstractmethod
1212
async def map(self, effects: Sequence[TransactionEffect], /) -> None: ...
13-
14-
@abstractmethod
15-
async def map_as_duplicate(
16-
self, effects: Sequence[TransactionEffect], /
17-
) -> None: ...

src/tgdb/application/ports/notification.py

Lines changed: 0 additions & 6 deletions
This file was deleted.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from abc import ABC, abstractmethod
2+
3+
4+
class Notifying[ValueT](ABC):
5+
@abstractmethod
6+
async def publish(self, value: ValueT, /) -> None: ...

src/tgdb/application/ports/shared_horizon.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,4 @@
44
from tgdb.entities.transaction_horizon import TransactionHorizon
55

66

7-
class SharedHorizon(
8-
AbstractAsyncContextManager[TransactionHorizon | None], ABC
9-
):
10-
@abstractmethod
11-
async def set(self, horizon: TransactionHorizon | None) -> None: ...
7+
class SharedHorizon(AbstractAsyncContextManager[TransactionHorizon], ABC): ...

src/tgdb/application/serialize_transactions.py

Lines changed: 0 additions & 40 deletions
This file was deleted.

src/tgdb/application/start_down.py

Lines changed: 0 additions & 20 deletions
This file was deleted.

0 commit comments

Comments
 (0)