Skip to content

dtollib.backend

Session orchestration above the capi layer — HDRVR refcount, capability cache, and the notification bridge. The real backend never touches ctypes directly; that is the capi layer's job.

dataacq

dtollib.backend.dataacq

Real DataAcq SDK backend — :class:DataAcqBackend.

This class is Layer 3 of the C-boundary stack (docs/design.md §10.3):

  • Layer 1: :mod:dtollib.capi.prototypes — raw ctypes signatures.
  • Layer 2: :class:~dtollib.capi.OpenLayersApi — output-pointer extraction + ECODE classification.
  • Layer 3 (this module) — session-level orchestration: HDRVR ref-counting, capability cache, notification-wrapper pinning, buffer pool.

The class is constructed once per process and shared across every :class:~dtollib.tasks.DtolSession. See :class:~dtollib.manager.DtolManager for the per-session sharing discipline.

DataAcqBackend

DataAcqBackend(dlls=None)

Real DataAcq SDK backend.

Construction either accepts a pre-loaded :class:~dtollib.capi.OpenLayersDlls (dependency injection — used by tests) or calls :func:~dtollib.capi.load_openlayers itself.

Thread safety: a single :class:threading.RLock guards every SDK call. The lock is conservative — see docs/design.md §16.3.

Bind the SDK and prepare the session caches.

Parameters:

Name Type Description Default
dlls OpenLayersDlls | None

Pre-loaded SDK handle pair. If None, :func:~dtollib.capi.load_openlayers is called to resolve the default install paths.

None
Source code in src/dtollib/backend/dataacq.py
def __init__(self, dlls: OpenLayersDlls | None = None) -> None:
    """Bind the SDK and prepare the session caches.

    Args:
        dlls: Pre-loaded SDK handle pair.  If ``None``,
            :func:`~dtollib.capi.load_openlayers` is called to
            resolve the default install paths.
    """
    self._dlls = dlls if dlls is not None else load_openlayers()
    self._api = OpenLayersApi(self._dlls)
    self._lock = threading.RLock()

    # HDRVR ref-counting: first ``initialize(name)`` opens the
    # device; subsequent calls reuse the existing handle.  Final
    # ``terminate(hdrvr)`` closes it.  Maps board name → (HDRVR, refcount).
    self._open_devices: dict[str, tuple[int, int]] = {}
    # Reverse lookup: HDRVR → board name (for terminate()).
    self._hdrvr_to_name: dict[int, str] = {}

    # Capability cache: one CapabilitySet per HDASS for the
    # lifetime of the held subsystem.
    self._capability_cache: dict[int, CapabilitySet] = {}

    # Input-scaling cache (vmin, vmax, resolution_bits, twos_complement)
    # per HDASS — populated lazily on the first ``code_to_volts`` after
    # commit, because ``olDaCodeToVolts`` is unusable on the DT9805/06
    # (ECODE=9) so we convert from the configured encoding ourselves.
    self._scaling_cache: dict[int, tuple[float, float, int, bool]] = {}

    # Last data-flow mode set per HDASS.  Single-value subsystems have no
    # ``olDaStart`` step (the SDK rejects it with ECODE=27 "Dataflow
    # mismatch"); ``start`` skips the call for them.
    self._dataflow_mode: dict[int, int] = {}

    # Notification-wrapper pinning to prevent GC collecting the
    # callback wrapper while the SDK still holds a pointer to it.
    self._notification_wrappers: dict[int, Any] = {}

api property

api

Underlying :class:OpenLayersApi (escape hatch).

dlls property

dlls

Underlying :class:OpenLayersDlls (escape hatch).

abort

abort(hdass)

olDaAbort (immediate).

Source code in src/dtollib/backend/dataacq.py
def abort(self, hdass: int) -> None:
    """``olDaAbort`` (immediate)."""
    with self._lock:
        self._api.abort(hdass)

add_channel

add_channel(hdass, list_index, spec)

Add a channel to the channel/gain list with all per-type setters.

Caller MUST have already called :meth:set_multi_sensor_type on this channel if it reports IOType.MULTI_SENSOR (docs/design.md §8.5a).

Source code in src/dtollib/backend/dataacq.py
def add_channel(
    self,
    hdass: int,
    list_index: int,
    spec: ChannelSpec,
) -> None:
    """Add a channel to the channel/gain list with all per-type setters.

    Caller MUST have already called :meth:`set_multi_sensor_type`
    on this channel if it reports ``IOType.MULTI_SENSOR``
    (docs/design.md §8.5a).
    """
    # Lazy import — avoids circular import via channels → tasks.
    from dtollib.channels.analog_input import (  # noqa: PLC0415
        AnalogInputBase,
        AnalogInputVoltage,
        CurrentInput,
        ThermocoupleInput,
    )
    from dtollib.channels.analog_output import AnalogOutputVoltage  # noqa: PLC0415

    with self._lock:
        if isinstance(spec, AnalogInputBase):
            self._api.set_channel_type(hdass, _CHANNEL_TYPE_TO_OL(spec.channel_type))
        if isinstance(spec, AnalogInputVoltage | AnalogOutputVoltage | CurrentInput):
            self._set_voltage_range(hdass, spec.physical_channel, spec.min_val, spec.max_val)
        # Per-channel coupling (AC for IEPE, optional elsewhere). Tolerated
        # because the AD subsystem on the DT9805/06 rejects it (ec=36).
        if isinstance(spec, AnalogInputBase) and spec.coupling is not None:
            self._tolerate_unsupported(
                lambda: self._api.set_coupling_type(
                    hdass,
                    spec.physical_channel,
                    _COUPLING_TO_OL[spec.coupling.value],  # type: ignore[union-attr]
                )
            )
        self._configure_multi_sensor(hdass, spec)
        if isinstance(spec, ThermocoupleInput):
            caps = self._capability_cache.get(hdass)
            if caps is not None and caps.returns_floats:
                # Firmware-linearising subsystem: tell the SDK the TC type
                # so olDaGetSingleFloat returns temperature directly.
                self._api.set_thermocouple_type(
                    hdass,
                    spec.physical_channel,
                    _TC_TYPE_TO_OL[spec.thermocouple_type.value],
                )
            # Application-linearised path (DT9805/06): the SDK has no usable
            # TC setter — olDaSetThermocoupleType returns OLNOTSUPPORTED
            # (ec=36) — so we skip it and linearise in the read path from
            # the differential emf + CJC channel. See docs/decisions.md.
        # Gain-list entry applies to analog channels.  AI uses it to
        # select the per-channel range (DT9805/06 gain-select their fixed
        # ±10 V range).  A voltage DAC may have no programmable gain — the
        # DT9806 DA subsystem rejects olDaSetGainListEntry with
        # OLNOTSUPPORTED (ec=36, bench-confirmed 2026-05-28); tolerate that
        # for output channels, the single-value put writes the code
        # directly.  Digital lines have no gain list at all.
        if isinstance(spec, AnalogInputBase):
            self._api.set_gain_list_entry(hdass, list_index, spec.physical_channel, spec.gain)
        elif isinstance(spec, AnalogOutputVoltage):
            try:
                self._api.set_gain_list_entry(
                    hdass, list_index, spec.physical_channel, spec.gain
                )
            except DtolError as exc:
                if exc.context.ecode != OL_NOT_SUPPORTED:
                    raise

