core/homeassistant/components/shelly/cover.py

236 lines
7.9 KiB
Python

"""Cover for Shelly."""
from __future__ import annotations
from typing import Any, cast
from aioshelly.block_device import Block
from aioshelly.const import RPC_GENERATIONS
from homeassistant.components.cover import (
ATTR_POSITION,
ATTR_TILT_POSITION,
CoverDeviceClass,
CoverEntity,
CoverEntityFeature,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .coordinator import ShellyBlockCoordinator, ShellyConfigEntry, ShellyRpcCoordinator
from .entity import ShellyBlockEntity, ShellyRpcEntity
from .utils import get_device_entry_gen, get_rpc_key_ids
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ShellyConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up covers for device."""
if get_device_entry_gen(config_entry) in RPC_GENERATIONS:
return async_setup_rpc_entry(hass, config_entry, async_add_entities)
return async_setup_block_entry(hass, config_entry, async_add_entities)
@callback
def async_setup_block_entry(
hass: HomeAssistant,
config_entry: ShellyConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up cover for device."""
coordinator = config_entry.runtime_data.block
assert coordinator and coordinator.device.blocks
blocks = [block for block in coordinator.device.blocks if block.type == "roller"]
if not blocks:
return
async_add_entities(BlockShellyCover(coordinator, block) for block in blocks)
@callback
def async_setup_rpc_entry(
hass: HomeAssistant,
config_entry: ShellyConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up entities for RPC device."""
coordinator = config_entry.runtime_data.rpc
assert coordinator
cover_key_ids = get_rpc_key_ids(coordinator.device.status, "cover")
if not cover_key_ids:
return
async_add_entities(RpcShellyCover(coordinator, id_) for id_ in cover_key_ids)
class BlockShellyCover(ShellyBlockEntity, CoverEntity):
"""Entity that controls a cover on block based Shelly devices."""
_attr_device_class = CoverDeviceClass.SHUTTER
_attr_supported_features: CoverEntityFeature = (
CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE | CoverEntityFeature.STOP
)
def __init__(self, coordinator: ShellyBlockCoordinator, block: Block) -> None:
"""Initialize block cover."""
super().__init__(coordinator, block)
self.control_result: dict[str, Any] | None = None
if self.coordinator.device.settings["rollers"][0]["positioning"]:
self._attr_supported_features |= CoverEntityFeature.SET_POSITION
@property
def is_closed(self) -> bool:
"""If cover is closed."""
if self.control_result:
return cast(bool, self.control_result["current_pos"] == 0)
return cast(int, self.block.rollerPos) == 0
@property
def current_cover_position(self) -> int:
"""Position of the cover."""
if self.control_result:
return cast(int, self.control_result["current_pos"])
return cast(int, self.block.rollerPos)
@property
def is_closing(self) -> bool:
"""Return if the cover is closing."""
if self.control_result:
return cast(bool, self.control_result["state"] == "close")
return self.block.roller == "close"
@property
def is_opening(self) -> bool:
"""Return if the cover is opening."""
if self.control_result:
return cast(bool, self.control_result["state"] == "open")
return self.block.roller == "open"
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close cover."""
self.control_result = await self.set_state(go="close")
self.async_write_ha_state()
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open cover."""
self.control_result = await self.set_state(go="open")
self.async_write_ha_state()
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the cover to a specific position."""
self.control_result = await self.set_state(
go="to_pos", roller_pos=kwargs[ATTR_POSITION]
)
self.async_write_ha_state()
async def async_stop_cover(self, **_kwargs: Any) -> None:
"""Stop the cover."""
self.control_result = await self.set_state(go="stop")
self.async_write_ha_state()
@callback
def _update_callback(self) -> None:
"""When device updates, clear control result that overrides state."""
self.control_result = None
super()._update_callback()
class RpcShellyCover(ShellyRpcEntity, CoverEntity):
"""Entity that controls a cover on RPC based Shelly devices."""
_attr_device_class = CoverDeviceClass.SHUTTER
_attr_supported_features: CoverEntityFeature = (
CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE | CoverEntityFeature.STOP
)
def __init__(self, coordinator: ShellyRpcCoordinator, id_: int) -> None:
"""Initialize rpc cover."""
super().__init__(coordinator, f"cover:{id_}")
self._id = id_
if self.status["pos_control"]:
self._attr_supported_features |= CoverEntityFeature.SET_POSITION
if coordinator.device.config[f"cover:{id_}"].get("slat", {}).get("enable"):
self._attr_supported_features |= (
CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
| CoverEntityFeature.STOP_TILT
| CoverEntityFeature.SET_TILT_POSITION
)
@property
def is_closed(self) -> bool | None:
"""If cover is closed."""
return cast(bool, self.status["state"] == "closed")
@property
def current_cover_position(self) -> int | None:
"""Position of the cover."""
if not self.status["pos_control"]:
return None
return cast(int, self.status["current_pos"])
@property
def current_cover_tilt_position(self) -> int | None:
"""Return current position of cover tilt."""
if "slat_pos" not in self.status:
return None
return cast(int, self.status["slat_pos"])
@property
def is_closing(self) -> bool:
"""Return if the cover is closing."""
return cast(bool, self.status["state"] == "closing")
@property
def is_opening(self) -> bool:
"""Return if the cover is opening."""
return cast(bool, self.status["state"] == "opening")
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close cover."""
await self.call_rpc("Cover.Close", {"id": self._id})
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open cover."""
await self.call_rpc("Cover.Open", {"id": self._id})
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the cover to a specific position."""
await self.call_rpc(
"Cover.GoToPosition", {"id": self._id, "pos": kwargs[ATTR_POSITION]}
)
async def async_stop_cover(self, **_kwargs: Any) -> None:
"""Stop the cover."""
await self.call_rpc("Cover.Stop", {"id": self._id})
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
await self.call_rpc("Cover.GoToPosition", {"id": self._id, "slat_pos": 100})
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
await self.call_rpc("Cover.GoToPosition", {"id": self._id, "slat_pos": 0})
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the cover tilt to a specific position."""
await self.call_rpc(
"Cover.GoToPosition",
{"id": self._id, "slat_pos": kwargs[ATTR_TILT_POSITION]},
)
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
"""Stop the cover."""
await self.call_rpc("Cover.Stop", {"id": self._id})