Skip to content

Task specifications

TaskSpec is the typed declaration of one subsystem configured for one data-flow mode; TaskBuilder translates it into the ordered SDK setter sequence. See the Task specs guide for the narrative.

Spec and timing models

dtollib.tasks.spec

TaskSpec + supporting dataclasses — the configuration entry-point.

A single spec describes single-value, continuous, and finite tasks; Timing / :class:BufferPlan / :class:RawLogging carry the continuous-mode configuration consumed by :func:dtollib.streaming.record.

Design reference: docs/design.md §8.1 (TaskSpec), §8.7 (Timing), §8.7a (BufferPlan).

BufferPlan dataclass

BufferPlan(
    *,
    buffers=4,
    samples_per_buffer=1000,
    sample_width_bytes=None,
    wrap_mode=WrapMode.MULTIPLE,
    queue_strategy=QueueStrategy.REQUEUE,
)

SDK Ready / Inprocess / Done buffer plan for continuous tasks.

Consumed by the buffer pool behind :func:dtollib.streaming.record. Required when TaskSpec.data_flow in {CONTINUOUS, FINITE, *_PRETRIGGER, *_ABOUT_TRIGGER}; forbidden for SINGLE_VALUE.

Attributes:

Name Type Description
buffers int

Number of HBUFs in the Ready/Inprocess/Done cycle. Minimum 3; default 4 (matches QuickDAQ default).

samples_per_buffer int

Samples per channel per HBUF.

sample_width_bytes int | None

None → backend auto-detects from OLSSC_RETURNS_FLOATS + resolution.

wrap_mode WrapMode

MULTIPLE (continuous reuse) / SINGLE (DAC waveform) / NONE (finite).

queue_strategy QueueStrategy

How completed HBUFs return to the Ready queue.

__post_init__

__post_init__()

Enforce the hard minimum-3 floor.

Source code in src/dtollib/tasks/spec.py
def __post_init__(self) -> None:
    """Enforce the hard minimum-3 floor."""
    if self.buffers < _MIN_BUFFERS:
        raise DtolValidationError(
            f"BufferPlan.buffers must be >= {_MIN_BUFFERS} (got {self.buffers}); "
            "see docs/design.md §8.7a for the minimum-3 rationale",
            context=ErrorContext(operation="BufferPlan.__post_init__"),
        )
    if self.samples_per_buffer <= 0:
        raise DtolValidationError(
            f"BufferPlan.samples_per_buffer must be positive (got {self.samples_per_buffer})",
            context=ErrorContext(operation="BufferPlan.__post_init__"),
        )

RawLogging dataclass

RawLogging(
    *, path, include_metadata=True, compression=None
)

Driver-side raw-counts logging configuration.

Wired into :class:~dtollib.sinks.RawCountsSink.

Attributes:

Name Type Description
path Path

Output .dt-raw file path.

include_metadata bool

Embed task metadata in the file header.

compression None

Compression algorithm (currently always None).

RetriggerSpec dataclass

RetriggerSpec(
    *,
    mode=RetriggerMode.EXTRA,
    multiscan_count=1,
    frequency_hz=None,
    source=None,
)

Triggered-scan retrigger specification.

Wired by the :class:~dtollib.tasks.TaskBuilder into olDaSetTriggeredScanUsage + olDaSetMultiscanCount + olDaSetRetriggerMode (+ olDaSetRetriggerFrequency for INTERNAL, olDaSetRetrigger for EXTRA).

Attributes:

Name Type Description
mode RetriggerMode

Retrigger mode. Defaults to EXTRA per the SDK doc recommendation when both INTERNAL and EXTRA are supported.

multiscan_count int

Channel-list scans collected per trigger (>= 1).

frequency_hz float | None

Internal retrigger rate; required for RetriggerMode.INTERNAL, forbidden otherwise.

source TriggerSpec | None

Retrigger source trigger; required for RetriggerMode.EXTRA, forbidden otherwise.

__post_init__

__post_init__()

Validate mode-specific field requirements.

Source code in src/dtollib/tasks/spec.py
def __post_init__(self) -> None:
    """Validate mode-specific field requirements."""
    if self.multiscan_count < 1:
        raise DtolValidationError(
            f"RetriggerSpec.multiscan_count must be >= 1 (got {self.multiscan_count})",
            context=ErrorContext(operation="RetriggerSpec.__post_init__"),
        )
    if self.mode == RetriggerMode.INTERNAL and self.frequency_hz is None:
        raise DtolValidationError(
            "RetriggerSpec.frequency_hz is required when mode is INTERNAL",
            context=ErrorContext(operation="RetriggerSpec.__post_init__"),
        )
    if self.mode != RetriggerMode.INTERNAL and self.frequency_hz is not None:
        raise DtolValidationError(
            "RetriggerSpec.frequency_hz is only valid when mode is INTERNAL",
            context=ErrorContext(operation="RetriggerSpec.__post_init__"),
        )
    if self.mode == RetriggerMode.EXTRA and self.source is None:
        raise DtolValidationError(
            "RetriggerSpec.source is required when mode is EXTRA",
            context=ErrorContext(operation="RetriggerSpec.__post_init__"),
        )

TaskSpec dataclass

TaskSpec(
    *,
    name,
    channels,
    board=None,
    subsystem_type=None,
    element=0,
    data_flow=DataFlow.SINGLE_VALUE,
    timing=None,
    trigger=SoftwareStart(),
    buffers=None,
    logging=None,
    stop_on_error=True,
    metadata=_empty_metadata(),
)

One DT-Open Layers subsystem configured for one data-flow mode.

A TaskSpec is a typed declaration; the :class:~dtollib.tasks.TaskBuilder translates it into the actual SDK call sequence.

