
alicatlib.manager

Multi-device orchestrator with port-aware concurrent dispatch. See Logging and acquisition for recorder integration and Design §5.13 for the architecture.


Multi-device orchestrator — :class:AlicatManager.

The manager coordinates many :class:~alicatlib.devices.base.Device instances across one or more serial ports. Operations across different physical ports run concurrently through :func:anyio.create_task_group; operations against the same port serialise through that port's shared :class:~alicatlib.protocol.client.AlicatProtocolClient lock.

Port identity is canonicalised before comparison so a device referenced via both /dev/ttyUSB0 and /dev/serial/by-id/usb-FTDI-… (or COM3 and com3 on Windows) collapses to one client — critical for the single-in-flight invariant. Pre-built :class:Transport / :class:AlicatProtocolClient sources use the object's :func:id as the key so caller-owned transports aren't accidentally shared.
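The canonicalisation rule can be sketched with the standard library alone. This is an illustrative model of the behaviour described above, not alicatlib's actual implementation:

```python
import os
import sys


def canonical_port_key(port: str) -> str:
    """Collapse aliases of one physical port to a single key.

    Sketch only: resolve symlinks (so ``/dev/serial/by-id/...``
    aliases map to the underlying tty) and fold case on Windows,
    where ``COM3`` and ``com3`` name the same port.
    """
    resolved = os.path.realpath(port)  # follows by-id symlinks
    if sys.platform == "win32":
        resolved = resolved.upper()  # COM names are case-insensitive
    return resolved
```

Two strings that resolve to the same key share one client, which is what preserves the single-in-flight invariant.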

Error handling is controlled by :class:ErrorPolicy:

  • :attr:ErrorPolicy.RAISE — manager collects all results, and if any device failed, raises an :class:ExceptionGroup at the end (never silently drops results).
  • :attr:ErrorPolicy.RETURN — every device produces a :class:DeviceResult container; callers inspect .error.

Resource lifecycle goes through an internal tracking structure that unwinds LIFO on :meth:close or __aexit__. Per-port clients are ref-counted so the last :meth:remove on a shared port triggers the client's close.

Design reference: docs/design.md §5.13.

AlicatManager

AlicatManager(*, error_policy=ErrorPolicy.RAISE)

Coordinator for many devices across one or more serial ports.

Operations run concurrently across different physical ports (via :func:anyio.create_task_group) and serialise on the same-port client lock. Per-device failures are surfaced per :attr:error_policy:

  • :attr:ErrorPolicy.RAISE: the manager still collects results from every device, then raises an :class:ExceptionGroup if any failed.
  • :attr:ErrorPolicy.RETURN: the mapping's values carry :class:DeviceResult containers with .value or .error.
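The dispatch rule — concurrent across ports, serialised per port — can be modelled with :mod:`asyncio` primitives. This is a stand-in sketch, not the manager's code (which uses anyio task groups):

```python
import asyncio


async def dispatch(device_ports: dict[str, str], op) -> dict[str, object]:
    """Fan out ``op`` per device; devices sharing a port share a lock."""
    locks: dict[str, asyncio.Lock] = {}

    async def run(name: str, port: str):
        lock = locks.setdefault(port, asyncio.Lock())
        async with lock:  # one in-flight command per physical port
            return name, await op(name)

    pairs = await asyncio.gather(*(run(n, p) for n, p in device_ports.items()))
    return dict(pairs)
```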

Usage::

async with AlicatManager() as mgr:
    await mgr.add("fuel", "/dev/ttyUSB0")
    await mgr.add("air", "/dev/ttyUSB1")
    frames = await mgr.poll()
Source code in src/alicatlib/manager.py
def __init__(self, *, error_policy: ErrorPolicy = ErrorPolicy.RAISE) -> None:
    self._error_policy = error_policy
    self._devices: dict[str, _DeviceEntry] = {}
    self._ports: dict[str, _PortEntry] = {}
    # Guards state mutation on ``add`` / ``remove`` / ``close``.
    # The per-port client lock serialises I/O, so we only need
    # to serialise the manager's bookkeeping here.
    self._state_lock = anyio.Lock()
    self._closed = False

closed property

closed

True once :meth:close has been called.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed device names.

__aenter__ async

__aenter__()

Enter the async context — returns self for chaining.

Source code in src/alicatlib/manager.py
async def __aenter__(self) -> Self:
    """Enter the async context — returns ``self`` for chaining."""
    return self

__aexit__ async

__aexit__(exc_type, exc, tb)

Close every managed device + port on exit.

Source code in src/alicatlib/manager.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close every managed device + port on exit."""
    del exc_type, exc, tb
    await self.close()

add async

add(name, source, *, unit_id='A', serial=None, timeout=0.5)

Register and open a device under name.

The source discriminates lifecycle ownership:

  • Device — pre-built (via :func:open_device outside the manager). The manager only tracks the name mapping; it does not take lifecycle ownership.
  • str — serial port path ("/dev/ttyUSB0", "COM3"). The manager creates a :class:~alicatlib.transport.serial.SerialTransport and :class:AlicatProtocolClient, canonicalises the port key, and reuses them across multi-device buses (RS-485).
  • :class:Transport — duck-typed transport. The manager wraps it in a new client but does not take transport ownership (the caller keeps open/close responsibility).
  • :class:AlicatProtocolClient — use as-is; the manager does not close it.
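The four-way discrimination can be sketched as a type dispatch. The classes below are stand-ins for illustration; the real manager checks alicatlib's own :class:`Device`, :class:`Transport`, and :class:`AlicatProtocolClient` types:

```python
class Device: ...          # stand-in for alicatlib's Device
class Transport: ...       # stand-in duck-typed transport
class AlicatProtocolClient: ...  # stand-in protocol client


def classify_source(source) -> str:
    """Return which lifecycle shape a source falls into (sketch)."""
    if isinstance(source, Device):
        return "device"      # name mapping only, no ownership taken
    if isinstance(source, str):
        return "port"        # manager owns transport + client
    if isinstance(source, AlicatProtocolClient):
        return "client"      # used as-is, never closed by manager
    if isinstance(source, Transport):
        return "transport"   # wrapped in a client, transport not owned
    raise TypeError(f"unsupported source: {type(source).__name__}")
```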

Parameters:

Name Type Description Default
name str

Unique manager-level identifier. Must not already exist on this manager.

required
source Device | str | Transport | AlicatProtocolClient

One of the four lifecycle shapes above.

required
unit_id str

Bus-level letter for the device. "A" is the polling default; multiple devices on the same port get distinct unit ids.

'A'
serial SerialSettings | None

:class:SerialSettings override. Only honoured when source is a port-string — ignored otherwise (pre-built transports carry their own settings).

None
timeout float

Default command timeout passed through to :func:open_device.

0.5

Returns:

Type Description
Device

The identified :class:Device (a :class:FlowMeter, :class:FlowController, etc. subclass).

Raises:

Type Description
AlicatValidationError

name already exists or serial was supplied with a non-string source.

AlicatConnectionError

The manager is closed.

Source code in src/alicatlib/manager.py
async def add(
    self,
    name: str,
    source: Device | str | Transport | AlicatProtocolClient,
    *,
    unit_id: str = "A",
    serial: SerialSettings | None = None,
    timeout: float = 0.5,
) -> Device:
    """Register and open a device under ``name``.

    The ``source`` discriminates lifecycle ownership:

    - ``Device`` — pre-built (via :func:`open_device` outside the
      manager). The manager only tracks the name mapping; it does
      *not* take lifecycle ownership.
    - ``str`` — serial port path (``"/dev/ttyUSB0"``, ``"COM3"``).
      The manager creates a
      :class:`~alicatlib.transport.serial.SerialTransport` and
      :class:`AlicatProtocolClient`, canonicalises the port key,
      and reuses them across multi-device buses (RS-485).
    - :class:`Transport` — duck-typed transport. The manager wraps
      it in a new client but does *not* take transport ownership
      (the caller keeps open/close responsibility).
    - :class:`AlicatProtocolClient` — use as-is; the manager does
      not close it.

    Args:
        name: Unique manager-level identifier. Must not already
            exist on this manager.
        source: One of the four lifecycle shapes above.
        unit_id: Bus-level letter for the device. ``"A"`` is the
            polling default; multiple devices on the same port
            get distinct unit ids.
        serial: :class:`SerialSettings` override. Only honoured
            when ``source`` is a port-string — ignored otherwise
            (pre-built transports carry their own settings).
        timeout: Default command timeout passed through to
            :func:`open_device`.

    Returns:
        The identified :class:`Device` (a :class:`FlowMeter`,
        :class:`FlowController`, etc. subclass).

    Raises:
        AlicatValidationError: ``name`` already exists or
            ``serial`` was supplied with a non-string source.
        AlicatConnectionError: The manager is closed.
    """
    async with self._state_lock:
        self._check_open()
        if name in self._devices:
            raise AlicatValidationError(
                f"manager: name {name!r} already in use",
                context=ErrorContext(extra={"name": name}),
            )
        if serial is not None and not isinstance(source, str):
            raise AlicatValidationError(
                "manager.add(serial=...) only applies to string port sources; "
                "pre-built Transport / AlicatProtocolClient carry their own settings",
                context=ErrorContext(extra={"name": name}),
            )

        port_key, port_entry, device_ctx = await self._resolve_source(
            source,
            unit_id=unit_id,
            serial=serial,
            timeout=timeout,
        )

        # ``open_device`` context-enter runs identification + probes.
        # If it raises, we must not leave the port's ref count dangling.
        try:
            if device_ctx is not None:
                device = await device_ctx.__aenter__()
            else:
                # ``source`` was a pre-built Device.
                assert isinstance(source, Device)  # noqa: S101 — narrow for mypy
                device = source
        except BaseException:
            if port_entry is not None and port_key is not None and name not in port_entry.refs:
                # We created a brand-new port just for this add —
                # unwind it rather than leaking the transport.
                await self._maybe_teardown_port(port_key, port_entry)
            raise

        self._devices[name] = _DeviceEntry(
            name=name,
            device=device,
            port_key=port_key,
            device_ctx=device_ctx,
        )
        if port_entry is not None:
            port_entry.refs.add(name)

        _logger.info(
            "manager.add",
            extra={
                "device_name": name,
                "port_key": port_key,
                "unit_id": unit_id,
                "model": device.info.model,
                "firmware": str(device.info.firmware),
            },
        )
        return device

close async

close()

Tear down every managed device and port (LIFO).

Idempotent: safe to call from both :meth:__aexit__ and explicit user code. Individual close failures are caught and logged so one device's shutdown error doesn't strand the others.

Source code in src/alicatlib/manager.py
async def close(self) -> None:
    """Tear down every managed device and port (LIFO).

    Idempotent: safe to call from both :meth:`__aexit__` and
    explicit user code. Individual close failures are caught and
    logged so one device's shutdown error doesn't strand the
    others.
    """
    async with self._state_lock:
        if self._closed:
            return
        # Unwind in reverse insertion order — LIFO per design §5.13.
        for name in reversed(list(self._devices.keys())):
            entry = self._devices.pop(name)
            try:
                await self._teardown_device(entry)
            except Exception as err:  # noqa: BLE001 — best-effort manager teardown: keep going so other devices still close
                _logger.warning(
                    "manager.close_device_failed",
                    extra={"device_name": name, "error": repr(err)},
                )
        # Any port entries that survived (e.g. because a pre-built
        # client source never got refs torn down) are left alone —
        # the caller owns them.
        self._closed = True

execute async

execute(command, requests_by_name)

Dispatch a per-device Command across the requested names.

requests_by_name chooses both which devices participate and what arguments each gets — supporting the common case of "same command, different setpoint per device".
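The shape of this call can be modelled with :mod:`asyncio` alone. An illustrative sketch (not alicatlib's code) of how the request mapping both selects devices and carries each one's argument:

```python
import asyncio


async def execute_model(handlers, requests_by_name):
    """Fan out one call per named device with its own argument (sketch)."""
    async def one(name):
        # Each device gets its own request value, e.g. its setpoint.
        return name, await handlers[name](requests_by_name[name])

    pairs = await asyncio.gather(*(one(n) for n in requests_by_name))
    return dict(pairs)
```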

Source code in src/alicatlib/manager.py
async def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    requests_by_name: Mapping[str, Req],
) -> Mapping[str, DeviceResult[Resp]]:
    """Dispatch a per-device ``Command`` across the requested names.

    ``requests_by_name`` chooses both which devices participate and
    what arguments each gets — supporting the common case of
    "same command, different setpoint per device".
    """
    for name in requests_by_name:
        if name not in self._devices:
            raise AlicatValidationError(
                f"manager.execute: no device named {name!r}",
                context=ErrorContext(command_name=command.name, extra={"name": name}),
            )
    targets = tuple(requests_by_name.keys())
    name_by_device_id = {id(entry.device): entry.name for entry in self._devices.values()}

    async def _execute(device: Device) -> Resp:
        return await device.session.execute(
            command,
            requests_by_name[name_by_device_id[id(device)]],
        )

    return await self._dispatch(command.name, targets, _execute)

get

get(name)

Return the device registered under name (raises if unknown).

Source code in src/alicatlib/manager.py
def get(self, name: str) -> Device:
    """Return the device registered under ``name`` (raises if unknown)."""
    try:
        return self._devices[name].device
    except KeyError:
        raise AlicatValidationError(
            f"manager: no device named {name!r}",
            context=ErrorContext(extra={"name": name}),
        ) from None

poll async

poll(names=None)

Poll every (or named) device concurrently across ports.

Returns a mapping from device name to :class:DeviceResult even under :attr:ErrorPolicy.RAISE — but under that policy, any failed device's error is re-raised as an :class:ExceptionGroup after all devices have completed.

Source code in src/alicatlib/manager.py
async def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[DataFrame]]:
    """Poll every (or named) device concurrently across ports.

    Returns a mapping from device name to :class:`DeviceResult`
    even under :attr:`ErrorPolicy.RAISE` — but under that policy,
    any failed device's error is re-raised as an
    :class:`ExceptionGroup` after all devices have completed.
    """
    targets = self._resolve_names(names)

    async def _poll(device: Device) -> DataFrame:
        return await device.poll()

    return await self._dispatch("poll", targets, _poll)

remove async

remove(name)

Unregister and close the device named name.

If name was the last device on a shared port, the transport and client for that port are closed too. A pre-built :class:Device source is only dropped from the manager's registry — the caller retains lifecycle ownership.
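The ref-count rule can be sketched in a few lines; the class and function names below are illustrative, not alicatlib's internals:

```python
class PortEntry:
    """Stand-in for the manager's per-port bookkeeping entry."""

    def __init__(self) -> None:
        self.refs: set[str] = set()
        self.closed = False

    def close(self) -> None:
        self.closed = True


def remove_device(ports: dict[str, PortEntry], port_key: str, name: str) -> None:
    entry = ports[port_key]
    entry.refs.discard(name)
    if not entry.refs:      # last device on the shared port...
        entry.close()       # ...so the port's client closes too
        del ports[port_key]
```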

Source code in src/alicatlib/manager.py
async def remove(self, name: str) -> None:
    """Unregister and close the device named ``name``.

    If ``name`` was the last device on a shared port, the
    transport and client for that port are closed too. A
    pre-built :class:`Device` source is only dropped from the
    manager's registry — the caller retains lifecycle ownership.
    """
    async with self._state_lock:
        self._check_open()
        if name not in self._devices:
            raise AlicatValidationError(
                f"manager: no device named {name!r}",
                context=ErrorContext(extra={"name": name}),
            )
        entry = self._devices.pop(name)
        await self._teardown_device(entry)
        _logger.info("manager.remove", extra={"device_name": name})

request async

request(statistics, names=None, *, averaging_ms=1)

Run :meth:Device.request across devices concurrently.

Every targeted device receives the same statistic list and averaging window — mirroring the primer's DV semantics.

Source code in src/alicatlib/manager.py
async def request(
    self,
    statistics: Sequence[Statistic | str],
    names: Sequence[str] | None = None,
    *,
    averaging_ms: int = 1,
) -> Mapping[str, DeviceResult[MeasurementSet]]:
    """Run :meth:`Device.request` across devices concurrently.

    Every targeted device receives the same statistic list and
    averaging window — mirroring the primer's ``DV`` semantics.
    """
    targets = self._resolve_names(names)

    async def _request(device: Device) -> MeasurementSet:
        return await device.request(statistics, averaging_ms=averaging_ms)

    return await self._dispatch("request", targets, _request)

DeviceResult dataclass

DeviceResult(value, error)

Per-device result container — value or error, never both.

The union is encoded as two optional fields (rather than an Either / Result ADT) so mypy's narrowing on ok reads cleanly at call sites without pattern matching.

Attributes:

Name Type Description
value T | None

The successful result, or None if the call failed.

error AlicatError | None

The captured :class:~alicatlib.errors.AlicatError, or None if the call succeeded.

ok property

ok

True when the device produced a value (error is None).

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every device's result and — if any call failed — raises an :class:ExceptionGroup containing the per-device exceptions after the task group joins. Under :attr:RETURN, each device produces a :class:DeviceResult and the caller inspects .error per entry.

Design reference: docs/design.md §5.13.