
nidaqlib.manager

DaqManager — multi-task lifecycle and dispatch (design doc §15).

A direct port of sartoriuslib's manager.py, shape-translated for DAQ:

  • Port-keyed locks → per-task locks plus a per-device lock for tasks that share a card (best-effort; NI is the final authority).
  • Per-operation outcomes are reported as :class:DeviceResult[T].
  • The recorder consumed :class:ErrorPolicy in v0.1; the manager becomes the second consumer here.

Lifecycle invariants (sibling parity):

  • Sessions start lazily. :meth:add constructs a :class:DaqSession and records the spec; :meth:start performs the actual NI calls.
  • :meth:close unwinds in LIFO order (last added, first closed). On failure during a group operation, all errors are collected into one :class:ExceptionGroup rather than aborting on the first.
  • :meth:add is idempotent on the same name + spec — a duplicate add bumps a refcount; the matching :meth:remove decrements. Only when the refcount reaches zero is the session torn down.

DaqManager

DaqManager(*, error_policy=ErrorPolicy.RAISE)

Lifecycle, dispatch, and group operations across multiple NI tasks.

Construction does not touch the driver. Add tasks via :meth:add (lazy — no NI calls), then call :meth:start to bring them up. :meth:close always tears down in reverse-add order.

The manager is async-context-manager-aware: async with DaqManager() closes every session on exit, even on raised errors.
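
A minimal lifecycle sketch. The import path and the make_voltage_spec helper are assumptions — substitute a real :class:TaskSpec for your hardware:

import anyio

from nidaqlib.manager import DaqManager            # assumed import path
from myproject.specs import make_voltage_spec      # hypothetical TaskSpec factory


async def main() -> None:
    async with DaqManager() as manager:             # close() runs on exit, even on error
        await manager.add("bridge", make_voltage_spec())   # lazy: no NI calls yet
        await manager.start(["bridge"])                     # NI task configured + started
        results = await manager.poll(["bridge"], timeout=1.0)
        if results["bridge"].ok:
            print(results["bridge"].value)          # a DaqReading
    # On exit every session is closed in reverse-add order.


anyio.run(main)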

Create a manager.

Parameters:

    error_policy (ErrorPolicy, default RAISE):
        Default policy for group operations (:meth:start, :meth:stop,
        :meth:poll, :meth:read_block). :attr:ErrorPolicy.RAISE collects
        errors into an :class:ExceptionGroup; :attr:ErrorPolicy.RETURN
        surfaces them as DeviceResult.error rows and continues.
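
A sketch of the two policies in use. Import paths are assumptions, and the tasks are presumed to have been registered earlier via :meth:add:

from nidaqlib.manager import DaqManager            # assumed import path
from nidaqlib.errors import ErrorPolicy            # assumed import path


async def start_all(manager: DaqManager) -> None:
    # RETURN: per-task failures come back as DeviceResult rows.
    results = await manager.start(error_policy=ErrorPolicy.RETURN)
    for name, result in results.items():
        if not result.ok:
            print(f"{name} failed to start: {result.error}")
    # RAISE (the default): the same failures would instead arrive as one
    # ExceptionGroup and no result mapping would be returned.
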
Source code in src/nidaqlib/manager.py
def __init__(self, *, error_policy: ErrorPolicy = ErrorPolicy.RAISE) -> None:
    """Create a manager.

    Args:
        error_policy: Default policy for group operations
            (:meth:`start`, :meth:`stop`, :meth:`poll`,
            :meth:`read_block`). :attr:`ErrorPolicy.RAISE` collects
            errors into an :class:`ExceptionGroup`;
            :attr:`ErrorPolicy.RETURN` surfaces them as
            ``DeviceResult.error`` rows and continues.
    """
    self._error_policy = error_policy
    self._sessions: dict[str, DaqSession] = {}
    self._specs: dict[str, TaskSpec] = {}
    self._refcounts: dict[str, int] = {}
    self._order: list[str] = []
    self._task_locks: dict[str, anyio.Lock] = {}
    self._device_locks: dict[str, anyio.Lock] = {}
    self._task_devices: dict[str, tuple[str, ...]] = {}
    self._global_lock = anyio.Lock()
    self._closed = False
    # Cache of (device → product_type) so the module-level preflight
    # only queries the backend once per device. A missing key means
    # "haven't tried yet"; ``None`` means "queried, NI returned no
    # info / the device alias is unknown".
    self._device_product_cache: dict[str, str | None] = {}

error_policy property

error_policy

The default error policy for group operations.

is_closed property

is_closed

True once :meth:close has run.

names property

names

Names of currently managed tasks, in add-order.

add async

add(name, source, *, backend=None)

Register a task with this manager. Idempotent on duplicate name.

Performs a best-effort preflight conflict check against tasks already managed (design doc §15.3). NI is the final authority — the preflight only catches obvious overlaps.

add does not allocate NI resources — it constructs a :class:DaqSession and records it. The session's :meth:DaqSession.configure (which creates the NI task and applies channels / timing / logging / triggers) runs lazily on the first :meth:start for the task. Any NI rejection of the spec (bad physical channel, unsupported channel kind, sample rate above device max, …) therefore surfaces at :meth:start time, not :meth:add time. The preflight catches operator-side overlap only; everything NI validates lives downstream.

Parameters:

    name (str, required):
        Manager-side label for this task. Must be unique.
    source (TaskSpec | DaqSession, required):
        Either a :class:TaskSpec (manager constructs a fresh
        :class:DaqSession) or a pre-built :class:DaqSession (manager
        registers it as-is). The pre-built form aligns with the ecosystem
        add(name, source) convention shared by :class:watlowlib.WatlowManager,
        :class:alicatlib.AlicatManager, and :class:sartoriuslib.SartoriusManager.
    backend (DaqBackend | None, default None):
        Optional :class:DaqBackend. Defaults to :class:NidaqmxBackend
        (lazy import). Ignored when source is a :class:DaqSession (which
        already carries its own backend).

Returns:

    DaqSession:
        The :class:DaqSession registered under name. Re-adding the same
        (name, source) returns the existing session and bumps a refcount.

Raises:

    NIDaqTaskStateError:
        name already maps to a different spec, or the manager is closed.
    NIDaqResourceError:
        source overlaps physical channels with an already-managed task.
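
Because configure runs lazily, a spec NI would reject is still accepted by :meth:add and only fails at :meth:start. A sketch under the default RAISE policy; import paths are assumptions and bad_spec stands in for a :class:TaskSpec with, say, an invalid physical channel:

from nidaqlib.manager import DaqManager            # assumed import paths
from nidaqlib.errors import NIDaqError
from nidaqlib.types import TaskSpec


async def bring_up(manager: DaqManager, bad_spec: TaskSpec) -> None:
    await manager.add("suspect", bad_spec)          # succeeds: no NI calls yet
    try:
        await manager.start(["suspect"])            # NI validates the spec here
    except* NIDaqError as group:                    # RAISE policy groups failures
        for exc in group.exceptions:
            print(f"rejected at start time: {exc}")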

Source code in src/nidaqlib/manager.py
async def add(
    self,
    name: str,
    source: TaskSpec | DaqSession,
    *,
    backend: DaqBackend | None = None,
) -> DaqSession:
    """Register a task with this manager. Idempotent on duplicate ``name``.

    Performs a best-effort preflight conflict check against tasks
    already managed (design doc §15.3). NI is the final authority —
    the preflight only catches obvious overlaps.

    ``add`` does **not** allocate NI resources — it constructs a
    :class:`DaqSession` and records it. The session's
    :meth:`DaqSession.configure` (which creates the NI task and applies
    channels / timing / logging / triggers) runs lazily on the first
    :meth:`start` for the task. Any NI rejection of the ``spec`` (bad
    physical channel, unsupported channel kind, sample rate above
    device max, …) therefore surfaces at :meth:`start` time, not
    :meth:`add` time. The preflight catches operator-side overlap
    only; everything NI validates lives downstream.

    Args:
        name: Manager-side label for this task. Must be unique.
        source: Either a :class:`TaskSpec` (manager constructs a
            fresh :class:`DaqSession`) or a pre-built
            :class:`DaqSession` (manager registers it as-is). The
            pre-built form aligns with the ecosystem ``add(name,
            source)`` convention shared by :class:`watlowlib.WatlowManager`,
            :class:`alicatlib.AlicatManager`, and
            :class:`sartoriuslib.SartoriusManager`.
        backend: Optional :class:`DaqBackend`. Defaults to
            :class:`NidaqmxBackend` (lazy import). Ignored when
            ``source`` is a :class:`DaqSession` (which already
            carries its own backend).

    Returns:
        The :class:`DaqSession` registered under ``name``. Re-adding
        the same ``(name, source)`` returns the existing session and
        bumps a refcount.

    Raises:
        NIDaqTaskStateError: ``name`` already maps to a different
            spec, or the manager is closed.
        NIDaqResourceError: ``source`` overlaps physical channels
            with an already-managed task.
    """
    if self._closed:
        raise NIDaqTaskStateError(
            "DaqManager is closed",
            context=ErrorContext(task_name=name, operation="manager.add"),
        )
    if isinstance(source, DaqSession):
        spec = source.spec
        prebuilt: DaqSession | None = source
    else:
        spec = source
        prebuilt = None
    async with self._global_lock:
        existing = self._sessions.get(name)
        if existing is not None:
            if self._specs.get(name) is not spec and self._specs.get(name) != spec:
                raise NIDaqTaskStateError(
                    f"task {name!r} already registered with a different spec",
                    context=ErrorContext(task_name=name, operation="manager.add"),
                )
            self._refcounts[name] = self._refcounts.get(name, 1) + 1
            return existing

        if prebuilt is not None:
            session = prebuilt
        else:
            if backend is None:
                from nidaqlib.backend.nidaqmx_backend import NidaqmxBackend  # noqa: PLC0415

                backend = NidaqmxBackend()
            await self._preflight_conflicts(name, spec, backend)
            session = DaqSession(spec, backend)
        self._sessions[name] = session
        self._specs[name] = spec
        self._refcounts[name] = 1
        self._order.append(name)
        self._task_locks[name] = anyio.Lock()
        devices = tuple(sorted({_device_of(ch.physical_channel) for ch in spec.channels}))
        self._task_devices[name] = devices
        for ch in spec.channels:
            dev = _device_of(ch.physical_channel)
            self._device_locks.setdefault(dev, anyio.Lock())
        return session

close async

close()

Tear down every managed session in LIFO order. Idempotent.

Failures are collected into an :class:ExceptionGroup; one slow / broken close does not prevent others from running.
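
A shutdown sketch: the group is raised only after every session has had its chance to close. The import path is an assumption:

from nidaqlib.manager import DaqManager            # assumed import path


async def shutdown(manager: DaqManager) -> None:
    try:
        await manager.close()                       # LIFO unwind, keeps going past failures
    except BaseExceptionGroup as group:
        for exc in group.exceptions:
            print(f"session close failed: {exc!r}")
    assert manager.is_closed                        # idempotent: a second close is a no-op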

Source code in src/nidaqlib/manager.py
async def close(self) -> None:
    """Tear down every managed session in LIFO order. Idempotent.

    Failures are collected into an :class:`ExceptionGroup`; one slow /
    broken close does not prevent others from running.
    """
    if self._closed:
        return
    self._closed = True
    # Snapshot under the global lock, then close outside it so unrelated
    # ops (e.g. a recorder still draining) are not blocked on a long NI
    # close call.
    async with self._global_lock:
        order = list(reversed(self._order))
        sessions = {name: self._sessions[name] for name in order if name in self._sessions}
        self._sessions.clear()
        self._specs.clear()
        self._refcounts.clear()
        self._order.clear()
        self._task_locks.clear()
        self._task_devices.clear()
        self._device_locks.clear()
    errors: list[BaseException] = []
    for name in order:
        session = sessions.get(name)
        if session is None:
            continue
        try:
            await session.close()
        except BaseException as exc:
            # Collected and re-grouped below — one slow / broken close
            # must not prevent the rest of the LIFO unwind from running.
            errors.append(exc)
    if errors:
        raise BaseExceptionGroup("DaqManager.close: one or more sessions failed", errors)

get

get(name)

Return the session registered under name.

Raises:

    KeyError:
        name is unknown.

Source code in src/nidaqlib/manager.py
def get(self, name: str) -> DaqSession:
    """Return the session registered under ``name``.

    Raises:
        KeyError: ``name`` is unknown.
    """
    return self._sessions[name]

poll async

poll(names=None, *, timeout=None, error_policy=None)

Poll one or more tasks once each. Returns one :class:DaqReading per task.
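
A sketch polling a subset of the managed tasks. The names are placeholders, and timeout is NI's per-call read timeout, not a coroutine timeout:

from nidaqlib.manager import DaqManager            # assumed import path


async def latest_readings(manager: DaqManager) -> dict:
    results = await manager.poll(["pressure", "strain"], timeout=0.5)
    # Keep only the tasks whose poll succeeded; each value is a DaqReading.
    return {name: r.value for name, r in results.items() if r.ok}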

Source code in src/nidaqlib/manager.py
async def poll(
    self,
    names: Sequence[str] | None = None,
    *,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
    error_policy: ErrorPolicy | None = None,
) -> Mapping[str, DeviceResult[DaqReading]]:
    """Poll one or more tasks once each. Returns one :class:`DaqReading` per task."""

    async def _do(session: DaqSession) -> DaqReading:
        return await session.poll(timeout=timeout)

    return await self._for_each(
        names,
        "poll",
        _do,
        error_policy=error_policy,
    )

read_block async

read_block(
    samples_per_channel,
    names=None,
    *,
    timeout=None,
    error_policy=None,
)

Read one block per task in parallel.
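
A sketch reading a finite block from every managed task; error_policy=RETURN keeps partial failures visible as rows. Import paths are assumptions:

from nidaqlib.manager import DaqManager            # assumed import paths
from nidaqlib.errors import ErrorPolicy


async def grab_blocks(manager: DaqManager) -> None:
    results = await manager.read_block(
        1000,                                       # samples per channel, per task
        timeout=5.0,
        error_policy=ErrorPolicy.RETURN,
    )
    for name, result in results.items():
        if result.ok:
            print(f"{name}: got a DaqBlock: {result.value!r}")
        else:
            print(f"{name}: read failed: {result.error}")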

Source code in src/nidaqlib/manager.py
async def read_block(
    self,
    samples_per_channel: int,
    names: Sequence[str] | None = None,
    *,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
    error_policy: ErrorPolicy | None = None,
) -> Mapping[str, DeviceResult[DaqBlock]]:
    """Read one block per task in parallel."""

    async def _do(session: DaqSession) -> DaqBlock:
        return await session.read_block(samples_per_channel, timeout=timeout)

    return await self._for_each(
        names,
        "read_block",
        _do,
        error_policy=error_policy,
    )

remove async

remove(name)

Decrement refcount; tear down on the last :meth:remove.

A no-op for unknown names — matches sibling parity.

Raises:

    NIDaqError:
        Surfaced from session close (collected into a group when called
        from :meth:close).
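
A refcount sketch: consumers of the same task must add and remove in matching pairs. Import paths are assumptions and spec stands in for a shared :class:TaskSpec:

from nidaqlib.manager import DaqManager            # assumed import paths
from nidaqlib.types import TaskSpec


async def shared_use(manager: DaqManager, spec: TaskSpec) -> None:
    await manager.add("thermo", spec)               # refcount -> 1, session created
    await manager.add("thermo", spec)               # refcount -> 2, same session returned
    await manager.remove("thermo")                  # refcount -> 1, session stays registered
    await manager.remove("thermo")                  # refcount -> 0, session closed here
    await manager.remove("thermo")                  # unknown name: silently a no-op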

Source code in src/nidaqlib/manager.py
async def remove(self, name: str) -> None:
    """Decrement refcount; tear down on the last :meth:`remove`.

    A no-op for unknown names — matches sibling parity.

    Raises:
        NIDaqError: Surfaced from session close (collected into a
            group when called from :meth:`close`).
    """
    async with self._global_lock:
        if name not in self._sessions:
            return
        self._refcounts[name] -= 1
        if self._refcounts[name] > 0:
            return
        session = self._sessions.pop(name)
        self._specs.pop(name, None)
        self._refcounts.pop(name, None)
        self._task_locks.pop(name, None)
        self._task_devices.pop(name, None)
        with contextlib.suppress(ValueError):
            self._order.remove(name)
    # Close outside the global lock so a slow NI close doesn't block
    # other manager ops on unrelated tasks.
    await session.close()

start async

start(names=None, *, error_policy=None, confirm=False)

Start one or more managed tasks. Defaults to all in add-order.

Source code in src/nidaqlib/manager.py
async def start(
    self,
    names: Sequence[str] | None = None,
    *,
    error_policy: ErrorPolicy | None = None,
    confirm: bool = False,
) -> Mapping[str, DeviceResult[None]]:
    """Start one or more managed tasks. Defaults to all in add-order."""

    async def _do(session: DaqSession) -> None:
        await _configure_then_start(session, confirm=confirm)

    return await self._for_each(
        names,
        "start",
        _do,
        error_policy=error_policy,
    )

start_synchronized async

start_synchronized(
    master, slaves, *, error_policy=None, confirm=False
)

Arm slaves first, then start master.

Multi-task synchronisation requires strict ordering: each slave is configured against a shared sample clock or trigger and must reach the armed-and-waiting state before the master is started — once the master arms its clock or fires its trigger, the slaves react immediately. If a slave is started after the master, samples before its first edge are lost.

Slaves are armed sequentially (not concurrently): NI's start_task returns once the task is armed, so issuing the starts in order guarantees every slave has reached the armed state before the master starts. This is intentionally simpler than the parallel fan-out used by :meth:start; the difference matters when one slave fails to arm — the master must not start at all.

On failure during slave arming, every slave that had already armed is stopped (in reverse order) before the error is raised; the master is not started.

Parameters:

    master (str, required):
        Manager-add name of the master task.
    slaves (Sequence[str], required):
        Manager-add names of the slave tasks. Order is respected — slaves
        are armed left-to-right.
    error_policy (ErrorPolicy | None, default None):
        Optional override; defaults to the manager's policy.
    confirm (bool, default False):
        Required when any task being started can actuate hardware
        immediately.

Returns:

    Mapping[str, DeviceResult[None]]:
        One :class:DeviceResult[None] per task (master plus every entry
        of slaves), keyed by name.

Raises:

    KeyError:
        master or any entry of slaves is unknown.
    BaseExceptionGroup:
        One or more tasks failed under :attr:ErrorPolicy.RAISE.
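
A usage sketch, assuming the three tasks were registered via :meth:add and share a sample clock or trigger in their specs; names are placeholders and the import path is an assumption:

from nidaqlib.manager import DaqManager            # assumed import path


async def run_synchronized(manager: DaqManager) -> None:
    # Slaves arm left-to-right, then the master starts. If any slave fails
    # to arm, the already-armed slaves are stopped and the master is never
    # started.
    results = await manager.start_synchronized(
        "ai-master",
        ["ai-slave-0", "ai-slave-1"],
        confirm=True,   # pass True when a task can actuate hardware on start
    )
    for name, result in results.items():
        print(f"{name}: {'started' if result.ok else result.error}")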

Source code in src/nidaqlib/manager.py
async def start_synchronized(
    self,
    master: str,
    slaves: Sequence[str],
    *,
    error_policy: ErrorPolicy | None = None,
    confirm: bool = False,
) -> Mapping[str, DeviceResult[None]]:
    """Arm ``slaves`` first, then start ``master``.

    Multi-task synchronisation requires strict ordering: each slave is
    configured against a shared sample clock or trigger and must reach
    the *armed-and-waiting* state before the master is started — once
    the master arms its clock or fires its trigger, the slaves react
    immediately. If a slave is started after the master, samples
    before its first edge are lost.

    Slaves are armed sequentially (not concurrently): NI's
    ``start_task`` returns once the task is armed, so issuing the
    starts in order guarantees every slave has reached the armed state
    before the master starts. This is intentionally simpler than the
    parallel fan-out used by :meth:`start`; the difference matters
    when one slave fails to arm — the master must not start at all.

    On failure during slave arming, every slave that had already
    armed is stopped (in reverse order) before the error is raised;
    the master is not started.

    Args:
        master: Manager-add name of the master task.
        slaves: Manager-add names of the slave tasks. Order is
            respected — slaves are armed left-to-right.
        error_policy: Optional override; defaults to the manager's
            policy.
        confirm: Required when any task being started can actuate
            hardware immediately.

    Returns:
        One :class:`DeviceResult[None]` per task (``master`` plus every
        entry of ``slaves``), keyed by name.

    Raises:
        KeyError: ``master`` or any entry of ``slaves`` is unknown.
        BaseExceptionGroup: One or more tasks failed under
            :attr:`ErrorPolicy.RAISE`.
    """
    unknown = [n for n in (master, *slaves) if n not in self._sessions]
    if unknown:
        raise KeyError(f"unknown task name(s): {unknown!r}")
    if master in slaves:
        raise NIDaqTaskStateError(
            f"task {master!r} cannot be both master and slave",
            context=ErrorContext(task_name=master, operation="start_synchronized"),
        )

    policy = error_policy if error_policy is not None else self._error_policy
    results: dict[str, DeviceResult[None]] = {}
    errors: list[BaseException] = []
    armed: list[str] = []

    for name in slaves:
        session = self._sessions[name]
        try:
            async with self._operation_locks(name):
                await _configure_then_start(session, confirm=confirm)
            results[name] = DeviceResult(name=name, value=None, error=None)
            armed.append(name)
        except NIDaqError as exc:
            results[name] = DeviceResult(name=name, value=None, error=exc)
            errors.append(exc)
            # Roll back: stop every slave that armed before this one,
            # in reverse order. Best-effort — collect rollback errors
            # but never raise from the rollback path.
            for prior in reversed(armed):
                prior_session = self._sessions[prior]
                try:
                    async with self._operation_locks(prior):
                        await prior_session.stop()
                except NIDaqError as rollback_exc:
                    errors.append(rollback_exc)
            # Do not start the master.
            results[master] = DeviceResult(
                name=master,
                value=None,
                error=NIDaqTaskStateError(
                    f"master {master!r} not started: slave {name!r} failed to arm",
                    context=ErrorContext(task_name=master, operation="start_synchronized"),
                ),
            )
            if policy is ErrorPolicy.RAISE:
                raise BaseExceptionGroup(
                    "DaqManager.start_synchronized: slave arming failed",
                    errors,
                ) from exc
            return results

    # All slaves armed — start the master.
    master_session = self._sessions[master]
    try:
        async with self._operation_locks(master):
            await _configure_then_start(master_session, confirm=confirm)
        results[master] = DeviceResult(name=master, value=None, error=None)
    except NIDaqError as exc:
        results[master] = DeviceResult(name=master, value=None, error=exc)
        errors.append(exc)
        if policy is ErrorPolicy.RAISE:
            raise BaseExceptionGroup(
                "DaqManager.start_synchronized: master start failed",
                errors,
            ) from exc

    return results

stop async

stop(names=None, *, error_policy=None)

Stop one or more managed tasks. Defaults to all in reverse-add.

Source code in src/nidaqlib/manager.py
async def stop(
    self,
    names: Sequence[str] | None = None,
    *,
    error_policy: ErrorPolicy | None = None,
) -> Mapping[str, DeviceResult[None]]:
    """Stop one or more managed tasks. Defaults to all in reverse-add."""
    targets = self._resolve_names(names)
    if names is None:
        targets = list(reversed(targets))
    return await self._for_each_targets(
        targets,
        "stop",
        self._call_stop,
        error_policy=error_policy,
    )

DeviceResult dataclass

DeviceResult(name, value, error)

One per-task outcome from a manager group operation.

The class name matches the ecosystem manager surface even though DAQ granularity is one NI task per slot.

Attributes:

    name (str):
        Manager-add name of the task.
    value (T | None):
        The operation's success value, or None on error.
    error (NIDaqError | None):
        The wrapped :class:NIDaqError, or None on success.

ok property

ok

True when the operation succeeded for this task.
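
A sketch of the per-task result pattern shared by every group operation. Import paths are assumptions:

import logging

from nidaqlib.manager import DaqManager            # assumed import paths
from nidaqlib.errors import ErrorPolicy

log = logging.getLogger(__name__)


async def stop_all(manager: DaqManager) -> None:
    # RETURN policy: every task gets a DeviceResult row instead of a raised group.
    results = await manager.stop(error_policy=ErrorPolicy.RETURN)
    for name, result in results.items():
        if result.ok:
            log.info("%s stopped", name)
        else:
            log.warning("%s failed to stop: %s", name, result.error)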