|
1 | | -from src.common.models import RideRequest, DriverLocation |
| 1 | +import asyncio |
| 2 | +import random |
2 | 3 | from src.common.utils import get_logger |
3 | 4 |
|
4 | 5 |
|
5 | | -class MatchingProducer: |
| 6 | +class PricingProducer: |
6 | 7 | """ |
7 | | - Utility class for publishing ride requests and driver updates |
8 | | - into the event-driven system. |
9 | | - Used for testing, load generation, or manual event injection. |
| 8 | + Publishes simulated supply/demand events for each zone. |
| 9 | + In production this could come from a real-time telemetry pipeline. |
10 | 10 | """ |
11 | 11 |
|
12 | 12 | def __init__(self, event_bus): |
13 | 13 | self.event_bus = event_bus |
14 | | - self.logger = get_logger("MatchingProducer") |
15 | | - |
16 | | - async def publish_ride_request(self, request: RideRequest): |
17 | | - self.logger.info( |
18 | | - f"Publishing ride request {request.request_id} for passenger {request.passenger_id}" |
19 | | - ) |
20 | | - await self.event_bus.publish("ride_requests", request.dict()) |
21 | | - |
22 | | - async def publish_driver_update(self, driver: DriverLocation): |
23 | | - self.logger.info( |
24 | | - f"Publishing driver update for driver {driver.driver_id}" |
25 | | - ) |
26 | | - await self.event_bus.publish("driver_updates", driver.dict()) |
| 14 | + self.logger = get_logger("PricingProducer") |
| 15 | + |
| 16 | + async def generate_supply_demand(self, zones=None): |
| 17 | + """ |
| 18 | + Creates random (but realistic) demand and supply levels for testing. |
| 19 | +
|
| 20 | + zones example: ["A1", "B2", "C3"] |
| 21 | + """ |
| 22 | + |
| 23 | + if zones is None: |
| 24 | + zones = ["A1", "B2", "C3"] |
| 25 | + |
| 26 | + while True: |
| 27 | + zone_id = random.choice(zones) |
| 28 | + |
| 29 | + demand = random.randint(1, 40) # number of riders requesting trips |
| 30 | + supply = random.randint(1, 25) # number of available drivers |
| 31 | + |
| 32 | + event = { |
| 33 | + "zone_id": zone_id, |
| 34 | + "demand": demand, |
| 35 | + "supply": supply |
| 36 | + } |
| 37 | + |
| 38 | + await self.event_bus.publish("zone_supply_demand", event) |
| 39 | + |
| 40 | + self.logger.info( |
| 41 | + f"[PRODUCER] Published supply/demand event: {event}" |
| 42 | + ) |
| 43 | + |
| 44 | + # control frequency of events |
| 45 | + await asyncio.sleep(1) |
| 46 | + |
| 47 | + async def start(self, zones=None): |
| 48 | + """ |
| 49 | + Start the producer loop. |
| 50 | + """ |
| 51 | + self.logger.info("PricingProducer started...") |
| 52 | + await self.generate_supply_demand(zones) |
0 commit comments