
nidaqlib.streaming

Streaming acquisition surface.

Two recorders, one per acquisition model (design doc §11.3 / §13.1):

  • :func:record — hardware-clocked block path, emits :class:DaqBlock.
  • :func:record_polled — software-timed scalar path, emits :class:DaqReading.

Don't unify them; they have different correctness models.

AcquisitionSummary dataclass

AcquisitionSummary(
    blocks_emitted=0,
    blocks_dropped=0,
    errors_observed=0,
    started_at=datetime.now(UTC),  # default_factory: evaluated per instance
    finished_at=None,
)

Per-run counters, yielded alongside the block stream.

Mirrors the shape of sartoriuslib.AcquisitionSummary but is intentionally mutable: counters are updated in place during the run so consumers can poll progress (e.g. for a TUI bar) and read final counts after exit. The recorder is the only writer; consumers MUST treat the object as read-only.

Attributes:

  • blocks_emitted (int): Total :class:DaqBlock records sent into the outbound stream.
  • blocks_dropped (int): Records dropped because of an :class:OverflowPolicy.DROP_* decision.
  • errors_observed (int): Wrapped NI errors seen during the run, regardless of :class:ErrorPolicy.
  • started_at (datetime): Wall-clock at recorder entry.
  • finished_at (datetime | None): Wall-clock at recorder exit. None while the recorder is still running.

ErrorPolicy

Bases: StrEnum

How recorders react to wrapped NI errors during a read.

RAISE

RAISE = 'raise'

Cancel the recorder's task group and re-raise the error.

RETURN

RETURN = 'return'

Emit a :class:DaqBlock (or :class:DaqReading) with .error set, then continue.

The recorder MUST advance timing counters (block_index / first_sample_index / monotonic_ns) on error records so consumers can detect dropped intervals. Consumers MUST gate on error is None before reading data.

OverflowPolicy

Bases: StrEnum

Behaviour when the recorder's outbound stream is full.

BLOCK

BLOCK = 'block'

Producer awaits consumer. Risks NI buffer overrun on hardware-clocked tasks.

DROP_NEWEST

DROP_NEWEST = 'drop_newest'

Drop the about-to-be-enqueued block. Bounds consumer latency; loses freshest data.

DROP_OLDEST

DROP_OLDEST = 'drop_oldest'

Drop the oldest queued block. Keeps newest data; loses older queued blocks.
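The difference between the two DROP_* policies is easy to see with a bounded queue stand-in (a sketch using a plain `collections.deque`, not the recorder's actual AnyIO stream):

```python
from collections import deque


def enqueue_drop_oldest(buf: deque, item) -> None:
    # deque(maxlen=n) evicts the oldest element automatically on append.
    buf.append(item)


def enqueue_drop_newest(buf: deque, item) -> None:
    if len(buf) < buf.maxlen:
        buf.append(item)
    # else: drop `item` itself; queued history is preserved.


buf_old = deque(maxlen=3)
buf_new = deque(maxlen=3)
for i in range(5):
    enqueue_drop_oldest(buf_old, i)
    enqueue_drop_newest(buf_new, i)

print(list(buf_old))  # [2, 3, 4] — keeps newest data
print(list(buf_new))  # [0, 1, 2] — bounds latency, loses freshest data
```

For hardware-clocked acquisition the DROP_OLDEST shape is usually what you want: a stalled consumer sees the most recent blocks rather than a stale backlog.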

record async

record(
    source,
    *,
    chunk_size,
    timeout=10.0,
    buffer_size=16,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.DROP_OLDEST,
    use_callback_bridge=False,
)

Hardware-clocked block acquisition.

Yields (stream, summary). The stream is closed when this context manager exits; summary is mutated in place during the run and is safe to read after exit (the values are frozen on the way out).

Parameters:

  • source (DaqSession, required): A configured :class:DaqSession. Required state depends on use_callback_bridge:

      - use_callback_bridge=False (Option A): source must be started; wrap with :func:~nidaqlib.tasks.open_device (the default autostart=True).
      - use_callback_bridge=True (Option B / §11.3.2): source must be configured but not yet started; pass autostart=False to open_device and let the recorder own the start. NI rejects buffer-event registration on a running task with -200960 ("Register all your DAQmx software events prior to starting the task").

  • chunk_size (int, required): Samples per channel per emitted :class:DaqBlock.
  • timeout (float, default 10.0): Per-read timeout in seconds. Applies to Option A only; Option B reads from the NI buffer with timeout 0.
  • buffer_size (int, default 16): AnyIO memory-object stream buffer, in :class:DaqBlock slots. Older blocks may be dropped per overflow when full.
  • error_policy (ErrorPolicy, default RAISE): :attr:ErrorPolicy.RAISE cancels the task group on error; :attr:ErrorPolicy.RETURN emits an error-tagged :class:DaqBlock and continues. Option B (callback bridge) currently honours only :attr:RAISE; :attr:RETURN is wired only for the Option A producer.
  • overflow (OverflowPolicy, default DROP_OLDEST): Backpressure policy. DROP_OLDEST is the hardware-clocked default (design doc §13.3).
  • use_callback_bridge (bool, default False): Opt into the §11.3.2 every-N-samples callback path. False selects Option A (blocking read in a worker thread).

Raises:

  • NIDaqTaskStateError: source is in the wrong lifecycle state for the selected mode (see the source parameter above).
  • ValueError: chunk_size < 1 or buffer_size < 1.
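Putting the lifecycle rules together, a hedged usage sketch for the default Option A path (illustrative only: it needs NI hardware, and the device string "Dev1/ai0" plus the exact open_device call shape are assumptions, not part of this page):

```python
import anyio

from nidaqlib.streaming import OverflowPolicy, record
from nidaqlib.tasks import open_device


async def main() -> None:
    # Option A: open_device defaults to autostart=True, so the task is
    # already started when record() begins its blocking reads.
    async with open_device("Dev1/ai0") as session:  # device spec assumed
        async with record(
            session,
            chunk_size=1_000,
            overflow=OverflowPolicy.DROP_OLDEST,
        ) as (stream, summary):
            async for block in stream:
                if block.error is None:  # gate under ErrorPolicy.RETURN
                    ...                  # consume block.data here
    # Safe after exit: the summary values were frozen on the way out.
    print(summary.blocks_emitted, summary.blocks_dropped)


anyio.run(main)
```

For the Option B callback bridge, the same shape applies but with autostart=False on open_device and use_callback_bridge=True on record, so the recorder can register the every-N-samples event before starting the task.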

Source code in src/nidaqlib/streaming/block.py
@asynccontextmanager
async def record(
    source: DaqSession,
    *,
    chunk_size: int,
    timeout: float = 10.0,  # noqa: ASYNC109 — per-NI-read timeout, not coroutine
    buffer_size: int = 16,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.DROP_OLDEST,
    use_callback_bridge: bool = False,
) -> AsyncGenerator[tuple[AsyncIterator[DaqBlock], AcquisitionSummary]]:
    """Hardware-clocked block acquisition.

    Yields ``(stream, summary)``. The stream is closed when this context
    manager exits; ``summary`` is mutated in place during the run and is
    safe to read after exit (the values are frozen on the way out).

    Args:
        source: A configured :class:`DaqSession`. Required state depends on
            ``use_callback_bridge``:

            * ``use_callback_bridge=False`` (Option A) — ``source`` must be
              **started**; wrap with :func:`~nidaqlib.tasks.open_device` (the
              default ``autostart=True``).
            * ``use_callback_bridge=True`` (Option B / §11.3.2) — ``source``
              must be **configured but not yet started**; pass
              ``autostart=False`` to ``open_device`` and let the recorder own
              the start. NI rejects buffer-event registration on a running
              task with -200960 ("Register all your DAQmx software events
              prior to starting the task").
        chunk_size: Samples per channel per emitted :class:`DaqBlock`.
        timeout: Per-read timeout in seconds (Option A only — Option B reads
            from the NI buffer with timeout 0).
        buffer_size: AnyIO memory-object stream buffer, in :class:`DaqBlock`
            slots. Older blocks may be dropped per ``overflow`` when full.
        error_policy: :attr:`ErrorPolicy.RAISE` (default) cancels the task
            group on error; :attr:`ErrorPolicy.RETURN` emits an error-tagged
            :class:`DaqBlock` and continues. Option B (callback bridge)
            currently honours only :attr:`RAISE` — :attr:`RETURN` is wired
            for the Option A producer.
        overflow: Backpressure policy. ``DROP_OLDEST`` is the
            hardware-clocked default — see design doc §13.3.
        use_callback_bridge: Opt into the §11.3.2 every-N-samples callback
            path. Default ``False`` selects Option A (blocking read in a
            worker thread).

    Raises:
        NIDaqTaskStateError: ``source`` is in the wrong lifecycle state for
            the selected mode (see ``source`` argument above).
        ValueError: ``chunk_size < 1`` or ``buffer_size < 1``.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    _validate_record_state(
        source,
        error_policy=error_policy,
        use_callback_bridge=use_callback_bridge,
    )

    summary = AcquisitionSummary()
    tx, rx = anyio.create_memory_object_stream[DaqBlock](max_buffer_size=buffer_size)
    drop_rx = rx.clone()

    # Design §13.2 / §14.6 — TDMS LoggingMode.LOG bypasses the application
    # read path. If we tried to drive the producer, ``read_block`` would
    # block forever waiting on samples that never arrive. Detect at entry
    # and emit an empty stream so consumers see ``blocks_emitted == 0``.
    if _is_log_only(source):
        async with rx, drop_rx:
            await tx.aclose()
            try:
                yield rx, summary
            finally:
                summary.finished_at = datetime.now(UTC)
        return

    async with anyio.create_task_group() as tg, rx, drop_rx:
        if use_callback_bridge:
            await _start_bridge_producer(tg, source, tx, drop_rx, summary, chunk_size, overflow)
        else:
            tg.start_soon(
                _blocking_producer,
                source,
                tx,
                drop_rx,
                summary,
                chunk_size,
                timeout,
                overflow,
                error_policy,
            )
        try:
            yield rx, summary
        finally:
            await tx.aclose()
            tg.cancel_scope.cancel()
    summary.finished_at = datetime.now(UTC)

record_polled async

record_polled(
    source,
    *,
    rate_hz,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Software-timed scalar polling at rate_hz.

Yields (stream, summary). The per-tick payload depends on source:

  • :class:DaqSession → one :class:DaqReading per tick.
  • :class:DaqManager → Mapping[str, DeviceResult[DaqReading]] per tick (matches :meth:DaqManager.poll).

summary is updated in place during the run; a final snapshot is frozen on exit.

Parameters:

  • source (DaqSession | DaqManager, required): A started :class:DaqSession (whose timing is None or :attr:AcquisitionMode.ON_DEMAND) or a :class:DaqManager.
  • rate_hz (float, required): Target poll rate, in Hz. Must be > 0.
  • error_policy (ErrorPolicy, default RAISE): :attr:RAISE cancels the recorder on a poll error; :attr:RETURN emits a :class:DaqReading (or per-task :class:DeviceResult row) with the error attached and continues.
  • overflow (OverflowPolicy, default BLOCK): Backpressure policy. :attr:BLOCK is the software-timed default because pollers can pause safely (design §13.3).
  • buffer_size (int, default 64): AnyIO send-stream capacity in payload slots.

Raises:

  • NIDaqTaskStateError: A session source is not started, or a manager source is closed / has no managed tasks.
  • ValueError: rate_hz <= 0 or buffer_size < 1.
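A hedged usage sketch for the polled path (illustrative only: it needs NI hardware, and the device string and open_device call shape are assumptions; record_polled requires the session to be started with no hardware timing configured):

```python
import anyio

from nidaqlib.streaming import record_polled
from nidaqlib.tasks import open_device


async def main() -> None:
    async with open_device("Dev1/ai0") as session:  # device spec assumed
        async with record_polled(session, rate_hz=10.0) as (stream, summary):
            ticks = 0
            async for reading in stream:  # one DaqReading per tick
                if reading.error is None:  # gate under ErrorPolicy.RETURN
                    ...                    # consume the scalar reading here
                ticks += 1
                if ticks >= 50:  # stop after ~5 s at 10 Hz
                    break
    print(summary.errors_observed, summary.finished_at)


anyio.run(main)
```

Passing a :class:DaqManager instead of a session yields one Mapping[str, DeviceResult[DaqReading]] per tick, so the consumer loop would index by task name rather than read a single scalar.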

Source code in src/nidaqlib/streaming/recorder.py
@asynccontextmanager
async def record_polled(
    source: DaqSession | DaqManager,
    *,
    rate_hz: float,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> AsyncGenerator[tuple[AsyncIterator[_PolledItem], AcquisitionSummary]]:
    """Software-timed scalar polling at ``rate_hz``.

    Yields ``(stream, summary)``. The per-tick payload depends on
    ``source``:

    - :class:`DaqSession` → one :class:`DaqReading` per tick.
    - :class:`DaqManager` → ``Mapping[str, DeviceResult[DaqReading]]`` per
      tick (matches :meth:`DaqManager.poll`).

    ``summary`` is updated in place during the run; a final snapshot is
    frozen on exit.

    Args:
        source: A started :class:`DaqSession` (whose timing is ``None`` or
            :attr:`AcquisitionMode.ON_DEMAND`) or a :class:`DaqManager`.
        rate_hz: Target poll rate, in Hz. Must be > 0.
        error_policy: :attr:`RAISE` cancels the recorder on a poll error;
            :attr:`RETURN` emits a :class:`DaqReading` (or per-task
            :class:`DeviceResult` row) with the error attached and continues.
        overflow: Backpressure policy. Defaults to :attr:`BLOCK` —
            software-timed pollers can pause safely (design §13.3).
        buffer_size: AnyIO send-stream capacity in payload slots.

    Raises:
        NIDaqTaskStateError: A session ``source`` is not started, or a
            manager ``source`` is closed / has no managed tasks.
        ValueError: ``rate_hz <= 0`` or ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")

    # Late import — manager imports streaming.block, which lives in this same
    # package. Doing the import here keeps the package import graph acyclic.
    from nidaqlib.manager import DaqManager  # noqa: PLC0415

    if isinstance(source, DaqManager):
        if source.is_closed:
            raise NIDaqTaskStateError(
                "record_polled() requires an open DaqManager; got a closed manager",
                context=ErrorContext(operation="record_polled"),
            )
        if not source.names:
            raise NIDaqTaskStateError(
                "record_polled() requires a DaqManager with at least one task",
                context=ErrorContext(operation="record_polled"),
            )
    elif not source.is_started:
        raise NIDaqTaskStateError(
            f"record_polled() requires a started session; task {source.spec.name!r} is not running",
            context=ErrorContext(task_name=source.spec.name, operation="record_polled"),
        )

    summary = AcquisitionSummary()
    period = 1.0 / rate_hz
    tx, rx = anyio.create_memory_object_stream[_PolledItem](max_buffer_size=buffer_size)
    drop_rx = rx.clone()

    async with anyio.create_task_group() as tg, rx, drop_rx:
        if isinstance(source, DaqManager):
            tg.start_soon(
                _polled_manager_producer,
                source,
                tx,
                drop_rx,
                summary,
                period,
                error_policy,
                overflow,
            )
        else:
            tg.start_soon(
                _polled_producer,
                source,
                tx,
                drop_rx,
                summary,
                period,
                error_policy,
                overflow,
            )
        try:
            yield rx, summary
        finally:
            await tx.aclose()
            tg.cancel_scope.cancel()
    summary.finished_at = datetime.now(UTC)