servomexlib.manager

ServomexManager — RS485 multidrop orchestration.

Multi-analyser orchestrator, :class:`ServomexManager`.

Coordinates many :class:`~servomexlib.devices.analyzer.Analyzer` instances across one or more serial ports. Operations on different ports run concurrently (:func:`anyio.create_task_group`); operations on the same port are serialised through that port's shared lock, which is the single-reader discipline an RS485 multidrop segment requires. Port identity is canonicalised so that ``COM3``, ``com3`` and ``\\.\COM3`` collapse to one entry; a pre-built transport is keyed by its :func:`id`.
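
As a rough illustration of the port canonicalisation described above, the sketch below folds equivalent Windows COM spellings into one key. ``canonical_port_key`` is a hypothetical helper written for this example; the exact rules servomexlib applies are not shown on this page and are assumed here.

```python
import re

# Windows device-namespace prefix: the four characters \\.\
_WIN_DEVICE_PREFIX = "\\\\.\\"
_COM_NAME = re.compile(r"COM\d+", re.IGNORECASE)


def canonical_port_key(port: str) -> str:
    """Hypothetical helper: fold equivalent COM spellings into one key."""
    name = port.strip()
    if name.startswith(_WIN_DEVICE_PREFIX):
        name = name[len(_WIN_DEVICE_PREFIX):]
    if _COM_NAME.fullmatch(name):
        return name.upper()  # COM names are case-insensitive on Windows
    return name  # POSIX device paths are case-sensitive, so leave them alone


# All three spellings from the text collapse to a single key.
keys = {canonical_port_key(p) for p in ("COM3", "com3", r"\\.\COM3")}
```

With a key like this, two ``add`` calls that spell the same port differently would land on the same port entry and therefore share one lock.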

Modbus only. Continuous-ASCII is a single unsolicited broadcaster, not an addressable peer, so the manager refuses to register a continuous device: there is nothing to multidrop. Per-port clients are ref-counted; the last :meth:`remove` (or :meth:`close`) tears the shared transport down. A pre-built :class:`Analyzer` source has no port entry, so the caller keeps lifecycle ownership.
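
The ref-counting rule above can be sketched in a few lines. ``PortEntry`` and ``PortRegistry`` are hypothetical stand-ins, not the library's private classes; the only behaviour taken from the text is that a port stays open while any device references it and is torn down when the last reference goes.

```python
from dataclasses import dataclass, field


@dataclass
class PortEntry:
    """Hypothetical stand-in for a shared, manager-owned transport."""
    key: str
    refs: set[str] = field(default_factory=set)
    is_open: bool = True


class PortRegistry:
    """Open a port on first use; close it when the last device lets go."""

    def __init__(self) -> None:
        self._ports: dict[str, PortEntry] = {}

    def acquire(self, key: str, device: str) -> PortEntry:
        entry = self._ports.setdefault(key, PortEntry(key))
        entry.refs.add(device)
        return entry

    def release(self, key: str, device: str) -> bool:
        """Drop one reference; return True if the port was torn down."""
        entry = self._ports[key]
        entry.refs.discard(device)
        if entry.refs:
            return False
        entry.is_open = False  # a real transport would be closed here
        del self._ports[key]
        return True


registry = PortRegistry()
shared = registry.acquire("COM11", "a1")
registry.acquire("COM11", "a2")
first = registry.release("COM11", "a1")   # a2 still holds the port
second = registry.release("COM11", "a2")  # last reference: port is torn down
```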

The manager satisfies :class:`~servomexlib.streaming.poll_source.PollSource`, so it drives :func:`~servomexlib.streaming.record` directly.

DeviceResult dataclass

DeviceResult(value, error)

Per-device result — value or error, never both.

ok property

ok

True when the device produced a value (``error is None``).

failure classmethod

failure(error)

Build a failure result wrapping ``error``.

Source code in src/servomexlib/manager.py
@classmethod
def failure(cls, error: ServomexError) -> Self:
    """Build a failure result wrapping ``error``."""
    return cls(value=None, error=error)

success classmethod

success(value)

Build a success result wrapping ``value``.

Source code in src/servomexlib/manager.py
@classmethod
def success(cls, value: T) -> Self:
    """Build a success result wrapping ``value``."""
    return cls(value=value, error=None)
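
To show how a value-or-error result of this shape is typically consumed, here is a minimal, self-contained sketch. ``Result`` is a stand-in that mirrors the documented fields and constructors; it is not the library's ``DeviceResult``, and the device names and readings are made up.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class Result(Generic[T]):
    """Stand-in with the documented shape: a value or an error, never both."""
    value: Optional[T]
    error: Optional[Exception]

    @property
    def ok(self) -> bool:
        return self.error is None

    @classmethod
    def success(cls, value: T) -> "Result[T]":
        return cls(value=value, error=None)

    @classmethod
    def failure(cls, error: Exception) -> "Result[T]":
        return cls(value=None, error=error)


# A name-keyed mapping, like the one a multi-device poll would return.
results = {
    "a1": Result.success(20.9),
    "a2": Result.failure(TimeoutError("no reply from address 2")),
}

# Split the mapping into usable readings and failures to report.
readings = {name: r.value for name, r in results.items() if r.ok}
failed = {name: r.error for name, r in results.items() if not r.ok}
```

Checking ``ok`` before touching ``value`` keeps a legitimately falsy reading (such as ``0.0``) from being mistaken for a failure.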

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

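Under :attr:`RAISE`, the manager collects every result and, if any failed, raises an :class:`ExceptionGroup` after the task group joins. Under :attr:`RETURN`, each device yields a :class:`DeviceResult` to inspect. In the source shown on this page, only :meth:`execute_each` consults the policy: :meth:`poll` always returns its mapping, and :meth:`poll_samples` drops failed devices.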

ServomexManager

ServomexManager(*, error_policy=ErrorPolicy.RAISE)

Coordinator for many analysers across one or more serial ports.

Usage::

async with ServomexManager() as mgr:
    await mgr.add("a1", "COM11", address=1)
    await mgr.add("a2", "COM11", address=2)
    samples = await mgr.poll_samples()

Source code in src/servomexlib/manager.py
def __init__(self, *, error_policy: ErrorPolicy = ErrorPolicy.RAISE) -> None:
    self._error_policy = error_policy
    self._devices: dict[str, _DeviceEntry] = {}
    self._ports: dict[str, _PortEntry] = {}
    self._state_lock = anyio.Lock()
    self._closed = False

closed property

closed

True once :meth:`close` has been called.

error_policy property

error_policy

The :class:`ErrorPolicy` this manager was constructed with.

names property

names

Insertion-ordered tuple of managed analyser names.

add async

add(
    name,
    source,
    *,
    protocol=ProtocolKind.MODBUS_RTU,
    address=1,
    serial_settings=None,
    timeout=1.0,
)

Register an analyser under ``name`` and return it.

``source`` determines lifecycle ownership: a pre-built :class:`Analyzer` (caller-owned, tracked only), a ``str`` port path (the manager opens a transport and shares it across the bus), or a :class:`Transport` (bound, not owned). Continuous-ASCII and ``AUTO`` are refused because the manager is a Modbus multidrop coordinator. For a pre-built :class:`Analyzer`, the method returns before the protocol check, so the remaining keyword arguments are not consulted.

Raises:

ServomexValidationError: duplicate ``name``, a non-Modbus protocol, or a protocol clash with an existing device on the same port.
ServomexConnectionError: the manager is closed.

Source code in src/servomexlib/manager.py
async def add(
    self,
    name: str,
    source: Analyzer | str | Transport,
    *,
    protocol: ProtocolKind = ProtocolKind.MODBUS_RTU,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
) -> Analyzer:
    """Register an analyser under ``name`` and return it.

    ``source`` discriminates lifecycle ownership: a pre-built :class:`Analyzer`
    (caller-owned, tracked only), a ``str`` port path (the manager opens and
    shares a transport across the bus), or a :class:`Transport` (bound, not
    owned). Continuous-ASCII / ``AUTO`` are refused — the manager is a Modbus
    multidrop coordinator.

    Raises:
        ServomexValidationError: duplicate ``name``, a non-Modbus protocol, or
            a protocol clash with an existing device on the same port.
        ServomexConnectionError: the manager is closed.
    """
    async with self._state_lock:
        self._check_open()
        if name in self._devices:
            raise ServomexValidationError(f"manager: name {name!r} already in use")

        if isinstance(source, Analyzer):
            self._devices[name] = _DeviceEntry(name=name, analyzer=source, port_key=None)
            _logger.info("manager.add device=%s port=prebuilt", name)
            return source

        if protocol not in _MODBUS_KINDS:
            raise ServomexValidationError(
                f"manager: only Modbus protocols can be grouped (got {protocol.value}); "
                "a continuous-ASCII analyser is a single broadcaster, not a multidrop peer",
            )

        port_key, port_entry = await self._resolve_port(
            source, protocol=protocol, serial_settings=serial_settings
        )
        analyzer = self._build_analyzer(
            port_entry, name=name, protocol=protocol, address=address, timeout=timeout
        )
        self._devices[name] = _DeviceEntry(name=name, analyzer=analyzer, port_key=port_key)
        port_entry.refs.add(name)
        _logger.info(
            "manager.add device=%s port=%s protocol=%s address=%s",
            name,
            port_key,
            protocol.value,
            address,
        )
        return analyzer

close async

close()

Tear down every managed analyser and port (LIFO).

Source code in src/servomexlib/manager.py
async def close(self) -> None:
    """Tear down every managed analyser and port (LIFO)."""
    await self._close(suppress_errors=False)

execute_each async

execute_each(op, names=None)

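Run ``op(analyzer)`` on every analyser (or only those listed in ``names``), concurrently across ports and one at a time within a port. Returns a name-keyed mapping of :class:`DeviceResult`; under :attr:`ErrorPolicy.RAISE`, raises an :class:`ExceptionGroup` instead if any analyser failed.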

Source code in src/servomexlib/manager.py
async def execute_each[T](
    self,
    op: Callable[[Analyzer], Awaitable[T]],
    names: Sequence[str] | None = None,
) -> dict[str, DeviceResult[T]]:
    """Run ``op(analyzer)`` on every (or named) analyser concurrently across ports."""
    groups = self._group_by_port(self._resolve_names(names))
    results: dict[str, DeviceResult[T]] = {}
    errors: list[ServomexError] = []
    result_lock = anyio.Lock()

    async def _run_group(port_key: str, members: list[str]) -> None:
        lock = self._lock_for(port_key)
        async with lock:
            for member in members:
                try:
                    value = await op(self._devices[member].analyzer)
                except ServomexError as err:
                    async with result_lock:
                        results[member] = DeviceResult.failure(err)
                        errors.append(err)
                else:
                    async with result_lock:
                        results[member] = DeviceResult[T].success(value)

    async with anyio.create_task_group() as tg:
        for port_key, members in groups.items():
            tg.start_soon(_run_group, port_key, members)

    if self._error_policy is ErrorPolicy.RAISE and errors:
        raise ExceptionGroup("manager.execute_each: one or more analysers failed", errors)
    return results
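
The scheduling shape used above (one task per port, one lock per port, members visited in order) can be reproduced with the standard library alone. In this sketch ``MiniManager`` is hypothetical, ``asyncio.gather`` stands in for the anyio task group, and the device-to-port layout is invented for the demonstration.

```python
import asyncio
from collections import defaultdict


class MiniManager:
    """Hypothetical miniature of the per-port scheduling pattern."""

    def __init__(self, devices: dict[str, str]) -> None:
        self._devices = devices  # device name -> port key
        self._locks: dict[str, asyncio.Lock] = {}

    def _lock_for(self, port: str) -> asyncio.Lock:
        return self._locks.setdefault(port, asyncio.Lock())

    async def execute_each(self, op):
        groups: dict[str, list[str]] = defaultdict(list)
        for name, port in self._devices.items():
            groups[port].append(name)
        results: dict[str, object] = {}

        async def run_group(port: str, members: list[str]) -> None:
            async with self._lock_for(port):  # held for the whole port group
                for member in members:        # same-port devices go one by one
                    results[member] = await op(member)

        # One concurrent task per port.
        await asyncio.gather(*(run_group(p, m) for p, m in groups.items()))
        return results


async def demo():
    events: list[tuple[str, str]] = []

    async def op(name: str) -> str:
        events.append(("start", name))
        await asyncio.sleep(0.01)  # pretend to wait for a Modbus reply
        events.append(("end", name))
        return name.upper()

    mgr = MiniManager({"a1": "COM11", "a2": "COM11", "b1": "COM12"})
    return await mgr.execute_each(op), events


results, events = asyncio.run(demo())
```

In the recorded ``events``, ``a2`` never starts before ``a1`` has finished (same port), while ``b1`` starts before ``a1`` ends (different port). Because the lock lives on the manager rather than on the call, a second overlapping call would also have to wait its turn on a busy port.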

get

get(name)

Return the analyser registered under ``name``; raises ServomexValidationError if there is none.

Source code in src/servomexlib/manager.py
def get(self, name: str) -> Analyzer:
    """Return the analyser registered under ``name``."""
    try:
        return self._devices[name].analyzer
    except KeyError:
        raise ServomexValidationError(f"manager: no analyser named {name!r}") from None

poll async

poll(names=None, *, timeout=None)

Read one :class:`Frame` from every analyser (or only those listed in ``names``), keyed by name.

Concurrent across ports, serialised within a port. Always returns a complete mapping, whatever the error policy; per-device failures land in :attr:`DeviceResult.error`.

Source code in src/servomexlib/manager.py
async def poll(
    self, names: Sequence[str] | None = None, *, timeout: float | None = None
) -> Mapping[str, DeviceResult[Frame]]:
    """Read one :class:`Frame` per (or named) analyser, keyed by name.

    Cross-port concurrent, same-port serialised. Always returns a complete
    mapping; per-device failures land in :attr:`DeviceResult.error`.
    """
    groups = self._group_by_port(self._resolve_names(names))
    results: dict[str, DeviceResult[Frame]] = {}
    result_lock = anyio.Lock()

    async def _run_group(port_key: str, members: list[str]) -> None:
        lock = self._lock_for(port_key)
        async with lock:
            for member in members:
                try:
                    frame = await self._devices[member].analyzer.poll(timeout=timeout)
                except ServomexError as err:
                    async with result_lock:
                        results[member] = DeviceResult.failure(err)
                else:
                    async with result_lock:
                        results[member] = DeviceResult[Frame].success(frame)

    async with anyio.create_task_group() as tg:
        for port_key, members in groups.items():
            tg.start_soon(_run_group, port_key, members)
    return results

poll_samples async

poll_samples(*, names=None, timeout=None)

Poll every analyser (or only those listed in ``names``) concurrently across ports and return a flat list of samples.

One :class:`Sample` per channel read. Failed devices are dropped and logged at WARNING level (the recorder never sees them). Same-port devices serialise on the shared port lock, which is acquired once per port group so that a port's snapshot is not interleaved with other traffic. Satisfies :class:`PollSource`.

Source code in src/servomexlib/manager.py
async def poll_samples(
    self, *, names: Sequence[str] | None = None, timeout: float | None = None
) -> list[Sample]:
    """Poll every (or named) analyser concurrently across ports → flat samples.

    One :class:`Sample` per channel read. Failed devices are dropped and logged
    at WARN (the recorder never sees them). Same-port devices serialise on the
    shared port lock, acquired once per port-group so a coherent snapshot is
    not interleaved. Satisfies :class:`PollSource`.
    """
    groups = self._group_by_port(self._resolve_names(names))
    result_lock = anyio.Lock()
    all_samples: list[Sample] = []

    async def _run_group(port_key: str, members: list[str]) -> None:
        local: list[Sample] = []
        lock = self._lock_for(port_key)
        async with lock:
            for member in members:
                try:
                    local.extend(
                        await self._devices[member].analyzer.poll_samples(timeout=timeout)
                    )
                except ServomexError as err:
                    _logger.warning("manager.poll_failed device=%s error=%r", member, err)
        async with result_lock:
            all_samples.extend(local)

    async with anyio.create_task_group() as tg:
        for port_key, members in groups.items():
            tg.start_soon(_run_group, port_key, members)
    return all_samples
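
The drop-and-log behaviour can be seen in isolation with a small stand-alone sketch. Everything here is hypothetical: ``ReadError`` stands in for ``ServomexError``, ``read_device`` fakes a device read, samples are plain tuples rather than :class:`Sample` objects, and the channel names and values are illustrative only.

```python
import asyncio
import logging

logger = logging.getLogger("sketch.manager")

SampleTuple = tuple[str, str, float]  # (device, channel, value)


class ReadError(Exception):
    """Hypothetical stand-in for ServomexError."""


async def read_device(name: str) -> list[SampleTuple]:
    """Fake per-device read; device 'a2' never answers."""
    if name == "a2":
        raise ReadError("no reply")
    return [(name, "O2", 20.9), (name, "CO2", 0.04)]


async def poll_group(members: list[str]) -> list[SampleTuple]:
    """Read one port's devices in order; skip and log any that fail."""
    local: list[SampleTuple] = []
    for member in members:
        try:
            local.extend(await read_device(member))
        except ReadError as err:
            logger.warning("poll_failed device=%s error=%r", member, err)
    return local


async def poll_all(groups: dict[str, list[str]]) -> list[SampleTuple]:
    """Poll every port group concurrently and flatten the results."""
    per_port = await asyncio.gather(*(poll_group(m) for m in groups.values()))
    return [sample for group in per_port for sample in group]


samples = asyncio.run(poll_all({"COM11": ["a1", "a2"], "COM12": ["b1"]}))
```

The failed device simply contributes no samples, so a consumer of the flat list cannot tell a missing device from one that was never registered; the log line is the only trace. Unlike the listing above, ``asyncio.gather`` returns groups in argument order, whereas the library version appends each group as it completes.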

remove async

remove(name)

Unregister ``name``, closing the shared transport when the last reference to its port is dropped. Raises ServomexValidationError if no analyser has that name.

Source code in src/servomexlib/manager.py
async def remove(self, name: str) -> None:
    """Unregister ``name``, closing the shared transport on the last ref."""
    async with self._state_lock:
        self._check_open()
        if name not in self._devices:
            raise ServomexValidationError(f"manager: no analyser named {name!r}")
        await self._teardown_device(self._devices.pop(name))
        _logger.info("manager.remove device=%s", name)