Skip to content

nidaqlib.streaming

nidaqlib.streaming

Streaming acquisition surface.

Two recorders, one per acquisition model:

  • :func:record — hardware-clocked block path, emits :class:DaqBlock.
  • :func:record_polled — software-timed scalar path, emits :class:DaqReading.

Don't unify them; they have different correctness models.

Both yield a :class:Recording[T] from their context manager. The Recording wraps the payload stream, the live :class:AcquisitionSummary, and the active rate_hz so consumers can read all three from one object.

AcquisitionSummary dataclass

AcquisitionSummary(
    blocks_emitted=0,
    blocks_dropped=0,
    errors_observed=0,
    started_at=(lambda: datetime.now(UTC))(),
    finished_at=None,
)

Per-run counters, yielded alongside the block stream.

Mirrors sartoriuslib.AcquisitionSummary shape but is intentionally mutable: counters are updated in place during the run so consumers can poll progress (e.g. for a TUI bar) and read final counts after exit. The recorder is the only writer; consumers MUST treat the object as read-only.

Attributes:

Name Type Description
blocks_emitted int

Total :class:DaqBlock records sent into the outbound stream.

blocks_dropped int

Records dropped because of an :class:OverflowPolicy.DROP_* decision.

errors_observed int

Wrapped NI errors seen during the run, regardless of :class:ErrorPolicy.

started_at datetime

Wall-clock at recorder entry.

finished_at datetime | None

Wall-clock at recorder exit. None while the recorder is still running.

ErrorPolicy

Bases: StrEnum

How recorders react to wrapped NI errors during a read.

RAISE class-attribute instance-attribute

RAISE = 'raise'

Cancel the recorder's task group and re-raise the error.

RETURN class-attribute instance-attribute

RETURN = 'return'

Emit a :class:DaqBlock (or :class:DaqReading) with .error set, then continue.

The recorder MUST advance timing counters (block_index / first_sample_index / t_mono_ns) on error records so consumers can detect dropped intervals. Consumers MUST gate on error is None before reading data.

OverflowPolicy

Bases: StrEnum

Behaviour when the recorder's outbound stream is full.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Producer awaits consumer. Risks NI buffer overrun on hardware-clocked tasks.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the about-to-be-enqueued block. Bounds consumer latency; loses freshest data.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Drop the oldest queued block. Keeps newest data; loses older queued blocks.

PollSource

Bases: Protocol

Anything that yields per-name :class:DeviceResult[DaqReading] per call.

Same name across all four sibling libraries. The recorder layer's :func:record_polled accepts any :class:PollSource instance, decoupling the polled producer from the concrete session/manager types.

poll async

poll(names=None)

Read once across the named resources (or all, when names is None).

Source code in src/nidaqlib/streaming/poll_source.py
async def poll(
    self,
    names: Iterable[str] | None = None,
) -> Mapping[str, DeviceResult[DaqReading]]:
    """Read once across the named resources (or all, when ``names`` is ``None``)."""
    ...

PollSourceAdapter

PollSourceAdapter(session)

Wrap a polled :class:DaqSession as a :class:PollSource.

Multi-channel by design: one DAQ task covers many channels, so the returned mapping has exactly one entry — keyed by the task name — carrying a multi-channel :class:DaqReading. Individual channels stay inside reading.values.

Example::

adapter = PollSourceAdapter(session)
async with record_polled(adapter, rate_hz=2.0) as recording:
    async for results in recording.stream:
        reading = results[session.spec.name].value
        ...

Parameters:

Name Type Description Default
session DaqSession

A started :class:DaqSession whose timing is None or :attr:AcquisitionMode.ON_DEMAND. The same constraint as :meth:DaqSession.poll.

required
Source code in src/nidaqlib/streaming/poll_source.py
def __init__(self, session: DaqSession) -> None:
    self._session = session

name property

name

Mapping key the adapter will use for emitted results.

poll async

poll(names=None)

Read one :class:DaqReading and wrap it as {name: DeviceResult.success(reading)}.

names is accepted for Protocol uniformity. When provided, the adapter only emits a row if the session's name appears in names — otherwise the returned mapping is empty.