Attributes:

Name Type Description
name str

Human-readable task identifier. Propagated to error contexts, log lines, and sink rows.

board str | None

DT-Open Layers board name (e.g. "DT9805(00)"). None = first discovered board.

subsystem_type SubsystemType | None

Explicit override for the subsystem to bind. Usually inferred from the channel kinds — see :meth:infer_subsystem_type.

element int

Subsystem element index on the board. 0 for the first AI subsystem; non-zero on boards with multiple subsystems of the same type.

channels Sequence[ChannelSpec]

Ordered channel specs. Empty is rejected. All channels must share a subsystem type — mixing voltage and DO in one task is a validation error.

data_flow DataFlow

One of :class:DataFlow.

timing Timing | None

Required for non-SINGLE_VALUE; forbidden for it.

trigger TriggerSpec

Defaults to :class:SoftwareStart; the full trigger hierarchy is supported.

buffers BufferPlan | None

Required for non-SINGLE_VALUE; forbidden for it.

logging RawLogging | None

Optional driver-side raw-counts logging.

stop_on_error bool

SDK-level olDaSetStopOnError. Orthogonal to recorder-level :class:ErrorPolicy; see docs/design.md §14.3.

metadata Mapping[str, str | int | float | bool]

Free-form task-level metadata.

__post_init__

__post_init__()

Validate the spec against the §8.1 / §4.2 matrix.

Source code in src/dtollib/tasks/spec.py
def __post_init__(self) -> None:
    """Validate the spec against the §8.1 / §4.2 matrix."""
    if not self.name:
        raise DtolValidationError(
            "TaskSpec.name must be a non-empty string",
            context=ErrorContext(operation="TaskSpec.__post_init__"),
        )
    if not self.channels:
        raise DtolValidationError(
            f"TaskSpec(name={self.name!r}).channels is empty; at least one channel is required",
            context=ErrorContext(
                operation="TaskSpec.__post_init__",
                task_name=self.name,
            ),
        )

    # Channel-kind homogeneity: one HDASS = one subsystem type.
    inferred = self.infer_subsystem_type()
    if self.subsystem_type is not None and self.subsystem_type != inferred:
        raise DtolValidationError(
            f"TaskSpec.subsystem_type={self.subsystem_type.value} conflicts "
            f"with inferred type {inferred.value} from the channel kinds",
            context=ErrorContext(
                operation="TaskSpec.__post_init__",
                task_name=self.name,
            ),
        )

    # data_flow / timing / buffers matrix — see implementation-plan.md §4.2.
    self._validate_data_flow_matrix()

    # Wrap metadata to enforce immutability.
    if not isinstance(self.metadata, MappingProxyType):
        object.__setattr__(self, "metadata", MappingProxyType(dict(self.metadata)))

infer_subsystem_type

infer_subsystem_type()

Derive the subsystem kind from the channel kinds.

Returns:

Type Description
SubsystemType

The implied :class:SubsystemType.

Raises:

Type Description
DtolValidationError

Channels span multiple subsystem kinds or include an unrecognised kind. One HDASS is one subsystem of one type — mixing is illegal.

Source code in src/dtollib/tasks/spec.py
def infer_subsystem_type(self) -> SubsystemType:
    """Derive the subsystem kind from the channel kinds.

    Returns:
        The implied :class:`SubsystemType`.

    Raises:
        DtolValidationError: Channels span multiple subsystem kinds
            or include an unrecognised kind.  One HDASS is one
            subsystem of one type — mixing is illegal.
    """
    inferred: SubsystemType | None = None
    for channel in self.channels:
        kind_type = _channel_subsystem_type(channel)
        if inferred is None:
            inferred = kind_type
        elif inferred != kind_type:
            raise DtolValidationError(
                f"TaskSpec(name={self.name!r}).channels mixes subsystem "
                f"kinds ({inferred.value} vs {kind_type.value}); "
                "split into separate TaskSpecs (one HDASS per type).",
                context=ErrorContext(
                    operation="TaskSpec.infer_subsystem_type",
                    task_name=self.name,
                ),
            )
    if inferred is None:
        raise DtolValidationError(
            f"TaskSpec(name={self.name!r}).channels is empty; at least one channel is required",
            context=ErrorContext(
                operation="TaskSpec.infer_subsystem_type",
                task_name=self.name,
            ),
        )
    return inferred

Timing dataclass

Timing(
    *,
    rate_hz,
    clock_source=ClockSource.INTERNAL,
    external_divider=None,
    retrigger=None,
    samples_per_channel=None,
)

Sample-clock + retrigger configuration for a continuous task.

Required when TaskSpec.data_flow != SINGLE_VALUE; forbidden otherwise (single-value reads ignore the timing engine).

Attributes:

Name Type Description
rate_hz float

Configured sample rate, in hertz. Driven via olDaSetClockFrequency for ClockSource.INTERNAL; interpreted as the external-clock divider's input rate otherwise.

clock_source ClockSource

Internal vs external clock selection.

external_divider int | None

Required when clock_source == EXTERNAL; forbidden otherwise.

retrigger RetriggerSpec | None

Optional triggered-scan retrigger specification. None = no retrigger.

samples_per_channel class-attribute instance-attribute

samples_per_channel = None

Optional sample ceiling. Required for DataFlow.FINITE; the recorder stops when the cumulative emitted sample count reaches this value. Forbidden (None) for CONTINUOUS — that mode runs until stop().

__post_init__

__post_init__()

Validate the external-divider / clock-source / samples-ceiling combos.

