Sync facade

A blocking wrapper around the async API for scripts, notebooks, and REPL use. Dtol.open_device(...) returns a SyncDtolSession that dispatches every call through an anyio blocking portal, so the sync side reuses the async code path instead of being a parallel implementation. See the Sync quickstart.

dtollib.sync

Sync facade — :class:Dtol, :class:SyncDtolSession, :class:SyncPortal.

Async is canonical; the sync facade wraps it through :class:SyncPortal so scripts, notebooks, and REPL sessions can drive tasks without await.

Provides:

  • :class:Dtol — entry point (Dtol.open_device(spec)).
  • :class:SyncDtolSession — blocking wrapper over :class:~dtollib.tasks.DtolSession.
  • :func:record / :func:record_polled — blocking streaming wrappers yielding a :class:SyncRecording.
  • :class:SyncPortal / :func:run_sync.
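To make the portal idea concrete, the following is a minimal stdlib-only sketch of a blocking portal: an event loop runs on a background thread, and `call` submits a coroutine to it and blocks for the result. `MiniPortal` and `read_value` are hypothetical names invented for illustration; dtollib's own SyncPortal wraps anyio's BlockingPortal and is not reproduced here.

```python
import asyncio
import threading
from collections.abc import Callable, Coroutine
from typing import Any


class MiniPortal:
    """Hypothetical stand-in: an asyncio event loop on a background thread."""

    def __enter__(self) -> "MiniPortal":
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Ask the loop to stop from the caller's thread, then wait for it.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    def call(
        self,
        func: Callable[..., Coroutine[Any, Any, Any]],
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """Run ``func(*args, **kwargs)`` on the loop thread and block for the result."""
        future = asyncio.run_coroutine_threadsafe(func(*args, **kwargs), self._loop)
        return future.result()


async def read_value(channel: int) -> float:
    """Hypothetical async operation standing in for a device call."""
    await asyncio.sleep(0)
    return channel * 1.5


with MiniPortal() as portal:
    value = portal.call(read_value, 2)  # no ``await`` needed in the caller
```

The caller never touches the event loop directly, which is what lets the same async code serve both async and blocking call sites.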

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    payloads_emitted=0,
    payloads_dropped=0,
    errors_observed=0,
    overruns_observed=0,
    underruns_observed=0,
    extra=_empty_extra(),
)

Mutable summary of one recording session.

Updated in place by the recorder; finished_at is set on context exit. The :class:Recording[T] handle exposes this so consumers can read progress without poking at the recorder's internals.

overruns_observed and underruns_observed are SDK-level, distinguishable from payloads_dropped (which counts consumer-side losses under DROP_* overflow policies). See docs/design.md §14.1 for the rationale.
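As a rough illustration of how a mutable summary like this can be read while a recording is in progress, here is a sketch of a stand-in dataclass with the documented field names. `SummarySketch` and `drop_fraction` are hypothetical; the field types (including `datetime` for the timestamps) and the assumption that `payloads_emitted` excludes dropped payloads are guesses, not taken from dtollib.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class SummarySketch:
    """Hypothetical stand-in mirroring the documented field names."""

    started_at: datetime  # assumed type
    finished_at: Optional[datetime] = None
    payloads_emitted: int = 0
    payloads_dropped: int = 0
    errors_observed: int = 0
    overruns_observed: int = 0
    underruns_observed: int = 0
    # A factory gives every instance its own dict instead of a shared one.
    extra: dict[str, Any] = field(default_factory=dict)


def drop_fraction(summary: SummarySketch) -> float:
    """Consumer-side loss ratio, assuming emitted and dropped are disjoint."""
    produced = summary.payloads_emitted + summary.payloads_dropped
    return summary.payloads_dropped / produced if produced else 0.0


summary = SummarySketch(started_at=datetime.now(timezone.utc))
handle_view = summary  # a consumer holding the same object sees updates

# The recorder side updates the summary in place.
summary.payloads_emitted += 3
summary.payloads_dropped += 1
summary.finished_at = datetime.now(timezone.utc)  # set on context exit
```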

Dtol

Static namespace for sync entry points.

Idiomatic usage::

with Dtol.open_device(spec) as session:
    reading = session.poll()
    print(reading.values)

open_device staticmethod

open_device(
    spec, *, backend=None, timeout=10.0, autostart=True
)

Sync analogue of :func:dtollib.open_device.

Returns a :class:SyncDtolSession — usable as a normal context manager.

Source code in src/dtollib/sync/daq.py
@staticmethod
def open_device(
    spec: TaskSpec,
    *,
    backend: DtolBackend | None = None,
    timeout: float = 10.0,
    autostart: bool = True,
) -> SyncDtolSession:
    """Sync analogue of :func:`dtollib.open_device`.

    Returns a :class:`SyncDtolSession` — usable as a normal
    context manager.
    """
    return SyncDtolSession(spec, backend=backend, timeout=timeout, autostart=autostart)

ErrorPolicy

Bases: StrEnum

How a recorder reacts to a backend error mid-stream.

Attributes:

  • RAISE: Cancel the recorder; the exception propagates out of the async with record(...) block.
  • RETURN: Emit a payload with error=... set and continue the stream. Suitable for long unattended runs where a single bad poll shouldn't kill the recording.
  • SKIP: Drop the failed payload silently. Increments AcquisitionSummary.errors_observed so the run report still reflects the failure count.
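The three policies can be sketched as a small polling loop. Everything here is a hypothetical stand-in: `Policy`, `Payload`, and `poll_stream` are illustrative names, the enum values are invented, and the real recorder is asynchronous. Only the SKIP counter behaviour is documented above; whether the other policies also count errors is not stated, so the sketch leaves them uncounted.

```python
from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Policy(str, Enum):
    """Stand-in for ErrorPolicy (a StrEnum in dtollib); values are assumed."""

    RAISE = "raise"
    RETURN = "return"
    SKIP = "skip"


@dataclass
class Payload:
    value: Optional[float] = None
    error: Optional[Exception] = None


def poll_stream(
    polls: Iterable[Callable[[], float]],
    policy: Policy,
    counters: dict[str, int],
) -> Iterator[Payload]:
    """Yield one payload per poll, reacting to failures per ``policy``."""
    for poll in polls:
        try:
            value = poll()
        except Exception as exc:
            if policy is Policy.RAISE:
                raise  # propagates to whoever is iterating the stream
            if policy is Policy.RETURN:
                yield Payload(error=exc)  # error travels in-band
            else:
                counters["errors_observed"] += 1  # SKIP: count, emit nothing
            continue
        yield Payload(value=value)


def good() -> float:
    return 1.0


def bad() -> float:
    raise OSError("bad poll")


skip_counters = {"errors_observed": 0}
skipped = list(poll_stream([good, bad, good], Policy.SKIP, skip_counters))
returned = list(poll_stream([good, bad, good], Policy.RETURN, {"errors_observed": 0}))
```

With SKIP the consumer sees two payloads and a counter of one; with RETURN it sees three payloads, the middle one carrying the error.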

OverflowPolicy

Bases: StrEnum

How a recorder reacts when its outgoing stream buffer fills.

Attributes:

  • BLOCK: Backpressure: the producer awaits buffer space. This is the default for record_polled (record defaults to DROP_OLDEST). Preserves every payload at the cost of stalling the upstream buffer pool. Slow consumers risk SDK-level OVERRUN.
  • DROP_OLDEST: Discard the head of the buffer to make room. Trades payload completeness for producer liveness. Increments AcquisitionSummary.payloads_dropped.
  • DROP_NEWEST: Discard the incoming payload. Same trade-off as DROP_OLDEST, but the consumer keeps the payloads that were buffered before the overflow started and loses the newer ones. Also counted in AcquisitionSummary.payloads_dropped.
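The difference between the two DROP_* policies is easiest to see on a tiny bounded buffer. The sketch below is a synchronous stand-in built on `collections.deque`; `Overflow` and `offer` are hypothetical names with invented enum values, and BLOCK is deliberately not modeled because it needs a waiting producer.

```python
from collections import deque
from enum import Enum
from typing import Any


class Overflow(str, Enum):
    """Stand-in for OverflowPolicy; values are assumed."""

    BLOCK = "block"
    DROP_OLDEST = "drop_oldest"
    DROP_NEWEST = "drop_newest"


def offer(buffer: deque, item: Any, capacity: int, policy: Overflow) -> int:
    """Try to enqueue ``item``; return how many payloads were dropped (0 or 1)."""
    if len(buffer) < capacity:
        buffer.append(item)
        return 0
    if policy is Overflow.DROP_OLDEST:
        buffer.popleft()  # discard the head to make room
        buffer.append(item)
        return 1
    if policy is Overflow.DROP_NEWEST:
        return 1  # discard the incoming payload, keep the buffer as is
    raise BufferError("BLOCK would wait for space; not modeled in this sketch")


oldest_buf: deque = deque()
newest_buf: deque = deque()
oldest_dropped = sum(offer(oldest_buf, i, 3, Overflow.DROP_OLDEST) for i in range(5))
newest_dropped = sum(offer(newest_buf, i, 3, Overflow.DROP_NEWEST) for i in range(5))
```

Both runs drop two payloads, but the surviving contents differ: DROP_OLDEST keeps the most recent three items, DROP_NEWEST keeps the first three.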

SyncAsyncIterator

SyncAsyncIterator(portal, async_iter)

Blocking view over an async iterator, bound to a :class:SyncPortal.

Source code in src/dtollib/sync/portal.py
def __init__(self, portal: SyncPortal, async_iter: AsyncIterator[T]) -> None:
    self._portal = portal
    self._aiter = async_iter
    self._closed = False

close

close()

Cancel the underlying async iterator if it exposes aclose.

Source code in src/dtollib/sync/portal.py
def close(self) -> None:
    """Cancel the underlying async iterator if it exposes ``aclose``."""
    if self._closed:
        return
    self._closed = True
    if not self._portal.running:
        return
    aclose: Callable[[], Awaitable[Any]] | None = getattr(self._aiter, "aclose", None)
    if aclose is None:
        return
    with contextlib.suppress(Exception):
        self._portal.call(aclose)
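A blocking view over an async iterator can be sketched with the standard library alone: each `next()` submits one `__anext__` step to an event loop running on another thread and waits for it. `BlockingIterSketch` and `ticks` are hypothetical names; this is not dtollib's SyncAsyncIterator, which goes through SyncPortal instead of a raw loop.

```python
import asyncio
import threading
from collections.abc import AsyncIterator
from typing import Any

_DONE = object()  # sentinel: the async iterator is exhausted


class BlockingIterSketch:
    """Hypothetical blocking, closeable view over an async iterator."""

    def __init__(self, loop: asyncio.AbstractEventLoop, async_iter: AsyncIterator[Any]) -> None:
        self._loop = loop
        self._aiter = async_iter
        self._closed = False

    def __iter__(self) -> "BlockingIterSketch":
        return self

    async def _next_or_done(self) -> Any:
        # Translate exhaustion into a sentinel so no Stop* exception crosses threads.
        try:
            return await self._aiter.__anext__()
        except StopAsyncIteration:
            return _DONE

    def __next__(self) -> Any:
        if self._closed:
            raise StopIteration
        item = asyncio.run_coroutine_threadsafe(self._next_or_done(), self._loop).result()
        if item is _DONE:
            raise StopIteration
        return item

    async def _aclose(self) -> None:
        aclose = getattr(self._aiter, "aclose", None)
        if aclose is not None:
            await aclose()

    def close(self) -> None:
        """Idempotent: close the underlying iterator if it supports ``aclose``."""
        if self._closed:
            return
        self._closed = True
        asyncio.run_coroutine_threadsafe(self._aclose(), self._loop).result()


async def ticks(count: int) -> AsyncIterator[int]:
    for index in range(count):
        await asyncio.sleep(0)
        yield index


loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
try:
    values = list(BlockingIterSketch(loop, ticks(3)))  # plain ``for``/``list`` works
    partial_view = BlockingIterSketch(loop, ticks(10))
    first = next(partial_view)
    partial_view.close()  # early exit closes the async generator
    rest = list(partial_view)
finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
```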

SyncDtolSession

SyncDtolSession(
    spec, *, backend=None, timeout=10.0, autostart=True
)

Blocking facade over :class:~dtollib.tasks.DtolSession.

Source code in src/dtollib/sync/session.py
def __init__(
    self,
    spec: TaskSpec,
    *,
    backend: DtolBackend | None = None,
    timeout: float = 10.0,
    autostart: bool = True,
) -> None:
    self._spec = spec
    self._backend = backend
    self._timeout = timeout
    self._autostart = autostart
    self._portal: SyncPortal | None = None
    self._session: DtolSession | None = None

raw_hdass property

raw_hdass

:attr:DtolSession.raw_hdass.

raw_hdrv property

raw_hdrv

:attr:DtolSession.raw_hdrv.

raw_session property

raw_session

Underlying async :class:DtolSession.

state property

state

:attr:DtolSession.state.

__enter__

__enter__()

Spin up a portal, then open the underlying async session.

Source code in src/dtollib/sync/session.py
def __enter__(self) -> Self:
    """Spin up a portal, then open the underlying async session."""
    portal = SyncPortal()
    portal.__enter__()
    try:
        session = portal.call(
            _open_device_async,
            self._spec,
            backend=self._backend,
            timeout=self._timeout,
            autostart=self._autostart,
        )
    except BaseException:
        portal.__exit__(None, None, None)
        raise
    self._portal = portal
    self._session = session
    return self
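The shape of this `__enter__` (acquire one resource, attempt a second step, roll the first back if that step fails) can also be expressed with `contextlib.ExitStack` and `pop_all()`. The sketch below shows that alternative pattern on stand-ins; `fake_portal`, `FacadeSketch`, and the `fail_on_open` flag are hypothetical and are not how dtollib is written.

```python
import contextlib
from collections.abc import Iterator

events: list[str] = []


@contextlib.contextmanager
def fake_portal() -> Iterator[str]:
    """Hypothetical resource that must be torn down if opening fails."""
    events.append("portal up")
    try:
        yield "portal"
    finally:
        events.append("portal down")


class FacadeSketch:
    def __init__(self, *, fail_on_open: bool = False) -> None:
        self._fail_on_open = fail_on_open
        self._stack = contextlib.ExitStack()

    def __enter__(self) -> "FacadeSketch":
        with contextlib.ExitStack() as stack:
            stack.enter_context(fake_portal())
            if self._fail_on_open:
                # The enclosing ``with`` unwinds the portal before re-raising.
                raise RuntimeError("device open failed")
            events.append("session open")
            # Success: transfer cleanup ownership so the ``with`` exit is a no-op.
            self._stack = stack.pop_all()
        return self

    def __exit__(self, *exc_info: object) -> None:
        events.append("session closed")
        self._stack.close()


with FacadeSketch():
    pass

try:
    with FacadeSketch(fail_on_open=True):
        pass
except RuntimeError:
    events.append("open error propagated")
```

In both variants the key property is the same: a failure while opening the session never leaves a portal thread running.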

abort

abort()

Blocking :meth:DtolSession.abort.

Source code in src/dtollib/sync/session.py
def abort(self) -> None:
    """Blocking :meth:`DtolSession.abort`."""
    self._require_portal().call(self._require_session().abort)

close

close()

Blocking :meth:DtolSession.close.

Source code in src/dtollib/sync/session.py
def close(self) -> None:
    """Blocking :meth:`DtolSession.close`."""
    self._require_portal().call(self._require_session().close)

is_running

is_running()

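Passthrough to :meth:DtolSession.is_running. Unlike the other methods, it calls the underlying session directly rather than through the portal.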

Source code in src/dtollib/sync/session.py
def is_running(self) -> bool:
    """:meth:`DtolSession.is_running`."""
    return self._require_session().is_running()

poll

poll()

Blocking :meth:DtolSession.poll.

Source code in src/dtollib/sync/session.py
def poll(self) -> DaqReading:
    """Blocking :meth:`DtolSession.poll`."""
    return self._require_portal().call(self._require_session().poll)

start

start()

Blocking :meth:DtolSession.start.

Source code in src/dtollib/sync/session.py
def start(self) -> None:
    """Blocking :meth:`DtolSession.start`."""
    self._require_portal().call(self._require_session().start)

stop

stop()

Blocking :meth:DtolSession.stop.

Source code in src/dtollib/sync/session.py
def stop(self) -> None:
    """Blocking :meth:`DtolSession.stop`."""
    self._require_portal().call(self._require_session().stop)

SyncPortal

SyncPortal(*, backend='asyncio')

Per-context wrapper around :class:anyio.from_thread.BlockingPortal.

Source code in src/dtollib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    self._backend = backend
    self._cm: AbstractContextManager[BlockingPortal] | None = None
    self._portal: BlockingPortal | None = None
    self._entered = False

running property

running

True between :meth:__enter__ and :meth:__exit__.

call

call(func, *args, **kwargs)

Run func(*args, **kwargs) on the portal's event loop.

Source code in src/dtollib/sync/portal.py
def call[**P, T](
    self,
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run ``func(*args, **kwargs)`` on the portal's event loop."""
    portal = self._require_portal()
    bound: Callable[[], Awaitable[T]] = (
        partial(func, *args, **kwargs) if kwargs else partial(func, *args)
    )
    try:
        return portal.call(bound)
    except Exception as exc:
        unwrapped = _unwrap_single_group(exc)
        if unwrapped is exc:
            raise
        raise unwrapped from None

wrap_async_context_manager

wrap_async_context_manager(acm)

Present an async context manager as a sync context manager.

Source code in src/dtollib/sync/portal.py
def wrap_async_context_manager[T](
    self, acm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Present an async context manager as a sync context manager."""
    return self._require_portal().wrap_async_context_manager(acm)

wrap_async_iter

wrap_async_iter(async_iter)

Present an async iterator as a blocking, closeable iterator.

Source code in src/dtollib/sync/portal.py
def wrap_async_iter[T](self, async_iter: AsyncIterator[T]) -> SyncAsyncIterator[T]:
    """Present an async iterator as a blocking, closeable iterator."""
    self._require_portal()
    return SyncAsyncIterator(self, async_iter)

SyncRecording dataclass

SyncRecording(stream, summary, rate_hz)

Sync mirror of :class:dtollib.streaming.Recording.

The stream here is a :class:SyncAsyncIterator (iterate with a plain for loop); the rest of the shape matches the async handle.

record

record(
    source,
    *,
    timeout=10.0,
    stream_buffer_size=16,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.DROP_OLDEST,
)

Sync wrapper around :func:dtollib.streaming.record.

Yields a :class:SyncRecording[DaqBlock]. Iterate recording.stream with a normal for loop. As with the async recorder, source must have been opened with autostart=False (the recorder owns the commit/arm/start sequence).

Example::

with (
    SyncDtolSession(spec, autostart=False) as session,
    record(session) as recording,
):
    for block in recording.stream:
        process(block)

Source code in src/dtollib/sync/recording.py
@contextlib.contextmanager  # pyright: ignore[reportDeprecated]
def record(
    source: SyncDtolSession,
    *,
    timeout: float = 10.0,
    stream_buffer_size: int = 16,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.DROP_OLDEST,
) -> Iterator[SyncRecording[DaqBlock]]:
    """Sync wrapper around :func:`dtollib.streaming.record`.

    Yields a :class:`SyncRecording[DaqBlock]`. Iterate ``recording.stream``
    with a normal ``for`` loop. As with the async recorder, ``source`` must
    have been opened with ``autostart=False`` (the recorder owns the
    commit/arm/start sequence).

    Example::

        with (
            SyncDtolSession(spec, autostart=False) as session,
            record(session) as recording,
        ):
            for block in recording.stream:
                process(block)
    """
    with SyncPortal() as portal:
        acm = _async_record(
            source.raw_session,
            timeout=timeout,
            stream_buffer_size=stream_buffer_size,
            error_policy=error_policy,
            overflow=overflow,
        )
        with portal.wrap_async_context_manager(acm) as recording:
            sync_iter = portal.wrap_async_iter(recording.stream)
            try:
                yield SyncRecording(
                    stream=sync_iter,
                    summary=recording.summary,
                    rate_hz=recording.rate_hz,
                )
            finally:
                sync_iter.close()

record_polled

record_polled(
    source,
    *,
    rate_hz,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Sync wrapper around :func:dtollib.streaming.record_polled.

The sync facade only accepts a session source — the manager-mode fan-out belongs to async-only call sites — so the per-tick payload is always :class:DaqReading.

Source code in src/dtollib/sync/recording.py
@contextlib.contextmanager  # pyright: ignore[reportDeprecated]
def record_polled(
    source: SyncDtolSession,
    *,
    rate_hz: float,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> Iterator[SyncRecording[DaqReading]]:
    """Sync wrapper around :func:`dtollib.streaming.record_polled`.

    The sync facade only accepts a session source — the manager-mode
    fan-out belongs to async-only call sites — so the per-tick payload is
    always :class:`DaqReading`.
    """
    with SyncPortal() as portal:
        acm = _async_record_polled(
            source.raw_session,
            rate_hz=rate_hz,
            error_policy=error_policy,
            overflow=overflow,
            buffer_size=buffer_size,
        )
        with portal.wrap_async_context_manager(acm) as recording:
            # The session-source overload always emits DaqReading; the
            # async-side Union is widened only for manager-mode.
            reading_rx = cast(
                "SyncAsyncIterator[DaqReading]",
                portal.wrap_async_iter(recording.stream),
            )
            try:
                yield SyncRecording(
                    stream=reading_rx,
                    summary=recording.summary,
                    rate_hz=recording.rate_hz,
                )
            finally:
                reading_rx.close()
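To illustrate what "one payload per tick at rate_hz" could look like from the consumer's side, here is a synchronous stand-in that polls a callable on a fixed schedule. `poll_at_rate` and its `ticks` parameter are hypothetical, and the deadline-based scheduling is an assumption; how dtollib's polled recorder actually paces itself is not described on this page.

```python
import time
from collections.abc import Callable, Iterator
from typing import TypeVar

T = TypeVar("T")


def poll_at_rate(poll: Callable[[], T], rate_hz: float, ticks: int) -> Iterator[T]:
    """Yield ``ticks`` readings, one per period of ``1 / rate_hz`` seconds."""
    if rate_hz <= 0:
        raise ValueError("rate_hz must be positive")
    period = 1.0 / rate_hz
    next_deadline = time.monotonic()
    for _ in range(ticks):
        delay = next_deadline - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        yield poll()
        # Advance from the previous deadline, not from "now", so a slow
        # poll does not accumulate drift across ticks.
        next_deadline += period


counter = iter(range(100))
readings = list(poll_at_rate(lambda: next(counter), rate_hz=200.0, ticks=5))
```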

run_sync

run_sync(func, *args, **kwargs)

Run one coroutine in a throwaway :class:SyncPortal.

Source code in src/dtollib/sync/portal.py
def run_sync[**P, T](
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run one coroutine in a throwaway :class:`SyncPortal`."""
    with SyncPortal() as portal:
        return portal.call(func, *args, **kwargs)
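For comparison, the closest standard-library analogue of a one-shot "run this coroutine and give me the result" helper is `asyncio.run`. The sketch below uses hypothetical names (`run_sync_sketch`, `scale`) and is not dtollib's implementation, which uses a throwaway SyncPortal instead.

```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar

T = TypeVar("T")


def run_sync_sketch(
    func: Callable[..., Coroutine[Any, Any, T]],
    *args: Any,
    **kwargs: Any,
) -> T:
    """Run one coroutine function to completion on a fresh event loop."""
    return asyncio.run(func(*args, **kwargs))


async def scale(value: float, *, gain: float = 1.0) -> float:
    """Hypothetical async operation with positional and keyword arguments."""
    await asyncio.sleep(0)
    return value * gain


result = run_sync_sketch(scale, 2.0, gain=3.0)
```

One practical difference: `asyncio.run` raises RuntimeError when called from a thread that already has a running event loop, whereas a portal that runs its loop on a separate thread does not share that restriction.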