Source code in src/nidaqlib/streaming/poll_source.py
async def poll(
    self,
    names: Iterable[str] | None = None,
) -> Mapping[str, DeviceResult[DaqReading]]:
    """Read one :class:`DaqReading` and wrap it as ``{name: DeviceResult.success(reading)}``.

    ``names`` is accepted for Protocol uniformity. When provided, the
    adapter only emits a row if the session's name appears in
    ``names`` — otherwise the returned mapping is empty.
    """
    key = self._session.spec.name
    if names is not None and key not in set(names):
        return {}
    from nidaqlib.errors import NIDaqError  # noqa: PLC0415 — late to dodge cycles
    from nidaqlib.manager import DeviceResult  # noqa: PLC0415

    try:
        reading = await self._session.poll()
    except NIDaqError as exc:
        return {key: DeviceResult.failure(exc)}
    return {key: DeviceResult.success(reading)}

Recording dataclass

Recording(stream, summary, rate_hz)

Active-recording handle returned by :func:record / :func:record_polled.

Attributes:

Name Type Description
stream AsyncIterator[T]

Async iterator of payloads. Closes when the recorder context manager exits.

summary AcquisitionSummary

Mutable :class:AcquisitionSummary updated in place during the run. summary.finished_at is set on context exit.

rate_hz float | None

Configured cadence of the active recording. None for on-demand mode.

record async

record(
    source,
    *,
    chunk_size,
    timeout=10.0,
    buffer_size=16,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.DROP_OLDEST,
    use_callback_bridge=False,
)

Hardware-clocked block acquisition.

Yields a :class:Recording[DaqBlock]. The stream is closed when this context manager exits; summary is mutated in place during the run and is safe to read after exit.

Parameters:

Name Type Description Default
source DaqSession

A configured :class:DaqSession. Required state depends on use_callback_bridge:

  • use_callback_bridge=False (Option A) — source must be started; wrap with :func:~nidaqlib.tasks.open_device (the default autostart=True).
  • use_callback_bridge=True (Option B / §11.3.2) — source must be configured but not yet started; pass autostart=False to open_device and let the recorder own the start. NI rejects buffer-event registration on a running task with -200960 ("Register all your DAQmx software events prior to starting the task").
required
chunk_size int

Samples per channel per emitted :class:DaqBlock.

required
timeout float

Per-read timeout in seconds (Option A only — Option B reads from the NI buffer with timeout 0).

10.0
buffer_size int

AnyIO memory-object stream buffer, in :class:DaqBlock slots. Older blocks may be dropped per overflow when full.

16
error_policy ErrorPolicy

:attr:ErrorPolicy.RAISE (default) cancels the task group on error; :attr:ErrorPolicy.RETURN emits an error-tagged :class:DaqBlock and continues. Option B (callback bridge) currently honours only :attr:RAISE — :attr:RETURN is wired for the Option A producer.

RAISE
overflow OverflowPolicy

Backpressure policy. DROP_OLDEST is the hardware-clocked default — see design doc §13.3.

DROP_OLDEST
use_callback_bridge bool

Opt into the §11.3.2 every-N-samples callback path. Default False selects Option A (blocking read in a worker thread).

False

Raises:

Type Description
NIDaqTaskStateError

source is in the wrong lifecycle state for the selected mode (see source argument above).

ValueError

chunk_size < 1 or buffer_size < 1.

