Skip to content

Commit 8733a1c

Browse files
committed
ref(channel): reduce adaptation
1 parent 3944da3 commit 8733a1c

File tree

4 files changed

+21
-42
lines changed

4 files changed

+21
-42
lines changed

src/tgdb/application/horizon/commit_transaction.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ async def __call__(
5555
self.channel.wait(commit.xid),
5656
self.commit_buffer.add(commit),
5757
)
58-
if notification.error is not None:
59-
raise notification.error from notification.error
58+
if notification is not None:
59+
raise notification from notification
6060

6161
async def _effect(
6262
self, operator: Operator

src/tgdb/application/horizon/output_commits.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
NoTransactionError,
1111
TransactionNotCommittingError,
1212
)
13-
from tgdb.entities.horizon.transaction import XID, Commit, PreparedCommit
13+
from tgdb.entities.horizon.transaction import Commit, PreparedCommit
1414

1515

1616
@dataclass(frozen=True)
@@ -22,26 +22,23 @@ class OutputCommits:
2222
clock: Clock
2323

2424
async def __call__(self) -> None:
25-
async for prepared_commits in self.commit_buffer:
26-
await self.output_commits.push(prepared_commits)
25+
async for commits in self.commit_buffer:
26+
await self.output_commits.push(commits)
2727
await self.output_commits.sync()
2828

29-
ok_commit_xids = list[XID]()
30-
error_commit_map = dict[
31-
XID, NoTransactionError | TransactionNotCommittingError
32-
]()
33-
3429
async with self.shared_horizon as horizon:
35-
for prepared_commit in prepared_commits:
30+
for commit in commits:
31+
if isinstance(commit, Commit):
32+
await self.channel.publish(commit.xid, None)
33+
continue
34+
3635
time = await self.clock
3736

3837
try:
39-
horizon.complete_commit(time, prepared_commit.xid)
38+
horizon.complete_commit(time, commit.xid)
4039
except (
4140
NoTransactionError, TransactionNotCommittingError
4241
) as error:
43-
error_commit_map[prepared_commit.xid] = error
42+
await self.channel.publish(commit.xid, error)
4443
else:
45-
ok_commit_xids.append(prepared_commit.xid)
46-
47-
await self.channel.publish(ok_commit_xids, error_commit_map)
44+
await self.channel.publish(commit.xid, None)

src/tgdb/application/horizon/ports/channel.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
from abc import ABC, abstractmethod
2-
from collections.abc import Mapping, Sequence
3-
from dataclasses import dataclass
42

53
from tgdb.entities.horizon.horizon import (
64
NoTransactionError,
@@ -9,19 +7,15 @@
97
from tgdb.entities.horizon.transaction import XID
108

119

12-
@dataclass(frozen=True)
13-
class Notification:
14-
error: NoTransactionError | TransactionNotCommittingError | None
10+
type Notification = NoTransactionError | TransactionNotCommittingError | None
1511

1612

1713
class Channel(ABC):
1814
@abstractmethod
1915
async def publish(
2016
self,
21-
ok_commit_xids: Sequence[XID],
22-
error_commit_map: Mapping[
23-
XID, NoTransactionError | TransactionNotCommittingError
24-
],
17+
xid: XID,
18+
notification: Notification,
2519
/,
2620
) -> None: ...
2721

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from asyncio import wait_for
2-
from collections.abc import Mapping, Sequence
32
from dataclasses import dataclass
43

54
from tgdb.application.horizon.ports.channel import Channel, Notification
@@ -20,22 +19,11 @@ class AsyncMapChannel(Channel):
2019

2120
async def publish(
2221
self,
23-
ok_commit_xids: Sequence[XID],
24-
error_commit_map: Mapping[
25-
XID, NoTransactionError | TransactionNotCommittingError
26-
],
22+
xid: XID,
23+
notification: Notification,
2724
) -> None:
28-
for ok_commit_xid in ok_commit_xids:
29-
self._async_map[ok_commit_xid] = None
30-
del self._async_map[ok_commit_xid]
31-
32-
for error_commit_xid, error in error_commit_map.items():
33-
self._async_map[error_commit_xid] = error
34-
del self._async_map[error_commit_xid]
25+
self._async_map[xid] = notification
26+
del self._async_map[xid]
3527

3628
async def wait(self, xid: XID) -> Notification:
37-
notification_error = await wait_for(
38-
self._async_map[xid], self._timeout_seconds
39-
)
40-
41-
return Notification(notification_error)
29+
return await wait_for(self._async_map[xid], self._timeout_seconds)

0 commit comments

Comments
 (0)