Skip to content

Commit 599c93e

Browse files
author
Sergio García Prado
authored
Merge pull request #165 from minos-framework/issue-156-add-root-entity-get-all
#156 - Add `get_all` method to `SnapshotRepository`
2 parents 2c712be + e9daed4 commit 599c93e

File tree

10 files changed

+223
-126
lines changed

10 files changed

+223
-126
lines changed

packages/core/minos-microservice-aggregate/minos/aggregate/entities/collections.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,6 @@
1818
UUID,
1919
)
2020

21-
from minos.aggregate.collections import (
22-
IncrementalSet,
23-
IncrementalSetDiff,
24-
)
2521
from minos.common import (
2622
DataDecoder,
2723
DataEncoder,
@@ -31,6 +27,11 @@
3127
SchemaEncoder,
3228
)
3329

30+
from ..collections import (
31+
IncrementalSet,
32+
IncrementalSetDiff,
33+
)
34+
3435
T = TypeVar("T", bound=Model)
3536

3637

packages/core/minos-microservice-aggregate/minos/aggregate/entities/models.py

Lines changed: 75 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,29 @@
2222
inject,
2323
)
2424

25-
from minos.aggregate.events import (
25+
from minos.common import (
26+
NULL_DATETIME,
27+
NULL_UUID,
28+
DeclarativeModel,
29+
NotProvidedException,
30+
)
31+
32+
from ..events import (
2633
Event,
2734
EventEntry,
2835
EventRepository,
2936
IncrementalFieldDiff,
3037
)
31-
from minos.aggregate.exceptions import (
38+
from ..exceptions import (
3239
EventRepositoryException,
3340
)
34-
from minos.aggregate.queries import (
41+
from ..queries import (
3542
_Condition,
3643
_Ordering,
3744
)
38-
from minos.aggregate.snapshots import (
45+
from ..snapshots import (
3946
SnapshotRepository,
4047
)
41-
from minos.common import (
42-
NULL_DATETIME,
43-
NULL_UUID,
44-
DeclarativeModel,
45-
NotProvidedException,
46-
)
4748

4849
logger = logging.getLogger(__name__)
4950

@@ -83,46 +84,74 @@ def __init__(
8384
version: int = 0,
8485
created_at: datetime = NULL_DATETIME,
8586
updated_at: datetime = NULL_DATETIME,
86-
_repository: EventRepository = Provide["event_repository"],
87-
_snapshot: SnapshotRepository = Provide["snapshot_repository"],
87+
_event_repository: EventRepository = Provide["event_repository"],
88+
_snapshot_repository: SnapshotRepository = Provide["snapshot_repository"],
8889
**kwargs,
8990
):
9091

9192
super().__init__(version, created_at, updated_at, *args, uuid=uuid, **kwargs)
9293

93-
if _repository is None or isinstance(_repository, Provide):
94-
raise NotProvidedException("An event repository instance is required.")
95-
if _snapshot is None or isinstance(_snapshot, Provide):
96-
raise NotProvidedException("A snapshot instance is required.")
94+
if _event_repository is None or isinstance(_event_repository, Provide):
95+
raise NotProvidedException(f"A {EventRepository!r} instance is required.")
96+
if _snapshot_repository is None or isinstance(_snapshot_repository, Provide):
97+
raise NotProvidedException(f"A {SnapshotRepository!r} instance is required.")
9798

98-
self._repository = _repository
99-
self._snapshot = _snapshot
99+
self._event_repository = _event_repository
100+
self._snapshot_repository = _snapshot_repository
100101

101102
@classmethod
102103
@inject
103104
async def get(
104-
cls: Type[T], uuid: UUID, _snapshot: SnapshotRepository = Provide["snapshot_repository"], **kwargs
105+
cls: Type[T], uuid: UUID, *, _snapshot_repository: SnapshotRepository = Provide["snapshot_repository"], **kwargs
105106
) -> T:
106107
"""Get one instance from the database based on its identifier.
107108
108109
:param uuid: The identifier of the instance.
109-
:param _snapshot: Snapshot to be set to the root entity.
110+
:param _snapshot_repository: Snapshot to be set to the root entity.
110111
:return: A ``RootEntity`` instance.
111112
"""
112-
if _snapshot is None or isinstance(_snapshot, Provide):
113-
raise NotProvidedException("A snapshot instance is required.")
113+
if _snapshot_repository is None or isinstance(_snapshot_repository, Provide):
114+
raise NotProvidedException(f"A {SnapshotRepository!r} instance is required.")
114115

115116
# noinspection PyTypeChecker
116-
return await _snapshot.get(cls.classname, uuid, _snapshot=_snapshot, **kwargs)
117+
return await _snapshot_repository.get(cls.classname, uuid, _snapshot_repository=_snapshot_repository, **kwargs)
117118

118119
@classmethod
119120
@inject
120-
async def find(
121+
def get_all(
122+
cls: Type[T],
123+
ordering: Optional[_Ordering] = None,
124+
limit: Optional[int] = None,
125+
*,
126+
_snapshot_repository: SnapshotRepository = Provide["snapshot_repository"],
127+
**kwargs,
128+
) -> AsyncIterator[T]:
129+
"""Get all instance from the database.
130+
131+
:param ordering: Optional argument to return the instance with specific ordering strategy. The default behaviour
132+
is to retrieve them without any order pattern.
133+
:param limit: Optional argument to return only a subset of instances. The default behaviour is to return all the
134+
instances that meet the given condition.
135+
:param _snapshot_repository: Snapshot to be set to the root entity.
136+
:return: A ``RootEntity`` instance.
137+
"""
138+
if _snapshot_repository is None or isinstance(_snapshot_repository, Provide):
139+
raise NotProvidedException(f"A {SnapshotRepository!r} instance is required.")
140+
141+
# noinspection PyTypeChecker
142+
return _snapshot_repository.get_all(
143+
cls.classname, ordering, limit, _snapshot_repository=_snapshot_repository, **kwargs
144+
)
145+
146+
@classmethod
147+
@inject
148+
def find(
121149
cls: Type[T],
122150
condition: _Condition,
123151
ordering: Optional[_Ordering] = None,
124152
limit: Optional[int] = None,
125-
_snapshot: SnapshotRepository = Provide["snapshot_repository"],
153+
*,
154+
_snapshot_repository: SnapshotRepository = Provide["snapshot_repository"],
126155
**kwargs,
127156
) -> AsyncIterator[T]:
128157
"""Find a collection of instances based on a given ``Condition``.
@@ -132,16 +161,15 @@ async def find(
132161
is to retrieve them without any order pattern.
133162
:param limit: Optional argument to return only a subset of instances. The default behaviour is to return all the
134163
instances that meet the given condition.
135-
:param _snapshot: Snapshot to be set to the instances.
164+
:param _snapshot_repository: Snapshot to be set to the instances.
136165
:return: An asynchronous iterator of ``RootEntity`` instances.
137166
"""
138-
if _snapshot is None or isinstance(_snapshot, Provide):
139-
raise NotProvidedException("A snapshot instance is required.")
167+
if _snapshot_repository is None or isinstance(_snapshot_repository, Provide):
168+
raise NotProvidedException(f"A {SnapshotRepository!r} instance is required.")
140169
# noinspection PyTypeChecker
141-
iterable = _snapshot.find(cls.classname, condition, ordering, limit, _snapshot=_snapshot, **kwargs)
142-
# noinspection PyTypeChecker
143-
async for instance in iterable:
144-
yield instance
170+
return _snapshot_repository.find(
171+
cls.classname, condition, ordering, limit, _snapshot_repository=_snapshot_repository, **kwargs
172+
)
145173

146174
@classmethod
147175
async def create(cls: Type[T], *args, **kwargs) -> T:
@@ -171,7 +199,7 @@ async def create(cls: Type[T], *args, **kwargs) -> T:
171199
instance: T = cls(*args, **kwargs)
172200

173201
event = Event.from_root_entity(instance)
174-
entry = await instance._repository.submit(event)
202+
entry = await instance._event_repository.submit(event)
175203

176204
instance._update_from_repository_entry(entry)
177205

@@ -201,12 +229,14 @@ async def update(self: T, **kwargs) -> T:
201229
for key, value in kwargs.items():
202230
setattr(self, key, value)
203231

204-
previous = await self.get(self.uuid, _repository=self._repository, _snapshot=self._snapshot)
232+
previous = await self.get(
233+
self.uuid, _event_repository=self._event_repository, _snapshot_repository=self._snapshot_repository
234+
)
205235
event = self.diff(previous)
206236
if not len(event.fields_diff):
207237
return self
208238

209-
entry = await self._repository.submit(event)
239+
entry = await self._event_repository.submit(event)
210240

211241
self._update_from_repository_entry(entry)
212242

@@ -234,17 +264,23 @@ async def save(self) -> None:
234264
if k not in {"uuid", "version", "created_at", "updated_at"}
235265
}
236266
if is_creation:
237-
new = await self.create(**values, _repository=self._repository, _snapshot=self._snapshot)
267+
new = await self.create(
268+
**values, _event_repository=self._event_repository, _snapshot_repository=self._snapshot_repository
269+
)
238270
self._fields |= new.fields
239271
else:
240-
await self.update(**values, _repository=self._repository, _snapshot=self._snapshot)
272+
await self.update(
273+
**values, _event_repository=self._event_repository, _snapshot_repository=self._snapshot_repository
274+
)
241275

242276
async def refresh(self) -> None:
243277
"""Refresh the state of the given instance.
244278
245279
:return: This method does not return anything.
246280
"""
247-
new = await type(self).get(self.uuid, _repository=self._repository, _snapshot=self._snapshot)
281+
new = await self.get(
282+
self.uuid, _event_repository=self._event_repository, _snapshot_repository=self._snapshot_repository
283+
)
248284
self._fields |= new.fields
249285

250286
async def delete(self) -> None:
@@ -253,7 +289,7 @@ async def delete(self) -> None:
253289
:return: This method does not return anything.
254290
"""
255291
event = Event.from_deleted_root_entity(self)
256-
entry = await self._repository.submit(event)
292+
entry = await self._event_repository.submit(event)
257293

258294
self._update_from_repository_entry(entry)
259295

packages/core/minos-microservice-aggregate/minos/aggregate/entities/refs/models.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@
2020
inject,
2121
)
2222

23-
from minos.aggregate.contextvars import (
24-
IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR,
25-
)
2623
from minos.common import (
2724
DataDecoder,
2825
DataEncoder,
@@ -39,6 +36,10 @@
3936
BrokerMessageV1Payload,
4037
)
4138

39+
from ...contextvars import (
40+
IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR,
41+
)
42+
4243
MT = TypeVar("MT", bound=Model)
4344

4445

packages/core/minos-microservice-aggregate/minos/aggregate/events/fields.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,20 @@
2222
uuid4,
2323
)
2424

25-
from minos.aggregate.actions import (
26-
Action,
27-
)
28-
from minos.aggregate.collections import (
29-
IncrementalSet,
30-
)
3125
from minos.common import (
3226
BucketModel,
3327
Field,
3428
Model,
3529
ModelType,
3630
)
3731

32+
from ..actions import (
33+
Action,
34+
)
35+
from ..collections import (
36+
IncrementalSet,
37+
)
38+
3839
logger = logging.getLogger(__name__)
3940

4041
T = TypeVar("T")

packages/core/minos-microservice-aggregate/minos/aggregate/events/models.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,20 @@
2121
UUID,
2222
)
2323

24-
from minos.aggregate.actions import (
25-
Action,
26-
)
2724
from minos.common import (
2825
DeclarativeModel,
2926
)
3027

28+
from ..actions import (
29+
Action,
30+
)
3131
from .fields import (
3232
FieldDiff,
3333
FieldDiffContainer,
3434
)
3535

3636
if TYPE_CHECKING:
37-
from minos.aggregate.entities import (
37+
from ..entities import (
3838
RootEntity,
3939
)
4040

packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/abc.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
)
2222

2323
from ..queries import (
24+
_TRUE_CONDITION,
2425
_Condition,
2526
_Ordering,
2627
)
@@ -64,6 +65,40 @@ async def get(self, name: str, uuid: UUID, transaction: Optional[TransactionEntr
6465
async def _get(self, *args, **kwargs) -> RootEntity:
6566
raise NotImplementedError
6667

68+
def get_all(
69+
self,
70+
name: str,
71+
ordering: Optional[_Ordering] = None,
72+
limit: Optional[int] = None,
73+
streaming_mode: bool = False,
74+
transaction: Optional[TransactionEntry] = None,
75+
**kwargs,
76+
) -> AsyncIterator[RootEntity]:
77+
"""Get all ``RootEntity`` instances.
78+
79+
:param name: Class name of the ``RootEntity``.
80+
:param ordering: Optional argument to return the instance with specific ordering strategy. The default behaviour
81+
is to retrieve them without any order pattern.
82+
:param limit: Optional argument to return only a subset of instances. The default behaviour is to return all the
83+
instances that meet the given condition.
84+
:param streaming_mode: If ``True`` return the values in streaming directly from the database (keep an open
85+
database connection), otherwise preloads the full set of values on memory and then retrieves them.
86+
:param transaction: The transaction within the operation is performed. If not any value is provided, then the
87+
transaction is extracted from the context var. If not any transaction is being scoped then the query is
88+
performed to the global snapshot.
89+
:param kwargs: Additional named arguments.
90+
:return: An asynchronous iterator that containing the ``RootEntity`` instances.
91+
"""
92+
return self.find(
93+
name,
94+
_TRUE_CONDITION,
95+
ordering=ordering,
96+
limit=limit,
97+
streaming_mode=streaming_mode,
98+
transaction=transaction,
99+
**kwargs,
100+
)
101+
67102
async def find(
68103
self,
69104
name: str,

packages/core/minos-microservice-aggregate/minos/aggregate/value_objects.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,18 @@
77
TypeVar,
88
)
99

10-
from minos.aggregate.collections import (
11-
IncrementalSet,
12-
)
13-
from minos.aggregate.exceptions import (
14-
ValueObjectException,
15-
)
1610
from minos.common import (
1711
DeclarativeModel,
1812
Model,
1913
)
2014

15+
from .collections import (
16+
IncrementalSet,
17+
)
18+
from .exceptions import (
19+
ValueObjectException,
20+
)
21+
2122

2223
class ValueObject(DeclarativeModel):
2324
"""Value Object class."""

0 commit comments

Comments
 (0)