Skip to content

watlowlib.manager

WatlowManager — multi-port, multi-device orchestration with ref-counted shared ProtocolClients, per-port locking, and one protocol per port. See Streaming and Design §6.

Public surface

watlowlib.manager

Multi-controller orchestrator — :class:WatlowManager.

The manager coordinates many :class:~watlowlib.devices.controller.Controller instances across one or more serial ports. Operations on different physical ports run concurrently through :func:anyio.create_task_group; operations on the same port serialise through that port's shared :class:~watlowlib.protocol.base.ProtocolClient lock. The shared client is address-agnostic — each managed controller's :class:Session passes its own bus address into every execute call, so multi-drop RS-485 segments with two or more devices work correctly.

Port identity is canonicalised before comparison so a controller referenced via both /dev/ttyUSB0 and /dev/serial/by-id/... (or COM3 and com3 on Windows) collapses to one client — critical for the single-in-flight invariant. Pre-built :class:Transport sources use the object's :func:id as the key so caller-owned transports aren't accidentally shared.

Per-port protocol lock. The same RS-485 segment can only carry one wire protocol at a time. The manager locks the port to the protocol of the first device added; subsequent add(...) calls on that port must use the same protocol or raise :class:WatlowConfigurationError.

Resource lifecycle goes through an internal tracking structure that unwinds LIFO on :meth:close or __aexit__. Per-port clients are ref-counted so the last :meth:remove on a shared port triggers the transport close. Pre-built :class:Controller sources have no port entry — the caller retains lifecycle ownership.

Design reference: docs/design.md §6.

DeviceResult dataclass

DeviceResult(value, error, protocol=None)

Per-device result container — value or error, never both.

:attr:protocol is populated from the controller's session so error rows from the streaming layer can still record which protocol produced the failure.

ok property

ok

True when the controller produced a value (error is None).

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every controller's result and — if any call failed — raises an :class:ExceptionGroup containing the per-device exceptions after the task group joins. Under :attr:RETURN, each controller produces a :class:DeviceResult and the caller inspects .error per entry.

WatlowManager

WatlowManager(*, error_policy=ErrorPolicy.RAISE)

Coordinator for many controllers across one or more serial ports.

Operations run concurrently across different physical ports (via :func:anyio.create_task_group) and serialise on the same-port client lock. Per-controller failures are surfaced per :attr:error_policy:

  • :attr:ErrorPolicy.RAISE: the manager still collects results from every controller, then raises an :class:ExceptionGroup if any failed.
  • :attr:ErrorPolicy.RETURN: per-name :class:DeviceResult containers carry .value or .error.

Usage::

async with WatlowManager() as mgr:
    await mgr.add("ctl1", "/dev/ttyUSB0", address=1)
    await mgr.add("ctl2", "/dev/ttyUSB1", address=1)
    samples = await mgr.poll(["process_value", "setpoint"])
Source code in src/watlowlib/manager.py
def __init__(self, *, error_policy: ErrorPolicy = ErrorPolicy.RAISE) -> None:
    self._error_policy = error_policy
    self._devices: dict[str, _DeviceEntry] = {}
    self._ports: dict[str, _PortEntry] = {}
    self._state_lock = anyio.Lock()
    self._closed = False

closed property

closed

True once :meth:close has been called.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed controller names.

add async

add(
    name,
    source,
    *,
    protocol=ProtocolKind.STDBUS,
    address=1,
    serial_settings=None,
    family=ControllerFamily.UNKNOWN,
)

Register and open a controller under name.

The source discriminates lifecycle ownership:

  • :class:Controller — pre-built (via :func:watlowlib.open_device outside the manager). The manager only tracks the name mapping; it does not take lifecycle ownership.
  • str — serial port path ("/dev/ttyUSB0", "COM3"). The manager creates a transport, canonicalises the port key, and shares the transport + client across controllers on the same bus. Mixing Std Bus and Modbus on a shared physical port is refused; one serial link has one active protocol.
  • :class:Transport — duck-typed transport. The manager builds a session against it but does not take transport ownership.

Parameters:

Name Type Description Default
name str

Unique manager-level identifier.

required
source Controller | str | Transport

One of the three lifecycle shapes above.

required
protocol ProtocolKind

Wire protocol (STDBUS or MODBUS_RTU). Ignored when source is a pre-built :class:Controller. AUTO is rejected — open the controller via :func:open_device first and register the resulting :class:Controller.

STDBUS
address int

Bus address. Std Bus accepts 1..16; Modbus RTU accepts 1..247.

1
serial_settings SerialSettings | None

Override default serial framing. Only honoured when source is a port-string.

None
family ControllerFamily