alloc_buffer

alloc_buffer(
    n_samples, sample_dtype_bytes, *, zero_init=True
)

olDmCallocBuffer / olDmMallocBuffer.

Source code in src/dtollib/backend/dataacq.py
def alloc_buffer(
    self,
    n_samples: int,
    sample_dtype_bytes: int,
    *,
    zero_init: bool = True,
) -> int:
    """``olDmCallocBuffer`` / ``olDmMallocBuffer``."""
    with self._lock:
        return self._api.alloc_buffer(
            n_samples,
            sample_dtype_bytes,
            zero_init=zero_init,
        )

arm

arm(hdass)

olDaConfig — second config, after olDaSetWndHandle + queue.

Continuous mode only. The second olDaConfig wires the message window into the SDK's buffer-rotation state machine; without it the SDK never posts OLDA_WM_BUFFER_DONE on the DT9805/06.

Source code in src/dtollib/backend/dataacq.py
def arm(self, hdass: int) -> None:
    """``olDaConfig`` — second config, after ``olDaSetWndHandle`` + queue.

    Continuous mode only. The second ``olDaConfig`` wires the message
    window into the SDK's buffer-rotation state machine; without it the
    SDK never posts ``OLDA_WM_BUFFER_DONE`` on the DT9805/06.
    """
    with self._lock:
        self._api.config(hdass)

code_to_volts

code_to_volts(hdass, code, gain)

Convert a raw code to input volts from the configured scaling.

olDaCodeToVolts returns ECODE=9 ("Invalid Encoding") on the DT9805/DT9806 A/D (bench-verified 2026-05-28; docs/decisions.md), so we query the subsystem encoding / resolution / range once (cached after commit) and convert ourselves via :func:~dtollib.capi.conversion.code_to_input_volts.

Source code in src/dtollib/backend/dataacq.py
def code_to_volts(self, hdass: int, code: int, gain: float) -> float:
    """Convert a raw code to input volts from the configured scaling.

    ``olDaCodeToVolts`` returns ECODE=9 ("Invalid Encoding") on the
    DT9805/DT9806 A/D (bench-verified 2026-05-28; docs/decisions.md),
    so we query the subsystem encoding / resolution / range once
    (cached after commit) and convert ourselves via
    :func:`~dtollib.capi.conversion.code_to_input_volts`.
    """
    vmin, vmax, resolution, twos_complement = self.get_scaling(hdass)
    # Pure conversion — no SDK call, safe outside the lock.
    return code_to_input_volts(
        code,
        gain,
        vmin=vmin,
        vmax=vmax,
        resolution_bits=resolution,
        twos_complement=twos_complement,
    )

commit

commit(hdass)

olDaConfig — first config (after channel/clock/wrap setup).

For single-value tasks this is the only config. For continuous tasks it is config #1; :meth:arm runs config #2 after the window handle is wired and buffers are queued (docs/decisions.md, "Bench-verified continuous-mode setup").

Source code in src/dtollib/backend/dataacq.py
def commit(self, hdass: int) -> None:
    """``olDaConfig`` — first config (after channel/clock/wrap setup).

    For single-value tasks this is the only config. For continuous
    tasks it is config #1; :meth:`arm` runs config #2 after the
    window handle is wired and buffers are queued (docs/decisions.md,
    "Bench-verified continuous-mode setup").
    """
    with self._lock:
        self._api.config(hdass)

copy_buffer

copy_buffer(hbuf, n_samples, sample_dtype_bytes)

olDmCopyBuffer — copy an HBUF's valid samples to a host buffer.

Source code in src/dtollib/backend/dataacq.py
def copy_buffer(self, hbuf: int, n_samples: int, sample_dtype_bytes: int) -> bytes:
    """``olDmCopyBuffer`` — copy an HBUF's valid samples to a host buffer."""
    with self._lock:
        return self._api.copy_buffer(hbuf, n_samples, sample_dtype_bytes)

copy_inprocess_buffer

copy_inprocess_buffer(hbuf, n_samples, sample_dtype_bytes)

olDmCopyFromBuffer — copy the in-process HBUF without waiting.

Source code in src/dtollib/backend/dataacq.py
def copy_inprocess_buffer(
    self,
    hbuf: int,
    n_samples: int,
    sample_dtype_bytes: int,
) -> bytes:
    """``olDmCopyFromBuffer`` — copy the in-process HBUF without waiting."""
    with self._lock:
        return self._api.copy_from_buffer(hbuf, n_samples, sample_dtype_bytes)

copy_to_buffer

copy_to_buffer(hbuf, data, n_samples)

