|
1 | | -import logging |
2 | | -import time |
3 | 1 |
|
4 | | -import swapper |
5 | | -from celery import current_app, shared_task |
6 | | -from celery.exceptions import SoftTimeLimitExceeded |
7 | | -from django.core.exceptions import ObjectDoesNotExist |
8 | | -from django.utils.translation import gettext_lazy as _ |
9 | | -from swapper import load_model |
10 | | - |
11 | | -from . import settings as app_settings |
12 | | -from .connectors.exceptions import CommandTimeoutException |
13 | | -from .exceptions import NoWorkingDeviceConnectionError |
14 | | - |
15 | | -logger = logging.getLogger(__name__) |
16 | | -_TASK_NAME = "openwisp_controller.connection.tasks.update_config" |
17 | | - |
18 | | - |
19 | | -def _is_update_in_progress(device_id, current_task_id=None): |
20 | | - active = current_app.control.inspect().active() |
21 | | - if not active: |
22 | | - return False |
23 | | - # check if there's any other running task before adding it |
24 | | - # exclude the current task by comparing task IDs |
25 | | - for task_list in active.values(): |
26 | | - for task in task_list: |
27 | | - if ( |
28 | | - task["name"] == _TASK_NAME |
29 | | - and str(device_id) in task["args"] |
30 | | - and task["id"] != current_task_id |
31 | | - ): |
32 | | - return True |
33 | | - return False |
34 | | - |
35 | | - |
36 | | -@shared_task(bind=True) |
37 | | -def update_config(self, device_id): |
38 | | - """ |
39 | | - Launches the ``update_config()`` operation |
40 | | - of a specific device in the background |
41 | | - """ |
42 | | - Device = swapper.load_model(*swapper.split(app_settings.UPDATE_CONFIG_MODEL)) |
43 | | - DeviceConnection = swapper.load_model("connection", "DeviceConnection") |
44 | | - # wait for the saving operations of this device to complete |
45 | | - # (there may be multiple ones happening at the same time) |
46 | | - time.sleep(2) |
47 | | - try: |
48 | | - device = Device.objects.select_related("config").get(pk=device_id) |
49 | | - # abort operation if device shouldn't be updated |
50 | | - if not device.can_be_updated(): |
51 | | - logger.info(f"{device} (pk: {device_id}) is not going to be updated") |
52 | | - return |
53 | | - except ObjectDoesNotExist as e: |
54 | | - logger.warning(f'update_config("{device_id}") failed: {e}') |
55 | | - return |
56 | | - if _is_update_in_progress(device_id, current_task_id=self.request.id): |
57 | | - return |
58 | | - try: |
59 | | - device_conn = DeviceConnection.get_working_connection(device) |
60 | | - except NoWorkingDeviceConnectionError: |
61 | | - return |
62 | | - else: |
63 | | - logger.info(f"Updating {device} (pk: {device_id})") |
64 | | - |
65 | | - try: |
66 | | - device_conn.update_config() |
67 | | - except Exception as e: |
68 | | - logger.error(f"update_config failed for device {device_id}: {e}") |
69 | | - raise |
70 | | - finally: |
71 | | - # ensure connection is closed |
72 | | - close_method = getattr(device_conn, "close", None) |
73 | | -if callable(close_method): |
74 | | - try: |
75 | | - close_method() |
76 | | - try: |
77 | | - device_conn.close() |
78 | | - except Exception as close_err: |
79 | | - logger.warning(f"Error closing connection: {close_err}") |
80 | | - |
81 | | - |
82 | | -# task timeout is SSH_COMMAND_TIMEOUT plus a 20% margin |
83 | | -@shared_task(soft_time_limit=app_settings.SSH_COMMAND_TIMEOUT * 1.2) |
84 | | -def launch_command(command_id): |
85 | | - """ |
86 | | - Launches execution of commands in the background |
87 | | - """ |
88 | | - Command = load_model("connection", "Command") |
89 | | - try: |
90 | | - command = Command.objects.get(pk=command_id) |
91 | | - except Command.DoesNotExist as e: |
92 | | - logger.warning(f'launch_command("{command_id}") failed: {e}') |
93 | | - return |
94 | | - try: |
95 | | - command.execute() |
96 | | - except SoftTimeLimitExceeded: |
97 | | - command.status = "failed" |
98 | | - command._add_output(_("Background task time limit exceeded.")) |
99 | | - command.save() |
100 | | - except CommandTimeoutException as e: |
101 | | - command.status = "failed" |
102 | | - command._add_output(_(f"The command took longer than expected: {e}")) |
103 | | - command.save() |
104 | | - except Exception as e: |
105 | | - logger.exception( |
106 | | - f"An exception was raised while executing command {command_id}" |
107 | | - ) |
108 | | - command.status = "failed" |
109 | | - command._add_output(_(f"Internal system error: {e}")) |
110 | | - command.save() |
111 | | - |
112 | | - |
113 | | -@shared_task(soft_time_limit=3600) |
114 | | -def auto_add_credentials_to_devices(credential_id, organization_id): |
115 | | - Credentials = load_model("connection", "Credentials") |
116 | | - Credentials.auto_add_to_devices(credential_id, organization_id) |
0 commit comments