Skip to content

Commit 5dcc4a0

Browse files
author
Sergio García Prado
authored
Merge pull request #317 from minos-framework/issue-87-rename-idempotent-broker-subscriber
#87 - Rename `IdempotentBrokerSubscriber` as `FilteredBrokerSubscriber`
2 parents 9ab0b23 + 1a8c9d1 commit 5dcc4a0

File tree

29 files changed

+379
-294
lines changed

29 files changed

+379
-294
lines changed

packages/core/minos-microservice-common/minos/common/config/v2.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,8 @@ def _parse_broker_interface(data: dict[str, Any]) -> dict[str, Any]:
130130
data["subscriber"]["client"] = import_module(data["subscriber"]["client"])
131131
if "queue" in data["subscriber"]:
132132
data["subscriber"]["queue"] = import_module(data["subscriber"]["queue"])
133-
if "idempotent" in data["subscriber"]:
134-
data["subscriber"]["idempotent"] = import_module(data["subscriber"]["idempotent"])
133+
if "validator" in data["subscriber"]:
134+
data["subscriber"]["validator"] = import_module(data["subscriber"]["validator"])
135135

136136
return data
137137

packages/core/minos-microservice-common/tests/config/v2.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ interfaces:
4040
subscriber:
4141
client: tests.utils.FakeBrokerSubscriber
4242
queue: builtins.int
43-
idempotent: builtins.float
43+
validator: builtins.float
4444
periodic:
4545
port: tests.utils.FakePeriodicPort
4646
pools:

packages/core/minos-microservice-common/tests/test_common/test_config/test_v2/test_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def test_interface_broker(self):
115115
"queue": {"records": 10, "retry": 2},
116116
},
117117
"publisher": {"client": FakeBrokerPublisher, "queue": int},
118-
"subscriber": {"client": FakeBrokerSubscriber, "queue": int, "idempotent": float},
118+
"subscriber": {"client": FakeBrokerSubscriber, "queue": int, "validator": float},
119119
}
120120

121121
self.assertEqual(expected, broker)

packages/core/minos-microservice-networks/minos/networks/__init__.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,26 @@
2424
BrokerResponseException,
2525
BrokerSubscriber,
2626
BrokerSubscriberBuilder,
27-
BrokerSubscriberDuplicateDetector,
27+
BrokerSubscriberDuplicateValidator,
2828
BrokerSubscriberQueue,
2929
BrokerSubscriberQueueBuilder,
30-
IdempotentBrokerSubscriber,
30+
BrokerSubscriberValidator,
31+
FilteredBrokerSubscriber,
3132
InMemoryBrokerPublisher,
3233
InMemoryBrokerPublisherQueue,
3334
InMemoryBrokerQueue,
3435
InMemoryBrokerSubscriber,
3536
InMemoryBrokerSubscriberBuilder,
36-
InMemoryBrokerSubscriberDuplicateDetector,
37+
InMemoryBrokerSubscriberDuplicateValidator,
3738
InMemoryBrokerSubscriberQueue,
3839
InMemoryBrokerSubscriberQueueBuilder,
3940
PostgreSqlBrokerPublisherQueue,
4041
PostgreSqlBrokerPublisherQueueQueryFactory,
4142
PostgreSqlBrokerQueue,
4243
PostgreSqlBrokerQueueBuilder,
43-
PostgreSqlBrokerSubscriberDuplicateDetector,
44-
PostgreSqlBrokerSubscriberDuplicateDetectorBuilder,
45-
PostgreSqlBrokerSubscriberDuplicateDetectorQueryFactory,
44+
PostgreSqlBrokerSubscriberDuplicateValidator,
45+
PostgreSqlBrokerSubscriberDuplicateValidatorBuilder,
46+
PostgreSqlBrokerSubscriberDuplicateValidatorQueryFactory,
4647
PostgreSqlBrokerSubscriberQueue,
4748
PostgreSqlBrokerSubscriberQueueBuilder,
4849
PostgreSqlBrokerSubscriberQueueQueryFactory,