Source code in src/dtollib/tasks/spec.py
def __post_init__(self) -> None:
    """Validate the external-divider / clock-source / samples-ceiling combos."""
    if self.rate_hz <= 0.0:
        raise DtolValidationError(
            f"Timing.rate_hz must be positive, got {self.rate_hz}",
            context=ErrorContext(operation="Timing.__post_init__"),
        )
    if self.clock_source == ClockSource.EXTERNAL and self.external_divider is None:
        raise DtolValidationError(
            "Timing.external_divider is required when clock_source is EXTERNAL",
            context=ErrorContext(operation="Timing.__post_init__"),
        )
    if self.clock_source == ClockSource.INTERNAL and self.external_divider is not None:
        raise DtolValidationError(
            "Timing.external_divider is forbidden when clock_source is INTERNAL",
            context=ErrorContext(operation="Timing.__post_init__"),
        )
    if self.samples_per_channel is not None and self.samples_per_channel <= 0:
        raise DtolValidationError(
            f"Timing.samples_per_channel must be positive (got {self.samples_per_channel})",
            context=ErrorContext(operation="Timing.__post_init__"),
        )

dtollib.tasks.models

Public enums + payload dataclasses for task / channel / lifecycle modeling.

:class:DaqBlock is a full frozen dataclass, and :class:DaqSample + :class:SdkEventKind back the continuous-bridge path.

Design reference: docs/design.md §8 (enums), §8.9 (DaqReading), §8.10 (DaqBlock), §8.11 (DaqSample), §12.3.2 (SdkEventKind).

BufferState

Bases: StrEnum

Per-HBUF lifecycle — tracked on the internal RawBuffer.

Borrowed from OIBuffer.State. Promotes use-after-free into an explicit error and lets the pool refuse free_all() while any buffer is INPROCESS (the §12.3.2 shutdown invariant).

FILLED is the output-pool addition: an HBUF that has been written with a waveform chunk (olDmCopyToBuffer) but not yet queued. It encodes the Fill-before-Queue invariant — an output buffer must be filled before put_buffer (see :class:~dtollib.backend._buffer_pool.BufferPool).

ClockSource

Bases: StrEnum

Timing.clock_source discriminator.

CounterMode

Bases: StrEnum

Counter/timer operation mode — maps to the SDK OL_CTMODE_* family.

Carried as a ClassVar on each counter channel spec so the :class:~dtollib.tasks.TaskBuilder dispatches olDaSetCTMode without branching on the concrete spec class.

DaqBlock dataclass

DaqBlock(
    *,
    device,
    channels,
    data,
    task=None,
    raw_codes=None,
    cjc_data=None,
    block_index,
    first_sample_index,
    samples_per_channel,
    sample_rate_hz=None,
    block_period_ns=None,
    task_started_at,
    t0,
    t_mono_ns,
    t_utc,
    t_midpoint_mono_ns=None,
    read_started_at,
    read_finished_at,
    elapsed_s,
    units=_empty_block_units(),
    is_linearised=False,
    sensor_status=_empty_block_sensor_status(),
    error=None,
)

One hardware-clocked acquisition buffer copied off the SDK Done queue.

Constructed by the §12.3.2 callback-bridge drainer thread after olDaGetBuffer + olDmGetBufferPtr. The data array is a drainer-owned copy (the SDK HBUF is recycled before the block leaves the drainer); the array is marked read-only so downstream sinks cannot mutate it in place.

Sample-time reconstruction: derive each sample's t_mono_ns from block.t_mono_ns + k * block.block_period_ns and t_utc analogously. Use first_sample_index + k as the absolute sample index across the whole run. Do not interpolate off t0 — it carries scheduler jitter.

Attributes:

Name Type Description
device str

DtolManager.add(name=...) value; join key with siblings.

task str | None

Underlying TaskSpec.name.

channels tuple[str, ...]

Channel display names in array-row order; data[i] is channels[i].

data NDArray[float64]

Converted samples, shape (len(channels), samples_per_channel), dtype float64. NaN at positions where sensor_status is non-OK.

raw_codes NDArray[signedinteger[Any]] | None

Original SDK codes, shape matches data, dtype int16 or int32. Populated when RawLogging is configured or the backend retains them for replay.

cjc_data NDArray[float64] | None

CJC stream, shape matches data, dtype float64. Populated when olDaSetReturnCjcTemperatureInStream is enabled on a subsystem with OLSSC_SUP_INTERLEAVED_CJC_IN_STREAM.

block_index int

0-based monotonic per task.

first_sample_index int

Cumulative offset since task_started_at.

samples_per_channel int

data.shape[1] — duplicated for ergonomic access without indexing into the ndarray shape.

sample_rate_hz float | None

Actual clock rate read back via olDaGetClockFrequency after configure (the SDK may quantise the requested rate).

block_period_ns int | None

round(1e9 / sample_rate_hz) — ns per sample.

task_started_at datetime

Wall-clock anchor for sample-time reconstruction.

t0 datetime

Wall clock at the first sample of THIS block.

t_mono_ns int

Monotonic ns at callback receipt (drainer thread).

t_utc datetime

Wall clock at the block midpoint — the timestamp consumers should plot against.

t_midpoint_mono_ns int | None

Block-midpoint in monotonic ns.

read_started_at datetime

Drainer-thread wall clock at olDaGetBuffer start.

read_finished_at datetime

Drainer-thread wall clock after copy + requeue.

elapsed_s float

(read_finished_at - read_started_at).total_seconds().

units Mapping[str, str | None]

Channel name → engineering unit ("V" / "degC" / ...).

is_linearised bool

True when data holds engineering units (volts / °C) produced by the drainer's code→units conversion; False when data holds raw ADC codes cast to float (the unconverted fallback, e.g. RawCountsSink / replay). Sinks and the replay tool key off this rather than guessing from values.

