Skip to content

Commit 5bb91a7

Browse files
authored
Create consumer.py
1 parent a0285ef commit 5bb91a7

File tree

1 file changed

+58
-0
lines changed

1 file changed

+58
-0
lines changed

consumer.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import asyncio
2+
from src.common.models import RideRequest, DriverLocation
3+
from src.common.utils import get_logger
4+
from src.matching-service.matching_engine import MatchingEngine
5+
6+
7+
class MatchingConsumer:
8+
"""
9+
Consumes ride-request and driver-location events from the EventBus,
10+
feeds them into MatchingEngine, and triggers match generation.
11+
"""
12+
13+
def __init__(self, event_bus):
14+
self.event_bus = event_bus
15+
self.engine = MatchingEngine()
16+
self.logger = get_logger("MatchingConsumer")
17+
18+
async def handle_ride_request(self, data: dict):
19+
request = RideRequest(**data)
20+
self.logger.info(f"Received ride request {request.request_id}")
21+
22+
match_result = self.engine.find_best_driver(request)
23+
24+
if match_result:
25+
self.logger.info(
26+
f"Matched request {request.request_id} "
27+
f"with driver {match_result.driver_id}"
28+
)
29+
# Publish match result to event bus
30+
await self.event_bus.publish("match_results", match_result.dict())
31+
else:
32+
self.logger.warning(
33+
f"No drivers available for request {request.request_id}"
34+
)
35+
36+
async def handle_driver_update(self, data: dict):
37+
driver = DriverLocation(**data)
38+
self.logger.info(f"Updated driver {driver.driver_id} location")
39+
self.engine.update_driver_location(driver)
40+
41+
async def start(self):
42+
"""
43+
Subscribes to ride request + driver updates.
44+
Two streams:
45+
- ride_requests
46+
- driver_updates
47+
"""
48+
49+
self.logger.info("MatchingConsumer started. Subscribing to topics...")
50+
51+
await asyncio.gather(
52+
self.event_bus.subscribe(
53+
"ride_requests", self.handle_ride_request
54+
),
55+
self.event_bus.subscribe(
56+
"driver_updates", self.handle_driver_update
57+
),
58+
)

0 commit comments

Comments
 (0)