nidaqlib.sync

Sync facade — :class:Daq, :class:SyncPortal, sync recording wrappers.

Async is canonical; the sync facade wraps it through :class:SyncPortal so scripts, notebooks, and REPL sessions can drive DAQ tasks without await. Direct port of sartoriuslib's sync/ package.

AcquisitionSummary dataclass

AcquisitionSummary(
    blocks_emitted=0,
    blocks_dropped=0,
    errors_observed=0,
    started_at=(lambda: datetime.now(UTC))(),
    finished_at=None,
)

Per-run counters, yielded alongside the block stream.

Mirrors sartoriuslib.AcquisitionSummary shape but is intentionally mutable: counters are updated in place during the run so consumers can poll progress (e.g. for a TUI bar) and read final counts after exit. The recorder is the only writer; consumers MUST treat the object as read-only.

Attributes:

blocks_emitted (int)
    Total :class:DaqBlock records sent into the outbound stream.

blocks_dropped (int)
    Records dropped because of an :class:OverflowPolicy.DROP_* decision.

errors_observed (int)
    Wrapped NI errors seen during the run, regardless of :class:ErrorPolicy.

started_at (datetime)
    Wall-clock at recorder entry.

finished_at (datetime | None)
    Wall-clock at recorder exit. None while the recorder is still running.

Daq

Sync entry points (no instances; classmethods only).

open_device classmethod

open_device(
    spec, *, backend=None, timeout=10.0, confirm_start=False
)

Open a :class:SyncDaqSession and tear it down on exit.

Mirrors :func:nidaqlib.tasks.open_device but yields a sync session. Every operation on the returned session dispatches through a per-context :class:SyncPortal.

Example::

from nidaqlib import TaskSpec, Timing, AnalogInputVoltage
from nidaqlib.sync import Daq

spec = TaskSpec(
    name="ai0",
    channels=[AnalogInputVoltage(physical_channel="Dev1/ai0")],
    timing=Timing(rate_hz=1000),
)
with Daq.open_device(spec) as session:
    block = session.read_block(samples_per_channel=1000)
Source code in src/nidaqlib/sync/daq.py
@classmethod
@contextlib.contextmanager  # pyright: ignore[reportDeprecated]
def open_device(
    cls,
    spec: TaskSpec,
    *,
    backend: DaqBackend | None = None,
    timeout: float = 10.0,
    confirm_start: bool = False,
) -> Iterator[SyncDaqSession]:
    """Open a :class:`SyncDaqSession` and tear it down on exit.

    Mirrors :func:`nidaqlib.tasks.open_device` but yields a sync session.
    Every operation on the returned session dispatches through a
    per-context :class:`SyncPortal`.

    Example::

        from nidaqlib import TaskSpec, Timing, AnalogInputVoltage
        from nidaqlib.sync import Daq

        spec = TaskSpec(
            name="ai0",
            channels=[AnalogInputVoltage(physical_channel="Dev1/ai0")],
            timing=Timing(rate_hz=1000),
        )
        with Daq.open_device(spec) as session:
            block = session.read_block(samples_per_channel=1000)
    """
    with SyncPortal() as portal:
        async_session = portal.call(
            open_device,
            spec,
            backend=backend,
            timeout=timeout,
            confirm_start=confirm_start,
        )
        try:
            yield SyncDaqSession(portal, async_session)
        finally:
            portal.call(async_session.close)

ErrorPolicy

Bases: StrEnum

How recorders react to wrapped NI errors during a read.

RAISE class-attribute instance-attribute

RAISE = 'raise'

Cancel the recorder's task group and re-raise the error.

RETURN class-attribute instance-attribute

RETURN = 'return'

Emit a :class:DaqBlock (or :class:DaqReading) with .error set, then continue.

The recorder MUST advance timing counters (block_index / first_sample_index / monotonic_ns) on error records so consumers can detect dropped intervals. Consumers MUST gate on error is None before reading data.

OverflowPolicy

Bases: StrEnum

Behaviour when the recorder's outbound stream is full.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Producer awaits consumer. Risks NI buffer overrun on hardware-clocked tasks.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the about-to-be-enqueued block. Bounds consumer latency; loses freshest data.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Drop the oldest queued block. Keeps newest data; loses older queued blocks.
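The three policies differ only in what happens at a full queue. A bounded-deque sketch (stdlib only; `enqueue` is illustrative, not the recorder's actual queue code) makes the trade-off concrete:

```python
from collections import deque


def enqueue(queue: deque, item: int, *, policy: str, maxsize: int) -> bool:
    """Apply one overflow policy to a bounded queue; return True if queued."""
    if len(queue) < maxsize:
        queue.append(item)
        return True
    if policy == "drop_newest":
        return False                 # discard the incoming block
    if policy == "drop_oldest":
        queue.popleft()              # evict the stalest queued block
        queue.append(item)
        return True
    raise ValueError(f"unknown policy: {policy}")


newest_q: deque = deque()
oldest_q: deque = deque()
for i in range(5):
    enqueue(newest_q, i, policy="drop_newest", maxsize=3)
    enqueue(oldest_q, i, policy="drop_oldest", maxsize=3)

print(list(newest_q))  # [0, 1, 2] -- freshest data lost
print(list(oldest_q))  # [2, 3, 4] -- oldest data lost
```

BLOCK is the third option: instead of dropping, the producer waits, which is safe for on-demand reads but risks an NI buffer overrun on hardware-clocked tasks.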

SyncAsyncIterator

SyncAsyncIterator(portal, async_iter)

Blocking view over an async iterator, bound to a :class:SyncPortal.

Source code in src/nidaqlib/sync/portal.py
def __init__(self, portal: SyncPortal, async_iter: AsyncIterator[T]) -> None:
    self._portal = portal
    self._aiter = async_iter
    self._closed = False

close

close()

Cancel the underlying async iterator if it exposes aclose.

Source code in src/nidaqlib/sync/portal.py
def close(self) -> None:
    """Cancel the underlying async iterator if it exposes ``aclose``."""
    if self._closed:
        return
    self._closed = True
    if not self._portal.running:
        return
    aclose: Callable[[], Awaitable[Any]] | None = getattr(self._aiter, "aclose", None)
    if aclose is None:
        return
    with contextlib.suppress(Exception):
        self._portal.call(aclose)

SyncDaqSession

SyncDaqSession(portal, session)

Sync facade over an open :class:DaqSession.

Source code in src/nidaqlib/sync/session.py
def __init__(self, portal: SyncPortal, session: DaqSession) -> None:
    self._portal = portal
    self._session = session

is_closed property

is_closed

True once :meth:close has run.

is_started property

is_started

True between :meth:start and :meth:stop.

raw_task property

raw_task

The underlying backend task handle (escape hatch — design §7.4).

spec property

spec

The :class:TaskSpec this session was constructed from.

acquire

acquire(samples_per_channel, *, timeout=None)

Run one finite acquisition and return its :class:DaqBlock.

Source code in src/nidaqlib/sync/session.py
def acquire(
    self,
    samples_per_channel: int,
    *,
    timeout: float | None = None,
) -> DaqBlock:
    """Run one finite acquisition and return its :class:`DaqBlock`."""
    return self._portal.call(self._session.acquire, samples_per_channel, timeout=timeout)

close

close()

Stop and close the underlying task. Idempotent.

Source code in src/nidaqlib/sync/session.py
def close(self) -> None:
    """Stop and close the underlying task. Idempotent."""
    self._portal.call(self._session.close)

poll

poll(*, timeout=None)

One-shot scalar read across all channels.

Source code in src/nidaqlib/sync/session.py
def poll(self, *, timeout: float | None = None) -> DaqReading:
    """One-shot scalar read across all channels."""
    return self._portal.call(self._session.poll, timeout=timeout)

read_block

read_block(samples_per_channel, *, timeout=None)

Read one rectangular :class:DaqBlock.

Source code in src/nidaqlib/sync/session.py
def read_block(
    self,
    samples_per_channel: int,
    *,
    timeout: float | None = None,
) -> DaqBlock:
    """Read one rectangular :class:`DaqBlock`."""
    return self._portal.call(self._session.read_block, samples_per_channel, timeout=timeout)

start

start(*, confirm=False)

Start or restart the underlying task.

Source code in src/nidaqlib/sync/session.py
def start(self, *, confirm: bool = False) -> None:
    """Start or restart the underlying task."""
    self._portal.call(self._session.start, confirm=confirm)

stop

stop()

Stop the underlying task. Idempotent.

Source code in src/nidaqlib/sync/session.py
def stop(self) -> None:
    """Stop the underlying task. Idempotent."""
    self._portal.call(self._session.stop)

write

write(values, *, confirm=False, timeout=None)

Write one sample per channel to the task's output channels.

Sync wrapper around :meth:DaqSession.write. The safety gate (confirm + safe_min / safe_max) runs in the same process, before any I/O.

Source code in src/nidaqlib/sync/session.py
def write(
    self,
    values: Mapping[str, float | bool],
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> None:
    """Write one sample per channel to the task's output channels.

    Sync wrapper around :meth:`DaqSession.write`. The safety gate
    (``confirm`` + ``safe_min`` / ``safe_max``) runs in the same
    process, before any I/O.
    """
    self._portal.call(
        self._session.write,
        values,
        confirm=confirm,
        timeout=timeout,
    )

SyncPortal

SyncPortal(*, backend='asyncio')

Per-context wrapper around :class:anyio.from_thread.BlockingPortal.

Source code in src/nidaqlib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    self._backend = backend
    self._cm: AbstractContextManager[BlockingPortal] | None = None
    self._portal: BlockingPortal | None = None
    self._entered = False

running property

running

True between :meth:__enter__ and :meth:__exit__.

call

call(func, *args, **kwargs)

Run func(*args, **kwargs) on the portal's event loop.

Source code in src/nidaqlib/sync/portal.py
def call[**P, T](
    self,
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run ``func(*args, **kwargs)`` on the portal's event loop."""
    portal = self._require_portal()
    bound: Callable[[], Awaitable[T]] = (
        partial(func, *args, **kwargs) if kwargs else partial(func, *args)
    )
    try:
        return portal.call(bound)
    except Exception as exc:
        unwrapped = _unwrap_single_group(exc)
        if unwrapped is exc:
            raise
        raise unwrapped from None

wrap_async_context_manager

wrap_async_context_manager(acm)

Present an async context manager as a sync context manager.

Source code in src/nidaqlib/sync/portal.py
def wrap_async_context_manager[T](
    self, acm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Present an async context manager as a sync context manager."""
    return self._require_portal().wrap_async_context_manager(acm)

wrap_async_iter

wrap_async_iter(async_iter)

Present an async iterator as a blocking, closeable iterator.

Source code in src/nidaqlib/sync/portal.py
def wrap_async_iter[T](self, async_iter: AsyncIterator[T]) -> SyncAsyncIterator[T]:
    """Present an async iterator as a blocking, closeable iterator."""
    self._require_portal()
    return SyncAsyncIterator(self, async_iter)

record

record(
    source,
    *,
    chunk_size,
    timeout=10.0,
    buffer_size=16,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.DROP_OLDEST,
    use_callback_bridge=False,
)

Sync wrapper around :func:nidaqlib.streaming.record.

Yields (stream, summary). The stream is a sync iterator producing :class:DaqBlock records; iterate it with a normal for loop.

Example::

with (
    Daq.open_device(spec) as session,
    record(session, chunk_size=1000) as (stream, summary),
):
    for block in stream:
        process(block)
Source code in src/nidaqlib/sync/recording.py
@contextlib.contextmanager  # pyright: ignore[reportDeprecated]
def record(
    source: SyncDaqSession,
    *,
    chunk_size: int,
    timeout: float = 10.0,
    buffer_size: int = 16,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.DROP_OLDEST,
    use_callback_bridge: bool = False,
) -> Iterator[tuple[SyncAsyncIterator[DaqBlock], AcquisitionSummary]]:
    """Sync wrapper around :func:`nidaqlib.streaming.record`.

    Yields ``(stream, summary)``. The stream is a sync iterator producing
    :class:`DaqBlock` records; iterate it with a normal ``for`` loop.

    Example::

        with (
            Daq.open_device(spec) as session,
            record(session, chunk_size=1000) as (stream, summary),
        ):
            for block in stream:
                process(block)
    """
    with SyncPortal() as portal:
        acm = _async_record(
            source._session,  # pyright: ignore[reportPrivateUsage]
            chunk_size=chunk_size,
            timeout=timeout,
            buffer_size=buffer_size,
            error_policy=error_policy,
            overflow=overflow,
            use_callback_bridge=use_callback_bridge,
        )
        with portal.wrap_async_context_manager(acm) as (rx, summary):
            sync_iter = portal.wrap_async_iter(rx)
            try:
                yield sync_iter, summary
            finally:
                sync_iter.close()

record_polled

record_polled(
    source,
    *,
    rate_hz,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Sync wrapper around :func:nidaqlib.streaming.record_polled.

The sync facade only accepts a session source — the manager-mode fan-out belongs to async-only call sites — so the per-tick payload is always :class:DaqReading.

Source code in src/nidaqlib/sync/recording.py
@contextlib.contextmanager  # pyright: ignore[reportDeprecated]
def record_polled(
    source: SyncDaqSession,
    *,
    rate_hz: float,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> Iterator[tuple[SyncAsyncIterator[DaqReading], AcquisitionSummary]]:
    """Sync wrapper around :func:`nidaqlib.streaming.record_polled`.

    The sync facade only accepts a session source — the manager-mode
    fan-out belongs to async-only call sites — so the per-tick payload is
    always :class:`DaqReading`.
    """
    with SyncPortal() as portal:
        acm = _async_record_polled(
            source._session,  # pyright: ignore[reportPrivateUsage]
            rate_hz=rate_hz,
            error_policy=error_policy,
            overflow=overflow,
            buffer_size=buffer_size,
        )
        with portal.wrap_async_context_manager(acm) as (rx, summary):
            # The session-source overload always emits DaqReading; the
            # async-side Union is widened only for manager-mode.
            reading_rx = cast("SyncAsyncIterator[DaqReading]", portal.wrap_async_iter(rx))
            try:
                yield reading_rx, summary
            finally:
                reading_rx.close()

run_sync

run_sync(func, *args, **kwargs)

Run one coroutine in a throwaway :class:SyncPortal.

Source code in src/nidaqlib/sync/portal.py
def run_sync[**P, T](
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run one coroutine in a throwaway :class:`SyncPortal`."""
    with SyncPortal() as portal:
        return portal.call(func, *args, **kwargs)