ソースを参照

Initial code commit

DebenOldert 7 ヶ月 前
コミット
0c1d74d27a

+ 128 - 0
custom_components/odido_zyxel_5g/__init__.py

@@ -0,0 +1,128 @@
+"""
+Custom integration to integrate ZYXEL router statistics
+for Odido Klik & Klaar internet subscriptions.
+
+For more details about this integration, please refer to
+https://github.com/golles/ha-knmi/
+"""
+
+import logging
+from datetime import timedelta
+
+from homeassistant.config_entries import ConfigEntry
+from homeassistant.const import (
+    CONF_IP_ADDRESS,
+    CONF_USERNAME,
+    CONF_PASSWORD,
+    CONF_SCAN_INTERVAL,
+    Platform,
+)
+from homeassistant.core import HomeAssistant
+from homeassistant.helpers import entity_registry as er
+from homeassistant.helpers.aiohttp_client import async_get_clientsession
+from homeassistant.helpers.entity import DeviceInfo
+
+
+from .api import RouterApiClient
+from .const import (DEFAULT_SCAN_INTERVAL,
+                    DOMAIN,
+                    API_DEVICESTATUS,
+                    API_SCHEMA)
+
+from .coordinator import RouterDataUpdateCoordinator
+
+PLATFORMS: list[Platform] = [
+    Platform.BINARY_SENSOR,
+    Platform.DEVICE_TRACKER,
+    Platform.SENSOR
+]
+
+_LOGGER: logging.Logger = logging.getLogger(__package__)
+
+async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
+    """Set up this integration using UI."""
+    hass.data.setdefault(DOMAIN, {})
+
+    endpoint = entry.data.get(CONF_IP_ADDRESS)
+    user = entry.data.get(CONF_USERNAME)
+    password = entry.data.get(CONF_PASSWORD)
+    scan_interval_seconds = entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
+    
+    scan_interval = timedelta(seconds=scan_interval_seconds)
+
+    session = async_get_clientsession(hass)
+    client = RouterApiClient(endpoint=endpoint,
+                             user=user,
+                             password=password,
+                             session=session)
+
+    _LOGGER.debug(
+        "Set up entry, with scan_interval of %s seconds",
+        scan_interval_seconds,
+    )
+
+    client.async_login()
+
+    data = await client.async_query_api(oid=API_DEVICESTATUS)
+
+    info = data['DeviceInfo']
+
+    device_info = DeviceInfo(
+        configuration_url=f'{API_SCHEMA}://{endpoint}',
+        identifiers={(DOMAIN, entry.entry_id)},
+        model=info['ModelName'],
+        manufacturer=info['Manufacturer'],
+        name=info['Description'],
+        sw_version=info['SoftwareVersion'],
+        hw_version=info['HardwareVersion'],
+        model_id=info['ProductClass'],
+        serial_number=['SerialNumber'],
+    )
+
+    hass.data[DOMAIN][entry.entry_id] = coordinator = RouterDataUpdateCoordinator(
+        hass=hass,
+        client=client,
+        device_info=device_info,
+        scan_interval=scan_interval,
+    )
+
+    await coordinator.async_config_entry_first_refresh()
+
+    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
+    entry.async_on_unload(entry.add_update_listener(async_reload_entry))
+
+    return True
+
+
+async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
+    """Handle removal of an entry."""
+    if unloaded := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
+        hass.data[DOMAIN].pop(entry.entry_id)
+    return unloaded
+
+
+async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
+    """Reload config entry."""
+    await async_unload_entry(hass, entry)
+    await async_setup_entry(hass, entry)
+
+
+async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
+    """Migrate old entry."""
+    _LOGGER.debug("Migrating from version %s", config_entry.version)
+
+    if config_entry.version == 1:
+        hass.config_entries.async_update_entry(config_entry, version=2)
+
+        entity_registry = er.async_get(hass)
+        existing_entries = er.async_entries_for_config_entry(
+            entity_registry, config_entry.entry_id
+        )
+
+        for entry in list(existing_entries):
+            _LOGGER.debug("Deleting version 1 entity: %s", entry.entity_id)
+            entity_registry.async_remove(entry.entity_id)
+
+    _LOGGER.debug("Migration to version %s successful", config_entry.version)
+
+    return True

