Skip to content

Commit 37a9522

Browse files
committed
feat: allow SerializableReadTransaction writing
1 parent 4a88519 commit 37a9522

File tree

10 files changed

+89
-103
lines changed

10 files changed

+89
-103
lines changed

src/tgdb/application/horizon/commit_transaction.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class CommitTransaction:
3333
clock: Clock
3434
relations: Relations
3535
channel: Channel
36-
commit_buffer: Buffer[PreparedCommit]
36+
commit_buffer: Buffer[Commit | PreparedCommit]
3737

3838
async def __call__(
3939
self, xid: XID, operators: Sequence[Operator]
@@ -52,9 +52,6 @@ async def __call__(
5252
async with self.shared_horizon as horizon:
5353
commit = horizon.commit_transaction(time, xid, effects)
5454

55-
if isinstance(commit, Commit):
56-
return
57-
5855
notification, _ = await gather(
5956
self.channel.wait(commit.xid),
6057
self.commit_buffer.add(commit),

src/tgdb/application/horizon/output_commits.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
InvalidTransactionStateError,
1111
NoTransactionError,
1212
)
13-
from tgdb.entities.horizon.transaction import XID, PreparedCommit
13+
from tgdb.entities.horizon.transaction import XID, Commit, PreparedCommit
1414

1515

1616
@dataclass(frozen=True)
1717
class OutputCommits:
18-
commit_buffer: Buffer[PreparedCommit]
18+
commit_buffer: Buffer[Commit | PreparedCommit]
1919
channel: Channel
20-
output_commits: Queque[Sequence[PreparedCommit]]
20+
output_commits: Queque[Sequence[Commit | PreparedCommit]]
2121
shared_horizon: SharedHorizon
2222
clock: Clock
2323

src/tgdb/application/horizon/output_commits_to_tuples.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33

44
from tgdb.application.common.ports.queque import Queque
55
from tgdb.application.relation.ports.tuples import Tuples
6-
from tgdb.entities.horizon.transaction import PreparedCommit
6+
from tgdb.entities.horizon.transaction import Commit, PreparedCommit
77

88

99
@dataclass(frozen=True)
1010
class OutputCommitsToTuples:
1111
tuples: Tuples
12-
output_commits: Queque[Sequence[PreparedCommit]]
12+
output_commits: Queque[Sequence[Commit | PreparedCommit]]
1313

1414
async def __call__(self) -> None:
1515
is_previous_map_partial = True

src/tgdb/entities/horizon/horizon.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
Commit,
1010
ConflictError,
1111
IsolationLevel,
12-
NonSerializableReadTransaction,
1312
NonSerializableWriteTransactionError,
1413
PreparedCommit,
14+
ReadUncommitedTransaction,
1515
SerializableTransaction,
1616
SerializableTransactionState,
1717
Transaction,
@@ -58,8 +58,8 @@ class Horizon:
5858
_max_len: int
5959
_max_transaction_age: LogicTime
6060
_serializable_transaction_map: OrderedDict[XID, SerializableTransaction]
61-
_non_serializable_read_transaction_map: OrderedDict[
62-
XID, NonSerializableReadTransaction
61+
_read_uncommited_transaction_map: OrderedDict[
62+
XID, ReadUncommitedTransaction
6363
]
6464

6565
def __post_init__(self) -> None:
@@ -162,7 +162,8 @@ def commit_transaction(
162162
match transaction:
163163
case SerializableTransaction():
164164
return transaction.prepare_commit()
165-
case NonSerializableReadTransaction():
165+
case ReadUncommitedTransaction():
166+
del self._transaction_map(transaction)[xid]
166167
return transaction.commit()
167168
except (ConflictError, NonSerializableWriteTransactionError) as error:
168169
del self._transaction_map(transaction)[xid]
@@ -263,12 +264,12 @@ def _serializable_transaction(
263264

264265
def _non_serializable_read_transaction(
265266
self, xid: XID
266-
) -> NonSerializableReadTransaction:
267+
) -> ReadUncommitedTransaction:
267268
"""
268269
:raises tgdb.entities.horizon.horizon.NoTransactionError:
269270
"""
270271

271-
transaction = self._non_serializable_read_transaction_map.get(xid)
272+
transaction = self._read_uncommited_transaction_map.get(xid)
272273

273274
if transaction is None:
274275
raise NoTransactionError
@@ -292,14 +293,14 @@ def _transaction(
292293

293294
def _transaction_maps(self) -> Iterable[Mapping[XID, Transaction]]:
294295
yield self._serializable_transaction_map
295-
yield self._non_serializable_read_transaction_map
296+
yield self._read_uncommited_transaction_map
296297

297298
def _transaction_map[TransactionT: Transaction](
298299
self, transaction: TransactionT
299300
) -> OrderedDict[XID, TransactionT]:
300301
match transaction:
301-
case NonSerializableReadTransaction():
302-
return self._non_serializable_read_transaction_map # type: ignore[return-value]
302+
case ReadUncommitedTransaction():
303+
return self._read_uncommited_transaction_map # type: ignore[return-value]
303304
case SerializableTransaction():
304305
return self._serializable_transaction_map # type: ignore[return-value]
305306

@@ -316,5 +317,5 @@ def horizon(
316317
_max_len=max_len,
317318
_max_transaction_age=max_transaction_age,
318319
_serializable_transaction_map=OrderedDict(),
319-
_non_serializable_read_transaction_map=OrderedDict(),
320+
_read_uncommited_transaction_map=OrderedDict(),
320321
)

src/tgdb/entities/horizon/transaction.py

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -233,11 +233,10 @@ def _complete(self) -> None:
233233

234234

235235
@dataclass
236-
class NonSerializableReadTransaction:
236+
class ReadUncommitedTransaction:
237237
_xid: XID
238238
_start_time: LogicTime
239-
_is_readonly: bool
240-
_is_completed: bool
239+
_space: dict[TID, TupleEffect]
241240

242241
def xid(self) -> XID:
243242
return self._xid
@@ -249,54 +248,46 @@ def age(self, time: LogicTime) -> LogicTime:
249248
return time - self._start_time
250249

251250
def include(self, effect: ConflictableTransactionScalarEffect) -> None:
252-
if self._is_readonly and not isinstance(
253-
effect, JustViewedTuple | MigratedTuple
254-
):
255-
self._is_readonly = False
256-
257-
def rollback(self) -> None:
258-
self._is_completed = True
251+
if isinstance(effect, JustViewedTuple | Claim):
252+
return
259253

260-
def commit(self) -> Commit:
261-
"""
262-
:raises tgdb.entities.horizon.transaction.NonSerializableWriteTransactionError:
263-
""" # noqa: E501
254+
if effect.tid in self._space:
255+
effect = self._space[effect.tid] & effect
264256

265-
self._is_completed = True
257+
self._space[effect.tid] = effect
266258

267-
if not self._is_readonly:
268-
self.rollback()
269-
raise NonSerializableWriteTransactionError(self._xid)
259+
def rollback(self) -> None:
260+
...
270261

262+
def commit(self) -> Commit:
271263
return Commit(self._xid, frozenset())
272264

273265
@classmethod
274266
def start(
275267
cls, xid: XID, time: LogicTime
276-
) -> "NonSerializableReadTransaction":
277-
return NonSerializableReadTransaction(
268+
) -> "ReadUncommitedTransaction":
269+
return ReadUncommitedTransaction(
278270
_xid=xid,
279271
_start_time=time,
280-
_is_readonly=True,
281-
_is_completed=False,
272+
_space=dict(),
282273
)
283274

284275
def __eq__(self, other: object) -> bool:
285276
return (
286-
isinstance(other, SerializableTransaction)
277+
isinstance(other, ReadUncommitedTransaction)
287278
and self.xid() == other.xid()
288279
)
289280

290281
def __hash__(self) -> int:
291282
return hash(type(self)) + hash(self._xid)
292283

293284

294-
type Transaction = SerializableTransaction | NonSerializableReadTransaction
285+
type Transaction = SerializableTransaction | ReadUncommitedTransaction
295286

296287

297288
class IsolationLevel(Enum):
298-
serializable_read_and_write = auto()
299-
non_serializable_read = auto()
289+
serializable = auto()
290+
read_uncommited = auto()
300291

301292

302293
def start_transaction(
@@ -306,10 +297,10 @@ def start_transaction(
306297
serializable_transactions: Iterable[SerializableTransaction],
307298
) -> Transaction:
308299
match isolation:
309-
case IsolationLevel.serializable_read_and_write:
300+
case IsolationLevel.serializable:
310301
return SerializableTransaction.start(
311302
xid, time, serializable_transactions
312303
)
313304

314-
case IsolationLevel.non_serializable_read:
315-
return NonSerializableReadTransaction.start(xid, time)
305+
case IsolationLevel.read_uncommited:
306+
return ReadUncommitedTransaction.start(xid, time)

src/tgdb/infrastructure/adapters/buffer.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from typing import Self
88

99
from tgdb.application.common.ports.buffer import Buffer
10-
from tgdb.entities.horizon.transaction import PreparedCommit
10+
from tgdb.entities.horizon.transaction import Commit, PreparedCommit
1111
from tgdb.infrastructure.telethon.in_telegram_bytes import InTelegramBytes
1212

1313

@@ -47,8 +47,8 @@ def _refresh_overflow(self) -> None:
4747

4848

4949
@dataclass(frozen=True)
50-
class InTelegramReplicablePreparedCommitBuffer(Buffer[PreparedCommit]):
51-
_buffer: Buffer[PreparedCommit]
50+
class InTelegramReplicablePreparedCommitBuffer(Buffer[Commit | PreparedCommit]):
51+
_buffer: Buffer[Commit | PreparedCommit]
5252
_in_tg_encoded_commits: InTelegramBytes
5353

5454
async def __aenter__(self) -> Self:
@@ -77,10 +77,12 @@ async def __aexit__(
7777
traceback: TracebackType | None,
7878
) -> None: ...
7979

80-
async def add(self, commit: PreparedCommit, /) -> None:
80+
async def add(self, commit: Commit | PreparedCommit, /) -> None:
8181
await self._buffer.add(commit)
8282

83-
async def __aiter__(self) -> AsyncIterator[Sequence[PreparedCommit]]:
83+
async def __aiter__(
84+
self
85+
) -> AsyncIterator[Sequence[Commit | PreparedCommit]]:
8486
async for commits in self._buffer:
8587
encoded_commits = pickle.dumps(list(commits))
8688
await self._in_tg_encoded_commits.set(encoded_commits)

src/tgdb/infrastructure/telethon/lazy_map.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
from telethon.hints import TotalList
44
from telethon.tl.types import Message
55

6-
from tgdb.entities.relation.tuple import TID
76
from tgdb.infrastructure.heap_tuple_encoding import HeapTupleEncoding
8-
from tgdb.infrastructure.lazy_map import ExternalValue, LazyMap, NoExternalValue
7+
from tgdb.infrastructure.lazy_map import LazyMap
98
from tgdb.infrastructure.telethon.client_pool import TelegramClientPool
109
from tgdb.infrastructure.telethon.index import (
1110
MessageIndex,

src/tgdb/main/common/di.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from tgdb.application.relation.ports.tuples import Tuples
2424
from tgdb.application.relation.view_tuples import ViewTuples
2525
from tgdb.entities.horizon.horizon import Horizon, horizon
26-
from tgdb.entities.horizon.transaction import PreparedCommit
26+
from tgdb.entities.horizon.transaction import Commit, PreparedCommit
2727
from tgdb.entities.relation.relation import Relation
2828
from tgdb.infrastructure.adapters.buffer import (
2929
InMemoryBuffer,
@@ -71,7 +71,7 @@ class CommonProvider(Provider):
7171
provide_uuids = provide(UUIDs4, provides=UUIDs, scope=Scope.APP)
7272
provide_queque = provide(
7373
staticmethod(lambda: InMemoryQueque(AsyncQueque())),
74-
provides=Queque[Sequence[PreparedCommit]],
74+
provides=Queque[Sequence[Commit | PreparedCommit]],
7575
scope=Scope.APP
7676
)
7777
provide_channel = provide(
@@ -163,8 +163,8 @@ async def provide_buffer(
163163
conf: Conf,
164164
bot_pool: BotPool,
165165
user_bot_pool: UserBotPool,
166-
buffer: InMemoryBuffer[PreparedCommit]
167-
) -> AsyncIterator[Buffer[PreparedCommit]]:
166+
buffer: InMemoryBuffer[Commit | PreparedCommit]
167+
) -> AsyncIterator[Buffer[Commit | PreparedCommit]]:
168168
in_tg_bytes = InTelegramBytes(bot_pool, user_bot_pool, conf.buffer.chat)
169169

170170
biffer = InTelegramReplicablePreparedCommitBuffer(buffer, in_tg_bytes)

src/tgdb/presentation/fastapi/horizon/schemas/isolation_level.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,24 @@
44
from tgdb.entities.horizon.transaction import IsolationLevel
55

66

7-
type _EncodedIsolationLevel = Literal[
8-
"serializableReadAndWrite", "nonSerializableRead"
9-
]
7+
type _EncodedIsolationLevel = Literal["serializable", "readUncommited"]
108

119

1210
class IsolationLevelSchema(StrEnum):
13-
serializable_read_and_write = "serializableReadAndWrite"
14-
non_serializable_read = "nonSerializableRead"
11+
serializable = "serializable"
12+
read_uncommited = "readUncommited"
1513

1614
def decoded(self) -> IsolationLevel:
1715
match self:
18-
case IsolationLevelSchema.serializable_read_and_write:
19-
return IsolationLevel.serializable_read_and_write
20-
case IsolationLevelSchema.non_serializable_read:
21-
return IsolationLevel.non_serializable_read
16+
case IsolationLevelSchema.serializable:
17+
return IsolationLevel.serializable
18+
case IsolationLevelSchema.read_uncommited:
19+
return IsolationLevel.read_uncommited
2220

2321
@classmethod
2422
def of(cls, level: IsolationLevel) -> "IsolationLevelSchema":
2523
match level:
26-
case IsolationLevel.serializable_read_and_write:
27-
return IsolationLevelSchema.serializable_read_and_write
28-
case IsolationLevel.non_serializable_read:
29-
return IsolationLevelSchema.non_serializable_read
24+
case IsolationLevel.serializable:
25+
return IsolationLevelSchema.serializable
26+
case IsolationLevel.read_uncommited:
27+
return IsolationLevelSchema.read_uncommited

0 commit comments

Comments
 (0)