diff --git a/packages/modules/vehicles/bmw_cardata/__init__.py b/packages/modules/vehicles/bmw_cardata/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/modules/vehicles/bmw_cardata/config.py b/packages/modules/vehicles/bmw_cardata/config.py new file mode 100644 index 0000000000..f8727d2df1 --- /dev/null +++ b/packages/modules/vehicles/bmw_cardata/config.py @@ -0,0 +1,31 @@ +from dataclasses import dataclass + + +@dataclass +class BmwCardataConfiguration: + client_id: str = "" + vin: str = "" + calculate_soc: bool = False + access_token: str = "" + refresh_token: str = "" + expires_at: float = 0 + container_id: str = "" + # Auth-Status für UI (temporär während Device Code Flow) + auth_user_code: str = "" + auth_verification_uri: str = "" + auth_device_code: str = "" + auth_code_verifier: str = "" + auth_expires_at: float = 0 + auth_connected: bool = False + + +@dataclass +class BmwCardataSetup: + name: str = "BMW CarData" + type: str = "bmw_cardata" + official: bool = False + configuration: BmwCardataConfiguration = None + + def __post_init__(self): + if self.configuration is None: + self.configuration = BmwCardataConfiguration() diff --git a/packages/modules/vehicles/bmw_cardata/soc.py b/packages/modules/vehicles/bmw_cardata/soc.py new file mode 100644 index 0000000000..0a04821031 --- /dev/null +++ b/packages/modules/vehicles/bmw_cardata/soc.py @@ -0,0 +1,247 @@ +import logging +import time +from typing import List, Optional + +from requests.exceptions import RequestException + +from modules.common import req +from modules.common.abstract_device import DeviceDescriptor +from modules.common.abstract_vehicle import VehicleUpdateData +from modules.common.component_state import CarState +from modules.common.configurable_vehicle import ConfigurableVehicle +from modules.vehicles.bmw_cardata.config import BmwCardataSetup, BmwCardataConfiguration + +log = logging.getLogger(__name__) + +BMW_AUTH_URL = "https://customer.bmwgroup.com/gcdm/oauth" +BMW_API_URL = "https://api-cardata.bmwgroup.com" + +FIELD_SOC = "vehicle.drivetrain.electricEngine.charging.level" +FIELD_SOC_ALT = "vehicle.drivetrain.batteryManagement.header" +FIELD_RANGE = "vehicle.drivetrain.electricEngine.remainingElectricRange" +FIELD_STATUS = "vehicle.drivetrain.electricEngine.charging.status" +FIELD_ODOMETER_CANDIDATES = [ + "vehicle.vehicle.travelledDistance", + "vehicle.trip.segment.end.travelledDistance", +] + +CONTAINER_NAME = "ChargeStats" +CONTAINER_PURPOSE = "openWB" +CONTAINER_DESCRIPTORS = [ + "vehicle.drivetrain.electricEngine.charging.status", + "vehicle.drivetrain.electricEngine.charging.level", + "vehicle.drivetrain.batteryManagement.header", + "vehicle.drivetrain.electricEngine.remainingElectricRange", + "vehicle.vehicle.travelledDistance", +] + + +def _get_session(token: Optional[str] = None): + session = req.get_http_session() + session.headers.update({ + "Accept": "application/json", + "x-version": "v1", + }) + if token: + session.headers.update({"Authorization": f"Bearer {token}"}) + return session + + +def _response_text(exc: Exception) -> str: + response = getattr(exc, "response", None) + if response is None: + return str(exc) + try: + return response.text or str(exc) + except Exception: + return str(exc) + + +def _extract_http_status(exc: Exception) -> Optional[int]: + response = getattr(exc, "response", None) + if response is not None: + return getattr(response, "status_code", None) + return None + + +def _extract_value(td: dict, key: str): + entry = td.get(key, {}) + return entry.get("value") if isinstance(entry, dict) else None + + +def _extract_first_value(td: dict, keys: List[str]): + for key in keys: + value = _extract_value(td, key) + if value is not None: + return value + return None + + +def _post_form(url: str, data: dict) -> dict: + session = _get_session() + response = session.post(url, data=data) + return response.json() + + +def _get_json(url: str, token: str): + session = _get_session(token) + response = session.get(url) + return response.json() + + +def _post_json(url: str, token: str, payload: dict) -> dict: + session = _get_session(token) + response = session.post(url, json=payload) + return response.json() + + +def _create_container(token: str) -> str: + log.warning("BMW CarData: Keine aktiven Container gefunden. Erstelle neuen Container...") + result = _post_json( + f"{BMW_API_URL}/customers/containers", + token, + { + "name": CONTAINER_NAME, + "purpose": CONTAINER_PURPOSE, + "technicalDescriptors": CONTAINER_DESCRIPTORS, + }, + ) + container_id = result.get("containerId") or result.get("id") + if not container_id: + raise Exception(f"BMW CarData: Container konnte nicht erstellt werden: {result}") + log.info("BMW CarData: Container erstellt: %s", container_id) + return container_id + + +def _fetch_telematic_data(token: str, vin: str, container_id: str): + url = f"{BMW_API_URL}/customers/vehicles/{vin}/telematicData?containerId={container_id}" + log.debug("BMW CarData: GET %s", url) + return _get_json(url, token) + + +def get_valid_token(cfg: BmwCardataConfiguration) -> str: + if not cfg.access_token: + raise Exception("BMW CarData: Keine Tokens gefunden. Bitte BMW-Kopplung in der UI durchführen.") + + if time.time() < cfg.expires_at: + return cfg.access_token + + log.info("BMW CarData: Token abgelaufen, führe Refresh durch...") + try: + new = _post_form( + f"{BMW_AUTH_URL}/token", + { + "grant_type": "refresh_token", + "refresh_token": cfg.refresh_token, + "client_id": cfg.client_id, + }, + ) + except RequestException as e: + raise Exception( + f"BMW CarData: Token-Refresh fehlgeschlagen: {e}. Bitte BMW-Kopplung erneut durchführen." + ) + + cfg.access_token = new["access_token"] + cfg.refresh_token = new.get("refresh_token", cfg.refresh_token) + cfg.expires_at = time.time() + new.get("expires_in", 3600) - 60 + + log.info("BMW CarData: Token-Refresh erfolgreich.") + return cfg.access_token + + +def get_container_id(cfg: BmwCardataConfiguration, token: str) -> str: + if cfg.container_id: + log.debug("BMW CarData: Container-ID aus Konfiguration: %s", cfg.container_id) + return cfg.container_id + + log.info("BMW CarData: Ermittle Container-ID via API...") + raw = _get_json(f"{BMW_API_URL}/customers/containers", token) + containers = raw if isinstance(raw, list) else raw.get("containers", []) + + openwb = [ + c for c in containers + if c.get("state") == "ACTIVE" and c.get("purpose") == CONTAINER_PURPOSE + ] + active = [c for c in containers if c.get("state") == "ACTIVE"] + preferred = openwb if openwb else active + + if preferred: + container_id = preferred[0].get("containerId") or preferred[0].get("id") + log.info("BMW CarData: Container-ID gefunden: %s", container_id) + else: + container_id = _create_container(token) + + cfg.container_id = container_id + return container_id + + +def fetch_soc(config: BmwCardataSetup) -> CarState: + cfg = config.configuration + + if not cfg.client_id: + raise Exception("BMW CarData: client_id nicht konfiguriert!") + if not cfg.vin: + raise Exception("BMW CarData: VIN nicht konfiguriert!") + + token = get_valid_token(cfg) + container_id = get_container_id(cfg, token) + + try: + raw = _fetch_telematic_data(token, cfg.vin, container_id) + except RequestException as e: + status_code = _extract_http_status(e) + + if status_code in (400, 404): + log.warning( + "BMW CarData: Container %s ungültig (HTTP %s), ermittle neu...", + container_id, + status_code, + ) + cfg.container_id = "" + container_id = get_container_id(cfg, token) + raw = _fetch_telematic_data(token, cfg.vin, container_id) + else: + if status_code == 403 and "CU-429" in _response_text(e): + raise Exception("BMW CarData: Tageslimit erreicht (CU-429).") + raise Exception(f"BMW CarData: API-Fehler beim Abruf der Telematikdaten: {e}") + + td = raw.get("telematicData", raw) + + soc_raw = _extract_value(td, FIELD_SOC) + if soc_raw is None: + soc_raw = _extract_value(td, FIELD_SOC_ALT) + + range_raw = _extract_value(td, FIELD_RANGE) + status = _extract_value(td, FIELD_STATUS) + odometer_raw = _extract_first_value(td, FIELD_ODOMETER_CANDIDATES) + + soc = int(float(soc_raw)) if soc_raw is not None else None + vehicle_range = int(float(range_raw)) if range_raw is not None else None + odometer = int(float(odometer_raw)) if odometer_raw is not None else None + + if soc is None: + raise Exception("BMW CarData: Kein SoC-Wert in API-Antwort gefunden!") + + log.info( + "BMW CarData: SoC=%s%%, Reichweite=%s km, Status=%s, Odometer=%s km", + soc, + vehicle_range, + status, + odometer, + ) + return CarState(soc=soc, range=vehicle_range, odometer=odometer) + + +def create_vehicle(vehicle_config: BmwCardataSetup, vehicle: int): + def updater(vehicle_update_data: VehicleUpdateData) -> CarState: + return fetch_soc(vehicle_config) + + return ConfigurableVehicle( + vehicle_config=vehicle_config, + component_updater=updater, + vehicle=vehicle, + calc_while_charging=vehicle_config.configuration.calculate_soc, + ) + + +device_descriptor = DeviceDescriptor(configuration_factory=BmwCardataSetup) diff --git a/packages/modules/vehicles/bmw_cardata/soc_test.py b/packages/modules/vehicles/bmw_cardata/soc_test.py new file mode 100644 index 0000000000..5a9c69bb5b --- /dev/null +++ b/packages/modules/vehicles/bmw_cardata/soc_test.py @@ -0,0 +1,228 @@ +import pytest +from unittest.mock import Mock + +from requests.exceptions import HTTPError as RequestsHTTPError + +from modules.common import store +from modules.common.abstract_vehicle import VehicleUpdateData +from modules.common.component_context import SingleComponentUpdateContext +from modules.vehicles.bmw_cardata.soc import create_vehicle, fetch_soc +from modules.vehicles.bmw_cardata.config import BmwCardataSetup, BmwCardataConfiguration + + +class TestBmwCardata: + @pytest.fixture(autouse=True) + def set_up(self, monkeypatch): + self.mock_context_exit = Mock(return_value=True) + self.mock_value_store = Mock(name="value_store") + monkeypatch.setattr(store, "get_car_value_store", Mock(return_value=self.mock_value_store)) + monkeypatch.setattr(SingleComponentUpdateContext, "__exit__", self.mock_context_exit) + + def _make_config(self, **kwargs): + defaults = dict( + client_id="test-client-id", + vin="WBY00000000000000", + access_token="test-token", + refresh_token="test-refresh", + expires_at=9999999999, + container_id="test-container-id", + ) + defaults.update(kwargs) + return BmwCardataSetup( + configuration=BmwCardataConfiguration(**defaults) + ) + + def test_missing_client_id_raises(self): + config = BmwCardataSetup( + configuration=BmwCardataConfiguration( + client_id="", + vin="WBY00000000000000", + access_token="test-token", + ) + ) + with pytest.raises(Exception, match="client_id"): + fetch_soc(config) + + def test_missing_vin_raises(self): + config = BmwCardataSetup( + configuration=BmwCardataConfiguration( + client_id="test-client-id", + vin="", + access_token="test-token", + ) + ) + with pytest.raises(Exception, match="VIN"): + fetch_soc(config) + + def test_missing_token_raises(self): + config = BmwCardataSetup( + configuration=BmwCardataConfiguration( + client_id="test-client-id", + vin="WBY00000000000000", + access_token="", + ) + ) + with pytest.raises(Exception, match="Tokens"): + fetch_soc(config) + + def test_api_error_passes_to_context(self, monkeypatch): + dummy_error = Exception("BMW CarData: Tageslimit erreicht (CU-429).") + monkeypatch.setattr( + "modules.vehicles.bmw_cardata.soc.req.get_http_session", + Mock(side_effect=dummy_error) + ) + create_vehicle(self._make_config(), 0).update(VehicleUpdateData()) + assert self.mock_context_exit.call_count == 1 + + def test_soc_extraction_primary_field(self, monkeypatch): + mock_response = Mock() + mock_response.json.return_value = { + "telematicData": { + "vehicle.drivetrain.electricEngine.charging.level": {"value": "75", "unit": "%"}, + "vehicle.drivetrain.electricEngine.remainingElectricRange": {"value": "350", "unit": "km"}, + "vehicle.drivetrain.electricEngine.charging.status": {"value": "NOCHARGING", "unit": None}, + "vehicle.vehicle.travelledDistance": {"value": "61570", "unit": "km"}, + } + } + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + monkeypatch.setattr("modules.vehicles.bmw_cardata.soc.req.get_http_session", Mock(return_value=mock_session)) + + result = fetch_soc(self._make_config()) + assert result.soc == 75 + assert result.range == 350 + assert result.odometer == 61570 + + def test_soc_extraction_fallback_field(self, monkeypatch): + mock_response = Mock() + mock_response.json.return_value = { + "telematicData": { + "vehicle.drivetrain.batteryManagement.header": {"value": "63", "unit": "%"}, + "vehicle.drivetrain.electricEngine.remainingElectricRange": {"value": "280", "unit": "km"}, + "vehicle.drivetrain.electricEngine.charging.status": {"value": "CHARGINGACTIVE", "unit": None}, + } + } + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + monkeypatch.setattr("modules.vehicles.bmw_cardata.soc.req.get_http_session", Mock(return_value=mock_session)) + + result = fetch_soc(self._make_config()) + assert result.soc == 63 + assert result.range == 280 + + def test_no_soc_raises(self, monkeypatch): + mock_response = Mock() + mock_response.json.return_value = {"telematicData": {}} + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + monkeypatch.setattr("modules.vehicles.bmw_cardata.soc.req.get_http_session", Mock(return_value=mock_session)) + + with pytest.raises(Exception, match="Kein SoC"): + fetch_soc(self._make_config()) + + def test_token_refresh_on_expired(self, monkeypatch): + mock_response = Mock() + mock_response.json.return_value = { + "telematicData": { + "vehicle.drivetrain.electricEngine.charging.level": {"value": "55", "unit": "%"}, + "vehicle.drivetrain.electricEngine.remainingElectricRange": {"value": "270", "unit": "km"}, + } + } + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.post.return_value = Mock(json=Mock(return_value={ + "access_token": "new-token", + "refresh_token": "new-refresh", + "expires_in": 3600, + })) + mock_session.headers = {} + monkeypatch.setattr("modules.vehicles.bmw_cardata.soc.req.get_http_session", Mock(return_value=mock_session)) + + config = self._make_config(expires_at=0, container_id="test-container-id") + result = fetch_soc(config) + + assert result.soc == 55 + assert config.configuration.access_token == "new-token" + assert config.configuration.refresh_token == "new-refresh" + + def test_container_auto_create_when_empty(self, monkeypatch): + call_count = [0] + + mock_get_response_empty = Mock() + mock_get_response_empty.json.return_value = {"containers": []} + + mock_get_response_data = Mock() + mock_get_response_data.json.return_value = { + "telematicData": { + "vehicle.drivetrain.electricEngine.charging.level": {"value": "60", "unit": "%"}, + "vehicle.drivetrain.electricEngine.remainingElectricRange": {"value": "290", "unit": "km"}, + } + } + + def mock_get(url): + call_count[0] += 1 + if "containers" in url and call_count[0] == 1: + return mock_get_response_empty + return mock_get_response_data + + mock_session = Mock() + mock_session.get.side_effect = mock_get + mock_session.post.return_value = Mock(json=Mock(return_value={"containerId": "new-container-id"})) + mock_session.headers = {} + monkeypatch.setattr("modules.vehicles.bmw_cardata.soc.req.get_http_session", Mock(return_value=mock_session)) + + config = self._make_config(container_id="") + result = fetch_soc(config) + + assert result.soc == 60 + assert config.configuration.container_id == "new-container-id" + + def test_container_retry_on_invalid(self, monkeypatch): + call_count = [0] + good_response = { + "telematicData": { + "vehicle.drivetrain.electricEngine.charging.level": {"value": "70", "unit": "%"}, + "vehicle.drivetrain.electricEngine.remainingElectricRange": {"value": "340", "unit": "km"}, + } + } + + def mock_fetch(token, vin, container_id): + call_count[0] += 1 + if call_count[0] == 1: + mock_resp = Mock() + mock_resp.status_code = 404 + raise RequestsHTTPError(response=mock_resp) + return good_response + + monkeypatch.setattr("modules.vehicles.bmw_cardata.soc._fetch_telematic_data", mock_fetch) + monkeypatch.setattr( + "modules.vehicles.bmw_cardata.soc.get_container_id", + Mock(return_value="new-valid-container") + ) + + result = fetch_soc(self._make_config()) + assert result.soc == 70 + assert result.range == 340 + + def test_update_updates_value_store(self, monkeypatch): + mock_response = Mock() + mock_response.json.return_value = { + "telematicData": { + "vehicle.drivetrain.electricEngine.charging.level": {"value": "47", "unit": "%"}, + "vehicle.drivetrain.electricEngine.remainingElectricRange": {"value": "234", "unit": "km"}, + "vehicle.vehicle.travelledDistance": {"value": "61762", "unit": "km"}, + } + } + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + monkeypatch.setattr("modules.vehicles.bmw_cardata.soc.req.get_http_session", Mock(return_value=mock_session)) + + create_vehicle(self._make_config(), 0).update(VehicleUpdateData()) + assert self.mock_value_store.set.call_count == 1 + assert self.mock_value_store.set.call_args[0][0].soc == 47 + assert self.mock_value_store.set.call_args[0][0].range == 234 + assert self.mock_value_store.set.call_args[0][0].odometer == 61762