olDmCopyToBuffer — fill an HBUF from a host byte payload.

Source code in src/dtollib/backend/dataacq.py
def copy_to_buffer(self, hbuf: int, data: bytes, n_samples: int) -> None:
    """``olDmCopyToBuffer`` — fill an HBUF from a host byte payload."""
    with self._lock:
        self._api.copy_to_buffer(hbuf, data, n_samples)

enum_boards

enum_boards()

Enumerate every installed DT-Open Layers board.

Uses olDaEnumBoardsEx for the registry-aware enumeration so the driver name and instance are populated. Falls back to olDaEnumBoards + olDaGetBoardInfo if the Ex variant fails (older drivers).

Source code in src/dtollib/backend/dataacq.py
def enum_boards(self) -> list[BoardInfo]:
    """Enumerate every installed DT-Open Layers board.

    Uses ``olDaEnumBoardsEx`` for the registry-aware enumeration so
    the driver name and instance are populated.  Falls back to
    ``olDaEnumBoards`` + ``olDaGetBoardInfo`` if the Ex variant
    fails (older drivers).
    """
    with self._lock:
        try:
            rows = self._api.enum_boards_ex()
        except Exception:
            names = self._api.enum_boards()
            boards: list[BoardInfo] = []
            for name in names:
                model, driver = self._api.get_board_info(name)
                boards.append(
                    BoardInfo(
                        name=name,
                        model=model,
                        driver_name=driver,
                        instance=0,
                    )
                )
            return boards

        boards = []
        for row in rows:
            model = ""
            try:
                model, _driver_unused = self._api.get_board_info(row.name)
            except Exception:
                model = ""
            boards.append(
                BoardInfo(
                    name=row.name,
                    model=model or row.name,
                    driver_name=row.driver,
                    instance=row.instance,
                )
            )
        return boards

enum_subsystems

enum_subsystems(board_name)

Enumerate subsystems on board_name.

Implementation note: olDaEnumSubSystems enumerates raw HDASS handles, but the SDK's preferred discovery shape is "try to acquire each (subsys_type, element) pair until one fails". We do the latter — it's portable across SDK revisions and surfaces the element index for each subsystem without an extra round-trip.

Source code in src/dtollib/backend/dataacq.py
def enum_subsystems(self, board_name: str) -> list[SubsystemInfo]:
    """Enumerate subsystems on ``board_name``.

    Implementation note: ``olDaEnumSubSystems`` enumerates raw
    HDASS handles, but the SDK's preferred discovery shape is
    "try to acquire each (subsys_type, element) pair until one
    fails".  We do the latter — it's portable across SDK
    revisions and surfaces the element index for each subsystem
    without an extra round-trip.
    """
    with self._lock:
        hdrvr = self.initialize(board_name)
        try:
            subs: list[SubsystemInfo] = []
            for subsys_type in _PROBED_SUBSYSTEM_TYPES:
                typed = SUBSYS_TYPE_TO_ENUM[subsys_type]
                for element in range(_MAX_ELEMENT_PROBE):
                    hdass: int
                    try:
                        hdass = self._api.get_dass(hdrvr, subsys_type, element)
                    except Exception:
                        break
                    try:
                        caps = query_capabilities(self._api, hdass)
                        subs.append(
                            SubsystemInfo(
                                type=typed,
                                element=element,
                                num_channels=caps.num_channels,
                                supports_singlevalue=caps.supports_singlevalue,
                                supports_continuous=caps.supports_continuous,
                                supports_simultaneous_sh=caps.supports_simultaneous_sh,
                                supports_multisensor=caps.supports_multisensor,
                                supports_dma=caps.supports_dma,
                                returns_floats=caps.returns_floats,
                                max_throughput_hz=caps.max_throughput_hz,
                                cgl_depth=caps.cgl_depth,
                            )
                        )
                    finally:
                        self._api.release_dass(hdass)
            return subs
        finally:
            self.terminate(hdrvr)

flush_buffers

flush_buffers(hdass)

olDaFlushBuffers.

Source code in src/dtollib/backend/dataacq.py
def flush_buffers(self, hdass: int) -> None:
    """``olDaFlushBuffers``."""
    with self._lock:
        self._api.flush_buffers(hdass)

free_buffer

free_buffer(hbuf)

olDmFreeBuffer.

Source code in src/dtollib/backend/dataacq.py
def free_buffer(self, hbuf: int) -> None:
    """``olDmFreeBuffer``."""
    with self._lock:
        self._api.free_buffer(hbuf)

get_buffer

get_buffer(hdass)

olDaGetBuffer.

Source code in src/dtollib/backend/dataacq.py
def get_buffer(self, hdass: int) -> int | None:
    """``olDaGetBuffer``."""
    with self._lock:
        return self._api.get_buffer(hdass)

get_buffer_valid_samples

get_buffer_valid_samples(hbuf)

olDmGetValidSamples.

Source code in src/dtollib/backend/dataacq.py
def get_buffer_valid_samples(self, hbuf: int) -> int:
    """``olDmGetValidSamples``."""
    with self._lock:
        return self._api.get_buffer_valid_samples(hbuf)

get_cjc_temperature

get_cjc_temperature(hdass, channel)

olDaGetCjcTemperature.

Source code in src/dtollib/backend/dataacq.py
def get_cjc_temperature(self, hdass: int, channel: int) -> float:
    """``olDaGetCjcTemperature``."""
    with self._lock:
        return self._api.get_cjc_temperature(hdass, channel)

get_clock_frequency

get_clock_frequency(hdass)

olDaGetClockFrequency.

Source code in src/dtollib/backend/dataacq.py
def get_clock_frequency(self, hdass: int) -> float:
    """``olDaGetClockFrequency``."""
    with self._lock:
        return self._api.get_clock_frequency(hdass)

get_dass

get_dass(hdrvr, subsystem_type, element)

Reserve a subsystem; return its HDASS.