Source code in src/nidaqlib/streaming/block.py
@asynccontextmanager
async def record(
    source: DaqSession,
    *,
    chunk_size: int,
    timeout: float = 10.0,  # noqa: ASYNC109 — per-NI-read timeout, not coroutine
    buffer_size: int = 16,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.DROP_OLDEST,
    use_callback_bridge: bool = False,
) -> AsyncGenerator[Recording[DaqBlock]]:
    """Hardware-clocked block acquisition.

    Yields a :class:`Recording[DaqBlock]`. The stream is closed when this
    context manager exits; ``summary`` is mutated in place during the run
    and is safe to read after exit.

    Args:
        source: A configured :class:`DaqSession`. Required state depends on
            ``use_callback_bridge``:

            * ``use_callback_bridge=False`` (Option A) — ``source`` must be
              **started**; wrap with :func:`~nidaqlib.tasks.open_device` (the
              default ``autostart=True``).
            * ``use_callback_bridge=True`` (Option B / §11.3.2) — ``source``
              must be **configured but not yet started**; pass
              ``autostart=False`` to ``open_device`` and let the recorder own
              the start. NI rejects buffer-event registration on a running
              task with -200960 ("Register all your DAQmx software events
              prior to starting the task").
        chunk_size: Samples per channel per emitted :class:`DaqBlock`.
        timeout: Per-read timeout in seconds (Option A only — Option B reads
            from the NI buffer with timeout 0).
        buffer_size: AnyIO memory-object stream buffer, in :class:`DaqBlock`
            slots. Older blocks may be dropped per ``overflow`` when full.
        error_policy: :attr:`ErrorPolicy.RAISE` (default) cancels the task
            group on error; :attr:`ErrorPolicy.RETURN` emits an error-tagged
            :class:`DaqBlock` and continues. Option B (callback bridge)
            currently honours only :attr:`RAISE` — :attr:`RETURN` is wired
            for the Option A producer.
        overflow: Backpressure policy. ``DROP_OLDEST`` is the
            hardware-clocked default — see design doc §13.3.
        use_callback_bridge: Opt into the §11.3.2 every-N-samples callback
            path. Default ``False`` selects Option A (blocking read in a
            worker thread).

    Raises:
        NIDaqTaskStateError: ``source`` is in the wrong lifecycle state for
            the selected mode (see ``source`` argument above).
        ValueError: ``chunk_size < 1`` or ``buffer_size < 1``.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    _validate_record_state(
        source,
        error_policy=error_policy,
        use_callback_bridge=use_callback_bridge,
    )

    summary = AcquisitionSummary()
    timing = source.spec.timing
    rate_hz: float | None = (
        timing.rate_hz
        if timing is not None and timing.mode is not AcquisitionMode.ON_DEMAND
        else None
    )
    tx, rx = anyio.create_memory_object_stream[DaqBlock](max_buffer_size=buffer_size)
    drop_rx = rx.clone()

    # TDMS LoggingMode.LOG bypasses the application read path. If we tried
    # to drive the producer, ``read_block`` would block forever waiting on
    # samples that never arrive. Detect at entry and emit an empty stream
    # so consumers see ``blocks_emitted == 0``.
    if _is_log_only(source):
        async with rx, drop_rx:
            await tx.aclose()
            try:
                yield Recording(stream=rx, summary=summary, rate_hz=rate_hz)
            finally:
                summary.finished_at = datetime.now(UTC)
        return

    async with anyio.create_task_group() as tg, rx, drop_rx:
        if use_callback_bridge:
            await _start_bridge_producer(tg, source, tx, drop_rx, summary, chunk_size, overflow)
        else:
            _ = tg.start_soon(
                _blocking_producer,
                source,
                tx,
                drop_rx,
                summary,
                chunk_size,
                timeout,
                overflow,
                error_policy,
            )
        try:
            yield Recording(stream=rx, summary=summary, rate_hz=rate_hz)
        finally:
            await tx.aclose()
            tg.cancel()
    summary.finished_at = datetime.now(UTC)

record_polled async

record_polled(
    source,
    *,
    rate_hz,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Software-timed scalar polling at rate_hz.

Yields a :class:Recording[T]. The per-tick payload type T depends on source:

  • :class:DaqSession → one :class:DaqReading per tick.
  • :class:DaqManagerMapping[str, DeviceResult[DaqReading]] per tick (matches :meth:DaqManager.poll).
  • Any :class:PollSource (including :class:PollSourceAdapter) → Mapping[str, DeviceResult[DaqReading]] returned by its poll().

summary is updated in place during the run; a final snapshot is frozen on exit.

Parameters:

Name Type Description Default
source DaqSession | DaqManager | PollSource

A started :class:DaqSession (whose timing is None or :attr:AcquisitionMode.ON_DEMAND), a :class:DaqManager, or any :class:PollSource.

required
rate_hz float

Target poll rate, in Hz. Must be > 0.

required
error_policy ErrorPolicy

:attr:RAISE cancels the recorder on a poll error; :attr:RETURN emits a :class:DaqReading (or per-task :class:DeviceResult row) with the error attached and continues.

RAISE
overflow OverflowPolicy

Backpressure policy. Defaults to :attr:BLOCK — software-timed pollers can pause safely (design §13.3).

BLOCK
buffer_size int

AnyIO send-stream capacity in payload slots.

64

Raises:

Type Description
NIDaqTaskStateError

A session source is not started, or a manager source is closed / has no managed tasks.

ValueError

rate_hz <= 0 or buffer_size < 1.

Source code in src/nidaqlib/streaming/recorder.py
@asynccontextmanager
async def record_polled(
    source: DaqSession | DaqManager | PollSource,
    *,
    rate_hz: float,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> AsyncGenerator[Recording[_PolledItem]]:
    """Software-timed scalar polling at ``rate_hz``.

    Yields a :class:`Recording[T]`. The per-tick payload type ``T`` depends
    on ``source``:

    - :class:`DaqSession` → one :class:`DaqReading` per tick.
    - :class:`DaqManager` → ``Mapping[str, DeviceResult[DaqReading]]`` per
      tick (matches :meth:`DaqManager.poll`).
    - Any :class:`PollSource` (including :class:`PollSourceAdapter`) →
      ``Mapping[str, DeviceResult[DaqReading]]`` returned by its ``poll()``.

    ``summary`` is updated in place during the run; a final snapshot is
    frozen on exit.

    Args:
        source: A started :class:`DaqSession` (whose timing is ``None`` or
            :attr:`AcquisitionMode.ON_DEMAND`), a :class:`DaqManager`, or
            any :class:`PollSource`.
        rate_hz: Target poll rate, in Hz. Must be > 0.
        error_policy: :attr:`RAISE` cancels the recorder on a poll error;
            :attr:`RETURN` emits a :class:`DaqReading` (or per-task
            :class:`DeviceResult` row) with the error attached and continues.
        overflow: Backpressure policy. Defaults to :attr:`BLOCK` —
            software-timed pollers can pause safely (design §13.3).
        buffer_size: AnyIO send-stream capacity in payload slots.

    Raises:
        NIDaqTaskStateError: A session ``source`` is not started, or a
            manager ``source`` is closed / has no managed tasks.
        ValueError: ``rate_hz <= 0`` or ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")

    # Late imports — manager imports streaming.block, session pulls in
    # backend modules; both are heavy enough to keep off the recorder
    # module's import path.
    from nidaqlib.manager import DaqManager  # noqa: PLC0415
    from nidaqlib.tasks.session import DaqSession  # noqa: PLC0415

    if isinstance(source, DaqManager):
        if source.is_closed:
            raise NIDaqTaskStateError(
                "record_polled() requires an open DaqManager; got a closed manager",
                context=ErrorContext(command_name="record_polled"),
            )
        if not source.names:
            raise NIDaqTaskStateError(
                "record_polled() requires a DaqManager with at least one task",
                context=ErrorContext(command_name="record_polled"),
            )
    elif isinstance(source, DaqSession) and not source.is_started:
        raise NIDaqTaskStateError(
            f"record_polled() requires a started session; task {source.spec.name!r} is not running",
            context=ErrorContext(task_name=source.spec.name, command_name="record_polled"),
        )

    summary = AcquisitionSummary()
    period = 1.0 / rate_hz
    tx, rx = anyio.create_memory_object_stream[_PolledItem](max_buffer_size=buffer_size)
    drop_rx = rx.clone()

    async with anyio.create_task_group() as tg, rx, drop_rx:
        if isinstance(source, DaqManager):
            _ = tg.start_soon(
                _polled_manager_producer,
                source,
                tx,
                drop_rx,
                summary,
                period,
                error_policy,
                overflow,
            )
        elif isinstance(source, DaqSession):
            _ = tg.start_soon(
                _polled_producer,
                source,
                tx,
                drop_rx,
                summary,
                period,
                error_policy,
                overflow,
            )
        else:
            # Any other PollSource — drive its poll() directly. Emits the
            # same Mapping[str, DeviceResult[DaqReading]] shape as the
            # manager path.
            _ = tg.start_soon(
                _polled_source_producer,
                source,
                tx,
                drop_rx,
                summary,
                period,
                error_policy,
                overflow,
            )
        try:
            yield Recording(stream=rx, summary=summary, rate_hz=rate_hz)
        finally:
            await tx.aclose()
            tg.cancel()
    summary.finished_at = datetime.now(UTC)