|
1 | 1 | import asyncio
|
2 |
| -from datetime import datetime, timezone |
| 2 | +from datetime import datetime, timedelta, timezone |
3 | 3 | from typing import Any
|
4 | 4 | from unittest.mock import MagicMock
|
5 | 5 |
|
6 | 6 | import pytest
|
7 | 7 | from faststream.utils.functions import timeout_scope
|
| 8 | +from freezegun import freeze_time |
8 | 9 | from taskiq import AsyncBroker, TaskiqScheduler
|
9 | 10 | from taskiq.cli.scheduler.args import SchedulerArgs
|
10 | 11 | from taskiq.cli.scheduler.run import run_scheduler
|
11 | 12 | from taskiq.schedule_sources import LabelScheduleSource
|
12 | 13 |
|
13 |
| -from taskiq_faststream import BrokerWrapper |
| 14 | +from taskiq_faststream import BrokerWrapper, StreamScheduler |
14 | 15 |
|
15 | 16 |
|
16 | 17 | @pytest.mark.anyio
|
@@ -67,3 +68,52 @@ async def handler(msg: str) -> None:
|
67 | 68 |
|
68 | 69 | mock.assert_called_once_with("Hi!")
|
69 | 70 | task.cancel()
|
| 71 | + |
| 72 | + async def test_task_multiple_schedules_by_cron( |
| 73 | + self, |
| 74 | + subject: str, |
| 75 | + broker: Any, |
| 76 | + event: asyncio.Event, |
| 77 | + ) -> None: |
| 78 | + """Test cron runs twice via StreamScheduler.""" |
| 79 | + received_message = [] |
| 80 | + |
| 81 | + @broker.subscriber(subject) |
| 82 | + async def handler(msg: str) -> None: |
| 83 | + received_message.append(msg) |
| 84 | + event.set() |
| 85 | + |
| 86 | + taskiq_broker = self.build_taskiq_broker(broker) |
| 87 | + |
| 88 | + taskiq_broker.task( |
| 89 | + "Hi!", |
| 90 | + **{self.subj_name: subject}, |
| 91 | + schedule=[ |
| 92 | + { |
| 93 | + "cron": "* * * * *", |
| 94 | + }, |
| 95 | + ], |
| 96 | + ) |
| 97 | + |
| 98 | + async with self.test_class(broker): |
| 99 | + with freeze_time("00:00:00", tick=True) as frozen_datetime: |
| 100 | + task = asyncio.create_task( |
| 101 | + run_scheduler( |
| 102 | + SchedulerArgs( |
| 103 | + scheduler=StreamScheduler( |
| 104 | + broker=taskiq_broker, |
| 105 | + sources=[LabelScheduleSource(taskiq_broker)], |
| 106 | + ), |
| 107 | + modules=[], |
| 108 | + ), |
| 109 | + ), |
| 110 | + ) |
| 111 | + |
| 112 | + await asyncio.wait_for(event.wait(), 2.0) |
| 113 | + event.clear() |
| 114 | + frozen_datetime.tick(timedelta(minutes=2)) |
| 115 | + await asyncio.wait_for(event.wait(), 2.0) |
| 116 | + |
| 117 | + task.cancel() |
| 118 | + |
| 119 | + assert received_message == ["Hi!", "Hi!"], received_message |
0 commit comments