diff --git a/roborock/data/b01_q10/b01_q10_containers.py b/roborock/data/b01_q10/b01_q10_containers.py index 393eb231..9975679f 100644 --- a/roborock/data/b01_q10/b01_q10_containers.py +++ b/roborock/data/b01_q10/b01_q10_containers.py @@ -6,6 +6,7 @@ automatically update objects from raw device responses. """ +import base64 from dataclasses import dataclass, field from ..containers import RoborockBase @@ -19,6 +20,14 @@ YXWaterLevel, ) +_Q10_CUSTOMER_CLEAN_COUNT_SIZE = 1 +_Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE = 26 +_Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE = 20 +_Q10_CUSTOMER_CLEAN_VERTEX_SIZE = 4 +_Q10_CUSTOMER_CLEAN_NAME_MAX_LENGTH = _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE - 1 +_Q10_SENTINEL_U8 = 0xFF +_Q10_SENTINEL_U16 = 0xFFFF + @dataclass class dpCleanRecord(RoborockBase): @@ -81,6 +90,156 @@ class dpTimeZone(RoborockBase): timeZoneSec: int +@dataclass +class Q10RoomVertex(RoborockBase): + x_raw: int + y_raw: int + x: float + y: float + + +@dataclass +class Q10RoomConfig(RoborockBase): + index: int + room_id: int + room_type: int + clean_order: int + clean_count: int + clean_type: int + fan_level: int + water_level: int + material: int + clean_line: int + room_name: str + vertices_num: int + vertices: list[Q10RoomVertex] = field(default_factory=list) + + +@dataclass +class Q10RoomsConfig(RoborockBase): + raw_length: int = 0 + declared_count: int = 0 + rooms: list[Q10RoomConfig] = field(default_factory=list) + + @property + def parsed_count(self) -> int: + return len(self.rooms) + + @property + def room_map(self) -> dict[int, Q10RoomConfig]: + return {room.room_id: room for room in self.rooms} + + +def parse_customer_clean_payload(payload_b64: str) -> Q10RoomsConfig: + """Parse CUSTOMER_CLEAN payload. + + Layout: + - 1 byte: room count + - For each room: + - 26-byte room metadata block + - 20-byte room name block + - 1 byte vertex count followed by 4 bytes per vertex (x, y as u16) + """ + raw = base64.b64decode(payload_b64) + if not raw: + return Q10RoomsConfig() + + count = raw[0] + offset = _Q10_CUSTOMER_CLEAN_COUNT_SIZE + rooms: list[Q10RoomConfig] = [] + + for index in range(count): + if offset + _Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE > len(raw): + break + + room_block = raw[offset : offset + _Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE] + room_id = int.from_bytes(room_block[0:2], "big", signed=False) + room_type = room_block[2] + clean_order = int.from_bytes(room_block[3:5], "big", signed=False) + clean_count = int.from_bytes(room_block[5:7], "big", signed=False) + clean_type = room_block[7] + fan_level = room_block[8] + water_level = room_block[9] + material = room_block[10] + clean_line = room_block[11] + + if clean_order == _Q10_SENTINEL_U16: + clean_order = -1 + if clean_type == _Q10_SENTINEL_U8: + clean_type = -1 + if fan_level == _Q10_SENTINEL_U8: + fan_level = -1 + if water_level == _Q10_SENTINEL_U8: + water_level = -1 + if material == _Q10_SENTINEL_U8: + material = -1 + if clean_line == _Q10_SENTINEL_U8: + clean_line = -1 + + offset += _Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE + if offset + _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE > len(raw): + break + + room_name_bytes = raw[offset : offset + _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE] + offset += _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE + name_len = room_name_bytes[0] + if 0 < name_len <= _Q10_CUSTOMER_CLEAN_NAME_MAX_LENGTH: + room_name = room_name_bytes[1 : 1 + name_len].decode("utf-8", errors="replace") + else: + room_name = room_name_bytes[1:].split(b"\x00", 1)[0].decode("utf-8", errors="replace") + room_name = normalize_q10_room_name(room_name) + + if offset + 1 > len(raw): + break + + vertices_num = raw[offset] + offset += 1 + bytes_needed = vertices_num * _Q10_CUSTOMER_CLEAN_VERTEX_SIZE + if offset + bytes_needed > len(raw): + break + + vertices: list[Q10RoomVertex] = [] + for _ in range(vertices_num): + x_raw = int.from_bytes(raw[offset : offset + 2], "big", signed=False) + offset += 2 + y_raw = int.from_bytes(raw[offset : offset + 2], "big", signed=False) + offset += 2 + vertices.append(Q10RoomVertex(x_raw=x_raw, y_raw=y_raw, x=x_raw / 10.0, y=y_raw / 10.0)) + + rooms.append( + Q10RoomConfig( + index=index, + room_id=room_id, + room_type=room_type, + clean_order=clean_order, + clean_count=clean_count, + clean_type=clean_type, + fan_level=fan_level, + water_level=water_level, + material=material, + clean_line=clean_line, + room_name=room_name, + vertices_num=vertices_num, + vertices=vertices, + ) + ) + + return Q10RoomsConfig(raw_length=len(raw), declared_count=count, rooms=rooms) + + +def normalize_q10_room_name(room_name: str) -> str: + """Normalize room names reported by the Q10 firmware.""" + raw_name = room_name.strip() + if not raw_name: + return raw_name + if raw_name.startswith("rr_"): + normalized = raw_name[3:].replace("_", " ").strip() + if not normalized: + return raw_name + return " ".join(part.capitalize() for part in normalized.split()) + return raw_name + + @dataclass class Q10Status(RoborockBase): """Status for Q10 devices. diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 184de2d2..7c50c652 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -4,12 +4,14 @@ import logging from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP +from roborock.data.b01_q10.b01_q10_containers import Q10RoomConfig from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel from .command import CommandTrait from .remote import RemoteTrait +from .rooms import RoomsTrait from .status import StatusTrait from .vacuum import VacuumTrait @@ -35,12 +37,26 @@ class Q10PropertiesApi(Trait): remote: RemoteTrait """Trait for sending remote control related commands to Q10 devices.""" + rooms: RoomsTrait + """Trait for reading room configuration from Q10 devices.""" + + @property + def room_map(self) -> dict[int, Q10RoomConfig]: + """Return the current room configurations keyed by room id.""" + return self.rooms.room_map + + @property + def room_names(self) -> dict[int, str]: + """Return the current room names keyed by room id.""" + return self.rooms.room_names + def __init__(self, channel: MqttChannel) -> None: """Initialize the B01Props API.""" self._channel = channel self.command = CommandTrait(channel) self.vacuum = VacuumTrait(self.command) self.remote = RemoteTrait(self.command) + self.rooms = RoomsTrait(self.command) self.status = StatusTrait() self._subscribe_task: asyncio.Task[None] | None = None @@ -64,6 +80,18 @@ async def refresh(self) -> None: # to the device. Updates will be received by the subscribe loop below. await self.command.send(B01_Q10_DP.REQUEST_DPS, params={}) + async def refresh_room_config(self) -> None: + """Refresh only the room configuration.""" + await self.rooms.refresh() + + def get_room(self, room_id: int) -> Q10RoomConfig | None: + """Return a room configuration by id, if known.""" + return self.rooms.get_room(room_id) + + def get_room_name(self, room_id: int, default: str | None = None) -> str | None: + """Return a room name by id, if known.""" + return self.rooms.get_room_name(room_id, default) + async def _subscribe_loop(self) -> None: """Persistent loop to listen for status updates.""" async for decoded_dps in stream_decoded_responses(self._channel): @@ -73,6 +101,7 @@ async def _subscribe_loop(self) -> None: # only update what fields that it is responsible for. # More traits can be added here below. self.status.update_from_dps(decoded_dps) + self.rooms.update_from_dps(decoded_dps) def create(channel: MqttChannel) -> Q10PropertiesApi: diff --git a/roborock/devices/traits/b01/q10/rooms.py b/roborock/devices/traits/b01/q10/rooms.py new file mode 100644 index 00000000..7084ceb7 --- /dev/null +++ b/roborock/devices/traits/b01/q10/rooms.py @@ -0,0 +1,68 @@ +"""Rooms trait for Q10 B01 devices.""" + +import logging +from typing import Any + +from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP +from roborock.data.b01_q10.b01_q10_containers import ( + Q10RoomConfig, + Q10RoomsConfig, + parse_customer_clean_payload, +) +from roborock.devices.traits.common import TraitUpdateListener + +from .command import CommandTrait + +_LOGGER = logging.getLogger(__name__) + + +class RoomsTrait(Q10RoomsConfig, TraitUpdateListener): + """Trait for managing Q10 room configuration.""" + + def __init__(self, command: CommandTrait) -> None: + super().__init__() + TraitUpdateListener.__init__(self, logger=_LOGGER) + self._command = command + + async def refresh(self) -> None: + """Request the current room configuration from the device.""" + await self._command.send(B01_Q10_DP.COMMON, params={B01_Q10_DP.CUSTOMER_CLEAN_REQUEST.code: 0}) + + @property + def room_names(self) -> dict[int, str]: + """Return the current room names keyed by room id.""" + return {room.room_id: room.room_name for room in self.rooms} + + def get_room(self, room_id: int) -> Q10RoomConfig | None: + """Return a room configuration by room id, if known.""" + return self.room_map.get(int(room_id)) + + def get_room_name(self, room_id: int, default: str | None = None) -> str | None: + """Return a room name by room id, optionally with a default.""" + room = self.get_room(room_id) + if room is None: + return default + return room.room_name + + def update_from_dps(self, decoded_dps: dict[B01_Q10_DP, Any]) -> None: + """Update the trait from raw DPS data.""" + payload = decoded_dps.get(B01_Q10_DP.CUSTOMER_CLEAN) + if not isinstance(payload, str): + return + + try: + parsed = parse_customer_clean_payload(payload) + except Exception: + _LOGGER.debug("Failed to parse CUSTOMER_CLEAN payload", exc_info=True) + return + + changed = ( + self.raw_length != parsed.raw_length + or self.declared_count != parsed.declared_count + or self.rooms != parsed.rooms + ) + self.raw_length = parsed.raw_length + self.declared_count = parsed.declared_count + self.rooms = parsed.rooms + if changed: + self._notify_update() diff --git a/tests/data/b01_q10/test_b01_q10_containers.py b/tests/data/b01_q10/test_b01_q10_containers.py new file mode 100644 index 00000000..bedf2bbf --- /dev/null +++ b/tests/data/b01_q10/test_b01_q10_containers.py @@ -0,0 +1,98 @@ +"""Tests for Q10 B01 container parsing helpers.""" + +import base64 + +import pytest + +from roborock.data.b01_q10.b01_q10_containers import ( + normalize_q10_room_name, + parse_customer_clean_payload, +) + + +def _build_payload( + *, + room_name_bytes: bytes, + clean_order: int = 3, + clean_count: int = 2, + clean_type: int = 1, + fan_level: int = 4, + water_level: int = 2, + material: int = 0, + clean_line: int = 1, +) -> str: + room_block = bytearray(26) + room_block[0:2] = (42).to_bytes(2, "big") + room_block[2] = 7 + room_block[3:5] = clean_order.to_bytes(2, "big", signed=False) + room_block[5:7] = clean_count.to_bytes(2, "big", signed=False) + room_block[7] = clean_type + room_block[8] = fan_level + room_block[9] = water_level + room_block[10] = material + room_block[11] = clean_line + + room_name = bytearray(20) + room_name[: len(room_name_bytes)] = room_name_bytes + + # No vertices for these targeted parser tests. + raw = bytes([1]) + bytes(room_block) + bytes(room_name) + bytes([0]) + return base64.b64encode(raw).decode("ascii") + + +def test_parse_customer_clean_payload_converts_sentinel_values() -> None: + payload_b64 = _build_payload( + room_name_bytes=bytes([7]) + b"rr_room", + clean_order=0xFFFF, + clean_type=0xFF, + fan_level=0xFF, + water_level=0xFF, + material=0xFF, + clean_line=0xFF, + ) + + parsed = parse_customer_clean_payload(payload_b64) + + assert parsed.declared_count == 1 + assert parsed.parsed_count == 1 + room = parsed.rooms[0] + assert room.clean_order == -1 + assert room.clean_type == -1 + assert room.fan_level == -1 + assert room.water_level == -1 + assert room.material == -1 + assert room.clean_line == -1 + + +@pytest.mark.parametrize( + ("name_prefix", "name_data", "expected"), + [ + (0, b"rr_living_room\x00", "Living Room"), + (25, b"rr_toilet\x00", "Toilet"), + (7, b"rr_hall", "Hall"), + ], +) +def test_parse_customer_clean_payload_handles_name_length_edge_cases( + name_prefix: int, + name_data: bytes, + expected: str, +) -> None: + payload_b64 = _build_payload(room_name_bytes=bytes([name_prefix]) + name_data) + + parsed = parse_customer_clean_payload(payload_b64) + + assert parsed.parsed_count == 1 + assert parsed.rooms[0].room_name == expected + + +@pytest.mark.parametrize( + ("raw_name", "expected"), + [ + ("rr_living_room", "Living Room"), + (" rr_entrance_hall ", "Entrance Hall"), + ("rr_", "rr_"), + ("Kitchen", "Kitchen"), + ], +) +def test_normalize_q10_room_name(raw_name: str, expected: str) -> None: + assert normalize_q10_room_name(raw_name) == expected diff --git a/tests/devices/traits/b01/q10/test_rooms.py b/tests/devices/traits/b01/q10/test_rooms.py new file mode 100644 index 00000000..9a0f2c35 --- /dev/null +++ b/tests/devices/traits/b01/q10/test_rooms.py @@ -0,0 +1,180 @@ +"""Tests for the Q10 B01 rooms trait.""" + +import asyncio +import base64 +import json +from collections.abc import AsyncGenerator +from unittest.mock import AsyncMock, Mock + +import pytest +import pytest_asyncio + +from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP +from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol +from tests.fixtures.channel_fixtures import FakeChannel + + +def _build_customer_clean_payload() -> str: + room_block = bytearray(26) + room_block[0:2] = (42).to_bytes(2, "big") + room_block[2] = 7 + room_block[3:5] = (3).to_bytes(2, "big") + room_block[5:7] = (2).to_bytes(2, "big") + room_block[7] = 1 + room_block[8] = 4 + room_block[9] = 2 + room_block[10] = 0 + room_block[11] = 1 + + room_name = bytearray(20) + encoded_name = b"rr_living_room" + room_name[0] = len(encoded_name) + room_name[1 : 1 + len(encoded_name)] = encoded_name + + vertices = bytearray() + vertices.append(2) + vertices.extend((100).to_bytes(2, "big")) + vertices.extend((250).to_bytes(2, "big")) + vertices.extend((300).to_bytes(2, "big")) + vertices.extend((450).to_bytes(2, "big")) + + raw = bytes([1]) + bytes(room_block) + bytes(room_name) + bytes(vertices) + return base64.b64encode(raw).decode("ascii") + + +def _build_message_with_customer_clean(payload_b64: str) -> RoborockMessage: + payload = {"dps": {str(B01_Q10_DP.COMMON.code): {str(B01_Q10_DP.CUSTOMER_CLEAN.code): payload_b64}}} + return RoborockMessage( + protocol=RoborockMessageProtocol.RPC_RESPONSE, + payload=json.dumps(payload).encode("utf-8"), + version=b"B01", + ) + + +@pytest.fixture +def mock_channel() -> AsyncMock: + return AsyncMock() + + +@pytest.fixture +def message_queue() -> asyncio.Queue[RoborockMessage]: + return asyncio.Queue() + + +@pytest.fixture +def mock_subscribe_stream(mock_channel: AsyncMock, message_queue: asyncio.Queue[RoborockMessage]) -> Mock: + async def mock_stream() -> AsyncGenerator[RoborockMessage, None]: + try: + while True: + yield await message_queue.get() + except asyncio.CancelledError: + return + + mock = Mock(return_value=mock_stream()) + mock_channel.subscribe_stream = mock + return mock + + +@pytest_asyncio.fixture +async def q10_api(mock_channel: AsyncMock, mock_subscribe_stream: Mock) -> AsyncGenerator[Q10PropertiesApi, None]: + api = create(mock_channel) + await api.start() + yield api + await api.close() + + +async def wait_for_room_count(api: Q10PropertiesApi, value: int, timeout: float = 2.0) -> None: + for _ in range(int(timeout / 0.1)): + if api.rooms.parsed_count == value: + return + await asyncio.sleep(0.1) + pytest.fail(f"Timeout waiting for parsed_count={value}") + + +async def test_rooms_trait_streaming( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + payload_b64 = _build_customer_clean_payload() + + assert q10_api.rooms.parsed_count == 0 + + message_queue.put_nowait(_build_message_with_customer_clean(payload_b64)) + + await wait_for_room_count(q10_api, 1) + + assert q10_api.rooms.declared_count == 1 + assert q10_api.rooms.raw_length > 0 + room = q10_api.rooms.room_map[42] + assert room.room_name == "Living Room" + assert room.room_type == 7 + assert room.clean_order == 3 + assert room.clean_count == 2 + assert room.clean_type == 1 + assert room.fan_level == 4 + assert room.water_level == 2 + assert room.clean_line == 1 + assert room.vertices_num == 2 + assert room.vertices[0].x == pytest.approx(10.0) + assert room.vertices[0].y == pytest.approx(25.0) + assert room.vertices[1].x == pytest.approx(30.0) + assert room.vertices[1].y == pytest.approx(45.0) + + +def test_rooms_trait_update_listener(q10_api: Q10PropertiesApi) -> None: + event = asyncio.Event() + + unsubscribe = q10_api.rooms.add_update_listener(event.set) + q10_api.rooms.update_from_dps({B01_Q10_DP.CUSTOMER_CLEAN: _build_customer_clean_payload()}) + + assert event.is_set() + event.clear() + + unsubscribe() + q10_api.rooms.update_from_dps({B01_Q10_DP.CUSTOMER_CLEAN: _build_customer_clean_payload()}) + + assert not event.is_set() + + +@pytest.fixture(name="fake_channel") +def fake_channel_fixture() -> FakeChannel: + return FakeChannel() + + +@pytest.fixture(name="direct_q10_api") +def direct_q10_api_fixture(fake_channel: FakeChannel) -> Q10PropertiesApi: + return Q10PropertiesApi(fake_channel) # type: ignore[arg-type] + + +async def test_rooms_trait_refresh_requests_customer_clean( + direct_q10_api: Q10PropertiesApi, + fake_channel: FakeChannel, +) -> None: + await direct_q10_api.rooms.refresh() + + assert len(fake_channel.published_messages) == 1 + message = fake_channel.published_messages[0] + assert message.payload + assert json.loads(message.payload.decode()) == { + "dps": { + str(B01_Q10_DP.COMMON.code): { + str(B01_Q10_DP.CUSTOMER_CLEAN_REQUEST.code): 0, + } + } + } + + +async def test_rooms_api_helpers_reflect_current_room_state( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + message_queue.put_nowait(_build_message_with_customer_clean(_build_customer_clean_payload())) + + await wait_for_room_count(q10_api, 1) + + assert q10_api.room_names == {42: "Living Room"} + assert q10_api.room_map[42].room_name == "Living Room" + assert q10_api.get_room(42) is not None + assert q10_api.get_room_name(42) == "Living Room" + assert q10_api.get_room_name(999, default="unknown") == "unknown"