|
1 | | -import asyncio |
2 | | -from src.common.models import RideRequest, DriverLocation |
| 1 | +from src.pricing-service.pricing_engine import PricingEngine |
| 2 | +from src.common.models import PricingEvent |
3 | 3 | from src.common.utils import get_logger |
4 | | -from src.matching-service.matching_engine import MatchingEngine |
5 | 4 |
|
6 | 5 |
|
7 | | -class MatchingConsumer: |
| 6 | +class PricingConsumer: |
8 | 7 | """ |
9 | | - Consumes ride-request and driver-location events from the EventBus, |
10 | | - feeds them into MatchingEngine, and triggers match generation. |
| 8 | + Consumes supply/demand events for each zone, |
| 9 | + computes surge multipliers, and publishes PricingEvents. |
11 | 10 | """ |
12 | 11 |
|
13 | 12 | def __init__(self, event_bus): |
14 | 13 | self.event_bus = event_bus |
15 | | - self.engine = MatchingEngine() |
16 | | - self.logger = get_logger("MatchingConsumer") |
17 | | - |
18 | | - async def handle_ride_request(self, data: dict): |
19 | | - request = RideRequest(**data) |
20 | | - self.logger.info(f"Received ride request {request.request_id}") |
21 | | - |
22 | | - match_result = self.engine.find_best_driver(request) |
23 | | - |
24 | | - if match_result: |
25 | | - self.logger.info( |
26 | | - f"Matched request {request.request_id} " |
27 | | - f"with driver {match_result.driver_id}" |
28 | | - ) |
29 | | - # Publish match result to event bus |
30 | | - await self.event_bus.publish("match_results", match_result.dict()) |
31 | | - else: |
32 | | - self.logger.warning( |
33 | | - f"No drivers available for request {request.request_id}" |
34 | | - ) |
35 | | - |
36 | | - async def handle_driver_update(self, data: dict): |
37 | | - driver = DriverLocation(**data) |
38 | | - self.logger.info(f"Updated driver {driver.driver_id} location") |
39 | | - self.engine.update_driver_location(driver) |
| 14 | + self.engine = PricingEngine() |
| 15 | + self.logger = get_logger("PricingConsumer") |
40 | 16 |
|
41 | | - async def start(self): |
| 17 | + async def handle_supply_demand(self, data: dict): |
42 | 18 | """ |
43 | | - Subscribes to ride request + driver updates. |
44 | | - Two streams: |
45 | | - - ride_requests |
46 | | - - driver_updates |
| 19 | + Example incoming event: |
| 20 | + { |
| 21 | + "zone_id": "A1", |
| 22 | + "demand": 15, |
| 23 | + "supply": 4 |
| 24 | + } |
47 | 25 | """ |
| 26 | + zone_id = data["zone_id"] |
| 27 | + demand = data["demand"] |
| 28 | + supply = data["supply"] |
| 29 | + |
| 30 | + pricing_event: PricingEvent = self.engine.compute_surge( |
| 31 | + demand=demand, |
| 32 | + supply=supply, |
| 33 | + zone_id=zone_id |
| 34 | + ) |
48 | 35 |
|
49 | | - self.logger.info("MatchingConsumer started. Subscribing to topics...") |
| 36 | + # Publish surge multiplier downstream |
| 37 | + await self.event_bus.publish("surge_updates", pricing_event.dict()) |
50 | 38 |
|
51 | | - await asyncio.gather( |
52 | | - self.event_bus.subscribe( |
53 | | - "ride_requests", self.handle_ride_request |
54 | | - ), |
55 | | - self.event_bus.subscribe( |
56 | | - "driver_updates", self.handle_driver_update |
57 | | - ), |
| 39 | + self.logger.info( |
| 40 | + f"Published surge update for zone {zone_id}: {pricing_event.surge_multiplier}" |
58 | 41 | ) |
| 42 | + |
| 43 | + async def start(self): |
| 44 | + """ |
| 45 | + Subscribe to supply/demand updates. |
| 46 | + """ |
| 47 | + self.logger.info("PricingConsumer started. Listening for supply/demand events...") |
| 48 | + await self.event_bus.subscribe("zone_supply_demand", self.handle_supply_demand) |
0 commit comments