Source code in src/dtollib/backend/dataacq.py
def get_dass(self, hdrvr: int, subsystem_type: int, element: int) -> int:
    """Reserve a subsystem; return its HDASS."""
    with self._lock:
        return self._api.get_dass(hdrvr, subsystem_type, element)

get_queue_size

get_queue_size(hdass, queue)

olDaGetQueueSize.

Source code in src/dtollib/backend/dataacq.py
def get_queue_size(self, hdass: int, queue: int) -> int:
    """``olDaGetQueueSize``."""
    with self._lock:
        return self._api.get_queue_size(hdass, queue)

get_scaling

get_scaling(hdass)

Return (vmin, vmax, resolution_bits, twos_complement) for hdass.

Queries the subsystem range / resolution / encoding once (after olDaConfig) and caches it. Exposed so the continuous block path (:func:dtollib.streaming.record) can build a :class:~dtollib.capi.conversion.BlockConversion plan with the same scaling the single-value code_to_volts path uses.

Source code in src/dtollib/backend/dataacq.py
def get_scaling(self, hdass: int) -> tuple[float, float, int, bool]:
    """Return ``(vmin, vmax, resolution_bits, twos_complement)`` for ``hdass``.

    Queries the subsystem range / resolution / encoding once (after
    ``olDaConfig``) and caches it. Exposed so the continuous block path
    (:func:`dtollib.streaming.record`) can build a
    :class:`~dtollib.capi.conversion.BlockConversion` plan with the same
    scaling the single-value ``code_to_volts`` path uses.
    """
    with self._lock:
        scaling = self._scaling_cache.get(hdass)
        if scaling is None:
            vmax, vmin = self._api.get_range(hdass)
            resolution = self._api.get_resolution(hdass)
            twos_complement = self._api.get_encoding(hdass) == OL_ENC_2SCOMP
            scaling = (vmin, vmax, resolution, twos_complement)
            self._scaling_cache[hdass] = scaling
    return scaling

get_single_float

get_single_float(hdass, channel, gain)

olDaGetSingleFloat — engineering units.

Source code in src/dtollib/backend/dataacq.py
def get_single_float(self, hdass: int, channel: int, gain: float) -> float:
    """``olDaGetSingleFloat`` — engineering units."""
    with self._lock:
        return self._api.get_single_float(hdass, channel, gain)

get_single_floats

get_single_floats(hdass, gain)

olDaGetSingleFloats — simultaneous SH + engineering units.

Source code in src/dtollib/backend/dataacq.py
def get_single_floats(self, hdass: int, gain: float) -> list[float]:
    """``olDaGetSingleFloats`` — simultaneous SH + engineering units."""
    caps = self._capability_cache.get(hdass)
    n = caps.num_channels if caps is not None else 0
    with self._lock:
        return self._api.get_single_floats(hdass, n, gain)

get_single_value

get_single_value(hdass, channel, gain)

olDaGetSingleValue — raw code.

Source code in src/dtollib/backend/dataacq.py
def get_single_value(self, hdass: int, channel: int, gain: float) -> int:
    """``olDaGetSingleValue`` — raw code."""
    with self._lock:
        return self._api.get_single_value(hdass, channel, gain)

get_single_values

get_single_values(hdass, gain)

olDaGetSingleValues — simultaneous SH; requires cached n_channels.

Source code in src/dtollib/backend/dataacq.py
def get_single_values(self, hdass: int, gain: float) -> list[int]:
    """``olDaGetSingleValues`` — simultaneous SH; requires cached n_channels."""
    # Derive channel count from the capability cache — populated at session
    # construction.  Without it we'd have to query the SDK on every poll.
    caps = self._capability_cache.get(hdass)
    n = caps.num_channels if caps is not None else 0
    with self._lock:
        return self._api.get_single_values(hdass, n, gain)

get_ss_list

get_ss_list(hdrvr)

olDaGetSSList.

Source code in src/dtollib/backend/dataacq.py
def get_ss_list(self, hdrvr: int) -> int:
    """``olDaGetSSList``."""
    with self._lock:
        return self._api.get_ss_list(hdrvr)

get_state

get_state(hdass)

olDaGetSSState → :class:SubsystemState.

Source code in src/dtollib/backend/dataacq.py
def get_state(self, hdass: int) -> SubsystemState:
    """``olDaGetSSState`` → :class:`SubsystemState`."""
    with self._lock:
        raw = self._api.get_ss_state(hdass)
    return _OLSSC_STATE_TO_ENUM.get(raw, SubsystemState.INITIALIZED)

get_version

get_version()

Return (oldaapi_version, olmem_version).

Source code in src/dtollib/backend/dataacq.py
def get_version(self) -> tuple[str, str]:
    """Return ``(oldaapi_version, olmem_version)``."""
    with self._lock:
        return (self._api.get_oldaapi_version(), self._api.get_olmem_version())

initialize

initialize(board_name)

Open board_name; return its HDRVR. Ref-counted across sessions.

Source code in src/dtollib/backend/dataacq.py
def initialize(self, board_name: str) -> int:
    """Open ``board_name``; return its HDRVR.  Ref-counted across sessions."""
    with self._lock:
        existing = self._open_devices.get(board_name)
        if existing is not None:
            hdrvr, refcount = existing
            self._open_devices[board_name] = (hdrvr, refcount + 1)
            return hdrvr

        hdrvr = self._api.initialize(board_name)
        self._open_devices[board_name] = (hdrvr, 1)
        self._hdrvr_to_name[hdrvr] = board_name
        return hdrvr

is_running

is_running(hdass)

olDaIsRunning.

Source code in src/dtollib/backend/dataacq.py
def is_running(self, hdass: int) -> bool:
    """``olDaIsRunning``."""
    with self._lock:
        return self._api.is_running(hdass)

measure_frequency

measure_frequency(hdass, channel)

olDaMeasureFrequency — measured input frequency (Hz).

Source code in src/dtollib/backend/dataacq.py
def measure_frequency(self, hdass: int, channel: int) -> float:
    """``olDaMeasureFrequency`` — measured input frequency (Hz)."""
    with self._lock:
        return self._api.measure_frequency(hdass, channel)

