Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
Changelog
---------

0.21.0 (unreleased)
+++++++++++++++++++

Features:

- Rework of the tasks feature:
- All scheduled tasks share the same interface and campaign association table
- Tasks can now be called asynchronously anytime

0.20.0 (2025-01-03)
+++++++++++++++++++

Expand Down
8 changes: 3 additions & 5 deletions src/bemserver_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
input_output, # noqa
model,
plugins,
scheduled_tasks,
settings,
tasks, # noqa
utils,
)
from bemserver_core.celery import celery as celery_app
Expand All @@ -26,13 +26,10 @@

class BEMServerCore:
def __init__(self):
self.auth_model_classes = (
model.AUTH_MODEL_CLASSES + scheduled_tasks.AUTH_MODEL_CLASSES
)
self.auth_model_classes = model.AUTH_MODEL_CLASSES
self.auth_polar_files = [
authorization.AUTH_POLAR_FILE,
model.AUTH_POLAR_FILE,
scheduled_tasks.AUTH_POLAR_FILE,
]

# Load config
Expand Down Expand Up @@ -68,6 +65,7 @@ def __init__(self):
ems.init_core(self)

# Configure Celery
celery_app.set_default()
celery_app.conf.update(self.config["CELERY_CONFIG"])

# Load plugins
Expand Down
82 changes: 78 additions & 4 deletions src/bemserver_core/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
from celery.exceptions import WorkerShutdown
from celery.utils.log import get_task_logger

from bemserver_core.authorization import OpenBar
from bemserver_core import model
from bemserver_core.authorization import CurrentUser, OpenBar
from bemserver_core.database import db
from bemserver_core.exceptions import BEMServerCoreSettingsError
from bemserver_core.exceptions import BEMServerCoreSettingsError, BEMServerCoreTaskError

logger = get_task_logger(__name__)

Expand Down Expand Up @@ -43,7 +44,7 @@
db.session.remove()


class BEMServerCoreTask(Task):
class BEMServerCoreSystemTask(Task):
"""Celery Task override

- Wrap tasks in OpenBar context
Expand All @@ -55,14 +56,87 @@
self.run = task_wrapper(self.run)


class BEMServerCoreClassBasedTaskMixin:
@property
def name(self):
return self.__class__.__name__


class BEMServerCoreAsyncTask(BEMServerCoreClassBasedTaskMixin, Task):
TASK_FUNCTION = None
DEFAULT_PARAMETERS = {}

def run(self, user_id, campaign_id, start_td, end_dt, **kwargs):
logger.info("Start")

Check warning on line 70 in src/bemserver_core/celery.py

View check run for this annotation

Codecov / codecov/patch

src/bemserver_core/celery.py#L70

Added line #L70 was not covered by tests

with OpenBar():
user = model.User.get_by_id(user_id)

Check warning on line 73 in src/bemserver_core/celery.py

View check run for this annotation

Codecov / codecov/patch

src/bemserver_core/celery.py#L72-L73

Added lines #L72 - L73 were not covered by tests
if user is None:
raise BEMServerCoreTaskError(f"Unknown user ID {user_id}")

Check warning on line 75 in src/bemserver_core/celery.py

View check run for this annotation

Codecov / codecov/patch

src/bemserver_core/celery.py#L75

Added line #L75 was not covered by tests

with CurrentUser(user):
campaign = model.Campaign.get_by_id(campaign_id)

Check warning on line 78 in src/bemserver_core/celery.py

View check run for this annotation

Codecov / codecov/patch

src/bemserver_core/celery.py#L77-L78

Added lines #L77 - L78 were not covered by tests
if campaign is None:
raise BEMServerCoreTaskError(f"Unknown campaign ID {campaign_id}")

Check warning on line 80 in src/bemserver_core/celery.py

View check run for this annotation

Codecov / codecov/patch

src/bemserver_core/celery.py#L80

Added line #L80 was not covered by tests

# Function is bound at init. Use __func__ to avoid passing self
self.TASK_FUNCTION.__func__(

Check warning on line 83 in src/bemserver_core/celery.py

View check run for this annotation

Codecov / codecov/patch

src/bemserver_core/celery.py#L83

Added line #L83 was not covered by tests
campaign,
start_td,
end_dt,
**{**self.DEFAULT_PARAMETERS, **kwargs},
)


class BEMServerCoreScheduledTask(
BEMServerCoreClassBasedTaskMixin, BEMServerCoreSystemTask
):
TASK_FUNCTION = None
DEFAULT_PARAMETERS = {}

def run(self):
logger.info("Start")

Check warning on line 98 in src/bemserver_core/celery.py

View check run for this annotation

Codecov / codecov/patch

src/bemserver_core/celery.py#L98

Added line #L98 was not covered by tests

for tbc in model.TaskByCampaign.get(task_name=self.name, is_enabled=True):
start_dt, end_dt = tbc.make_interval()

Check warning on line 101 in src/bemserver_core/celery.py

View check run for this annotation

Codecov / codecov/patch

src/bemserver_core/celery.py#L101

Added line #L101 was not covered by tests

# Function is bound at init. Use __func__ to avoid passing self
self.TASK_FUNCTION.__func__(

Check warning on line 104 in src/bemserver_core/celery.py

View check run for this annotation

Codecov / codecov/patch

src/bemserver_core/celery.py#L104

Added line #L104 was not covered by tests
tbc.campaign,
start_dt,
end_dt,
**{**self.DEFAULT_PARAMETERS, **tbc.parameters},
)


class BEMServerCoreCelery(Celery):
"""Celery app class override

