Multi-analyser orchestrator — :class:`ServomexManager`.
Coordinates many :class:`~servomexlib.devices.analyzer.Analyzer` instances across
one or more serial ports. Operations on different ports run concurrently
(:func:`anyio.create_task_group`); operations on the same port are serialised
through that port's shared lock — the single-reader discipline an RS485 multidrop
segment requires. Port identity is canonicalised so that ``COM3`` /
``com3`` / ``\\.\COM3`` collapse to one entry; pre-built transports are keyed on
:func:`id`.
Modbus only. A continuous-ASCII analyser is a single unsolicited broadcaster, not an
addressable peer, so the manager refuses to register a continuous device — there
is nothing to multidrop. Per-port clients are ref-counted; the last
:meth:`remove` (or :meth:`close`) tears the shared transport down. A pre-built
:class:`Analyzer` source has no port entry — the caller keeps lifecycle ownership.
The manager satisfies :class:`~servomexlib.streaming.poll_source.PollSource`, so it
drives :func:`~servomexlib.streaming.record` directly.
DeviceResult
dataclass
DeviceResult(value, error)
Per-device result — value or error, never both.
ok
property
True when the device produced a value (error is None).
failure
classmethod
Build a failure result wrapping error.
Source code in src/servomexlib/manager.py
@classmethod
def failure(cls, error: ServomexError) -> Self:
    """Construct the failed variant: ``error`` is stored and ``value`` is ``None``."""
    return cls(error=error, value=None)
success
classmethod
Build a success result wrapping value.
Source code in src/servomexlib/manager.py
@classmethod
def success(cls, value: T) -> Self:
    """Construct the successful variant: ``value`` is stored and ``error`` is ``None``."""
    return cls(error=None, value=value)
ErrorPolicy
Bases: Enum
How the manager surfaces per-device failures.
Under :attr:`RAISE`, the manager collects every result and — if any failed —
raises an :class:`ExceptionGroup` after the task group joins. Under
:attr:`RETURN`, each device yields a :class:`DeviceResult` to inspect.
ServomexManager
ServomexManager(*, error_policy=ErrorPolicy.RAISE)
Coordinator for many analysers across one or more serial ports.
Usage::
async with ServomexManager() as mgr:
await mgr.add("a1", "COM11", address=1)
await mgr.add("a2", "COM11", address=2)
samples = await mgr.poll_samples()
Source code in src/servomexlib/manager.py
def __init__(self, *, error_policy: ErrorPolicy = ErrorPolicy.RAISE) -> None:
    """Create an open manager with no devices and no ports registered.

    Args:
        error_policy: Stored as-is; :meth:`execute_each` consults it to decide
            whether collected failures are raised as an ``ExceptionGroup``.
    """
    # Lifecycle flag; starts open.
    self._closed = False
    # Held by ``add`` / ``remove`` while they mutate the registries below.
    self._state_lock = anyio.Lock()
    # Registries start empty: ports by key, devices by name.
    self._ports: dict[str, _PortEntry] = {}
    self._devices: dict[str, _DeviceEntry] = {}
    self._error_policy = error_policy
closed
property
True once :meth:close has been called.
error_policy
property
The :class:ErrorPolicy this manager was constructed with.
names
property
Insertion-ordered tuple of managed analyser names.
add
async
add(
name,
source,
*,
protocol=ProtocolKind.MODBUS_RTU,
address=1,
serial_settings=None,
timeout=1.0,
)
Register an analyser under name and return it.
``source`` determines lifecycle ownership: a pre-built :class:`Analyzer`
(caller-owned, tracked only), a ``str`` port path (the manager opens and
shares a transport across the bus), or a :class:`Transport` (bound, not
owned). Continuous-ASCII / ``AUTO`` are refused — the manager is a Modbus
multidrop coordinator.
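Raises:
ServomexValidationError: duplicate name, a non-Modbus protocol, or a protocol clash with an existing device on the same port.
ServomexConnectionError: the manager is closed.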
Source code in src/servomexlib/manager.py
async def add(
    self,
    name: str,
    source: Analyzer | str | Transport,
    *,
    protocol: ProtocolKind = ProtocolKind.MODBUS_RTU,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
) -> Analyzer:
    """Register an analyser under ``name`` and hand it back.

    The kind of ``source`` decides who owns the lifecycle:

    * a pre-built :class:`Analyzer` is only tracked (no port entry is
      recorded for it) and is returned unchanged;
    * a ``str`` port path or a :class:`Transport` is resolved to a shared
      port entry, a new analyser is built on top of it, and ``name`` is
      added to that port's reference set.

    Continuous-ASCII / ``AUTO`` are refused — the manager is a Modbus
    multidrop coordinator.

    Args:
        name: Unique key for the analyser inside this manager.
        source: Pre-built analyser, port path, or transport.
        protocol: Must be one of the Modbus kinds unless ``source`` is a
            pre-built analyser (the check is skipped for those).
        address: Forwarded to the analyser builder.
        serial_settings: Forwarded to port resolution.
        timeout: Forwarded to the analyser builder.

    Returns:
        The registered analyser (``source`` itself when it was pre-built).

    Raises:
        ServomexValidationError: duplicate ``name``, a non-Modbus protocol, or
            a protocol clash with an existing device on the same port.
        ServomexConnectionError: the manager is closed.
    """
    # All registry mutations happen under the state lock.
    async with self._state_lock:
        self._check_open()

        if name in self._devices:
            duplicate_msg = f"manager: name {name!r} already in use"
            raise ServomexValidationError(duplicate_msg)

        if isinstance(source, Analyzer):
            # Caller-owned analyser: tracked by name only, no port bookkeeping.
            tracked = _DeviceEntry(name=name, analyzer=source, port_key=None)
            self._devices[name] = tracked
            _logger.info("manager.add device=%s port=prebuilt", name)
            return source

        if protocol not in _MODBUS_KINDS:
            refusal_msg = (
                f"manager: only Modbus protocols can be grouped (got {protocol.value}); "
                "a continuous-ASCII analyser is a single broadcaster, not a multidrop peer"
            )
            raise ServomexValidationError(refusal_msg)

        key, shared_port = await self._resolve_port(
            source,
            protocol=protocol,
            serial_settings=serial_settings,
        )
        built = self._build_analyzer(
            shared_port,
            name=name,
            protocol=protocol,
            address=address,
            timeout=timeout,
        )

        # Record the device first, then count it against the shared port.
        self._devices[name] = _DeviceEntry(name=name, analyzer=built, port_key=key)
        shared_port.refs.add(name)
        _logger.info(
            "manager.add device=%s port=%s protocol=%s address=%s",
            name,
            key,
            protocol.value,
            address,
        )
        return built
close
async
Tear down every managed analyser and port (LIFO).
Source code in src/servomexlib/manager.py
async def close(self) -> None:
    """Shut the manager down, tearing down every managed analyser and port (LIFO).

    Delegates to ``_close`` with ``suppress_errors=False``.
    """
    teardown = self._close(suppress_errors=False)
    await teardown
execute_each
async
execute_each(op, names=None)
Run op(analyzer) on every (or named) analyser concurrently across ports.
Source code in src/servomexlib/manager.py
| async def execute_each[T](
self,
op: Callable[[Analyzer], Awaitable[T]],
names: Sequence[str] | None = None,
) -> dict[str, DeviceResult[T]]:
"""Run ``op(analyzer)`` on every (or named) analyser concurrently across ports."""
groups = self._group_by_port(self._resolve_names(names))
results: dict[str, DeviceResult[T]] = {}
errors: list[ServomexError] = []
result_lock = anyio.Lock()
async def _run_group(port_key: str, members: list[str]) -> None:
lock = self._lock_for(port_key)
async with lock:
for member in members:
try:
value = await op(self._devices[member].analyzer)
except ServomexError as err:
async with result_lock:
results[member] = DeviceResult.failure(err)
errors.append(err)
else:
async with result_lock:
results[member] = DeviceResult[T].success(value)
async with anyio.create_task_group() as tg:
for port_key, members in groups.items():
tg.start_soon(_run_group, port_key, members)
if self._error_policy is ErrorPolicy.RAISE and errors:
raise ExceptionGroup("manager.execute_each: one or more analysers failed", errors)
return results
|
get
Return the analyser registered under name.
Source code in src/servomexlib/manager.py
def get(self, name: str) -> Analyzer:
    """Look up the analyser registered under ``name``.

    Raises:
        ServomexValidationError: no analyser is registered under ``name``.
    """
    entry = self._devices.get(name)
    if entry is None:
        raise ServomexValidationError(f"manager: no analyser named {name!r}")
    return entry.analyzer
poll
async
poll(names=None, *, timeout=None)
Read one :class:Frame per (or named) analyser, keyed by name.
Cross-port concurrent, same-port serialised. Always returns a complete
mapping; per-device failures land in :attr:DeviceResult.error.
Source code in src/servomexlib/manager.py
async def poll(
    self, names: Sequence[str] | None = None, *, timeout: float | None = None
) -> Mapping[str, DeviceResult[Frame]]:
    """Read one :class:`Frame` from every (or each named) analyser, keyed by name.

    Port groups run concurrently; devices sharing a port are polled one after
    another under that port's lock. A device that raises ``ServomexError`` is
    reported through :attr:`DeviceResult.error` rather than raised.

    Args:
        names: Optional subset of analyser names; ``None`` selects all.
        timeout: Passed through to each analyser's ``poll``.

    Returns:
        A mapping from device name to its :class:`DeviceResult`.
    """
    groups = self._group_by_port(self._resolve_names(names))
    outcomes: dict[str, DeviceResult[Frame]] = {}
    outcome_guard = anyio.Lock()

    async def _run_group(port_key: str, members: list[str]) -> None:
        # One lock acquisition covers the whole port group.
        async with self._lock_for(port_key):
            for member in members:
                try:
                    frame = await self._devices[member].analyzer.poll(timeout=timeout)
                except ServomexError as err:
                    outcome = DeviceResult.failure(err)
                else:
                    outcome = DeviceResult[Frame].success(frame)
                async with outcome_guard:
                    outcomes[member] = outcome

    async with anyio.create_task_group() as tg:
        for port_key, members in groups.items():
            tg.start_soon(_run_group, port_key, members)
    return outcomes
poll_samples
async
poll_samples(*, names=None, timeout=None)
Poll every (or named) analyser concurrently across ports → flat samples.
One :class:Sample per channel read. Failed devices are dropped and logged
at WARN (the recorder never sees them). Same-port devices serialise on the
shared port lock, acquired once per port-group so a coherent snapshot is
not interleaved. Satisfies :class:PollSource.
Source code in src/servomexlib/manager.py
async def poll_samples(
    self, *, names: Sequence[str] | None = None, timeout: float | None = None
) -> list[Sample]:
    """Poll every (or each named) analyser and flatten the readings into one list.

    Port groups run concurrently. Within a group the port lock is taken once
    and the devices are polled in sequence, so one port's snapshot is not
    interleaved with other traffic on that port. A device that raises
    ``ServomexError`` contributes no samples and is logged at WARNING.
    Satisfies :class:`PollSource`.

    Args:
        names: Optional subset of analyser names; ``None`` selects all.
        timeout: Passed through to each analyser's ``poll_samples``.

    Returns:
        All samples collected from the devices that answered.
    """
    groups = self._group_by_port(self._resolve_names(names))
    merge_guard = anyio.Lock()
    collected: list[Sample] = []

    async def _run_group(port_key: str, members: list[str]) -> None:
        batch: list[Sample] = []
        async with self._lock_for(port_key):
            for member in members:
                try:
                    fresh = await self._devices[member].analyzer.poll_samples(timeout=timeout)
                    batch.extend(fresh)
                except ServomexError as err:
                    _logger.warning("manager.poll_failed device=%s error=%r", member, err)
        # Merge this port's batch into the shared list in one step.
        async with merge_guard:
            collected.extend(batch)

    async with anyio.create_task_group() as tg:
        for port_key, members in groups.items():
            tg.start_soon(_run_group, port_key, members)
    return collected
remove
async
Unregister name, closing the shared transport on the last ref.
Source code in src/servomexlib/manager.py
async def remove(self, name: str) -> None:
    """Unregister ``name``; the shared transport is closed when its last ref goes.

    Raises:
        ServomexValidationError: no analyser is registered under ``name``.
        ServomexConnectionError: the manager is closed.
    """
    async with self._state_lock:
        self._check_open()
        entry = self._devices.pop(name, None)
        if entry is None:
            raise ServomexValidationError(f"manager: no analyser named {name!r}")
        # The entry is already out of the registry when teardown runs.
        await self._teardown_device(entry)
        _logger.info("manager.remove device=%s", name)