mute

mute(hdass)

olDaMute — hold the D/A output at its current value.

Source code in src/dtollib/backend/dataacq.py
def mute(self, hdass: int) -> None:
    """``olDaMute`` — hold the D/A output at its current value."""
    with self._lock:
        self._api.mute(hdass)

put_buffer

put_buffer(hdass, hbuf)

olDaPutBuffer.

Source code in src/dtollib/backend/dataacq.py
def put_buffer(self, hdass: int, hbuf: int) -> None:
    """``olDaPutBuffer``."""
    with self._lock:
        self._api.put_buffer(hdass, hbuf)

put_dass_to_ss_list

put_dass_to_ss_list(hsslist, hdass)

olDaPutDassToSSList.

Source code in src/dtollib/backend/dataacq.py
def put_dass_to_ss_list(self, hsslist: int, hdass: int) -> None:
    """``olDaPutDassToSSList``."""
    with self._lock:
        self._api.put_dass_to_ss_list(hsslist, hdass)

put_single_value

put_single_value(hdass, channel, value, gain)

One-shot raw-code write via olDaPutSingleValue.

Source code in src/dtollib/backend/dataacq.py
def put_single_value(self, hdass: int, channel: int, value: int, gain: float) -> None:
    """One-shot raw-code write via ``olDaPutSingleValue``."""
    with self._lock:
        self._api.put_single_value(hdass, channel, value, gain)

put_single_values

put_single_values(hdass, values, gain)

Simultaneous raw-code write via olDaPutSingleValues.

Source code in src/dtollib/backend/dataacq.py
def put_single_values(self, hdass: int, values: list[int], gain: float) -> None:
    """Simultaneous raw-code write via ``olDaPutSingleValues``."""
    with self._lock:
        self._api.put_single_values(hdass, values, gain)

query_capabilities

query_capabilities(hdass)

Build a :class:CapabilitySet for hdass (cached).

Source code in src/dtollib/backend/dataacq.py
def query_capabilities(self, hdass: int) -> CapabilitySet:
    """Build a :class:`CapabilitySet` for ``hdass`` (cached)."""
    with self._lock:
        cached = self._capability_cache.get(hdass)
        if cached is not None:
            return cached
        caps = query_capabilities(self._api, hdass)
        self._capability_cache[hdass] = caps
        return caps

read_bridge_sensor_hardware_teds

read_bridge_sensor_hardware_teds(hdass, channel)

olDaReadBridgeSensorHardwareTeds passthrough.

Source code in src/dtollib/backend/dataacq.py
def read_bridge_sensor_hardware_teds(self, hdass: int, channel: int) -> dict[str, object]:
    """``olDaReadBridgeSensorHardwareTeds`` passthrough."""
    with self._lock:
        return self._api.read_bridge_sensor_hardware_teds(hdass, channel)

read_bridge_sensor_virtual_teds

read_bridge_sensor_virtual_teds(path)

olDaReadBridgeSensorVirtualTeds passthrough.

Source code in src/dtollib/backend/dataacq.py
def read_bridge_sensor_virtual_teds(self, path: str) -> dict[str, object]:
    """``olDaReadBridgeSensorVirtualTeds`` passthrough."""
    with self._lock:
        return self._api.read_bridge_sensor_virtual_teds(path)

read_buffer_payload

read_buffer_payload(hbuf)

Build an ndarray view over the HBUF payload — drainer-thread call.

Returns a NumPy view; the caller copies before requeueing the HBUF. Dtype is inferred from olDmGetDataWidth (2 bytes → int16, 4 bytes → int32).

Source code in src/dtollib/backend/dataacq.py
def read_buffer_payload(self, hbuf: int) -> Any:
    """Build an ndarray view over the HBUF payload — drainer-thread call.

    Returns a NumPy view; the caller copies before requeueing the HBUF.
    Dtype is inferred from ``olDmGetDataWidth`` (2 bytes → int16, 4
    bytes → int32).
    """
    import numpy as np  # noqa: PLC0415

    with self._lock:
        valid = self._api.get_buffer_valid_samples(hbuf)
        width_bytes = self._api.get_buffer_data_width(hbuf)
        ptr = self._api.get_buffer_ptr(hbuf)
    if not ptr or valid == 0:
        return np.zeros(0, dtype=np.int16)
    dtype = np.int16 if width_bytes == _INT16_SAMPLE_BYTES else np.int32
    nbytes = valid * width_bytes
    buf = (ctypes.c_char * nbytes).from_address(ptr)
    return np.frombuffer(buf, dtype=dtype, count=valid)

read_events

read_events(hdass, channel)

olDaReadEvents — current counter value.

Source code in src/dtollib/backend/dataacq.py
def read_events(self, hdass: int, channel: int) -> int:
    """``olDaReadEvents`` — current counter value."""
    with self._lock:
        return self._api.read_events(hdass, channel)

read_strain_gage_hardware_teds

read_strain_gage_hardware_teds(hdass, channel)

olDaReadStrainGageHardwareTeds passthrough.

Source code in src/dtollib/backend/dataacq.py
def read_strain_gage_hardware_teds(self, hdass: int, channel: int) -> dict[str, object]:
    """``olDaReadStrainGageHardwareTeds`` passthrough."""
    with self._lock:
        return self._api.read_strain_gage_hardware_teds(hdass, channel)

read_strain_gage_virtual_teds

read_strain_gage_virtual_teds(path)

olDaReadStrainGageVirtualTeds passthrough.

Source code in src/dtollib/backend/dataacq.py
def read_strain_gage_virtual_teds(self, path: str) -> dict[str, object]:
    """``olDaReadStrainGageVirtualTeds`` passthrough."""
    with self._lock:
        return self._api.read_strain_gage_virtual_teds(path)

register_notification

register_notification(hdass, callback)

Route SDK buffer-done events for hdass to callback.