sensor_status Mapping[str, NDArray[int8]]

Channel name → int8 mask, same length as samples_per_channel, encoded with :class:SensorStatus ordinals. Absent channels imply all-OK; the matching positions in data are NaN.

error DtolError | None

None under :attr:ErrorPolicy.RAISE; otherwise the wrapped :class:~dtollib.errors.DtolError for the failed read. When set, data is zero-filled to the expected shape.

n_channels property

n_channels

Number of channels — len(self.channels).

__getstate__

__getstate__()

Unwrap MappingProxyType views so the block can be pickled.

Source code in src/dtollib/tasks/models.py
def __getstate__(self) -> dict[str, Any]:
    """Unwrap MappingProxyType views so the block can be pickled."""
    return _slots_to_picklable_state(self, _DAQ_BLOCK_MAPPING_FIELDS)

__post_init__

__post_init__()

Validate array shapes and freeze mapping + ndarray mutability.

Source code in src/dtollib/tasks/models.py
def __post_init__(self) -> None:
    """Validate array shapes and freeze mapping + ndarray mutability."""
    n_channels = len(self.channels)
    expected_shape = (n_channels, self.samples_per_channel)
    if self.data.shape != expected_shape:
        raise DtolValidationError(
            f"DaqBlock.data shape {self.data.shape} does not match "
            f"(len(channels)={n_channels}, samples_per_channel="
            f"{self.samples_per_channel})",
            context=ErrorContext(operation="DaqBlock.__post_init__"),
        )
    if self.raw_codes is not None and self.raw_codes.shape != expected_shape:
        raise DtolValidationError(
            f"DaqBlock.raw_codes shape {self.raw_codes.shape} does not "
            f"match data shape {expected_shape}",
            context=ErrorContext(operation="DaqBlock.__post_init__"),
        )
    if self.cjc_data is not None and self.cjc_data.shape != expected_shape:
        raise DtolValidationError(
            f"DaqBlock.cjc_data shape {self.cjc_data.shape} does not "
            f"match data shape {expected_shape}",
            context=ErrorContext(operation="DaqBlock.__post_init__"),
        )
    for ch_name, mask in self.sensor_status.items():
        if mask.shape != (self.samples_per_channel,):
            raise DtolValidationError(
                f"DaqBlock.sensor_status[{ch_name!r}] shape {mask.shape} "
                f"does not match (samples_per_channel="
                f"{self.samples_per_channel},)",
                context=ErrorContext(operation="DaqBlock.__post_init__"),
            )
    if self.block_index < 0:
        raise DtolValidationError(
            f"DaqBlock.block_index must be >= 0 (got {self.block_index})",
            context=ErrorContext(operation="DaqBlock.__post_init__"),
        )
    if self.first_sample_index < 0:
        raise DtolValidationError(
            f"DaqBlock.first_sample_index must be >= 0 (got {self.first_sample_index})",
            context=ErrorContext(operation="DaqBlock.__post_init__"),
        )

    # Lock arrays read-only so sinks cannot mutate the drainer's copy.
    self.data.setflags(write=False)
    if self.raw_codes is not None:
        self.raw_codes.setflags(write=False)
    if self.cjc_data is not None:
        self.cjc_data.setflags(write=False)
    for mask in self.sensor_status.values():
        mask.setflags(write=False)

    # Freeze mappings.
    if not isinstance(self.units, MappingProxyType):
        object.__setattr__(self, "units", MappingProxyType(dict(self.units)))
    if not isinstance(self.sensor_status, MappingProxyType):
        object.__setattr__(
            self,
            "sensor_status",
            MappingProxyType(dict(self.sensor_status)),
        )

__setstate__

__setstate__(state)

Restore slotted fields and re-run shape/freeze validation.

Source code in src/dtollib/tasks/models.py
def __setstate__(self, state: dict[str, Any]) -> None:
    """Restore slotted fields and re-run shape/freeze validation."""
    _restore_slots_state(self, state)
    self.__post_init__()

DaqReading dataclass

DaqReading(
    *,
    device,
    task=None,
    values=_empty_values(),
    units=_empty_units(),
    requested_at,
    received_at,
    t_utc,
    t_mono_ns,
    t_midpoint_mono_ns=None,
    latency_s,
    sensor_status=_empty_sensor_status(),
    metadata=_empty_metadata(),
    error=None,
)

One scalar reading across the channels of a single-value task.

Field shape matches :class:nidaqlib.DaqReading / :class:alicatlib.Sample / :class:sartoriuslib.Sample for cross-instrument joinability: the canonical key is (device, t_mono_ns).

Attributes:

Name Type Description
device str

DtolManager.add(name=...) value, or the :class:~dtollib.tasks.TaskSpec.name for ad-hoc sessions. Join key with sibling-library samples.

task str | None

Underlying TaskSpec.name. Distinct from device when one manager entry hosts a renamed task.

values Mapping[str, float | int | bool]

Channel name → scalar value mapping. TC sentinels appear as float("nan") here; the explanation lives in sensor_status (see docs/design.md §13.1).

units Mapping[str, str | None]

Channel name → engineering unit string ("V", "degC", ...). None for unit-less channels.

requested_at datetime

Wall clock at the start of the poll call.

received_at datetime

Wall clock at the end of the poll call.

t_utc datetime

Wall-clock midpoint of the integration window. This is the timestamp downstream consumers should plot against.

t_mono_ns int

Monotonic nanoseconds at the start of the poll call. Canonical join key with sibling-library samples.

t_midpoint_mono_ns int | None