+ 108 - 0
custom_components/odido_zyxel_5g/api.py

@@ -0,0 +1,108 @@
+"""ZYXEL API Client"""
+
+import asyncio
+import base64
+
+import aiohttp
+
+from .const import (API_TIMEOUT,
+                    API_SCHEMA,
+                    API_BASE_PATH,
+                    API_LOGIN_PATH,
+                    LOGIN_PAYLOAD,
+                    KEY_RESULT,
+                    VAL_SUCCES,
+                    KEY_OBJECT)
+
+
+class RouterApiClientError(Exception):
+    """Exception to indicate a general API error."""
+
+
+class RouterApiClientCommunicationError(RouterApiClientError):
+    """Exception to indicate a communication error."""
+
+
+class RouterApiClientLoginError(RouterApiClientError):
+    """Exception to indicate an api key error."""
+
+
+class RouterApiClientResponseError(RouterApiClientError):
+    """Exception to indicate a response error."""
+
+
+class RouterApiClient:
+    """Router API wrapper for ZYXEL routers"""
+
+    def __init__(
+        self,
+        endpoint: str,
+        user: str,
+        password: str,
+        session: aiohttp.ClientSession,
+    ) -> None:
+        """ZYXEL API Client."""
+        self.endpoint = endpoint
+        self.user = user
+        self.password = password
+        self._session = session
+
+    async def async_login(self) -> bool:
+        """Login and obtain the session cookie"""
+
+        payload = LOGIN_PAYLOAD.copy()
+        payload['Input_Account'] = self.user
+        payload['Input_Passwd'] = base64.b64encode(
+            self.password.encode('utf-8')).decode('utf-8')
+
+        response = await self._session.post(
+            f'{API_SCHEMA}://{self.endpoint}{API_LOGIN_PATH}',
+            json=payload)
+        
+        if response.ok:
+            try:
+                data = response.json()
+
+                if 'result' in data:
+                    if data['result'] == 'ZCFG_SUCCESS':
+                        return True
+                    else:
+                        raise RouterApiClientLoginError('Login failed')
+                else:
+                    raise RouterApiClientResponseError('Key "result" not set in response')
+                    
+            except Exception as json_exception:
+                raise RouterApiClientResponseError(f'Unable to decode login response') \
+                    from json_exception
+            
+        raise RouterApiClientCommunicationError(
+            f'Error connecting to router. Status: {response.status}')
+    
+    async def async_query_api(self,
+                              oid: str) -> dict:
+        """Query an authenticated API endpoint"""
+        try:
+            async with asyncio.timeout(API_TIMEOUT):
+                response = await self._session.get(
+                    f'{API_SCHEMA}://{self.endpoint}{API_BASE_PATH}',
+                    params={'oid': oid})
+
+                if response.ok:
+                    try:
+                        data: dict = await response.json()
+
+                        if data.get(KEY_RESULT, None) == VAL_SUCCES:
+                            return data.get(KEY_OBJECT, [{}])[0]
+                        else:
+                            raise RouterApiClientResponseError(f'Response returned error')
+
+                    except Exception as json_exception:
+                        raise RouterApiClientResponseError(f'Unable to decode JSON') \
+                            from json_exception
+                else:
+                    raise RouterApiClientCommunicationError(
+                        f'Error retrieving API. Status: {response.status}')
+
+        except Exception as exception:
+            raise RouterApiClientCommunicationError('Unable to connect to router API') \
+                from exception

+ 125 - 0
custom_components/odido_zyxel_5g/config_flow.py

