nidaqlib.sync ¶
Sync facade — :class:Daq, :class:SyncPortal, sync recording wrappers.
Async is canonical; the sync facade wraps it through :class:SyncPortal
so scripts, notebooks, and REPL sessions can drive DAQ tasks without
await. Direct port of sartoriuslib's sync/ package.
AcquisitionSummary dataclass ¶
AcquisitionSummary(
    blocks_emitted=0,
    blocks_dropped=0,
    errors_observed=0,
    started_at=(lambda: datetime.now(UTC))(),
    finished_at=None,
)
Per-run counters, yielded alongside the block stream.
Mirrors sartoriuslib.AcquisitionSummary shape but is intentionally
mutable: counters are updated in place during the run so consumers
can poll progress (e.g. for a TUI bar) and read final counts after
exit. The recorder is the only writer; consumers MUST treat the
object as read-only.
Attributes:

| Name | Type | Description |
|---|---|---|
| blocks_emitted | int | Total :class:DaqBlock records emitted. |
| blocks_dropped | int | Records dropped because of an :class:OverflowPolicy decision. |
| errors_observed | int | Wrapped NI errors seen during the run, regardless of :class:ErrorPolicy. |
| started_at | datetime | Wall-clock at recorder entry. |
| finished_at | datetime \| None | Wall-clock at recorder exit. |
Daq ¶
Sync entry-points (no instances; classmethod-only).
open_device classmethod ¶
Open a :class:SyncDaqSession and tear it down on exit.
Mirrors :func:nidaqlib.tasks.open_device but yields a sync session.
Every operation on the returned session dispatches through a
per-context :class:SyncPortal.
Example::

    from nidaqlib import TaskSpec, Timing, AnalogInputVoltage
    from nidaqlib.sync import Daq

    spec = TaskSpec(
        name="ai0",
        channels=[AnalogInputVoltage(physical_channel="Dev1/ai0")],
        timing=Timing(rate_hz=1000),
    )
    with Daq.open_device(spec) as session:
        block = session.read_block(samples_per_channel=1000)
Source code in src/nidaqlib/sync/daq.py
ErrorPolicy ¶
Bases: StrEnum
How recorders react to wrapped NI errors during a read.
RAISE class-attribute instance-attribute ¶
Cancel the recorder's task group and re-raise the error.
RETURN class-attribute instance-attribute ¶
Emit a :class:DaqBlock (or :class:DaqReading) with .error set,
then continue.
The recorder MUST advance timing counters (block_index /
first_sample_index / monotonic_ns) on error records so consumers
can detect dropped intervals. Consumers MUST gate on error is None
before reading data.
OverflowPolicy ¶
Bases: StrEnum
Behaviour when the recorder's outbound stream is full.
BLOCK class-attribute instance-attribute ¶
Producer awaits consumer. Risks NI buffer overrun on hardware-clocked tasks.
DROP_NEWEST class-attribute instance-attribute ¶
Drop the about-to-be-enqueued block. Bounds consumer latency; loses freshest data.
DROP_OLDEST class-attribute instance-attribute ¶
Drop the oldest queued block. Keeps newest data; loses older queued blocks.
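DROP_OLDEST is the bounded-queue behaviour of a deque with maxlen, which makes a compact model of the trade-off (this is an analogy, not the recorder's actual buffer implementation):

```python
from collections import deque

# A deque with maxlen models DROP_OLDEST: appending to a full buffer evicts
# the oldest entry, so a slow consumer always sees the freshest blocks.
buffer = deque(maxlen=3)
for block_index in range(5):
    buffer.append(block_index)

survivors = list(buffer)   # indices 0 and 1 were dropped
```

DROP_NEWEST would instead skip the append when the buffer is full, keeping the oldest three indices.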
SyncAsyncIterator ¶
Blocking view over an async iterator, bound to a :class:SyncPortal.
Source code in src/nidaqlib/sync/portal.py
close ¶
Cancel the underlying async iterator if it exposes aclose.
Source code in src/nidaqlib/sync/portal.py
SyncDaqSession ¶
Sync facade over an open :class:DaqSession.
Source code in src/nidaqlib/sync/session.py
acquire ¶
Run one finite acquisition and return its :class:DaqBlock.
Source code in src/nidaqlib/sync/session.py
close ¶
poll ¶
read_block ¶
Read one rectangular :class:DaqBlock.
start ¶
stop ¶
write ¶
Write one sample-per-channel to the task's output channels.
Sync wrapper around :meth:DaqSession.write. The safety gate
(confirm + safe_min / safe_max) runs in the same
process, before any I/O.
Source code in src/nidaqlib/sync/session.py
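The range half of the safety gate can be sketched in isolation; gate is an illustrative name, while safe_min / safe_max are the parameters the docs above mention:

```python
# Hypothetical sketch of the in-process range check described above: values
# outside [safe_min, safe_max] are rejected before any I/O happens.
def gate(values: list[float], *, safe_min: float, safe_max: float) -> list[float]:
    for value in values:
        if not safe_min <= value <= safe_max:
            raise ValueError(f"{value} outside [{safe_min}, {safe_max}]")
    return values


ok = gate([0.5, -1.2], safe_min=-5.0, safe_max=5.0)   # within range
try:
    gate([9.9], safe_min=-5.0, safe_max=5.0)
except ValueError:
    rejected = True
```

Running the check in the same process, before dispatching through the portal, means a bad value never reaches the hardware at all.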
SyncPortal ¶
Per-context wrapper around :class:anyio.from_thread.BlockingPortal.
Source code in src/nidaqlib/sync/portal.py
call ¶
Run func(*args, **kwargs) on the portal's event loop.
Source code in src/nidaqlib/sync/portal.py
wrap_async_context_manager ¶
Present an async context manager as a sync context manager.
wrap_async_iter ¶
Present an async iterator as a blocking, closeable iterator.
record ¶
record(
    source,
    *,
    chunk_size,
    timeout=10.0,
    buffer_size=16,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.DROP_OLDEST,
    use_callback_bridge=False,
)
Sync wrapper around :func:nidaqlib.streaming.record.
Yields (stream, summary). The stream is a sync iterator producing
:class:DaqBlock records; iterate it with a normal for loop.
Example::

    with (
        Daq.open_device(spec) as session,
        record(session, chunk_size=1000) as (stream, summary),
    ):
        for block in stream:
            process(block)
Source code in src/nidaqlib/sync/recording.py
record_polled ¶
record_polled(
    source,
    *,
    rate_hz,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)
Sync wrapper around :func:nidaqlib.streaming.record_polled.
The sync facade only accepts a session source — the manager-mode
fan-out belongs to async-only call sites — so the per-tick payload is
always :class:DaqReading.
Source code in src/nidaqlib/sync/recording.py
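A usage sketch in the same shape as the record example above; spec construction is elided as in that example, and update_display stands in for whatever the caller does with each reading (not runnable without hardware)::

    from nidaqlib.sync import Daq, record_polled

    with (
        Daq.open_device(spec) as session,
        record_polled(session, rate_hz=10.0) as (stream, summary),
    ):
        for reading in stream:   # each tick yields a DaqReading
            update_display(reading)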
run_sync ¶
Run one coroutine in a throwaway :class:SyncPortal.
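Conceptually this is the portal-backed analogue of asyncio.run: spin up an event loop, drive one coroutine to completion, tear everything down. The stdlib sketch below shows the equivalent round-trip without touching nidaqlib:

```python
import asyncio


# run_sync's contract, modelled with the stdlib: one coroutine, one
# throwaway event loop, a plain return value back in sync-land.
async def read_once() -> str:
    await asyncio.sleep(0)      # stands in for awaiting real DAQ I/O
    return "sample"


result = asyncio.run(read_once())
```

Prefer Daq.open_device plus a long-lived session for repeated calls; run_sync pays the loop start-up cost on every invocation.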