Optional monotonic-ns midpoint when the backend can report it. This is None for single-value reads — the simultaneous-sample-and-hold devices we target return one value per channel from one acquisition window.

latency_s float

(received_at - requested_at).total_seconds().

sensor_status Mapping[str, SensorStatus]

Channel name → :class:SensorStatus. Only channels with non-OK status appear; absent entries imply OK.

metadata Mapping[str, str | int | float | bool]

Free-form key/value metadata propagated by the session and the channel specs.

error DtolError | None

None under :attr:ErrorPolicy.RAISE; otherwise the wrapped :class:~dtollib.errors.DtolError for the failed poll.

__getstate__

__getstate__()

Unwrap MappingProxyType views — they are not pickle-friendly.

Source code in src/dtollib/tasks/models.py
def __getstate__(self) -> dict[str, Any]:
    """Unwrap MappingProxyType views — they are not pickle-friendly."""
    return _slots_to_picklable_state(self, _DAQ_READING_MAPPING_FIELDS)

__post_init__

__post_init__()

Freeze the mapping fields so callers cannot mutate them post-hoc.

Source code in src/dtollib/tasks/models.py
def __post_init__(self) -> None:
    """Freeze the mapping fields so callers cannot mutate them post-hoc."""
    for name in ("values", "units", "sensor_status", "metadata"):
        current = getattr(self, name)
        if not isinstance(current, MappingProxyType):
            object.__setattr__(self, name, MappingProxyType(dict(current)))

__setstate__

__setstate__(state)

Restore slotted fields and re-wrap mappings via :meth:__post_init__.

Source code in src/dtollib/tasks/models.py
def __setstate__(self, state: dict[str, Any]) -> None:
    """Restore slotted fields and re-wrap mappings via :meth:`__post_init__`."""
    _restore_slots_state(self, state)
    self.__post_init__()

to_dict

to_dict()

JSON-friendly mapping suitable for the row-oriented sinks.

Source code in src/dtollib/tasks/models.py
def to_dict(self) -> dict[str, Any]:
    """JSON-friendly mapping suitable for the row-oriented sinks."""
    return {
        "device": self.device,
        "task": self.task,
        "values": dict(self.values),
        "units": dict(self.units),
        "requested_at": self.requested_at.isoformat(),
        "received_at": self.received_at.isoformat(),
        "t_utc": self.t_utc.isoformat(),
        "t_mono_ns": self.t_mono_ns,
        "t_midpoint_mono_ns": self.t_midpoint_mono_ns,
        "latency_s": self.latency_s,
        "sensor_status": {k: v.value for k, v in self.sensor_status.items()},
        "metadata": dict(self.metadata),
        "error": None if self.error is None else str(self.error),
    }

DaqSample dataclass

DaqSample(
    *,
    device,
    channel,
    value,
    sample_index,
    block_index,
    t_mono_ns,
    t_utc,
    task=None,
    unit=None,
    sensor_status=SensorStatus.OK,
    is_linearised=False,
    metadata=_empty_metadata(),
)

One (channel, sample) pair scalarised from a :class:DaqBlock.

Produced explicitly via :func:block_to_long_rows. Useful for row-oriented sinks (CSV, JSONL, Postgres) that prefer one row per measurement over a blob column. Carries the same join contract as :class:DaqReading so long-form DAQ rows can be merged with sibling-library samples on (device, t_mono_ns).

is_linearised is inherited from the source :class:DaqBlock: it is True when value is in engineering units (volts / °C) and False when it is a raw ADC code cast to float. Row-oriented sinks persist it so the on-disk row is self-describing.

__getstate__

__getstate__()

Unwrap MappingProxyType so the sample can be pickled.

Source code in src/dtollib/tasks/models.py
def __getstate__(self) -> dict[str, Any]:
    """Unwrap MappingProxyType so the sample can be pickled."""
    return _slots_to_picklable_state(self, _DAQ_SAMPLE_MAPPING_FIELDS)

__post_init__

__post_init__()

Freeze the metadata mapping.

Source code in src/dtollib/tasks/models.py
def __post_init__(self) -> None:
    """Freeze the metadata mapping."""
    if not isinstance(self.metadata, MappingProxyType):
        object.__setattr__(
            self,
            "metadata",
            MappingProxyType(dict(self.metadata)),
        )

__setstate__

__setstate__(state)

Restore slotted fields and re-wrap the metadata mapping.

Source code in src/dtollib/tasks/models.py
def __setstate__(self, state: dict[str, Any]) -> None:
    """Restore slotted fields and re-wrap the metadata mapping."""
    _restore_slots_state(self, state)
    self.__post_init__()

to_dict

to_dict()

JSON-friendly mapping suitable for row-oriented sinks.

Source code in src/dtollib/tasks/models.py
def to_dict(self) -> dict[str, Any]:
    """JSON-friendly mapping suitable for row-oriented sinks."""
    return {
        "device": self.device,
        "task": self.task,
        "channel": self.channel,
        "value": self.value,
        "unit": self.unit,
        "sample_index": self.sample_index,
        "block_index": self.block_index,
        "t_mono_ns": self.t_mono_ns,
        "t_utc": self.t_utc.isoformat(),
        "sensor_status": self.sensor_status.value,
        "is_linearised": self.is_linearised,
        "metadata": dict(self.metadata),
    }

DataFlow

Bases: StrEnum

SDK data-flow modes for a configured subsystem.

FINITE is implemented on top of CONTINUOUS + WrapMode.NONE plus a sample ceiling — the SDK has no dedicated finite mode, but the recorder stops when the cumulative sample count is reached.

The two *_PRETRIGGER / *_ABOUT_TRIGGER modes are flagged "Legacy Devices" in the SDK manual and deferred past v0.1.

