-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcelery_tasks.py
More file actions
executable file
·176 lines (148 loc) · 5.18 KB
/
celery_tasks.py
File metadata and controls
executable file
·176 lines (148 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import json
import logging
from datetime import datetime, timedelta
from os import getenv, path
import django
import sh
from celery import Celery
from tenacity import Retrying, stop_after_attempt, wait_fixed
try:
django.setup()
# Place imports that uses Django in this block.
from lib import diagnostics
from lib.utils import (
connect_to_redis,
is_balena_app,
reboot_via_balena_supervisor,
shutdown_via_balena_supervisor,
)
except Exception:
pass
__author__ = 'Screenly, Inc'
__copyright__ = 'Copyright 2012-2024, Screenly, Inc'
__license__ = 'Dual License: GPLv2 and Commercial License'
CELERY_RESULT_BACKEND = getenv(
'CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'
)
CELERY_BROKER_URL = getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_TASK_RESULT_EXPIRES = timedelta(hours=6)
r = connect_to_redis()
celery = Celery(
'Anthias Celery Worker',
backend=CELERY_RESULT_BACKEND,
broker=CELERY_BROKER_URL,
result_expires=CELERY_TASK_RESULT_EXPIRES,
)
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls cleanup() every hour.
sender.add_periodic_task(3600, cleanup.s(), name='cleanup')
sender.add_periodic_task(
60 * 5, get_display_power.s(), name='display_power'
)
sender.add_periodic_task(
60, enforce_display_schedule.s(), name='display_schedule'
)
@celery.task(time_limit=30)
def get_display_power():
r.set('display_power', diagnostics.get_display_power())
r.expire('display_power', 3600)
@celery.task
def cleanup():
sh.find(
path.join(getenv('HOME'), 'screenly_assets'),
'-name',
'*.tmp',
'-delete',
)
@celery.task
def reboot_anthias():
"""
Background task to reboot Anthias
"""
if is_balena_app():
for attempt in Retrying(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
):
with attempt:
reboot_via_balena_supervisor()
else:
r.publish('hostcmd', 'reboot')
@celery.task
def shutdown_anthias():
"""
Background task to shutdown Anthias
"""
if is_balena_app():
for attempt in Retrying(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
):
with attempt:
shutdown_via_balena_supervisor()
else:
r.publish('hostcmd', 'shutdown')
@celery.task(time_limit=30)
def enforce_display_schedule():
"""
Reads display_power_schedule from settings and sends CEC
power_on/standby commands when the desired state changes.
"""
from settings import settings as app_settings
try:
app_settings.load()
raw = app_settings.get('display_power_schedule', '')
if not raw:
return
schedule = json.loads(raw)
if not isinstance(schedule, dict) or not schedule.get('enabled'):
return
days = schedule.get('days', {})
now = datetime.now()
day_key = str(now.isoweekday()) # 1=Mon .. 7=Sun
day_cfg = days.get(day_key)
if day_cfg is None:
# null = screen off all day
desired_on = False
else:
on_time = day_cfg.get('on', '00:00')
off_time = day_cfg.get('off', '23:59')
current_time = now.strftime('%H:%M')
if on_time <= off_time:
# Normal: e.g. 08:00 - 22:00
desired_on = on_time <= current_time < off_time
else:
# Overnight: e.g. 22:00 - 06:00
desired_on = current_time >= on_time or current_time < off_time
# Check if state changed since last enforcement
desired_str = '1' if desired_on else '0'
last_desired = r.get('display_schedule_desired')
if last_desired is not None:
last_desired = last_desired.decode() if isinstance(last_desired, bytes) else str(last_desired)
if last_desired == desired_str:
return # No change needed
result = diagnostics.set_display_power(desired_on)
if not result:
# CEC failed — try IR fallback
ir_enabled = app_settings.get('ir_enabled', False)
ir_protocol = app_settings.get('ir_protocol', '')
ir_scancode = app_settings.get('ir_power_scancode', '')
if ir_enabled and ir_protocol and ir_scancode:
from viewer.ir_controller import IrController
ir = IrController()
if ir._available:
sent = ir.send_power(ir_protocol, ir_scancode)
if sent:
result = True
logging.info('Display schedule: IR fallback sent %s:%s', ir_protocol, ir_scancode)
if result:
r.set('display_schedule_desired', desired_str, ex=120)
logging.info(
'Display schedule: turned %s (day=%s, time=%s)',
'ON' if desired_on else 'OFF', day_key, now.strftime('%H:%M')
)
except (json.JSONDecodeError, TypeError, KeyError) as e:
logging.warning('Display schedule error: %s', e)
except Exception as e:
logging.error('Display schedule unexpected error: %s', e)