@@ -0,0 +1,125 @@
+"""Adds config flow for ZYXEL."""
+
+import logging
+from typing import Any
+
+from homeassistant.config_entries import (
+    ConfigEntry,
+    ConfigFlow,
+    FlowResult,
+    OptionsFlow,
+)
+from homeassistant.const import (
+    CONF_NAME,
+    CONF_SCAN_INTERVAL,
+    CONF_USERNAME,
+    CONF_PASSWORD,
+    CONF_IP_ADDRESS
+)
+from homeassistant.core import callback
+from homeassistant.helpers.aiohttp_client import async_create_clientsession
+import homeassistant.helpers.config_validation as cv
+import voluptuous as vol
+
+from .api import (
+    RouterApiClient,
+    RouterApiClientError,
+    RouterApiClientLoginError,
+    RouterApiClientCommunicationError,
+    RouterApiClientResponseError,
+)
+from .const import (DEFAULT_SCAN_INTERVAL,
+                    DOMAIN,
+                    DEFAULT_IP,
+                    DEFAULT_USER)
+
+_LOGGER: logging.Logger = logging.getLogger(__package__)
+
+
+class RouterFlowHandler(ConfigFlow, domain=DOMAIN):
+    """Config flow for Odido Router."""
+
+    VERSION = 1
+
+    async def async_step_user(
+        self, user_input: dict[str, Any] | None = None
+    ) -> FlowResult:
+        """Handle a flow initialized by the user."""
+        _errors = {}
+        if user_input is not None:
+            try:
+                await self._validate_user_input(
+                    user_input[CONF_IP_ADDRESS],
+                    user_input[CONF_USERNAME],
+                    user_input[CONF_PASSWORD],
+                )
+            except RouterApiClientCommunicationError as exception:
+                _LOGGER.error(exception)
+                _errors["base"] = "general"
+            except RouterApiClientLoginError as exception:
+                _LOGGER.error(exception)
+                _errors["base"] = "api_key"
+            except RouterApiClientResponseError as exception:
+                _LOGGER.error(exception)
+                _errors["base"] = "daily_limit"
+            else:
+                return self.async_create_entry(
+                    title=user_input[CONF_NAME], data=user_input
+                )
+
+        return self.async_show_form(
+            step_id="user",
+            data_schema=vol.Schema(
+                {
+                    vol.Required(
+                        CONF_IP_ADDRESS, default=DEFAULT_IP
+                    ): str,
+                    vol.Required(
+                        CONF_USERNAME, default=DEFAULT_USER
+                    ): str,
+                    vol.Required(CONF_PASSWORD): str
+                }
+            ),
+            errors=_errors,
+        )
+
+    async def _validate_user_input(self, endpoint: str, user: str, password: str):
+        """Validate user input."""
+        session = async_create_clientsession(self.hass)
+        client = RouterApiClient(endpoint=endpoint,
+                                 user=user,
+                                 password=password,
+                                 session=session)
+        await client.async_login()
+
+    @staticmethod
+    @callback
+    def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow:
+        return RouterOptionsFlowHandler()
+
+
+class RouterOptionsFlowHandler(OptionsFlow):
+    """Router config flow options handler."""
+
+    async def async_step_init(
+        self, user_input: dict[str, Any] | None = None
+    ) -> FlowResult:
+        """Manage the options."""
+        if user_input is not None:
+            return self.async_create_entry(
+                title=self.config_entry.data.get(CONF_NAME), data=user_input
+            )
+
+        return self.async_show_form(
+            step_id="init",
+            data_schema=vol.Schema(
+                {
+                    vol.Required(
+                        CONF_SCAN_INTERVAL,
+                        default=self.config_entry.options.get(
+                            CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
+                        ),
+                    ): vol.All(vol.Coerce(int), vol.Range(min=30, max=3600))
+                }
+            ),
+        )

+ 41 - 0
custom_components/odido_zyxel_5g/const.py