Creates a hidden message-only window on a dedicated pump thread, calls olDaSetWndHandle to point the SDK at it, and pins the :class:~dtollib.backend._message_window.MessageWindow on self._notification_wrappers[id(hdass)] so neither the window, the pump thread, nor the WNDPROC is collected while the SDK holds the handle.

callback(msg_id, wparam, lparam) runs on the pump thread for each OLDA_WM_* message — the window-handle mechanism is the only one that delivers events on the DT9805/06 (docs/decisions.md).

Returns the :class:MessageWindow as the opaque handle for :meth:unregister_notification.

Source code in src/dtollib/backend/dataacq.py
def register_notification(
    self,
    hdass: int,
    callback: Callable[[int, int, int], int],
) -> object:
    """Route SDK buffer-done events for ``hdass`` to ``callback``.

    Creates a hidden message-only window on a dedicated pump thread,
    calls ``olDaSetWndHandle`` to point the SDK at it, and pins the
    :class:`~dtollib.backend._message_window.MessageWindow` on
    ``self._notification_wrappers[id(hdass)]`` so neither the window,
    the pump thread, nor the WNDPROC is collected while the SDK holds
    the handle.

    ``callback(msg_id, wparam, lparam)`` runs on the pump thread for
    each ``OLDA_WM_*`` message — the window-handle mechanism is the
    only one that delivers events on the DT9805/06 (docs/decisions.md).

    Returns the :class:`MessageWindow` as the opaque handle for
    :meth:`unregister_notification`.
    """
    window = MessageWindow(callback)
    with self._lock:
        try:
            self._api.set_wnd_handle(hdass, window.hwnd, 0)
        except BaseException:
            window.close()
            raise
        self._notification_wrappers[id(hdass)] = window
        return window

release_dass

release_dass(hdass)

Release hdass and drop its capability + scaling cache entries.

Source code in src/dtollib/backend/dataacq.py
def release_dass(self, hdass: int) -> None:
    """Release ``hdass`` and drop its capability + scaling cache entries."""
    with self._lock:
        self._capability_cache.pop(hdass, None)
        self._scaling_cache.pop(hdass, None)
        self._dataflow_mode.pop(hdass, None)
        self._api.release_dass(hdass)

release_ss_list

release_ss_list(hsslist)

olDaReleaseSSList.

Source code in src/dtollib/backend/dataacq.py
def release_ss_list(self, hsslist: int) -> None:
    """``olDaReleaseSSList``."""
    with self._lock:
        self._api.release_ss_list(hsslist)

set_cascade_mode

set_cascade_mode(hdass, cascade)

olDaSetCascadeMode.

Source code in src/dtollib/backend/dataacq.py
def set_cascade_mode(self, hdass: int, cascade: bool) -> None:
    """``olDaSetCascadeMode``."""
    with self._lock:
        self._api.set_cascade_mode(hdass, cascade)

set_channel_list

set_channel_list(hdass, channels)

Set list size + each entry via two SDK calls (drainer-thread safe).

Source code in src/dtollib/backend/dataacq.py
def set_channel_list(self, hdass: int, channels: list[int]) -> None:
    """Set list size + each entry via two SDK calls (drainer-thread safe)."""
    with self._lock:
        self._api.set_channel_list_size(hdass, len(channels))
        for list_index, channel in enumerate(channels):
            self._api.set_channel_list_entry(hdass, list_index, channel)

set_clock

set_clock(
    hdass, *, rate_hz, clock_source, external_divider=None
)

Configure clock source + frequency (+ divider when external).

Source code in src/dtollib/backend/dataacq.py
def set_clock(
    self,
    hdass: int,
    *,
    rate_hz: float,
    clock_source: int,
    external_divider: int | None = None,
) -> None:
    """Configure clock source + frequency (+ divider when external)."""
    with self._lock:
        self._api.set_clock_source(hdass, clock_source)
        self._api.set_clock_frequency(hdass, rate_hz)
        if external_divider is not None:
            self._api.set_external_clock_divider(hdass, external_divider)

set_ct_clock

set_ct_clock(hdass, *, rate_hz, clock_source)

olDaSetCTClockSource + olDaSetCTClockFrequency.

Source code in src/dtollib/backend/dataacq.py
def set_ct_clock(self, hdass: int, *, rate_hz: float, clock_source: int) -> None:
    """``olDaSetCTClockSource`` + ``olDaSetCTClockFrequency``."""
    with self._lock:
        self._api.set_ct_clock_source(hdass, clock_source)
        self._api.set_ct_clock_frequency(hdass, rate_hz)

set_ct_mode

set_ct_mode(hdass, mode)

olDaSetCTMode.

Source code in src/dtollib/backend/dataacq.py
def set_ct_mode(self, hdass: int, mode: int) -> None:
    """``olDaSetCTMode``."""
    with self._lock:
        self._api.set_ct_mode(hdass, mode)

set_data_flow

set_data_flow(hdass, mode)

Set data-flow mode via olDaSetDataFlow.

Source code in src/dtollib/backend/dataacq.py
def set_data_flow(self, hdass: int, mode: int) -> None:
    """Set data-flow mode via ``olDaSetDataFlow``."""
    with self._lock:
        self._api.set_data_flow(hdass, mode)
        self._dataflow_mode[hdass] = mode

set_digital_io_list_entry

set_digital_io_list_entry(hdass, entry, value)

olDaSetDigitalIOListEntry.

Source code in src/dtollib/backend/dataacq.py
def set_digital_io_list_entry(self, hdass: int, entry: int, value: int) -> None:
    """``olDaSetDigitalIOListEntry``."""
    with self._lock:
        self._api.set_digital_io_list_entry(hdass, entry, value)

set_dma_usage

set_dma_usage(hdass, n_channels)

olDaSetDmaUsage.

Source code in src/dtollib/backend/dataacq.py
def set_dma_usage(self, hdass: int, n_channels: int) -> None:
    """``olDaSetDmaUsage``."""
    with self._lock:
        self._api.set_dma_usage(hdass, n_channels)

