Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions meta/examples/subscriber_registration/all_in_one_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
async def add_company_projector(
event_store, projection_store, event_broker, service_manager
):
projection_subscriber = make_subscriber(
subscriber_group="search-company-projection",
subscription_request=CategoryIdentifier(category="company"),
subscriber_state_category=event_store.category(
category="search-company-projection-state"
),
subscriber_state_persistence_interval=EventCount(10000),
event_processor=CompanySearchEventProcessor(
projector=CompanySearchProjector(),
projection_store=projection_store,
),
)

error_handling_service = ErrorHandlingService(
callable=projection_subscriber.consume_all,
error_handler=ContinueErrorHandler(),
)

projection_service = PollingService(
callable=error_handling_service.execute,
poll_interval=timedelta(seconds=1),
)

await event_broker.register(projection_subscriber)
service_manager.register(
projection_service,
execution_mode=ExecutionMode.BACKGROUND,
)
82 changes: 82 additions & 0 deletions meta/examples/subscriber_registration/consumer_registrar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
class ConsumerRegistrar(ABC):
subscriber_group: ClassVar[LiteralString]
subscriber_state_category: ClassVar[LiteralString]
category: ClassVar[CategoryDefinition[Any, Any]]

def __repr__(self):
return (
f"{self.__class__.__name__}("
f"subscriber_group={self.subscriber_group}, "
f"category={self.category}, "
f"event_processor={repr(self.event_processor)}"
")"
)

def __init__(
self,
event_store: EventStore,
event_processor: EventProcessor,
):
self._event_store = event_store
self._event_processor = event_processor

self._state_store = EventConsumerStateStore(
category=event_store.category(category=self.subscriber_group),
)

@property
def event_processor(self) -> EventProcessor:
return self._event_processor

def _consumer_for_source(self, source: EventSource) -> EventSourceConsumer:
return EventSourceConsumer(
source=source,
processor=self._event_processor,
state_store=self._state_store,
)

async def consume_all(self) -> None:
source = self._event_store.category(category=self.category.name)
consumer = self._consumer_for_source(source)
return await consumer.consume_all()

async def register_as_service(
self, event_broker: EventBroker, service_manager: ServiceManager
) -> None:
subscriber = EventSubscriptionConsumer(
group=self.subscriber_group,
id=str(uuid4()),
subscription_requests=[
CategoryIdentifier(category=self.category.name)
],
delegate_factory=self._consumer_for_source,
)

await event_broker.register(subscriber)

service = PollingService(
callable=subscriber.consume_all,
poll_interval=timedelta(seconds=1),
)

service_manager.register(
service,
execution_mode=ExecutionMode.BACKGROUND,
)


async def register_tasks(
service_manager: ServiceManager,
event_broker: EventBroker,
registrars: Iterable[ConsumerRegistrar],
) -> None:
async with asyncio.TaskGroup() as tg:
service_manager.register(event_broker)
logger.info("Registering consumers", registrars=registrars)
for registrar in registrars:
tg.create_task(
registrar.register_as_service(
service_manager=service_manager,
event_broker=event_broker,
)
)
52 changes: 52 additions & 0 deletions meta/examples/subscriber_registration/register_many_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
async def register_subscribers(
event_broker: EventBroker,
service_manager: ServiceManager,
consumers: List[EventSubscriptionConsumer],
) -> None:
for consumer in consumers:
await event_broker.register(consumer)

error_handling_service = LoggingErrorHandlingService(
callable=consumer.consume_all,
error_handler=RaiseErrorHandler(),
)
polling_service = PollingService(
callable=error_handling_service.execute,
poll_interval=timedelta(milliseconds=100),
)

service_manager.register(
polling_service, execution_mode=ExecutionMode.BACKGROUND
)


async def init_event_services(
db_type: DBType,
db_settings: DBSettings,
connection_pool: AsyncConnectionPool[AsyncConnection],
db: DB,
publisher: EventPublisher,
clock: Clock,
metrics: Metrics
) -> ServiceManager:
service_manager = ServiceManager() # type: ignore[no-untyped-call]
event_broker = _make_event_broker(
db_type, db_settings, connection_pool, db.event_store_adapter
)
service_manager.register(event_broker)
event_store = db.event_store
subscription_consumers = [
*approval_subscribers.make_subscribers(
event_store, db, clock, metrics
),
*payment_subscribers.make_subscribers(
event_store, publisher, db, clock
),
*settings_subscribers.make_subscribers(
event_store, publisher, db, metrics,
)
]
await register_subscribers(
event_broker, service_manager, subscription_consumers
)
return service_manager
57 changes: 57 additions & 0 deletions meta/examples/subscriber_registration/register_one_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
async def register_subscriber(
subscriber_group: str,
subscription_request: EventSourceIdentifier,
subscriber_state_category_name: SubscriberStateCategory,
processor: SupportedProcessors,
event_broker: EventBroker,
event_store: EventStore,
service_manager: ServiceManager,
retryable_exceptions: list[Type[BaseException]] = (),
):
subscriber_id = str(uuid4())
subscriber_state_category = event_store.category(
category=subscriber_state_category_name
)

