diff --git a/tests/conftest.py b/tests/conftest.py index ee1a5681..3dabdb68 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,5 @@ import importlib import importlib.machinery -import importlib.metadata import pathlib import sys import types @@ -129,16 +128,6 @@ def _imported_module_path(module_name: str) -> pathlib.Path | None: return pathlib.Path(module_file).resolve() -def _installed_module_path(spec: TestPackageSpec) -> pathlib.Path | None: - try: - distribution = importlib.metadata.distribution(spec.distribution_name) - except importlib.metadata.PackageNotFoundError: - return None - - module_path = pathlib.Path(distribution.locate_file(spec.module_file)).resolve() - return module_path if module_path.exists() else None - - def _module_matches_test_package(package_name: str, package_path: pathlib.Path) -> bool: spec = _TEST_PACKAGES.get(package_name) if spec is None: @@ -147,11 +136,7 @@ def _module_matches_test_package(package_name: str, package_path: pathlib.Path) module_path = _imported_module_path(spec.import_name) if module_path is None: return False - if module_path.is_relative_to(package_path): - return True - - installed_module_path = _installed_module_path(spec) - return installed_module_path is not None and module_path == installed_module_path + return module_path.is_relative_to(package_path) def _purge_modules(package_roots: list[str]) -> None: @@ -181,9 +166,10 @@ def _expose_namespace_packages(package_path: pathlib.Path) -> None: module_path = list(getattr(module, "__path__", [])) child_str = str(child) - if child_str not in module_path: - module_path.append(child_str) - module.__path__ = module_path + if child_str in module_path: + module_path.remove(child_str) + module_path.insert(0, child_str) + module.__path__ = module_path @contextmanager @@ -447,6 +433,25 @@ def clear_factory_registry(): Factory.clear() +@pytest.fixture(autouse=True) +def clear_dummy_tango_attributes(): + """Clear dummy Tango shared attribute values before/after each test. + + The dummy Tango package is a test dependency and may not be importable for + all test environments. When available, its registry must be reset to keep + singleton-like attributes from leaking values across tests. + """ + try: + from tango.pyaml.attribute_store import clear_attributes + except ModuleNotFoundError: + yield + return + + clear_attributes() + yield + clear_attributes() + + # ----------------------- # Linkers fixtures # ----------------------- diff --git a/tests/dummy_cs/tango-pyaml/tango/pyaml/attribute.py b/tests/dummy_cs/tango-pyaml/tango/pyaml/attribute.py index a7e4482a..cc19566f 100644 --- a/tests/dummy_cs/tango-pyaml/tango/pyaml/attribute.py +++ b/tests/dummy_cs/tango-pyaml/tango/pyaml/attribute.py @@ -1,10 +1,10 @@ -from typing import Optional, Tuple - from pydantic import BaseModel, ConfigDict from pyaml.control.deviceaccess import DeviceAccess from pyaml.control.readback_value import Value +from .attribute_store import get_state + PYAMLCLASS: str = "Attribute" @@ -13,7 +13,7 @@ class ConfigModel(BaseModel): attribute: str unit: str = "" - range: Optional[Tuple[Optional[float], Optional[float]]] = None + range: tuple[float | None, float | None] | None = None class Attribute(DeviceAccess): @@ -27,11 +27,17 @@ def __init__(self, cfg: ConfigModel, is_array=False): self._cfg = cfg self._setpoint = cfg.attribute self._readback = cfg.attribute - self._unit = cfg.unit + # Register metadata early. The value may already have been initialized + # by a test before the accelerator configuration is loaded. + get_state(cfg.attribute, unit=cfg.unit, range=self._range()) def set_array(self, is_array: bool): - self._is_array = is_array - self._cache = 0.0 if not is_array else [0.0, 1.0] + """Mark the shared value as array-like without initializing it.""" + get_state( + self._cfg.attribute, + unit=self._cfg.unit, + range=self._range(), + ).is_array = is_array def name(self) -> str: return self._setpoint @@ -41,36 +47,51 @@ def measure_name(self) -> str: def set(self, value): print(f"{self._cfg.attribute}:{value}") - self._cache = value + state = get_state(self._cfg.attribute, unit=self._cfg.unit, range=self._range()) + state.value = value def set_and_wait(self, value): self.set(value) def get(self): - return self._cache + state = get_state(self._cfg.attribute, unit=self._cfg.unit, range=self._range()) + return state.value def readback(self): - if self._is_array: - return [Value(v) for v in self._cache] - else: - return Value(self._cache) + """Return readback from shared state, preserving scalar/array shape.""" + state = get_state( + self._cfg.attribute, + unit=self._cfg.unit, + range=self._range(), + ) + value = state.value if state.readback is None else state.readback + if state.is_array: + return [Value(v) for v in value] + return Value(value) def unit(self) -> str: - return self._unit + state = get_state(self._cfg.attribute, unit=self._cfg.unit, range=self._range()) + return state.unit def __repr__(self): return repr(self._cfg).replace("ConfigModel", self.__class__.__name__) def get_range(self) -> list[float]: - attr_range: list[float] = [None, None] - if self._cfg.range is not None: - attr_range[0] = ( - self._cfg.range[0] if self._cfg.range[0] is not None else None - ) - attr_range[1] = ( - self._cfg.range[1] if self._cfg.range[1] is not None else None - ) - return attr_range + state = get_state( + self._cfg.attribute, + unit=self._cfg.unit, + range=self._range(), + ) + if state.range is None: + return [None, None] + return [ + state.range[0] if state.range[0] is not None else None, + state.range[1] if state.range[1] is not None else None, + ] def check_device_availability(self) -> bool: return True + + def _range(self): + """Return optional range metadata for models that define it.""" + return getattr(self._cfg, "range", None) diff --git a/tests/dummy_cs/tango-pyaml/tango/pyaml/attribute_read_only.py b/tests/dummy_cs/tango-pyaml/tango/pyaml/attribute_read_only.py index bb2ba4d2..b42b5446 100644 --- a/tests/dummy_cs/tango-pyaml/tango/pyaml/attribute_read_only.py +++ b/tests/dummy_cs/tango-pyaml/tango/pyaml/attribute_read_only.py @@ -30,10 +30,10 @@ def set_and_wait(self, value): raise Exception(f"{self._cfg.attribute} is read only attribute") def unit(self) -> str: - return self._unit + return super().unit() def get_range(self) -> list[float]: - return [None, None] + return super().get_range() def check_device_availability(self) -> bool: return True diff --git a/tests/dummy_cs/tango-pyaml/tango/pyaml/attribute_store.py b/tests/dummy_cs/tango-pyaml/tango/pyaml/attribute_store.py new file mode 100644 index 00000000..8bb79e80 --- /dev/null +++ b/tests/dummy_cs/tango-pyaml/tango/pyaml/attribute_store.py @@ -0,0 +1,116 @@ +from dataclasses import dataclass +from typing import Any + + +@dataclass +class AttributeState: + """Shared mutable state for one dummy Tango attribute.""" + + value: Any = 0.0 + unit: str = "" + range: tuple[float | None, float | None] | None = None + readback: Any = None + is_array: bool = False + + +_ATTRIBUTES: dict[str, AttributeState] = {} +"""Registry keyed by Tango attribute name. + +Tests generally initialize attributes with their configuration name +(`srdiag/bpm/...`). The dummy control system later prefixes the attribute with +the Tango host (`//host/srdiag/bpm/...`). Both names must resolve to the same +state so initialization done before `Accelerator.load()` remains visible after +control-system attachment. +""" + + +def _unqualified_name(name: str) -> str: + """Strip the optional Tango host prefix from `//host/device/attribute`.""" + if name.startswith("//"): + parts = name.split("/", 3) + if len(parts) == 4: + return parts[3] + return name + + +def _lookup_existing_state(name: str) -> AttributeState | None: + """Find an existing state using either full or host-less attribute name.""" + state = _ATTRIBUTES.get(name) + if state is not None: + return state + + unqualified = _unqualified_name(name) + if unqualified != name: + state = _ATTRIBUTES.get(unqualified) + if state is not None: + # Alias the full Tango name to the pre-loaded host-less state. + _ATTRIBUTES[name] = state + return state + + return None + + +def get_state( + name: str, + *, + unit: str = "", + range: tuple[float | None, float | None] | None = None, +) -> AttributeState: + """Return the shared state for an attribute, creating it if necessary.""" + state = _lookup_existing_state(name) + if state is None: + state = AttributeState(unit=unit, range=range) + _ATTRIBUTES[name] = state + else: + if unit and not state.unit: + state.unit = unit + if range is not None and state.range is None: + state.range = range + + return state + + +def set_attribute( + name: str, + value: Any, + *, + unit: str = "", + range: tuple[float | None, float | None] | None = None, + readback: Any = None, +) -> AttributeState: + """Set or initialize a dummy Tango attribute value from a test.""" + state = _lookup_existing_state(name) + if state is None: + state = AttributeState() + + state.value = value + state.readback = readback + state.is_array = _is_array_value(value) + if unit: + state.unit = unit + if range is not None: + state.range = range + + _ATTRIBUTES[name] = state + return state + + +def get_attribute(name: str) -> Any: + """Return the current value stored for a dummy Tango attribute.""" + return get_state(name).value + + +def clear_attributes(): + """Clear all dummy Tango values to avoid state leakage between tests.""" + _ATTRIBUTES.clear() + + +def _is_array_value(value: Any) -> bool: + """Return true for vector-like values, but keep strings scalar.""" + if isinstance(value, (str, bytes)): + return False + try: + len(value) + except TypeError: + return False + return True diff --git a/tests/test_bpm_controlsystem.py b/tests/test_bpm_controlsystem.py index 83c64b98..c6aa5187 100644 --- a/tests/test_bpm_controlsystem.py +++ b/tests/test_bpm_controlsystem.py @@ -56,6 +56,10 @@ def test_controlsystem_bpm_position(install_test_package): indirect=True, ) def test_controlsystem_bpm_position_indexed(install_test_package): + from tango.pyaml.attribute_store import set_attribute + + set_attribute("srdiag/bpm/c01-04/Position", [0.0, 1.0], unit="mm") + sr: Accelerator = Accelerator.load("tests/config/bpms.yaml") bpm = sr.live.get_bpm("BPM_C01-04") diff --git a/tests/test_dummy_tango_attributes.py b/tests/test_dummy_tango_attributes.py new file mode 100644 index 00000000..20747e29 --- /dev/null +++ b/tests/test_dummy_tango_attributes.py @@ -0,0 +1,70 @@ +import pytest + + +@pytest.mark.parametrize( + "install_test_package", + [{"name": "tango-pyaml", "path": "tests/dummy_cs/tango-pyaml"}], + indirect=True, +) +def test_dummy_tango_attributes_share_state(install_test_package): + from tango.pyaml.attribute import Attribute + from tango.pyaml.attribute import ConfigModel as AttributeConfigModel + from tango.pyaml.attribute_store import get_attribute, set_attribute + + set_attribute("sr/test/device/value", 2.0, unit="mm", range=(0.0, 3.0)) + + first = Attribute(AttributeConfigModel(attribute="sr/test/device/value")) + second = Attribute(AttributeConfigModel(attribute="sr/test/device/value")) + + assert first.get() == 2.0 + assert second.get() == 2.0 + assert first.unit() == "mm" + assert first.get_range() == [0.0, 3.0] + + second.set(2.5) + + assert first.get() == 2.5 + assert get_attribute("sr/test/device/value") == 2.5 + + +@pytest.mark.parametrize( + "install_test_package", + [{"name": "tango-pyaml", "path": "tests/dummy_cs/tango-pyaml"}], + indirect=True, +) +def test_dummy_tango_attribute_lookup_accepts_control_system_prefix(install_test_package): + from tango.pyaml.attribute import Attribute + from tango.pyaml.attribute import ConfigModel as AttributeConfigModel + from tango.pyaml.attribute_store import set_attribute + + set_attribute("srdiag/bpm/c01-04/Position", [0.0, 1.0], unit="mm") + + attribute = Attribute( + AttributeConfigModel( + attribute="//ebs-simu-3:10000/srdiag/bpm/c01-04/Position", + unit="mm", + ) + ) + attribute.set_array(True) + + assert attribute.get() == [0.0, 1.0] + assert attribute.unit() == "mm" + + +@pytest.mark.parametrize( + "install_test_package", + [{"name": "tango-pyaml", "path": "tests/dummy_cs/tango-pyaml"}], + indirect=True, +) +def test_dummy_tango_read_only_attribute_can_be_initialized(install_test_package): + from tango.pyaml.attribute_read_only import AttributeReadOnly + from tango.pyaml.attribute_read_only import ConfigModel as ReadOnlyConfigModel + from tango.pyaml.attribute_store import set_attribute + + set_attribute("srdiag/tune/tune_h", 0.37, unit="1") + attribute = AttributeReadOnly(ReadOnlyConfigModel(attribute="srdiag/tune/tune_h")) + + assert attribute.get() == 0.37 + assert attribute.unit() == "1" + with pytest.raises(Exception, match="read only attribute"): + attribute.set(0.38)