Edge

Bases: StrEnum

Digital / threshold trigger slope.

GateType

Bases: StrEnum

Counter gate-enable logic — maps to the SDK OL_GATE_* family.

IOType

Bases: StrEnum

Channel measurement-kind discriminator from SupportedChannelInfo.IOType.

Carried on CapabilitySet.channel_caps[ch]["IOType"] so the wrapper can reject "configure channel 3 as RTD" when that channel reports VOLTAGE_IN only.

MULTI_SENSOR is the DT9805 case: one physical channel that the SDK re-types at configure time based on what's wired to it.

PulseType

Bases: StrEnum

Pulse-output polarity — maps to the SDK OL_PULSETYPE_* family.

QuadratureDecodeMode

Bases: StrEnum

Quadrature decoder count multiplier (counts per encoder line).

QueueStrategy

Bases: StrEnum

How completed HBUFs are returned to the Ready queue.

RetriggerMode

Bases: StrEnum

RetriggerSpec.mode — triggered-scan acquisition mode.

SdkEventKind

Bases: StrEnum

SDK notification-procedure message kinds delivered to the callback bridge.

Eleven distinct event types arrive on the same olDaSetNotificationProcedure callback; the drainer dispatches on the kind. BUFFER_DONE is the happy path; OVERRUN_ERROR / UNDERRUN_ERROR / TRIGGER_ERROR are wrapped into typed exceptions (or routed per ErrorPolicy); BUFFER_REUSED means data was overwritten in WrapMode.MULTIPLE and is logged at WARNING; QUEUE_DONE / QUEUE_STOPPED / IO_COMPLETE signal end-of-run; PRETRIGGER_BUFFER_DONE / EVENT_DONE / MEASURE_DONE are subsystem-specific and routed to dedicated handlers.

SensorStatus

Bases: StrEnum

Per-channel sentinel status preserved through scalarisation.

TC channels can produce sentinel float values that must NOT be coerced into plausible measurements. The recorder writes the sentinel to a sensor_status overlay on the reading / block and replaces the data cell with NaN, so downstream sinks never silently log "23.4 °C" for an open thermocouple.

SubsystemState

Bases: StrEnum

Canonical subsystem state — exposed via DtolSession.state.

Borrowed from the .NET API's SubsystemBase.State. The SDK already tracks this; surfacing it instead of synthesising it from is_running() plus implicit flags lets tests assert exact transitions and lets error messages name the precise lifecycle phase.

SubsystemType

Bases: StrEnum

SDK subsystem types — one HDASS is one subsystem of one type.

WrapMode

Bases: StrEnum

BufferPlan wrap mode.

NONE = finite acquisition (stop after one fill of the queued buffers). SINGLE = DAC waveform (loop a single buffer). MULTIPLE = standard continuous (re-queue completed buffers).

block_to_long_rows

block_to_long_rows(block)

Yield one :class:DaqSample per (channel, sample) pair in block.

Reconstructs each sample's monotonic timestamp from block.t_mono_ns + k * block.block_period_ns (constant if block_period_ns is None — only the block-level timestamp is used). Sensor-status masks are decoded back into :class:SensorStatus values per sample.

Yields n_channels * samples_per_channel samples in (channel-major, sample-minor) order.

Source code in src/dtollib/tasks/models.py
def block_to_long_rows(block: DaqBlock) -> Iterator[DaqSample]:
    """Yield one :class:`DaqSample` per (channel, sample) pair in ``block``.

    Reconstructs each sample's monotonic timestamp from
    ``block.t_mono_ns + k * block.block_period_ns`` (constant if
    ``block_period_ns`` is ``None`` — only the block-level timestamp is
    used). Sensor-status masks are decoded back into :class:`SensorStatus`
    values per sample.

    Yields ``n_channels * samples_per_channel`` samples in (channel-major,
    sample-minor) order.
    """
    period_ns = block.block_period_ns or 0
    # Mask values are SensorStatus ordinals — index into the declaration order.
    status_order = list(SensorStatus)
    for ch_index, ch_name in enumerate(block.channels):
        row = block.data[ch_index]
        mask = block.sensor_status.get(ch_name)
        unit = block.units.get(ch_name)
        for k in range(block.samples_per_channel):
            if mask is not None:
                status_ord = int(mask[k])
                status = (
                    status_order[status_ord]
                    if 0 <= status_ord < len(status_order)
                    else SensorStatus.OK
                )
            else:
                status = SensorStatus.OK
            yield DaqSample(
                device=block.device,
                task=block.task,
                channel=ch_name,
                value=float(row[k]),
                unit=unit,
                sample_index=block.first_sample_index + k,
                block_index=block.block_index,
                t_mono_ns=block.t_mono_ns + k * period_ns,
                t_utc=block.t_utc,
                sensor_status=status,
                is_linearised=block.is_linearised,
            )

Builder

dtollib.tasks.builder

TaskBuilder — translates a :class:TaskSpec into ordered backend calls.

The builder is the single place that knows the legal ordering of SDK configuration calls. It provides the single-value sequence and the continuous-mode sequence with channel-list + timing + trigger + buffer-pool setup.

Critical invariant (docs/design.md §8.5a): on IOType.MULTI_SENSOR channels, set_multi_sensor_type MUST be called BEFORE any per-type setter on that channel. The builder enforces this unconditionally; the fake backend rejects out-of-order calls.

Design reference: docs/implementation-plan.md §4.3.

TaskBuilder

TaskBuilder(backend)

Translate a :class:TaskSpec into ordered backend calls.

The builder is stateless other than the references it captures. It does not own the HDASS; callers (typically :class:~dtollib.tasks.DtolSession) keep the handle and pass it in.

