nidaqlib.backend

Backend layer — protocol + real and fake implementations.

See design doc §10.

CallbackHandle module-attribute

CallbackHandle = Any

Backend-defined receipt for a registered every-N-samples callback.

Real :class:~nidaqlib.backend.nidaqmx_backend.NidaqmxBackend returns the underlying nidaqmx task object (NI exposes registration as a method on the task with no separate handle). The fake backend returns its own opaque handle. Callers treat this as opaque — pass it back to :meth:DaqBackend.unregister_every_n_samples.

DaqBackend

Bases: Protocol

Operations the rest of :mod:nidaqlib needs from the NI driver layer.

Implementations must wrap nidaqmx.errors.DaqError (and equivalents) into :class:~nidaqlib.errors.NIDaqError subclasses, preserving the original via __cause__ and populating :class:~nidaqlib.errors.ErrorContext.
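
The wrapping contract above can be sketched with stand-in classes. Here `DriverError` plays the role of `nidaqmx.errors.DaqError`, and `WrappedError` / `Context` are hypothetical, simplified substitutes for an `NIDaqError` subclass and `ErrorContext`; none of them are the library's actual definitions. The point is only that `raise ... from exc` preserves the original exception on `__cause__`.

```python
from __future__ import annotations

from dataclasses import dataclass


class DriverError(Exception):
    """Stand-in for nidaqmx.errors.DaqError (assumption for illustration)."""


@dataclass(frozen=True)
class Context:
    """Hypothetical, simplified stand-in for nidaqlib.errors.ErrorContext."""

    task_name: str | None = None
    operation: str | None = None


class WrappedError(Exception):
    """Hypothetical stand-in for an NIDaqError subclass."""

    def __init__(self, message: str, *, context: Context) -> None:
        super().__init__(message)
        self.context = context


def start(task_name: str) -> None:
    try:
        # Simulate the driver layer failing.
        raise DriverError("driver refused to start")
    except DriverError as exc:
        # "raise ... from exc" stores the original error on __cause__.
        raise WrappedError(
            f"failed to start task {task_name!r}",
            context=Context(task_name=task_name, operation="start_task"),
        ) from exc


try:
    start("ai0")
except WrappedError as err:
    caught = err

print(type(caught.__cause__).__name__, caught.context.operation)
```

Callers can then catch the library-level error type while still reaching the driver's original exception through `__cause__` when they need driver-specific detail.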

add_channel

add_channel(task, spec)

Add a channel described by spec to task.

Source code in src/nidaqlib/backend/base.py
def add_channel(self, task: Any, spec: ChannelSpec) -> None:
    """Add a channel described by ``spec`` to ``task``."""
    ...

close_task

close_task(task)

Release the underlying task handle. Idempotent.

Source code in src/nidaqlib/backend/base.py
def close_task(self, task: Any) -> None:
    """Release the underlying task handle. Idempotent."""
    ...

configure_logging

configure_logging(task, logging)

Configure driver-side TDMS logging on task (design doc §14.6).

Maps to task.in_stream.configure_logging(...) on the real backend. Called once, after channels are added and before :meth:configure_timing.

Source code in src/nidaqlib/backend/base.py
def configure_logging(self, task: Any, logging: TdmsLogging) -> None:
    """Configure driver-side TDMS logging on ``task`` (design doc §14.6).

    Maps to ``task.in_stream.configure_logging(...)`` on the real backend.
    Called once, after channels are added and before
    :meth:`configure_timing`.
    """
    ...

configure_timing

configure_timing(task, timing)

Apply :class:Timing to task.

Source code in src/nidaqlib/backend/base.py
def configure_timing(self, task: Any, timing: Timing) -> None:
    """Apply :class:`Timing` to ``task``."""
    ...

configure_trigger

configure_trigger(task, trigger)

Configure a start- or reference-trigger on task.

Implementations dispatch on the concrete :class:TriggerSpec subclass:

  • :class:~nidaqlib.tasks.triggers.DigitalEdgeStartTrigger → task.triggers.start_trigger.cfg_dig_edge_start_trig.
  • :class:~nidaqlib.tasks.triggers.AnalogEdgeStartTrigger → task.triggers.start_trigger.cfg_anlg_edge_start_trig.
  • :class:~nidaqlib.tasks.triggers.DigitalEdgeReferenceTrigger → task.triggers.reference_trigger.cfg_dig_edge_ref_trig.

Called once, after :meth:configure_timing (NI requires the sample clock to be configured before a reference trigger is set).

Source code in src/nidaqlib/backend/base.py
def configure_trigger(self, task: Any, trigger: TriggerSpec) -> None:
    """Configure a start- or reference-trigger on ``task``.

    Implementations dispatch on the concrete :class:`TriggerSpec`
    subclass:

    - :class:`~nidaqlib.tasks.triggers.DigitalEdgeStartTrigger` →
      ``task.triggers.start_trigger.cfg_dig_edge_start_trig``.
    - :class:`~nidaqlib.tasks.triggers.AnalogEdgeStartTrigger` →
      ``task.triggers.start_trigger.cfg_anlg_edge_start_trig``.
    - :class:`~nidaqlib.tasks.triggers.DigitalEdgeReferenceTrigger` →
      ``task.triggers.reference_trigger.cfg_dig_edge_ref_trig``.

    Called once, **after** :meth:`configure_timing` (NI requires the
    sample clock to be configured before a reference trigger is set).
    """
    ...
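
The ordering notes on configure_logging and configure_trigger (logging before timing, trigger after timing) can be sketched as a single setup routine. `RecordingBackend` and `configure` below are hypothetical helpers that only record method names; the string and dict arguments are placeholders for the real ChannelSpec, TdmsLogging, Timing, and TriggerSpec objects.

```python
class RecordingBackend:
    """Hypothetical stand-in that records the order of backend calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def __getattr__(self, name: str):
        # Any method name that is not a real attribute becomes a recorder.
        def _record(*args, **kwargs):
            self.calls.append(name)
            return object()

        return _record


def configure(backend, *, logging=None, trigger=None):
    """Apply the documented order: channels, logging, timing, trigger."""
    task = backend.create_task("demo")
    backend.add_channel(task, "Dev1/ai0")
    if logging is not None:
        backend.configure_logging(task, logging)  # before configure_timing
    backend.configure_timing(task, {"rate_hz": 1000.0})
    if trigger is not None:
        backend.configure_trigger(task, trigger)  # after configure_timing
    return task


backend = RecordingBackend()
configure(backend, logging="run.tdms", trigger="PFI0")
print(backend.calls)
```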

create_task

create_task(name)

Create and return an underlying task handle.

Source code in src/nidaqlib/backend/base.py
def create_task(self, name: str) -> Any:
    """Create and return an underlying task handle."""
    ...

device_info

device_info(device)

Return product / channel info for device, or None if unknown.

Used by :class:~nidaqlib.manager.DaqManager preflight to detect module-level reservation classes (e.g. NI 9211/9212/9213/9214 TC modules reserve the whole module per task). Implementations MAY return None when the device is unknown to the backend or when no such information is available (e.g. the fake backend).

Source code in src/nidaqlib/backend/base.py
def device_info(self, device: str) -> DeviceInfo | None:
    """Return product / channel info for ``device``, or ``None`` if unknown.

    Used by :class:`~nidaqlib.manager.DaqManager` preflight to detect
    module-level reservation classes (e.g. NI 9211/9212/9213/9214 TC
    modules reserve the whole module per task). Implementations MAY
    return ``None`` when the device is unknown to the backend or when
    no such information is available (e.g. the fake backend).
    """
    ...

read_block

read_block(task, samples_per_channel, timeout)

Block until samples_per_channel samples are available, return them.

Returns:

Type Description
ndarray

np.ndarray of shape (n_channels, samples_per_channel), dtype=float64 for analog-input tasks.

Source code in src/nidaqlib/backend/base.py
def read_block(
    self,
    task: Any,
    samples_per_channel: int,
    timeout: float,
) -> np.ndarray:
    """Block until ``samples_per_channel`` samples are available, return them.

    Returns:
        ``np.ndarray`` of shape ``(n_channels, samples_per_channel)``,
        ``dtype=float64`` for analog-input tasks.
    """
    ...

register_every_n_samples

register_every_n_samples(task, n, callback)

Register a buffer-event callback that fires every n samples.

The callback runs on a driver thread — implementations must not forward asyncio / anyio primitives to it. See design doc §11.3.2.

Parameters:

Name Type Description Default
task Any

Backend task handle from :meth:create_task.

required
n int

Sample-count cadence. Must be > 0.

required
callback Callable[[int], None]

Receives n (the number of samples now available). Implementations are responsible for ensuring the callback receipt outlives the registration — NI stores raw C function pointers and Python GC will silently break the seam.

required

Returns:

Type Description
CallbackHandle

An opaque :data:CallbackHandle to pass to :meth:unregister_every_n_samples.

Source code in src/nidaqlib/backend/base.py
def register_every_n_samples(
    self,
    task: Any,
    n: int,
    callback: Callable[[int], None],
) -> CallbackHandle:
    """Register a buffer-event callback that fires every ``n`` samples.

    The callback runs on a *driver thread* — implementations must not
    forward asyncio / anyio primitives to it. See design doc §11.3.2.

    Args:
        task: Backend task handle from :meth:`create_task`.
        n: Sample-count cadence. Must be > 0.
        callback: Receives ``n`` (the number of samples now available).
            Implementations are responsible for ensuring the callback
            receipt outlives the registration — NI stores raw C function
            pointers and Python GC will silently break the seam.

    Returns:
        An opaque :data:`CallbackHandle` to pass to
        :meth:`unregister_every_n_samples`.
    """
    ...
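
Because the callback runs on a driver thread, anything it hands to an event loop has to cross the thread boundary safely. The sketch below shows one common way to do that with the standard library, `loop.call_soon_threadsafe`; it is an illustration under that assumption and not necessarily the bridge described in design doc §11.3.2. A plain `threading.Thread` stands in for the driver thread.

```python
import asyncio
import threading


async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[int] = asyncio.Queue()

    def on_samples(n: int) -> None:
        # Runs on a non-asyncio thread, so it must not touch the queue
        # directly. call_soon_threadsafe schedules the put on the loop.
        loop.call_soon_threadsafe(queue.put_nowait, n)

    def fire_three_times() -> None:
        # Stand-in for the driver thread invoking the callback.
        for _ in range(3):
            on_samples(100)

    driver = threading.Thread(target=fire_three_times)
    driver.start()

    received = [await queue.get() for _ in range(3)]
    driver.join()
    return received


received = asyncio.run(main())
print(received)
```

The callback itself stays synchronous and only schedules work; all awaiting happens on the event-loop side.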

start_task

start_task(task)

Start task (transition to running).

Source code in src/nidaqlib/backend/base.py
def start_task(self, task: Any) -> None:
    """Start ``task`` (transition to running)."""
    ...

stop_task

stop_task(task)

Stop task (transition to committed).

Source code in src/nidaqlib/backend/base.py
def stop_task(self, task: Any) -> None:
    """Stop ``task`` (transition to committed)."""
    ...

unregister_every_n_samples

unregister_every_n_samples(task, handle)

Unregister a previously-registered buffer-event callback.

After this returns, the backend guarantees no further invocations of the callback. MUST be called after :meth:stop_task on the same task — NI rejects unregister on a running task with -200986. See the §11.3.2 ordering invariants for the full stop → unregister → sentinel → drain sequence.

Source code in src/nidaqlib/backend/base.py
def unregister_every_n_samples(self, task: Any, handle: CallbackHandle) -> None:
    """Unregister a previously-registered buffer-event callback.

    After this returns, the backend guarantees no further invocations of
    the callback. MUST be called *after* :meth:`stop_task` on the same
    task — NI rejects unregister on a running task with -200986. See the
    §11.3.2 ordering invariants for the full stop → unregister →
    sentinel → drain sequence.
    """
    ...

write

write(task, values, timeout)

Write one sample-per-channel to task.

Keys of values are the channel display names declared on the spec (ChannelSpec.display_name / NI's name_to_assign_to_channel). Implementations dispatch on the channel kinds present on the task — AO writes go through AnalogMultiChannelWriter; DO writes through DigitalMultiChannelWriter. Mixing kinds in one task is a configuration error and SHOULD be rejected.

:meth:DaqSession.write performs all safety-gate validation (confirm=True, safe_min / safe_max) before this call — backends MUST NOT silently clamp or coerce.

Source code in src/nidaqlib/backend/base.py
def write(
    self,
    task: Any,
    values: Mapping[str, float | bool],
    timeout: float,
) -> None:
    """Write one sample-per-channel to ``task``.

    Keys of ``values`` are the channel display names declared on the
    spec (``ChannelSpec.display_name`` / NI's
    ``name_to_assign_to_channel``). Implementations dispatch on the
    channel kinds present on the task — AO writes go through
    ``AnalogMultiChannelWriter``; DO writes through
    ``DigitalMultiChannelWriter``. Mixing kinds in one task is a
    configuration error and SHOULD be rejected.

    :meth:`DaqSession.write` performs all safety-gate validation
    (``confirm=True``, ``safe_min`` / ``safe_max``) before this call —
    backends MUST NOT silently clamp or coerce.
    """
    ...
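
The "MUST NOT silently clamp or coerce" rule means a caller-side gate should reject an out-of-range value rather than adjust it. A minimal sketch of that idea follows; `check_write` and its `limits` mapping are hypothetical and do not reproduce DaqSession.write, whose implementation is not shown here.

```python
def check_write(values, limits, *, confirm):
    """Hypothetical gate: raise on any violation, never adjust a value."""
    if not confirm:
        raise PermissionError("write requires confirm=True")
    for name, value in values.items():
        low, high = limits[name]
        if not low <= value <= high:
            raise ValueError(f"{name}={value} is outside [{low}, {high}]")
    return dict(values)  # an unchanged copy, ready to hand to the backend


limits = {"ao0": (0.0, 5.0)}
accepted = check_write({"ao0": 1.0}, limits, confirm=True)

try:
    check_write({"ao0": 9.0}, limits, confirm=True)
    rejected = False
except ValueError:
    rejected = True

print(accepted, rejected)
```

A clamping gate would have written 5.0 here without telling the caller; raising keeps the requested and the written value identical whenever a write goes through.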

FakeDaqBackend

FakeDaqBackend(
    *,
    blocks=None,
    read_block_default_shape=None,
    read_errors=None,
    write_errors=None,
)

In-memory test double for :class:~nidaqlib.backend.base.DaqBackend.

Capabilities:

  • Scripted block reads, keyed by task name and consumed FIFO.
  • Optional deterministic ramp generation when no script is provided.
  • Scripted timeouts / errors injected by the test.
  • An operation log (:attr:operations) for asserting the §11.3.2 shutdown ordering.
  • A driver-thread simulator (:meth:simulate_callbacks) that fires the registered every-N-samples callback on a private threading.Thread, matching the threading model of NI's real callback.

Configure the fake backend.

Parameters:

Name Type Description Default
blocks dict[str, Sequence[ndarray]] | None

Per-task-name sequence of pre-built np.ndarray blocks returned in order from :meth:read_block. When exhausted, the backend falls back to read_block_default_shape (or raises if neither is set).

None
read_block_default_shape tuple[int, int] | None

(n_channels, samples_per_channel) used to synthesise a deterministic ramp when no scripted block is queued. None means "raise instead of synthesise".

None
read_errors dict[str, Iterable[Exception]] | None

Per-task-name iterable of exceptions to raise from :meth:read_block. Each entry is consumed by the next read call before the scripted blocks queue is consulted.

None
write_errors dict[str, Iterable[Exception]] | None

Per-task-name iterable of exceptions to raise from :meth:write. Each entry is consumed by the next write call before the values are recorded.

None
Source code in src/nidaqlib/backend/fake.py
def __init__(
    self,
    *,
    blocks: dict[str, Sequence[np.ndarray]] | None = None,
    read_block_default_shape: tuple[int, int] | None = None,
    read_errors: dict[str, Iterable[Exception]] | None = None,
    write_errors: dict[str, Iterable[Exception]] | None = None,
) -> None:
    """Configure the fake backend.

    Args:
        blocks: Per-task-name sequence of pre-built ``np.ndarray`` blocks
            returned in order from :meth:`read_block`. When exhausted,
            the backend falls back to ``read_block_default_shape`` (or
            raises if neither is set).
        read_block_default_shape: ``(n_channels, samples_per_channel)``
            used to synthesise a deterministic ramp when no scripted
            block is queued. ``None`` means "raise instead of synthesise".
        read_errors: Per-task-name iterable of exceptions to raise from
            :meth:`read_block`. Each entry is consumed by the next read
            call before the scripted blocks queue is consulted.
        write_errors: Per-task-name iterable of exceptions to raise from
            :meth:`write`. Each entry is consumed by the next write
            call before the values are recorded.
    """
    self._blocks: dict[str, list[np.ndarray]] = {
        name: list(seq) for name, seq in (blocks or {}).items()
    }
    self._default_shape = read_block_default_shape
    self._read_errors: dict[str, list[Exception]] = {
        name: list(errs) for name, errs in (read_errors or {}).items()
    }
    self._write_errors: dict[str, list[Exception]] = {
        name: list(errs) for name, errs in (write_errors or {}).items()
    }
    self._tasks: dict[str, _FakeTask] = {}
    self._read_counter: dict[str, int] = defaultdict(int)
    self.operations: list[_Operation] = []
    """Append-only log of backend calls. Tests assert ordering against this."""

    self._sim_threads: list[threading.Thread] = []
    self._device_info: dict[str, DeviceInfo] = {}

operations instance-attribute

operations = []

Append-only log of backend calls. Tests assert ordering against this.

add_channel

add_channel(task, spec)

Append spec to task.channels.

Source code in src/nidaqlib/backend/fake.py
def add_channel(self, task: _FakeTask, spec: ChannelSpec) -> None:
    """Append ``spec`` to ``task.channels``."""
    task.channels.append(spec)
    self.operations.append(_Operation("add_channel", task.name, spec.physical_channel))

close_task

close_task(task)

Mark task closed. Idempotent.

Source code in src/nidaqlib/backend/fake.py
def close_task(self, task: _FakeTask) -> None:
    """Mark ``task`` closed. Idempotent."""
    if task.closed:
        return
    task.closed = True
    self.operations.append(_Operation("close_task", task.name))

configure_logging

configure_logging(task, logging)

Record logging on task.

Source code in src/nidaqlib/backend/fake.py
def configure_logging(self, task: _FakeTask, logging: TdmsLogging) -> None:
    """Record ``logging`` on ``task``."""
    task.logging = logging
    self.operations.append(_Operation("configure_logging", task.name, f"path={logging.path!s}"))

configure_timing

configure_timing(task, timing)

Record timing on task.

Source code in src/nidaqlib/backend/fake.py
def configure_timing(self, task: _FakeTask, timing: Timing) -> None:
    """Record ``timing`` on ``task``."""
    task.timing = timing
    self.operations.append(
        _Operation("configure_timing", task.name, f"rate_hz={timing.rate_hz}")
    )

configure_trigger

configure_trigger(task, trigger)

Record trigger on task for test inspection.

Source code in src/nidaqlib/backend/fake.py
def configure_trigger(self, task: _FakeTask, trigger: TriggerSpec) -> None:
    """Record ``trigger`` on ``task`` for test inspection."""
    task.trigger = trigger
    detail = f"kind={trigger.kind};source={trigger.source}"
    self.operations.append(_Operation("configure_trigger", task.name, detail))

create_task

create_task(name)

Create and return a new :class:_FakeTask.

Raises:

Type Description
NIDaqBackendError

A task with name already exists.

Source code in src/nidaqlib/backend/fake.py
def create_task(self, name: str) -> _FakeTask:
    """Create and return a new :class:`_FakeTask`.

    Raises:
        NIDaqBackendError: A task with ``name`` already exists.
    """
    if name in self._tasks:
        raise NIDaqBackendError(
            f"task {name!r} already exists",
            context=ErrorContext(task_name=name, operation="create_task"),
        )
    task = _FakeTask(name=name)
    self._tasks[name] = task
    self.operations.append(_Operation("create_task", name))
    return task

device_info

device_info(device)

Return scripted DeviceInfo for device if registered, else None.

Tests register product types via :meth:register_device_info so the manager's module-level preflight can be exercised against the fake. Default behaviour (no registration) returns None, matching the Protocol's "unknown device" semantics.

Source code in src/nidaqlib/backend/fake.py
def device_info(self, device: str) -> DeviceInfo | None:
    """Return scripted ``DeviceInfo`` for ``device`` if registered, else ``None``.

    Tests register product types via :meth:`register_device_info` so the
    manager's module-level preflight can be exercised against the fake.
    Default behaviour (no registration) returns ``None``, matching the
    Protocol's "unknown device" semantics.
    """
    return self._device_info.get(device)

read_block

read_block(task, samples_per_channel, timeout)

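Pop the next scripted block, or fall back to a deterministic synthetic block.

In the fallback, every element of the block is the per-task fallback-read counter (0.0 on the first fallback read, 1.0 on the next, and so on), so the "ramp" steps across reads rather than within a block. Only n_channels is taken from read_block_default_shape; the sample count comes from the samples_per_channel argument.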

Raises:

Type Description
NIDaqTimeoutError

A scripted timeout exception was queued.

NIDaqReadError

A scripted read exception was queued, or the queue is empty and no read_block_default_shape is set.

Source code in src/nidaqlib/backend/fake.py
def read_block(
    self,
    task: _FakeTask,
    samples_per_channel: int,
    timeout: float,
) -> np.ndarray:
    """Pop the next scripted block, fall back to a deterministic ramp.

    Raises:
        NIDaqTimeoutError: A scripted timeout exception was queued.
        NIDaqReadError: A scripted read exception was queued, or the
            queue is empty and no ``read_block_default_shape`` is set.
    """
    del timeout  # The fake never blocks — timeout is metadata for tests.
    errs = self._read_errors.get(task.name)
    if errs:
        err = errs.pop(0)
        self.operations.append(_Operation("read_block_error", task.name, str(err)))
        if isinstance(err, (NIDaqReadError, NIDaqTimeoutError)):
            raise err
        raise NIDaqReadError(
            f"scripted read error: {err!r}",
            context=ErrorContext(task_name=task.name, operation="read_block"),
        ) from err
    scripted = self._blocks.get(task.name)
    if scripted:
        block = scripted.pop(0)
    elif self._default_shape is not None:
        n_channels, _ = self._default_shape
        i = self._read_counter[task.name]
        block = np.full(
            (n_channels, samples_per_channel),
            fill_value=float(i),
            dtype=np.float64,
        )
        self._read_counter[task.name] = i + 1
    else:
        raise NIDaqReadError(
            f"no scripted blocks remain for task {task.name!r}",
            context=ErrorContext(task_name=task.name, operation="read_block"),
        )
    if block.shape[1] != samples_per_channel:
        # Reshape on the fly for tests that pre-build a long stream and
        # let the recorder choose the chunk size.
        block = block[:, :samples_per_channel]
    self.operations.append(_Operation("read_block", task.name, str(block.shape)))
    return block
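
The consumption order in the listing above (scripted errors first, then scripted blocks, then the synthetic fallback) can be modelled with plain lists. `next_read` is a hypothetical, numpy-free stand-in written for this illustration; nested lists play the role of the (n_channels, samples_per_channel) arrays.

```python
from collections import deque


def next_read(errors, blocks, default_channels, samples, counter):
    """Hypothetical model of the fake's read order, using nested lists."""
    if errors:
        raise errors.popleft()  # scripted errors are consumed first
    if blocks:
        block = blocks.popleft()  # then scripted blocks, FIFO
    elif default_channels is not None:
        # Fallback: constant block whose value is the fallback-read index.
        block = [[float(counter[0])] * samples for _ in range(default_channels)]
        counter[0] += 1
    else:
        raise LookupError("no scripted blocks remain")
    # Over-long blocks are trimmed to the requested sample count.
    return [row[:samples] for row in block]


errors = deque([TimeoutError("scripted")])
blocks = deque([[[1.0, 2.0, 3.0, 4.0]]])
counter = [0]
results = []
for _ in range(4):
    try:
        results.append(next_read(errors, blocks, 1, 2, counter))
    except TimeoutError:
        results.append("timeout")

print(results)
```

As in the listing, the trim only shortens a block: a scripted block with fewer columns than requested is returned as it is.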

register_device_info

register_device_info(device, *, product_type)

Scripted DeviceInfo for tests of the manager's module-level preflight.

Source code in src/nidaqlib/backend/fake.py
def register_device_info(self, device: str, *, product_type: str) -> None:
    """Scripted DeviceInfo for tests of the manager's module-level preflight."""
    from nidaqlib.system.models import DeviceInfo as _DeviceInfo  # noqa: PLC0415

    self._device_info[device] = _DeviceInfo(
        name=device,
        product_type=product_type,
        serial_number=None,
        ai_physical_channels=(),
        ao_physical_channels=(),
        di_lines=(),
        do_lines=(),
        ci_physical_channels=(),
        co_physical_channels=(),
    )

register_every_n_samples

register_every_n_samples(task, n, callback)

Stash callback on the task. Returns task as the handle.

Mirrors NI's ordering invariant: registration must precede task.start(). Real NI rejects post-start registration with -200960 ("Register all your DAQmx software events prior to starting the task"); the fake raises an analogous :class:NIDaqBackendError so the unit suite catches violations that the hardware would otherwise surface only at integration.

Raises:

Type Description
NIDaqBackendError

A callback is already registered, or the task has already been started.

Source code in src/nidaqlib/backend/fake.py
def register_every_n_samples(
    self,
    task: _FakeTask,
    n: int,
    callback: Callable[[int], None],
) -> _FakeTask:
    """Stash ``callback`` on the task. Returns ``task`` as the handle.

    Mirrors NI's ordering invariant: registration must precede
    ``task.start()``. Real NI rejects post-start registration with
    -200960 ("Register all your DAQmx software events prior to starting
    the task"); the fake raises an analogous
    :class:`NIDaqBackendError` so the unit suite catches violations
    that the hardware would otherwise surface only at integration.

    Raises:
        NIDaqBackendError: A callback is already registered, or the task
            has already been started.
    """
    if task.started:
        raise NIDaqBackendError(
            f"task {task.name!r} is already started; "
            "register_every_n_samples must run before start_task "
            "(NI rejects post-start registration with -200960)",
            context=ErrorContext(task_name=task.name, operation="register_every_n_samples"),
        )
    if task.callback is not None:
        raise NIDaqBackendError(
            f"task {task.name!r} already has a buffer-event callback",
            context=ErrorContext(task_name=task.name, operation="register_every_n_samples"),
        )
    task.callback = callback
    task.callback_n = n
    self.operations.append(_Operation("register_every_n_samples", task.name, f"n={n}"))
    return task

simulate_callbacks

simulate_callbacks(task, *, firings, cadence_s=0.0)

Fire the registered callback firings times from a worker thread.

Models the behaviour of NI's DAQmx driver thread so the §11.3.2 bridge can be exercised end-to-end in unit tests. The callback runs on a fresh threading.Thread (NOT the asyncio event loop).

The simulator stops early if the callback is unregistered between firings — this models NI's "any pending events are discarded" note on stop_task.

Parameters:

Name Type Description Default
task _FakeTask

The fake task on which the callback was registered.

required
firings int

Number of times to invoke the callback.

required
cadence_s float

Optional sleep between firings, in seconds. Use 0 for a tight burst, > 0 to mimic a finite sample-clock cadence.

0.0

Returns:

Type Description
Thread

The :class:threading.Thread running the simulator. Tests usually do not need to .join() this; the bridge tests rely on the recorder's own shutdown to drain pending chunks. Joinable if asserting on thread liveness.

Source code in src/nidaqlib/backend/fake.py
def simulate_callbacks(
    self,
    task: _FakeTask,
    *,
    firings: int,
    cadence_s: float = 0.0,
) -> threading.Thread:
    """Fire the registered callback ``firings`` times from a worker thread.

    Models the behaviour of NI's DAQmx driver thread so the §11.3.2
    bridge can be exercised end-to-end in unit tests. The callback runs
    on a fresh ``threading.Thread`` (NOT the asyncio event loop).

    The simulator stops early if the callback is unregistered between
    firings — this models NI's "any pending events are discarded" note
    on ``stop_task``.

    Args:
        task: The fake task on which the callback was registered.
        firings: Number of times to invoke the callback.
        cadence_s: Optional sleep between firings, in seconds. Use 0 for
            a tight burst, > 0 to mimic a finite sample-clock cadence.

    Returns:
        The :class:`threading.Thread` running the simulator. Tests
        usually do not need to ``.join()`` this — the bridge tests rely
        on the recorder's own shutdown to drain pending chunks. Joinable
        if asserting on thread liveness.
    """

    def _run() -> None:
        for _ in range(firings):
            cb = task.callback
            if cb is None:
                return
            cb(task.callback_n)
            if cadence_s > 0.0:
                threading.Event().wait(cadence_s)

    thread = threading.Thread(
        target=_run,
        name=f"FakeDaqBackend-cb-sim[{task.name}]",
        daemon=True,
    )
    thread.start()
    self._sim_threads.append(thread)
    return thread
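
The early-stop behaviour can be seen in a reduced model of the simulator loop. `Holder` and `simulate` are hypothetical stand-ins for `_FakeTask` and `simulate_callbacks`; clearing the callback from inside the callback is only a trick to make the example deterministic, not a recommended way to unregister.

```python
import threading


class Holder:
    """Hypothetical stand-in for the fake task's callback slot."""

    def __init__(self) -> None:
        self.callback = None


def simulate(holder, firings):
    def _run():
        for _ in range(firings):
            cb = holder.callback  # re-read before every firing
            if cb is None:
                return  # callback was cleared, stop early
            cb(10)

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    return thread


holder = Holder()
seen = []


def on_samples(n):
    on_main = threading.current_thread() is threading.main_thread()
    seen.append((n, on_main))
    if len(seen) == 2:
        holder.callback = None  # clear after the second firing


holder.callback = on_samples
simulate(holder, firings=5).join()
print(seen)
```

Five firings were requested, but only two happen, and both run off the main thread.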

start_task

start_task(task)

Mark task started.

Source code in src/nidaqlib/backend/fake.py
def start_task(self, task: _FakeTask) -> None:
    """Mark ``task`` started."""
    task.started = True
    self.operations.append(_Operation("start_task", task.name))

stop_task

stop_task(task)

Mark task stopped.

Source code in src/nidaqlib/backend/fake.py
def stop_task(self, task: _FakeTask) -> None:
    """Mark ``task`` stopped."""
    task.started = False
    self.operations.append(_Operation("stop_task", task.name))

unregister_every_n_samples

unregister_every_n_samples(task, handle)

Clear the buffer-event callback on task.

Mirrors NI's ordering invariant: unregistration requires the task to be stopped. Real NI rejects post-running unregister with -200986 ("DAQmx software event cannot be unregistered because the task is running"); the fake raises an analogous :class:NIDaqBackendError so the unit suite catches violations that real hardware would otherwise surface only at integration.

Raises:

Type Description
NIDaqBackendError

The task is still running.

Source code in src/nidaqlib/backend/fake.py
def unregister_every_n_samples(self, task: _FakeTask, handle: Any) -> None:
    """Clear the buffer-event callback on ``task``.

    Mirrors NI's ordering invariant: unregistration requires the task
    to be stopped. Real NI rejects post-running unregister with -200986
    ("DAQmx software event cannot be unregistered because the task is
    running"); the fake raises an analogous
    :class:`NIDaqBackendError` so the unit suite catches violations
    that real hardware would otherwise surface only at integration.

    Raises:
        NIDaqBackendError: The task is still running.
    """
    del handle
    if task.started:
        raise NIDaqBackendError(
            f"task {task.name!r} is still running; "
            "unregister_every_n_samples must run after stop_task "
            "(NI rejects post-running unregister with -200986)",
            context=ErrorContext(task_name=task.name, operation="unregister_every_n_samples"),
        )
    task.callback = None
    task.callback_n = 0
    self.operations.append(_Operation("unregister_every_n_samples", task.name))
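
The two ordering guards (register before start, unregister after stop) reduce to checks on a single started flag. The sketch below uses hypothetical stand-ins, `GuardError` for NIDaqBackendError and `TinyTask` for `_FakeTask`, to show a wrong-order call being rejected and the correct order succeeding.

```python
class GuardError(RuntimeError):
    """Hypothetical stand-in for NIDaqBackendError."""


class TinyTask:
    """Hypothetical stand-in for the fake task's state."""

    def __init__(self) -> None:
        self.started = False
        self.callback = None


def register(task, callback):
    if task.started:
        raise GuardError("register must run before start")
    task.callback = callback


def unregister(task):
    if task.started:
        raise GuardError("unregister must run after stop")
    task.callback = None


task = TinyTask()
register(task, lambda n: None)
task.started = True  # models start_task

try:
    unregister(task)  # wrong order: the task is still running
    rejected = False
except GuardError:
    rejected = True

task.started = False  # models stop_task
unregister(task)  # correct order: stop, then unregister
print(rejected, task.callback)
```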

write

write(task, values, timeout)

Record one write — for tests asserting on outputs.

Validation parity with the real backend: missing channel keys raise :class:NIDaqConfigurationError; scripted errors raise from the per-task write_errors queue.

Raises:

Type Description
NIDaqConfigurationError

values is missing entries for one or more output channels.

NIDaqWriteError / NIDaqTimeoutError

A scripted error was queued.

Source code in src/nidaqlib/backend/fake.py
def write(
    self,
    task: _FakeTask,
    values: Mapping[str, float | bool],
    timeout: float,
) -> None:
    """Record one write — for tests asserting on outputs.

    Validation parity with the real backend: missing channel keys raise
    :class:`NIDaqConfigurationError`; scripted errors raise from the
    per-task ``write_errors`` queue.

    Raises:
        NIDaqConfigurationError: ``values`` is missing entries for one
            or more output channels.
        NIDaqWriteError / NIDaqTimeoutError: A scripted error was queued.
    """
    del timeout  # The fake never blocks — timeout is metadata for tests.
    from nidaqlib.channels.analog_output import AnalogOutputVoltage  # noqa: PLC0415
    from nidaqlib.channels.digital_output import DigitalOutput  # noqa: PLC0415

    output_channels = [
        ch for ch in task.channels if isinstance(ch, (AnalogOutputVoltage, DigitalOutput))
    ]
    if not output_channels:
        raise NIDaqConfigurationError(
            "task has no writable channels (AO or DO)",
            context=ErrorContext(task_name=task.name, operation="write"),
        )
    names = [ch.display_name for ch in output_channels]
    missing = [n for n in names if n not in values]
    if missing:
        raise NIDaqConfigurationError(
            f"write missing values for channel(s): {missing!r}",
            context=ErrorContext(task_name=task.name, operation="write"),
        )

    errs = self._write_errors.get(task.name)
    if errs:
        err = errs.pop(0)
        self.operations.append(_Operation("write_error", task.name, str(err)))
        if isinstance(err, (NIDaqWriteError, NIDaqTimeoutError)):
            raise err
        raise NIDaqWriteError(
            f"scripted write error: {err!r}",
            context=ErrorContext(task_name=task.name, operation="write"),
        ) from err

    snapshot: dict[str, float | bool] = {n: values[n] for n in names}
    task.writes.append(snapshot)
    task.last_write = snapshot
    self.operations.append(_Operation("write", task.name, repr(snapshot)))

NidaqmxBackend

NidaqmxBackend()

Production backend wrapping nidaqmx-python.

Supported operations:

  • Task creation / destruction.
  • Analog, digital, and counter channel addition.
  • Sample-clock timing.
  • Start / stop / read / write.
  • Trigger configuration.
  • Every-N-samples buffer-event callbacks (the §11.3.2 bridge driver).

Per-task state held here is limited to the strong reference the callback bridge needs: NI stores the registered callback as a raw C function pointer (see §11.3.2 GC seam), and nidaqmx.Task uses __slots__ so we can't stash the wrapper on the task itself. The backend keeps it in self._callback_wrappers keyed by id(task) until unregister_every_n_samples runs.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def __init__(self) -> None:
    self._callback_wrappers: dict[int, Callable[..., int]] = {}
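
The id(task)-keyed registry can be illustrated without NI hardware. In this sketch `SlottedTask` is a stand-in for an object that defines `__slots__` (as the text above says nidaqmx.Task does), and `WrapperRegistry` is a hypothetical reduction of the backend's `_callback_wrappers` bookkeeping.

```python
import gc
import weakref


class SlottedTask:
    """Stand-in for an object with __slots__ and no instance __dict__."""

    __slots__ = ("name",)

    def __init__(self, name: str) -> None:
        self.name = name


class WrapperRegistry:
    """Hypothetical reduction of the backend's wrapper bookkeeping."""

    def __init__(self) -> None:
        self._wrappers: dict[int, object] = {}

    def register(self, task, wrapper) -> None:
        self._wrappers[id(task)] = wrapper  # strong reference

    def unregister(self, task) -> None:
        self._wrappers.pop(id(task), None)


task = SlottedTask("ai")
try:
    task.wrapper = None  # "wrapper" is not listed in __slots__
    attached = True
except AttributeError:
    attached = False


def wrapper(n):
    return 0


ref = weakref.ref(wrapper)
registry = WrapperRegistry()
registry.register(task, wrapper)
del wrapper  # the registry now holds the only strong reference
gc.collect()
alive_while_registered = ref() is not None

registry.unregister(task)
gc.collect()
alive_after_unregister = ref() is not None
print(attached, alive_while_registered, alive_after_unregister)
```

One caveat with this pattern: an `id()` value can be reused once the original object is destroyed, so an entry should be removed before its task is released.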

add_channel

add_channel(task, spec)

Dispatch on spec.kind and add the channel to task.

The dispatch table covers "ai_voltage", "thermocouple", "ao_voltage", "di", "do", "ci_frequency", "ci_period", "ci_edge_count", "co_pulse_frequency", "co_pulse_time", and "co_pulse_ticks".

Raises:

Type Description
NIDaqBackendError

NI rejected the channel creation, or spec.kind is unsupported by this backend.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def add_channel(self, task: Any, spec: ChannelSpec) -> None:
    """Dispatch on ``spec.kind`` and add the channel to ``task``.

    The dispatch table covers ``"ai_voltage"``, ``"thermocouple"``,
    ``"ao_voltage"``, ``"di"``, ``"do"``, ``"ci_frequency"``,
    ``"ci_period"``, ``"ci_edge_count"``, ``"co_pulse_frequency"``,
    ``"co_pulse_time"``, and ``"co_pulse_ticks"``.

    Raises:
        NIDaqBackendError: NI rejected the channel creation, or
            ``spec.kind`` is unsupported by this backend.
    """
    # Late import — design doc §10.3 keeps the channel-spec module free
    # of the production NI dependency.
    from nidaqlib.channels.analog_input import (  # noqa: PLC0415
        AnalogInputVoltage,
        ThermocoupleInput,
    )
    from nidaqlib.channels.analog_output import AnalogOutputVoltage  # noqa: PLC0415
    from nidaqlib.channels.counter_input import (  # noqa: PLC0415
        CounterEdgeCountInput,
        CounterFrequencyInput,
        CounterPeriodInput,
    )
    from nidaqlib.channels.counter_output import (  # noqa: PLC0415
        CounterPulseFrequency,
        CounterPulseTicks,
        CounterPulseTime,
    )
    from nidaqlib.channels.digital_input import DigitalInput  # noqa: PLC0415
    from nidaqlib.channels.digital_output import DigitalOutput  # noqa: PLC0415

    nidaqmx = _import_nidaqmx()
    try:
        if isinstance(spec, AnalogInputVoltage):
            self._add_ai_voltage(task, spec)
        elif isinstance(spec, ThermocoupleInput):
            self._add_thermocouple(task, spec)
        elif isinstance(spec, AnalogOutputVoltage):
            self._add_ao_voltage(task, spec)
        elif isinstance(spec, (DigitalInput, DigitalOutput)):
            self._add_digital(task, spec)
        elif isinstance(spec, CounterFrequencyInput):
            self._add_ci_frequency(task, spec)
        elif isinstance(spec, CounterPeriodInput):
            self._add_ci_period(task, spec)
        elif isinstance(spec, CounterEdgeCountInput):
            self._add_ci_edge_count(task, spec)
        elif isinstance(spec, CounterPulseFrequency):
            self._add_co_pulse_frequency(task, spec)
        elif isinstance(spec, CounterPulseTime):
            self._add_co_pulse_time(task, spec)
        elif isinstance(spec, CounterPulseTicks):
            self._add_co_pulse_ticks(task, spec)
        else:
            raise NIDaqBackendError(
                f"NidaqmxBackend does not support channel kind {spec.kind!r}",
                context=ErrorContext(
                    task_name=getattr(task, "name", None),
                    physical_channel=spec.physical_channel,
                    operation="add_channel",
                ),
            )
    except nidaqmx.errors.DaqError as exc:
        raise NIDaqBackendError(
            f"failed to add channel {spec.physical_channel!r}",
            context=ErrorContext(
                task_name=getattr(task, "name", None),
                channel_name=spec.name,
                physical_channel=spec.physical_channel,
                operation="add_channel",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        ) from exc
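
A reduced sketch of the type-based dispatch above, assuming three hypothetical spec classes and a stand-in exception in place of the real nidaqlib.channels types and NIDaqBackendError. It records which branch ran instead of calling the driver.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class VoltageSpec:  # hypothetical, not a nidaqlib.channels class
    physical_channel: str
    kind: str = "ai_voltage"


@dataclass(frozen=True)
class ThermoSpec:  # hypothetical
    physical_channel: str
    kind: str = "thermocouple"


@dataclass(frozen=True)
class UnknownSpec:  # hypothetical spec the dispatcher does not know
    physical_channel: str
    kind: str = "mystery"


class UnsupportedKindError(Exception):
    """Stand-in for NIDaqBackendError."""


def add_channel(calls: list[str], spec: object) -> None:
    """Route on the spec's type and record which handler would run."""
    if isinstance(spec, VoltageSpec):
        calls.append(f"ai_voltage:{spec.physical_channel}")
    elif isinstance(spec, ThermoSpec):
        calls.append(f"thermocouple:{spec.physical_channel}")
    else:
        kind = getattr(spec, "kind", None)
        raise UnsupportedKindError(f"unsupported channel kind {kind!r}")


calls: list[str] = []
add_channel(calls, VoltageSpec("Dev1/ai0"))
add_channel(calls, ThermoSpec("Dev1/ai1"))

try:
    add_channel(calls, UnknownSpec("Dev1/ai2"))
    rejected = False
except UnsupportedKindError as exc:
    rejected = True
    message = str(exc)
```

The fall-through branch is what turns an unrecognised spec into an error rather than a silently missing channel.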

close_task

close_task(task)

Close task. Idempotent — already-closed tasks are silently OK.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def close_task(self, task: Any) -> None:
    """Close ``task``. Idempotent — already-closed tasks are silently OK."""
    nidaqmx = _import_nidaqmx()
    try:
        task.close()
    except nidaqmx.errors.DaqError as exc:
        raise NIDaqBackendError(
            "failed to close task",
            context=ErrorContext(
                task_name=getattr(task, "name", None),
                operation="close_task",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        ) from exc

configure_logging

configure_logging(task, logging)

Configure driver-side TDMS logging via task.in_stream.

Raises:

Type Description
NIDaqBackendError

NI rejected the configure-logging call.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def configure_logging(self, task: Any, logging: TdmsLogging) -> None:
    """Configure driver-side TDMS logging via ``task.in_stream``.

    Raises:
        NIDaqBackendError: NI rejected the configure-logging call.
    """
    nidaqmx = _import_nidaqmx()
    kwargs: dict[str, Any] = {
        "file_path": str(logging.path),
        "logging_mode": logging.mode,
        "operation": logging.operation,
    }
    if logging.group_name is not None:
        kwargs["group_name"] = logging.group_name
    try:
        task.in_stream.configure_logging(**kwargs)
    except nidaqmx.errors.DaqError as exc:
        raise NIDaqBackendError(
            "failed to configure TDMS logging",
            context=ErrorContext(
                task_name=getattr(task, "name", None),
                operation="configure_logging",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        ) from exc
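
The keyword handling above can be sketched in isolation. LoggingSpec is a hypothetical stand-in for TdmsLogging, and the string values for mode and operation are placeholders for whatever NI constants the real spec carries.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional


@dataclass(frozen=True)
class LoggingSpec:  # hypothetical stand-in for TdmsLogging
    path: Path
    mode: str  # placeholder for an NI logging-mode constant
    operation: str  # placeholder for an NI logging-operation constant
    group_name: Optional[str] = None


def build_logging_kwargs(spec: LoggingSpec) -> dict[str, Any]:
    """Build the keyword arguments, leaving out group_name when unset."""
    kwargs: dict[str, Any] = {
        "file_path": str(spec.path),
        "logging_mode": spec.mode,
        "operation": spec.operation,
    }
    if spec.group_name is not None:
        kwargs["group_name"] = spec.group_name
    return kwargs


plain = build_logging_kwargs(LoggingSpec(Path("run.tdms"), "log", "create"))
grouped = build_logging_kwargs(
    LoggingSpec(Path("run.tdms"), "log", "create", group_name="run-1")
)
```

Omitting the key, rather than passing None, lets the driver apply its own default for the group name.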

configure_timing

configure_timing(task, timing)

Apply Timing to task via cfg_samp_clk_timing. Returns without touching task when timing.mode is AcquisitionMode.ON_DEMAND.

Raises:

Type Description
NIDaqBackendError

NI rejected the timing configuration.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def configure_timing(self, task: Any, timing: Timing) -> None:
    """Apply :class:`Timing` to ``task`` via ``cfg_samp_clk_timing``.

    Returns without touching ``task`` when ``timing.mode`` is
    ``AcquisitionMode.ON_DEMAND``.

    Raises:
        NIDaqBackendError: NI rejected the timing configuration.
    """
    from nidaqlib.tasks.spec import AcquisitionMode, Edge  # noqa: PLC0415

    nidaqmx = _import_nidaqmx()
    from nidaqmx.constants import AcquisitionType  # noqa: PLC0415
    from nidaqmx.constants import Edge as NIEdge  # noqa: PLC0415

    if timing.mode is AcquisitionMode.ON_DEMAND:
        return

    mode_map = {
        AcquisitionMode.FINITE: AcquisitionType.FINITE,
        AcquisitionMode.CONTINUOUS: AcquisitionType.CONTINUOUS,
    }
    edge_map = {Edge.RISING: NIEdge.RISING, Edge.FALLING: NIEdge.FALLING}
    # ``samps_per_chan`` is required by NI for both finite and continuous
    # modes. For continuous acquisition NI-DAQmx uses it to determine the
    # buffer size rather than a sample count. Pass a sane default when the
    # user did not supply one.
    samps = (
        timing.samples_per_channel
        if timing.samples_per_channel is not None
        else max(1000, int(timing.rate_hz))
    )
    kwargs: dict[str, Any] = {
        "rate": timing.rate_hz,
        "active_edge": edge_map[timing.active_edge],
        "sample_mode": mode_map[timing.mode],
        "samps_per_chan": samps,
    }
    if timing.source is not None:
        kwargs["source"] = timing.source
    try:
        task.timing.cfg_samp_clk_timing(**kwargs)
    except nidaqmx.errors.DaqError as exc:
        raise NIDaqBackendError(
            "failed to configure timing",
            context=ErrorContext(
                task_name=getattr(task, "name", None),
                operation="configure_timing",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        ) from exc
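
The samps_per_chan fallback is easy to check by hand. The helper below is a hypothetical extraction of that one expression, not a function nidaqlib exposes.

```python
from typing import Optional


def resolve_samps_per_chan(samples_per_channel: Optional[int], rate_hz: float) -> int:
    """Explicit value wins; otherwise fall back to max(1000, int(rate_hz))."""
    if samples_per_channel is not None:
        return samples_per_channel
    return max(1000, int(rate_hz))


slow = resolve_samps_per_chan(None, 10.0)  # floor of 1000 applies
fast = resolve_samps_per_chan(None, 25_600.5)  # rate truncated to an int
explicit = resolve_samps_per_chan(512, 25_600.5)  # caller's value is used as-is
```

With no explicit value, a 10 Hz task gets 1000 and a 25600.5 Hz task gets 25600, which is about one second of data at that rate.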

configure_trigger

configure_trigger(task, trigger)

Dispatch trigger onto the appropriate NI triggers.* API.

Raises:

Type Description
NIDaqBackendError

NI rejected the trigger configuration, or the trigger kind is unsupported by this backend.

NIDaqConfigurationError

The trigger spec is structurally invalid (e.g. zero pretrigger samples — already caught by __post_init__, but defensive).

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def configure_trigger(self, task: Any, trigger: TriggerSpec) -> None:
    """Dispatch ``trigger`` onto the appropriate NI ``triggers.*`` API.

    Raises:
        NIDaqBackendError: NI rejected the trigger configuration, or
            the trigger ``kind`` is unsupported by this backend.
        NIDaqConfigurationError: The trigger spec is structurally
            invalid (e.g. zero pretrigger samples — already caught by
            ``__post_init__``, but defensive).
    """
    from nidaqlib.tasks.spec import Edge  # noqa: PLC0415
    from nidaqlib.tasks.triggers import (  # noqa: PLC0415
        AnalogEdgeStartTrigger,
        AnalogTriggerSlope,
        DigitalEdgeReferenceTrigger,
        DigitalEdgeStartTrigger,
    )

    nidaqmx = _import_nidaqmx()
    from nidaqmx.constants import Edge as NIEdge  # noqa: PLC0415
    from nidaqmx.constants import Slope as NISlope  # noqa: PLC0415

    edge_map = {Edge.RISING: NIEdge.RISING, Edge.FALLING: NIEdge.FALLING}
    slope_map = {
        AnalogTriggerSlope.RISING: NISlope.RISING,
        AnalogTriggerSlope.FALLING: NISlope.FALLING,
    }
    try:
        if isinstance(trigger, DigitalEdgeStartTrigger):
            task.triggers.start_trigger.cfg_dig_edge_start_trig(
                trigger_source=trigger.source,
                trigger_edge=edge_map[trigger.edge],
            )
        elif isinstance(trigger, AnalogEdgeStartTrigger):
            task.triggers.start_trigger.cfg_anlg_edge_start_trig(
                trigger_source=trigger.source,
                trigger_slope=slope_map[trigger.slope],
                trigger_level=trigger.level,
            )
        elif isinstance(trigger, DigitalEdgeReferenceTrigger):
            task.triggers.reference_trigger.cfg_dig_edge_ref_trig(
                trigger_source=trigger.source,
                pretrigger_samples=trigger.pretrigger_samples,
                trigger_edge=edge_map[trigger.edge],
            )
        else:
            raise NIDaqBackendError(
                f"NidaqmxBackend does not support trigger kind {trigger.kind!r}",
                context=ErrorContext(
                    task_name=getattr(task, "name", None),
                    operation="configure_trigger",
                ),
            )
    except nidaqmx.errors.DaqError as exc:
        raise NIDaqBackendError(
            "failed to configure trigger",
            context=ErrorContext(
                task_name=getattr(task, "name", None),
                operation="configure_trigger",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        ) from exc

create_task

create_task(name)

Create an nidaqmx.Task with name.

Raises:

Type Description
NIDaqBackendError

NI rejected the task creation.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def create_task(self, name: str) -> Any:
    """Create an ``nidaqmx.Task`` with ``name``.

    Raises:
        NIDaqBackendError: NI rejected the task creation.
    """
    nidaqmx = _import_nidaqmx()
    try:
        return nidaqmx.Task(new_task_name=name)
    except nidaqmx.errors.DaqError as exc:
        raise NIDaqBackendError(
            f"failed to create task {name!r}",
            context=ErrorContext(
                task_name=name,
                operation="create_task",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        ) from exc
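
Every method in this class wraps driver errors the same way, so the pattern is worth seeing once without the driver. DriverError, Context and BackendError are hypothetical stand-ins for nidaqmx.errors.DaqError, ErrorContext and NIDaqBackendError, and the error code is an arbitrary placeholder.

```python
from dataclasses import dataclass
from typing import Optional


class DriverError(Exception):
    """Hypothetical stand-in for nidaqmx.errors.DaqError."""

    def __init__(self, message: str, error_code: int) -> None:
        super().__init__(message)
        self.error_code = error_code


@dataclass(frozen=True)
class Context:
    """Hypothetical stand-in for nidaqlib.errors.ErrorContext."""

    task_name: Optional[str] = None
    operation: Optional[str] = None
    ni_error_code: Optional[int] = None


class BackendError(Exception):
    """Hypothetical stand-in for NIDaqBackendError."""

    def __init__(self, message: str, context: Context) -> None:
        super().__init__(message)
        self.context = context


def create_task(name: str) -> object:
    try:
        # Simulated driver failure; -1 is a placeholder, not a real NI code.
        raise DriverError("simulated driver rejection", error_code=-1)
    except DriverError as exc:
        # "from exc" records the driver error as __cause__ on the new one.
        raise BackendError(
            f"failed to create task {name!r}",
            context=Context(
                task_name=name,
                operation="create_task",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        ) from exc


try:
    create_task("demo")
except BackendError as err:
    caught = err
```

Callers can then branch on the library's exception type and still reach the original driver error through caught.__cause__.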

device_info

device_info(device)

Return product info for device via nidaqmx.system.Device.

Direct lookup — does not enumerate the whole system. Returns None if NI does not recognise the device alias. Used by the manager's preflight to detect module-level reservation (e.g. TC modules reserve the whole module per task).

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def device_info(self, device: str) -> DeviceInfo | None:
    """Return product info for ``device`` via ``nidaqmx.system.Device``.

    Direct lookup — does not enumerate the whole system. Returns
    ``None`` if NI does not recognise the device alias. Used by the
    manager's preflight to detect module-level reservation (e.g. TC
    modules reserve the whole module per task).
    """
    from nidaqlib.system.models import DeviceInfo as _DeviceInfo  # noqa: PLC0415

    nidaqmx = _import_nidaqmx()
    try:
        dev = nidaqmx.system.Device(device)
        product_type = getattr(dev, "product_type", None)
    except nidaqmx.errors.DaqError:
        return None
    if product_type is None:
        return None
    # Only product_type matters for the reservation lookup; channel
    # inventories are a separate (more expensive) call we don't need
    # here. Leave them empty.
    return _DeviceInfo(
        name=device,
        product_type=product_type,
        serial_number=None,
        ai_physical_channels=(),
        ao_physical_channels=(),
        di_lines=(),
        do_lines=(),
        ci_physical_channels=(),
        co_physical_channels=(),
    )

read_block

read_block(task, samples_per_channel, timeout)

Block-read samples_per_channel samples per channel.

Uses AnalogMultiChannelReader so the result is an np.ndarray of shape (n_channels, samples_per_channel) rather than Task.read's list-of-lists. Allocates a fresh buffer per call because clarity is worth a few microseconds of allocator pressure here.

Raises:

Type Description
NIDaqTimeoutError

timeout elapsed before samples were ready.

NIDaqReadError

NI returned any other read failure.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def read_block(
    self,
    task: Any,
    samples_per_channel: int,
    timeout: float,
) -> np.ndarray:
    """Block-read ``samples_per_channel`` samples per channel.

    Uses :class:`AnalogMultiChannelReader` so the result is
    ``np.ndarray`` of shape ``(n_channels, samples_per_channel)`` rather
    than ``Task.read``'s list-of-lists. Allocates a fresh buffer per call
    because clarity is worth a few microseconds of allocator pressure here.

    Raises:
        NIDaqTimeoutError: ``timeout`` elapsed before samples were ready.
        NIDaqReadError: NI returned any other read failure.
    """
    import numpy as np  # noqa: PLC0415
    from nidaqmx.stream_readers import AnalogMultiChannelReader  # noqa: PLC0415

    nidaqmx = _import_nidaqmx()

    n_channels = int(task.number_of_channels)
    buf = np.empty((n_channels, samples_per_channel), dtype=np.float64)
    reader = AnalogMultiChannelReader(task.in_stream)
    read_many_sample = cast(
        "Callable[..., object]",
        reader.read_many_sample,  # pyright: ignore[reportUnknownMemberType]
    )
    try:
        read_many_sample(
            buf,
            number_of_samples_per_channel=samples_per_channel,
            timeout=timeout,
        )
    except nidaqmx.errors.DaqError as exc:
        ctx = ErrorContext(
            task_name=getattr(task, "name", None),
            operation="read_block",
            ni_error_code=getattr(exc, "error_code", None),
        )
        # NI's timeout error code is DAQmxErrorSamplesNotYetAvailable;
        # generic read failures land under NIDaqReadError.
        if getattr(exc, "error_code", None) == _NI_TIMEOUT_ERROR_CODE:
            raise NIDaqTimeoutError(
                f"read_block timed out after {timeout}s",
                context=ctx,
            ) from exc
        raise NIDaqReadError(
            "failed to read DAQ block",
            context=ctx,
        ) from exc
    return buf
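
The timeout-versus-failure split above comes down to one comparison on the error code. In this sketch the exception classes are hypothetical stand-ins, and TIMEOUT_CODE is an assumption: -200284 is the value NI publishes for DAQmxErrorSamplesNotYetAvailable, but the library's own _NI_TIMEOUT_ERROR_CODE is not shown in this section.

```python
from typing import Optional

# Assumption: NI's published value for DAQmxErrorSamplesNotYetAvailable.
TIMEOUT_CODE = -200284


class DriverError(Exception):
    """Hypothetical stand-in for nidaqmx.errors.DaqError."""

    def __init__(self, message: str, error_code: Optional[int] = None) -> None:
        super().__init__(message)
        if error_code is not None:
            self.error_code = error_code


class ReadTimeout(Exception):
    """Stand-in for NIDaqTimeoutError."""


class ReadFailure(Exception):
    """Stand-in for NIDaqReadError."""


def translate(exc: Exception, timeout: float) -> Exception:
    """Pick the library exception that matches the driver's error code."""
    if getattr(exc, "error_code", None) == TIMEOUT_CODE:
        return ReadTimeout(f"read_block timed out after {timeout}s")
    return ReadFailure("failed to read DAQ block")


timed_out = translate(DriverError("not ready", TIMEOUT_CODE), timeout=2.0)
other = translate(DriverError("other failure", -1), timeout=2.0)
no_code = translate(DriverError("no code attached"), timeout=2.0)
```

Reading the code through getattr means an exception without an error_code attribute falls into the generic read-failure branch instead of raising AttributeError.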

register_every_n_samples

register_every_n_samples(task, n, callback)

Register a buffer-event callback for task.

Wraps NI's four-argument C-style callback into the Protocol's single-argument Callable[[int], None]. Returns task itself — NI tracks at most one such callback per task, so the unregister side reuses the same handle.

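The caller MUST keep a strong reference to callback for the lifetime of the registration. NI stores the wrapper as a raw C function pointer, so if Python garbage-collects it the next firing crashes the driver (see §11.3.2 GC seam). The backend holds the wrapper itself in self._callback_wrappers until unregister_every_n_samples runs.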

Raises:

Type Description
NIDaqBackendError

NI rejected the registration.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def register_every_n_samples(
    self,
    task: Any,
    n: int,
    callback: Callable[[int], None],
) -> Any:
    """Register a buffer-event callback for ``task``.

    Wraps NI's four-argument C-style callback into the Protocol's
    single-argument ``Callable[[int], None]``. Returns ``task`` itself —
    NI tracks at most one such callback per task, so the unregister side
    reuses the same handle.

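    The caller MUST keep a strong reference to ``callback`` for the
    lifetime of the registration. NI stores the wrapper as a raw C
    function pointer, so if Python garbage-collects it the next firing
    crashes the driver (see §11.3.2 GC seam). The backend holds the
    wrapper itself in ``self._callback_wrappers`` until
    ``unregister_every_n_samples`` runs.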

    Raises:
        NIDaqBackendError: NI rejected the registration.
    """
    nidaqmx = _import_nidaqmx()

    def _ni_cb(task_handle: object, event_type: int, n_samples: int, _data: object) -> int:
        """NI-shaped trampoline.

        Runs on a DAQmx driver thread. Anyio / asyncio APIs are unsafe
        here — the bridge layer (``streaming/block.py``) takes care of
        the thread-safe hand-off via ``queue.SimpleQueue``.
        """
        del task_handle, event_type, _data
        callback(n_samples)
        return 0

    try:
        task.register_every_n_samples_acquired_into_buffer_event(n, _ni_cb)
    except nidaqmx.errors.DaqError as exc:
        raise NIDaqBackendError(
            "failed to register every-N-samples callback",
            context=ErrorContext(
                task_name=getattr(task, "name", None),
                operation="register_every_n_samples",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        ) from exc
    # Strong-ref the wrapper for the lifetime of the registration. NI
    # stores ``_ni_cb`` as a raw C function pointer; if Python GC reaps
    # it, the next firing crashes the driver (see §11.3.2 GC seam).
    # ``nidaqmx.Task`` uses ``__slots__`` so we can't stash on the task
    # itself — keep it on the backend instance, keyed by id(task).
    self._callback_wrappers[id(task)] = _ni_cb
    return task
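
The trampoline and the thread-safe hand-off can be tried without hardware. This is a sketch under assumptions: make_trampoline is a hypothetical helper, a plain thread stands in for the DAQmx driver thread, and the event_type value 0 is a placeholder.

```python
import queue
import threading
from typing import Callable


def make_trampoline(
    callback: Callable[[int], None],
) -> Callable[[object, int, int, object], int]:
    """Adapt a one-argument callback to the four-argument driver shape."""

    def _ni_cb(task_handle: object, event_type: int, n_samples: int, data: object) -> int:
        del task_handle, event_type, data  # unused by the adapted callback
        callback(n_samples)
        return 0  # status value handed back to the caller

    return _ni_cb


# SimpleQueue.put is safe to call from another thread.
events: "queue.SimpleQueue[int]" = queue.SimpleQueue()
trampoline = make_trampoline(events.put)


def fake_driver() -> None:
    """Fire the callback three times from a non-main thread."""
    for _ in range(3):
        trampoline(object(), 0, 100, None)


worker = threading.Thread(target=fake_driver)
worker.start()
worker.join()

received = [events.get(timeout=1.0) for _ in range(3)]
status = trampoline(None, 0, 50, None)
```

The consumer side only ever sees sample counts on the queue, so it never needs to know which thread produced them.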

start_task

start_task(task)

Start task.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def start_task(self, task: Any) -> None:
    """Start ``task``."""
    nidaqmx = _import_nidaqmx()
    try:
        task.start()
    except nidaqmx.errors.DaqError as exc:
        raise NIDaqBackendError(
            "failed to start task",
            context=ErrorContext(
                task_name=getattr(task, "name", None),
                operation="start_task",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        ) from exc

stop_task

stop_task(task)

Stop task.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def stop_task(self, task: Any) -> None:
    """Stop ``task``."""
    nidaqmx = _import_nidaqmx()
    try:
        task.stop()
    except nidaqmx.errors.DaqError as exc:
        raise NIDaqBackendError(
            "failed to stop task",
            context=ErrorContext(
                task_name=getattr(task, "name", None),
                operation="stop_task",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        ) from exc

unregister_every_n_samples

unregister_every_n_samples(task, handle)

Unregister the buffer-event callback on task.

Per NI's API, registering with None clears the callback.

Raises:

Type Description
NIDaqBackendError

NI rejected the unregister call.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def unregister_every_n_samples(self, task: Any, handle: Any) -> None:
    """Unregister the buffer-event callback on ``task``.

    Per NI's API, registering with ``None`` clears the callback.

    Raises:
        NIDaqBackendError: NI rejected the unregister call.
    """
    del handle  # NI's API is task-scoped; the handle is the task itself.
    nidaqmx = _import_nidaqmx()
    try:
        task.register_every_n_samples_acquired_into_buffer_event(0, None)
    except nidaqmx.errors.DaqError as exc:
        raise NIDaqBackendError(
            "failed to unregister every-N-samples callback",
            context=ErrorContext(
                task_name=getattr(task, "name", None),
                operation="unregister_every_n_samples",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        ) from exc
    # Drop the strong reference now that NI is no longer holding it.
    self._callback_wrappers.pop(id(task), None)

write

write(task, values, timeout)

Dispatch one-sample-per-channel write across AO / DO channels.

Inspects the underlying NI task's channel collections: AO writes go through AnalogMultiChannelWriter, DO writes through DigitalMultiChannelWriter. Mixing AO and DO on a single task is rejected as NIDaqConfigurationError. Per-channel ordering follows task.channel_names so the caller's mapping does not need to match NI's internal order.

Raises:

Type Description
NIDaqConfigurationError

Task mixes AO and DO, has no AO or DO channels, or the keys of values don't cover the task's output channels.

NIDaqTimeoutError

timeout elapsed before the write completed.

NIDaqWriteError

NI returned any other write failure.

Source code in src/nidaqlib/backend/nidaqmx_backend.py
def write(
    self,
    task: Any,
    values: Mapping[str, float | bool],
    timeout: float,
) -> None:
    """Dispatch one-sample-per-channel write across AO / DO channels.

    Inspects the underlying NI task's channel collections — AO writes go
    through ``AnalogMultiChannelWriter``; DO writes through
    ``DigitalMultiChannelWriter``. Mixing AO and DO on a single task is
    rejected as :class:`NIDaqConfigurationError`. Per-channel ordering
    follows ``task.channel_names`` so the caller's mapping does not need
    to match NI's internal order.

    Raises:
        NIDaqConfigurationError: Task mixes AO and DO, has no AO or DO
            channels, or the keys of ``values`` don't cover the task's
            output channels.
        NIDaqTimeoutError: ``timeout`` elapsed before the write completed.
        NIDaqWriteError: NI returned any other write failure.
    """
    import numpy as np  # noqa: PLC0415

    nidaqmx = _import_nidaqmx()

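    # ``nidaqmx.Task`` exposes AO channels through its ``ao_channels``
    # collection, so count them the same way as the DO channels below.
    ao_count = len(list(getattr(task, "ao_channels", ())))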
    do_count = 0
    try:
        do_count = len(list(task.do_channels))
    except Exception:  # pragma: no cover - defensive against odd NI shapes
        do_count = 0

    if ao_count > 0 and do_count > 0:
        raise NIDaqConfigurationError(
            "tasks mixing AO and DO are not supported by NidaqmxBackend.write",
            context=ErrorContext(
                task_name=getattr(task, "name", None),
                operation="write",
            ),
        )

    channel_names: list[str] = list(task.channel_names)
    missing = [name for name in channel_names if name not in values]
    if missing:
        raise NIDaqConfigurationError(
            f"write missing values for channel(s): {missing!r}",
            context=ErrorContext(
                task_name=getattr(task, "name", None),
                operation="write",
            ),
        )

    ctx = ErrorContext(task_name=getattr(task, "name", None), operation="write")
    try:
        if ao_count > 0:
            from nidaqmx.stream_writers import AnalogMultiChannelWriter  # noqa: PLC0415

            buf = np.asarray([float(values[name]) for name in channel_names], dtype=np.float64)
            writer = AnalogMultiChannelWriter(task.out_stream)
            write_one_sample = cast(
                "Callable[..., object]",
                writer.write_one_sample,  # pyright: ignore[reportUnknownMemberType]
            )
            write_one_sample(buf, timeout=timeout)
            return
        if do_count > 0:
            from nidaqmx.stream_writers import DigitalMultiChannelWriter  # noqa: PLC0415

            buf_b = np.asarray([bool(values[name]) for name in channel_names], dtype=bool)
            writer_d = DigitalMultiChannelWriter(task.out_stream)
            write_one_sample_d = cast(
                "Callable[..., object]",
                writer_d.write_one_sample_one_line,  # pyright: ignore[reportUnknownMemberType]
            )
            write_one_sample_d(buf_b, timeout=timeout)
            return
        raise NIDaqConfigurationError(
            "task has no writable channels (AO or DO)",
            context=ctx,
        )
    except nidaqmx.errors.DaqError as exc:
        ni_ctx = ErrorContext(
            task_name=getattr(task, "name", None),
            operation="write",
            ni_error_code=getattr(exc, "error_code", None),
        )
        if getattr(exc, "error_code", None) == _NI_TIMEOUT_ERROR_CODE:
            raise NIDaqTimeoutError(
                f"write timed out after {timeout}s",
                context=ni_ctx,
            ) from exc
        raise NIDaqWriteError(
            "failed to write DAQ values",
            context=ni_ctx,
        ) from exc
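
The coverage check and reordering step of write can be sketched on plain Python data. order_values and MissingValuesError are hypothetical names, the latter standing in for NIDaqConfigurationError.

```python
from typing import Mapping, Sequence, Union


class MissingValuesError(Exception):
    """Stand-in for NIDaqConfigurationError."""


def order_values(
    channel_names: Sequence[str],
    values: Mapping[str, Union[float, bool]],
) -> list[float]:
    """Return values in task channel order, rejecting incomplete mappings."""
    missing = [name for name in channel_names if name not in values]
    if missing:
        raise MissingValuesError(f"write missing values for channel(s): {missing!r}")
    return [float(values[name]) for name in channel_names]


# The mapping's own order does not matter; the channel list decides.
ordered = order_values(["ao0", "ao1"], {"ao1": 2.5, "ao0": -1.0})

# Keys that name no task channel are simply not read.
with_extra = order_values(["ao0"], {"ao0": 1.0, "unused": 9.0})

try:
    order_values(["ao0", "ao1"], {"ao0": 1.0})
    incomplete_rejected = False
except MissingValuesError as exc:
    incomplete_rejected = True
    incomplete_message = str(exc)
```

Checking coverage before building the buffer means an incomplete mapping fails with a named channel list rather than a bare KeyError.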