Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import importlib
import importlib.machinery
import importlib.metadata
import pathlib
import sys
import types
Expand Down Expand Up @@ -129,16 +128,6 @@ def _imported_module_path(module_name: str) -> pathlib.Path | None:
return pathlib.Path(module_file).resolve()


def _installed_module_path(spec: TestPackageSpec) -> pathlib.Path | None:
try:
distribution = importlib.metadata.distribution(spec.distribution_name)
except importlib.metadata.PackageNotFoundError:
return None

module_path = pathlib.Path(distribution.locate_file(spec.module_file)).resolve()
return module_path if module_path.exists() else None


def _module_matches_test_package(package_name: str, package_path: pathlib.Path) -> bool:
spec = _TEST_PACKAGES.get(package_name)
if spec is None:
Expand All @@ -147,11 +136,7 @@ def _module_matches_test_package(package_name: str, package_path: pathlib.Path)
module_path = _imported_module_path(spec.import_name)
if module_path is None:
return False
if module_path.is_relative_to(package_path):
return True

installed_module_path = _installed_module_path(spec)
return installed_module_path is not None and module_path == installed_module_path
return module_path.is_relative_to(package_path)


def _purge_modules(package_roots: list[str]) -> None:
Expand Down Expand Up @@ -181,9 +166,10 @@ def _expose_namespace_packages(package_path: pathlib.Path) -> None:

module_path = list(getattr(module, "__path__", []))
child_str = str(child)
if child_str not in module_path:
module_path.append(child_str)
module.__path__ = module_path
if child_str in module_path:
module_path.remove(child_str)
module_path.insert(0, child_str)
module.__path__ = module_path


@contextmanager
Expand Down Expand Up @@ -447,6 +433,25 @@ def clear_factory_registry():
Factory.clear()


@pytest.fixture(autouse=True)
def clear_dummy_tango_attributes():
"""Clear dummy Tango shared attribute values before/after each test.

The dummy Tango package is a test dependency and may not be importable for
all test environments. When available, its registry must be reset to keep
singleton-like attributes from leaking values across tests.
"""
try:
from tango.pyaml.attribute_store import clear_attributes
except ModuleNotFoundError:
yield
return

clear_attributes()
yield
clear_attributes()


# -----------------------
# Linkers fixtures
# -----------------------
Expand Down
65 changes: 43 additions & 22 deletions tests/dummy_cs/tango-pyaml/tango/pyaml/attribute.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from typing import Optional, Tuple

from pydantic import BaseModel, ConfigDict

from pyaml.control.deviceaccess import DeviceAccess
from pyaml.control.readback_value import Value

from .attribute_store import get_state

PYAMLCLASS: str = "Attribute"


Expand All @@ -13,7 +13,7 @@ class ConfigModel(BaseModel):

attribute: str
unit: str = ""
range: Optional[Tuple[Optional[float], Optional[float]]] = None
range: tuple[float | None, float | None] | None = None


class Attribute(DeviceAccess):
Expand All @@ -27,11 +27,17 @@ def __init__(self, cfg: ConfigModel, is_array=False):
self._cfg = cfg
self._setpoint = cfg.attribute
self._readback = cfg.attribute
self._unit = cfg.unit
# Register metadata early. The value may already have been initialized
# by a test before the accelerator configuration is loaded.
get_state(cfg.attribute, unit=cfg.unit, range=self._range())

def set_array(self, is_array: bool):
self._is_array = is_array
self._cache = 0.0 if not is_array else [0.0, 1.0]
"""Mark the shared value as array-like without initializing it."""
get_state(
self._cfg.attribute,
unit=self._cfg.unit,
range=self._range(),
).is_array = is_array

def name(self) -> str:
return self._setpoint
Expand All @@ -41,36 +47,51 @@ def measure_name(self) -> str:

def set(self, value):
print(f"{self._cfg.attribute}:{value}")
self._cache = value
state = get_state(self._cfg.attribute, unit=self._cfg.unit, range=self._range())
state.value = value