Best-known :class:ControllerFamily. Defaults to :attr:ControllerFamily.UNKNOWN; the session uses this for capability priors and is updated by identify() on the returned controller.

UNKNOWN

Returns:

Type Description
Controller

The opened :class:Controller.

Raises:

Type Description
WatlowValidationError

name already exists or an invalid combination of kwargs was supplied.

WatlowConfigurationError

protocol mismatches an existing lock on the same port, or protocol=AUTO.

WatlowConnectionError

Manager is closed.

Source code in src/watlowlib/manager.py
async def add(
    self,
    name: str,
    source: Controller | str | Transport,
    *,
    protocol: ProtocolKind = ProtocolKind.STDBUS,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    family: ControllerFamily = ControllerFamily.UNKNOWN,
) -> Controller:
    """Register and open a controller under ``name``.

    The ``source`` discriminates lifecycle ownership:

    - :class:`Controller` — pre-built (via
      :func:`watlowlib.open_device` outside the manager). The
      manager only tracks the name mapping; it does *not* take
      lifecycle ownership.
    - ``str`` — serial port path (``"/dev/ttyUSB0"``, ``"COM3"``).
      The manager creates a transport, canonicalises the port key,
      and shares the transport + client across controllers on the
      same bus. Mixing Std Bus and Modbus on a shared physical
      port is refused; one serial link has one active protocol.
    - :class:`Transport` — duck-typed transport. The manager builds
      a session against it but does *not* take transport ownership.

    Args:
        name: Unique manager-level identifier.
        source: One of the three lifecycle shapes above.
        protocol: Wire protocol (``STDBUS`` or ``MODBUS_RTU``).
            Ignored when ``source`` is a pre-built :class:`Controller`.
            ``AUTO`` is rejected — open the controller via
            :func:`open_device` first and register the resulting
            :class:`Controller`.
        address: Bus address. Std Bus accepts ``1..16``; Modbus RTU
            accepts ``1..247``.
        serial_settings: Override default serial framing. Only
            honoured when ``source`` is a port-string.
        family: Best-known :class:`ControllerFamily`. Defaults to
            :attr:`ControllerFamily.UNKNOWN`; the session uses this
            for capability priors and is updated by ``identify()``
            on the returned controller.

    Returns:
        The opened :class:`Controller`.

    Raises:
        WatlowValidationError: ``name`` already exists or an
            invalid combination of kwargs was supplied.
        WatlowConfigurationError: protocol mismatches an existing
            lock on the same port, or ``protocol=AUTO``.
        WatlowConnectionError: Manager is closed.
    """
    async with self._state_lock:
        self._check_open()
        if name in self._devices:
            raise WatlowValidationError(
                f"manager: name {name!r} already in use",
                context=ErrorContext(address=address),
            )
        if serial_settings is not None and not isinstance(source, str):
            raise WatlowValidationError(
                "manager.add(serial_settings=...) only applies to string port "
                "sources; pre-built Transport / Controller carry their own settings",
            )

        port_key, port_entry, controller = await self._resolve_source(
            source,
            protocol=protocol,
            address=address,
            serial_settings=serial_settings,
            family=family,
        )

        self._devices[name] = _DeviceEntry(
            name=name,
            controller=controller,
            port_key=port_key,
        )
        if port_entry is not None:
            port_entry.refs.add(name)

        _logger.info(
            "manager.add device_name=%s port_key=%s protocol=%s address=%s",
            name,
            port_key,
            controller.session.protocol_kind.value,
            controller.session.address,
        )
        return controller

close async

close()

Tear down every managed controller and port (LIFO).

Per-device teardown errors are collected; if any occurred, they are raised after the close completes as an :class:ExceptionGroup. This makes explicit await mgr.close() calls fail loud on resource leaks. The async-CM exit path swallows the errors instead so an in-flight exception still wins (see :meth:__aexit__).

Source code in src/watlowlib/manager.py
async def close(self) -> None:
    """Tear down every managed controller and port (LIFO).

    Per-device teardown errors are collected; if any occurred,
    they are raised after the close completes as an
    :class:`ExceptionGroup`. This makes explicit ``await mgr.close()``
    calls fail loud on resource leaks. The async-CM exit path
    swallows the errors instead so an in-flight exception still
    wins (see :meth:`__aexit__`).
    """
    await self._close(suppress_errors=False)

execute_each async

execute_each(op, names=None)

Run op(controller) on every (or named) controller concurrently.

General-purpose dispatcher used for cross-device snapshots (identify, read_pid, etc.) where each controller runs the same coroutine and the result is keyed by name. Cross-port runs concurrently; same-port serialises on the shared client lock.

