Continuous acquisition¶
Single-value poll() reads one sample per channel on demand. For sustained,
high-rate acquisition you want the hardware sample clock driving a buffer ring
— that is what record() provides. dtollib offers two recorders:
| Recorder | Timing | Yields | Use when |
|---|---|---|---|
record() |
Hardware sample clock | DaqBlocks ((n_channels, n_samples)) |
You need jitter-free, gap-free sampling at the board's clock rate. |
record_polled() |
Software loop | DaqReading per tick |
A low rate is fine and you want the simplicity of repeated poll()s (or to poll a whole DtolManager). |
Both are async context managers that yield a handle and run a producer task in the background. Design reference: design.md §14.
record() — hardware-clocked¶
record() drives the §12.3.2 callback bridge: it owns the buffer
pool and the bench-proven commit → register → queue → arm → start ordering,
and yields (stream, summary).
The task must be opened with autostart=False — the bridge has to register
the notification window and queue buffers before the second olDaConfig, so
the recorder calls commit() itself. spec.data_flow must be CONTINUOUS or
FINITE, and spec.buffers must be set.
import anyio
from dtollib import (
AnalogInputVoltage, BufferPlan, DataFlow, TaskSpec, Timing, open_device, record,
)
async def main() -> None:
spec = TaskSpec(
name="heat_flux_run",
channels=[AnalogInputVoltage(physical_channel=0, name="heat_flux")],
data_flow=DataFlow.CONTINUOUS,
timing=Timing(rate_hz=10_000.0),
buffers=BufferPlan(buffers=4, samples_per_buffer=1000),
)
async with (
await open_device(spec, autostart=False) as session,
record(session) as recording,
):
async for block in recording.stream:
# block.data is (n_channels, n_samples) in engineering units
print(block.block_index, block.data.mean(axis=1))
if recording.summary.payloads_emitted >= 50:
break
summary = recording.summary
print("dropped:", summary.payloads_dropped, "overruns:", summary.overruns_observed)
anyio.run(main)
Parameters¶
| Parameter | Default | Notes |
|---|---|---|
stream_buffer_size |
16 |
AnyIO stream depth — the consumer-side back-pressure window, distinct from spec.buffers.buffers (the SDK's HBUF ring). |
error_policy |
ErrorPolicy.RAISE |
How SDK errors reaching the producer loop are surfaced (see below). |
overflow |
OverflowPolicy.DROP_OLDEST |
What to do when the consumer stream is full. The default keeps the SDK queue moving even with a slow consumer. |
timeout |
10.0 |
Reserved for a future shutdown-timeout revision; currently informational. |
Loss-proof captures override the default overflow
record() defaults to DROP_OLDEST so a slow consumer can't stall the SDK
queue into an overrun. For an archival capture you want the opposite —
OverflowPolicy.BLOCK plus a RawCountsSink fed at the
head of the loop, so back-pressure protects the durable file.
record_polled() — software-timed¶
record_polled() calls poll() on a fixed cadence using absolute-target
scheduling, so per-cycle drift never accumulates. It accepts either a single
DtolSession (yields one DaqReading per tick) or a DtolManager (yields
Mapping[str, DeviceResult[DaqReading]] per tick — every task polled together).
from dtollib import record_polled
async with record_polled(session, rate_hz=10.0) as rec:
async for reading in rec.stream:
print(reading.values)
It defaults to OverflowPolicy.BLOCK (a software poller can pause without
risking an SDK overrun) and ErrorPolicy.RAISE.
The Recording handle and AcquisitionSummary¶
record_polled() yields a Recording (.stream, .summary, .rate_hz);
record() yields the (stream, summary) tuple directly. The
AcquisitionSummary is mutable and updated in place during the run, so you
can read progress live:
| Field | Meaning |
|---|---|
payloads_emitted |
Payloads handed to the consumer. |
payloads_dropped |
Consumer-side losses under a DROP_* overflow policy (or missed ticks in record_polled). |
errors_observed |
Backend errors seen by the producer loop. |
overruns_observed / underruns_observed |
SDK-level input overruns / output underruns — distinct from payloads_dropped. |
started_at / finished_at |
UTC timestamps; finished_at is set on context exit. |
Error and overflow policies¶
ErrorPolicy governs what happens when a backend error reaches the producer:
RAISE— cancel the recorder; the exception propagates out of theasync with.RETURN— emit a payload with.errorset and keep streaming (good for long unattended runs).SKIP— drop the failed payload silently, but still count it inerrors_observed.
OverflowPolicy governs a full consumer buffer: BLOCK (back-pressure, lossless),
DROP_OLDEST (evict the head), DROP_NEWEST (drop the incoming payload). Both
enums live in dtollib.streaming.
Where blocks go next¶
A DaqBlock carries data, channels, units, block_index, timestamps, and
an optional error. Feed blocks to a sink for durable logging, or
reshape one into tidy per-sample rows with block_to_long_rows. The
dtol-capture CLI wires record() to a file sink for you.