Add new Scheduled Task to netbox plugin #14040
Unanswered
hamza-hadda
asked this question in
Help Wanted!
Replies: 2 comments 7 replies
-
https://docs.netbox.dev/en/stable/features/background-jobs/#scheduled-jobs
-
This is done through the Job model, as mentioned above, but how to enqueue a function isn't well documented. I put together the following example by reading the implementation of NetBox's own jobs, such as the script implementation. If you want to run the job at an interval, starting from NetBox startup:

Example of running the job on startup

# example_plugin/__init__.py
from datetime import datetime, timezone

from extras.plugins import PluginConfig


class ExamplePlugin(PluginConfig):
    name = 'example_plugin'
    version = '0.1'

    def ready(self):
        super().ready()

        # Import here: models are not loaded until the app registry is ready.
        from core.models import Job
        from .models import ExampleModel
        from .jobs import example_job, ENQUEUED_STATUS

        # Do nothing if a run with this name is already queued for the future,
        # otherwise every restart would enqueue a duplicate job.
        if Job.objects.filter(
            name="Running sync for example Model",
            status__in=ENQUEUED_STATUS,
            scheduled__gt=datetime.now(timezone.utc),
        ).exists():
            return

        Job.enqueue(
            example_job,
            instance=ExampleModel.objects.first(),
            name="Running sync for example Model",
            user=None,
            interval=60,  # minutes, see timedelta(minutes=...) in jobs.py
            schedule_at=datetime.now(timezone.utc),
        )


# NetBox loads the plugin through this module-level name.
config = ExamplePlugin

The implementation of the job function

# example_plugin/jobs.py
import logging
from datetime import timedelta

from core.choices import JobStatusChoices
from core.models import Job

logger = logging.getLogger(__name__)

# Statuses that mean a job is still waiting to run or currently running.
ENQUEUED_STATUS = [
    JobStatusChoices.STATUS_PENDING,
    JobStatusChoices.STATUS_RUNNING,
    JobStatusChoices.STATUS_SCHEDULED,
]


# https://github.com/netbox-community/netbox/blob/d2fee886001e4abbcb1b4d9ed3fd32521c820be9/netbox/extras/reports.py#L30
def example_job(job: Job, *args, **kwargs):
    try:
        job.start()
        logger.info("We are running the job!")
        job.terminate()
    except Exception as e:
        job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
    finally:
        # Recurring jobs reschedule themselves: enqueue the next run
        # one interval (in minutes) after this run's scheduled time.
        if job.interval:
            new_scheduled_time = job.scheduled + timedelta(minutes=job.interval)
            Job.enqueue(
                example_job,
                instance=job.object,
                name=job.name,
                user=job.user,
                schedule_at=new_scheduled_time,
                interval=job.interval,
            )
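For the daily task asked about in the question, the scheduling arithmetic can be worked out separately from NetBox. The following is only a sketch: `first_daily_run`, `next_run`, `DAILY_INTERVAL_MINUTES`, and the 02:00 UTC start hour are hypothetical names and values of mine, not part of NetBox. It assumes the interval is in minutes, as the `timedelta(minutes=job.interval)` call in the job function suggests, and it skips slots that were missed while the worker was down instead of scheduling a run in the past.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the interval is expressed in minutes, so one day is 1440.
DAILY_INTERVAL_MINUTES = 24 * 60


def first_daily_run(now: datetime, hour: int = 2) -> datetime:
    """Return the next occurrence of hour:00 UTC strictly after `now`."""
    candidate = now.astimezone(timezone.utc).replace(
        hour=hour, minute=0, second=0, microsecond=0
    )
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


def next_run(scheduled: datetime, interval_minutes: int, now: datetime) -> datetime:
    """Return the first slot after `now` on the grid scheduled + k * interval."""
    step = timedelta(minutes=interval_minutes)
    candidate = scheduled + step
    if candidate <= now:
        # Skip every slot that has already passed (e.g. after downtime).
        missed = (now - candidate) // step + 1
        candidate += missed * step
    return candidate
```

If this fits your case, you could pass `schedule_at=first_daily_run(datetime.now(timezone.utc))` and `interval=DAILY_INTERVAL_MINUTES` to `Job.enqueue` in `ready()`, and use `next_run(job.scheduled, job.interval, datetime.now(timezone.utc))` in place of the plain addition in the `finally` block.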
-
Hey,
I'm just getting started with NetBox. I'm developing a NetBox plugin and I want to add a task that runs once a day, but I couldn't find any configuration for scheduled tasks. Can you help me with this, if you have any ideas?