Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 10 additions & 20 deletions packages/control/io_device.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from dataclasses import dataclass, field
from typing import Dict, Optional, Union
from control import data
from control.limiting_value import LimitingValue
from typing import Dict, Optional, Tuple, Union
from control.limiting_value import LoadmanagementLimit
from helpermodules.constants import NO_ERROR
from modules.common.utils.component_parser import get_io_name_by_id
from modules.io_actions.controllable_consumers.dimming.api_eebus import DimmingEebus
from modules.io_actions.controllable_consumers.dimming.api_io import DimmingIo
from modules.io_actions.controllable_consumers.dimming_direct_control.api import DimmingDirectControl
Expand Down Expand Up @@ -63,19 +61,14 @@ def setup(self):
for action in self.actions.values():
action.setup()

def _check_fault_state_io_device(self, io_device: int) -> None:
if data.data.io_states[f"io_states{io_device}"].data.get.fault_state == 2:
raise ValueError(LimitingValue.CONTROLLABLE_CONSUMERS_ERROR.value.format(get_io_name_by_id(io_device)))

def dimming_get_import_power_left(self, device: Dict) -> Optional[float]:
def dimming_get_import_power_left(self, device: Dict) -> Tuple[Optional[float], LoadmanagementLimit]:
for action in self.actions.values():
if isinstance(action, (DimmingIo, DimmingEebus)):
for d in action.config.configuration.devices:
if device == d:
self._check_fault_state_io_device(action.config.configuration.io_device)
return action.dimming_get_import_power_left()
else:
return None
return None, LoadmanagementLimit(None, None)

def dimming_set_import_power_left(self, device: Dict, used_power: float) -> Optional[float]:
for action in self.actions.values():
Expand All @@ -84,31 +77,28 @@ def dimming_set_import_power_left(self, device: Dict, used_power: float) -> Opti
if d == device:
return action.dimming_set_import_power_left(used_power)

def dimming_via_direct_control(self, device: Dict) -> Optional[float]:
def dimming_via_direct_control(self, device: Dict) -> Tuple[Optional[float], LoadmanagementLimit]:
for action in self.actions.values():
if isinstance(action, DimmingDirectControl):
for d in action.config.configuration.devices:
if device == d:
self._check_fault_state_io_device(action.config.configuration.io_device)
return action.dimming_via_direct_control()
else:
return None
return None, LoadmanagementLimit(None, None)

def ripple_control_receiver(self, device: Dict) -> float:
def ripple_control_receiver(self, device: Dict) -> Tuple[float, LoadmanagementLimit]:
for action in self.actions.values():
if isinstance(action, RippleControlReceiver):
for d in action.config.configuration.devices:
if device == d:
self._check_fault_state_io_device(action.config.configuration.io_device)
return action.ripple_control_receiver()
else:
return 1
return 1, LoadmanagementLimit(None, None)

def stepwise_control(self, device_id: int) -> Optional[float]:
def stepwise_control(self, device_id: int) -> Tuple[Optional[float], LoadmanagementLimit]:
for action in self.actions.values():
if isinstance(action, (StepwiseControlEebus, StepwiseControlIo)):
if device_id in [component["id"] for component in action.config.configuration.devices]:
self._check_fault_state_io_device(action.config.configuration.io_device)
return action.control_stepwise()
else:
return None
return None, LoadmanagementLimit(None, None)
6 changes: 4 additions & 2 deletions packages/control/limiting_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ class LimitingValue(Enum):
DIMMING_VIA_DIRECT_CONTROL = ", da die Dimmung per Direkt-Steuerung die Ladeleistung auf 4,2 kW begrenzt."
RIPPLE_CONTROL_RECEIVER = (", da der Ladepunkt durch den RSE-Kontakt auf {}% der konfigurierten Anschlussleistung "
"reduziert wird.")
CONTROLLABLE_CONSUMERS_ERROR = (", da aufgrund eines Fehlers im IO-Gerät {} die steuerbaren Verbraucher nicht "
"gesteuert werden können. Bitte prüfe die Status-Seite.")
CONTROL_STEPWISE = "Leistung begrenzt auf {}%"
CONTROLLABLE_CONSUMERS_ERROR = (", da aufgrund eines Fehlers im IO-Gerät {} die steuerbaren Verbraucher nur "
"mit der minimalen Leistung betrieben werden können. Bitte prüfe die Status-Seite.")
MISSING_CONFIGURATION = ", da die Konfiguration für die Aktion unvollständig ist."