Under :attr:ErrorPolicy.RAISE the method still returns a complete result mapping but re-raises an :class:ExceptionGroup of every per-device error after the task group joins.

Source code in src/watlowlib/manager.py
async def execute_each[T](
    self,
    op: Callable[[Controller], Awaitable[T]],
    names: Sequence[str] | None = None,
) -> dict[str, DeviceResult[T]]:
    """Run ``op(controller)`` on every (or named) controller concurrently.

    General-purpose dispatcher used for cross-device snapshots
    (``identify``, ``read_pid``, etc.) where each controller runs
    the same coroutine and the result is keyed by name. Cross-port
    runs concurrently; same-port serialises on the shared client
    lock.

    Under :attr:`ErrorPolicy.RAISE` the method still returns a
    complete result mapping but re-raises an :class:`ExceptionGroup`
    of every per-device error after the task group joins.
    """
    targets = self._resolve_names(names)
    groups = self._group_by_port(targets)
    results: dict[str, DeviceResult[T]] = {}
    errors: list[WatlowError] = []
    result_lock = anyio.Lock()

    async def _run_group(member_names: list[str]) -> None:
        for member in member_names:
            entry = self._devices[member]
            controller = entry.controller
            protocol = controller.session.protocol_kind
            try:
                value = await op(controller)
            except WatlowError as err:
                async with result_lock:
                    results[member] = DeviceResult(
                        value=None,
                        error=err,
                        protocol=protocol,
                    )
                    errors.append(err)
            else:
                async with result_lock:
                    results[member] = DeviceResult(
                        value=value,
                        error=None,
                        protocol=protocol,
                    )

    async with anyio.create_task_group() as tg:
        for member_names in groups.values():
            tg.start_soon(_run_group, member_names)

    if self._error_policy is ErrorPolicy.RAISE and errors:
        raise ExceptionGroup("manager.execute_each: one or more controllers failed", errors)
    return results

get

get(name)

Return the controller registered under name.

Source code in src/watlowlib/manager.py
def get(self, name: str) -> Controller:
    """Return the controller registered under ``name``."""
    try:
        return self._devices[name].controller
    except KeyError:
        raise WatlowValidationError(
            f"manager: no controller named {name!r}",
        ) from None

poll async

poll(parameters, *, names=None, instances=(1,))

Poll every (or named) controller concurrently across ports.

Returns a flat list of :class:Sample — one per (device, parameter, instance) read that succeeded. Failed reads are dropped from the list and logged at WARN. Cross-port reads run concurrently; same-port reads serialise on the shared client lock.

This satisfies the :class:watlowlib.streaming.PollSource Protocol so a manager can drive :func:watlowlib.streaming.record directly.

Source code in src/watlowlib/manager.py
async def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Poll every (or named) controller concurrently across ports.

    Returns a flat list of :class:`Sample` — one per (device,
    parameter, instance) read that succeeded. Failed reads are
    dropped from the list and logged at WARN. Cross-port reads run
    concurrently; same-port reads serialise on the shared client
    lock.

    This satisfies the :class:`watlowlib.streaming.PollSource`
    Protocol so a manager can drive :func:`watlowlib.streaming.record`
    directly.
    """
    targets = self._resolve_names(names)
    groups = self._group_by_port(targets)

    result_lock = anyio.Lock()
    all_samples: list[Sample] = []

    async def _run_group(member_names: list[str]) -> None:
        local: list[Sample] = []
        for member in member_names:
            entry = self._devices[member]
            local.extend(
                await poll_controller(
                    entry.controller,
                    name=member,
                    parameters=parameters,
                    instances=instances,
                ),
            )
        async with result_lock:
            all_samples.extend(local)

    async with anyio.create_task_group() as tg:
        for member_names in groups.values():
            tg.start_soon(_run_group, member_names)

    return all_samples

remove async

remove(name)

Unregister and close the controller named name.

If name was the last controller on a shared port, the transport for that port is closed too. A pre-built :class:Controller source is only dropped from the manager's registry — the caller retains lifecycle ownership.

Source code in src/watlowlib/manager.py
async def remove(self, name: str) -> None:
    """Unregister and close the controller named ``name``.

    If ``name`` was the last controller on a shared port, the
    transport for that port is closed too. A pre-built
    :class:`Controller` source is only dropped from the manager's
    registry — the caller retains lifecycle ownership.
    """
    async with self._state_lock:
        self._check_open()
        if name not in self._devices:
            raise WatlowValidationError(
                f"manager: no controller named {name!r}",
            )
        entry = self._devices.pop(name)
        await self._teardown_device(entry)
        _logger.info("manager.remove device_name=%s", name)