Continuous acquisition & playback

record() streams a continuous input task as DaqBlocks; record_polled() polls a session or manager at a software-timed rate and streams DaqReading payloads; play() drives a continuous analog-output task from a waveform source. See Continuous acquisition and Waveform output for the narrative.

Recorders, playback, and policies

dtollib.streaming

Streaming surface — the recorders, the player, and their shared types.

  • record(): hardware-clocked continuous block acquisition.
  • record_polled(): software-timed scalar polling.
  • play(): hardware-clocked continuous analog output (waveform playback).

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    payloads_emitted=0,
    payloads_dropped=0,
    errors_observed=0,
    overruns_observed=0,
    underruns_observed=0,
    extra=_empty_extra(),
)

Mutable summary of one recording session.

Updated in place by the recorder; finished_at is set on context exit. The Recording[T] handle exposes this so consumers can read progress without poking at the recorder's internals.

overruns_observed and underruns_observed are SDK-level, distinguishable from payloads_dropped (which counts consumer-side losses under DROP_* overflow policies). See docs/design.md §14.1 for the rationale.
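The in-place update behaviour can be illustrated with a small stand-in. This is only a sketch: SummarySketch is a hypothetical class that mirrors the field names listed above, and it assumes extra is a per-instance dict (the real default comes from _empty_extra(), whose return type is not shown here).

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class SummarySketch:
    """Hypothetical stand-in mirroring the AcquisitionSummary field names."""

    started_at: datetime
    finished_at: Optional[datetime] = None
    payloads_emitted: int = 0
    payloads_dropped: int = 0
    errors_observed: int = 0
    overruns_observed: int = 0
    underruns_observed: int = 0
    # Assumption: extra is a dict; default_factory gives each instance its own.
    extra: Dict[str, Any] = field(default_factory=dict)


summary = SummarySketch(started_at=datetime.now(timezone.utc))
handle_view = summary  # a Recording-like handle would expose this same object

# The "recorder" mutates the object in place...
summary.payloads_emitted += 3
summary.payloads_dropped += 1   # consumer-side loss under a DROP_* policy
summary.overruns_observed += 1  # SDK-level event, counted separately

# ...so the consumer sees progress through its own reference.
print(handle_view.payloads_emitted, handle_view.payloads_dropped)

summary.finished_at = datetime.now(timezone.utc)  # set on context exit
```

Because the consumer holds a reference to the same object, no polling API is needed to observe progress; reading the counters mid-run is enough.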

ErrorPolicy

Bases: StrEnum

How a recorder reacts to a backend error mid-stream.

Attributes:

Name Type Description
RAISE

Cancel the recorder; the exception propagates out of the async with record(...) block.

RETURN

Emit a payload with error=... set and continue the stream. Suitable for long unattended runs where a single bad poll shouldn't kill the recording.

SKIP

Drop the failed payload silently. Increments AcquisitionSummary.errors_observed so the run report still reflects the failure count.
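A minimal sketch of how the three policies differ, using a plain synchronous loop in place of the real recorder. The enum below is a stand-in with assumed member values, run_polls is a hypothetical helper, and counting the error under RETURN as well as SKIP is an assumption of this sketch.

```python
from enum import Enum


class ErrorPolicy(str, Enum):  # stand-in; member values are assumed
    RAISE = "raise"
    RETURN = "return"
    SKIP = "skip"


def run_polls(polls, policy):
    """Apply `policy` to a sequence of zero-argument poll callables.

    Returns (payloads, errors_observed); each payload is a (value, error) pair.
    """
    payloads = []
    errors_observed = 0
    for poll in polls:
        try:
            payloads.append((poll(), None))
        except RuntimeError as exc:
            if policy is ErrorPolicy.RAISE:
                raise  # propagates to the caller and ends the run
            errors_observed += 1
            if policy is ErrorPolicy.RETURN:
                payloads.append((None, exc))  # the error travels in-band
            # SKIP: nothing is emitted; only the counter records the failure
    return payloads, errors_observed


def ok():
    return 1.0


def bad():
    raise RuntimeError("simulated backend error")


polls = [ok, bad, ok]
returned, n_returned = run_polls(polls, ErrorPolicy.RETURN)
skipped, n_skipped = run_polls(polls, ErrorPolicy.SKIP)
print(len(returned), n_returned)  # 3 1
print(len(skipped), n_skipped)    # 2 1
```

With RETURN the consumer must check each payload for an error; with SKIP the stream stays clean and the failure is visible only in the counters.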

OverflowPolicy

Bases: StrEnum

How a recorder reacts when its outgoing stream buffer fills.

Attributes:

Name Type Description
BLOCK

Backpressure: the producer awaits buffer space. Default for record_polled() (record() defaults to DROP_OLDEST). Preserves every payload at the cost of stalling the upstream buffer pool, so slow consumers risk an SDK-level overrun.

DROP_OLDEST

Discard the head of the buffer to make room. Trades payload completeness for producer liveness. Increments AcquisitionSummary.payloads_dropped.

DROP_NEWEST

Discard the incoming payload. Same trade-off as DROP_OLDEST, but the consumer keeps the payloads that were already buffered and loses the newer ones. Also counted in AcquisitionSummary.payloads_dropped.
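The two drop policies can be compared on a bounded queue. The following is a sketch with stand-in names (the enum values, offer, and fill are all hypothetical), and it leaves BLOCK unimplemented because blocking needs an awaiting producer rather than a synchronous helper.

```python
from collections import deque
from enum import Enum


class OverflowPolicy(str, Enum):  # stand-in; member values are assumed
    BLOCK = "block"
    DROP_OLDEST = "drop_oldest"
    DROP_NEWEST = "drop_newest"


def offer(buffer, capacity, item, policy):
    """Try to enqueue `item`; return the number of payloads dropped (0 or 1)."""
    if len(buffer) < capacity:
        buffer.append(item)
        return 0
    if policy is OverflowPolicy.DROP_OLDEST:
        buffer.popleft()     # discard the head to make room
        buffer.append(item)
        return 1
    if policy is OverflowPolicy.DROP_NEWEST:
        return 1             # discard the incoming payload
    raise NotImplementedError("BLOCK would await free space instead of dropping")


def fill(policy, n_items=5, capacity=3):
    """Offer 0..n_items-1 to an unread buffer; return (contents, dropped)."""
    buffer, dropped = deque(), 0
    for item in range(n_items):
        dropped += offer(buffer, capacity, item, policy)
    return list(buffer), dropped


print(fill(OverflowPolicy.DROP_OLDEST))  # ([2, 3, 4], 2)
print(fill(OverflowPolicy.DROP_NEWEST))  # ([0, 1, 2], 2)
```

Both policies lose the same number of payloads; they differ only in whether the consumer ends up with the freshest data or the earliest.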

Recording dataclass

Recording(stream, summary, rate_hz)

Active-recording handle returned by record() / record_polled().

Attributes:

Name Type Description
stream MemoryObjectReceiveStream[T]

AnyIO receive stream of payloads. Closes when the recorder context manager exits.

summary AcquisitionSummary

Mutable AcquisitionSummary updated in place during the run. summary.finished_at is set on context exit.

rate_hz float | None

Configured cadence of the active recording. None for on-demand mode.

play async

play(
    session,
    source,
    *,
    confirm=False,
    error_policy=ErrorPolicy.RAISE,
)

Drive continuous analog output from source.

Parameters:

Name Type Description Default
session DtolSession

A session for an analog-output continuous task: every channel is an AnalogOutputVoltage (dtollib.channels.analog_output), spec.data_flow == CONTINUOUS, and spec.buffers.wrap_mode is SINGLE or MULTIPLE. Open it with autostart=False, because this facade drives the lifecycle.

required
source PlaybackSource

For WrapMode.SINGLE, a single-period np.ndarray shaped (n_channels, ring_capacity) (or (ring_capacity,) for one channel), where ring_capacity == buffers * samples_per_buffer. For WrapMode.MULTIPLE, an async iterator or a () -> ndarray | None callable yielding (n_channels, samples_per_buffer) chunks (None = end of finite playback).

required
confirm bool

Operator confirmation for the §18 safety gate, exactly as in DtolSession.write. Required if any sample leaves the safe band or any channel is marked requires_confirm.

False
error_policy ErrorPolicy

How SDK errors reaching the producer loop are surfaced: RAISE cancels playback; RETURN and SKIP log and count the error.

RAISE

Yields:

Type Description
AsyncGenerator[AcquisitionSummary]

The mutable AcquisitionSummary for the run.

Raises:

Type Description
DtolCapabilityError

The subsystem's D/A is single-value only (OLSSC_SUP_CONTINUOUS=0), for example the DT9806. Use DtolSession.write for single-value output.

DtolTaskStateError

Wrong data-flow / wrap mode, or a non-AO task.

DtolValidationError

Source shape mismatch, or a sample outside the device range (raised before any waveform reaches the DAC).

DtolConfirmationRequiredError

Safety gate tripped without confirm.
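For WrapMode.MULTIPLE, the source may be a callable that returns one chunk per call and None at the end. A minimal sketch of such a callable follows. It uses nested lists as a plain-Python stand-in for an (n_channels, n_samples) ndarray, make_chunk_puller is a hypothetical helper, and requiring the waveform length to be a multiple of samples_per_buffer is an assumption of this sketch rather than a documented rule.

```python
import math


def make_chunk_puller(waveform, samples_per_buffer):
    """Return a () -> chunk | None callable over a finite waveform.

    `waveform` is a list of per-channel sample lists (one row per channel).
    """
    n_samples = len(waveform[0])
    if any(len(row) != n_samples for row in waveform):
        raise ValueError("all channels must have the same number of samples")
    if n_samples % samples_per_buffer:
        raise ValueError("waveform length must be a multiple of samples_per_buffer")
    position = 0

    def pull():
        nonlocal position
        if position >= n_samples:
            return None  # signals the end of finite playback
        chunk = [row[position:position + samples_per_buffer] for row in waveform]
        position += samples_per_buffer
        return chunk

    return pull


buffers, samples_per_buffer = 4, 8
ring_capacity = buffers * samples_per_buffer  # 32 samples per channel

# One sine period on channel 0, the same shape at half amplitude on channel 1.
sine = [math.sin(2 * math.pi * i / ring_capacity) for i in range(ring_capacity)]
waveform = [sine, [0.5 * v for v in sine]]

pull = make_chunk_puller(waveform, samples_per_buffer)
chunks = []
while (chunk := pull()) is not None:
    chunks.append(chunk)
print(len(chunks), len(chunks[0]), len(chunks[0][0]))  # 4 2 8
```

With NumPy the slice would be `wave[:, position:position + samples_per_buffer]`; the same full-period array, passed whole, has the (n_channels, ring_capacity) shape that WrapMode.SINGLE expects.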

Source code in src/dtollib/streaming/playback.py
@asynccontextmanager
async def play(
    session: DtolSession,
    source: PlaybackSource,
    *,
    confirm: bool = False,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
) -> AsyncGenerator[AcquisitionSummary]:
    """Drive continuous analog output from ``source``.

    Args:
        session: A session for an analog-output continuous task — every
            channel an :class:`~dtollib.channels.analog_output.AnalogOutputVoltage`,
            ``spec.data_flow == CONTINUOUS``, ``spec.buffers.wrap_mode`` one of
            ``SINGLE`` / ``MULTIPLE``. Open it with ``autostart=False`` (this
            facade drives the lifecycle).
        source: For ``WrapMode.SINGLE``, a single-period ``np.ndarray`` shaped
            ``(n_channels, ring_capacity)`` (or ``(ring_capacity,)`` for one
            channel), where ``ring_capacity == buffers * samples_per_buffer``.
            For ``WrapMode.MULTIPLE``, an async iterator or a ``() -> ndarray |
            None`` callable yielding ``(n_channels, samples_per_buffer)`` chunks
            (``None`` = end of finite playback).
        confirm: Operator confirmation for the §18 safety gate, exactly as
            :meth:`DtolSession.write`. Required if any sample leaves the safe
            band or any channel is ``requires_confirm``.
        error_policy: How SDK errors reaching the producer loop are surfaced
            (``RAISE`` cancels playback; ``RETURN`` / ``SKIP`` log + count).

    Yields:
        The mutable :class:`AcquisitionSummary` for the run.

    Raises:
        DtolCapabilityError: The subsystem's D/A is single-value only
            (``OLSSC_SUP_CONTINUOUS=0``) — e.g. the DT9806. Use
            :meth:`DtolSession.write` for single-value output.
        DtolTaskStateError: Wrong data-flow / wrap mode, or a non-AO task.
        DtolValidationError: Source shape mismatch, or a sample outside the
            device range (raised *before* any waveform reaches the DAC).
        DtolConfirmationRequiredError: Safety gate tripped without ``confirm``.
    """
    await session.configure()
    spec = session.spec
    channels = list(spec.channels)
    _validate_output_task(spec, channels)

    # Hardware gate: continuous AO needs a streaming DAC (FIFO + wrap modes).
    # The DT9806 D/A is single-value-only — it reports OLSSC_SUP_CONTINUOUS=0
    # and rejects every continuous setter with OLNOTSUPPORTED (ec=36,
    # bench-confirmed 2026-05-28). Fail loud here with a clear, typed error
    # rather than dying mid-startup at olDaConfig. play() stays correct for a
    # board whose D/A *does* report continuous support.
    if not session.capabilities.supports_continuous:
        raise DtolCapabilityError(
            f"play() needs a continuous (streaming) analog-output subsystem, but "
            f"the D/A on {spec.board!r} reports OLSSC_SUP_CONTINUOUS=0 — it is "
            "single-value only. Use DtolSession.write() for single-value output. "
            "(Bench-confirmed on the DT9806; see docs/decisions.md.)",
            context=ErrorContext(operation="play", task_name=spec.name),
        )

    wrap = spec.buffers.wrap_mode  # type: ignore[union-attr]  # validated above
    n_buffers = spec.buffers.buffers  # type: ignore[union-attr]
    samples_per_buffer = spec.buffers.samples_per_buffer  # type: ignore[union-attr]

    # Build + validate the seed chunks (and the refill puller) BEFORE any SDK
    # lifecycle call, so a bad waveform fails loudly pre-seed (§18 gate).
    seed_chunks, pull = await _prepare_source(
        source,
        channels=channels,
        wrap=wrap,
        n_buffers=n_buffers,
        samples_per_buffer=samples_per_buffer,
        confirm=confirm,
        task_name=spec.name,
    )

    backend = session.backend
    hdass = session.raw_hdass
    pool = BufferPool(
        backend,
        hdass,
        spec.buffers,  # type: ignore[arg-type]  # validated non-None above
        n_channels=len(channels),
        sample_dtype_bytes=_SAMPLE_DTYPE_BYTES,
        direction=BufferDirection.OUTPUT,
    )
    pool.allocate()

    config = OutputBridgeConfig(
        device=spec.name,
        task=spec.name,
        wrap_mode=wrap,
        error_policy=error_policy,
        # OLSSC_SUP_MUTE position is header-verified but bench read-back is
        # still pending (WS-B). The capability is wired through here; on a
        # subsystem that reports it false (incl. the fake) the bridge degrades
        # gracefully and skips the pre-stop mute.
        supports_mute=session.capabilities.supports_mute,
    )

    try:
        # Bench-proven continuous startup ordering (docs/decisions.md):
        #   commit (#1) → register (bridge entry) → seed → queue → arm (#2) → start.
        await anyio_to_thread.run_sync(backend.commit, hdass)
        async with output_callback_bridge(backend, hdass, pool, config, pull=pull) as summary:
            pool.seed_all(seed_chunks)
            pool.queue_all()
            await anyio_to_thread.run_sync(backend.arm, hdass)
            await anyio_to_thread.run_sync(backend.start, hdass)
            yield summary
    finally:
        # Bridge shutdown is shielded inside output_callback_bridge; the pool
        # is freed here, after drain-wait completed in the bridge __aexit__.
        with suppress(Exception):
            pool.flush()
        with suppress(Exception):
            pool.free_all()

record async

record(
    session,
    *,
    timeout=10.0,
    stream_buffer_size=16,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.DROP_OLDEST,
)

Hardware-clocked continuous block acquisition.

Drives the §12.3.2 callback bridge. The session MUST be opened with autostart=False: the recorder drives the lifecycle itself. It commits the configuration, registers the notification through the bridge, and queues buffers before the second olDaConfig call (backend.arm) that arms the subsystem.

Parameters:

Name Type Description Default
session DtolSession

Session opened via open_device(spec, autostart=False). spec.data_flow must be CONTINUOUS or FINITE; spec.buffers must be non-None.

required
timeout float

Reserved for a shutdown timeout in a future revision. Currently ignored.

10.0
stream_buffer_size int

AnyIO memory-object-stream size — the consumer-side back-pressure window. Distinct from spec.buffers.buffers.

16
error_policy ErrorPolicy

How to surface SDK errors that reach the producer loop. See docs/design.md §14.3.

RAISE
overflow OverflowPolicy

How to react when the consumer stream is full. Default DROP_OLDEST — keeps the SDK queue moving even with a slow consumer (see docs/design.md §14.4).

DROP_OLDEST

Yields:

Type Description
AsyncGenerator[Recording[DaqBlock]]

A Recording (dtollib.streaming) whose stream is an AsyncIterator[DaqBlock] and whose summary is the mutable AcquisitionSummary.

Raises:

Type Description
DtolTaskStateError

If spec.data_flow is neither CONTINUOUS nor FINITE, or spec.buffers is None.

Source code in src/dtollib/streaming/block.py
@asynccontextmanager
async def record(
    session: DtolSession,
    *,
    timeout: float = 10.0,  # noqa: ASYNC109 - public API placeholder name.
    stream_buffer_size: int = 16,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.DROP_OLDEST,
) -> AsyncGenerator[Recording[DaqBlock]]:
    """Hardware-clocked continuous block acquisition.

    Drives the §12.3.2 callback bridge. The session MUST be opened with
    ``autostart=False``: the recorder drives the lifecycle itself. It
    commits the configuration, registers the notification through the
    bridge, and queues buffers before the second ``olDaConfig`` call
    (``backend.arm``) that arms the subsystem.

    Args:
        session: Session opened via ``open_device(spec, autostart=False)``.
            ``spec.data_flow`` must be ``CONTINUOUS`` or ``FINITE``;
            ``spec.buffers`` must be non-None.
        timeout: Reserved for a shutdown timeout in a future revision.
            Currently ignored.
        stream_buffer_size: AnyIO memory-object-stream size — the
            consumer-side back-pressure window. Distinct from
            ``spec.buffers.buffers``.
        error_policy: How to surface SDK errors that reach the producer
            loop. See docs/design.md §14.3.
        overflow: How to react when the consumer stream is full. Default
            ``DROP_OLDEST`` — keeps the SDK queue moving even with a
            slow consumer (see docs/design.md §14.4).

    Yields:
        :class:`~dtollib.streaming.Recording` whose ``stream`` is an
        ``AsyncIterator[DaqBlock]`` and ``summary`` is the mutable
        :class:`AcquisitionSummary`.

    Raises:
        DtolTaskStateError: If ``spec.data_flow`` is neither ``CONTINUOUS``
            nor ``FINITE``, or ``spec.buffers`` is None.
    """
    del timeout  # not yet wired — placeholder for §12.3.2 shutdown timeout
    spec = session.spec
    if spec.data_flow not in {DataFlow.CONTINUOUS, DataFlow.FINITE}:
        raise DtolTaskStateError(
            f"record() requires data_flow in {{CONTINUOUS, FINITE}}; got {spec.data_flow.value}",
            context=ErrorContext(
                operation="record",
                task_name=spec.name,
            ),
        )
    if spec.buffers is None:
        raise DtolTaskStateError(
            "record() requires TaskSpec.buffers to be configured",
            context=ErrorContext(operation="record", task_name=spec.name),
        )

    backend = session.backend
    hdass = session.raw_hdass

    # Build the pool — n_channels comes from the configured channel list.
    n_channels = len(spec.channels)
    pool = BufferPool(
        backend,
        hdass,
        spec.buffers,
        n_channels=n_channels,
    )
    pool.allocate()

    task_started_at = datetime.now(UTC)
    task_started_mono_ns = time.monotonic_ns()
    config = BridgeConfig(
        device=spec.name,
        task=spec.name,
        channels=tuple((c.name or f"ch{c.physical_channel}") for c in spec.channels),
        sample_rate_hz=spec.timing.rate_hz if spec.timing else None,
        task_started_at=task_started_at,
        task_started_mono_ns=task_started_mono_ns,
        units={
            (c.name or f"ch{c.physical_channel}"): getattr(c, "unit", None) for c in spec.channels
        },
        error_policy=error_policy,
        overflow_policy=overflow,
        stream_buffer_size=stream_buffer_size,
    )

    try:
        # Bench-proven continuous startup ordering (docs/decisions.md):
        #   commit (olDaConfig #1, after the builder's channel/clock/wrap
        #   setup) → register (olDaSetWndHandle) → queue → arm (olDaConfig
        #   #2, wires the window into buffer rotation) → start.
        await anyio_to_thread.run_sync(backend.commit, hdass)
        # Build the code→engineering-units plan AFTER commit (scaling is only
        # queryable once the subsystem is configured). On AI subsystems this
        # turns the drainer's raw-code blocks into volts / °C; on other
        # subsystem kinds the plan is None and raw codes pass through.
        config.conversion = build_conversion_plan(session, hdass)
        rate_hz = spec.timing.rate_hz if spec.timing else None
        async with callback_bridge(backend, hdass, pool, config) as (rx, summary):
            # callback_bridge has registered the notification window on entry.
            pool.queue_all()
            await anyio_to_thread.run_sync(backend.arm, hdass)
            await anyio_to_thread.run_sync(backend.start, hdass)
            yield Recording(stream=rx, summary=summary, rate_hz=rate_hz)
    finally:
        # Bridge shutdown is shielded inside callback_bridge.  Pool free
        # runs here, AFTER drain-wait completed in the bridge's __aexit__.
        with suppress(Exception):
            pool.flush()
        with suppress(Exception):
            pool.free_all()

record_polled async

record_polled(
    source,
    *,
    rate_hz,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Software-timed polling at rate_hz.

Parameters:

Name Type Description Default
source DtolSession | DtolManager

A DtolSession (yields a DaqReading per tick) or a DtolManager (yields a Mapping[str, DeviceResult[DaqReading]] per tick).

required
rate_hz float

Target poll rate, in Hz. Must be > 0.

required
error_policy ErrorPolicy

RAISE cancels the recorder on a poll error; RETURN emits a payload with .error set; SKIP drops the failed payload silently (counter only).

RAISE
overflow OverflowPolicy

Consumer back-pressure policy. Default BLOCK — software-timed pollers can pause without SDK overrun.

BLOCK
buffer_size int

AnyIO send-stream capacity in payload slots.

64

Yields:

Type Description
AsyncGenerator[Recording[_PolledItem]]

A Recording[T] whose stream is the async iterator of payloads and whose summary is the mutable AcquisitionSummary.

Raises:

Type Description
DtolTaskStateError

source is not in a pollable state.

ValueError

rate_hz <= 0 or buffer_size < 1.
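Software-timed polling derives its period from rate_hz (period = 1 / rate_hz). The sketch below shows one common way to pace such a loop with the standard library. It is not the library's producer: poll_at_rate is a hypothetical helper, and scheduling against absolute deadlines (rather than sleeping a fixed period after each read) is a design choice assumed here to avoid accumulating drift.

```python
import asyncio
import time


async def poll_at_rate(read, rate_hz, n_ticks):
    """Call `read()` n_ticks times, spaced 1/rate_hz apart on a fixed schedule."""
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    period = 1.0 / rate_hz
    start = time.monotonic()
    readings = []
    for tick in range(n_ticks):
        # Sleep until this tick's absolute deadline so the time spent inside
        # read() does not push later ticks back.
        delay = start + tick * period - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)
        readings.append(read())
    return readings, time.monotonic() - start


counter = iter(range(100))
readings, elapsed = asyncio.run(
    poll_at_rate(lambda: next(counter), rate_hz=200.0, n_ticks=5)
)
print(readings)  # [0, 1, 2, 3, 4]
```

Software timing is only as precise as the event loop's scheduling, which is why hardware-clocked acquisition goes through record() instead.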

Source code in src/dtollib/streaming/recorder.py
@asynccontextmanager
async def record_polled(
    source: DtolSession | DtolManager,
    *,
    rate_hz: float,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> AsyncGenerator[Recording[_PolledItem]]:
    """Software-timed polling at ``rate_hz``.

    Args:
        source: A :class:`DtolSession` (yields :class:`DaqReading` per tick)
            or a :class:`DtolManager` (yields
            ``Mapping[str, DeviceResult[DaqReading]]`` per tick).
        rate_hz: Target poll rate, in Hz. Must be > 0.
        error_policy: :attr:`RAISE` cancels the recorder on a poll error;
            :attr:`RETURN` emits a payload with ``.error`` set;
            :attr:`SKIP` drops the failed payload silently (counter only).
        overflow: Consumer back-pressure policy. Default ``BLOCK`` —
            software-timed pollers can pause without SDK overrun.
        buffer_size: AnyIO send-stream capacity in payload slots.

    Yields:
        :class:`Recording[T]` whose ``stream`` is the async iterator of
        payloads and ``summary`` is the mutable :class:`AcquisitionSummary`.

    Raises:
        DtolTaskStateError: ``source`` is not in a pollable state.
        ValueError: ``rate_hz <= 0`` or ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")

    from dtollib.manager import DtolManager  # noqa: PLC0415

    if isinstance(source, DtolManager) and not source.names:
        raise DtolTaskStateError(
            "record_polled() requires a DtolManager with at least one task",
            context=ErrorContext(operation="record_polled"),
        )

    summary = AcquisitionSummary(started_at=datetime.now(UTC))
    period = 1.0 / rate_hz
    tx, rx = anyio.create_memory_object_stream[_PolledItem](max_buffer_size=buffer_size)
    drop_rx = rx.clone()

    async with anyio.create_task_group() as tg, rx, drop_rx:
        if isinstance(source, DtolManager):
            tg.start_soon(
                _manager_producer,
                source,
                tx,
                drop_rx,
                summary,
                period,
                error_policy,
                overflow,
            )
        else:
            tg.start_soon(
                _session_producer,
                source,
                tx,
                drop_rx,
                summary,
                period,
                error_policy,
                overflow,
            )
        try:
            yield Recording(stream=rx, summary=summary, rate_hz=rate_hz)
        finally:
            await tx.aclose()
            tg.cancel_scope.cancel()
    summary.finished_at = datetime.now(UTC)

Block and reading payloads

DaqBlock, DaqReading, and DaqSample are documented under Task specs → models. block_to_long_rows reshapes a block into tidy per-sample rows for sinks.