@dataclass
Expand Down
25 changes: 9 additions & 16 deletions packages/control/loadmanagement.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,35 +137,33 @@ def _limit_by_current(self,
def _limit_by_dimming_via_direct_control(self,
missing_currents: List[float],
cp: Chargepoint) -> Tuple[List[float], LoadmanagementLimit]:
if data.data.io_actions.dimming_via_direct_control({"type": "cp", "id": cp.num}):
value, limit = data.data.io_actions.dimming_via_direct_control({"type": "cp", "id": cp.num})
if value is not None:
Comment on lines +140 to +141
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Die geänderte IO-Action-Integration (Tuple-Return mit LoadmanagementLimit) in _limit_by_dimming_via_direct_control, _limit_by_dimming und _limit_by_ripple_control_receiver ist aktuell nicht durch Unit-Tests abgedeckt (in loadmanagement_test.py werden nur _limit_by_power und _limit_by_current getestet). Bitte Tests ergänzen, die die neue Limit-Auswahl und die Fehler-/Failsafe-Pfade verifizieren.

Copilot uses AI. Check for mistakes.
phases = 3-missing_currents.count(0)
current_per_phase = 4200 / 230 / phases
available_currents = [current_per_phase -
cp.data.set.target_current if c > 0 else 0 for c in missing_currents]
log.debug(f"Dimmung per Direkt-Steuerung: {available_currents}A")
limit = LoadmanagementLimit(LimitingValue.DIMMING_VIA_DIRECT_CONTROL.value,
LimitingValue.DIMMING_VIA_DIRECT_CONTROL)
return available_currents, limit
else:
return missing_currents, LoadmanagementLimit(None, None)
available_currents = missing_currents
return available_currents, limit

def _limit_by_dimming(self,
available_currents: List[float],
cp: Chargepoint) -> Tuple[List[float], LoadmanagementLimit]:
dimming_power_left = data.data.io_actions.dimming_get_import_power_left({"type": "cp", "id": cp.num})
dimming_power_left, limit = data.data.io_actions.dimming_get_import_power_left({"type": "cp", "id": cp.num})
if dimming_power_left:
if sum(available_currents)*230 > dimming_power_left:
phases = 3-available_currents.count(0)
overload_per_phase = (sum(available_currents) - dimming_power_left/230)/phases
available_currents = [c - overload_per_phase if c > 0 else 0 for c in available_currents]
log.debug(f"Reduzierung der Ströme durch die Dimmung: {available_currents}A")
return available_currents, LoadmanagementLimit(LimitingValue.DIMMING.value, LimitingValue.DIMMING)
return available_currents, LoadmanagementLimit(None, None)
return available_currents, limit

def _limit_by_ripple_control_receiver(self,
available_currents: List[float],
cp: Chargepoint) -> Tuple[List[float], LoadmanagementLimit]:
value = data.data.io_actions.ripple_control_receiver({"type": "cp", "id": cp.num})
value, limit = data.data.io_actions.ripple_control_receiver({"type": "cp", "id": cp.num})
if value != 1:
phases = 3-available_currents.count(0)
if phases > 1:
Expand All @@ -174,12 +172,7 @@ def _limit_by_ripple_control_receiver(self,
max_current = cp.template.data.max_current_multi_phases
# target_current ist das Ergebnis der letzten Iteration. Die Differenz der begrenzten Anschlussleistung und
# der Sollstrom der letzten Iteration dürfen daher nicht größer sein als der aktuell fehlende Strom.
available_currents = [min(max_current*value - cp.data.set.target_current, c)
available_currents = [max(min(max_current*value - cp.data.set.target_current, c), 0)
if c > 0 else 0 for c in available_currents]
log.debug(f"Reduzierung durch RSE-Kontakt auf {value*100}%, maximal {max_current*value}A")
limit = LoadmanagementLimit(
LimitingValue.RIPPLE_CONTROL_RECEIVER.value.format(value*100),
LimitingValue.RIPPLE_CONTROL_RECEIVER)
return available_currents, limit
else:
return available_currents, LoadmanagementLimit(None, None)
return available_currents, limit
4 changes: 2 additions & 2 deletions packages/control/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def process_algorithm_results(self) -> None:
for d in action.config.configuration.devices:
if d["type"] == "io":
data.data.io_states[f"io_states{d['id']}"].data.set.digital_output[d["digital_output"]] = (
action.dimming_via_direct_control() is None # active output (True) if no dimming
action.dimming_via_direct_control()[0] is None # active output (True) if no dimming
)
if isinstance(action, DimmingIo):
for d in action.config.configuration.devices:
Expand All @@ -85,7 +85,7 @@ def process_algorithm_results(self) -> None:
if action.config.configuration.passthrough_enabled:
# find output pattern by value
for pattern in action.config.configuration.output_pattern:
if pattern["value"] == action.control_stepwise():
if pattern["value"] == action.control_stepwise()[0]:
# set digital outputs according to matching output_pattern
for output in pattern["matrix"].keys():
data.data.io_states[
Expand Down
14 changes: 5 additions & 9 deletions packages/control/pv_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,11 @@ def calc_power_for_all_components(self) -> None:
else:
if fault_state < module.data.get.fault_state:
fault_state = module.data.get.fault_state
limit_value = data.data.io_actions.stepwise_control(module.num)
if limit_value is not None and module.data.get.fault_state == 0:
msg = (
f"Leistung begrenzt auf {int(limit_value * 100)}%"
if limit_value < 1
else "Keine Leistungsbegrenzung aktiv."
)
module.data.get.fault_str = msg
Pub().pub(f"openWB/set/pv/{module.num}/get/fault_str", msg)
limit = data.data.io_actions.stepwise_control(module.num)[1]
if module.data.get.fault_state == 0:
# Fehlermeldung nicht überschreiben
module.data.get.fault_str = limit.message
Pub().pub(f"openWB/set/pv/{module.num}/get/fault_str", limit.message)
except Exception:
log.exception(f"Fehler im allgemeinen PV-Modul für pv{module.num}")
if fault_state == 0:
Expand Down
6 changes: 6 additions & 0 deletions packages/modules/io_actions/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from control import data
from modules.common.fault_state_level import FaultStateLevel


def check_fault_state_io_device(io_device: int) -> bool:
return data.data.io_states[f"io_states{io_device}"].data.get.fault_state == FaultStateLevel.ERROR
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import logging
from typing import Optional, Tuple
from control import data
from control.limiting_value import LimitingValue, LoadmanagementLimit
from helpermodules import timecheck
from helpermodules.logger import ModifyLoglevelContext
from helpermodules.pub import Pub
from helpermodules.timecheck import create_timestamp
from dataclass_utils import asdict
from modules.common.abstract_io import AbstractIoAction
from modules.common.utils.component_parser import get_io_name_by_id
from modules.io_actions.common import check_fault_state_io_device
from modules.io_actions.controllable_consumers.dimming.config import DimmingSetup
from modules.io_devices.eebus.config import AnalogInputMapping, DigitalInputMapping

Expand Down Expand Up @@ -42,14 +46,17 @@ def setup(self) -> None:
log.debug(f"Dimmen: {self.import_power_left}W inkl. Überschuss")

with ModifyLoglevelContext(control_command_log, logging.DEBUG):
if self.dimming_active():
if self.dimming_active() or check_fault_state_io_device(self.config.configuration.io_device):
if self.timestamp is None:
Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", create_timestamp())
if check_fault_state_io_device(self.config.configuration.io_device):
control_command_log.info(
"Fehler des IO-Geräts: Dimmen aktiviert für Failsafe-Modus.")
control_command_log.info(f"Dimmen aktiviert. Übermittelter LPC-Wert: {lpc_value/1000}kWh. "
"Leistungswerte vor Ausführung des Steuerbefehls:")

msg = (f"EVU-Zähler: "
f"{data.data.counter_data[data.data.counter_all_data.get_evu_counter_str()].data.get.powers}W")
evu_counter = data.data.counter_data[data.data.counter_all_data.get_evu_counter_str()]
msg = f"EVU-Zähler: {evu_counter.data.get.powers}W, {evu_counter.data.get.power}W"
for device in self.config.configuration.devices:
if device["type"] == "cp":
cp = f"cp{device['id']}"
Expand All @@ -64,11 +71,16 @@ def setup(self) -> None:
Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", None)
control_command_log.info("Dimmen deaktiviert.")

def dimming_get_import_power_left(self) -> None:
def dimming_get_import_power_left(self) -> Tuple[Optional[float], LoadmanagementLimit]:
if check_fault_state_io_device(self.config.configuration.io_device):
return (self.import_power_left, LoadmanagementLimit(
LimitingValue.CONTROLLABLE_CONSUMERS_ERROR.value.format(get_io_name_by_id(
self.config.configuration.io_device)),
LimitingValue.CONTROLLABLE_CONSUMERS_ERROR))
if self.dimming_active():
return self.import_power_left
return self.import_power_left, LoadmanagementLimit(LimitingValue.DIMMING.value, LimitingValue.DIMMING)
else:
return None
return None, LoadmanagementLimit(None, None)

def dimming_set_import_power_left(self, used_power: float) -> None:
self.import_power_left -= used_power
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import logging
from typing import Optional, Tuple

from control import data
from control.limiting_value import LimitingValue, LoadmanagementLimit
from helpermodules.logger import ModifyLoglevelContext
from helpermodules.pub import Pub
from helpermodules.timecheck import create_timestamp
from dataclass_utils import asdict
from modules.common.abstract_io import AbstractIoAction
from modules.common.utils.component_parser import get_io_name_by_id
from modules.io_actions.common import check_fault_state_io_device
from modules.io_actions.controllable_consumers.dimming.config import DimmingSetup

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -49,14 +53,15 @@ def setup(self) -> None:
log.debug(f"Dimmen: {self.import_power_left}W inkl. Überschuss")

with ModifyLoglevelContext(control_command_log, logging.DEBUG):
if data.data.io_states[f"io_states{self.config.configuration.io_device}"].data.get.digital_input[
self.dimming_input] == self.dimming_value:
if self.dimming_active() or check_fault_state_io_device(self.config.configuration.io_device):
if self.timestamp is None:
Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", create_timestamp())
if check_fault_state_io_device(self.config.configuration.io_device):
control_command_log.info("Fehler des IO-Geräts: Dimmen aktiviert für Failsafe-Modus.")
control_command_log.info("Dimmen aktiviert. Leistungswerte vor Ausführung des Steuerbefehls:")

msg = (f"EVU-Zähler: "
f"{data.data.counter_data[data.data.counter_all_data.get_evu_counter_str()].data.get.powers}W")
evu_counter = data.data.counter_data[data.data.counter_all_data.get_evu_counter_str()]
msg = f"EVU-Zähler: {evu_counter.data.get.powers}W, {evu_counter.data.get.power}W"
for device in self.config.configuration.devices:
if device["type"] == "cp":
cp = f"cp{device['id']}"
Expand All @@ -71,12 +76,17 @@ def setup(self) -> None:
Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", None)
control_command_log.info("Dimmen deaktiviert.")

def dimming_get_import_power_left(self) -> None:
def dimming_get_import_power_left(self) -> Tuple[Optional[float], LoadmanagementLimit]:
if check_fault_state_io_device(self.config.configuration.io_device):
return (self.import_power_left, LoadmanagementLimit(
LimitingValue.CONTROLLABLE_CONSUMERS_ERROR.value.format(get_io_name_by_id(
self.config.configuration.io_device)),
LimitingValue.CONTROLLABLE_CONSUMERS_ERROR))
if self.dimming_active():
return self.import_power_left
return self.import_power_left, LoadmanagementLimit(LimitingValue.DIMMING.value, LimitingValue.DIMMING)
elif data.data.io_states[f"io_states{self.config.configuration.io_device}"].data.get.digital_input[
self.no_dimming_input] == self.no_dimming_value:
return None
return None, LoadmanagementLimit(None, None)
else:
raise Exception("Pattern passt nicht zur Dimmung.")

Expand Down
Loading
Loading