def set_and_wait(self, value):
self.set(value)

def get(self):
return self._cache
state = get_state(self._cfg.attribute, unit=self._cfg.unit, range=self._range())
return state.value

def readback(self):
if self._is_array:
return [Value(v) for v in self._cache]
else:
return Value(self._cache)
"""Return readback from shared state, preserving scalar/array shape."""
state = get_state(
self._cfg.attribute,
unit=self._cfg.unit,
range=self._range(),
)
value = state.value if state.readback is None else state.readback
if state.is_array:
return [Value(v) for v in value]
return Value(value)

def unit(self) -> str:
return self._unit
state = get_state(self._cfg.attribute, unit=self._cfg.unit, range=self._range())
return state.unit

def __repr__(self):
return repr(self._cfg).replace("ConfigModel", self.__class__.__name__)

def get_range(self) -> list[float]:
attr_range: list[float] = [None, None]
if self._cfg.range is not None:
attr_range[0] = (
self._cfg.range[0] if self._cfg.range[0] is not None else None
)
attr_range[1] = (
self._cfg.range[1] if self._cfg.range[1] is not None else None
)
return attr_range
state = get_state(
self._cfg.attribute,
unit=self._cfg.unit,
range=self._range(),
)
if state.range is None:
return [None, None]
return [
state.range[0] if state.range[0] is not None else None,
state.range[1] if state.range[1] is not None else None,
]

def check_device_availability(self) -> bool:
return True

def _range(self):
"""Return optional range metadata for models that define it."""
return getattr(self._cfg, "range", None)
4 changes: 2 additions & 2 deletions tests/dummy_cs/tango-pyaml/tango/pyaml/attribute_read_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ def set_and_wait(self, value):
raise Exception(f"{self._cfg.attribute} is read only attribute")

def unit(self) -> str:
return self._unit
return super().unit()

def get_range(self) -> list[float]:
return [None, None]
return super().get_range()

def check_device_availability(self) -> bool:
return True
116 changes: 116 additions & 0 deletions tests/dummy_cs/tango-pyaml/tango/pyaml/attribute_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from dataclasses import dataclass
from typing import Any


@dataclass
class AttributeState:
"""Shared mutable state for one dummy Tango attribute."""

value: Any = 0.0
unit: str = ""
range: tuple[float | None, float | None] | None = None
readback: Any = None
is_array: bool = False


_ATTRIBUTES: dict[str, AttributeState] = {}
"""Registry keyed by Tango attribute name.

Tests generally initialize attributes with their configuration name
(`srdiag/bpm/...`). The dummy control system later prefixes the attribute with
the Tango host (`//host/srdiag/bpm/...`). Both names must resolve to the same
state so initialization done before `Accelerator.load()` remains visible after
control-system attachment.
"""


def _unqualified_name(name: str) -> str:
"""Strip the optional Tango host prefix from `//host/device/attribute`."""
if name.startswith("//"):
parts = name.split("/", 3)
if len(parts) == 4:
return parts[3]
return name


def _lookup_existing_state(name: str) -> AttributeState | None:
"""Find an existing state using either full or host-less attribute name."""
state = _ATTRIBUTES.get(name)
if state is not None:
return state

unqualified = _unqualified_name(name)
if unqualified != name:
state = _ATTRIBUTES.get(unqualified)
if state is not None:
# Alias the full Tango name to the pre-loaded host-less state.
_ATTRIBUTES[name] = state
return state

return None


def get_state(
name: str,
*,
unit: str = "",
range: tuple[float | None, float | None] | None = None,
) -> AttributeState:
"""Return the shared state for an attribute, creating it if necessary."""
state = _lookup_existing_state(name)
if state is None:
state = AttributeState(unit=unit, range=range)
_ATTRIBUTES[name] = state
else:
if unit and not state.unit:
state.unit = unit
if range is not None and state.range is None:
state.range = range

return state


def set_attribute(
name: str,
value: Any,
*,
unit: str = "",
range: tuple[float | None, float | None] | None = None,
readback: Any = None,
) -> AttributeState:
"""Set or initialize a dummy Tango attribute value from a test."""
state = _lookup_existing_state(name)
if state is None:
state = AttributeState()

