Skip to content

Commit 1414e82

Browse files
authored
Update producer.py
1 parent cd4a953 commit 1414e82

File tree

1 file changed

+32
-29
lines changed

1 file changed

+32
-29
lines changed

producer.py

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,55 @@
11
import asyncio
22
import random
3-
from src.common.utils import get_logger
3+
from src.common.utils import get_logger, now_timestamp
44

55

6-
class PricingProducer:
6+
class DispatchProducer:
77
"""
8-
Publishes simulated supply/demand events for each zone.
9-
In production this could come from a real-time telemetry pipeline.
8+
Produces simulated TripRequest events and publishes them to the event bus.
9+
Used for:
10+
- Load testing
11+
- Demo scenarios
12+
- End-to-end pipeline validation
1013
"""
1114

1215
def __init__(self, event_bus):
1316
self.event_bus = event_bus
14-
self.logger = get_logger("PricingProducer")
17+
self.logger = get_logger("DispatchProducer")
1518

16-
async def generate_supply_demand(self, zones=None):
17-
"""
18-
Creates random (but realistic) demand and supply levels for testing.
19+
# ------------------------------------------------------------
20+
# Generate Random Trip Requests
21+
# ------------------------------------------------------------
1922

20-
zones example: ["A1", "B2", "C3"]
23+
async def generate_trip_requests(self):
24+
"""
25+
Simulates realistic rider pickup/dropoff coordinates.
26+
Coordinates centered loosely around NYC grid for demo.
2127
"""
22-
23-
if zones is None:
24-
zones = ["A1", "B2", "C3"]
2528

2629
while True:
27-
zone_id = random.choice(zones)
28-
29-
demand = random.randint(1, 40) # number of riders requesting trips
30-
supply = random.randint(1, 25) # number of available drivers
31-
3230
event = {
33-
"zone_id": zone_id,
34-
"demand": demand,
35-
"supply": supply
31+
"rider_id": f"rider_{random.randint(1000,9999)}",
32+
"pickup_lat": round(40.70 + random.random() * 0.06, 6),
33+
"pickup_lon": round(-74.01 + random.random() * 0.06, 6),
34+
"dropoff_lat": round(40.70 + random.random() * 0.06, 6),
35+
"dropoff_lon": round(-74.01 + random.random() * 0.06, 6),
36+
"timestamp": now_timestamp(),
3637
}
3738

38-
await self.event_bus.publish("zone_supply_demand", event)
39+
await self.event_bus.publish("trip_requests", event)
3940

40-
self.logger.info(
41-
f"[PRODUCER] Published supply/demand event: {event}"
42-
)
41+
self.logger.info(f"[PRODUCER] Published TripRequestEvent: {event}")
4342

44-
# control frequency of events
43+
# Frequency of trip events
4544
await asyncio.sleep(1)
4645

47-
async def start(self, zones=None):
46+
# ------------------------------------------------------------
47+
# Startup
48+
# ------------------------------------------------------------
49+
50+
async def start(self):
4851
"""
49-
Start the producer loop.
52+
Launches continuous trip request simulation.
5053
"""
51-
self.logger.info("PricingProducer started...")
52-
await self.generate_supply_demand(zones)
54+
self.logger.info("DispatchProducer started...")
55+
await self.generate_trip_requests()

0 commit comments

Comments
 (0)