Replies: 1 comment
After calling `schedule_by_time` you get a `CreatedSchedule` object:

    class CreatedSchedule(Generic[_ReturnType]):
        """A schedule that has been created."""

        def __init__(
            self,
            kicker: "AsyncKicker[Any, _ReturnType]",
            source: ScheduleSource,
            task: ScheduledTask,
        ) -> None:
            self.kicker = kicker
            self.source = source
            self.task = task
            self.schedule_id = task.schedule_id

To update the schedule time, you can use the fields of this object to delete the previous schedule and add a new one with a different time:

    # This is pseudocode to demonstrate the idea.
    import datetime

    # 1) Create the schedule source
    schedule_source = ...
    await schedule_source.startup()

    # 2) Schedule the task for the first time
    created_schedule = await my_task.schedule_by_time(
        schedule_source,
        datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=1),
    )

    # 3) Reschedule the task: delete the old schedule, then re-add it with a new time
    await schedule_source.delete_schedule(created_schedule.schedule_id)
    new_scheduled_task = created_schedule.task
    new_scheduled_task.time = datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=2)
    await schedule_source.add_schedule(new_scheduled_task)

These schedule source methods (`delete_schedule` and `add_schedule`) are optional according to the documentation, but most schedule sources implement them.
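If you need this in several places (for example, in an update endpoint), the delete-then-add steps can be wrapped in a small helper. The sketch below runs without the real library: `FakeScheduledTask`, `InMemoryScheduleSource`, and `reschedule` are hypothetical stand-ins written for illustration. Only the method names `add_schedule` and `delete_schedule` are taken from the snippet above, so adapt it to your actual schedule source.

```python
import asyncio
import datetime
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class FakeScheduledTask:
    """Hypothetical stand-in for a scheduled task record (not the library's class)."""

    schedule_id: str
    task_name: str
    time: datetime.datetime


class InMemoryScheduleSource:
    """Hypothetical in-memory source exposing add_schedule / delete_schedule."""

    def __init__(self) -> None:
        self.schedules: dict[str, FakeScheduledTask] = {}

    async def add_schedule(self, task: FakeScheduledTask) -> None:
        self.schedules[task.schedule_id] = task

    async def delete_schedule(self, schedule_id: str) -> None:
        # Deleting an unknown id is treated as a no-op in this sketch.
        self.schedules.pop(schedule_id, None)


async def reschedule(
    source: InMemoryScheduleSource,
    task: FakeScheduledTask,
    new_time: datetime.datetime,
) -> FakeScheduledTask:
    """Delete the old schedule, then add a copy that carries the new time."""
    await source.delete_schedule(task.schedule_id)
    updated = replace(task, time=new_time)  # same schedule_id, new time
    await source.add_schedule(updated)
    return updated


async def demo():
    source = InMemoryScheduleSource()
    now = datetime.datetime.now(datetime.timezone.utc)

    # Schedule for the first time, one minute from now.
    original = FakeScheduledTask("sched-1", "my_task", now + datetime.timedelta(minutes=1))
    await source.add_schedule(original)

    # Move the schedule to two minutes from now.
    updated = await reschedule(source, original, now + datetime.timedelta(minutes=2))
    return original, updated, source


original, updated, source = asyncio.run(demo())
```

Note that the two steps are not atomic: if `add_schedule` fails after `delete_schedule` has succeeded, the schedule is lost, so you may want to catch the error and re-add the original task.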
Hi guys!
I have dynamically scheduled tasks in my FastAPI project. When an object is created, I schedule a task for it with `task.schedule_by_time(time=<some_time>)`. When the object is later updated, the schedule time should be updated as well.
How do I do this?
Thanks.