nidaqlib.tasks¶
Tasks: specs, acquisition records, sessions, and open_device().
AcquisitionMode ¶
Bases: StrEnum
Sample-clock acquisition mode.
Mirrors a subset of nidaqmx.constants.AcquisitionType. Kept as a
library-side enum so TaskSpec round-trips through JSON without
pulling NI's enum machinery into the serialisation layer.
ON_DEMAND
class-attribute
instance-attribute
¶
Software-timed; no hardware sample clock is configured.
DaqBlock
dataclass
¶
DaqBlock(
*,
device,
task=None,
channels,
data,
block_index,
first_sample_index,
samples_per_channel,
sample_rate_hz,
dt_s,
task_started_at,
t0,
monotonic_ns,
read_started_at,
read_finished_at,
elapsed_s,
units,
error=None,
)
One rectangular block of hardware-clocked samples.
The data field is the natural shape for Parquet row groups, NumPy
slicing, and TDMS — do not scalarize unless the user opts in via
block_to_long_rows().
To recover the wall-clock timestamp of sample k (where
0 <= k < samples_per_channel):

    absolute = block.first_sample_index + k
    elapsed = absolute / block.sample_rate_hz
    sample_at = block.task_started_at + timedelta(seconds=elapsed)
Do not interpolate sample times off t0 or read_started_at —
those drift block-to-block.
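The reconstruction above can be sketched as a small helper. This is a minimal illustration, not the library's code: BlockTiming is a hypothetical stand-in that carries only the four DaqBlock fields the formula needs, and it assumes sample_rate_hz is not None (the attribute is typed float | None).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class BlockTiming:
    """Hypothetical stand-in carrying only the DaqBlock timing fields."""

    task_started_at: datetime
    first_sample_index: int
    samples_per_channel: int
    sample_rate_hz: float


def sample_time(block: BlockTiming, k: int) -> datetime:
    """Wall-clock timestamp of sample k within the block."""
    if not 0 <= k < block.samples_per_channel:
        raise IndexError(f"k={k} is outside a block of {block.samples_per_channel} samples")
    absolute = block.first_sample_index + k  # offset since the task started
    return block.task_started_at + timedelta(seconds=absolute / block.sample_rate_hz)


block = BlockTiming(
    task_started_at=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
    first_sample_index=2000,  # e.g. the third block of 1000 samples
    samples_per_channel=1000,
    sample_rate_hz=1000.0,
)
print(sample_time(block, 500))  # 2.5 s after task_started_at
```

Anchoring every sample to task_started_at keeps timestamps from different blocks on one clock, which is why t0 and read_started_at are treated as provenance only.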
Attributes:
| Name | Type | Description |
|---|---|---|
| device | str | Manager-add name, or |
| task | str \| None | Underlying |
| channels | tuple[str, ...] | Channel display names in the row order of |
| data | ndarray | NumPy array. Invariant: shape is |
| block_index | int | 0-based, monotonic per task. Resets on a new task. |
| first_sample_index | int | Cumulative sample offset since |
| samples_per_channel | int | |
| sample_rate_hz | float \| None | From |
| dt_s | float \| None | |
| task_started_at | datetime | Wall-clock anchor for sample-time reconstruction. |
| t0 | datetime | Wall-clock at the first sample of this block; provenance only. |
| monotonic_ns | int | |
| read_started_at | datetime | Wall-clock just before the read; provenance only. |
| read_finished_at | datetime | Wall-clock just after the read; provenance only. |
| elapsed_s | float | |
| units | Mapping[str, str \| None] | Engineering units keyed by channel display name. |
| error | NIDaqError \| None | Populated only under |
__post_init__ ¶
Validate the rectangular-shape invariant.
Raises:
| Type | Description |
|---|---|
| NIDaqValidationError | |
Source code in src/nidaqlib/tasks/models.py
DaqReading
dataclass
¶
DaqReading(
*,
device,
task=None,
values,
units,
requested_at,
received_at,
midpoint_at,
monotonic_ns,
elapsed_s,
metadata=_empty_metadata(),
error=None,
)
One scalar (or low-rate) reading across the channels of a task.
Field shape mirrors alicatlib.Sample and sartoriuslib.Sample so
that DAQ rows join cleanly against flow-controller and balance rows on
(device, monotonic_ns). See design doc §8.6 / §8.8.
Attributes:
| Name | Type | Description |
|---|---|---|
| device | str | Manager-add name, or |
| task | str \| None | Underlying |
| values | Mapping[str, float \| int \| bool] | One entry per channel, keyed by channel display name. |
| units | Mapping[str, str \| None] | Engineering units, keyed by channel display name. |
| requested_at | datetime | Wall-clock immediately before the read. |
| received_at | datetime | Wall-clock immediately after the read returns. |
| midpoint_at | datetime | Midpoint of the request/receive window. |
| monotonic_ns | int | |
| elapsed_s | float | |
| metadata | Mapping[str, str \| int \| float \| bool] | Free-form scalar metadata (often the source |
| error | NIDaqError \| None | Populated only under |
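As a small illustration of the midpoint_at field, the midpoint of the request/receive window can be computed from the two wall-clock stamps. The helper name midpoint is hypothetical; the library may derive the value differently.

```python
from datetime import datetime, timedelta, timezone


def midpoint(requested_at: datetime, received_at: datetime) -> datetime:
    """Return the instant halfway between the two wall-clock stamps."""
    if received_at < requested_at:
        raise ValueError("received_at precedes requested_at")
    return requested_at + (received_at - requested_at) / 2


requested = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
received = requested + timedelta(milliseconds=4)
print(midpoint(requested, received))  # 2 ms after the request
```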
DaqSession ¶
Owns one underlying NI task plus its lifecycle state.
Construction does not touch the driver. Call configure() and then start()
(or use open_device()) to create the task, add channels, configure
timing, and start it. read_block / poll are valid once started.
Create a session for spec against backend.
The constructor only stores its arguments; it never touches the
driver. That keeps __init__ exception-free and avoids a
partially-initialised task object on configuration errors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spec | TaskSpec | Declarative :class: | required |
| backend | DaqBackend | Backend that proxies operations into NI (or the fake). | required |
| timeout | float | Default per-operation timeout in seconds. Individual | 10.0 |
Source code in src/nidaqlib/tasks/session.py
has_active_callback_bridge
property
¶
True while a §11.3.2 callback bridge is registered.
is_configured
property
¶
True after configure() succeeds and before close().
A configured session has a backing NI task with channels, timing,
logging, and triggers applied — but task.start() has not yet
been called. Buffer-event callback registration (§11.3.2) is only
valid in this window.
raw_task
property
¶
The underlying backend task handle.
For nidaqlib.backend.nidaqmx_backend.NidaqmxBackend this is
an nidaqmx.Task; for the fake backend it is an opaque
_FakeTask. Use this for advanced NI features that aren't exposed
via the wrapper — the escape hatch from design doc §7.4.
The handle is available once configure() has succeeded, that
is, in either the configured-not-started or started state. The
callback bridge (§11.3.2) needs the handle pre-start to register
the buffer event.
Raises:
| Type | Description |
|---|---|
| NIDaqTaskStateError | The session has not been configured yet. |
task_started_at
property
¶
Wall-clock anchor for sample-time reconstruction.
Returns None until start() has succeeded. Once set, this
value is the source of truth for DaqBlock.task_started_at:
it is captured exactly once per session, immediately before
backend.start_task, so that the first sample's wall-clock can be
reconstructed deterministically from
task_started_at + first_sample_index / rate_hz (design doc §8.7).
__aenter__
async
¶
__aexit__
async
¶
acquire
async
¶
Run one finite acquisition and return its DaqBlock.
Convenience wrapper for the §12.3 finite-mode pattern: configure
finite, start, read, stop. Requires a session whose
Timing.mode is AcquisitionMode.FINITE. After the
read completes, the underlying NI task is stopped; call
start() again before another acquisition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| samples_per_channel | int | Number of samples per channel to read. | required |
| timeout | float \| None | Optional per-call timeout in seconds. Falls back to the session-wide default. | None |
Raises:
| Type | Description |
|---|---|
| NIDaqTaskStateError | The session is not started, is closed, or its timing mode is not :attr: |
| NIDaqReadError / NIDaqTimeoutError | Surfaced from the backend. |
Source code in src/nidaqlib/tasks/session.py
close
async
¶
Stop (if needed) and close the underlying task. Idempotent.
__aexit__ always calls this; explicit call is rare. Sessions that
have opted into the every-N-samples callback bridge MUST instead use
the recorder context manager — the bridge has its own ordered
shutdown protocol (design doc §11.3.2) that this method does not
implement.
Source code in src/nidaqlib/tasks/session.py
configure
async
¶
Create the underlying task and apply channels / timing / logging / trigger.
After this method, raw_task is available and any pre-start hooks
(notably the §11.3.2 buffer-event callback registration) may run.
task.start() is not called; use start() for that.
On failure, the partial task is torn down so the session does not leak NI resources.
Raises:
| Type | Description |
|---|---|
| NIDaqTaskStateError | Already configured, started, or closed. |
Source code in src/nidaqlib/tasks/session.py
poll
async
¶
One-shot scalar read across all channels.
Valid only for sessions that are not actively buffering a sample
clock (Timing.mode == ON_DEMAND or no Timing at all). For the
live-scalar use case during a high-rate acquisition, use
record() and read the most recent block's last column.
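Reading "the most recent block's last column" can be sketched as below. The sketch assumes a channel-major layout (one row per channel, one column per sample), which matches the note that channels follow the row order of data; nested lists stand in for the NumPy array, and latest_scalars is a hypothetical helper name. With a real ndarray the equivalent slice would be data[:, -1].

```python
from typing import Dict, Sequence


def latest_scalars(channels: Sequence[str], data: Sequence[Sequence[float]]) -> Dict[str, float]:
    """Map each channel name to the newest sample of a channel-major block."""
    if len(channels) != len(data):
        raise ValueError("one data row is expected per channel")
    # The last column holds the most recently acquired sample of every channel.
    return {name: row[-1] for name, row in zip(channels, data)}


channels = ("ai0", "ai1")
data = [[0.10, 0.11, 0.12], [1.50, 1.49, 1.48]]
print(latest_scalars(channels, data))  # {'ai0': 0.12, 'ai1': 1.48}
```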
Raises:
| Type | Description |
|---|---|
| NIDaqTaskStateError | The session is buffering a sample clock (continuous or finite mode and started). |
Source code in src/nidaqlib/tasks/session.py
read_block
async
¶
Read one rectangular DaqBlock.
Wraps the backend read in a run_sync call so the
event loop stays responsive during the blocking NI call. Increments
the per-session first_sample_index cursor.
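The pattern described here, offloading a blocking read to a worker thread and advancing a sample cursor, can be sketched with the standard library. The source names only run_sync; asyncio.to_thread is used below as a stand-in, and CursorReader with its _blocking_read method is hypothetical.

```python
import asyncio
import time
from typing import List, Tuple


class CursorReader:
    """Hypothetical reader that offloads a blocking read and tracks a sample cursor."""

    def __init__(self) -> None:
        self.first_sample_index = 0

    def _blocking_read(self, n: int) -> List[float]:
        time.sleep(0.01)  # stands in for the blocking driver call
        return [0.0] * n

    async def read_block(self, n: int) -> Tuple[int, List[float]]:
        start = self.first_sample_index
        # Run the blocking call in a worker thread so the event loop keeps running.
        data = await asyncio.to_thread(self._blocking_read, n)
        self.first_sample_index += n  # advance only after a successful read
        return start, data


async def main() -> List[int]:
    reader = CursorReader()
    offsets = []
    for _ in range(3):
        start, _data = await reader.read_block(100)
        offsets.append(start)
    return offsets


print(asyncio.run(main()))  # [0, 100, 200]
```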
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| samples_per_channel | int | Samples per channel for this block. | required |
| timeout | float \| None | Optional per-call timeout in seconds; falls back to the session-wide default. | None |
Raises:
| Type | Description |
|---|---|
| NIDaqTaskStateError | The session is not started or is closed. |
| NIDaqReadError / NIDaqTimeoutError | Surfaced from the backend. |
Source code in src/nidaqlib/tasks/session.py
start
async
¶
Start the configured task.
configure() must have run first. This method calls NI's
task.start() and records the wall-clock anchor used for §8.7
sample-time reconstruction. Calling start() again after
stop() reuses the configured task and resets the
block/sample counters for a new run.
confirm=True is required for task kinds whose start call
can actuate hardware immediately (currently counter-output pulse
trains).
Raises:
| Type | Description |
|---|---|
| NIDaqTaskStateError | Not configured, already started, or closed. |
| NIDaqValidationError | Starting would actuate hardware without explicit confirmation. |
Source code in src/nidaqlib/tasks/session.py
stop
async
¶
Stop the underlying task. Idempotent for not-yet-started sessions.
Does NOT close the task. Use close() to release NI resources.
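The lifecycle rules stated for configure, start, stop, and close can be summarised as a toy state machine. This is a sketch of the documented ordering only: Lifecycle, State, and TaskStateError are hypothetical stand-ins, and the real session also manages the driver task, counters, and callback bridge.

```python
from enum import Enum, auto


class State(Enum):
    NEW = auto()
    CONFIGURED = auto()
    STARTED = auto()
    CLOSED = auto()


class TaskStateError(RuntimeError):
    """Hypothetical stand-in for NIDaqTaskStateError."""


class Lifecycle:
    """Toy model of the configure -> start -> stop -> close ordering."""

    def __init__(self) -> None:
        self.state = State.NEW  # constructing never touches a driver

    def configure(self) -> None:
        if self.state is not State.NEW:
            raise TaskStateError("already configured, started, or closed")
        self.state = State.CONFIGURED

    def start(self) -> None:
        if self.state is not State.CONFIGURED:
            raise TaskStateError("not configured, already started, or closed")
        self.state = State.STARTED

    def stop(self) -> None:
        # No-op unless started; the task stays configured so start() may run again.
        if self.state is State.STARTED:
            self.state = State.CONFIGURED

    def close(self) -> None:
        self.state = State.CLOSED  # safe to call repeatedly


session = Lifecycle()
session.configure()
session.start()
session.stop()
session.start()  # restart after stop reuses the configured task
session.close()
session.close()  # idempotent
```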
Source code in src/nidaqlib/tasks/session.py
write
async
¶
Write one sample-per-channel to the task's output channels.
Safety gate (design doc §17):
- Keys of values must match the display names of the task's output channels (AO and/or DO). Unknown or missing keys raise NIDaqValidationError before any I/O.
- For analog-output channels with safe_min / safe_max set, values outside the resolved clamp window raise NIDaqValidationError. Never silently clamped.
- If any target channel has requires_confirm=True and confirm is False, the call raises NIDaqValidationError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| values | Mapping[str, float \| bool] | One value per output channel keyed by display name. | required |
| confirm | bool | Operator confirmation. Required (must be | False |
| timeout | float \| None | Per-call timeout in seconds. Falls back to the session-wide default. | None |
Raises:
| Type | Description |
|---|---|
| NIDaqTaskStateError | The session is not started or is closed. |
| NIDaqValidationError | Safety-gate or shape rejection (see above). |
| NIDaqWriteError / NIDaqTimeoutError | Surfaced from the backend. |
Source code in src/nidaqlib/tasks/session.py
Edge ¶
Bases: StrEnum
Active edge for the sample clock or a trigger.
Mirrors nidaqmx.constants.Edge.
TaskBuilder ¶
Fluent builder for TaskSpec.
Example

    spec = (
        TaskBuilder("ai_demo")
        .add_channel(AnalogInputVoltage(physical_channel="Dev1/ai0"))
        .with_timing(Timing(rate_hz=1000.0))
        .build()
    )
Create a builder for a task named name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Task name. Will become :attr: | required |
Source code in src/nidaqlib/tasks/builder.py
TaskSpec
dataclass
¶
Declarative description of one NI task.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Task name. Must be unique within an :class: |
| channels | Sequence[ChannelSpec] | One or more :class: |
| timing | Timing \| None | Optional :class: |
| trigger | TriggerSpec \| None | Optional :class: |
| logging | TdmsLogging \| None | Optional :class: |
| metadata | Mapping[str, str \| int \| float \| bool] | Free-form scalar metadata propagated into emitted records. |
__post_init__ ¶
Validate the channel list shape (the cheap, always-true invariants).
Raises:
| Type | Description |
|---|---|
| NIDaqValidationError | |
Source code in src/nidaqlib/tasks/spec.py
from_dict
classmethod
¶
Deserialise from a dict produced by to_dict().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Mapping[str, Any] | Mapping carrying the task-spec fields. | required |
Raises:
| Type | Description |
|---|---|
| NIDaqValidationError | A channel or trigger entry has an unknown |
Source code in src/nidaqlib/tasks/spec.py
replace ¶
Return a copy of this spec with updates applied.
Mirrors dataclasses.replace but is exposed as a method for
consistency with the rest of the API.
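A method that mirrors dataclasses.replace could look like the sketch below. SpecSketch is a hypothetical stand-in with two illustrative fields; whether TaskSpec is frozen, and how its method is actually implemented, is not stated in the source.

```python
import dataclasses
from typing import Any


@dataclasses.dataclass(frozen=True)
class SpecSketch:
    """Hypothetical stand-in for a spec dataclass."""

    name: str
    rate_hz: float

    def replace(self, **updates: Any) -> "SpecSketch":
        """Return a copy with the given fields changed; self is left untouched."""
        return dataclasses.replace(self, **updates)


base = SpecSketch(name="ai_demo", rate_hz=1000.0)
faster = base.replace(rate_hz=2000.0)
print(base.rate_hz, faster.rate_hz)  # 1000.0 2000.0
```

Because dataclasses.replace builds the copy through the constructor, any __post_init__ validation runs again on the updated values.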
Source code in src/nidaqlib/tasks/spec.py
to_dict ¶
Serialise to a JSON-friendly dict, dispatching channels by kind.
Source code in src/nidaqlib/tasks/spec.py
Timing
dataclass
¶
Timing(
*,
rate_hz,
mode=AcquisitionMode.CONTINUOUS,
samples_per_channel=None,
source=None,
active_edge=Edge.RISING,
)
Sample-clock timing configuration.
Attributes:
| Name | Type | Description |
|---|---|---|
| rate_hz | float | Sample clock rate, in Hz. Required for hardware-timed modes (finite / continuous). |
| mode | AcquisitionMode | Acquisition mode. Defaults to continuous. |
| samples_per_channel | int \| None | For |
| source | str \| None | Optional sample-clock source terminal (e.g. an external terminal name); |
| active_edge | Edge | Sample-clock active edge. Rising by default. |
__post_init__ ¶
Validate timing parameters before they reach NI.
Source code in src/nidaqlib/tasks/spec.py
from_dict
classmethod
¶
Deserialise from a dict produced by to_dict().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Mapping[str, Any] | Mapping carrying the timing fields. | required |
Raises:
| Type | Description |
|---|---|
| NIDaqValidationError | An enum field carries an unknown value. |
Source code in src/nidaqlib/tasks/spec.py
to_dict ¶
Serialise to a JSON-friendly dict.
Enum members serialise to their string values so the result is JSON-encodable without a custom encoder.
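The round trip through string-valued enums can be sketched with the standard library. Mode and TimingSketch are hypothetical stand-ins; the member string values are assumed, and a (str, Enum) base is used in place of StrEnum so the sketch also runs on Python versions before 3.11.

```python
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Mapping


class Mode(str, Enum):
    """Hypothetical stand-in for AcquisitionMode; the string values are assumed."""

    ON_DEMAND = "on_demand"
    FINITE = "finite"
    CONTINUOUS = "continuous"


@dataclass(frozen=True)
class TimingSketch:
    rate_hz: float
    mode: Mode = Mode.CONTINUOUS

    def to_dict(self) -> Dict[str, Any]:
        # Store the enum's string value so json.dumps needs no custom encoder.
        return {"rate_hz": self.rate_hz, "mode": self.mode.value}

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> "TimingSketch":
        try:
            mode = Mode(data["mode"])
        except ValueError as exc:
            raise ValueError(f"unknown mode: {data['mode']!r}") from exc
        return cls(rate_hz=float(data["rate_hz"]), mode=mode)


original = TimingSketch(rate_hz=1000.0, mode=Mode.FINITE)
encoded = json.dumps(original.to_dict())
restored = TimingSketch.from_dict(json.loads(encoded))
print(restored == original)  # True
```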
Source code in src/nidaqlib/tasks/spec.py
open_device
async
¶
Open and return a configured DaqSession.
Usage forms:

    async with await open_device(spec) as session:
        ...

    session = await open_device(spec)
    try:
        ...
    finally:
        await session.close()
Mirrors the ecosystem open_device shape used by alicatlib,
watlowlib, and sartoriuslib. The DAQ-specific deviation: the
spec is the declarative task description (channels, timing,
triggers) rather than a serial port string.
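The two usage forms work because the awaited result is itself an async context manager. The self-contained sketch below shows that shape without any hardware; FakeSession and open_fake are hypothetical stand-ins for DaqSession and open_device.

```python
import asyncio
from typing import Tuple


class FakeSession:
    """Hypothetical stand-in for DaqSession with an idempotent close()."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()  # leaving the block always closes the session


async def open_fake() -> FakeSession:
    """Hypothetical stand-in for open_device(spec)."""
    return FakeSession()


async def main() -> Tuple[bool, bool]:
    # Form 1: await the opener, then use the result as a context manager.
    async with await open_fake() as managed:
        assert not managed.closed
    # Form 2: manage the lifetime explicitly.
    manual = await open_fake()
    try:
        pass  # work with the session here
    finally:
        await manual.close()
    return managed.closed, manual.closed


print(asyncio.run(main()))  # (True, True)
```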
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spec | TaskSpec | Declarative :class: | required |
| backend | DaqBackend \| None | Optional :class: | None |
| timeout | float | Default per-operation timeout, in seconds. | 10.0 |
| autostart | bool | When | True |
| confirm_start | bool | Required when starting the task can actuate hardware immediately (for example counter-output pulse trains). Only consulted when | False |
Returns:
| Type | Description |
|---|---|
| DaqSession | A configured :class: |