In case we need to override someday so we don't have to fix imports everywhere
"""

SCHEDULED_TASKS_NAME_SUFFIX = "Scheduled"

def register_task(self, task, **options):
"""Register task

When registering an AsyncTask, also register corresponding Scheduled task
"""
task = super().register_task(task, **options)
if isinstance(task, BEMServerCoreAsyncTask):
scheduled_task = type(
f"{task.__name__}{self.SCHEDULED_TASKS_NAME_SUFFIX}",
(BEMServerCoreScheduledTask,),
{
"TASK_FUNCTION": task.TASK_FUNCTION,
"DEFAULT_PARAMETERS": task.DEFAULT_PARAMETERS,
},
)
super().register_task(scheduled_task, **options)
return task


celery = BEMServerCoreCelery("BEMServer Core", task_cls=BEMServerCoreTask)
celery = BEMServerCoreCelery("BEMServer Core")
celery.config_from_object(DefaultCeleryConfig)


Expand Down
4 changes: 2 additions & 2 deletions src/bemserver_core/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import smtplib
from email.message import EmailMessage

from bemserver_core.celery import celery, logger
from bemserver_core.celery import BEMServerCoreSystemTask, celery, logger


class EmailSender:
Expand Down Expand Up @@ -39,7 +39,7 @@ def send(self, dest_addrs, subject, content):
ems = EmailSender()


@celery.task(name="Email")
@celery.task(name="Email", base=BEMServerCoreSystemTask)
def send_email(dest_addrs, subject, content):
"""Send message in a task"""
logger.info("Send email to %", dest_addrs)
Expand Down
4 changes: 4 additions & 0 deletions src/bemserver_core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,7 @@ class BEMServerCoreEnergyPowerProcessError(BEMServerCoreError):

class BEMServerCoreEnergyPowerProcessMissingIntervalError(BEMServerCoreError):
"""Missing timeseries interval property"""


class BEMServerCoreScheduledTaskParametersError(BEMServerCoreError):
"""Error in scheduled task parameter"""
76 changes: 76 additions & 0 deletions src/bemserver_core/input_output/timeseries_data_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,82 @@

return data_df

@classmethod
def get_timeseries_aggregate_data(
cls,
start_dt,
end_dt,
timeseries,
data_state,
*,
agg="avg",
inclusive="left",
col_label="id",
):
"""Export aggregated timeseries data

:param datetime start_dt: Time interval lower bound (tz-aware)
:param datetime end_dt: Time interval exclusive upper bound (tz-aware)
:param list timeseries: List of timeseries
:param TimeseriesDataState data_state: Timeseries data state
:param str agg: Aggreagation method.
Must be "avg", "min", "max" or "count". Default: "avg".
:param str inclusive: Whether to set each bound as closed or open.
Must be "both", "neither", "left" or "right". Default: "left".
:param string col_label: Timeseries attribute to use as key in returned dict.
Should be "id" or "name". Default: "id".

Returns a dataframe.
"""
for ts in timeseries:
auth.authorize(get_current_user(), "read_data", ts)

if agg == "avg":
agg_func = sqla.func.avg(TimeseriesData.value)
elif agg == "min":
agg_func = sqla.func.min(TimeseriesData.value)
elif agg == "max":
agg_func = sqla.func.max(TimeseriesData.value)
elif agg == "count":
agg_func = sqla.func.count(TimeseriesData.value)

stmt = (
sqla.select(Timeseries.id, Timeseries.name, agg_func)
.filter(
TimeseriesData.timeseries_by_data_state_id == TimeseriesByDataState.id
)
.filter(TimeseriesByDataState.timeseries_id == Timeseries.id)
.filter(TimeseriesByDataState.data_state_id == data_state.id)
.filter(TimeseriesByDataState.timeseries_id.in_(ts.id for ts in timeseries))
.group_by(Timeseries.id)
)
if start_dt:
if inclusive in {"both", "left"}:
stmt = stmt.filter(start_dt <= TimeseriesData.timestamp)
else:
stmt = stmt.filter(start_dt < TimeseriesData.timestamp)

Check warning on line 602 in src/bemserver_core/input_output/timeseries_data_io.py

View check run for this annotation

Codecov / codecov/patch

src/bemserver_core/input_output/timeseries_data_io.py#L602

Added line #L602 was not covered by tests
if end_dt:
if inclusive in {"both", "right"}:
stmt = stmt.filter(TimeseriesData.timestamp <= end_dt)

Check warning on line 605 in src/bemserver_core/input_output/timeseries_data_io.py

View check run for this annotation

Codecov / codecov/patch

src/bemserver_core/input_output/timeseries_data_io.py#L605

Added line #L605 was not covered by tests
else:
stmt = stmt.filter(TimeseriesData.timestamp < end_dt)

ts_counts = {
ts_id if col_label == "id" else ts_name: count
for ts_id, ts_name, count in db.session.execute(stmt).all()
}

data_df = pd.DataFrame.from_dict(
ts_counts, orient="index", columns=(agg,)
).reindex(getattr(ts, col_label) for ts in timeseries)
data_df.index.name = col_label
if agg == "count":
data_df["count"] = data_df["count"].fillna(0).astype(int)
else:
data_df[agg] = data_df[agg].astype(float)

return data_df

@classmethod
def delete(cls, start_dt, end_dt, timeseries, data_state):
"""Delete timeseries data
Expand Down
Loading
Loading