set_gate_type

set_gate_type(hdass, gate)

olDaSetGateType.

Source code in src/dtollib/backend/dataacq.py
def set_gate_type(self, hdass: int, gate: int) -> None:
    """``olDaSetGateType``."""
    with self._lock:
        self._api.set_gate_type(hdass, gate)

set_measure_edges

set_measure_edges(hdass, *, start_edge, stop_edge)

olDaSetMeasureStartEdge + olDaSetMeasureStopEdge.

Source code in src/dtollib/backend/dataacq.py
def set_measure_edges(self, hdass: int, *, start_edge: int, stop_edge: int) -> None:
    """``olDaSetMeasureStartEdge`` + ``olDaSetMeasureStopEdge``."""
    with self._lock:
        self._api.set_measure_start_edge(hdass, start_edge)
        self._api.set_measure_stop_edge(hdass, stop_edge)

set_multi_sensor_type

set_multi_sensor_type(hdass, physical_channel, io_type)

Re-type a MULTI_SENSOR channel. Caller orders this BEFORE per-type setters.

Source code in src/dtollib/backend/dataacq.py
def set_multi_sensor_type(
    self,
    hdass: int,
    physical_channel: int,
    io_type: IOType,
) -> None:
    """Re-type a MULTI_SENSOR channel.  Caller orders this BEFORE per-type setters."""
    with self._lock:
        self._api.set_multi_sensor_type(
            hdass, physical_channel, _IO_TYPE_TO_OLSS_MULTI_SENSOR[io_type]
        )

set_pulse

set_pulse(hdass, *, pulse_type, duty_or_width)

olDaSetPulseType + olDaSetPulseWidth.

Source code in src/dtollib/backend/dataacq.py
def set_pulse(self, hdass: int, *, pulse_type: int, duty_or_width: float) -> None:
    """``olDaSetPulseType`` + ``olDaSetPulseWidth``."""
    with self._lock:
        self._api.set_pulse_type(hdass, pulse_type)
        self._api.set_pulse_width(hdass, duty_or_width)

set_return_cjc_in_stream

set_return_cjc_in_stream(hdass, enable)

olDaSetReturnCjcTemperatureInStream — interleaved-CJC streaming.

Source code in src/dtollib/backend/dataacq.py
def set_return_cjc_in_stream(self, hdass: int, enable: bool) -> None:
    """``olDaSetReturnCjcTemperatureInStream`` — interleaved-CJC streaming."""
    with self._lock:
        self._api.set_return_cjc_in_stream(hdass, enable)

set_stop_on_error

set_stop_on_error(hdass, stop)

olDaSetStopOnError — SDK-level error policy.

Source code in src/dtollib/backend/dataacq.py
def set_stop_on_error(self, hdass: int, stop: bool) -> None:
    """``olDaSetStopOnError`` — SDK-level error policy."""
    with self._lock:
        self._api.set_stop_on_error(hdass, stop)

set_synchronous_digital_io_usage

set_synchronous_digital_io_usage(hdass, use)

olDaSetSynchronousDigitalIOUsage.

Source code in src/dtollib/backend/dataacq.py
def set_synchronous_digital_io_usage(self, hdass: int, use: bool) -> None:
    """``olDaSetSynchronousDigitalIOUsage``."""
    with self._lock:
        self._api.set_synchronous_digital_io_usage(hdass, use)

set_thermocouple_type

set_thermocouple_type(hdass, channel, tc_type)

Set TC type via olDaSetThermocoupleType.

Source code in src/dtollib/backend/dataacq.py
def set_thermocouple_type(
    self,
    hdass: int,
    channel: int,
    tc_type: ThermocoupleType,
) -> None:
    """Set TC type via ``olDaSetThermocoupleType``."""
    with self._lock:
        self._api.set_thermocouple_type(hdass, channel, _TC_TYPE_TO_OL[tc_type.value])

set_trigger

set_trigger(
    hdass,
    *,
    kind,
    threshold_channel=None,
    threshold_level=None,
)

Configure start-trigger + analog-threshold parameters when given.

Source code in src/dtollib/backend/dataacq.py
def set_trigger(
    self,
    hdass: int,
    *,
    kind: int,
    threshold_channel: int | None = None,
    threshold_level: float | None = None,
) -> None:
    """Configure start-trigger + analog-threshold parameters when given."""
    with self._lock:
        self._api.set_trigger(hdass, kind)
        if threshold_channel is not None:
            self._api.set_trigger_threshold_channel(hdass, threshold_channel)
        if threshold_level is not None:
            self._api.set_trigger_threshold_level(hdass, threshold_level)

set_triggered_scan

set_triggered_scan(
    hdass,
    *,
    multiscan_count,
    retrigger_mode,
    frequency_hz=None,
    source=None,
)

Enable triggered scan + configure the retrigger mode.

Source code in src/dtollib/backend/dataacq.py
def set_triggered_scan(
    self,
    hdass: int,
    *,
    multiscan_count: int,
    retrigger_mode: int,
    frequency_hz: float | None = None,
    source: int | None = None,
) -> None:
    """Enable triggered scan + configure the retrigger mode."""
    with self._lock:
        self._api.set_triggered_scan_usage(hdass, True)
        self._api.set_multiscan_count(hdass, multiscan_count)
        self._api.set_retrigger_mode(hdass, retrigger_mode)
        if frequency_hz is not None:
            self._api.set_retrigger_frequency(hdass, frequency_hz)
        if source is not None:
            self._api.set_retrigger(hdass, source)

set_wrap_mode

set_wrap_mode(hdass, mode)

olDaSetWrapMode.

Source code in src/dtollib/backend/dataacq.py
def set_wrap_mode(self, hdass: int, mode: int) -> None:
    """``olDaSetWrapMode``."""
    with self._lock:
        self._api.set_wrap_mode(hdass, mode)

simultaneous_pre_start

simultaneous_pre_start(hsslist)

