Skip to content

Commit c02b016

Browse files
committed
Replace asyncmy with aiomysql for database connection and enhance data consistency in stream processing
1 parent 5eae73d commit c02b016

File tree

1 file changed

+51
-18
lines changed

1 file changed

+51
-18
lines changed

main.py

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from multiprocessing import Process
1414
from typing import TypedDict
1515

16-
from asyncmy import create_pool
16+
import aiomysql
1717
from dotenv import load_dotenv
1818
from sqlalchemy.engine.url import make_url
1919

@@ -87,17 +87,17 @@ async def _ensure_db_pool(self) -> None:
8787
'DATABASE_URL environment variable is required',
8888
)
8989
url = make_url(database_url)
90-
self.db_pool = await create_pool(
90+
self.db_pool = await aiomysql.create_pool(
9191
host=url.host,
9292
port=url.port or 3306,
9393
user=url.username,
9494
password=url.password,
9595
db=url.database,
96-
minsize=2, # Minimum connections
97-
maxsize=10, # Maximum connections
98-
pool_recycle=3600, # 1 hour connection recycling
96+
minsize=2,
97+
maxsize=10,
98+
pool_recycle=3600,
9999
autocommit=True,
100-
echo=False, # Disable SQL logging for performance
100+
echo=False,
101101
)
102102

103103
async def fetch_stream_configs(self) -> list[StreamConfig]:
@@ -370,11 +370,19 @@ async def _run_single_stream(cfg: StreamConfig) -> None:
370370
work_start_hour <= detection_time.hour < work_end_hour
371371
)
372372

373+
# Create a copy of the current frame to ensure data consistency
374+
# Use copy() to avoid any reference issues across async operations
375+
current_frame = frame.copy()
376+
current_timestamp = int(ts)
377+
378+
# Generate detections with the frame copy
373379
datas, track_data = (
374380
await live_stream_detector.generate_detections(
375-
frame,
381+
current_frame,
376382
)
377383
)
384+
385+
# Generate danger detection results
378386
warnings, cone_polys, pole_polys = (
379387
danger_detector.detect_danger(
380388
track_data,
@@ -384,14 +392,16 @@ async def _run_single_stream(cfg: StreamConfig) -> None:
384392
warnings, is_working,
385393
)
386394

387-
# Use optimized frame encoding (JPEG for better compression)
388-
frame_bytes = Utils.encode_frame(frame, 'jpeg', 85)
395+
# Use optimized frame encoding with the same frame used for
396+
# detection. This ensures the image bytes correspond exactly
397+
# to the detection data.
398+
frame_bytes = Utils.encode_frame(current_frame, 'jpeg', 85)
389399

390400
# Optionally stream result to backend using optimised transmission
391401
if store_in_redis:
392402
try:
393403
result = await frame_sender.send_optimized_frame(
394-
frame=frame,
404+
frame=current_frame,
395405
site=site,
396406
stream_name=stream_name,
397407
encoding_format='jpeg',
@@ -400,7 +410,7 @@ async def _run_single_stream(cfg: StreamConfig) -> None:
400410
warnings_json=json.dumps(warnings),
401411
cone_polygons_json=json.dumps(cone_polys),
402412
pole_polygons_json=json.dumps(pole_polys),
403-
detection_items_json=json.dumps(datas),
413+
detection_items_json=json.dumps(track_data),
404414
)
405415

406416
# Check send result
@@ -415,19 +425,36 @@ async def _run_single_stream(cfg: StreamConfig) -> None:
415425
print(f"[{site}:{stream_name}] Frame send error: {e}")
416426

417427
# Send violation record + FCM push if needed
428+
# Ensure all data corresponds to the same frame and timestamp
418429
if warnings and Utils.should_notify(
419-
int(ts),
430+
current_timestamp,
420431
last_notification_time,
421432
):
433+
# Create deep copies of all detection data to ensure
434+
# complete consistency. This prevents any race conditions
435+
# or data corruption in async operations.
436+
current_warnings = json.loads(json.dumps(warnings))
437+
current_datas = json.loads(
438+
json.dumps(
439+
track_data,
440+
),
441+
) if track_data else []
442+
current_cone_polys = json.loads(
443+
json.dumps(cone_polys),
444+
) if cone_polys else []
445+
current_pole_polys = json.loads(
446+
json.dumps(pole_polys),
447+
) if pole_polys else []
448+
422449
violation_id_str = await violation_sender.send_violation(
423450
site=site,
424451
stream_name=stream_name,
425-
warnings_json=json.dumps(warnings),
452+
warnings_json=json.dumps(current_warnings),
426453
detection_time=detection_time,
427454
image_bytes=frame_bytes,
428-
detections_json=json.dumps(datas),
429-
cone_polygon_json=json.dumps(cone_polys),
430-
pole_polygon_json=json.dumps(pole_polys),
455+
detections_json=json.dumps(current_datas),
456+
cone_polygon_json=json.dumps(current_cone_polys),
457+
pole_polygon_json=json.dumps(current_pole_polys),
431458
)
432459
# Try to convert violation_id to int, else None
433460
try:
@@ -442,11 +469,11 @@ async def _run_single_stream(cfg: StreamConfig) -> None:
442469
await fcm_sender.send_fcm_message_to_site(
443470
site=site,
444471
stream_name=stream_name,
445-
message=warnings,
472+
message=current_warnings,
446473
image_path=None,
447474
violation_id=violation_id,
448475
)
449-
last_notification_time = int(ts)
476+
last_notification_time = current_timestamp
450477

451478
# Dynamically adjust processing interval
452479
proc_time = time.time() - start
@@ -538,3 +565,9 @@ async def main() -> None:
538565
if __name__ == '__main__':
539566
multiprocessing.set_start_method('spawn', force=True)
540567
asyncio.run(main())
568+
569+
"""
570+
python main.py --poll 15
571+
572+
uv run python main.py --poll 15
573+
"""

0 commit comments

Comments
 (0)