Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions roborock/data/b01_q10/b01_q10_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
automatically update objects from raw device responses.
"""

import base64
from dataclasses import dataclass, field

from ..containers import RoborockBase
Expand All @@ -19,6 +20,14 @@
YXWaterLevel,
)

_Q10_CUSTOMER_CLEAN_COUNT_SIZE = 1
_Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE = 26
_Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE = 20
_Q10_CUSTOMER_CLEAN_VERTEX_SIZE = 4
_Q10_CUSTOMER_CLEAN_NAME_MAX_LENGTH = _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE - 1
_Q10_SENTINEL_U8 = 0xFF
_Q10_SENTINEL_U16 = 0xFFFF


@dataclass
class dpCleanRecord(RoborockBase):
Expand Down Expand Up @@ -81,6 +90,156 @@ class dpTimeZone(RoborockBase):
timeZoneSec: int


@dataclass
class Q10RoomVertex(RoborockBase):
x_raw: int
y_raw: int
x: float
y: float


@dataclass
class Q10RoomConfig(RoborockBase):
index: int
room_id: int
room_type: int
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these all some kind of enum value? trying to reason about what "-1" will mean here and wondering if int | None would be better or RoomTypeEnum | None if we can figure those out.

clean_order: int
clean_count: int
clean_type: int
fan_level: int
water_level: int
material: int
clean_line: int
room_name: str
vertices_num: int
vertices: list[Q10RoomVertex] = field(default_factory=list)


@dataclass
class Q10RoomsConfig(RoborockBase):
raw_length: int = 0
declared_count: int = 0
rooms: list[Q10RoomConfig] = field(default_factory=list)

@property
def parsed_count(self) -> int:
return len(self.rooms)

@property
def room_map(self) -> dict[int, Q10RoomConfig]:
return {room.room_id: room for room in self.rooms}


def parse_customer_clean_payload(payload_b64: str) -> Q10RoomsConfig:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to me like it fits better next to RoomsTrait.

"""Parse CUSTOMER_CLEAN payload.

Layout:
- 1 byte: room count
- For each room:
- 26-byte room metadata block
- 20-byte room name block
- 1 byte vertex count followed by 4 bytes per vertex (x, y as u16)
"""
raw = base64.b64decode(payload_b64)
if not raw:
return Q10RoomsConfig()

count = raw[0]
offset = _Q10_CUSTOMER_CLEAN_COUNT_SIZE
rooms: list[Q10RoomConfig] = []

for index in range(count):
if offset + _Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE > len(raw):
break
Comment thread
lboue marked this conversation as resolved.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I'm not sure about all of theses breaks. if one step fails, we are returning partial data, which I think can be problematic.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also we could just check them all at once. Seems like the total length be the count * (26+20+1] then read vertex count then check its vertex_count * _Q10_CUSTOMER_CLEAN_VERTEX_SIZE, then we don't ned to check again.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder if using construct would be helpful like how some of the other packages are parsed in protocol.py...


room_block = raw[offset : offset + _Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE]
room_id = int.from_bytes(room_block[0:2], "big", signed=False)
room_type = room_block[2]
clean_order = int.from_bytes(room_block[3:5], "big", signed=False)
clean_count = int.from_bytes(room_block[5:7], "big", signed=False)
clean_type = room_block[7]
fan_level = room_block[8]
water_level = room_block[9]
material = room_block[10]
clean_line = room_block[11]

if clean_order == _Q10_SENTINEL_U16:
clean_order = -1
if clean_type == _Q10_SENTINEL_U8:
clean_type = -1
if fan_level == _Q10_SENTINEL_U8:
fan_level = -1
if water_level == _Q10_SENTINEL_U8:
water_level = -1
if material == _Q10_SENTINEL_U8:
material = -1
if clean_line == _Q10_SENTINEL_U8:
clean_line = -1

offset += _Q10_CUSTOMER_CLEAN_ROOM_BLOCK_SIZE
if offset + _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE > len(raw):
break

room_name_bytes = raw[offset : offset + _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE]
offset += _Q10_CUSTOMER_CLEAN_NAME_BLOCK_SIZE
name_len = room_name_bytes[0]
if 0 < name_len <= _Q10_CUSTOMER_CLEAN_NAME_MAX_LENGTH:
room_name = room_name_bytes[1 : 1 + name_len].decode("utf-8", errors="replace")
else:
room_name = room_name_bytes[1:].split(b"\x00", 1)[0].decode("utf-8", errors="replace")
room_name = normalize_q10_room_name(room_name)

if offset + 1 > len(raw):
break

vertices_num = raw[offset]
offset += 1
bytes_needed = vertices_num * _Q10_CUSTOMER_CLEAN_VERTEX_SIZE
if offset + bytes_needed > len(raw):
break

vertices: list[Q10RoomVertex] = []
for _ in range(vertices_num):
x_raw = int.from_bytes(raw[offset : offset + 2], "big", signed=False)
offset += 2
y_raw = int.from_bytes(raw[offset : offset + 2], "big", signed=False)
offset += 2
vertices.append(Q10RoomVertex(x_raw=x_raw, y_raw=y_raw, x=x_raw / 10.0, y=y_raw / 10.0))

rooms.append(
Q10RoomConfig(
index=index,
room_id=room_id,
room_type=room_type,
clean_order=clean_order,
clean_count=clean_count,
clean_type=clean_type,
fan_level=fan_level,
water_level=water_level,
material=material,
clean_line=clean_line,
room_name=room_name,
vertices_num=vertices_num,
vertices=vertices,
)
)

return Q10RoomsConfig(raw_length=len(raw), declared_count=count, rooms=rooms)


def normalize_q10_room_name(room_name: str) -> str:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also consider making this a property on Q10RoomConfig which exposes the name (e.g.raw_name and name as a property.)

"""Normalize room names reported by the Q10 firmware."""
Comment thread
lboue marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe would be good to give some examples in the pydoc of what this is doing.

raw_name = room_name.strip()
if not raw_name:
return raw_name
if raw_name.startswith("rr_"):
normalized = raw_name[3:].replace("_", " ").strip()
if not normalized:
return raw_name
return " ".join(part.capitalize() for part in normalized.split())
return raw_name


@dataclass
class Q10Status(RoborockBase):
"""Status for Q10 devices.
Expand Down
29 changes: 29 additions & 0 deletions roborock/devices/traits/b01/q10/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import logging

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.data.b01_q10.b01_q10_containers import Q10RoomConfig
from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses
from roborock.devices.traits import Trait
from roborock.devices.transport.mqtt_channel import MqttChannel