Source code in src/dtollib/tasks/builder.py
def __init__(self, backend: DtolBackend) -> None:
    self._backend = backend

configure_continuous

configure_continuous(hdass, spec, capabilities)

Run the continuous-mode pre-commit configuration sequence.

Stops short of commit() — the §12.3.2 ordering requires notification registration and buffer queueing BEFORE olDaConfig. The callback bridge / record() drives the commit step after wiring its bridge.

Sequence (docs/implementation-plan.md §5.7):

  1. set_data_flow(OL_DF_CONTINUOUS) (or OL_DF_CONTINUOUS_*).
  2. For each channel: MULTI_SENSOR retype if needed, then add_channel(...).
  3. set_channel_list([phys, ...]) — drives the continuous-mode channel list separately from the gain-list entries.
  4. set_clock(...).
  5. set_trigger(...).
  6. set_wrap_mode(...).
  7. set_stop_on_error(...).

Parameters:

Name Type Description Default
hdass int

Subsystem handle.

required
spec TaskSpec

Task spec with data_flow in {CONTINUOUS, FINITE}.

required
capabilities CapabilitySet

Subsystem capability snapshot — drives the MULTI_SENSOR dispatch.

required

Raises:

Type Description
DtolTaskStateError

If spec is not in a continuous mode.

Source code in src/dtollib/tasks/builder.py
def configure_continuous(
    self,
    hdass: int,
    spec: TaskSpec,
    capabilities: CapabilitySet,
) -> None:
    """Run the continuous-mode pre-commit configuration sequence.

    Stops short of ``commit()`` — the §12.3.2 ordering requires
    notification registration and buffer queueing BEFORE
    ``olDaConfig``. The callback bridge / ``record()`` drives the
    commit step after wiring its bridge.

    Sequence (docs/implementation-plan.md §5.7):

    1. ``set_data_flow(OL_DF_CONTINUOUS)`` (or ``OL_DF_CONTINUOUS_*``).
    2. For each channel: MULTI_SENSOR retype if needed, then
       ``add_channel(...)``.
    3. ``set_channel_list([phys, ...])`` — drives the continuous-mode
       channel list separately from the gain-list entries.
    4. ``set_clock(...)``.
    5. ``set_trigger(...)``.
    6. ``set_wrap_mode(...)``.
    7. ``set_stop_on_error(...)``.

    Args:
        hdass: Subsystem handle.
        spec: Task spec with ``data_flow`` in {CONTINUOUS, FINITE}.
        capabilities: Subsystem capability snapshot — drives the
            MULTI_SENSOR dispatch.

    Raises:
        DtolTaskStateError: If ``spec`` is not in a continuous mode.
    """
    if spec.data_flow == DataFlow.SINGLE_VALUE:
        from dtollib.errors import (  # noqa: PLC0415
            DtolTaskStateError,
            ErrorContext,
        )

        raise DtolTaskStateError(
            "configure_continuous: data_flow=single_value; "
            "use configure_single_value() instead",
            context=ErrorContext(
                operation="TaskBuilder.configure_continuous",
                task_name=spec.name,
            ),
        )
    if spec.timing is None or spec.buffers is None:
        from dtollib.errors import (  # noqa: PLC0415
            DtolTaskStateError,
            ErrorContext,
        )

        raise DtolTaskStateError(
            "configure_continuous requires both Timing and BufferPlan",
            context=ErrorContext(
                operation="TaskBuilder.configure_continuous",
                task_name=spec.name,
            ),
        )

    self._backend.set_data_flow(hdass, DATA_FLOW_TO_OL[spec.data_flow])

    # Per-channel configuration (mirrors single-value path).
    for list_index, channel in enumerate(spec.channels):
        _require_io_type_supported(channel, capabilities)
        if capabilities.supports_multisensor:
            io_type = channel.kind_to_multi_sensor_type()
            self._backend.set_multi_sensor_type(hdass, channel.physical_channel, io_type)
        self._backend.add_channel(hdass, list_index, channel)

    # Continuous channel-list: flat list of physical channel indices.
    self._backend.set_channel_list(hdass, [c.physical_channel for c in spec.channels])

    # Clock.
    clock_source = (
        OL_CLK_INTERNAL if spec.timing.clock_source == ClockSource.INTERNAL else OL_CLK_EXTERNAL
    )
    self._backend.set_clock(
        hdass,
        rate_hz=spec.timing.rate_hz,
        clock_source=clock_source,
        external_divider=spec.timing.external_divider,
    )

    # Trigger.
    kind, threshold_channel, threshold_level = _trigger_to_sdk(spec.trigger)
    self._backend.set_trigger(
        hdass,
        kind=kind,
        threshold_channel=threshold_channel,
        threshold_level=threshold_level,
    )

    # Triggered-scan retrigger.  When the timing carries a
    # RetriggerSpec the SDK collects ``multiscan_count`` scans per trigger
    # at the configured retrigger source/rate.
    if spec.timing.retrigger is not None:
        self._configure_retrigger(hdass, spec.timing.retrigger)

    # Wrap mode (NONE for FINITE; MULTIPLE for continuous; SINGLE for DAC).
    wrap = _wrap_mode_to_sdk(spec.buffers.wrap_mode)
    self._backend.set_wrap_mode(hdass, wrap)

    # DMA usage. The SDK requires olDaSetDmaUsage(min(1, NUMDMACHANS))
    # for continuous mode even when the subsystem reports zero DMA
    # channels (the DT9805/06 report NUMDMACHANS==0 yet still need the
    # call — docs/decisions.md). Pass 1 when DMA is available, else 0,
    # and always make the call.
    self._backend.set_dma_usage(hdass, 1 if capabilities.supports_dma else 0)

    self._backend.set_stop_on_error(hdass, spec.stop_on_error)