state.value = value
state.readback = readback
state.is_array = _is_array_value(value)
if unit:
state.unit = unit
if range is not None:
state.range = range

_ATTRIBUTES[name] = state
return state


def get_attribute(name: str) -> Any:
"""Return the current value stored for a dummy Tango attribute."""
return get_state(name).value


def clear_attributes():
"""Clear all dummy Tango values to avoid state leakage between tests."""
_ATTRIBUTES.clear()


def _is_array_value(value: Any) -> bool:
"""Return true for vector-like values, but keep strings scalar."""
if isinstance(value, (str, bytes)):
return False
try:
len(value)
except TypeError:
return False
return True
4 changes: 4 additions & 0 deletions tests/test_bpm_controlsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def test_controlsystem_bpm_position(install_test_package):
indirect=True,
)
def test_controlsystem_bpm_position_indexed(install_test_package):
from tango.pyaml.attribute_store import set_attribute

set_attribute("srdiag/bpm/c01-04/Position", [0.0, 1.0], unit="mm")

sr: Accelerator = Accelerator.load("tests/config/bpms.yaml")
bpm = sr.live.get_bpm("BPM_C01-04")

Expand Down
70 changes: 70 additions & 0 deletions tests/test_dummy_tango_attributes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pytest


@pytest.mark.parametrize(
"install_test_package",
[{"name": "tango-pyaml", "path": "tests/dummy_cs/tango-pyaml"}],
indirect=True,
)
def test_dummy_tango_attributes_share_state(install_test_package):
from tango.pyaml.attribute import Attribute
from tango.pyaml.attribute import ConfigModel as AttributeConfigModel
from tango.pyaml.attribute_store import get_attribute, set_attribute

set_attribute("sr/test/device/value", 2.0, unit="mm", range=(0.0, 3.0))

first = Attribute(AttributeConfigModel(attribute="sr/test/device/value"))
second = Attribute(AttributeConfigModel(attribute="sr/test/device/value"))

assert first.get() == 2.0
assert second.get() == 2.0
assert first.unit() == "mm"
assert first.get_range() == [0.0, 3.0]

second.set(2.5)

assert first.get() == 2.5
assert get_attribute("sr/test/device/value") == 2.5


@pytest.mark.parametrize(
"install_test_package",
[{"name": "tango-pyaml", "path": "tests/dummy_cs/tango-pyaml"}],
indirect=True,
)
def test_dummy_tango_attribute_lookup_accepts_control_system_prefix(install_test_package):
from tango.pyaml.attribute import Attribute
from tango.pyaml.attribute import ConfigModel as AttributeConfigModel
from tango.pyaml.attribute_store import set_attribute

set_attribute("srdiag/bpm/c01-04/Position", [0.0, 1.0], unit="mm")

attribute = Attribute(
AttributeConfigModel(
attribute="//ebs-simu-3:10000/srdiag/bpm/c01-04/Position",
unit="mm",
)
)
attribute.set_array(True)

assert attribute.get() == [0.0, 1.0]
assert attribute.unit() == "mm"


@pytest.mark.parametrize(
"install_test_package",
[{"name": "tango-pyaml", "path": "tests/dummy_cs/tango-pyaml"}],
indirect=True,
)
def test_dummy_tango_read_only_attribute_can_be_initialized(install_test_package):
from tango.pyaml.attribute_read_only import AttributeReadOnly
from tango.pyaml.attribute_read_only import ConfigModel as ReadOnlyConfigModel
from tango.pyaml.attribute_store import set_attribute

set_attribute("srdiag/tune/tune_h", 0.37, unit="1")
attribute = AttributeReadOnly(ReadOnlyConfigModel(attribute="srdiag/tune/tune_h"))

assert attribute.get() == 0.37
assert attribute.unit() == "1"
with pytest.raises(Exception, match="read only attribute"):
attribute.set(0.38)
Loading