@@ -0,0 +1,41 @@
+"""Constants for ZYXEL"""
+
+from typing import Final
+
+# API
+API_SCHEMA: Final[str] = 'https'
+API_BASE_PATH: Final[str] = '/cgi-bin/DAL'
+API_LOGIN_PATH: Final[str] = '/UserLogin'
+API_TIMEOUT: Final = 10
+API_TIMEZONE: Final = "Europe/Amsterdam"
+
+# Endpoints
+EP_CELLINFO: Final[str] = 'status'
+EP_LANINFO: Final[str] = 'lanhosts'
+EP_DEVICESTATUS: Final[str] = 'cardpage_status'
+
+# Keys & values
+KEY_RESULT: Final[str] = 'result'
+KEY_OBJECT: Final[str] = 'Object'
+VAL_SUCCES: Final[str] = 'ZCFG_SUCCESS'
+
+# Base component constants.
+DOMAIN: Final = "Odido"
+NAME: Final = "ZYXEL"
+SUPPLIER: Final = "Odido"
+VERSION: Final = "0.0.1"
+
+# Defaults
+DEFAULT_IP: Final[str] = '192.168.1.1'
+DEFAULT_USER: Final[str] = 'admin'
+DEFAULT_NAME: Final[str] = NAME
+DEFAULT_SCAN_INTERVAL: Final = 60
+
+# Payloads
+LOGIN_PAYLOAD: dict = {
+    'Input_Account': None,
+    'Input_Passwd': None,
+    'currLang': 'en',
+    'RememberPassword': 0,
+    'SHA512_password': False
+}

+ 146 - 0
custom_components/odido_zyxel_5g/coordinator.py

@@ -0,0 +1,146 @@
+"""Update coordinator for """
+
+from datetime import datetime, timedelta
+import logging
+import re
+import asyncio
+from typing import Any
+
+from homeassistant.config_entries import ConfigEntry
+from homeassistant.core import HomeAssistant
+from homeassistant.helpers.entity import DeviceInfo
+from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
+from homeassistant.helpers.typing import StateType
+from homeassistant.util import dt
+
+from .api import RouterApiClient
+from .const import (API_TIMEZONE, 
+                    DOMAIN,
+                    EP_CELLINFO,
+                    EP_DEVICESTATUS,
+                    EP_LANINFO)
+
+_LOGGER: logging.Logger = logging.getLogger(__package__)
+
+
+class RouterDataUpdateCoordinator(DataUpdateCoordinator):
+    """Class to manage fetching data from the API."""
+
+    config_entry: ConfigEntry
+
+    def __init__(
+        self,
+        hass: HomeAssistant,
+        client: RouterApiClient,
+        device_info: DeviceInfo,
+        scan_interval: timedelta,
+    ) -> None:
+        """Initialize"""
+        self.api = client
+        self.device_info = device_info
+
+        super().__init__(
+            hass=hass,
+            logger=_LOGGER,
+            name=DOMAIN,
+            update_interval=scan_interval,
+        )
+
+    async def _async_update_data(self) -> dict:
+        """Update data via library"""
+        try:
+            await self.api.async_login()
+
+            # Get API endpoints
+            endpoints = [EP_CELLINFO,
+                         EP_DEVICESTATUS,
+                         EP_LANINFO]
+            
+            results = await asyncio.gather(
+                *[self.api.async_query_api(oid=endpoint) for endpoint in endpoints],
+                return_exceptions=True)
+            
+            _LOGGER.debug(results)
+
+        except Exception as exception:
+            _LOGGER.error("Update failed! - %s", exception)
+            raise UpdateFailed() from exception
+
+
+    def get_value(self, endpoint: str, path: list[int | str], default=None) -> StateType:
+        """
+        Get a value from the data by a given path.
+        When the value is absent, the default (None) will be returned and an error will be logged.
+        """
+        value = self.data
+
+        try:
+            for key in path:
+                value = value[key]
+
+            value_type = type(value).__name__
+
+            if value_type in ["int", "float", "str"]:
+                _LOGGER.debug(
+                    "Path %s returns a %s (value = %s)", path, value_type, value
+                )
+            else:
+                _LOGGER.debug("Path %s returns a %s", path, value_type)
+
+            return value
+        except (IndexError, KeyError):
+            _LOGGER.warning("Can't find a value for %s in the API response", path)
+            return default
+
+    def get_value_datetime(
+        self, path: list[int | str], default=None
+    ) -> datetime | None:
+        """
+        Get a datetime value from the data by a given path.
+        When the value is absent, the default (None) will be returned and an error will be logged.
+        """
+        timezone = dt.get_time_zone(API_TIMEZONE)
+        value = self.get_value(path, default)
+
+        # Timestamp.
+        if isinstance(value, int):
+            if value > 0:
+                _LOGGER.debug("convert %s to datetime (from timestamp)", value)
+                return datetime.fromtimestamp(value, tz=timezone)
+
+            return default
+
+        # Time.
+        if re.match(r"^\d{2}:\d{2}$", value):
+            _LOGGER.debug("convert %s to datetime (from time HH:MM)", value)
+            time_array = value.split(":")
+            today = datetime.now(tz=timezone)
+            return today.replace(
+                hour=int(time_array[0]),
+                minute=int(time_array[1]),
+                second=0,
+                microsecond=0,
+            )
+
+        # Date.
+        if re.match(r"^\d{2}-\d{2}-\d{4}$", value):
+            _LOGGER.debug("convert %s to datetime (from date DD-MM-YYYY)", value)
+            return datetime.strptime(value, "%d-%m-%Y").replace(tzinfo=timezone)
+
+        # Date and time.
+        if re.match(r"^\d{2}-\d{2}-\d{4} \d{2}:\d{2}:\d{2}$", value):
+            _LOGGER.debug(
+                "convert %s to datetime (from date and time DD-MM-YYYY HH:MM:SS)", value
+            )
+            return datetime.strptime(value, "%d-%m-%Y %H:%M:%S").replace(
+                tzinfo=timezone
+            )
+
+        # Date and time without seconds.
+        if re.match(r"^\d{2}-\d{2}-\d{4} \d{2}:\d{2}$", value):
+            _LOGGER.debug(
+                "convert %s to datetime (from date and time DD-MM-YYYY HH:MM)", value
+            )
+            return datetime.strptime(value, "%d-%m-%Y %H:%M").replace(tzinfo=timezone)
+
+        return default

