|
6 | 6 | import dataclasses |
7 | 7 | import logging |
8 | 8 | from collections.abc import Mapping |
| 9 | +from datetime import datetime, timezone |
9 | 10 | from functools import cached_property |
10 | 11 | from typing import Any, Self |
11 | 12 |
|
@@ -97,16 +98,24 @@ def __new__(cls, *args: Any, **kwargs: Any) -> Self: |
97 | 98 | raise TypeError(f"Cannot instantiate {cls.__name__} directly") |
98 | 99 | return super().__new__(cls) |
99 | 100 |
|
100 | | - @cached_property |
101 | | - def active(self) -> bool: |
102 | | - """Whether this component is currently active.""" |
| 101 | + def active_at(self, timestamp: datetime) -> bool: |
| 102 | + """Check whether this component is active at a specific timestamp.""" |
103 | 103 | if self.status is ComponentStatus.UNSPECIFIED: |
104 | | - # Because this is a cached property, the warning will only be logged once. |
105 | 104 | _logger.warning( |
106 | 105 | "Component %s has an unspecified status. Assuming it is active.", |
107 | 106 | self, |
108 | 107 | ) |
109 | | - return self.status in (ComponentStatus.ACTIVE, ComponentStatus.UNSPECIFIED) |
| 108 | + return self.operational_lifetime.active_at(timestamp) |
| 109 | + |
| 110 | + return ( |
| 111 | + self.status is ComponentStatus.ACTIVE |
| 112 | + and self.operational_lifetime.active_at(timestamp) |
| 113 | + ) |
| 114 | + |
| 115 | + @cached_property |
| 116 | + def active(self) -> bool: |
| 117 | + """Whether this component is currently active.""" |
| 118 | + return self.active_at(datetime.now(timezone.utc)) |
110 | 119 |
|
111 | 120 | def __str__(self) -> str: |
112 | 121 | """Return a human-readable string representation of this instance.""" |
|
0 commit comments