from .command import CommandTrait
from .remote import RemoteTrait
from .rooms import RoomsTrait
from .status import StatusTrait
from .vacuum import VacuumTrait

Expand All @@ -35,12 +37,26 @@ class Q10PropertiesApi(Trait):
remote: RemoteTrait
"""Trait for sending remote control related commands to Q10 devices."""

rooms: RoomsTrait
"""Trait for reading room configuration from Q10 devices."""

@property
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like these should just be on RoomsTrait and not here

def room_map(self) -> dict[int, Q10RoomConfig]:
"""Return the current room configurations keyed by room id."""
return self.rooms.room_map

@property
def room_names(self) -> dict[int, str]:
"""Return the current room names keyed by room id."""
return self.rooms.room_names

def __init__(self, channel: MqttChannel) -> None:
"""Initialize the B01Props API."""
self._channel = channel
self.command = CommandTrait(channel)
self.vacuum = VacuumTrait(self.command)
self.remote = RemoteTrait(self.command)
self.rooms = RoomsTrait(self.command)
self.status = StatusTrait()
self._subscribe_task: asyncio.Task[None] | None = None

Expand All @@ -64,6 +80,18 @@ async def refresh(self) -> None:
# to the device. Updates will be received by the subscribe loop below.
await self.command.send(B01_Q10_DP.REQUEST_DPS, params={})

async def refresh_room_config(self) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar here, these should all be on the rooms trait

"""Refresh only the room configuration."""
await self.rooms.refresh()

def get_room(self, room_id: int) -> Q10RoomConfig | None:
"""Return a room configuration by id, if known."""
return self.rooms.get_room(room_id)

def get_room_name(self, room_id: int, default: str | None = None) -> str | None:
"""Return a room name by id, if known."""
return self.rooms.get_room_name(room_id, default)

async def _subscribe_loop(self) -> None:
"""Persistent loop to listen for status updates."""
async for decoded_dps in stream_decoded_responses(self._channel):
Expand All @@ -73,6 +101,7 @@ async def _subscribe_loop(self) -> None:
# only update what fields that it is responsible for.
# More traits can be added here below.
self.status.update_from_dps(decoded_dps)
self.rooms.update_from_dps(decoded_dps)


def create(channel: MqttChannel) -> Q10PropertiesApi:
Expand Down
68 changes: 68 additions & 0 deletions roborock/devices/traits/b01/q10/rooms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""Rooms trait for Q10 B01 devices."""

import logging
from typing import Any

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.data.b01_q10.b01_q10_containers import (
Q10RoomConfig,
Q10RoomsConfig,
parse_customer_clean_payload,
)
from roborock.devices.traits.common import TraitUpdateListener

from .command import CommandTrait

_LOGGER = logging.getLogger(__name__)


class RoomsTrait(Q10RoomsConfig, TraitUpdateListener):
"""Trait for managing Q10 room configuration."""

def __init__(self, command: CommandTrait) -> None:
super().__init__()
TraitUpdateListener.__init__(self, logger=_LOGGER)
self._command = command

async def refresh(self) -> None:
"""Request the current room configuration from the device."""
await self._command.send(B01_Q10_DP.COMMON, params={B01_Q10_DP.CUSTOMER_CLEAN_REQUEST.code: 0})

@property
def room_names(self) -> dict[int, str]:
"""Return the current room names keyed by room id."""
return {room.room_id: room.room_name for room in self.rooms}

def get_room(self, room_id: int) -> Q10RoomConfig | None:
"""Return a room configuration by room id, if known."""
return self.room_map.get(int(room_id))

def get_room_name(self, room_id: int, default: str | None = None) -> str | None:
"""Return a room name by room id, optionally with a default."""
room = self.get_room(room_id)
if room is None:
return default
return room.room_name

def update_from_dps(self, decoded_dps: dict[B01_Q10_DP, Any]) -> None:
"""Update the trait from raw DPS data."""
payload = decoded_dps.get(B01_Q10_DP.CUSTOMER_CLEAN)
if not isinstance(payload, str):
return

try:
parsed = parse_customer_clean_payload(payload)
except Exception:
_LOGGER.debug("Failed to parse CUSTOMER_CLEAN payload", exc_info=True)
return

changed = (
self.raw_length != parsed.raw_length
or self.declared_count != parsed.declared_count
or self.rooms != parsed.rooms
)
self.raw_length = parsed.raw_length
self.declared_count = parsed.declared_count
self.rooms = parsed.rooms
if changed:
self._notify_update()
Loading
Loading