packages/core/minos-microservice-networks/minos/networks/brokers/__init__.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,19 @@
4242
from .subscribers import (
4343
BrokerSubscriber,
4444
BrokerSubscriberBuilder,
45-
BrokerSubscriberDuplicateDetector,
45+
BrokerSubscriberDuplicateValidator,
4646
BrokerSubscriberQueue,
4747
BrokerSubscriberQueueBuilder,
48-
IdempotentBrokerSubscriber,
48+
BrokerSubscriberValidator,
49+
FilteredBrokerSubscriber,
4950
InMemoryBrokerSubscriber,
5051
InMemoryBrokerSubscriberBuilder,
51-
InMemoryBrokerSubscriberDuplicateDetector,
52+
InMemoryBrokerSubscriberDuplicateValidator,
5253
InMemoryBrokerSubscriberQueue,
5354
InMemoryBrokerSubscriberQueueBuilder,
54-
PostgreSqlBrokerSubscriberDuplicateDetector,
55-
PostgreSqlBrokerSubscriberDuplicateDetectorBuilder,
56-
PostgreSqlBrokerSubscriberDuplicateDetectorQueryFactory,
55+
PostgreSqlBrokerSubscriberDuplicateValidator,
56+
PostgreSqlBrokerSubscriberDuplicateValidatorBuilder,
57+
PostgreSqlBrokerSubscriberDuplicateValidatorQueryFactory,
5758
PostgreSqlBrokerSubscriberQueue,
5859
PostgreSqlBrokerSubscriberQueueBuilder,
5960
PostgreSqlBrokerSubscriberQueueQueryFactory,

packages/core/minos-microservice-networks/minos/networks/brokers/subscribers/__init__.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
BrokerSubscriber,
33
BrokerSubscriberBuilder,
44
)
5-
from .idempotent import (
6-
BrokerSubscriberDuplicateDetector,
7-
IdempotentBrokerSubscriber,
8-
InMemoryBrokerSubscriberDuplicateDetector,
9-
PostgreSqlBrokerSubscriberDuplicateDetector,
10-
PostgreSqlBrokerSubscriberDuplicateDetectorBuilder,
11-
PostgreSqlBrokerSubscriberDuplicateDetectorQueryFactory,
5+
from .filtered import (
6+
BrokerSubscriberDuplicateValidator,
7+
BrokerSubscriberValidator,
8+
FilteredBrokerSubscriber,
9+
InMemoryBrokerSubscriberDuplicateValidator,
10+
PostgreSqlBrokerSubscriberDuplicateValidator,
11+
PostgreSqlBrokerSubscriberDuplicateValidatorBuilder,
12+
PostgreSqlBrokerSubscriberDuplicateValidatorQueryFactory,
1213
)
1314
from .memory import (
1415
InMemoryBrokerSubscriber,

packages/core/minos-microservice-networks/minos/networks/brokers/subscribers/abc.py

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@
3333
)
3434

3535
if TYPE_CHECKING:
36-
from .idempotent import (
37-
BrokerSubscriberDuplicateDetector,
38-
IdempotentBrokerSubscriber,
36+
from .filtered import (
37+
BrokerSubscriberValidator,
38+
FilteredBrokerSubscriber,
3939
)
4040
from .queued import (
4141
BrokerSubscriberQueue,
@@ -94,20 +94,20 @@ class BrokerSubscriberBuilder(Builder[BrokerSubscriberCls], Generic[BrokerSubscr
9494
def __init__(
9595
self,
9696
*args,
97-
idempotent_builder: Optional[Builder] = None,
97+
validator_builder: Optional[Builder] = None,
9898
queue_builder: Optional[BrokerSubscriberQueueBuilder] = None,
99-
idempotent_cls: Optional[type[IdempotentBrokerSubscriber]] = None,
99+
filtered_cls: Optional[type[FilteredBrokerSubscriber]] = None,
100100
queued_cls: Optional[type[QueuedBrokerSubscriber]] = None,
101101
**kwargs,
102102
):
103103
super().__init__(*args, **kwargs)
104104

105-
if idempotent_cls is None:
106-
from .idempotent import (
107-
IdempotentBrokerSubscriber,
105+
if filtered_cls is None:
106+
from .filtered import (
107+
FilteredBrokerSubscriber,
108108
)
109109

110-
idempotent_cls = IdempotentBrokerSubscriber
110+
filtered_cls = FilteredBrokerSubscriber
111111

112112
if queued_cls is None:
113113
from .queued import (
@@ -116,19 +116,19 @@ def __init__(
116116

117117
queued_cls = QueuedBrokerSubscriber
118118

119-
self.duplicate_detector_builder = idempotent_builder
119+
self.validator_builder = validator_builder
120120
self.queue_builder = queue_builder
121121

122-
self.idempotent_cls = idempotent_cls
122+
self.filtered_cls = filtered_cls
123123
self.queued_cls = queued_cls
124124

125-
def with_idempotent_cls(self, idempotent_cls: type[IdempotentBrokerSubscriber]):
126-
"""Set the idempotent class.
125+
def with_filtered_cls(self, filtered_cls: type[FilteredBrokerSubscriber]):
126+
"""Set the filtered class.
127127
128-
:param idempotent_cls: A subclass of ``IdempotentBrokerSubscriber``.
128+
:param filtered_cls: A subclass of ``FilteredBrokerSubscriber``.
129129
:return: This method return the builder instance.
130130
"""
131-
self.idempotent_cls = idempotent_cls
131+
self.filtered_cls = filtered_cls
132132

133133
return self
134134

@@ -150,8 +150,8 @@ def with_config(self, config: Config):
150150
"""
151151
self._with_builders_from_config(config)
152152

153-
if self.duplicate_detector_builder is not None:
154-
self.duplicate_detector_builder.with_config(config)
153+
if self.validator_builder is not None:
154+
self.validator_builder.with_config(config)
155155
if self.queue_builder is not None:
156156
self.queue_builder.with_config(config)
157157
return super().with_config(config)
@@ -164,24 +164,24 @@ def _with_builders_from_config(self, config):
164164

165165
broker_subscriber_config = broker_config["subscriber"]
166166

167-
if "idempotent" in broker_subscriber_config:
168-
self.with_duplicate_detector(broker_subscriber_config["idempotent"])
167+
if "validator" in broker_subscriber_config:
168+
self.with_validator(broker_subscriber_config["validator"])
169169

170170
if "queue" in broker_subscriber_config:
171171
self.with_queue(broker_subscriber_config["queue"])
172172

173-
def with_duplicate_detector(
173+
def with_validator(
174174
self,
175-
duplicate_detector: Union[type[BrokerSubscriberDuplicateDetector], Builder[BrokerSubscriberDuplicateDetector]],
175+
validator: Union[type[BrokerSubscriberValidator], Builder[BrokerSubscriberValidator]],
176176
):
177177
"""Set the duplicate detector.
178178
179-
:param duplicate_detector: The duplicate detector to be set.
179+
:param validator: The duplicate detector to be set.
180180
:return: This method return the builder instance.
181181
"""
182-
if not isinstance(duplicate_detector, Builder):
183-
duplicate_detector = duplicate_detector.get_builder()
184-
self.duplicate_detector_builder = duplicate_detector.copy()
182+
if not isinstance(validator, Builder):
183+
validator = validator.get_builder()
184+
self.validator_builder = validator.copy()
185185
return self
186186

187187
def with_queue(self, queue: Union[type[BrokerSubscriberQueue], BrokerSubscriberQueueBuilder]):
@@ -201,8 +201,8 @@ def with_kwargs(self, kwargs: dict[str, Any]):
201201
:param kwargs: The kwargs to be set.
202202
:return: This method return the builder instance.
203203
"""
204-
if self.duplicate_detector_builder is not None:
205-
self.duplicate_detector_builder.with_kwargs(kwargs)
204+
if self.validator_builder is not None:
205+
self.validator_builder.with_kwargs(kwargs)
206206

207207
if self.queue_builder is not None:
208208
self.queue_builder.with_kwargs(kwargs)
@@ -250,9 +250,9 @@ def build(self) -> BrokerSubscriber:
250250
"""
251251
impl = super().build()
252252

253-
if self.duplicate_detector_builder is not None:
254-
duplicate_detector = self.duplicate_detector_builder.build()
255-
impl = self.idempotent_cls(impl=impl, duplicate_detector=duplicate_detector, **self.kwargs)
253+
if self.validator_builder is not None:
254+
validator = self.validator_builder.build()
255+
impl = self.filtered_cls(impl=impl, validator=validator, **self.kwargs)
256256

257257
if self.queue_builder is not None:
258258
queue = self.queue_builder.build()
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from .impl import (
2+
FilteredBrokerSubscriber,
3+
)
4+
from .validators import (
5+
BrokerSubscriberDuplicateValidator,
6+
BrokerSubscriberValidator,
7+
InMemoryBrokerSubscriberDuplicateValidator,
8+
PostgreSqlBrokerSubscriberDuplicateValidator,
9+
PostgreSqlBrokerSubscriberDuplicateValidatorBuilder,
10+
PostgreSqlBrokerSubscriberDuplicateValidatorQueryFactory,
11+
)
Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,39 @@
88
from ..abc import (
99
BrokerSubscriber,
1010
)
11-
from .detectors import (
12-
BrokerSubscriberDuplicateDetector,
11+
from .validators import (
12+
BrokerSubscriberValidator,
1313
)
1414

1515
_sentinel = object()
1616

1717

18-
class IdempotentBrokerSubscriber(BrokerSubscriber):
19-
"""Idempotent Broker Subscriber class."""
18+
class FilteredBrokerSubscriber(BrokerSubscriber):
19+
"""Filtered Broker Subscriber class."""
2020

2121
impl: BrokerSubscriber
22-
duplicate_detector: BrokerSubscriberDuplicateDetector
22+
validator: BrokerSubscriberValidator
2323

24-
def __init__(self, impl: BrokerSubscriber, duplicate_detector: BrokerSubscriberDuplicateDetector, **kwargs):
24+
def __init__(self, impl: BrokerSubscriber, validator: BrokerSubscriberValidator, **kwargs):
2525
super().__init__(**(kwargs | {"topics": impl.topics}))
2626
self.impl = impl
27-
self.duplicate_detector = duplicate_detector
27+
self.validator = validator
2828

2929
async def _setup(self) -> None:
3030
await super()._setup()
31-
await self.duplicate_detector.setup()
31+
await self.validator.setup()
3232
await self.impl.setup()
3333

3434
async def _destroy(self) -> None:
3535
await self.impl.destroy()
36-
await self.duplicate_detector.destroy()
36+
await self.validator.destroy()
3737
await super()._destroy()
3838

3939
async def _receive(self) -> BrokerMessage:
4040
message = _sentinel
41-
while message is _sentinel or not (await self.duplicate_detector.is_valid(message)):
41+
while message is _sentinel or not (await self.validator.is_valid(message)):
4242
message = await self.impl.receive()
4343
return message
4444

4545

46-
IdempotentBrokerSubscriber.set_builder(Builder)
46+
FilteredBrokerSubscriber.set_builder(Builder)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from .abc import (
2+
BrokerSubscriberValidator,
3+
)
4+
from .duplicates import (
5+
BrokerSubscriberDuplicateValidator,
6+
InMemoryBrokerSubscriberDuplicateValidator,
7+
PostgreSqlBrokerSubscriberDuplicateValidator,
8+
PostgreSqlBrokerSubscriberDuplicateValidatorBuilder,
9+
PostgreSqlBrokerSubscriberDuplicateValidatorQueryFactory,
10+
)

0 commit comments

Comments
 (0)