state_store = EventConsumerStateStore(
category=subscriber_state_category,
converter=StoredEventEventConsumerStateConverter(),
persistence_interval=EventCount(1),
)

def delegate_factory[I: EventSourceIdentifier](
source: EventSource[I, StoredEvent],
) -> EventSourceConsumer[I, StoredEvent]:
return EventSourceConsumer(
source=source,
processor=processor,
state_store=state_store,
)

subscriber = EventSubscriptionConsumer(
group=subscriber_group,
id=subscriber_id,
subscription_requests=[subscription_request],
delegate_factory=delegate_factory,
)

await event_broker.register(subscriber)

error_handling_service = ErrorHandlingService(
callable=subscriber.consume_all,
error_handler=TypeMappingErrorHandler(
type_mappings=error_handler_type_mappings(
continue_execution=retryable_exceptions
)
),
)

polling_service = PollingService(
callable=error_handling_service.execute,
poll_interval=timedelta(seconds=1),
)

service_manager.register(
polling_service,
execution_mode=ExecutionMode.BACKGROUND,
)
138 changes: 138 additions & 0 deletions meta/examples/subscriber_registration/subsytem_creator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
class ProviderService[EntityRecord: RecordBaseModel]:
def __init__(
self,
event_store: EventStore,
projection_store: ProjectionStore,
loader: Loader[EntityRecord],
record_type: type[EntityRecord],
constraints: Sequence[Constraint],
):
self._event_store = event_store
self._projection_store = projection_store
self._loader = loader
self._constraints = constraints
self._record_type = record_type

async def _make_projection_service(
self,
config: ProjectionConfig,
event_broker: EventBroker,
service_manager: ServiceManager,
enabled: bool,
event_processor: EventProcessor,
):
projection_subscriber = make_subscriber(
subscriber_group=config.subscriber_config.subscriber_group,
subscription_request=CategoryIdentifier(
category=config.subscriber_config.subscription_request_category
),
subscriber_state_category=self._event_store.category(
category=config.subscriber_config.subscriber_state_category
),
subscriber_state_persistence_interval=EventCount(10000),
event_processor=event_processor,
)

error_handling_service = ErrorHandlingService(
callable=projection_subscriber.consume_all,
error_handler=ContinueErrorHandler(),
)

projection_service = PollingService(
callable=error_handling_service.execute,
poll_interval=timedelta(seconds=1),
)

if enabled:
await event_broker.register(projection_subscriber)
service_manager.register(
projection_service,
execution_mode=ExecutionMode.BACKGROUND,
)

def _make_event_processor(self, config: IngestionConfig):
return ProjectionEventProcessor[RecordLog | None](
projector=RecordLogProjector[EntityRecord](
projection_name=config.projection_name,
payload_type=self._record_type,
),
projection_store=self._projection_store,
state_type=RecordLog,
)

def _make_ingestion_service(
self,
config: IngestionConfig,
service_manager: ServiceManager,
enabled: bool,
):
record_log_cache = RecordLogCache(
event_store=self._event_store,
projection_store=self._projection_store,
projection_name=config.projection_name,
record_category_name=config.record_category_name,
projection_consumer_state_category_name=(
config.projection_consumer_state_category_name
),
)
ingestion_service = IngestionService(
event_store=self._event_store,
projection_store=self._projection_store,
loader=self._loader,
record_log_cache=record_log_cache,
constraints=self._constraints,
projection_name=config.projection_name,
event_name=config.event_name,
category_name=config.record_category_name,
)

error_handling_ingestion_service = ErrorHandlingService(
callable=ingestion_service.execute,
error_handler=RetryErrorHandler(),
)

if enabled:
service_manager.register(
error_handling_ingestion_service,
execution_mode=ExecutionMode.BACKGROUND,
)

async def make(
self,
event_broker: EventBroker,
service_manager: ServiceManager,
config: ProviderConfig,
):
if config.ingestion_config is not None:
self._make_ingestion_service(
config=config.ingestion_config,
service_manager=service_manager,
enabled=config.ingestion_config.enabled,
)

if config.record_log_config is not None:
await self._make_projection_service(
config=config.record_log_config,
event_broker=event_broker,
service_manager=service_manager,
event_processor=config.record_log_config.event_processor,
enabled=config.record_log_config.enabled,
)

if config.changeset_config is not None:
await self._make_projection_service(
config=config.changeset_config,
event_broker=event_broker,
service_manager=service_manager,
event_processor=config.changeset_config.event_processor,
enabled=config.changeset_config.enabled,
)

if config.entity_config is not None:
await self._make_projection_service(
config=config.entity_config,
event_broker=event_broker,
service_manager=service_manager,
event_processor=config.entity_config.event_processor,
enabled=config.entity_config.enabled,
)
Loading