configure_counter

configure_counter(hdass, spec, capabilities)

Configure a counter/timer, quadrature, or tachometer subsystem.

Counter subsystems are read on demand after :meth:start; there is no channel/gain list or sample clock to set up. The critical ordering invariant is C/T mode firstolDaSetCTMode re-types the counter and must precede gate / pulse / edge setters. The fake backend rejects out-of-order calls.

Parameters:

Name Type Description Default
hdass int

Subsystem handle.

required
spec TaskSpec

Task spec whose channels are counter/quadrature/tachometer.

required
capabilities CapabilitySet

Subsystem capability snapshot (unused today; kept for signature parity with the AI/continuous paths).

required
Source code in src/dtollib/tasks/builder.py
def configure_counter(
    self,
    hdass: int,
    spec: TaskSpec,
    capabilities: CapabilitySet,
) -> None:
    """Configure a counter/timer, quadrature, or tachometer subsystem.

    Counter subsystems are read on demand after :meth:`start`; there is
    no channel/gain list or sample clock to set up.  The critical
    ordering invariant is **C/T mode first** — ``olDaSetCTMode`` re-types
    the counter and must precede gate / pulse / edge setters.  The fake
    backend rejects out-of-order calls.

    Args:
        hdass: Subsystem handle.
        spec: Task spec whose channels are counter/quadrature/tachometer.
        capabilities: Subsystem capability snapshot (unused today; kept
            for signature parity with the AI/continuous paths).
    """
    for channel in spec.channels:
        _require_counter_mode_supported(_counter_mode_for(channel), capabilities, channel)
        self._configure_counter_channel(hdass, channel)
    self._backend.set_stop_on_error(hdass, spec.stop_on_error)
    self._backend.commit(hdass)

configure_single_value

configure_single_value(hdass, spec, capabilities)

Run the single-value configuration sequence.

Sequence (docs/implementation-plan.md §4.3):

  1. set_data_flow(OL_DF_SINGLEVALUE).
  2. For each channel: a. If the channel is MULTI_SENSOR per the capability set, set_multi_sensor_type(...) FIRST. b. add_channel(...) — drives olDaSetChannelType + olDaSetChannelRange + olDaSetGainListEntry (+ olDaSetThermocoupleType).
  3. set_stop_on_error(spec.stop_on_error).
  4. commit()olDaConfig.

Parameters:

Name Type Description Default
hdass int

Subsystem handle from :meth:~dtollib.backend.DtolBackend.get_dass.

required
spec TaskSpec

Task specification. Must have data_flow == DataFlow.SINGLE_VALUE — validated by TaskSpec.__post_init__ plus an explicit assert here for the type narrower.

required
capabilities CapabilitySet

Live capability snapshot for the subsystem. Drives the MULTI_SENSOR dispatch.

required
Source code in src/dtollib/tasks/builder.py
def configure_single_value(
    self,
    hdass: int,
    spec: TaskSpec,
    capabilities: CapabilitySet,
) -> None:
    """Run the single-value configuration sequence.

    Sequence (docs/implementation-plan.md §4.3):

    1. ``set_data_flow(OL_DF_SINGLEVALUE)``.
    2. For each channel:
       a. If the channel is MULTI_SENSOR per the capability set,
          ``set_multi_sensor_type(...)`` FIRST.
       b. ``add_channel(...)`` — drives
          ``olDaSetChannelType`` + ``olDaSetChannelRange`` +
          ``olDaSetGainListEntry`` (+ ``olDaSetThermocoupleType``).
    3. ``set_stop_on_error(spec.stop_on_error)``.
    4. ``commit()`` — ``olDaConfig``.

    Args:
        hdass: Subsystem handle from
            :meth:`~dtollib.backend.DtolBackend.get_dass`.
        spec: Task specification.  Must have
            ``data_flow == DataFlow.SINGLE_VALUE`` — validated by
            ``TaskSpec.__post_init__`` plus an explicit assert
            here for the type narrower.
        capabilities: Live capability snapshot for the
            subsystem.  Drives the MULTI_SENSOR dispatch.
    """
    if spec.data_flow != DataFlow.SINGLE_VALUE:
        # TaskSpec validation should have caught this.  Belt + braces.
        from dtollib.errors import (  # noqa: PLC0415
            DtolTaskStateError,
            ErrorContext,
        )

        raise DtolTaskStateError(
            f"configure_single_value: data_flow={spec.data_flow.value}; "
            "use configure_continuous() for non-single-value tasks",
            context=ErrorContext(
                operation="TaskBuilder.configure_single_value",
                task_name=spec.name,
            ),
        )

    self._backend.set_data_flow(hdass, DATA_FLOW_TO_OL[spec.data_flow])

    for list_index, channel in enumerate(spec.channels):
        # MULTI_SENSOR ordering — docs/design.md §8.5a.  On
        # subsystems that report supports_multisensor, every
        # channel is re-typed at configure time; the spec's
        # ``kind_to_multi_sensor_type`` returns the SDK-facing
        # IOType discriminator.
        _require_io_type_supported(channel, capabilities)
        _validate_digital_port(channel, capabilities)
        if capabilities.supports_multisensor:
            io_type = channel.kind_to_multi_sensor_type()
            self._backend.set_multi_sensor_type(hdass, channel.physical_channel, io_type)

        self._backend.add_channel(hdass, list_index, channel)

    self._backend.set_stop_on_error(hdass, spec.stop_on_error)
    self._backend.commit(hdass)