+ 157 - 0
custom_components/odido_zyxel_5g/sensor.py

@@ -0,0 +1,157 @@
+"""Sensor platform for knmi."""
+
+from collections.abc import Callable
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from homeassistant.components.sensor import (
+    SensorDeviceClass,
+    SensorEntity,
+    SensorEntityDescription,
+    SensorStateClass,
+)
+from homeassistant.config_entries import ConfigEntry
+from homeassistant.const import (
+    CONF_NAME,
+    PERCENTAGE,
+    UnitOfSoundPressure,
+    UnitOfDataRate
+)
+from homeassistant.core import HomeAssistant
+from homeassistant.helpers.entity import EntityCategory
+from homeassistant.helpers.entity_platform import AddEntitiesCallback
+from homeassistant.helpers.typing import StateType
+from homeassistant.helpers.update_coordinator import CoordinatorEntity
+
+from .const import (DOMAIN,
+                    EP_CELLINFO,
+                    EP_DEVICESTATUS,
+                    EP_LANINFO)
+from .coordinator import RouterDataUpdateCoordinator
+
+
+@dataclass(kw_only=True, frozen=True)
+class RouterSensorDescription(SensorEntityDescription):
+    """Class describing Router sensor entities."""
+
+    value_fn: Callable[[dict[str, Any]], StateType | datetime | None]
+    attr_fn: Callable[[dict[str, Any]], dict[str, Any]] = lambda _: {}
+
+
+DESCRIPTIONS: list[RouterSensorDescription] = [
+    RouterSensorDescription(
+        key='rssi',
+        icon='mdi:wifi-check',
+        value_fn=lambda coordinator: coordinator.get_value(EP_CELLINFO, ["CellIntfInfo", "RSSI"]),
+        native_unit_of_measurement=UnitOfSoundPressure.WEIGHTED_DECIBEL_A,
+        device_class=SensorDeviceClass.SOUND_PRESSURE,
+        state_class=SensorStateClass.MEASUREMENT,
+        translation_key='rssi',
+        entity_registry_enabled_default=True,
+    ),
+    RouterSensorDescription(
+        key='rsrq',
+        icon='mdi:wifi-arrow-up-down',
+        value_fn=lambda coordinator: coordinator.get_value(EP_CELLINFO, ["CellIntfInfo", "X_ZYXEL_RSRQ"]),
+        native_unit_of_measurement=UnitOfSoundPressure.DECIBEL,
+        device_class=SensorDeviceClass.SOUND_PRESSURE,
+        state_class=SensorStateClass.MEASUREMENT,
+        translation_key='rsrq',
+        entity_registry_enabled_default=False
+    ),
+    RouterSensorDescription(
+        key='rsrp',
+        icon='mdi:wifi-arrow-down',
+        value_fn=lambda coordinator: coordinator.get_value(EP_CELLINFO, ["CellIntfInfo", "X_ZYXEL_RSRP"]),
+        native_unit_of_measurement=UnitOfSoundPressure.WEIGHTED_DECIBEL_A,
+        device_class=SensorDeviceClass.SOUND_PRESSURE,
+        state_class=SensorStateClass.MEASUREMENT,
+        translation_key='rsrp',
+        entity_registry_enabled_default=False
+    ),
+    RouterSensorDescription(
+        key='sinr',
+        icon='mdi:wifi-alert',
+        value_fn=lambda coordinator: coordinator.get_value(EP_CELLINFO, ["CellIntfInfo", "X_ZYXEL_SINR"]),
+        state_class=SensorStateClass.MEASUREMENT,
+        translation_key='sinr',
+        entity_registry_enabled_default=False
+    ),
+    RouterSensorDescription(
+        key='network_technology',
+        icon='mdi:radio-tower',
+        value_fn=lambda coordinator: coordinator.get_value(EP_CELLINFO, ["CellIntfInfo", "CurrentAccessTechnology"]),
+        translation_key='network_technology',
+        entity_registry_enabled_default=True
+    ),
+    RouterSensorDescription(
+        key='network_band',
+        icon='mdi:signal-5g',
+        value_fn=lambda coordinator: coordinator.get_value(EP_CELLINFO, ["CellIntfInfo", "CurrentAccessTechnology"]),
+        translation_key='network_band',
+        entity_registry_enabled_default=False
+    ),
+    # RouterSensorDescription(
+    #     key='network_devices',
+    #     state_class=SensorStateClass.MEASUREMENT,
+    #     translation_key='network_devices',
+    #     entity_registry_enabled_default=True
+    # )
+]
+
+
+async def async_setup_entry(
+    hass: HomeAssistant,
+    entry: ConfigEntry,
+    async_add_entities: AddEntitiesCallback,
+) -> None:
+    """Set up Router sensors based on a config entry."""
+    conf_name = entry.data.get(CONF_NAME)
+    coordinator = hass.data[DOMAIN][entry.entry_id]
+
+    entities: list[RouterSensor] = []
+
+    # Add all sensors described above.
+    for description in DESCRIPTIONS:
+        entities.append(
+            RouterSensor(
+                conf_name=conf_name,
+                coordinator=coordinator,
+                description=description,
+            )
+        )
+
+    async_add_entities(entities)
+
+
+class RouterSensor(CoordinatorEntity[RouterDataUpdateCoordinator], SensorEntity):
+    """Defines a Router sensor."""
+
+    _attr_has_entity_name = True
+    entity_description: RouterSensorDescription
+
+    def __init__(
+        self,
+        conf_name: str,
+        coordinator: RouterDataUpdateCoordinator,
+        description: SensorEntityDescription,
+    ) -> None:
+        """Initialize Router sensor."""
+        super().__init__(coordinator=coordinator)
+
+        self._attr_attribution = self.coordinator.get_value(["api", 0, "bron"])
+        self._attr_device_info = coordinator.device_info
+        self._attr_unique_id = f"{conf_name}_{description.key}".lower()
+
+        self.entity_description = description
+
+    @property
+    def native_value(self) -> StateType:
+        """Return the state."""
+        return self.entity_description.value_fn(self.coordinator)
+
+    @property
+    def extra_state_attributes(self) -> dict[str, Any]:
+        """Return the state attributes."""
+        return self.entity_description.attr_fn(self.coordinator)

+ 5 - 0
hacs.json

@@ -0,0 +1,5 @@
+{
+    "name": "Odido Klik & Klaar",
+    "homeassistant": "2025.5.0",
+    "render_readme": true
+  }

+ 1 - 0
requirements.txt

@@ -0,0 +1 @@
+homeassistant>=2025.5.0