olDaSimultaneousPreStart.

Source code in src/dtollib/backend/dataacq.py
def simultaneous_pre_start(self, hsslist: int) -> None:
    """``olDaSimultaneousPreStart``."""
    with self._lock:
        self._api.simultaneous_pre_start(hsslist)

simultaneous_start

simultaneous_start(hsslist)

olDaSimultaneousStart.

Source code in src/dtollib/backend/dataacq.py
def simultaneous_start(self, hsslist: int) -> None:
    """``olDaSimultaneousStart``."""
    with self._lock:
        self._api.simultaneous_start(hsslist)

start

start(hdass)

olDaStart — skipped for single-value subsystems.

Single-value mode has no run state: after olDaConfig the subsystem is ready for olDaGetSingleValue and olDaStart returns ECODE=27 ("Dataflow mismatch"). The session calls start unconditionally as a convention, so we make it a no-op there.

Source code in src/dtollib/backend/dataacq.py
def start(self, hdass: int) -> None:
    """``olDaStart`` — skipped for single-value subsystems.

    Single-value mode has no run state: after ``olDaConfig`` the
    subsystem is ready for ``olDaGetSingleValue`` and ``olDaStart``
    returns ECODE=27 ("Dataflow mismatch"). The session calls ``start``
    unconditionally as a convention, so we make it a no-op there.
    """
    with self._lock:
        if self._dataflow_mode.get(hdass) == OL_DF_SINGLEVALUE:
            return
        self._api.start(hdass)

stop

stop(hdass)

olDaStop (orderly).

Source code in src/dtollib/backend/dataacq.py
def stop(self, hdass: int) -> None:
    """``olDaStop`` (orderly)."""
    with self._lock:
        self._api.stop(hdass)

terminate

terminate(hdrvr)

Drop a reference to hdrvr; close on final release.

Source code in src/dtollib/backend/dataacq.py
def terminate(self, hdrvr: int) -> None:
    """Drop a reference to ``hdrvr``; close on final release."""
    with self._lock:
        name = self._hdrvr_to_name.get(hdrvr)
        if name is None:
            # Already terminated or never opened through this backend.
            # Closing an unknown HDRVR through the SDK would be a
            # bug; raise so the caller can investigate.
            self._api.terminate(hdrvr)
            return

        current_hdrvr, refcount = self._open_devices[name]
        if current_hdrvr != hdrvr:
            # Bookkeeping invariant: every entry in
            # ``_hdrvr_to_name`` matches the HDRVR stored in
            # ``_open_devices``.  A mismatch indicates the caller
            # is closing a stale or fabricated HDRVR.
            raise DtolResourceError(
                f"HDRVR {hdrvr} does not match the open handle for {name!r}",
                context=ErrorContext(operation="terminate", board=name),
            )
        if refcount > 1:
            self._open_devices[name] = (hdrvr, refcount - 1)
            return

        self._api.terminate(hdrvr)
        del self._open_devices[name]
        del self._hdrvr_to_name[hdrvr]

unmute

unmute(hdass)

olDaUnMute — release a muted D/A output.

Source code in src/dtollib/backend/dataacq.py
def unmute(self, hdass: int) -> None:
    """``olDaUnMute`` — release a muted D/A output."""
    with self._lock:
        self._api.unmute(hdass)

unregister_notification

unregister_notification(hdass, handle)

Detach the window handle, stop the pump, and drop the window.

Source code in src/dtollib/backend/dataacq.py
def unregister_notification(self, hdass: int, handle: object) -> None:
    """Detach the window handle, stop the pump, and drop the window."""
    del handle  # the strong ref is on us, not the caller
    with self._lock:
        with contextlib.suppress(Exception):
            self._api.set_wnd_handle(hdass, 0, 0)
        window = self._notification_wrappers.pop(id(hdass), None)
    # Join the pump thread outside the backend lock — close() blocks on
    # the thread, which must not contend with other backend calls.
    if isinstance(window, MessageWindow):
        window.close()

volts_to_bridge_based_sensor

volts_to_bridge_based_sensor(
    v_unstrained,
    v_strained,
    v_excitation,
    temperature_coefficient,
    gage_resistance,
    lead_resistance,
    rated_output_mv_per_v,
    shunt_correction,
)

olDaVoltsToBridgeBasedSensor passthrough (pure SDK conversion).

Source code in src/dtollib/backend/dataacq.py
def volts_to_bridge_based_sensor(
    self,
    v_unstrained: float,
    v_strained: float,
    v_excitation: float,
    temperature_coefficient: float,
    gage_resistance: float,
    lead_resistance: float,
    rated_output_mv_per_v: float,
    shunt_correction: float,
) -> float:
    """``olDaVoltsToBridgeBasedSensor`` passthrough (pure SDK conversion)."""
    return self._api.volts_to_bridge_based_sensor(
        v_unstrained,
        v_strained,
        v_excitation,
        temperature_coefficient,
        gage_resistance,
        lead_resistance,
        rated_output_mv_per_v,
        shunt_correction,
    )

volts_to_strain

volts_to_strain(
    config,
    v_unstrained,
    v_strained,
    v_excitation,
    gage_factor,
    gage_resistance,
    lead_resistance,
    poisson_ratio,
    shunt_correction,
)

olDaVoltsToStrain passthrough (pure SDK conversion, no lock needed).

Source code in src/dtollib/backend/dataacq.py
def volts_to_strain(
    self,
    config: int,
    v_unstrained: float,
    v_strained: float,
    v_excitation: float,
    gage_factor: float,
    gage_resistance: float,
    lead_resistance: float,
    poisson_ratio: float,
    shunt_correction: float,
) -> float:
    """``olDaVoltsToStrain`` passthrough (pure SDK conversion, no lock needed)."""
    return self._api.volts_to_strain(
        config,
        v_unstrained,
        v_strained,
        v_excitation,
        gage_factor,
        gage_resistance,
        lead_resistance,
        poisson_ratio,
        shunt_correction,
    )