sartoriuslib.manager

SartoriusManager (multi-balance orchestrator), BalanceManager alias, DeviceResult, ErrorPolicy. See Async quickstart §Multi-device and Design §11.

Multi-balance orchestrator — :class:SartoriusManager.

The manager coordinates many :class:~sartoriuslib.devices.balance.Balance instances across one or more serial ports. Operations across different physical ports run concurrently through :func:anyio.create_task_group; operations against the same port serialise through that port's shared :class:~sartoriuslib.protocol.xbpi.client.XbpiProtocolClient lock.
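The concurrency model described above can be illustrated with a small standard-library sketch. It uses asyncio in place of anyio, and `FakePort` and `poll_one` are hypothetical stand-ins rather than sartoriuslib APIs. Each port owns one lock, so calls on the same port queue up while calls on different ports overlap.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakePort:
    """Hypothetical stand-in for a per-port protocol client."""

    lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    in_flight: int = 0
    max_in_flight: int = 0


async def poll_one(port, stats):
    async with port.lock:  # same-port calls serialise here
        port.in_flight += 1
        port.max_in_flight = max(port.max_in_flight, port.in_flight)
        stats["now"] += 1
        stats["peak"] = max(stats["peak"], stats["now"])
        await asyncio.sleep(0.01)  # pretend to wait for the balance's reply
        stats["now"] -= 1
        port.in_flight -= 1


async def demo():
    # Ports are created inside the running loop so the locks bind to it.
    ports = {"A": FakePort(), "B": FakePort()}
    stats = {"now": 0, "peak": 0}
    # Two balances per port, four calls launched together.
    await asyncio.gather(*(poll_one(ports[p], stats) for p in ("A", "A", "B", "B")))
    return {
        "per_port_peak": max(p.max_in_flight for p in ports.values()),
        "overall_peak": stats["peak"],
    }


summary = asyncio.run(demo())
print(summary)
```

In this sketch at most one call is ever in flight per port, while two ports are busy at the same time.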

Port identity is canonicalised before comparison so a balance referenced via both /dev/ttyUSB0 and /dev/serial/by-id/... (or COM3 and com3 on Windows) collapses to one client — critical for the single-in-flight invariant. Pre-built :class:Transport sources use the object's :func:id as the key so caller-owned transports aren't accidentally shared.
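As a rough illustration of why canonicalisation matters, the sketch below maps equivalent port spellings to a single key before looking up a shared client. `canonical_port_key` and `client_for` are hypothetical helpers written for this example; the library's actual rules may differ.

```python
import os
import sys

clients = {}  # canonical port key -> shared client object


def canonical_port_key(port, platform=sys.platform):
    """Hypothetical helper: map equivalent port spellings to one key."""
    if platform.startswith("win"):
        # Windows device names are case-insensitive: "com3" == "COM3".
        return port.upper()
    # On POSIX, resolve symlinks such as /dev/serial/by-id/... to the
    # underlying device node so both spellings yield the same key.
    return os.path.realpath(port)


def client_for(port, platform=sys.platform):
    """Return the one client for this port, creating it on first use."""
    key = canonical_port_key(port, platform)
    if key not in clients:
        clients[key] = object()  # placeholder for a real protocol client
    return clients[key]


same = client_for("COM3", "win32") is client_for("com3", "win32")
print(same)
```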

Error handling is controlled by :class:ErrorPolicy:

  • :attr:ErrorPolicy.RAISE — manager collects all results, and if any balance failed, raises an :class:ExceptionGroup after the task group joins.
  • :attr:ErrorPolicy.RETURN — every balance produces a :class:DeviceResult container; callers inspect .error.

Resources are tracked internally and released in reverse registration order (LIFO) on :meth:close or __aexit__. Per-port clients are ref-counted, so the last :meth:remove on a shared port also closes that port's transport.
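A minimal sketch of ref-counted port sharing with LIFO teardown follows. `PortRegistry` is a hypothetical class for illustration only; it records which ports would be closed rather than touching real transports.

```python
class PortRegistry:
    """Hypothetical sketch of ref-counted port sharing."""

    def __init__(self):
        self.refs = {}    # port key -> set of balance names using it
        self.closed = []  # port keys whose transport has been closed

    def add(self, name, port_key):
        self.refs.setdefault(port_key, set()).add(name)

    def remove(self, name, port_key):
        users = self.refs[port_key]
        users.discard(name)
        if not users:
            # Last user gone: this is where the transport would be closed.
            del self.refs[port_key]
            self.closed.append(port_key)

    def close_all(self, devices):
        # devices: list of (name, port_key) in registration order.
        for name, port_key in reversed(devices):
            self.remove(name, port_key)


registry = PortRegistry()
devices = [
    ("bal1", "/dev/ttyUSB0"),
    ("bal2", "/dev/ttyUSB0"),
    ("bal3", "/dev/ttyUSB1"),
]
for name, key in devices:
    registry.add(name, key)

registry.remove("bal1", "/dev/ttyUSB0")
closed_after_first = list(registry.closed)  # port still has bal2
registry.close_all(devices[1:])             # LIFO: bal3 first, then bal2
print(registry.closed)
```

Removing the first balance leaves the shared port open; the port closes only when its last user is removed, and the final teardown walks the remaining devices newest-first.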

Design reference: docs/design.md §11.

DeviceResult dataclass

DeviceResult(value, error, protocol=None)

Per-device result container — value or error, never both.

:attr:protocol is populated by :class:SartoriusManager from the balance's session so error samples from the :mod:~sartoriuslib.streaming layer can still record which protocol produced the failure. Non-manager :class:~sartoriuslib.streaming.PollSource stubs may leave it None.

ok property

ok

True when the balance produced a value (error is None).
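The sketch below shows one way a caller might consume a mapping of such results. `Result` is a hypothetical stand-in for DeviceResult with the same three fields; representing `protocol` as a plain string is an assumption made for this example.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class Result(Generic[T]):
    """Hypothetical stand-in for DeviceResult: value or error, never both."""

    value: Optional[T]
    error: Optional[Exception]
    protocol: Optional[str] = None  # assumption: shown here as a string

    @property
    def ok(self) -> bool:
        return self.error is None


def split(results):
    """Partition a name -> Result mapping into readings and failures."""
    values = {n: r.value for n, r in results.items() if r.ok}
    errors = {n: r.error for n, r in results.items() if not r.ok}
    return values, errors


results = {
    "bal1": Result(value=101.3, error=None, protocol="xbpi"),
    "bal2": Result(value=None, error=TimeoutError("no reply"), protocol="xbpi"),
}
values, errors = split(results)
print(values, list(errors))
```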

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every balance's result and — if any call failed — raises an :class:ExceptionGroup containing the per-device exceptions after the task group joins. Under :attr:RETURN, each balance produces a :class:DeviceResult and the caller inspects .error per entry.

SartoriusManager

SartoriusManager(*, error_policy=ErrorPolicy.RAISE)

Coordinator for many balances across one or more serial ports.

Operations run concurrently across different physical ports (via :func:anyio.create_task_group) and serialise on the same-port client lock. Per-balance failures are surfaced per :attr:error_policy:

  • :attr:ErrorPolicy.RAISE: the manager still collects results from every balance, then raises an :class:ExceptionGroup if any failed.
  • :attr:ErrorPolicy.RETURN: the mapping's values carry :class:DeviceResult containers with .value or .error.

Usage:

async with SartoriusManager() as mgr:
    await mgr.add("bal1", "/dev/ttyUSB0")
    await mgr.add("bal2", "/dev/ttyUSB1")
    readings = await mgr.poll()

Source code in src/sartoriuslib/manager.py
def __init__(self, *, error_policy: ErrorPolicy = ErrorPolicy.RAISE) -> None:
    self._error_policy = error_policy
    self._devices: dict[str, _DeviceEntry] = {}
    self._ports: dict[str, _PortEntry] = {}
    self._state_lock = anyio.Lock()
    self._closed = False

closed property

closed

True once :meth:close has been called.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed balance names.

add async

add(
    name,
    source,
    *,
    protocol=ProtocolKind.XBPI,
    serial_settings=None,
    timeout=1.0,
    src_sbn=1,
    dst_sbn=9,
    strict=False,
    identify=True,
)

Register and open a balance under name.

The source discriminates lifecycle ownership:

  • :class:Balance — pre-built (via :func:open_device outside the manager). The manager only tracks the name mapping; it does not take lifecycle ownership.
  • str — serial port path ("/dev/ttyUSB0", "COM3"). The manager creates a :class:SerialTransport, canonicalises the port key, and shares the transport across balances on the same bus. Mixing xBPI and SBI sessions on a shared physical port is refused; one serial link has one active protocol.
  • :class:Transport — duck-typed transport. The manager invokes :func:open_device against it but does not take transport ownership.

Parameters:

  • name (str, required): Unique manager-level identifier.
  • source (Balance | str | Transport, required): One of the three lifecycle shapes above.
  • protocol (ProtocolKind, default XBPI): Which wire protocol to speak (per :func:sartoriuslib.open_device). Ignored when source is a pre-built :class:Balance.
  • serial_settings (SerialSettings | None, default None): Override default serial framing. Only honoured when source is a port string.
  • timeout (float, default 1.0): Per-call default timeout.
  • src_sbn (int, default 1): Host xBPI bus address.
  • dst_sbn (int, default 9): Balance xBPI bus address.
  • strict (bool, default False): Strict prior gating (see design §6.1).
  • identify (bool, default True): Run identify on open and cache :class:DeviceInfo.

Returns:

  • Balance: The opened :class:Balance.

Raises:

  • SartoriusValidationError: name already exists or an invalid combination of kwargs was supplied.
  • SartoriusConnectionError: Manager is closed.

Source code in src/sartoriuslib/manager.py
async def add(
    self,
    name: str,
    source: Balance | str | Transport,
    *,
    protocol: ProtocolKind = ProtocolKind.XBPI,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
    src_sbn: int = 0x01,
    dst_sbn: int = 0x09,
    strict: bool = False,
    identify: bool = True,
) -> Balance:
    """Register and open a balance under ``name``.

    The ``source`` discriminates lifecycle ownership:

    - :class:`Balance` — pre-built (via :func:`open_device` outside
      the manager). The manager only tracks the name mapping; it
      does *not* take lifecycle ownership.
    - ``str`` — serial port path (``"/dev/ttyUSB0"``, ``"COM3"``).
      The manager creates a :class:`SerialTransport`,
      canonicalises the port key, and shares the transport across
      balances on the same bus. Mixing xBPI and SBI sessions on a shared
      physical port is refused; one serial link has one active protocol.
    - :class:`Transport` — duck-typed transport. The manager
      invokes :func:`open_device` against it but does *not* take
      transport ownership.

    Args:
        name: Unique manager-level identifier.
        source: One of the three lifecycle shapes above.
        protocol: Which wire protocol to speak (per
            :func:`sartoriuslib.open_device`). Ignored when
            ``source`` is a pre-built :class:`Balance`.
        serial_settings: Override default serial framing. Only
            honoured when ``source`` is a port-string.
        timeout: Per-call default timeout.
        src_sbn: Host xBPI bus address.
        dst_sbn: Balance xBPI bus address.
        strict: Strict prior gating (see design §6.1).
        identify: Run identify on open and cache :class:`DeviceInfo`.

    Returns:
        The opened :class:`Balance`.

    Raises:
        SartoriusValidationError: ``name`` already exists or an
            invalid combination of kwargs was supplied.
        SartoriusConnectionError: Manager is closed.
    """
    async with self._state_lock:
        self._check_open()
        if name in self._devices:
            raise SartoriusValidationError(
                f"manager: name {name!r} already in use",
                context=ErrorContext(extra={"name": name}),
            )
        if serial_settings is not None and not isinstance(source, str):
            raise SartoriusValidationError(
                "manager.add(serial_settings=...) only applies to string port "
                "sources; pre-built Transport / Balance carry their own settings",
                context=ErrorContext(extra={"name": name}),
            )

        port_key, port_entry, balance = await self._resolve_source(
            source,
            protocol=protocol,
            serial_settings=serial_settings,
            timeout=timeout,
            src_sbn=src_sbn,
            dst_sbn=dst_sbn,
            strict=strict,
            identify=identify,
        )

        self._devices[name] = _DeviceEntry(
            name=name,
            balance=balance,
            port_key=port_key,
        )
        if port_entry is not None:
            port_entry.refs.add(name)

        info = balance.info
        _logger.info(
            "manager.add",
            extra={
                "device_name": name,
                "port_key": port_key,
                "model": info.model if info is not None else None,
                "protocol": balance.session.active_protocol.value,
            },
        )
        return balance

close async

close()

Tear down every managed balance and port (LIFO).

Source code in src/sartoriuslib/manager.py
async def close(self) -> None:
    """Tear down every managed balance and port (LIFO)."""
    async with self._state_lock:
        if self._closed:
            return
        for name in reversed(list(self._devices.keys())):
            entry = self._devices.pop(name)
            try:
                await self._teardown_device(entry)
            except Exception as err:
                _logger.warning(
                    "manager.close_device_failed",
                    extra={"device_name": name, "error": repr(err)},
                )
        self._closed = True

execute async

execute(command, requests_by_name)

Dispatch a per-device Command across the requested names.

requests_by_name chooses both which balances participate and what arguments each gets — supporting the common case of "same command, different argument per balance".
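The "same command, different argument per balance" pattern can be sketched as follows. Everything here is a hypothetical stand-in: `fake_command` replaces a real Command, the module-level `execute` replaces the manager method, and asyncio replaces anyio.

```python
import asyncio


async def fake_command(name, request):
    # Hypothetical per-device command; a real one would go over the wire.
    await asyncio.sleep(0)
    return f"{name} handled {request}"


async def execute(command, requests_by_name):
    # The mapping's keys select the participants; its values are the
    # per-device arguments.
    names = list(requests_by_name)
    replies = await asyncio.gather(
        *(command(n, requests_by_name[n]) for n in names)
    )
    return dict(zip(names, replies))


# Only bal1 and bal3 take part, each with its own argument.
replies = asyncio.run(execute(fake_command, {"bal1": 50, "bal3": 200}))
print(replies)
```

A balance that is not a key in the mapping is not contacted at all, which is how one call can both select a subset and vary the request per device.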

Source code in src/sartoriuslib/manager.py
async def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    requests_by_name: Mapping[str, Req],
) -> Mapping[str, DeviceResult[Resp]]:
    """Dispatch a per-device ``Command`` across the requested names.

    ``requests_by_name`` chooses both which balances participate and
    what arguments each gets — supporting the common case of
    "same command, different argument per balance".
    """
    for name in requests_by_name:
        if name not in self._devices:
            raise SartoriusValidationError(
                f"manager.execute: no balance named {name!r}",
                context=ErrorContext(command_name=command.name, extra={"name": name}),
            )
    targets = tuple(requests_by_name.keys())
    name_by_balance_id = {id(entry.balance): entry.name for entry in self._devices.values()}

    async def _execute(balance: Balance) -> Resp:
        return await balance.session.execute(
            command,
            requests_by_name[name_by_balance_id[id(balance)]],
        )

    return await self._dispatch(command.name, targets, _execute)

get

get(name)

Return the balance registered under name.

Source code in src/sartoriuslib/manager.py
def get(self, name: str) -> Balance:
    """Return the balance registered under ``name``."""
    try:
        return self._devices[name].balance
    except KeyError:
        raise SartoriusValidationError(
            f"manager: no balance named {name!r}",
            context=ErrorContext(extra={"name": name}),
        ) from None

poll async

poll(names=None)

Poll every (or named) balance concurrently across ports.

Returns a mapping from balance name to :class:DeviceResult even under :attr:ErrorPolicy.RAISE — but under that policy, any failed balance's error is re-raised as an :class:ExceptionGroup after all balances have completed.

Source code in src/sartoriuslib/manager.py
async def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Poll every (or named) balance concurrently across ports.

    Returns a mapping from balance name to :class:`DeviceResult`
    even under :attr:`ErrorPolicy.RAISE` — but under that policy,
    any failed balance's error is re-raised as an
    :class:`ExceptionGroup` after all balances have completed.
    """
    targets = self._resolve_names(names)

    async def _poll(balance: Balance) -> Reading:
        return await balance.poll()

    return await self._dispatch("poll", targets, _poll)

remove async

remove(name)

Unregister and close the balance named name.

If name was the last balance on a shared port, the transport for that port is closed too. A pre-built :class:Balance source is only dropped from the manager's registry — the caller retains lifecycle ownership.

Source code in src/sartoriuslib/manager.py
async def remove(self, name: str) -> None:
    """Unregister and close the balance named ``name``.

    If ``name`` was the last balance on a shared port, the
    transport for that port is closed too. A pre-built
    :class:`Balance` source is only dropped from the manager's
    registry — the caller retains lifecycle ownership.
    """
    async with self._state_lock:
        self._check_open()
        if name not in self._devices:
            raise SartoriusValidationError(
                f"manager: no balance named {name!r}",
                context=ErrorContext(extra={"name": name}),
            )
        entry = self._devices.pop(name)
        await self._teardown_device(entry)
        _logger.info("manager.remove", extra={"device_name": name})