Sessions

DtolSession is the central acquisition object: it owns one configured subsystem and exposes poll() / write() / read_events() / measure_frequency() / start() / stop() / capabilities(). Open one with open_device; use it as an async context manager so the subsystem is committed on entry and released on exit.

See the Async quickstart for the happy path and Safety for the write() gate model.

open_device

dtollib.factory

open_device factory — the canonical entry point for ad-hoc sessions.

For multi-task / multi-device coordination, use :class:DtolManager instead.

Design reference: docs/design.md §9.3.

open_device async

open_device(
    spec,
    *,
    backend=None,
    timeout=10.0,
    autostart=True,
    confirm_start=False,
)

Open a :class:DtolSession for spec.

The session is returned already :meth:prepared <DtolSession.prepare> and :meth:committed <DtolSession.commit>. When autostart is true (the default), it is also :meth:started <DtolSession.start>.

Parameters:

Name Type Description Default
spec TaskSpec

Task specification.

required
backend DtolBackend | None

Backend to use. None instantiates a fresh :class:~dtollib.backend.dataacq.DataAcqBackend (the real SDK path). Tests inject a :class:~dtollib.backend.fake.FakeDtolBackend.

None
timeout float

Default per-call timeout.

10.0
autostart bool

Whether to start the subsystem before returning. Single-value tasks honour this as a literal pre-start; continuous callers that need to register a notification callback before olDaConfig must pass autostart=False.

True
confirm_start bool

Safety gate (docs/design.md §18.1). Autostarting a task that drives requires_confirm output channels needs confirm_start=True; otherwise :class:~dtollib.errors.DtolConfirmationRequiredError is raised before the subsystem is started.

False

Returns:

Type Description
DtolSession

A configured :class:DtolSession. Use it as an async context manager so :meth:DtolSession.close runs in the cleanup path.

Raises:

Type Description
DtolConfirmationRequiredError

autostart would start an output task containing a requires_confirm channel without confirm_start=True.

Source code in src/dtollib/factory.py
async def open_device(
    spec: TaskSpec,
    *,
    backend: DtolBackend | None = None,
    timeout: float = 10.0,  # noqa: ASYNC109 - public API name, not an asyncio timeout.
    autostart: bool = True,
    confirm_start: bool = False,
) -> DtolSession:
    """Open a :class:`DtolSession` for ``spec``.

    The session is returned already :meth:`prepared <DtolSession.prepare>`
    and :meth:`committed <DtolSession.commit>`.  When ``autostart`` is
    true (the default), it is also :meth:`started <DtolSession.start>`.

    Args:
        spec: Task specification.
        backend: Backend to use.  ``None`` instantiates a fresh
            :class:`~dtollib.backend.dataacq.DataAcqBackend` (the real
            SDK path).  Tests inject a
            :class:`~dtollib.backend.fake.FakeDtolBackend`.
        timeout: Default per-call timeout.
        autostart: Whether to start the subsystem before returning.
            Single-value tasks honour this as a literal pre-start;
            continuous callers that need to register a notification
            callback before ``olDaConfig`` must pass ``autostart=False``.
        confirm_start: Safety gate (docs/design.md §18.1). Autostarting a
            task that drives ``requires_confirm`` output channels needs
            ``confirm_start=True``; otherwise
            :class:`~dtollib.errors.DtolConfirmationRequiredError` is raised
            before the subsystem is started.

    Returns:
        A configured :class:`DtolSession`.  Use it as an async context
        manager so :meth:`DtolSession.close` runs in the cleanup path.

    Raises:
        DtolConfirmationRequiredError: ``autostart`` would start an output
            task containing a ``requires_confirm`` channel without
            ``confirm_start=True``.
    """
    if autostart and not confirm_start:
        _reject_unconfirmed_output_autostart(spec)

    if backend is None:
        from dtollib.backend.dataacq import DataAcqBackend  # noqa: PLC0415

        backend = DataAcqBackend()

    session = DtolSession(spec, backend, timeout=timeout)
    try:
        await session.configure()
        if autostart:
            await session.start()
    except BaseException:
        # Release any HDASS/HDRVR acquired during prepare() before
        # propagating — leaving them open leaks the subsystem.
        with contextlib.suppress(Exception):
            await session.close()
        raise
    return session
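
The cleanup-on-failure shape used by open_device can be tried in isolation. The following is a minimal sketch, not the library's code: StubSession and open_stub are hypothetical stand-ins, and only the try / except BaseException / suppress / re-raise structure is taken from the listing above.

```python
import asyncio
import contextlib


class StubSession:
    """Hypothetical stand-in for a session; not part of dtollib."""

    def __init__(self, fail_on_start: bool) -> None:
        self.fail_on_start = fail_on_start
        self.closed = False

    async def configure(self) -> None:
        pass  # pretend driver and subsystem handles were reserved here

    async def start(self) -> None:
        if self.fail_on_start:
            raise RuntimeError("start failed")

    async def close(self) -> None:
        self.closed = True


async def open_stub(session: StubSession, *, autostart: bool = True) -> StubSession:
    try:
        await session.configure()
        if autostart:
            await session.start()
    except BaseException:
        # Release whatever configure() reserved, ignore secondary errors,
        # then re-raise the original failure unchanged.
        with contextlib.suppress(Exception):
            await session.close()
        raise
    return session


async def demo_open() -> tuple[bool, bool]:
    good = await open_stub(StubSession(fail_on_start=False))
    bad = StubSession(fail_on_start=True)
    try:
        await open_stub(bad)
    except RuntimeError:
        pass
    return good.closed, bad.closed
```

Catching BaseException rather than Exception means cancellation also triggers the release, while suppress(Exception) keeps a failing close from masking the error that caused it.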

DtolSession

dtollib.tasks.session

DtolSession — async lifecycle wrapper around one HDASS.

Lifecycle: prepare → commit → start → poll → stop/abort → close. Single-value reads go through :meth:poll; single-value writes through :meth:write. Continuous acquisition is normally owned by :func:dtollib.streaming.record, which manages the buffer pool and the notification bridge; :meth:read_block and :meth:read_inprocess are the polled alternatives on the session itself and are mutually exclusive with record() on one session.

Design reference: docs/design.md §9.1, §9.2; docs/implementation-plan.md §4.4.

DtolSession

DtolSession(spec, backend, *, timeout=10.0)

Async lifecycle wrapper around one configured DT-Open Layers subsystem.

Drives single-value reads (:meth:poll) and writes (:meth:write) end-to-end. Continuous acquisition uses the same session but is driven by :func:dtollib.streaming.record, which owns the buffer pool and notification bridge.

Attributes:

Name Type Description
spec

Bound :class:~dtollib.tasks.TaskSpec.

backend

Bound :class:~dtollib.backend.DtolBackend.

timeout

Default per-call timeout in seconds.

Source code in src/dtollib/tasks/session.py
def __init__(
    self,
    spec: TaskSpec,
    backend: DtolBackend,
    *,
    timeout: float = 10.0,
) -> None:
    self.spec = spec
    self.backend = backend
    self.timeout = timeout

    self._lock = anyio.Lock()
    self._hdrvr: int | None = None
    self._hdass: int | None = None
    self._capabilities: CapabilitySet | None = None
    self._prepared = False
    self._committed = False
    self._closed = False

    # DOUT shadow register — port_channel → last-written byte. A digital
    # port write is whole-byte at the SDK level; partial per-line writes
    # merge into this shadow so untouched lines are preserved. Seeded from
    # each DigitalOutputPort.safe_value at commit (docs/design.md §18.1).
    self._dout_shadow: dict[int, int] = {}

    # Lazily-primed buffer pool for the synchronous block-read path
    # (read_block / read_inprocess). Distinct from record(), which owns
    # its own pool + notification bridge; the two paths are mutually
    # exclusive on one session.
    self._read_pool: BufferPool | None = None
    self._read_samples_per_buffer: int | None = None
    self._read_conversion: BlockConversion | None = None
    self._read_block_index: int = 0
    self._read_first_sample: int = 0
    self._read_started_at: datetime | None = None
    self._read_started_mono_ns: int = 0
    self._read_notify_handle: object | None = None
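
The DOUT shadow register described in the comments above can be illustrated with plain integers. This is a sketch under assumptions: merge_line is a hypothetical helper, lines are assumed to be numbered 0 to 7 within a port, and the real session may organise the merge differently.

```python
def merge_line(shadow: dict[int, int], port: int, line: int, high: bool) -> int:
    """Return the whole-byte value to write after changing a single line."""
    if not 0 <= line <= 7:
        raise ValueError(f"line must be 0..7, got {line}")
    current = shadow.get(port, 0)
    mask = 1 << line
    # Set or clear one bit; every other bit keeps its last-written state.
    updated = (current | mask) if high else (current & ~mask)
    shadow[port] = updated & 0xFF
    return shadow[port]


shadow = {0: 0b0000_0101}  # seeded value, e.g. from a port's safe value
first = merge_line(shadow, port=0, line=1, high=True)
second = merge_line(shadow, port=0, line=0, high=False)
```

Because the SDK write is whole-byte, the value returned by the merge is what would be sent to the port, and the shadow is the only record of the untouched lines.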

board_name property

board_name

Resolved board name for coordination layers.

capabilities property

capabilities

Live capability snapshot. Available after :meth:prepare.

closed property

closed

Whether :meth:close has completed for this session.

hdass property

hdass

Reserved HDASS — set after :meth:prepare.

queued_buffer_dones property

queued_buffer_dones

olDaGetQueueSize(OL_QUE_DONE) — done-queue depth (synchronous).

raw_hdass property

raw_hdass

Escape hatch — raw HDASS for direct olDa* calls.

raw_hdrv property

raw_hdrv

Escape hatch — raw HDRVR for direct olDa* calls.

state property

state

Reported SDK :class:SubsystemState.

Cheap read; called inside the lock-free hot path of :meth:is_running and from error messages.

__aenter__ async

__aenter__()

Configure on entry. Caller still calls :meth:poll explicitly.

Source code in src/dtollib/tasks/session.py
async def __aenter__(self) -> Self:
    """Configure on entry.  Caller still calls :meth:`poll` explicitly."""
    await self.configure()
    return self

__aexit__ async

__aexit__(exc_type, exc, tb)

Best-effort close. Uses abort to avoid deadlocking on a hung trigger.

Source code in src/dtollib/tasks/session.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Best-effort close.  Uses ``abort`` to avoid deadlocking on a hung trigger."""
    del exc_type, exc, tb
    await self.close(graceful=False)
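
The entry and exit behaviour above can be seen with a stand-in class. This is a minimal sketch: StubSession is hypothetical and records calls instead of touching hardware; only the __aenter__ / __aexit__ shape follows the listings.

```python
import asyncio


class StubSession:
    """Hypothetical stand-in that logs lifecycle calls."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def configure(self) -> None:
        self.events.append("configure")

    async def close(self, *, graceful: bool = False) -> None:
        self.events.append(f"close(graceful={graceful})")

    async def __aenter__(self) -> "StubSession":
        await self.configure()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Returning None lets any in-flight exception keep propagating.
        await self.close(graceful=False)


async def demo_context() -> list[str]:
    session = StubSession()
    try:
        async with session:
            session.events.append("body")
            raise RuntimeError("boom")
    except RuntimeError:
        session.events.append("propagated")
    return session.events
```

The close runs before the exception leaves the async with block, which is why the abort-based teardown is chosen there: it cannot wait on a trigger that will never fire.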

abort async

abort()

olDaAbort — immediate halt.

Source code in src/dtollib/tasks/session.py
async def abort(self) -> None:
    """``olDaAbort`` — immediate halt."""
    async with self._lock:
        if self._hdass is None:
            return
        await anyio.to_thread.run_sync(self.backend.abort, self._hdass)

close async

close(*, graceful=False)

Tear the session down.

Default graceful=False uses :meth:abort; graceful=True uses :meth:stop. Releases HDASS, decrements HDRVR ref-count. Idempotent.

Parameters:

Name Type Description Default
graceful bool

When True, prefer :meth:stop (waits for the current buffer). False (default) uses :meth:abort — the safe choice from __aexit__ when an outer exception is propagating and stop could deadlock waiting for an SDK trigger that will never fire.

False

Source code in src/dtollib/tasks/session.py
async def close(self, *, graceful: bool = False) -> None:
    """Tear the session down.

    Default ``graceful=False`` uses :meth:`abort`; ``graceful=True``
    uses :meth:`stop`.  Releases HDASS, decrements HDRVR ref-count.
    Idempotent.

    Args:
        graceful: When ``True``, prefer :meth:`stop` (waits for the
            current buffer).  ``False`` (default) uses :meth:`abort`
            — the safe choice from ``__aexit__`` when an outer
            exception is propagating and ``stop`` could deadlock
            waiting for an SDK trigger that will never fire.
    """
    if self._closed:
        return
    # Shield teardown from cancellation. A cancelled or timed-out
    # session (e.g. ``move_on_after`` wrapping ``record()``) must still
    # release the HDASS and terminate the HDRVR; otherwise the awaited
    # release_dass/terminate are cancelled mid-flight and the subsystem
    # stays reserved ("Subsystem in use", ECODE 20) until the OS reclaims
    # the process handle. Bench-confirmed on DT9806, 2026-05-28.
    with anyio.CancelScope(shield=True):
        # Tear down the synchronous read pool (read_block / read_inprocess)
        # before releasing the subsystem: stop the subsystem, unregister the
        # no-op notification, then flush + free its HBUFs.
        if self._read_pool is not None:
            if self._hdass is not None:
                with contextlib.suppress(Exception):
                    await anyio.to_thread.run_sync(self.backend.abort, self._hdass)
                if self._read_notify_handle is not None:
                    with contextlib.suppress(Exception):
                        await anyio.to_thread.run_sync(
                            self.backend.unregister_notification,
                            self._hdass,
                            self._read_notify_handle,
                        )
            self._read_notify_handle = None
            with contextlib.suppress(Exception):
                await anyio.to_thread.run_sync(self._read_pool.flush)
            with contextlib.suppress(Exception):
                await anyio.to_thread.run_sync(self._read_pool.free_all)
            self._read_pool = None
        try:
            if self._hdass is not None:
                if self.backend.is_running(self._hdass):
                    if graceful:
                        await self.stop()
                    else:
                        await self.abort()
                await anyio.to_thread.run_sync(self.backend.release_dass, self._hdass)
                self._hdass = None
            if self._hdrvr is not None:
                await anyio.to_thread.run_sync(self.backend.terminate, self._hdrvr)
                self._hdrvr = None
        finally:
            self._closed = True
            self._prepared = False
            self._committed = False

commit async

commit()

Run the configured-state commit.

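Dispatches on the subsystem type first, then on spec.data_flow:

  • Counter/timer, quadrature, and tachometer subsystems → builder runs the counter configuration; values are read on demand after start, with no channel/gain list or sample clock.
  • SINGLE_VALUE → builder runs the per-channel configure + commit, then the session seeds its DOUT shadow register.
  • CONTINUOUS / FINITE → builder runs the pre-commit continuous configuration; the recorder (record()) drives the register → queue → commit() ordering after wiring its bridge.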

Idempotent.

Source code in src/dtollib/tasks/session.py
async def commit(self) -> None:
    """Run the configured-state commit.

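    Dispatches on the subsystem type first, then on ``spec.data_flow``:

    - Counter/timer, quadrature, and tachometer subsystems → builder
      runs the counter configuration (read on demand after start).
    - ``SINGLE_VALUE`` → builder runs the per-channel configure + commit.
    - ``CONTINUOUS`` / ``FINITE`` → builder runs the pre-commit
      continuous configuration; the recorder (``record()``) drives the
      register → queue → ``commit()`` ordering after wiring its bridge.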

    Idempotent.
    """
    if not self._prepared:
        raise DtolTaskStateError(
            "DtolSession.commit called before prepare()",
            context=ErrorContext(
                operation="DtolSession.commit",
                task_name=self.spec.name,
            ),
        )
    async with self._lock:
        if self._committed:
            return
        builder = TaskBuilder(self.backend)
        hdass = self._require_hdass("DtolSession.commit")
        capabilities = self.capabilities
        subsys_type = self.spec.subsystem_type or self.spec.infer_subsystem_type()
        if subsys_type in _COUNTER_SUBSYSTEMS:
            # Counter/timer / quadrature / tachometer — read on demand
            # after start; no channel/gain list or sample clock.
            await anyio.to_thread.run_sync(
                builder.configure_counter,
                hdass,
                self.spec,
                capabilities,
            )
        elif self.spec.data_flow == DataFlow.SINGLE_VALUE:
            await anyio.to_thread.run_sync(
                builder.configure_single_value,
                hdass,
                self.spec,
                capabilities,
            )
            self._seed_dout_shadow()
        else:
            # Pre-commit configure for continuous/finite — the recorder
            # finishes the §12.3.2 register → queue → commit ordering.
            await anyio.to_thread.run_sync(
                builder.configure_continuous,
                hdass,
                self.spec,
                capabilities,
            )
        self._committed = True

configure async

configure()

Convenience: :meth:prepare followed by :meth:commit.

Source code in src/dtollib/tasks/session.py
async def configure(self) -> None:
    """Convenience: :meth:`prepare` followed by :meth:`commit`."""
    await self.prepare()
    await self.commit()

is_running

is_running()

Cheap running-state probe.

Source code in src/dtollib/tasks/session.py
def is_running(self) -> bool:
    """Cheap running-state probe."""
    if self._hdass is None:
        return False
    return self.backend.is_running(self._hdass)

measure_frequency async

measure_frequency(*, timeout=None)

Measure input frequency (Hz) across the task's channels.

Valid for counter-frequency + tachometer tasks. Drives olDaMeasureFrequency per channel.

Returns:

Type Description
DaqReading

One :class:DaqReading; values maps each channel display name to its measured frequency in hertz.

Source code in src/dtollib/tasks/session.py
async def measure_frequency(self, *, timeout: float | None = None) -> DaqReading:  # noqa: ASYNC109
    """Measure input frequency (Hz) across the task's channels.

    Valid for counter-frequency + tachometer tasks.  Drives
    ``olDaMeasureFrequency`` per channel.

    Returns:
        One :class:`DaqReading`; ``values`` maps each channel display
        name to its measured frequency in hertz.
    """
    return await self._counter_reading(
        self.backend.measure_frequency,
        unit="Hz",
        operation="DtolSession.measure_frequency",
    )

poll async

poll(*, timeout=None)

One-shot scalar read across every channel of the task.

Behaviour (docs/implementation-plan.md §3.7):

  1. Captures requested_at + monotonic ns.
  2. Branches on OLSSC_SUP_SIMULTANEOUS_SH: one olDaGetSingleValues/Floats call across all channels, or per-channel loop.
  3. Branches on OLSSC_RETURNS_FLOATS: skip code-to-volts if true.
  4. For TC channels, detects sentinel floats and populates sensor_status + NaN-fills values.
  5. Computes t_utc as the midpoint of requested_at / received_at.
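
Step 5 above is simple arithmetic on the two wall-clock captures. The sketch below reproduces it with the standard library only; the helper name midpoint is an assumption made for illustration, and timezone.utc is used in place of the UTC alias seen in the source.

```python
from datetime import datetime, timedelta, timezone


def midpoint(requested_at: datetime, received_at: datetime) -> tuple[datetime, float]:
    """Return (t_utc, latency_s) for one request/receipt pair."""
    # timestamp() is in seconds, so the average is also in seconds.
    mid_s = (requested_at.timestamp() + received_at.timestamp()) / 2.0
    t_utc = datetime.fromtimestamp(mid_s, tz=timezone.utc)
    latency_s = (received_at - requested_at).total_seconds()
    return t_utc, latency_s


requested = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
received = requested + timedelta(milliseconds=4)
t_utc, latency_s = midpoint(requested, received)
```

Taking the midpoint assumes the sample was captured roughly halfway through the call, which bounds the timestamp error by half the measured latency.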

Parameters:

Name Type Description Default
timeout float | None

Per-call timeout in seconds. Currently unused: poll() discards the argument, so single-value reads honour only the session-level :attr:timeout.

None

Returns:

Type Description
DaqReading

One :class:DaqReading.

Raises:

Type Description
DtolTaskStateError

Task is not in a state that admits poll (continuous mid-run, mid-stop, mid-abort, ...).

Source code in src/dtollib/tasks/session.py
async def poll(self, *, timeout: float | None = None) -> DaqReading:  # noqa: ASYNC109
    """One-shot scalar read across every channel of the task.

    Behaviour (docs/implementation-plan.md §3.7):

    1. Captures ``requested_at`` + monotonic ns.
    2. Branches on ``OLSSC_SUP_SIMULTANEOUS_SH``: one
       ``olDaGetSingleValues``/``Floats`` call across all
       channels, or per-channel loop.
    3. Branches on ``OLSSC_RETURNS_FLOATS``: skip
       code-to-volts if true.
    4. For TC channels, detects sentinel floats and populates
       ``sensor_status`` + NaN-fills ``values``.
    5. Computes ``t_utc`` as the midpoint of
       ``requested_at`` / ``received_at``.

    Args:
        timeout: Per-call timeout in seconds.  Currently unused:
            single-value reads honour only the session-level
            :attr:`timeout`.

    Returns:
        One :class:`DaqReading`.

    Raises:
        DtolTaskStateError: Task is not in a state that admits
            ``poll`` (continuous mid-run, mid-stop, mid-abort, ...).
    """
    await self.configure()
    # Note on the ``backend.start`` call further down: ``start`` is
    # conventional even though single-value SDK reads may not require
    # it on every device; calling ``olDaStart`` puts the subsystem
    # into RUNNING and standardises the state-machine assertions in
    # tests.  First, refuse to read while the subsystem is shutting down.
    if self.state in {SubsystemState.STOPPING, SubsystemState.ABORTING}:
        raise DtolTaskStateError(
            f"DtolSession.poll: invalid state {self.state.value}; task is mid-shutdown",
            context=ErrorContext(
                operation="DtolSession.poll",
                task_name=self.spec.name,
            ),
        )

    # poll() is the single-value read path.  Continuous/finite tasks
    # stream through record() — see docs/design.md §9.2.
    if self.spec.data_flow != DataFlow.SINGLE_VALUE:
        raise DtolTaskStateError(
            f"DtolSession.poll: data_flow={self.spec.data_flow.value} is not "
            "valid for single-value reads; use `record(session)` instead.",
            context=ErrorContext(
                operation="DtolSession.poll",
                task_name=self.spec.name,
            ),
        )

    del timeout  # Single-value reads honour the session-level default only.

    async with self._lock:
        hdass = self._require_hdass("DtolSession.poll")
        requested_at = datetime.now(UTC)
        t_mono_request = time.monotonic_ns()

        await anyio.to_thread.run_sync(self.backend.start, hdass)
        values, sentinels = await self._read_all_channels()
        received_at = datetime.now(UTC)
        t_mono_received = time.monotonic_ns()

    # Derive midpoint between request and receipt (wall-clock, in seconds).
    midpoint_s = (requested_at.timestamp() + received_at.timestamp()) / 2.0
    t_utc = datetime.fromtimestamp(midpoint_s, tz=UTC)
    midpoint_mono_ns = (t_mono_request + t_mono_received) // 2
    latency_s = (received_at - requested_at).total_seconds()

    units: dict[str, str | None] = {
        ch.display_name: _channel_unit(ch) for ch in self.spec.channels
    }

    return DaqReading(
        device=self.spec.name,
        task=self.spec.name,
        values=values,
        units=units,
        requested_at=requested_at,
        received_at=received_at,
        t_utc=t_utc,
        t_mono_ns=t_mono_request,
        t_midpoint_mono_ns=midpoint_mono_ns,
        latency_s=latency_s,
        sensor_status=sentinels,
        metadata=dict(self.spec.metadata),
    )

prepare async

prepare()

Allocate HDASS, validate against capabilities, configure channels.

Idempotent. Stops short of olDaConfig so the continuous path can insert notification + Ready-queue setup between :meth:prepare and :meth:commit. For single-value mode there is nothing to interleave; :meth:configure is the convenience that runs prepare then commit.

Source code in src/dtollib/tasks/session.py
async def prepare(self) -> None:
    """Allocate HDASS, validate against capabilities, configure channels.

    Idempotent.  Stops short of ``olDaConfig`` so the continuous
    path can insert notification + Ready-queue setup between
    :meth:`prepare` and :meth:`commit`.  For single-value mode there
    is nothing to interleave; :meth:`configure` is the convenience
    that runs ``prepare`` then ``commit``.
    """
    async with self._lock:
        if self._prepared:  # double-check under lock.
            return

        board_name = self._resolve_board_name()
        subsys_type = self.spec.subsystem_type or self.spec.infer_subsystem_type()
        olss = _to_olss(subsys_type)

        hdrvr = await anyio.to_thread.run_sync(self.backend.initialize, board_name)
        self._hdrvr = hdrvr
        try:
            hdass = await anyio.to_thread.run_sync(
                self.backend.get_dass, hdrvr, olss, self.spec.element
            )
            self._hdass = hdass
        except BaseException:
            # Cleanup the HDRVR ref-count if subsystem reservation fails.
            await anyio.to_thread.run_sync(self.backend.terminate, hdrvr)
            self._hdrvr = None
            raise

        capabilities = await anyio.to_thread.run_sync(self.backend.query_capabilities, hdass)
        self._capabilities = capabilities
        self._validate_against_capabilities(capabilities)

        self._prepared = True
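
The idempotent check-under-lock pattern in prepare() can be exercised on its own. This is a sketch with assumptions: Preparer is a hypothetical class, asyncio.Lock is swapped in for anyio.Lock so the example needs only the standard library, and asyncio.sleep(0) stands in for the blocking SDK calls.

```python
import asyncio


class Preparer:
    """Hypothetical stand-in; only the locking pattern mirrors the listing."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._prepared = False
        self.allocations = 0

    async def prepare(self) -> None:
        async with self._lock:
            if self._prepared:
                return  # a repeated or concurrent call already did the work
            await asyncio.sleep(0)  # yield, as a real allocation would
            self.allocations += 1
            self._prepared = True


async def demo_prepare() -> int:
    preparer = Preparer()
    # Three concurrent callers; the lock serialises them and the flag
    # turns the second and third into no-ops.
    await asyncio.gather(preparer.prepare(), preparer.prepare(), preparer.prepare())
    return preparer.allocations
```

Checking the flag inside the lock is what makes concurrent callers safe: a check outside it could let two tasks both see an unprepared session and allocate twice.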

read_block async

read_block(samples_per_channel, *, timeout=None)

Read one buffer's worth of hardware-clocked data, synchronously.

The polled alternative to :func:dtollib.streaming.record for CONTINUOUS / FINITE tasks. On first call it primes a buffer pool (allocate → queue → olDaStart); each call then waits for the next completed buffer on the SDK Done queue and returns it as a :class:DaqBlock. No notification bridge is involved — this is a direct olDaGetBuffer poll.

:func:record / :func:~dtollib.streaming.record_polled remain the recommended path; read_block suits simple scripts and tests that want one buffer at a time without an async for consumer.
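
The polled wait described above amounts to a deadline loop around a non-blocking queue check. The following is a rough sketch under assumptions: wait_for_done is a hypothetical helper, a deque stands in for the SDK Done queue, and asyncio replaces anyio to keep the example self-contained.

```python
import asyncio
from collections import deque


async def wait_for_done(done: deque, timeout_s: float, poll_s: float = 0.001) -> bytes:
    """Poll a queue until an item arrives or the deadline passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_s
    while True:
        if done:
            return done.popleft()
        if loop.time() >= deadline:
            raise TimeoutError(f"no buffer completed within {timeout_s:.3f}s")
        await asyncio.sleep(poll_s)  # let other tasks run between checks


async def demo_wait() -> tuple[bytes, bool]:
    done: deque = deque()
    loop = asyncio.get_running_loop()
    # Simulate a buffer completing 10 ms from now.
    loop.call_later(0.01, done.append, b"\x01\x02")
    first = await wait_for_done(done, timeout_s=1.0)
    try:
        await wait_for_done(done, timeout_s=0.02)
        timed_out = False
    except TimeoutError:
        timed_out = True
    return first, timed_out
```

The queue is checked before the deadline on every pass, so a buffer that lands right at the timeout is still returned rather than dropped.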

Parameters:

Name Type Description Default
samples_per_channel int

Buffer depth in samples per channel. Fixed for the life of the pool — later calls must pass the same value.

required
timeout float | None

Seconds to wait for a completed buffer. None uses the session default (:attr:timeout).

None

Returns:

Type Description
DaqBlock

One :class:DaqBlock. block.samples_per_channel is the actual count (<= samples_per_channel for a partial final buffer).

Raises:

Type Description
DtolValidationError

samples_per_channel < 1.

DtolTaskStateError

data_flow is not continuous/finite, or buffers is unconfigured, or the pool was primed at a different depth.

DtolTimeoutError

No buffer completed within timeout.

Source code in src/dtollib/tasks/session.py
async def read_block(
    self,
    samples_per_channel: int,
    *,
    timeout: float | None = None,  # noqa: ASYNC109
) -> DaqBlock:
    """Read one buffer's worth of hardware-clocked data, synchronously.

    The polled alternative to :func:`dtollib.streaming.record` for
    ``CONTINUOUS`` / ``FINITE`` tasks. On first call it primes a buffer
    pool (allocate → queue → ``olDaStart``); each call then waits for the
    next completed buffer on the SDK Done queue and returns it as a
    :class:`DaqBlock`. No notification bridge is involved — this is a
    direct ``olDaGetBuffer`` poll.

    :func:`record` / :func:`~dtollib.streaming.record_polled` remain the
    recommended path; ``read_block`` suits simple scripts and tests that
    want one buffer at a time without an ``async for`` consumer.

    Args:
        samples_per_channel: Buffer depth in samples per channel. Fixed
            for the life of the pool — later calls must pass the same value.
        timeout: Seconds to wait for a completed buffer. ``None`` uses the
            session default (:attr:`timeout`).

    Returns:
        One :class:`DaqBlock`. ``block.samples_per_channel`` is the actual
        count (``<= samples_per_channel`` for a partial final buffer).

    Raises:
        DtolValidationError: ``samples_per_channel < 1``.
        DtolTaskStateError: ``data_flow`` is not continuous/finite, or
            ``buffers`` is unconfigured, or the pool was primed at a
            different depth.
        DtolTimeoutError: No buffer completed within ``timeout``.
    """
    if samples_per_channel < 1:
        raise DtolValidationError(
            f"read_block: samples_per_channel must be >= 1, got {samples_per_channel}",
            context=ErrorContext(operation="DtolSession.read_block", task_name=self.spec.name),
        )
    await self._ensure_read_primed(samples_per_channel)
    pool = self._read_pool
    if pool is None:  # pragma: no cover - _ensure_read_primed sets it
        raise DtolTaskStateError(
            "read_block: buffer pool not primed",
            context=ErrorContext(operation="DtolSession.read_block", task_name=self.spec.name),
        )
    n_channels = max(pool.n_channels, 1)
    wait_s = self.timeout if timeout is None else timeout
    deadline = anyio.current_time() + wait_s

    sample_count = 0
    while True:
        raw = await anyio.to_thread.run_sync(pool.get_done)
        if raw is not None:
            sample_count = min(int(raw.valid_samples) // n_channels, samples_per_channel)
            if sample_count > 0:
                break
            # Empty buffer — recycle it and keep waiting.
            await anyio.to_thread.run_sync(pool.requeue, raw)
        if anyio.current_time() >= deadline:
            raise DtolTimeoutError(
                f"read_block: no buffer completed within {wait_s:.3f}s",
                context=ErrorContext(
                    operation="DtolSession.read_block", task_name=self.spec.name
                ),
            )
        await anyio.sleep(_READ_POLL_INTERVAL_S)

    import numpy as np  # noqa: PLC0415

    read_started = datetime.now(UTC)
    mono_ns = time.monotonic_ns()
    view = await anyio.to_thread.run_sync(pool.payload_view, raw)
    flat = np.asarray(view)[: sample_count * n_channels].copy()
    data_int = flat.reshape(sample_count, n_channels).T
    block = self._make_read_block(
        data_int, sample_count, mono_ns=mono_ns, read_started=read_started
    )
    await anyio.to_thread.run_sync(pool.requeue, raw)
    return block
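
The reshape-and-transpose step near the end of read_block turns scan-interleaved samples into one row per channel. A pure-Python sketch of the same rearrangement follows; deinterleave is a hypothetical name, and list slicing is used here instead of NumPy so the example has no dependencies.

```python
def deinterleave(flat: list[int], n_channels: int) -> list[list[int]]:
    """Split scan-interleaved samples into one list per channel."""
    if n_channels < 1:
        raise ValueError("n_channels must be >= 1")
    sample_count = len(flat) // n_channels  # whole scans only
    usable = flat[: sample_count * n_channels]
    # Channel c owns every n_channels-th sample, starting at offset c.
    return [usable[ch::n_channels] for ch in range(n_channels)]


# Two complete scans of three channels, plus one stray trailing sample.
rows = deinterleave([10, 20, 30, 11, 21, 31, 12], n_channels=3)
```

The result has shape (n_channels, sample_count), matching the orientation produced by reshape(sample_count, n_channels).T in the listing.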

read_events async

read_events(*, timeout=None)

Read the current counter value(s) across the task's channels.

Valid for counter/timer + quadrature tasks (event counting, edge-to-edge interval, accumulated quadrature position). Drives olDaReadEvents per channel and packages the counts into a :class:DaqReading so counter rows join sibling-library samples on (device, t_mono_ns).

Returns:

Type Description
DaqReading

One :class:DaqReading; values maps each channel display name to its integer count.

Source code in src/dtollib/tasks/session.py
async def read_events(self, *, timeout: float | None = None) -> DaqReading:  # noqa: ASYNC109
    """Read the current counter value(s) across the task's channels.

    Valid for counter/timer + quadrature tasks (event counting,
    edge-to-edge interval, accumulated quadrature position).  Drives
    ``olDaReadEvents`` per channel and packages the counts into a
    :class:`DaqReading` so counter rows join sibling-library samples on
    ``(device, t_mono_ns)``.

    Returns:
        One :class:`DaqReading`; ``values`` maps each channel display
        name to its integer count.
    """
    return await self._counter_reading(
        self.backend.read_events,
        unit="counts",
        operation="DtolSession.read_events",
    )

read_inprocess async

read_inprocess()

Drain the currently-filling buffer without waiting for completion.

The low-latency partial-buffer read for CONTINUOUS / FINITE tasks on subsystems that advertise OLSSC_SUP_INPROCESSFLUSH — useful on slow tasks (e.g. 200 Hz TC, 1 kHz strain) where waiting for a full buffer is unacceptable. Backed by olDmCopyFromBuffer on the in-process HBUF; primes the pool on first use like :meth:read_block.

The SDK transfers data in device-specific segment sizes, so the returned block.samples_per_channel is whatever was available — not necessarily a full buffer.

Returns:

Type Description
DaqBlock | None

A :class:DaqBlock for the samples available, or None when the in-process buffer holds zero valid samples.

Raises:

Type Description
DtolCapabilityError

The subsystem does not support in-process flush.

DtolTaskStateError

data_flow is not continuous/finite, or buffers is unconfigured.

Source code in src/dtollib/tasks/session.py
async def read_inprocess(self) -> DaqBlock | None:
    """Drain the currently-filling buffer without waiting for completion.

    The low-latency partial-buffer read for ``CONTINUOUS`` / ``FINITE``
    tasks on subsystems that advertise ``OLSSC_SUP_INPROCESSFLUSH`` —
    useful on slow tasks (e.g. 200 Hz TC, 1 kHz strain) where waiting for
    a full buffer is unacceptable. Backed by ``olDmCopyFromBuffer`` on the
    in-process HBUF; primes the pool on first use like :meth:`read_block`.

    The SDK transfers data in device-specific segment sizes, so the
    returned ``block.samples_per_channel`` is whatever was available — not
    necessarily a full buffer.

    Returns:
        A :class:`DaqBlock` for the samples available, or ``None`` when the
        in-process buffer holds zero valid samples.

    Raises:
        DtolCapabilityError: The subsystem does not support in-process flush.
        DtolTaskStateError: ``data_flow`` is not continuous/finite, or
            ``buffers`` is unconfigured.
    """
    if not self.capabilities.supports_inprocess_flush:
        raise DtolCapabilityError(
            "read_inprocess: subsystem does not advertise OLSSC_SUP_INPROCESSFLUSH; "
            "use read_block() or record() instead.",
            context=ErrorContext(
                operation="DtolSession.read_inprocess", task_name=self.spec.name
            ),
        )
    if self.spec.buffers is None:
        raise DtolTaskStateError(
            "read_inprocess: requires TaskSpec.buffers to be configured",
            context=ErrorContext(
                operation="DtolSession.read_inprocess", task_name=self.spec.name
            ),
        )
    await self._ensure_read_primed(self.spec.buffers.samples_per_buffer)
    pool = self._read_pool
    if pool is None:  # pragma: no cover - _ensure_read_primed sets it
        return None
    n_channels = max(pool.n_channels, 1)
    # The buffer the SDK is currently filling is the FIFO head of the pool.
    candidates = [b for b in pool.buffers if b.state == BufferState.QUEUED]
    if not candidates:
        return None
    raw = candidates[0]
    request_samples = pool.plan.samples_per_buffer * n_channels

    import numpy as np  # noqa: PLC0415

    read_started = datetime.now(UTC)
    mono_ns = time.monotonic_ns()
    data_bytes = await anyio.to_thread.run_sync(
        self.backend.copy_inprocess_buffer,
        raw.hbuf,
        request_samples,
        pool.sample_dtype_bytes,
    )
    if not data_bytes:
        return None
    dtype = np.int16 if pool.sample_dtype_bytes == 2 else np.int32  # noqa: PLR2004
    codes = np.frombuffer(data_bytes, dtype=dtype)
    sample_count = len(codes) // n_channels
    if sample_count <= 0:
        return None
    data_int = codes[: sample_count * n_channels].reshape(sample_count, n_channels).T
    return self._make_read_block(
        data_int, sample_count, mono_ns=mono_ns, read_started=read_started
    )
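
The last few lines of read_inprocess turn a flat run of scan-interleaved codes into one row per channel and drop any trailing partial scan. A minimal standard-library sketch of that deinterleaving step follows. It assumes little-endian signed codes, whereas the real method uses NumPy with the platform's native byte order; `deinterleave` is a hypothetical helper and not part of dtollib.

```python
import struct


def deinterleave(data: bytes, n_channels: int, sample_bytes: int = 2) -> list[list[int]]:
    """Split scan-interleaved codes into one list per channel.

    Assumption: codes are little-endian signed integers, 2 or 4 bytes wide.
    """
    fmt_char = "h" if sample_bytes == 2 else "i"
    n_codes = len(data) // sample_bytes
    codes = struct.unpack(f"<{n_codes}{fmt_char}", data[: n_codes * sample_bytes])
    # Whole scans only: a trailing partial scan is discarded, mirroring
    # ``len(codes) // n_channels`` in read_inprocess.
    sample_count = n_codes // n_channels
    usable = codes[: sample_count * n_channels]
    # Channel ``ch`` owns every n_channels-th code starting at offset ``ch``.
    return [list(usable[ch::n_channels]) for ch in range(n_channels)]


# Three scans of two channels, plus one stray code that gets discarded.
raw = struct.pack("<7h", 10, 20, 11, 21, 12, 22, 99)
per_channel = deinterleave(raw, n_channels=2)
```

Here `per_channel[0]` holds the first channel's samples in acquisition order, which corresponds to a row of the transposed array that read_inprocess passes to `_make_read_block`.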

start async

start()

Calls olDaStart to move the subsystem to RUNNING, committing the session first if it has not been committed yet.

Source code in src/dtollib/tasks/session.py
async def start(self) -> None:
    """Call ``olDaStart`` to move the subsystem to RUNNING.

    Commits the session first if it has not been committed yet.
    """
    if not self._committed:
        await self.commit()
    async with self._lock:
        hdass = self._require_hdass("DtolSession.start")
        await anyio.to_thread.run_sync(self.backend.start, hdass)
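
The shape of start() (commit lazily, then run the blocking SDK call in a worker thread while holding the session lock) can be sketched with the standard library alone. This is only an illustration: `SketchSession` and `_blocking_start` are hypothetical stand-ins, and `asyncio` is used here in place of the `anyio` calls in the real source.

```python
import asyncio


class SketchSession:
    """Hypothetical stand-in for DtolSession; not the dtollib class."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._committed = False
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")
        self._committed = True

    def _blocking_start(self) -> None:
        # Stands in for the blocking backend call (olDaStart in the real SDK path).
        self.calls.append("start")

    async def start(self) -> None:
        if not self._committed:
            await self.commit()  # lazy commit, as in DtolSession.start
        async with self._lock:  # serialise access to the subsystem
            await asyncio.to_thread(self._blocking_start)


async def _demo() -> list[str]:
    session = SketchSession()
    await session.start()
    await session.start()  # already committed, so commit is skipped
    return session.calls


calls = asyncio.run(_demo())
```

Running the blocking call in a worker thread keeps the event loop responsive, and the lock prevents two coroutines from driving the same subsystem at once.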

stop async

stop()

Calls olDaStop for an orderly stop, blocking until the current buffer fills. Does nothing when no subsystem handle is open.

Source code in src/dtollib/tasks/session.py
async def stop(self) -> None:
    """Call ``olDaStop`` for an orderly stop; blocks until the current buffer fills.

    Does nothing when no subsystem handle is open.
    """
    async with self._lock:
        if self._hdass is None:
            return
        await anyio.to_thread.run_sync(self.backend.stop, self._hdass)

write async

write(values, *, confirm=False)

Single-value write to AO / DO channels with the §18 safety gate.

Validation is atomic and pre-SDK (docs/design.md §18): every value is checked against its channel before any write reaches the backend, so a single bad value leaves the device untouched. The wrapper never silently clamps.

Gate model (decided 2026-05-28; confirm-gate per design §18.1):

  • Unknown channel name → DtolValidationError.
  • Value outside the device [min_val, max_val] → always DtolValidationError (electrically impossible; confirm does not override).
  • Value outside [safe_min, safe_max] (when set), or a channel with requires_confirm=True, without confirm=True → DtolConfirmationRequiredError.
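
The gate model above can be sketched as a pure validation pass that runs before anything touches the device. This is a rough illustration under stated assumptions: `ChannelLimits`, `check_writes`, and the two exception classes are hypothetical stand-ins (the real checks live in `DtolSession._plan_write`, whose internals are not shown here), and the order in which errors surface across several channels is a guess.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ChannelLimits:
    """Hypothetical per-channel limits; field names follow the gate model text."""

    min_val: float
    max_val: float
    safe_min: Optional[float] = None
    safe_max: Optional[float] = None
    requires_confirm: bool = False


class ValidationError(Exception):
    """Stand-in for DtolValidationError."""


class ConfirmationRequired(Exception):
    """Stand-in for DtolConfirmationRequiredError."""


def check_writes(channels: Mapping[str, ChannelLimits],
                 values: Mapping[str, float], *, confirm: bool = False):
    """Validate every value; return the write plan only if all checks pass."""
    plan = []
    for name, value in values.items():
        limits = channels.get(name)
        if limits is None:
            raise ValidationError(f"unknown channel {name!r}")
        # Device range is a hard limit: confirm never overrides it.
        if not limits.min_val <= value <= limits.max_val:
            raise ValidationError(f"{name}: {value} outside device range")
        below = limits.safe_min is not None and value < limits.safe_min
        above = limits.safe_max is not None and value > limits.safe_max
        if (below or above or limits.requires_confirm) and not confirm:
            raise ConfirmationRequired(f"{name}: {value} needs confirm=True")
        plan.append((name, value))
    return plan  # nothing has been written yet; the caller applies the plan


channels = {
    "ao0": ChannelLimits(-10.0, 10.0, safe_min=-5.0, safe_max=5.0),
    "valve": ChannelLimits(0.0, 1.0, requires_confirm=True),
}
plan = check_writes(channels, {"ao0": 2.5})
```

Because the function raises before returning a plan, one bad value means no write is attempted, which is the atomicity property described above. Values are never clamped into range.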

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| values | Mapping[str, float \| bool] | Channel display_name → value. Floats for AO, bools for DO. | required |
| confirm | bool | Operator confirmation for safety-gated writes. | False |

Raises:

| Type | Description |
| --- | --- |
| DtolTaskStateError | data_flow is not SINGLE_VALUE (use dtollib.streaming.play for continuous AO), or the task is mid-shutdown. |
| DtolValidationError | Unknown channel or out-of-device-range value. |
| DtolConfirmationRequiredError | Safety gate tripped without confirm. |

Source code in src/dtollib/tasks/session.py
async def write(self, values: Mapping[str, float | bool], *, confirm: bool = False) -> None:
    """Single-value write to AO / DO channels with the §18 safety gate.

    Validation is **atomic and pre-SDK** (docs/design.md §18): every
    value is checked against its channel before *any* write reaches the
    backend, so a single bad value leaves the device untouched. The
    wrapper never silently clamps.

    Gate model (decided 2026-05-28; confirm-gate per design §18.1):

    - Unknown channel name → :class:`DtolValidationError`.
    - Value outside the device ``[min_val, max_val]`` → always
      :class:`DtolValidationError` (electrically impossible; ``confirm``
      does not override).
    - Value outside ``[safe_min, safe_max]`` (when set), **or** a channel
      with ``requires_confirm=True``, without ``confirm=True`` →
      :class:`DtolConfirmationRequiredError`.

    Args:
        values: Channel ``display_name`` → value. Floats for AO, bools
            for DO.
        confirm: Operator confirmation for safety-gated writes.

    Raises:
        DtolTaskStateError: ``data_flow`` is not ``SINGLE_VALUE`` (use
            :func:`~dtollib.streaming.play` for continuous AO), or the
            task is mid-shutdown.
        DtolValidationError: Unknown channel or out-of-device-range value.
        DtolConfirmationRequiredError: Safety gate tripped without confirm.
    """
    if self.spec.data_flow != DataFlow.SINGLE_VALUE:
        raise DtolTaskStateError(
            f"DtolSession.write: data_flow={self.spec.data_flow.value} is not "
            "valid for single-value writes; use play() for continuous AO.",
            context=ErrorContext(operation="DtolSession.write", task_name=self.spec.name),
        )

    await self.configure()

    # Resolve + validate EVERYTHING before any SDK call (atomic; never
    # clamp). Analog writes are one code per channel; digital writes are
    # accumulated per port into a single byte (whole-port writes plus
    # per-line bit merges over the shadow register).
    ao_writes, port_writes = self._plan_write(values, confirm=confirm)

    async with self._lock:
        hdass = self._require_hdass("DtolSession.write")
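        caps = self.capabilities
        # Fast path: when the write covers every channel in the task with
        # analog values only and the subsystem supports simultaneous D/A,
        # update all outputs in one backend call.
        if (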
            ao_writes
            and not port_writes
            and len(ao_writes) == len(self.spec.channels)
            and caps.supports_simultaneous_da
        ):
            codes = [code for _ch, code in ao_writes]
            await anyio.to_thread.run_sync(self.backend.put_single_values, hdass, codes, 1.0)
        else:
            for physical_channel, code in ao_writes:
                await anyio.to_thread.run_sync(
                    self.backend.put_single_value, hdass, physical_channel, code, 1.0
                )
            for port_channel, byte in port_writes.items():
                await anyio.to_thread.run_sync(
                    self.backend.put_single_value, hdass, port_channel, byte, 1.0
                )
                self._dout_shadow[port_channel] = byte
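
The per-line bit merge over the shadow register, mentioned in the comment inside write(), can be illustrated with a small helper. This is a sketch under the assumption of an 8-bit port; `merge_line` is a hypothetical name, and the actual merge happens inside `_plan_write`, which is not shown here.

```python
def merge_line(shadow: int, line: int, value: bool) -> int:
    """Return the port byte with bit ``line`` set or cleared, other bits kept.

    Assumption: the port is 8 bits wide, so ``line`` is in 0..7.
    """
    if not 0 <= line <= 7:
        raise ValueError("line must be in 0..7 for an 8-bit port")
    mask = 1 << line
    if value:
        return shadow | mask
    return shadow & ~mask & 0xFF  # keep the result within one byte


# Start from the last byte written to the port (the shadow register).
shadow = 0b0000_0101
shadow = merge_line(shadow, 1, True)   # set line 1   -> 0b0000_0111
shadow = merge_line(shadow, 0, False)  # clear line 0 -> 0b0000_0110
```

Keeping a shadow copy lets a single-line write preserve the other lines on the port without reading the port back, which is why write() stores the byte in `_dout_shadow` after each successful port write.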