
nidaqlib.tasks

Tasks — specs, acquisition records, sessions, and open_device.

AcquisitionMode

Bases: StrEnum

Sample-clock acquisition mode.

Mirrors a subset of nidaqmx.constants.AcquisitionType. Kept as a library-side enum so TaskSpec round-trips through JSON without pulling NI's enum machinery into the serialisation layer.

ON_DEMAND class-attribute instance-attribute

ON_DEMAND = 'on_demand'

Software-timed; no hardware sample clock is configured.

DaqBlock dataclass

DaqBlock(
    *,
    device,
    task=None,
    channels,
    data,
    block_index,
    first_sample_index,
    samples_per_channel,
    sample_rate_hz,
    dt_s,
    task_started_at,
    t0,
    monotonic_ns,
    read_started_at,
    read_finished_at,
    elapsed_s,
    units,
    error=None,
)

One rectangular block of hardware-clocked samples.

The data field is the natural shape for Parquet row groups, NumPy slicing, and TDMS — do not scalarize unless the user opts in via block_to_long_rows().
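
The wide-to-long transform mentioned above can be sketched in plain Python. The function name, row layout, and field names below are illustrative only — they are not the library's actual block_to_long_rows() output schema.

```python
# Hypothetical sketch of the wide-to-long conversion that a helper like
# block_to_long_rows() performs. Field names are illustrative, not the
# library's actual schema.
def block_to_long_rows_sketch(channels, data):
    """Flatten a rectangular (len(channels), n_samples) block into one
    dict per (channel, sample) pair."""
    n_samples = len(data[0])
    rows = []
    for k in range(n_samples):
        for i, name in enumerate(channels):
            rows.append({"channel": name, "sample_index": k, "value": data[i][k]})
    return rows

# Two channels, two samples each -> four long rows.
rows = block_to_long_rows_sketch(("ai0", "ai1"), [[0.1, 0.2], [1.1, 1.2]])
```

This is the scalarization the docstring warns about: it multiplies row count by channel count, which is why the block stays rectangular unless the user opts in.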

To recover the wall-clock timestamp of sample k (where 0 <= k < samples_per_channel):

absolute = block.first_sample_index + k
elapsed = absolute / block.sample_rate_hz
sample_at = block.task_started_at + timedelta(seconds=elapsed)

Do not interpolate sample times off t0 or read_started_at — those drift block-to-block.

Attributes:

Name Type Description
device str

The name given when the task was added to the manager, or TaskSpec.name when emitted directly.

task str | None

Underlying TaskSpec.name.

channels tuple[str, ...]

Channel display names in the row order of data.

data ndarray

NumPy array. Invariant — shape is (len(channels), samples_per_channel) and is asserted in __post_init__. dtype is float64 for AI voltage.

block_index int

0-based, monotonic per task. Resets on a new task.

first_sample_index int

Cumulative sample offset since task_started_at.

samples_per_channel int

data.shape[1]. Held redundantly so consumers need not import NumPy to inspect block size.

sample_rate_hz float | None

From Timing.rate_hz. None for on-demand reads.

dt_s float | None

1 / sample_rate_hz when sample_rate_hz is set.

task_started_at datetime

Wall-clock anchor for sample-time reconstruction.

t0 datetime

Wall-clock at the first sample of this block; provenance only.

monotonic_ns int

time.monotonic_ns() at read_started_at.

read_started_at datetime

Wall-clock just before the read; provenance only.

read_finished_at datetime

Wall-clock just after the read; provenance only.

elapsed_s float

read_finished_at - read_started_at in seconds.

units Mapping[str, str | None]

Engineering units keyed by channel display name.

error NIDaqError | None

Populated only under ErrorPolicy.RETURN. Always None under the default RAISE policy.

__post_init__

__post_init__()

Validate the rectangular-shape invariant.

Raises:

Type Description
NIDaqValidationError

data.shape does not equal (len(channels), samples_per_channel).

Source code in src/nidaqlib/tasks/models.py
def __post_init__(self) -> None:
    """Validate the rectangular-shape invariant.

    Raises:
        NIDaqValidationError: ``data.shape`` does not equal
            ``(len(channels), samples_per_channel)``.
    """
    # Local import — keeps the model module from importing the errors
    # module at parse time to avoid circular-import surprises.
    from nidaqlib.errors import NIDaqValidationError  # noqa: PLC0415

    n_channels = len(self.channels)
    expected = (n_channels, self.samples_per_channel)
    actual = tuple(self.data.shape)
    if actual != expected:
        raise NIDaqValidationError(
            f"DaqBlock data shape {actual} does not match (channels, "
            f"samples_per_channel) = {expected}"
        )

DaqReading dataclass

DaqReading(
    *,
    device,
    task=None,
    values,
    units,
    requested_at,
    received_at,
    midpoint_at,
    monotonic_ns,
    elapsed_s,
    metadata=_empty_metadata(),
    error=None,
)

One scalar (or low-rate) reading across the channels of a task.

Field shape mirrors alicatlib.Sample and sartoriuslib.Sample so that DAQ rows join cleanly against flow-controller and balance rows on (device, monotonic_ns). See design doc §8.6 / §8.8.

Attributes:

Name Type Description
device str

The name given when the task was added to the manager, or TaskSpec.name when emitted directly from a session. This is the cross-instrument join key.

task str | None

Underlying TaskSpec.name (optional second key).

values Mapping[str, float | int | bool]

One entry per channel, keyed by channel display name.

units Mapping[str, str | None]

Engineering units, keyed by channel display name. None entries indicate "no unit declared on the channel spec."

requested_at datetime

Wall-clock immediately before the read.

received_at datetime

Wall-clock immediately after the read returns.

midpoint_at datetime

Midpoint of the request/receive window.

monotonic_ns int

time.monotonic_ns() at the midpoint. Use this — not wall-clock — for join arithmetic; wall-clock is non-monotonic across clock adjustments.

elapsed_s float

received_at - requested_at in seconds.

metadata Mapping[str, str | int | float | bool]

Free-form scalar metadata (often the source TaskSpec's metadata, optionally merged with manager-level metadata).

error NIDaqError | None

Populated only under ErrorPolicy.RETURN. Always None under the default RAISE policy.
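
A minimal sketch of the recommended monotonic_ns join, assuming plain dict rows sorted by monotonic_ns (the real records are dataclasses, and any pairing-tolerance policy is up to the caller):

```python
import bisect

def nearest_by_monotonic(rows, target_ns):
    """Return the index of the row whose monotonic_ns is closest to
    target_ns. Assumes rows are sorted by monotonic_ns."""
    keys = [r["monotonic_ns"] for r in rows]
    i = bisect.bisect_left(keys, target_ns)
    # The nearest key is either just before or at the insertion point.
    candidates = [j for j in (i - 1, i) if 0 <= j < len(rows)]
    return min(candidates, key=lambda j: abs(keys[j] - target_ns))

daq = [
    {"monotonic_ns": 1_000_000, "v": 0.1},
    {"monotonic_ns": 2_000_000, "v": 0.2},
]
idx = nearest_by_monotonic(daq, 1_600_000)  # closer to the second row
```

The same lookup works against flow-controller or balance rows, which is why monotonic_ns — not wall-clock — is the join key.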

DaqSession

DaqSession(spec, backend, *, timeout=10.0)

Owns one underlying NI task plus its lifecycle state.

Construction does not touch the driver. Call start (or use open_device) to create the task, add channels, and configure timing. read_block / poll are valid once started.

Create a session for spec against backend.

The constructor only stores its arguments; it never touches the driver. That keeps __init__ exception-free and avoids a partially-initialised task object on configuration errors.

Parameters:

Name Type Description Default
spec TaskSpec

Declarative TaskSpec to materialise.

required
backend DaqBackend

Backend that proxies operations into NI (or the fake).

required
timeout float

Default per-operation timeout in seconds. Individual read_block / poll calls may override.

10.0
Source code in src/nidaqlib/tasks/session.py
def __init__(
    self,
    spec: TaskSpec,
    backend: DaqBackend,
    *,
    timeout: float = 10.0,
) -> None:
    """Create a session for ``spec`` against ``backend``.

    The constructor only stores its arguments; it never touches the
    driver. That keeps ``__init__`` exception-free and avoids a
    partially-initialised task object on configuration errors.

    Args:
        spec: Declarative :class:`TaskSpec` to materialise.
        backend: Backend that proxies operations into NI (or the fake).
        timeout: Default per-operation timeout in seconds. Individual
            ``read_block`` / ``poll`` calls may override.
    """
    self._spec = spec
    self._backend = backend
    self._timeout = timeout
    self._task: Any = None
    self._lock = anyio.Lock()
    self._configured = False
    self._started = False
    self._closed = False
    self._task_started_at: datetime | None = None
    self._first_sample_index: int = 0
    self._block_index: int = 0
    # Bridge bookkeeping — populated only when streaming/block.py opts
    # into the every-N-samples callback path.
    self._callback_handle: CallbackHandle | None = None

has_active_callback_bridge property

has_active_callback_bridge

True while a §11.3.2 callback bridge is registered.

is_closed property

is_closed

True once close has run (idempotent).

is_configured property

is_configured

True after configure succeeds and before close.

A configured session has a backing NI task with channels, timing, logging, and triggers applied — but task.start() has not yet been called. Buffer-event callback registration (§11.3.2) is only valid in this window.

is_started property

is_started

True between start and stop.

raw_task property

raw_task

The underlying backend task handle.

For NidaqmxBackend this is an nidaqmx.Task; for the fake backend it is an opaque _FakeTask. Use this for advanced NI features that aren't exposed via the wrapper — the escape hatch from design doc §7.4.

The handle is available once configure has succeeded — that is, in either the configured-not-started or started state. The callback bridge (§11.3.2) needs the handle pre-start to register the buffer event.

Raises:

Type Description
NIDaqTaskStateError

The session has not been configured yet.

spec property

spec

The TaskSpec this session was constructed from.

task_started_at property

task_started_at

Wall-clock anchor for sample-time reconstruction.

Returns None until start has succeeded. Once set, this value is the truth that DaqBlock.task_started_at carries — it is captured exactly once per session, immediately before backend.start_task, so that the first sample's wall-clock can be reconstructed deterministically from task_started_at + first_sample_index / rate_hz (design doc §8.7).

__aenter__ async

__aenter__()

Enter the async context — no-op; open_device already configured/started.

Source code in src/nidaqlib/tasks/session.py
async def __aenter__(self) -> DaqSession:
    """Enter the async context — no-op; :func:`open_device` already configured/started."""
    return self

__aexit__ async

__aexit__(*exc_info)

Exit the async context — calls close.

Source code in src/nidaqlib/tasks/session.py
async def __aexit__(self, *exc_info: object) -> None:
    """Exit the async context — calls :meth:`close`."""
    del exc_info
    await self.close()

acquire async

acquire(samples_per_channel, *, timeout=None)

Run one finite acquisition and return its DaqBlock.

Convenience wrapper for the §12.3 finite-mode pattern: configure finite, start, read, stop. Requires a session whose Timing.mode is AcquisitionMode.FINITE. After the read completes, the underlying NI task is stopped — call start again before another acquisition.

Parameters:

Name Type Description Default
samples_per_channel int

Number of samples per channel to read.

required
timeout float | None

Optional per-call timeout in seconds. Falls back to the session-wide default.

None

Raises:

Type Description
NIDaqTaskStateError

The session is not started, is closed, or its timing mode is not AcquisitionMode.FINITE.

NIDaqReadError / NIDaqTimeoutError

Surfaced from the backend.

Source code in src/nidaqlib/tasks/session.py
async def acquire(
    self,
    samples_per_channel: int,
    *,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
) -> DaqBlock:
    """Run one finite acquisition and return its :class:`DaqBlock`.

    Convenience wrapper for the §12.3 finite-mode pattern: configure
    finite, start, read, stop. Requires a session whose
    :class:`Timing.mode` is :attr:`AcquisitionMode.FINITE`. After the
    read completes, the underlying NI task is stopped — call
    :meth:`start` again before another acquisition.

    Args:
        samples_per_channel: Number of samples per channel to read.
        timeout: Optional per-call timeout in seconds. Falls back to
            the session-wide default.

    Raises:
        NIDaqTaskStateError: The session is not started, is closed, or
            its timing mode is not :attr:`AcquisitionMode.FINITE`.
        NIDaqReadError / NIDaqTimeoutError: Surfaced from the backend.
    """
    self._require_started("acquire")
    timing = self._spec.timing
    if timing is None or timing.mode is not AcquisitionMode.FINITE:
        raise NIDaqTaskStateError(
            f"acquire() requires Timing.mode=FINITE; got {timing.mode if timing else None}",
            context=ErrorContext(task_name=self._spec.name, operation="acquire"),
        )
    block = await self.read_block(samples_per_channel, timeout=timeout)
    await self.stop()
    return block

close async

close()

Stop (if needed) and close the underlying task. Idempotent.

__aexit__ always calls this; explicit call is rare. Sessions that have opted into the every-N-samples callback bridge MUST instead use the recorder context manager — the bridge has its own ordered shutdown protocol (design doc §11.3.2) that this method does not implement.

Source code in src/nidaqlib/tasks/session.py
async def close(self) -> None:
    """Stop (if needed) and close the underlying task. Idempotent.

    ``__aexit__`` always calls this; explicit call is rare. Sessions that
    have opted into the every-N-samples callback bridge MUST instead use
    the recorder context manager — the bridge has its own ordered
    shutdown protocol (design doc §11.3.2) that this method does not
    implement.
    """
    if self._closed:
        return
    if self._callback_handle is not None:
        raise NIDaqTaskStateError(
            "cannot close a session while an every-N-samples callback bridge is active; "
            "exit the record(..., use_callback_bridge=True) context first",
            context=ErrorContext(task_name=self._spec.name, operation="close"),
        )
    self._closed = True
    if self._task is None:
        return
    async with self._lock:
        if self._started:
            await run_sync(self._backend.stop_task, self._task)
            self._started = False
        await run_sync(self._backend.close_task, self._task)
        self._task = None
        self._configured = False

configure async

configure()

Create the underlying task and apply channels / timing / logging / trigger.

After this method, raw_task is available and any pre-start hooks (notably the §11.3.2 buffer-event callback registration) may run. task.start() is not called — use start for that.

On failure, the partial task is torn down so the session does not leak NI resources.

Raises:

Type Description
NIDaqTaskStateError

Already configured, started, or closed.

Source code in src/nidaqlib/tasks/session.py
async def configure(self) -> None:
    """Create the underlying task and apply channels / timing / logging / trigger.

    After this method, ``raw_task`` is available and any pre-start hooks
    (notably the §11.3.2 buffer-event callback registration) may run.
    ``task.start()`` is **not** called — use :meth:`start` for that.

    On failure, the partial task is torn down so the session does not
    leak NI resources.

    Raises:
        NIDaqTaskStateError: Already configured, started, or closed.
    """
    if self._closed:
        raise NIDaqTaskStateError(
            f"session for task {self._spec.name!r} is closed",
            context=ErrorContext(task_name=self._spec.name, operation="configure"),
        )
    if self._configured:
        raise NIDaqTaskStateError(
            f"session for task {self._spec.name!r} is already configured",
            context=ErrorContext(task_name=self._spec.name, operation="configure"),
        )
    async with self._lock:
        await run_sync(self._configure_sync)
        self._configured = True

poll async

poll(*, timeout=None)

One-shot scalar read across all channels.

Valid only for sessions that are not actively buffering a sample clock (Timing.mode == ON_DEMAND or no Timing at all). For the live-scalar use case during a high-rate acquisition, use record and read the most recent block's last column.

Raises:

Type Description
NIDaqTaskStateError

The session is buffering a sample clock (continuous or finite mode and started).

Source code in src/nidaqlib/tasks/session.py
async def poll(
    self,
    *,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
) -> DaqReading:
    """One-shot scalar read across all channels.

    Valid only for sessions that are not actively buffering a sample
    clock (``Timing.mode == ON_DEMAND`` or no ``Timing`` at all). For the
    live-scalar use case during a high-rate acquisition, use
    :func:`record` and read the most recent block's last column.

    Raises:
        NIDaqTaskStateError: The session is buffering a sample clock
            (continuous or finite mode and started).
    """
    self._require_started("poll")
    self._require_analog_input_task("poll")
    timing = self._spec.timing
    if timing is not None and timing.mode in (
        AcquisitionMode.CONTINUOUS,
        AcquisitionMode.FINITE,
    ):
        raise NIDaqTaskStateError(
            f"poll() is invalid for {timing.mode.value} tasks; use record() and "
            "inspect the most recent DaqBlock instead",
            context=ErrorContext(task_name=self._spec.name, operation="poll"),
        )
    eff_timeout = timeout if timeout is not None else self._timeout
    async with self._lock:
        requested_at = datetime.now(UTC)
        monotonic_ns_start = time.monotonic_ns()
        data = await run_sync(
            self._backend.read_block,
            self._task,
            1,
            eff_timeout,
        )
        received_at = datetime.now(UTC)
        monotonic_ns_end = time.monotonic_ns()
    midpoint_at = requested_at + (received_at - requested_at) / 2
    midpoint_monotonic = (monotonic_ns_start + monotonic_ns_end) // 2
    # data shape is (n_channels, 1) — squeeze to per-channel scalars.
    names = self._channel_names()
    units = self._channel_units()
    values: dict[str, float | int | bool] = {
        name: float(data[i, 0]) for i, name in enumerate(names)
    }
    return DaqReading(
        device=self._spec.name,
        task=self._spec.name,
        values=values,
        units=units,
        requested_at=requested_at,
        received_at=received_at,
        midpoint_at=midpoint_at,
        monotonic_ns=midpoint_monotonic,
        elapsed_s=(received_at - requested_at).total_seconds(),
        metadata=dict(self._spec.metadata),
        error=None,
    )

read_block async

read_block(samples_per_channel, *, timeout=None)

Read one rectangular DaqBlock.

Wraps the backend read in run_sync so the event loop stays responsive during the blocking NI call. Increments the per-session first_sample_index cursor.

Parameters:

Name Type Description Default
samples_per_channel int

Samples per channel for this block.

required
timeout float | None

Optional per-call timeout in seconds; falls back to the session-wide default.

None

Raises:

Type Description
NIDaqTaskStateError

The session is not started or is closed.

NIDaqReadError / NIDaqTimeoutError

Surfaced from the backend.

Source code in src/nidaqlib/tasks/session.py
async def read_block(
    self,
    samples_per_channel: int,
    *,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
) -> DaqBlock:
    """Read one rectangular :class:`DaqBlock`.

    Wraps the backend read in ``run_sync`` so the event loop stays
    responsive during the blocking NI call. Increments the per-session
    ``first_sample_index`` cursor.

    Args:
        samples_per_channel: Samples per channel for this block.
        timeout: Optional per-call timeout in seconds; falls back to the
            session-wide default.

    Raises:
        NIDaqTaskStateError: The session is not started or is closed.
        NIDaqReadError / NIDaqTimeoutError: Surfaced from the backend.
    """
    self._require_started("read_block")
    self._require_analog_input_task("read_block")
    eff_timeout = timeout if timeout is not None else self._timeout
    async with self._lock:
        read_started_at = datetime.now(UTC)
        monotonic_ns = time.monotonic_ns()
        data = await run_sync(
            self._backend.read_block,
            self._task,
            samples_per_channel,
            eff_timeout,
        )
        read_finished_at = datetime.now(UTC)
        block = self._build_block(
            data=data,
            samples_per_channel=samples_per_channel,
            read_started_at=read_started_at,
            read_finished_at=read_finished_at,
            monotonic_ns=monotonic_ns,
        )
    return block

start async

start(*, confirm=False)

Start the configured task.

configure must have run first. This method calls NI's task.start() and records the wall-clock anchor used for §8.7 sample-time reconstruction. Calling start again after stop reuses the configured task and resets the block/sample counters for a new run.

confirm=True is required for task kinds whose start call can actuate hardware immediately (currently counter-output pulse trains).

Raises:

Type Description
NIDaqTaskStateError

Not configured, already started, or closed.

NIDaqValidationError

Starting would actuate hardware without explicit confirmation.

Source code in src/nidaqlib/tasks/session.py
async def start(self, *, confirm: bool = False) -> None:
    """Start the configured task.

    :meth:`configure` must have run first. This method calls NI's
    ``task.start()`` and records the wall-clock anchor used for §8.7
    sample-time reconstruction. Calling :meth:`start` again after
    :meth:`stop` reuses the configured task and resets the
    block/sample counters for a new run.

    ``confirm=True`` is required for task kinds whose ``start`` call
    can actuate hardware immediately (currently counter-output pulse
    trains).

    Raises:
        NIDaqTaskStateError: Not configured, already started, or closed.
        NIDaqValidationError: Starting would actuate hardware without
            explicit confirmation.
    """
    if self._closed:
        raise NIDaqTaskStateError(
            f"session for task {self._spec.name!r} is closed",
            context=ErrorContext(task_name=self._spec.name, operation="start"),
        )
    if not self._configured:
        raise NIDaqTaskStateError(
            f"session for task {self._spec.name!r} must be configured before start",
            context=ErrorContext(task_name=self._spec.name, operation="start"),
        )
    if self._started:
        raise NIDaqTaskStateError(
            f"session for task {self._spec.name!r} is already started",
            context=ErrorContext(task_name=self._spec.name, operation="start"),
        )
    self._validate_start_safety(confirm=confirm)
    async with self._lock:
        # Capture the wall-clock anchor as close to the start as possible
        # — `start_task` returns once NI has armed the clock, so the
        # first sample's wall-clock is approximately this timestamp + a
        # bounded device latency.
        anchor = datetime.now(UTC)
        try:
            await run_sync(self._backend.start_task, self._task)
        except BaseException:
            await run_sync(self._backend.close_task, self._task)
            self._task = None
            self._configured = False
            raise
        self._task_started_at = anchor
        self._first_sample_index = 0
        self._block_index = 0
        self._started = True

stop async

stop()

Stop the underlying task. Idempotent for not-yet-started sessions.

Does NOT close the task. Use close to release NI resources.

Source code in src/nidaqlib/tasks/session.py
async def stop(self) -> None:
    """Stop the underlying task. Idempotent for not-yet-started sessions.

    Does NOT close the task. Use :meth:`close` to release NI resources.
    """
    if not self._started or self._closed or self._task is None:
        return
    async with self._lock:
        await run_sync(self._backend.stop_task, self._task)
        self._started = False

write async

write(values, *, confirm=False, timeout=None)

Write one sample-per-channel to the task's output channels.

Safety gate (design doc §17):

  • Keys of values must match the display names of the task's output channels (AO and/or DO). Unknown or missing keys raise NIDaqValidationError before any I/O.
  • For analog-output channels with safe_min / safe_max set, values outside the resolved clamp window raise NIDaqValidationError. Never silently clamped.
  • If any target channel has requires_confirm=True and confirm is False, the call raises NIDaqConfirmationRequiredError.
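
The gate logic can be sketched standalone with simplified dict channels — a stand-in for the real ChannelSpec types, with the check ordering here illustrative rather than the library's exact sequence:

```python
# Standalone sketch of the three write() gate checks described above.
# Channels are plain dicts here, not the library's ChannelSpec types,
# and plain ValueError stands in for the NIDaq* exception hierarchy.
def check_write(values, channels, *, confirm=False):
    names = {ch["name"] for ch in channels}
    unknown, missing = set(values) - names, names - set(values)
    if unknown or missing:  # key mismatch rejected before any I/O
        raise ValueError(f"unknown={sorted(unknown)} missing={sorted(missing)}")
    if any(ch.get("requires_confirm") for ch in channels) and not confirm:
        raise ValueError("confirm=True required")
    for ch in channels:
        v = float(values[ch["name"]])
        lo, hi = ch.get("safe_min"), ch.get("safe_max")
        if (lo is not None and v < lo) or (hi is not None and v > hi):
            raise ValueError(f"{v} outside safe range [{lo}, {hi}]")  # never clamped

channels = [{"name": "heater", "safe_min": 0.0, "safe_max": 5.0,
             "requires_confirm": True}]
check_write({"heater": 2.5}, channels, confirm=True)  # passes all three gates
```

Note that an out-of-range value is always an error, never silently pulled back into the window — the clamp limits define a rejection region, not a saturation.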

Parameters:

Name Type Description Default
values Mapping[str, float | bool]

One value per output channel keyed by display name.

required
confirm bool

Operator confirmation. Required (must be True) whenever any target channel sets requires_confirm.

False
timeout float | None

Per-call timeout in seconds. Falls back to the session-wide default.

None

Raises:

Type Description
NIDaqTaskStateError

The session is not started or is closed.

NIDaqValidationError

Safety-gate or shape rejection (see above).

NIDaqWriteError / NIDaqTimeoutError

Surfaced from the backend.

Source code in src/nidaqlib/tasks/session.py
async def write(
    self,
    values: Mapping[str, float | bool],
    *,
    confirm: bool = False,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
) -> None:
    """Write one sample-per-channel to the task's output channels.

    Safety gate (design doc §17):

    - Keys of ``values`` must match the display names of the task's
      output channels (AO and/or DO). Unknown or missing keys raise
      :class:`NIDaqValidationError` before any I/O.
    - For analog-output channels with ``safe_min`` / ``safe_max`` set,
      values outside the resolved clamp window raise
      :class:`NIDaqValidationError`. **Never silently clamped.**
    - If any target channel has ``requires_confirm=True`` and
      ``confirm`` is ``False``, the call raises
      :class:`NIDaqValidationError`.

    Args:
        values: One value per output channel keyed by display name.
        confirm: Operator confirmation. Required (must be ``True``)
            whenever any target channel sets ``requires_confirm``.
        timeout: Per-call timeout in seconds. Falls back to the
            session-wide default.

    Raises:
        NIDaqTaskStateError: The session is not started or is closed.
        NIDaqValidationError: Safety-gate or shape rejection (see above).
        NIDaqWriteError / NIDaqTimeoutError: Surfaced from the backend.
    """
    # Late import — keeps the channel modules out of the session-import
    # graph for sessions that never write.
    from nidaqlib.channels.analog_output import AnalogOutputVoltage  # noqa: PLC0415
    from nidaqlib.channels.counter_output import (  # noqa: PLC0415
        CounterPulseFrequency,
        CounterPulseTicks,
        CounterPulseTime,
    )
    from nidaqlib.channels.digital_output import DigitalOutput  # noqa: PLC0415

    self._require_started("write")

    output_channels = [
        ch for ch in self._spec.channels if isinstance(ch, (AnalogOutputVoltage, DigitalOutput))
    ]
    if not output_channels:
        if any(
            isinstance(ch, (CounterPulseFrequency, CounterPulseTime, CounterPulseTicks))
            for ch in self._spec.channels
        ):
            raise NIDaqValidationError(
                "counter-output pulse trains are controlled by start()/stop(), not write(); "
                "start them with confirm=True",
                context=ErrorContext(task_name=self._spec.name, operation="write"),
            )
        raise NIDaqValidationError(
            f"task {self._spec.name!r} has no output channels to write",
            context=ErrorContext(task_name=self._spec.name, operation="write"),
        )
    has_ao = any(isinstance(ch, AnalogOutputVoltage) for ch in output_channels)
    has_do = any(isinstance(ch, DigitalOutput) for ch in output_channels)
    if has_ao and has_do:
        raise NIDaqValidationError(
            "write() does not support mixing analog-output and digital-output "
            "channels in one task",
            context=ErrorContext(task_name=self._spec.name, operation="write"),
        )

    target_names = {ch.display_name for ch in output_channels}
    provided_names = set(values.keys())
    unknown = provided_names - target_names
    missing = target_names - provided_names
    if unknown or missing:
        raise NIDaqValidationError(
            f"write keys do not match task outputs (unknown={sorted(unknown)!r}, "
            f"missing={sorted(missing)!r})",
            context=ErrorContext(task_name=self._spec.name, operation="write"),
        )

    needs_confirm = any(getattr(ch, "requires_confirm", False) for ch in output_channels)
    if needs_confirm and not confirm:
        raise NIDaqConfirmationRequiredError(
            f"task {self._spec.name!r}: write requires confirm=True (one or more "
            "channels are marked requires_confirm)",
            context=ErrorContext(task_name=self._spec.name, operation="write"),
        )

    for ch in output_channels:
        value = values[ch.display_name]
        if isinstance(ch, AnalogOutputVoltage):
            lo = ch.effective_safe_min
            hi = ch.effective_safe_max
            fvalue = float(value)
            if fvalue < lo or fvalue > hi:
                raise NIDaqValidationError(
                    f"value {fvalue!r} for AO channel {ch.display_name!r} is outside "
                    f"safe range [{lo}, {hi}]",
                    context=ErrorContext(
                        task_name=self._spec.name,
                        channel_name=ch.display_name,
                        physical_channel=ch.physical_channel,
                        operation="write",
                    ),
                )

    eff_timeout = timeout if timeout is not None else self._timeout
    async with self._lock:
        await run_sync(
            self._backend.write,
            self._task,
            dict(values),
            eff_timeout,
        )

Edge

Bases: StrEnum

Active edge for the sample clock or a trigger.

Mirrors nidaqmx.constants.Edge.

TaskBuilder

TaskBuilder(name)

Fluent builder for TaskSpec.

Example

spec = (
    TaskBuilder("ai_demo")
    .add_channel(AnalogInputVoltage(physical_channel="Dev1/ai0"))
    .with_timing(Timing(rate_hz=1000.0))
    .build()
)

Create a builder for a task named name.

Parameters:

Name Type Description Default
name str

Task name. Will become TaskSpec.name.

required
Source code in src/nidaqlib/tasks/builder.py
def __init__(self, name: str) -> None:
    """Create a builder for a task named ``name``.

    Args:
        name: Task name. Will become :attr:`TaskSpec.name`.
    """
    self._name = name
    self._channels: list[ChannelSpec] = []
    self._timing: Timing | None = None
    self._metadata: dict[str, str | int | float | bool] = {}

add_channel

add_channel(channel)

Append a channel to the task. Returns self for chaining.

Source code in src/nidaqlib/tasks/builder.py
def add_channel(self, channel: ChannelSpec) -> Self:
    """Append a channel to the task. Returns self for chaining."""
    self._channels.append(channel)
    return self

build

build()

Construct the immutable TaskSpec.

Source code in src/nidaqlib/tasks/builder.py
def build(self) -> TaskSpec:
    """Construct the immutable :class:`TaskSpec`."""
    return TaskSpec(
        name=self._name,
        channels=tuple(self._channels),
        timing=self._timing,
        metadata=dict(self._metadata),
    )

with_metadata

with_metadata(metadata)

Merge metadata into the builder's metadata dict.

Returns self for chaining. Later calls overwrite earlier keys.

Source code in src/nidaqlib/tasks/builder.py
def with_metadata(self, metadata: Mapping[str, str | int | float | bool]) -> Self:
    """Merge ``metadata`` into the builder's metadata dict.

    Returns self for chaining. Later calls overwrite earlier keys.
    """
    self._metadata.update(metadata)
    return self

with_timing

with_timing(timing)

Set the task's Timing. Returns self for chaining.

Source code in src/nidaqlib/tasks/builder.py
def with_timing(self, timing: Timing) -> Self:
    """Set the task's :class:`Timing`. Returns self for chaining."""
    self._timing = timing
    return self

TaskSpec dataclass

TaskSpec(
    *,
    name,
    channels,
    timing=None,
    trigger=None,
    logging=None,
    metadata=_empty_metadata(),
)

Declarative description of one NI task.

Attributes:

Name Type Description
name str

Task name. Must be unique within an :class:~nidaqlib.DaqManager and labels :class:DaqReading / :class:DaqBlock rows.

channels Sequence[ChannelSpec]

One or more :class:~nidaqlib.channels.ChannelSpec instances. Order is preserved and is the source of truth for DaqBlock.channels row ordering.

timing Timing | None

Optional :class:Timing. None means on-demand / software-polled.

trigger TriggerSpec | None

Optional :class:~nidaqlib.tasks.triggers.TriggerSpec. None means "start as soon as :meth:DaqSession.start returns" (NI's default).

logging TdmsLogging | None

Optional :class:TdmsLogging for driver-side TDMS. None disables TDMS (the default).

metadata Mapping[str, str | int | float | bool]

Free-form scalar metadata propagated into emitted records.

__post_init__

__post_init__()

Validate the channel list shape (the cheap, always-true invariants).

Raises:

Type Description
NIDaqValidationError

channels is empty or contains a non-:class:ChannelSpec element.

Source code in src/nidaqlib/tasks/spec.py
def __post_init__(self) -> None:
    """Validate the channel list shape (the cheap, always-true invariants).

    Raises:
        NIDaqValidationError: ``channels`` is empty or contains a
            non-:class:`ChannelSpec` element.
    """
    if len(self.channels) == 0:
        raise NIDaqValidationError(f"TaskSpec {self.name!r}: at least one channel is required")
    if not self.name:
        raise NIDaqValidationError("TaskSpec.name must be a non-empty string")
    channels = tuple(self.channels)
    object.__setattr__(self, "channels", channels)
    for ch in self.channels:
        if not isinstance(ch, ChannelSpec):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise NIDaqValidationError(
                f"TaskSpec {self.name!r}: channels must be ChannelSpec instances, "
                f"got {type(ch).__name__}"
            )
    names = [ch.display_name for ch in self.channels]
    duplicates = sorted({name for name in names if names.count(name) > 1})
    if duplicates:
        raise NIDaqValidationError(
            f"TaskSpec {self.name!r}: duplicate channel display names {duplicates!r}"
        )
    object.__setattr__(self, "metadata", MappingProxyType(dict(self.metadata)))
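The duplicate-display-name check above reduces to a set comprehension over repeated names; isolated on bare strings:

```python
# Mirror of the duplicate-detection step in __post_init__, on bare strings.
names = ["temp", "press", "temp", "flow", "press"]
duplicates = sorted({name for name in names if names.count(name) > 1})
assert duplicates == ["press", "temp"]
```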

from_dict classmethod

from_dict(data)

Deserialise from a dict produced by :meth:to_dict.

Parameters:

Name Type Description Default
data Mapping[str, Any]

Mapping carrying the task-spec fields.

required

Raises:

Type Description
NIDaqValidationError

A channel or trigger entry has an unknown kind, or required structural fields are malformed.

Source code in src/nidaqlib/tasks/spec.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise from a dict produced by :meth:`to_dict`.

    Args:
        data: Mapping carrying the task-spec fields.

    Raises:
        NIDaqValidationError: A channel or trigger entry has an unknown
            ``kind``, or required structural fields are malformed.
    """
    from nidaqlib.tasks.triggers import TriggerSpec  # noqa: PLC0415

    timing_payload = data.get("timing")
    timing = Timing.from_dict(timing_payload) if timing_payload is not None else None
    trigger_payload = data.get("trigger")
    if trigger_payload is None:
        trigger = None
    elif isinstance(trigger_payload, Mapping):
        trigger = TriggerSpec.from_dict(trigger_payload)  # pyright: ignore[reportUnknownArgumentType]
    else:
        raise NIDaqValidationError(
            f"TaskSpec.trigger must be a mapping or null, got {type(trigger_payload).__name__}"
        )
    logging_payload = data.get("logging")
    logging = TdmsLogging.from_dict(logging_payload) if logging_payload is not None else None
    raw_channels: object = data.get("channels", [])
    if not isinstance(raw_channels, list):
        raise NIDaqValidationError(
            f"TaskSpec.channels must be a list, got {type(raw_channels).__name__}"
        )
    channels: list[ChannelSpec] = []
    for ch in raw_channels:  # pyright: ignore[reportUnknownVariableType]
        if not isinstance(ch, Mapping):
            raise NIDaqValidationError(
                f"TaskSpec.channels[*] must be a mapping, got {type(ch).__name__}"  # pyright: ignore[reportUnknownArgumentType]
            )
        channels.append(ChannelSpec.from_dict(ch))  # pyright: ignore[reportUnknownArgumentType]
    metadata_raw: object = data.get("metadata", {})
    if not isinstance(metadata_raw, Mapping):
        raise NIDaqValidationError(
            f"TaskSpec.metadata must be a mapping, got {type(metadata_raw).__name__}"
        )
    return cls(
        name=str(data["name"]),
        channels=channels,
        timing=timing,
        trigger=trigger,
        logging=logging,
        metadata=dict(metadata_raw),  # pyright: ignore[reportUnknownArgumentType]
    )

replace

replace(**updates)

Return a copy of this spec with updates applied.

Mirrors dataclasses.replace but is exposed as a method for consistency with the rest of the API.

Source code in src/nidaqlib/tasks/spec.py
def replace(self, **updates: Any) -> Self:
    """Return a copy of this spec with ``updates`` applied.

    Mirrors ``dataclasses.replace`` but is exposed as a method for
    consistency with the rest of the API.
    """
    return dataclasses.replace(self, **updates)
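Because the spec is a frozen dataclass, replace returns a new instance and never mutates the original. A self-contained sketch with a stand-in frozen dataclass (MiniSpec is not the real TaskSpec):

```python
import dataclasses

@dataclasses.dataclass(frozen=True)
class MiniSpec:
    """Stand-in frozen dataclass; not the real TaskSpec."""
    name: str
    rate_hz: float

original = MiniSpec(name="ai_demo", rate_hz=1000.0)
updated = dataclasses.replace(original, rate_hz=2000.0)

assert updated == MiniSpec(name="ai_demo", rate_hz=2000.0)
assert original.rate_hz == 1000.0  # the original is untouched
```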

to_dict

to_dict()

Serialise to a JSON-friendly dict, dispatching channels by kind.

Source code in src/nidaqlib/tasks/spec.py
def to_dict(self) -> dict[str, Any]:
    """Serialise to a JSON-friendly dict, dispatching channels by ``kind``."""
    return {
        "name": self.name,
        "channels": [ch.to_dict() for ch in self.channels],
        "timing": self.timing.to_dict() if self.timing is not None else None,
        "trigger": self.trigger.to_dict() if self.trigger is not None else None,
        "logging": self.logging.to_dict() if self.logging is not None else None,
        "metadata": dict(self.metadata),
    }

Timing dataclass

Timing(
    *,
    rate_hz,
    mode=AcquisitionMode.CONTINUOUS,
    samples_per_channel=None,
    source=None,
    active_edge=Edge.RISING,
)

Sample-clock timing configuration.

Attributes:

Name Type Description
rate_hz float

Sample clock rate, in Hz. Required for hardware-timed modes (finite / continuous).

mode AcquisitionMode

Acquisition mode. Defaults to continuous.

samples_per_channel int | None

For FINITE, the total number of samples per channel. For CONTINUOUS, this sizes the on-board buffer. NI chooses a sensible default when None.

source str | None

Optional sample-clock source terminal (e.g. an external terminal name); None selects the on-board clock.

active_edge Edge

Sample-clock active edge. Rising by default.

__post_init__

__post_init__()

Validate timing parameters before they reach NI.

Source code in src/nidaqlib/tasks/spec.py
def __post_init__(self) -> None:
    """Validate timing parameters before they reach NI."""
    if self.rate_hz <= 0.0:
        raise NIDaqValidationError(f"rate_hz must be > 0, got {self.rate_hz!r}")
    if self.samples_per_channel is not None and self.samples_per_channel <= 0:
        raise NIDaqValidationError(
            f"samples_per_channel must be > 0 when set, got {self.samples_per_channel!r}"
        )

from_dict classmethod

from_dict(data)

Deserialise from a dict produced by :meth:to_dict.

Parameters:

Name Type Description Default
data Mapping[str, Any]

Mapping carrying the timing fields.

required

Raises:

Type Description
NIDaqValidationError

An enum field carries an unknown value.

Source code in src/nidaqlib/tasks/spec.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise from a dict produced by :meth:`to_dict`.

    Args:
        data: Mapping carrying the timing fields.

    Raises:
        NIDaqValidationError: An enum field carries an unknown value.
    """
    try:
        mode = AcquisitionMode(data.get("mode", AcquisitionMode.CONTINUOUS.value))
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown AcquisitionMode {data.get('mode')!r}") from exc
    try:
        edge = Edge(data.get("active_edge", Edge.RISING.value))
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown Edge {data.get('active_edge')!r}") from exc
    return cls(
        rate_hz=float(data["rate_hz"]),
        mode=mode,
        samples_per_channel=(
            int(data["samples_per_channel"])
            if data.get("samples_per_channel") is not None
            else None
        ),
        source=data.get("source"),
        active_edge=edge,
    )

to_dict

to_dict()

Serialise to a JSON-friendly dict.

Enum members serialise to their string values so the result is JSON-encodable without a custom encoder.

Source code in src/nidaqlib/tasks/spec.py
def to_dict(self) -> dict[str, Any]:
    """Serialise to a JSON-friendly dict.

    Enum members serialise to their string values so the result is
    JSON-encodable without a custom encoder.
    """
    return {
        "rate_hz": self.rate_hz,
        "mode": self.mode.value,
        "samples_per_channel": self.samples_per_channel,
        "source": self.source,
        "active_edge": self.active_edge.value,
    }
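The enum convention used by both methods — store .value on the way out, reconstruct via the enum constructor on the way in, and translate the constructor's ValueError into a validation error — can be sketched self-contained (MiniEdge is a stand-in for the real Edge enum):

```python
from enum import Enum

class MiniEdge(str, Enum):
    """Stand-in for the Edge enum; mirrors the string-value convention."""
    RISING = "rising"
    FALLING = "falling"

payload = {"active_edge": MiniEdge.FALLING.value}  # serialise: store .value
assert payload["active_edge"] == "falling"         # plain JSON-encodable string

edge = MiniEdge(payload["active_edge"])            # deserialise: enum constructor
assert edge is MiniEdge.FALLING

caught = False
try:
    MiniEdge("sideways")   # unknown value raises ValueError, which
except ValueError:         # from_dict wraps in NIDaqValidationError
    caught = True
assert caught
```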

open_device async

open_device(
    spec,
    *,
    backend=None,
    timeout=10.0,
    autostart=True,
    confirm_start=False,
)

Open and return a configured :class:DaqSession.

Usage forms::

async with await open_device(spec) as session:
    ...

session = await open_device(spec)
try:
    ...
finally:
    await session.close()

Mirrors the ecosystem open_device shape used by alicatlib, watlowlib, and sartoriuslib. The DAQ-specific deviation: the spec is the declarative task description (channels, timing, triggers) rather than a serial port string.

Parameters:

Name Type Description Default
spec TaskSpec

Declarative :class:TaskSpec to materialise.

required
backend DaqBackend | None

Optional :class:~nidaqlib.backend.base.DaqBackend. Defaults to :class:~nidaqlib.backend.nidaqmx_backend.NidaqmxBackend — tests typically pass a :class:~nidaqlib.backend.fake.FakeDaqBackend here.

None
timeout float

Default per-operation timeout, in seconds.

10.0
autostart bool

When True (default), the session is configured AND started before this function returns. When False, the session is only configured — the caller is responsible for await session.start() before any acquisition. Required for the §11.3.2 callback bridge, which must register the buffer event before NI's task.start(); pass the unstarted session to :func:~nidaqlib.streaming.block.record with use_callback_bridge=True and the recorder owns the start.

True
confirm_start bool

Required when starting the task can actuate hardware immediately (for example counter-output pulse trains). Only consulted when autostart=True.

False

Returns:

Type Description
DaqSession

A configured :class:DaqSession. Started iff autostart=True.

Source code in src/nidaqlib/tasks/__init__.py
async def open_device(
    spec: TaskSpec,
    *,
    backend: DaqBackend | None = None,
    timeout: float = 10.0,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
    autostart: bool = True,
    confirm_start: bool = False,
) -> DaqSession:
    """Open and return a configured :class:`DaqSession`.

    Usage forms::

        async with await open_device(spec) as session:
            ...

        session = await open_device(spec)
        try:
            ...
        finally:
            await session.close()

    Mirrors the ecosystem ``open_device`` shape used by ``alicatlib``,
    ``watlowlib``, and ``sartoriuslib``. The DAQ-specific deviation: the
    ``spec`` is the declarative task description (channels, timing,
    triggers) rather than a serial port string.

    Args:
        spec: Declarative :class:`TaskSpec` to materialise.
        backend: Optional :class:`~nidaqlib.backend.base.DaqBackend`. Defaults
            to :class:`~nidaqlib.backend.nidaqmx_backend.NidaqmxBackend` —
            tests typically pass a
            :class:`~nidaqlib.backend.fake.FakeDaqBackend` here.
        timeout: Default per-operation timeout, in seconds.
        autostart: When ``True`` (default), the session is configured AND
            started before this function returns. When ``False``, the
            session is only configured — the caller is responsible for
            ``await session.start()`` before any acquisition. Required
            for the §11.3.2 callback bridge, which must register the
            buffer event before NI's ``task.start()``; pass the
            unstarted session to :func:`~nidaqlib.streaming.block.record`
            with ``use_callback_bridge=True`` and the recorder owns the
            start.
        confirm_start: Required when starting the task can actuate hardware
            immediately (for example counter-output pulse trains). Only
            consulted when ``autostart=True``.

    Returns:
        A configured :class:`DaqSession`. Started iff ``autostart=True``.
    """
    if backend is None:
        # Local import — keeps the production `nidaqmx` import off the
        # critical path of test sessions that supply a fake backend.
        from nidaqlib.backend.nidaqmx_backend import NidaqmxBackend  # noqa: PLC0415

        backend = NidaqmxBackend()
    session = DaqSession(spec, backend, timeout=timeout)
    if autostart:
        # Validate up-front so a missing ``confirm_start`` for an actuating
        # task fails before any backend resources are allocated.
        session._validate_start_safety(confirm=confirm_start)  # pyright: ignore[reportPrivateUsage]
    try:
        await session.configure()
        if autostart:
            await session.start(confirm=confirm_start)
    except BaseException:
        # Open failed mid-pipeline; release any partial state so the
        # caller isn't left with a half-configured session.
        await session.close()
        raise
    return session
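The close-on-failure shape at the end of open_device — configure, optionally start, and release partial state on any exception — can be exercised self-contained with a stand-in session (not a real DaqSession):

```python
import asyncio

class _FailingSession:
    """Stand-in session whose configure step fails; not a real DaqSession."""

    def __init__(self) -> None:
        self.closed = False

    async def configure(self) -> None:
        raise RuntimeError("configure failed")

    async def close(self) -> None:
        self.closed = True

async def open_with_cleanup(session: _FailingSession) -> _FailingSession:
    try:
        await session.configure()
    except BaseException:
        # Mirror of open_device: release partial state, then re-raise.
        await session.close()
        raise
    return session

session = _FailingSession()
try:
    asyncio.run(open_with_cleanup(session))
except RuntimeError:
    pass
assert session.closed  # the caller is never left with a half-configured session
```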