diff --git a/blinkpy/api.py b/blinkpy/api.py index 12c8a3af..d01448cc 100644 --- a/blinkpy/api.py +++ b/blinkpy/api.py @@ -230,6 +230,18 @@ async def request_command_status(blink, network, command_id): return await http_get(blink, url) +async def request_command_done(blink, network, command_id): + """ + Request command to be done. + + :param blink: Blink instance. + :param network: Sync module network id. + :param command_id: Command id to mark as done. + """ + url = f"{blink.urls.base_url}/network/{network}/command/{command_id}/done/" + return await http_post(blink, url) + + @Throttle(seconds=MIN_THROTTLE_TIME) async def request_homescreen(blink, **kwargs): """Request homescreen info.""" @@ -347,7 +359,8 @@ async def request_camera_liveview(blink, network, camera_id): f"{blink.urls.base_url}/api/v5/accounts/{blink.account_id}" f"/networks/{network}/cameras/{camera_id}/liveview" ) - response = await http_post(blink, url) + data = dumps({"intent": "liveview"}) + response = await http_post(blink, url, data=data) await wait_for_command(blink, response) return response @@ -561,6 +574,7 @@ async def wait_for_command(blink, json_data: dict) -> bool: network_id = json_data.get("network_id") command_id = json_data.get("id") except AttributeError: + _LOGGER.exception("No network_id or id in response") return False if command_id and network_id: for _ in range(0, MAX_RETRY): @@ -573,3 +587,5 @@ async def wait_for_command(blink, json_data: dict) -> bool: if status.get("complete"): return True await sleep(COMMAND_POLL_TIME) + else: + _LOGGER.debug("No network_id or id in response") diff --git a/blinkpy/camera.py b/blinkpy/camera.py index b3c4a4b7..cb97c960 100644 --- a/blinkpy/camera.py +++ b/blinkpy/camera.py @@ -13,6 +13,7 @@ from blinkpy import api from blinkpy.helpers.constants import TIMEOUT_MEDIA from blinkpy.helpers.util import to_alphanumeric +from blinkpy.livestream import BlinkLiveStream _LOGGER = logging.getLogger(__name__) @@ -413,6 +414,15 @@ async def get_liveview(self): ) return response["server"] + async def init_livestream(self): + """Initialize livestream.""" + response = await api.request_camera_liveview( + self.sync.blink, self.sync.network_id, self.camera_id + ) + if not response["server"].startswith("immis://"): + raise NotImplementedError("Unsupported: {}".format(response["server"])) + return BlinkLiveStream(self, response) + async def image_to_file(self, path): """ Write image to file. @@ -548,13 +558,24 @@ async def get_liveview(self): f"{self.sync.blink.account_id}/networks/" f"{self.network_id}/owls/{self.camera_id}/liveview" ) - response = await api.http_post(self.sync.blink, url) + data = dumps({"intent": "liveview"}) + response = await api.http_post(self.sync.blink, url, data=data) + await api.wait_for_command(self.sync.blink, response) + return response["server"] + + async def init_livestream(self): + """Initialize livestream.""" + url = ( + f"{self.sync.urls.base_url}/api/v1/accounts/" + f"{self.sync.blink.account_id}/networks/" + f"{self.network_id}/owls/{self.camera_id}/liveview" + ) + data = dumps({"intent": "liveview"}) + response = await api.http_post(self.sync.blink, url, data=data) await api.wait_for_command(self.sync.blink, response) - server = response["server"] - server_split = server.split(":") - server_split[0] = "rtsps" - link = ":".join(server_split) - return link + if not response["server"].startswith("immis://"): + raise NotImplementedError("Unsupported: {}".format(response["server"])) + return BlinkLiveStream(self, response) class BlinkDoorbell(BlinkCamera): @@ -620,8 +641,21 @@ async def get_liveview(self): f"{self.sync.blink.account_id}/networks/" f"{self.sync.network_id}/doorbells/{self.camera_id}/liveview" ) - response = await api.http_post(self.sync.blink, url) + data = dumps({"intent": "liveview"}) + response = await api.http_post(self.sync.blink, url, data=data) + await api.wait_for_command(self.sync.blink, response) + return response["server"] + + async def init_livestream(self): + """Initialize livestream.""" + url = ( + f"{self.sync.urls.base_url}/api/v1/accounts/" + f"{self.sync.blink.account_id}/networks/" + f"{self.sync.network_id}/doorbells/{self.camera_id}/liveview" + ) + data = dumps({"intent": "liveview"}) + response = await api.http_post(self.sync.blink, url, data=data) await api.wait_for_command(self.sync.blink, response) - server = response["server"] - link = server.replace("immis://", "rtsps://") - return link + if not response["server"].startswith("immis://"): + raise NotImplementedError("Unsupported: {}".format(response["server"])) + return BlinkLiveStream(self, response) diff --git a/blinkpy/livestream.py b/blinkpy/livestream.py new file mode 100644 index 00000000..163a1b23 --- /dev/null +++ b/blinkpy/livestream.py @@ -0,0 +1,347 @@ +"""Handles immis livestream.""" + +import asyncio +import logging +import urllib.parse +import ssl +from blinkpy import api + +_LOGGER = logging.getLogger(__name__) + + +class BlinkLiveStream: + """Class to initialize individual stream.""" + + # Reference: https://github.com/amattu2/blink-liveview-middleware + + def __init__(self, camera, response): + """Initialize BlinkLiveStream.""" + self.camera = camera + self.command_id = response["command_id"] + self.polling_interval = response["polling_interval"] + self.target = urllib.parse.urlparse(response["server"]) + self.server = None + self.clients = [] + self.target_reader = None + self.target_writer = None + + def get_auth_header(self): + """Get authentication header.""" + auth_header = bytearray() + + # Magic numeber + # fmt: off + magic_number = [ + 0x00, 0x00, 0x00, 0x28, # Magic number (4 bytes) + ] + # fmt: on + auth_header.extend(magic_number) + # Total packet length: 4 bytes + + # Unknown string field (4-byte length prefix, 16 unknown bytes) + # fmt: off + unknown_string_field = [ + 0x00, 0x00, 0x00, 0x00, # Length prefix (4 bytes) + ] + ([0x00] * 16) # Unknown bytes (16 bytes) + # fmt: on + auth_header.extend(unknown_string_field) + # Total packet length: 24 bytes + + # Client ID field + client_id = urllib.parse.parse_qs(self.target.query).get("client_id", [0])[0] + _LOGGER.debug("Client ID: %s", client_id) + client_id_field = int(client_id).to_bytes(4, byteorder="big") + _LOGGER.debug("Client ID field: %s (%d)", client_id_field, len(client_id_field)) + auth_header.extend(client_id_field) + # Total packet length: 28 bytes + + # Unknown prefix field (2-byte prefix, 4-byte length prefix, 64 unknown bytes) + # fmt: off + unknown_prefix_field = [ + 0x01, 0x08, # Static prefix (2 bytes) + 0x00, 0x00, 0x00, 0x00, # Length prefix (4 bytes) + ] + ([0x00] * 64) # Unknown bytes (64 bytes) + # fmt: on + auth_header.extend(unknown_prefix_field) + # Total packet length: 98 bytes + + # Connection ID length field (4-byte length prefix) + # fmt: off + conn_id_length_prefix = [ + 0x00, 0x00, 0x00, 0x10, + ] + # fmt: on + auth_header.extend(conn_id_length_prefix) + # Total packet length: 102 bytes + + # Connection ID field (UTF-8-encoded, 16 bytes) + conn_id = self.target.path.split("/")[-1].split("__")[0] + _LOGGER.debug("Connection ID: %s", conn_id) + conn_id_field = conn_id.encode("utf-8")[:16] + # Ensure it is exactly 16 bytes long + if len(conn_id_field) < 16: + conn_id_field += b"\x00" * (16 - len(conn_id_field)) + _LOGGER.debug("Connection ID field: %s (%d)", conn_id_field, len(conn_id_field)) + auth_header.extend(conn_id_field) + # Total packet length: 118 bytes + + # Trailer (static 4-byte trailer) + # fmt: off + trailer_static = [ + 0x00, 0x00, 0x00, 0x01, + ] + # fmt: on + auth_header.extend(trailer_static) + # Total packet length: 122 bytes + + _LOGGER.debug("Auth header length: %d", len(auth_header)) + return auth_header + + async def start(self, host="127.0.0.1", port=None): + """Start the stream.""" + self.server = await asyncio.start_server(self.join, host, port) + return self.server + + @property + def socket(self): + """Return the socket.""" + return self.server.sockets[0] + + @property + def url(self): + """Return the URL of the stream.""" + sockname = self.socket.getsockname() + return f"tcp://{sockname[0]}:{sockname[1]}" + + @property + def is_serving(self): + """Check if the stream is active.""" + return self.server and self.server.is_serving() + + async def feed(self): + """Connect to and stream from the target server.""" + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + self.target_reader, self.target_writer = await asyncio.open_connection( + self.target.hostname, self.target.port, ssl=ssl_context + ) + + auth_header = self.get_auth_header() + self.target_writer.write(auth_header) + await self.target_writer.drain() + + try: + await asyncio.gather(self.recv(), self.send(), self.poll()) + except Exception: + _LOGGER.exception("Error while handling stream") + finally: + # Close all connections + _LOGGER.debug("Streaming was aborted, stopping server") + self.stop() + + async def join(self, client_reader, client_writer): + """Join client to the stream.""" + # Client connected + self.clients.append(client_writer) + + try: + while not client_writer.is_closing(): + # Read data from the client + data = await client_reader.read(1024) + if not data: + _LOGGER.debug("Client disconnected") + break + + # Yield control to the event loop + await asyncio.sleep(0) + except ConnectionResetError: + _LOGGER.debug("Client connection reset") + except Exception: + _LOGGER.exception("Error while handling client") + finally: + # Client disconnecting + self.clients.remove(client_writer) + if not client_writer.is_closing(): + client_writer.close() + + # If no clients are connected, stop everything + if not self.clients: + _LOGGER.debug("Last client disconnected, stopping server") + self.stop() + + async def recv(self): + """Copy data from one reader to multiple writers.""" + try: + _LOGGER.debug("Starting copy from target to clients") + while not self.target_reader.at_eof(): + # Read header from the target server + data = await self.target_reader.read(9) + + # Check if we have enough data for the header + if len(data) < 9: + _LOGGER.warning( + "Insufficient data for header: %d bytes, expected 9", + len(data), + ) + break + + # Handle the 9-byte IMMI protocol header + msgtype = data[0] + sequence = int.from_bytes(data[1:5], byteorder="big") + payload_length = int.from_bytes(data[5:9], byteorder="big") + _LOGGER.debug( + "Received packet: msgtype=%d, sequence=%d, payload_length=%d", + msgtype, + sequence, + payload_length, + ) + + # Skip packets with invalid payload length + if payload_length <= 0: + _LOGGER.debug("Invalid payload length: %d", payload_length) + continue + + # Read payload from the target server + data = await self.target_reader.read(payload_length) + + # Check if we have enough data for the payload + if len(data) < payload_length: + _LOGGER.warning( + "Insufficient data for payload: %d bytes, expected %d", + len(data), + payload_length, + ) + break + + # Skip packets other than msgtype 0x00 (regular video stream) + if msgtype != 0x00: + _LOGGER.debug("Skipping unsupported msgtype %d", msgtype) + continue + + # Skip video payloads missing 0x47 (transport stream packet start) + if data[0] != 0x47: + _LOGGER.debug("Skipping video payload missing 0x47 at start") + continue + + # Send data to all connected clients + _LOGGER.debug("Sending %d bytes to clients", len(data)) + for writer in self.clients: + if not writer.is_closing(): + writer.write(data) + await writer.drain() + + # Yield control to the event loop + await asyncio.sleep(0) + except ssl.SSLError as e: + if e.reason != "APPLICATION_DATA_AFTER_CLOSE_NOTIFY": + _LOGGER.exception("SSL error while receiving data") + except Exception: + _LOGGER.exception("Error while receiving data") + finally: + # Abort sending by closing the target writer + self.target_writer.close() + _LOGGER.debug("Receiving was aborted, aborting sending") + + async def send(self): + """Send keep-alive and latency-stats messages to the server.""" + # fmt: off + latency_stats_packet = [ + # [1-byte msgtype, 4-byte sequence (static 1000), 4-byte payload length] + 0x12, 0x00, 0x00, 0x03, 0xe8, 0x00, 0x00, 0x00, 0x18, # 9-byte header + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, # 1/3 of 24-byte payload + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, # 2/3 of 24-byte payload + 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, # 3/3 of 24-byte payload + ] + # fmt: on + every10s = 0 + sequence = 0 + try: + while not self.target_writer.is_closing(): + if (every10s % 10) == 0: + every10s = 0 + sequence += 1 + sequence_bytes = sequence.to_bytes(4, byteorder="big") + + # fmt: off + keepalive_packet = [ + # [1-byte msgtype, 4-byte sequence, 4-byte payload length] + 0x0A, *sequence_bytes, 0x00, 0x00, 0x00, 0x00, # 9-byte header + # no payload, just the header + ] + # fmt: on + + # Send keep-alive packet to the target server + _LOGGER.debug("Sending keep-alive packet") + self.target_writer.write(bytearray(keepalive_packet)) + await self.target_writer.drain() + + # Send latency-stats packet to the target server + _LOGGER.debug("Sending latency-stats packet") + self.target_writer.write(bytearray(latency_stats_packet)) + await self.target_writer.drain() + + # Yield and sleep for the latency-stats interval + every10s += 1 + await asyncio.sleep(1) + except Exception: + _LOGGER.exception("Error while sending keep-alive or latency-stats") + finally: + # Abort receiving by closing the target reader + self.target_reader.feed_eof() + _LOGGER.debug("Sending was aborted, aborting receiving") + + async def poll(self): + """Poll the command API for the stream.""" + try: + while not self.target_reader.at_eof(): + _LOGGER.debug("Polling command API") + response = await api.request_command_status( + self.camera.sync.blink, self.camera.network_id, self.command_id + ) + _LOGGER.debug("Polling response: %s", response) + + # Check if the response is successful + if response.get("status_code", 0) != 908: + _LOGGER.error("Polling command API failed: %s", response) + break + + # Check if the command is still running + for commands in response.get("commands", []): + if commands.get("id") == self.command_id: + _LOGGER.debug("Command %d state found", self.command_id) + state_condition = commands.get("state_condition") + _LOGGER.debug("Command state condition: %s", state_condition) + state_stage = commands.get("state_stage") + _LOGGER.debug("Command state stage: %s", state_stage) + if state_condition in ("new", "running"): + break + else: + return + + # Sleep and yield for the polling interval + await asyncio.sleep(self.polling_interval) + except Exception: + _LOGGER.exception("Error while polling command API") + finally: + _LOGGER.debug("Done polling command API") + response = await api.request_command_done( + self.camera.sync.blink, self.camera.network_id, self.command_id + ) + _LOGGER.debug("Done polling response: %s", response) + + def stop(self): + """Stop the stream.""" + # Close all connections + _LOGGER.debug("Stopping server, closing remaining connections") + if self.server and self.server.is_serving(): + _LOGGER.debug("Closing listen server") + self.server.close() + if self.target_writer and not self.target_writer.is_closing(): + _LOGGER.debug("Closing target writer") + self.target_writer.close() + for writer in self.clients: + if not writer.is_closing(): + _LOGGER.debug("Closing client writer") + writer.close() + _LOGGER.debug("All remaining connections closed") diff --git a/tests/test_api.py b/tests/test_api.py index adea434f..a5d00928 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -11,6 +11,7 @@ COMMAND_COMPLETE = {"complete": True, "status_code": 908} COMMAND_COMPLETE_BAD = {"complete": True, "status_code": 999} COMMAND_NOT_COMPLETE = {"complete": False, "status_code": 908} +COMMAND_DONE = {"message": "Command 1234567890 set to :done", "code": 902} @mock.patch("blinkpy.auth.Auth.query") @@ -62,6 +63,13 @@ async def test_request_command_status(self, mock_resp): {"command": "done"}, ) + async def test_request_command_done(self, mock_resp): + """Test command_done.""" + mock_resp.return_value = mresp.MockResponse(COMMAND_DONE, 200) + response = await api.request_command_done(self.blink, "network", "1234567890") + self.assertEqual(response.status, 200) + self.assertEqual(await response.json(), COMMAND_DONE) + async def test_request_new_image(self, mock_resp): """Test api request new image.""" mock_resp.side_effect = ( diff --git a/tests/test_cameras.py b/tests/test_cameras.py index 3b1836a1..f8f90044 100644 --- a/tests/test_cameras.py +++ b/tests/test_cameras.py @@ -8,10 +8,12 @@ from unittest import mock from unittest import IsolatedAsyncioTestCase +import pytest from blinkpy.blinkpy import Blink from blinkpy.helpers.util import BlinkURLHandler from blinkpy.sync_module import BlinkSyncModule from blinkpy.camera import BlinkCamera, BlinkCameraMini, BlinkDoorbell +from blinkpy.livestream import BlinkLiveStream import tests.mock_responses as mresp CONFIG = { @@ -129,6 +131,36 @@ async def test_camera_stream(self, mock_resp): self.assertEqual(await self.camera.get_liveview(), "rtsps://foo.bar") self.assertEqual(await mini_camera.get_liveview(), "rtsps://foo.bar") self.assertEqual(await doorbell_camera.get_liveview(), "rtsps://foo.bar") + with pytest.raises(NotImplementedError): + await self.camera.init_livestream() + with pytest.raises(NotImplementedError): + await mini_camera.init_livestream() + with pytest.raises(NotImplementedError): + await doorbell_camera.init_livestream() + + async def test_camera_livestream(self, mock_resp): + """Test that camera livestream returns correct object.""" + mock_resp.return_value = { + "command_id": 1234567890, + "join_available": True, + "join_state": "available", + "server": "immis://1.2.3.4:443/ABCDEFGHIJKMLNOP__IMDS_1234567812345678?client_id=123", + "duration": 300, + "extended_duration": 5400, + "continue_interval": 300, + "continue_warning": 0, + "polling_interval": 15, + "submit_logs": True, + "new_command": True, + "media_id": None, + "options": {"poor_connection": False}, + "liveview_token": "abcdefghijklmnopqrstuv", + } + mini_camera = BlinkCameraMini(self.blink.sync["test"]) + doorbell_camera = BlinkDoorbell(self.blink.sync["test"]) + self.assertIsInstance(await self.camera.init_livestream(), BlinkLiveStream) + self.assertIsInstance(await mini_camera.init_livestream(), BlinkLiveStream) + self.assertIsInstance(await doorbell_camera.init_livestream(), BlinkLiveStream) async def test_different_thumb_api(self, mock_resp): """Test that the correct url is created with new api.""" diff --git a/tests/test_livestream.py b/tests/test_livestream.py new file mode 100644 index 00000000..f503ceed --- /dev/null +++ b/tests/test_livestream.py @@ -0,0 +1,807 @@ +"""Tests for BlinkLiveStream class.""" + +import ssl +import urllib.parse +from unittest import mock +from unittest import IsolatedAsyncioTestCase + +from blinkpy.blinkpy import Blink +from blinkpy.helpers.util import BlinkURLHandler +from blinkpy.sync_module import BlinkSyncModule +from blinkpy.camera import BlinkCameraMini +from blinkpy.livestream import BlinkLiveStream + +from .test_api import COMMAND_DONE + + +@mock.patch("blinkpy.auth.Auth.query", return_value={}) +class TestBlinkLiveStream(IsolatedAsyncioTestCase): + """Test BlinkLiveStream functions in blinkpy.""" + + def setUp(self): + """Set up Blink module.""" + self.blink = Blink(session=mock.Mock()) + self.blink.urls = BlinkURLHandler("test") + self.blink.sync["test"] = BlinkSyncModule(self.blink, "test", 1234, []) + self.camera = BlinkCameraMini(self.blink.sync["test"]) + self.camera.name = "test_camera" + self.camera.camera_id = "5678" + self.camera.network_id = "1234" + self.blink.sync["test"].cameras["test_camera"] = self.camera + + # Mock response for livestream initialization + self.livestream_response = { + "command_id": 987654321, + "join_available": True, + "join_state": "available", + "server": "immis://1.2.3.4:443/ABCDEFGHIJKMLNOP__IMDS_1234567812345678?client_id=123456", + "duration": 300, + "extended_duration": 5400, + "continue_interval": 300, + "continue_warning": 0, + "polling_interval": 15, + "submit_logs": True, + "new_command": True, + "media_id": None, + "options": {"poor_connection": False}, + "liveview_token": "abcdefghijklmnopqrstuv", + } + + self.livestream = BlinkLiveStream(self.camera, self.livestream_response) + + self.command_status_response = { + "status_code": 908, + "commands": [ + { + "id": self.livestream.command_id, + "state_condition": "running", + "state_stage": "vs", + } + ], + } + + def tearDown(self): + """Clean up after test.""" + self.blink = None + self.camera = None + self.livestream = None + + def test_livestream_init(self, mock_resp): + """Test BlinkLiveStream initialization.""" + self.assertEqual(self.livestream.camera, self.camera) + self.assertEqual(self.livestream.command_id, 987654321) + self.assertEqual(self.livestream.polling_interval, 15) + self.assertIsInstance(self.livestream.target, urllib.parse.ParseResult) + self.assertEqual(self.livestream.target.hostname, "1.2.3.4") + self.assertEqual(self.livestream.target.port, 443) + self.assertIsNone(self.livestream.server) + self.assertEqual(self.livestream.clients, []) + self.assertIsNone(self.livestream.target_reader) + self.assertIsNone(self.livestream.target_writer) + + def test_get_auth_header(self, mock_resp): + """Test authentication header generation.""" + auth_header = self.livestream.get_auth_header() + + # Check that auth header is a bytearray + self.assertIsInstance(auth_header, bytearray) + + # Check expected length (122 bytes according to comments in code) + self.assertEqual(len(auth_header), 122) + + # Check magic number at start (4 bytes: 0x00, 0x00, 0x00, 0x28) + self.assertEqual(auth_header[0:4], bytearray([0x00, 0x00, 0x00, 0x28])) + + # Check client ID field at position 24-28 + expected_client_id = (123456).to_bytes(4, byteorder="big") + self.assertEqual(auth_header[24:28], expected_client_id) + + # Check connection ID length field at position 98-102 + expected_connection_id_length = (16).to_bytes(4, byteorder="big") + self.assertEqual(auth_header[98:102], expected_connection_id_length) + + # Check connection ID at position 102-118 + expected_connection_id = b"ABCDEFGHIJKMLNOP" + self.assertEqual(auth_header[102:118], expected_connection_id) + + # Check static trailer at end (4 bytes: 0x00, 0x00, 0x00, 0x01) + self.assertEqual(auth_header[-4:], bytearray([0x00, 0x00, 0x00, 0x01])) + + async def test_start(self, mock_resp): + """Test starting the server.""" + with mock.patch("asyncio.start_server") as mock_start_server: + mock_server = mock.Mock() + mock_start_server.return_value = mock_server + + result = await self.livestream.start() + + mock_start_server.assert_called_once_with( + self.livestream.join, "127.0.0.1", None + ) + self.assertEqual(result, mock_server) + self.assertEqual(self.livestream.server, mock_server) + + def test_socket_property(self, mock_resp): + """Test socket property.""" + mock_socket = mock.Mock() + mock_server = mock.Mock() + mock_server.sockets = [mock_socket] + self.livestream.server = mock_server + + self.assertEqual(self.livestream.socket, mock_socket) + + def test_url_property(self, mock_resp): + """Test URL property.""" + mock_socket = mock.Mock() + mock_socket.getsockname.return_value = ("127.0.0.1", 8080) + mock_server = mock.Mock() + mock_server.sockets = [mock_socket] + self.livestream.server = mock_server + + expected_url = "tcp://127.0.0.1:8080" + self.assertEqual(self.livestream.url, expected_url) + + def test_is_serving_property(self, mock_resp): + """Test is_serving property.""" + # Test when server is None + self.livestream.server = None + self.assertFalse(self.livestream.is_serving) + + # Test when server exists and is serving + mock_server = mock.Mock() + mock_server.is_serving.return_value = True + self.livestream.server = mock_server + self.assertTrue(self.livestream.is_serving) + + # Test when server exists but is not serving + mock_server.is_serving.return_value = False + self.assertFalse(self.livestream.is_serving) + + @mock.patch("asyncio.open_connection") + @mock.patch("ssl.SSLContext") + @mock.patch("blinkpy.api.request_command_status") + async def test_feed_success( + self, mock_command_status, mock_ssl_context, mock_open_connection, mock_resp + ): + """Test successful feed method.""" + # Mock SSL context + mock_ssl = mock.Mock() + mock_ssl_context.return_value = mock_ssl + + # Mock connection + mock_reader = mock.Mock() + mock_writer = mock.Mock() + mock_writer.drain = mock.AsyncMock() + mock_open_connection.return_value = (mock_reader, mock_writer) + + # Mock successful command status + mock_command_status.return_value = self.command_status_response + + # Mock coroutines to avoid actual execution + with ( + mock.patch.object(self.livestream, "recv", new_callable=mock.AsyncMock), + mock.patch.object(self.livestream, "send", new_callable=mock.AsyncMock), + mock.patch.object(self.livestream, "poll", new_callable=mock.AsyncMock), + mock.patch.object(self.livestream, "stop") as mock_stop, + ): + await self.livestream.feed() + + # Verify SSL context setup + mock_ssl_context.assert_called_once_with(ssl.PROTOCOL_TLS_CLIENT) + self.assertFalse(mock_ssl.check_hostname) + self.assertEqual(mock_ssl.verify_mode, ssl.CERT_NONE) + + # Verify connection was opened + mock_open_connection.assert_called_once_with( + self.livestream.target.hostname, + self.livestream.target.port, + ssl=mock_ssl, + ) + + # Verify auth header was sent + mock_writer.write.assert_called_once() + mock_writer.drain.assert_called_once() + + # Verify stop was called + mock_stop.assert_called_once() + + @mock.patch("asyncio.open_connection") + @mock.patch("ssl.SSLContext") + @mock.patch("blinkpy.api.request_command_status") + async def test_feed_failure( + self, mock_command_status, mock_ssl_context, mock_open_connection, mock_resp + ): + """Test successful feed method.""" + # Mock SSL context + mock_ssl = mock.Mock() + mock_ssl_context.return_value = mock_ssl + + # Mock connection + mock_reader = mock.Mock() + mock_writer = mock.Mock() + mock_writer.drain = mock.AsyncMock() + mock_open_connection.return_value = (mock_reader, mock_writer) + + # Mock successful command status + mock_command_status.return_value = self.command_status_response + + # Mock coroutines to avoid actual execution + with ( + mock.patch("logging.Logger.exception") as mock_logger, + mock.patch("asyncio.gather", new_callable=mock.AsyncMock) as mock_gather, + mock.patch.object(self.livestream, "recv", new_callable=mock.Mock), + mock.patch.object(self.livestream, "send", new_callable=mock.Mock), + mock.patch.object(self.livestream, "poll", new_callable=mock.Mock), + mock.patch.object(self.livestream, "stop") as mock_stop, + ): + # Simulate an exception in the gather call + mock_gather.side_effect = Exception("Test exception") + await self.livestream.feed() + + # Verify exception was logged + mock_logger.assert_called_once() + + # Verify stop was called + mock_stop.assert_called_once() + + async def test_join_client_connect_disconnect(self, mock_resp): + """Test client joining and disconnecting.""" + mock_reader = mock.Mock() + mock_writer = mock.Mock() + + # Mock client reading data then disconnecting + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = [b"test_data", b""] + mock_writer.is_closing.return_value = False + + # Start the join coroutine + await self.livestream.join(mock_reader, mock_writer) + + # Verify client was added and removed + self.assertEqual(len(self.livestream.clients), 0) + mock_writer.close.assert_called_once() + + async def test_join_connection_reset(self, mock_resp): + """Test client connection reset during join.""" + mock_reader = mock.Mock() + mock_writer = mock.Mock() + + # Mock connection reset error + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = ConnectionResetError() + mock_writer.is_closing.return_value = False + + with mock.patch.object(self.livestream, "stop") as mock_stop: + await self.livestream.join(mock_reader, mock_writer) + + # Verify client was removed and stop was called + self.assertEqual(len(self.livestream.clients), 0) + mock_writer.close.assert_called_once() + mock_stop.assert_called_once() + + async def test_join_general_exception_logging(self, mock_resp): + """Test general exception logging in join method.""" + mock_reader = mock.Mock() + mock_writer = mock.Mock() + + # Mock general exception (not ConnectionResetError) + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = ValueError("Test join exception") + mock_writer.is_closing.return_value = False + + with ( + mock.patch("logging.Logger.exception") as mock_logger, + mock.patch.object(self.livestream, "stop") as mock_stop, + ): + await self.livestream.join(mock_reader, mock_writer) + + # Verify exception was logged + mock_logger.assert_called_once_with("Error while handling client") + + # Verify client was removed and stop was called + self.assertEqual(len(self.livestream.clients), 0) + mock_writer.close.assert_called_once() + mock_stop.assert_called_once() + + async def test_recv_valid_packet(self, mock_resp): + """Test receiving valid video packets.""" + mock_reader = mock.Mock() + mock_client = mock.Mock() + + # Mock valid IMMI protocol header and payload + header_data = bytearray( + [ + 0x00, # msgtype (video stream) + 0x00, + 0x00, + 0x00, + 0x01, # sequence + 0x00, + 0x00, + 0x00, + 0xBC, # payload_length (188 bytes) + ] + ) + + # Mock payload starting with 0x47 (transport stream packet start) + payload_data = bytearray([0x47] + [0x00] * 187) # 188 bytes total + + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = [header_data, payload_data, b""] + mock_reader.at_eof.side_effect = [False, False, True] + mock_client.is_closing.return_value = False + mock_client.write = mock.Mock() + mock_client.drain = mock.AsyncMock() + + self.livestream.target_reader = mock_reader + self.livestream.target_writer = mock.Mock() + self.livestream.clients = [mock_client] + + await self.livestream.recv() + + # Verify data was written to client + mock_client.write.assert_called_with(payload_data) + mock_client.drain.assert_called() + + async def test_recv_invalid_msgtype(self, mock_resp): + """Test receiving packet with invalid message type.""" + mock_reader = mock.Mock() + mock_client = mock.Mock() + + # Mock header with invalid msgtype (not 0x00) + header_data_invalid = bytearray( + [ + 0x01, # invalid msgtype + 0x00, + 0x00, + 0x00, + 0x01, # sequence + 0x00, + 0x00, + 0x00, + 0xBC, # payload_length (188 bytes) + ] + ) + + payload_data = bytearray([0x47] + [0x00] * 187) # 188 bytes total + + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = [header_data_invalid, payload_data, b""] + mock_reader.at_eof.side_effect = [False, False, True] + + self.livestream.target_reader = mock_reader + self.livestream.target_writer = mock.Mock() + self.livestream.clients = [mock_client] + + await self.livestream.recv() + + # Verify data was not written to client (invalid msgtype) + mock_client.write.assert_not_called() + + async def test_recv_incomplete_header(self, mock_resp): + """Test receiving packet with incomplete header.""" + mock_reader = mock.Mock() + mock_client = mock.Mock() + + # Simulate reading incomplete header + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = [b"short", b""] + mock_reader.at_eof.side_effect = [False, True] + + self.livestream.target_reader = mock_reader + self.livestream.target_writer = mock.Mock() + self.livestream.clients = [mock_client] + + # Should not raise exception, just log a warning message + with mock.patch("logging.Logger.warning") as mock_logger: + await self.livestream.recv() + + # Verify that a warning message was logged + mock_logger.assert_called_once() + + # Verify no data was written to client (incomplete header) + mock_client.write.assert_not_called() + + async def test_recv_empty_payload_skipped(self, mock_resp): + """Test skipping packet with empty payload.""" + mock_reader = mock.Mock() + mock_client = mock.Mock() + + # Mock valid IMMI protocol header and empty payload + header_data_empty = bytearray( + [ + 0x00, # msgtype (video stream) + 0x00, + 0x00, + 0x00, + 0x01, # sequence + 0x00, + 0x00, + 0x00, + 0x00, # payload_length (0 bytes) + ] + ) + + # Mock valid IMMI protocol header and payload + header_data = bytearray( + [ + 0x00, # msgtype (video stream) + 0x00, + 0x00, + 0x00, + 0x01, # sequence + 0x00, + 0x00, + 0x00, + 0xBC, # payload_length (188 bytes) + ] + ) + + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = [header_data_empty, header_data, b"short", b""] + mock_reader.at_eof.side_effect = [False, False, True] + + self.livestream.target_reader = mock_reader + self.livestream.target_writer = mock.Mock() + self.livestream.clients = [mock_client] + + # Should not raise exception, just log a warning message + with mock.patch("logging.Logger.warning") as mock_logger: + await self.livestream.recv() + + # Verify that a warning message was logged + mock_logger.assert_called_once() + + # Verify that the first payload read was skipped (empty payload) + self.assertEqual(mock_reader.read.call_count, 3) # odd number of reads + + # Verify no data was written to client (incomplete header) + mock_client.write.assert_not_called() + + async def test_recv_invalid_stream_marker(self, mock_resp): + """Test receiving invalid video packets.""" + mock_reader = mock.Mock() + mock_client = mock.Mock() + + # Mock valid IMMI protocol header and payload + header_data = bytearray( + [ + 0x00, # msgtype (video stream) + 0x00, + 0x00, + 0x00, + 0x01, # sequence + 0x00, + 0x00, + 0x00, + 0xBC, # payload_length (188 bytes) + ] + ) + + # Mock payload starting with 0x42 (invalid transport stream packet start) + payload_data = bytearray([0x42] + [0x00] * 187) # 188 bytes total + + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = [header_data, payload_data, b""] + mock_reader.at_eof.side_effect = [False, False, True] + mock_client.is_closing.return_value = False + mock_client.write = mock.Mock() + mock_client.drain = mock.AsyncMock() + + self.livestream.target_reader = mock_reader + self.livestream.target_writer = mock.Mock() + self.livestream.clients = [mock_client] + + await self.livestream.recv() + + # Verify no data was written to client (incomplete header) + mock_client.write.assert_not_called() + + async def test_send_keepalive_and_latency(self, mock_resp): + """Test sending keep-alive and latency-stats packets.""" + mock_writer = mock.Mock() + + # Stop after 2 iterations + mock_writer.is_closing.side_effect = [False, False, True] + mock_writer.write = mock.Mock() + mock_writer.drain = mock.AsyncMock() + + self.livestream.target_reader = mock.Mock() + self.livestream.target_writer = mock_writer + + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock): + await self.livestream.send() + + # Verify multiple writes occurred (keep-alive and latency-stats) + self.assertGreater(mock_writer.write.call_count, 1) + self.assertGreater(mock_writer.drain.call_count, 1) + + @mock.patch("blinkpy.api.request_command_status") + @mock.patch("blinkpy.api.request_command_done") + async def test_poll(self, mock_command_done, mock_command_status, mock_resp): + """Test polling command API.""" + mock_reader = mock.Mock() + mock_reader.at_eof.side_effect = [False, True] # Stop after 1 iteration + + mock_command_status.return_value = self.command_status_response + mock_command_done.return_value = COMMAND_DONE + + self.livestream.target_reader = mock_reader + + with mock.patch("asyncio.sleep", new_callable=mock.AsyncMock): + await self.livestream.poll() + + # Verify reader was read until EOF + self.assertGreater(mock_reader.at_eof.call_count, 1) + + # Verify command status was polled + mock_command_status.assert_called_with( + self.camera.sync.blink, self.camera.network_id, self.livestream.command_id + ) + + # Verify command done was called + mock_command_done.assert_called_with( + self.camera.sync.blink, self.camera.network_id, self.livestream.command_id + ) + + def test_stop(self, mock_resp): + """Test stopping the livestream.""" + # Mock server and connections + mock_server = mock.Mock() + mock_server.is_serving.return_value = True + mock_target_writer = mock.Mock() + mock_target_writer.is_closing.return_value = False + mock_client1 = mock.Mock() + mock_client1.is_closing.return_value = False + mock_client2 = mock.Mock() + mock_client2.is_closing.return_value = False + + self.livestream.server = mock_server + self.livestream.target_writer = mock_target_writer + self.livestream.clients = [mock_client1, mock_client2] + + self.livestream.stop() + + # Verify server was closed + mock_server.close.assert_called_once() + + # Verify target writer was closed + mock_target_writer.close.assert_called_once() + + # Verify all clients were closed + mock_client1.close.assert_called_once() + mock_client2.close.assert_called_once() + + def test_server_url_parsing(self, mock_resp): + """Test client ID parsing from URL query parameters.""" + # Test with different client ID + test_response = { + **self.livestream_response, + "server": "immis://1.2.3.4:443/ABCDEFGH__IMDS_1234?client_id=999888", + } + + test_livestream = BlinkLiveStream(self.camera, test_response) + auth_header = test_livestream.get_auth_header() + + # Check value of client ID field at position 24-28 + expected_client_id = (999888).to_bytes(4, byteorder="big") + self.assertEqual(auth_header[24:28], expected_client_id) + + # Check value of connection ID length field at position 98-102 + expected_connection_id_length = (16).to_bytes(4, byteorder="big") + self.assertEqual(auth_header[98:102], expected_connection_id_length) + + # Check value of connection ID at position 102-118 + expected_connection_id = b"ABCDEFGH" + b"\x00" * 8 # Ensure it is 16 bytes + self.assertEqual(auth_header[102:118], expected_connection_id) + + async def test_recv_ssl_error(self, mock_resp): + """Test handling SSL errors during receive.""" + mock_reader = mock.Mock() + mock_writer = mock.Mock() + + # Mock SSL error + ssl_error = ssl.SSLError() + ssl_error.reason = "APPLICATION_DATA_AFTER_CLOSE_NOTIFY" + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = ssl_error + mock_reader.at_eof.return_value = False + + self.livestream.target_reader = mock_reader + self.livestream.target_writer = mock_writer + self.livestream.clients = [] + + # Should not raise exception for this specific SSL error + await self.livestream.recv() + + # Verify target writer was closed + mock_writer.close.assert_called_once() + + async def test_recv_ssl_error_other_reason(self, mock_resp): + """Test SSL error handling with other reasons in recv.""" + mock_reader = mock.Mock() + mock_writer = mock.Mock() + + # Mock SSL error with different reason + ssl_error = ssl.SSLError() + ssl_error.reason = "SOME_OTHER_SSL_ERROR" + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = ssl_error + mock_reader.at_eof.return_value = False + + self.livestream.target_reader = mock_reader + self.livestream.target_writer = mock_writer + self.livestream.clients = [] + + with mock.patch("logging.Logger.exception") as mock_logger: + # Should not raise exception for this specific SSL error + await self.livestream.recv() + + # Verify exception was logged for non-ignored SSL errors + mock_logger.assert_called_once_with("SSL error while receiving data") + + # Verify target writer was closed + mock_writer.close.assert_called_once() + + async def test_recv_exception_logging(self, mock_resp): + """Test exception logging in recv method.""" + mock_reader = mock.Mock() + mock_writer = mock.Mock() + + # Mock general exception + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = Exception("Test exception") + mock_reader.at_eof.return_value = False + + self.livestream.target_reader = mock_reader + self.livestream.target_writer = mock_writer + self.livestream.clients = [] + + with mock.patch("logging.Logger.exception") as mock_logger: + await self.livestream.recv() + + # Verify exception was logged + mock_logger.assert_called_once_with("Error while receiving data") + + # Verify target writer was closed + mock_writer.close.assert_called_once() + + async def test_recv_timeout_exception(self, mock_resp): + """Test timeout exception handling in recv method.""" + mock_reader = mock.Mock() + mock_writer = mock.Mock() + + # Mock asyncio timeout exception + mock_reader.read = mock.AsyncMock() + mock_reader.read.side_effect = TimeoutError() + mock_reader.at_eof.return_value = False + + self.livestream.target_reader = mock_reader + self.livestream.target_writer = mock_writer + self.livestream.clients = [] + + with mock.patch("logging.Logger.exception") as mock_logger: + await self.livestream.recv() + + # Verify exception was logged + mock_logger.assert_called_once_with("Error while receiving data") + + # Verify target writer was closed + mock_writer.close.assert_called_once() + + async def test_send_exception_logging(self, mock_resp): + """Test exception logging in send method.""" + mock_writer = mock.Mock() + + # Mock exception during send + mock_writer.is_closing.return_value = False + mock_writer.write = mock.Mock() + mock_writer.drain = mock.AsyncMock() + mock_writer.drain.side_effect = Exception("Test send exception") + + self.livestream.target_reader = mock.Mock() + self.livestream.target_writer = mock_writer + + with mock.patch("logging.Logger.exception") as mock_logger: + await self.livestream.send() + + # Verify exception was logged + mock_logger.assert_called_once_with( + "Error while sending keep-alive or latency-stats" + ) + + async def test_send_writer_closing_exception(self, mock_resp): + """Test exception when writer is closing during send.""" + mock_reader = mock.Mock() + mock_writer = mock.Mock() + + # Mock exception when checking if writer is closing + mock_writer.is_closing.side_effect = Exception("Writer check exception") + + self.livestream.target_reader = mock_reader + self.livestream.target_writer = mock_writer + + with mock.patch("logging.Logger.exception") as mock_logger: + await self.livestream.send() + + # Verify exception was logged + mock_logger.assert_called_once_with( + "Error while sending keep-alive or latency-stats" + ) + + @mock.patch("blinkpy.api.request_command_status") + async def test_poll_exception_logging(self, mock_command_status, mock_resp): + """Test exception logging in poll method.""" + mock_reader = mock.Mock() + mock_reader.at_eof.return_value = False + + # Mock exception in command status request + mock_command_status.side_effect = Exception("Test poll exception") + + self.livestream.target_reader = mock_reader + + with ( + mock.patch("logging.Logger.exception") as mock_logger, + mock.patch( + "blinkpy.api.request_command_done", new_callable=mock.AsyncMock + ) as mock_command_done, + ): + await self.livestream.poll() + + # Verify exception was logged + mock_logger.assert_called_once_with("Error while polling command API") + + # Verify command done was called since status failed + mock_command_done.assert_called_once() + + @mock.patch("blinkpy.api.request_command_status") + @mock.patch("blinkpy.api.request_command_done") + async def test_poll_command_done( + self, mock_command_done, mock_command_status, mock_resp + ): + """Test exception handling in poll when command_done fails.""" + mock_reader = mock.Mock() + mock_reader.at_eof.side_effect = [False, True] # Exit after one iteration + + # Mock successful command status + response = self.command_status_response.copy() + response["commands"][0] = response["commands"][0].copy() + response["commands"][0]["state_condition"] = "error" + mock_command_status.return_value = response + + self.livestream.target_reader = mock_reader + + await self.livestream.poll() + + # Verify command status was polled + mock_command_status.assert_called_with( + self.camera.sync.blink, self.camera.network_id, self.livestream.command_id + ) + + # Verify command done was called + mock_command_done.assert_called_with( + self.camera.sync.blink, self.camera.network_id, self.livestream.command_id + ) + + @mock.patch("blinkpy.api.request_command_status") + @mock.patch("blinkpy.api.request_command_done") + async def test_poll_command_done_exception( + self, mock_command_done, mock_command_status, mock_resp + ): + """Test exception handling in poll when command_done fails.""" + mock_reader = mock.Mock() + mock_reader.at_eof.side_effect = [False, True] # Exit after one iteration + + # Mock successful command status + mock_command_status.return_value = self.command_status_response.copy() + mock_command_status.return_value["status_code"] = 1337 + # Mock exception in command done + mock_command_done.side_effect = Exception("Command done exception") + + self.livestream.target_reader = mock_reader + + with ( + mock.patch("asyncio.sleep", new_callable=mock.AsyncMock), + self.assertRaises(Exception), + ): + await self.livestream.poll()