diff --git a/onvif/client.py b/onvif/client.py index 093d732..84ffde5 100644 --- a/onvif/client.py +++ b/onvif/client.py @@ -26,7 +26,7 @@ from .managers import NotificationManager, PullPointManager from .settings import DEFAULT_SETTINGS from .transport import ASYNC_TRANSPORT -from .types import FastDateTime +from .types import FastDateTime, ForgivingTime from .util import create_no_verify_ssl_context, normalize_url, path_isfile, utcnow from .wrappers import retry_connection_error # noqa: F401 from .wsa import WsAddressingIfMissingPlugin @@ -129,8 +129,14 @@ def _load_document() -> DocumentWithDeferredLoad: schema = document.types.documents.get_by_namespace( "http://www.w3.org/2001/XMLSchema", False )[0] + logger.debug("Overriding default datetime type to use FastDateTime") instance = FastDateTime(is_global=True) schema.register_type(FastDateTime._default_qname, instance) + + logger.debug("Overriding default time type to use ForgivingTime") + instance = ForgivingTime(is_global=True) + schema.register_type(ForgivingTime._default_qname, instance) + document.types.add_documents([None], url) # Perform the original load document.original_load(url) diff --git a/onvif/types.py b/onvif/types.py index 3cd9dc9..c6a0d2f 100644 --- a/onvif/types.py +++ b/onvif/types.py @@ -1,7 +1,46 @@ """ONVIF types.""" +from datetime import datetime, timedelta, time import ciso8601 -from zeep.xsd.types.builtins import DateTime, treat_whitespace +from zeep.xsd.types.builtins import DateTime, treat_whitespace, Time +import isodate + + +def _try_parse_datetime(value: str) -> datetime | None: + try: + return ciso8601.parse_datetime(value) + except ValueError: + pass + + try: + return isodate.parse_datetime(value) + except ValueError: + pass + + return None + + +def _try_fix_time_overflow(time: str) -> tuple[str, dict[str, int]]: + """Some camera will overflow time so we need to fix it. + + To do this we calculate the offset beyond the maximum value + and then add it to the current time as a timedelta. + """ + offset: dict[str, int] = {} + hour = int(time[0:2]) + if hour > 23: + offset["hours"] = hour - 23 + hour = 23 + minute = int(time[3:5]) + if minute > 59: + offset["minutes"] = minute - 59 + minute = 59 + second = int(time[6:8]) + if second > 59: + offset["seconds"] = second - 59 + second = 59 + time_trailer = time[8:] + return f"{hour:02d}:{minute:02d}:{second:02d}{time_trailer}", offset # see https://github.com/mvantellingen/python-zeep/pull/1370 @@ -9,16 +48,54 @@ class FastDateTime(DateTime): """Fast DateTime that supports timestamps with - instead of T.""" @treat_whitespace("collapse") - def pythonvalue(self, value): + def pythonvalue(self, value: str) -> datetime: """Convert the xml value into a python value.""" if len(value) > 10 and value[10] == "-": # 2010-01-01-00:00:00... value[10] = "T" if len(value) > 10 and value[11] == "-": # 2023-05-15T-07:10:32Z... value = value[:11] + value[12:] + # Determine based on the length of the value if it only contains a date + # lazy hack ;-) + if len(value) == 10: + value += "T00:00:00" + elif (len(value) in (19, 20, 26)) and value[10] == " ": + value = "T".join(value.split(" ")) + if dt := _try_parse_datetime(value): + return dt + + # Some cameras overflow the hours/minutes/seconds + # For example, 2024-08-17T00:61:16Z so we need + # to fix the overflow + date, _, time = value.partition("T") try: + fixed_time, offset = _try_fix_time_overflow(time) + except ValueError: return ciso8601.parse_datetime(value) + + if dt := _try_parse_datetime(f"{date}T{fixed_time}"): + return dt + timedelta(**offset) + + return ciso8601.parse_datetime(value) + + +class ForgivingTime(Time): + """ForgivingTime.""" + + @treat_whitespace("collapse") + def pythonvalue(self, value: str) -> time: + try: + return isodate.parse_time(value) except ValueError: pass - return super().pythonvalue(value) + # Some cameras overflow the hours/minutes/seconds + # For example, 00:61:16Z so we need + # to fix the overflow + try: + fixed_time, offset = _try_fix_time_overflow(value) + except ValueError: + return isodate.parse_time(value) + if fixed_dt := _try_parse_datetime(f"2024-01-15T{fixed_time}Z"): + return (fixed_dt + timedelta(**offset)).time() + return isodate.parse_time(value) diff --git a/pyproject.toml b/pyproject.toml index 7a75060..4e89526 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,4 @@ -[tool.black] -target-version = ["py36", "py37", "py38"] -exclude = 'generated' +[tool.pytest.ini_options] +pythonpath = ["onvif"] +log_cli="true" +log_level="NOTSET" diff --git a/tests/test_types.py b/tests/test_types.py new file mode 100644 index 0000000..0dd88db --- /dev/null +++ b/tests/test_types.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import os + +import pytest +from zeep.loader import parse_xml +import datetime +from onvif.client import ONVIFCamera +from onvif.settings import DEFAULT_SETTINGS +from onvif.transport import ASYNC_TRANSPORT +from onvif.types import FastDateTime, ForgivingTime + +INVALID_TERM_TIME = b'\r\n\r\n\r\nhttp://www.onvif.org/ver10/events/wsdl/PullPointSubscription/PullMessagesResponse\r\n\r\n\r\n\r\n2024-08-17T00:56:16Z\r\n2024-08-17T00:61:16Z\r\n\r\n\r\n\r\n' +_WSDL_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "onvif", "wsdl") + + +@pytest.mark.asyncio +async def test_parse_invalid_dt(caplog: pytest.LogCaptureFixture) -> None: + device = ONVIFCamera("127.0.0.1", 80, "user", "pass", wsdl_dir=_WSDL_PATH) + device.xaddrs = { + "http://www.onvif.org/ver10/events/wsdl": "http://192.168.210.102:6688/onvif/event_service" + } + # Create subscription manager + subscription = await device.create_notification_service() + operation = subscription.document.bindings[subscription.binding_name].get( + "Subscribe" + ) + envelope = parse_xml( + INVALID_TERM_TIME, # type: ignore[arg-type] + ASYNC_TRANSPORT, + settings=DEFAULT_SETTINGS, + ) + result = operation.process_reply(envelope) + assert result.CurrentTime == datetime.datetime( + 2024, 8, 17, 0, 56, 16, tzinfo=datetime.timezone.utc + ) + assert result.TerminationTime == datetime.datetime( + 2024, 8, 17, 1, 1, 16, tzinfo=datetime.timezone.utc + ) + assert "ValueError" not in caplog.text + + +def test_parse_invalid_datetime() -> None: + with pytest.raises(ValueError, match="Invalid character while parsing year"): + FastDateTime().pythonvalue("aaaa-aa-aaTaa:aa:aaZ") + + +def test_parse_invalid_time() -> None: + with pytest.raises(ValueError, match="Unrecognised ISO 8601 time format"): + ForgivingTime().pythonvalue("aa:aa:aa") + + +def test_fix_datetime_missing_time() -> None: + assert FastDateTime().pythonvalue("2024-08-17") == datetime.datetime( + 2024, 8, 17, 0, 0, 0 + ) + + +def test_fix_datetime_missing_t() -> None: + assert FastDateTime().pythonvalue("2024-08-17 00:61:16Z") == datetime.datetime( + 2024, 8, 17, 1, 1, 16, tzinfo=datetime.timezone.utc + ) + assert FastDateTime().pythonvalue("2024-08-17 00:61:16") == datetime.datetime( + 2024, 8, 17, 1, 1, 16 + ) + + +def test_fix_datetime_overflow() -> None: + assert FastDateTime().pythonvalue("2024-08-17T00:61:16Z") == datetime.datetime( + 2024, 8, 17, 1, 1, 16, tzinfo=datetime.timezone.utc + ) + assert FastDateTime().pythonvalue("2024-08-17T00:60:16Z") == datetime.datetime( + 2024, 8, 17, 1, 0, 16, tzinfo=datetime.timezone.utc + ) + assert FastDateTime().pythonvalue("2024-08-17T00:59:16Z") == datetime.datetime( + 2024, 8, 17, 0, 59, 16, tzinfo=datetime.timezone.utc + ) + assert FastDateTime().pythonvalue("2024-08-17T23:59:59Z") == datetime.datetime( + 2024, 8, 17, 23, 59, 59, tzinfo=datetime.timezone.utc + ) + assert FastDateTime().pythonvalue("2024-08-17T24:00:00Z") == datetime.datetime( + 2024, 8, 18, 0, 0, 0, tzinfo=datetime.timezone.utc + ) + + +def test_unfixable_datetime_overflow() -> None: + with pytest.raises(ValueError, match="Invalid character while parsing minute"): + FastDateTime().pythonvalue("2024-08-17T999:00:00Z") + + +def test_fix_time_overflow() -> None: + assert ForgivingTime().pythonvalue("24:00:00") == datetime.time(0, 0, 0) + assert ForgivingTime().pythonvalue("23:59:59") == datetime.time(23, 59, 59) + assert ForgivingTime().pythonvalue("23:59:60") == datetime.time(0, 0, 0) + assert ForgivingTime().pythonvalue("23:59:61") == datetime.time(0, 0, 1) + assert ForgivingTime().pythonvalue("23:60:00") == datetime.time(0, 0, 0) + assert ForgivingTime().pythonvalue("23:61:00") == datetime.time(0, 1, 0) + + +def test_unfixable_time_overflow() -> None: + with pytest.raises(ValueError, match="Unrecognised ISO 8601 time format"): + assert ForgivingTime().pythonvalue("999:00:00")