Skip to content

servomexlib.sync

The synchronous facade over the async core.

servomexlib.sync

Synchronous facade for :mod:servomexlib.

Wraps the async core behind an anyio blocking portal so scripts, notebooks, and the REPL can use the library without async/await::

from servomexlib.sync import Servomex

with Servomex.open("COM11", protocol="continuous") as anz:
    print(anz.poll())

:class:SyncManager mirrors :class:~servomexlib.manager.ServomexManager, :class:SyncSink mirrors the sinks, and :func:record_to_sink runs a blocking acquisition into a sink.

Servomex

Entry namespace for the sync facade — :meth:open mirrors open_device.

SyncAnalyzer

SyncAnalyzer(
    portal, analyzer, managed, *, owns_portal=True
)

Blocking mirror of :class:~servomexlib.devices.analyzer.Analyzer.

Source code in src/servomexlib/sync/analyzer.py
def __init__(
    self,
    portal: SyncPortal,
    analyzer: Analyzer,
    managed: AbstractContextManager[Analyzer],
    *,
    owns_portal: bool = True,
) -> None:
    self._portal = portal
    self._analyzer = analyzer
    self._managed = managed
    self._owns_portal = owns_portal

capabilities property

capabilities

The active client's capability set.

dropped_frames property

dropped_frames

Count of frames dropped for parse/checksum failures.

info property

info

The cached :class:DeviceInfo, if identify has run.

protocol property

protocol

The active wire protocol.

analyser_status

analyser_status(*, timeout=None)

Return the analyser-level status.

Source code in src/servomexlib/sync/analyzer.py
def analyser_status(self, *, timeout: float | None = None) -> AnalyserStatus:
    """Return the analyser-level status."""
    return self._portal.call(self._analyzer.analyser_status, timeout=timeout)

calibration_status

calibration_status(group=1, *, timeout=None)

Return autocalibration progress for group (Modbus only).

Source code in src/servomexlib/sync/analyzer.py
def calibration_status(
    self, group: int = 1, *, timeout: float | None = None
) -> CalibrationProgress:
    """Return autocalibration progress for ``group`` (Modbus only)."""
    return self._portal.call(self._analyzer.calibration_status, group, timeout=timeout)

identify

identify(*, timeout=None)

Return device identity, caching it.

Source code in src/servomexlib/sync/analyzer.py
def identify(self, *, timeout: float | None = None) -> DeviceInfo:
    """Return device identity, caching it."""
    return self._portal.call(self._analyzer.identify, timeout=timeout)

open classmethod

open(
    port,
    *,
    protocol=ProtocolKind.AUTO,
    address=1,
    serial_settings=None,
    timeout=1.0,
    identify=True,
    backend="asyncio",
    portal=None,
)

Open an analyser synchronously and return an entered :class:SyncAnalyzer.

Pass portal to share an existing loop (e.g. one already hosting an in-process fake); otherwise a private background loop is started and torn down with the analyzer.

Source code in src/servomexlib/sync/analyzer.py
@classmethod
def open(
    cls,
    port: str | Transport,
    *,
    protocol: ProtocolKind | str = ProtocolKind.AUTO,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
    identify: bool = True,
    backend: str = "asyncio",
    portal: SyncPortal | None = None,
) -> SyncAnalyzer:
    """Open an analyser synchronously and return an entered :class:`SyncAnalyzer`.

    Pass ``portal`` to share an existing loop (e.g. one already hosting an
    in-process fake); otherwise a private background loop is started and torn
    down with the analyzer.
    """
    resolved = ProtocolKind(protocol)  # accepts a ProtocolKind or its str value
    owns_portal = portal is None
    active = portal if portal is not None else SyncPortal(backend=backend)
    if owns_portal:
        active.__enter__()
    try:
        analyzer = active.call(
            open_device,
            port,
            protocol=resolved,
            address=address,
            serial_settings=serial_settings,
            timeout=timeout,
            identify=identify,
        )
        # Enter via the portal's wrapper so __aenter__/__aexit__ share one task.
        managed = active.wrap_async_context_manager(analyzer)
        managed.__enter__()
    except BaseException:
        if owns_portal:
            active.__exit__(None, None, None)
        raise
    return cls(active, analyzer, managed, owns_portal=owns_portal)

poll

poll(*, wait_fresh=False, timeout=None)

Return one frame (all channels + analyser status).

Source code in src/servomexlib/sync/analyzer.py
def poll(self, *, wait_fresh: bool = False, timeout: float | None = None) -> Frame:
    """Return one frame (all channels + analyser status)."""
    return self._portal.call(self._analyzer.poll, wait_fresh=wait_fresh, timeout=timeout)

read_all

read_all(*, timeout=None)

Return every channel's latest reading keyed by id.

Source code in src/servomexlib/sync/analyzer.py
def read_all(self, *, timeout: float | None = None) -> dict[ChannelId, Reading]:
    """Return every channel's latest reading keyed by id."""
    return self._portal.call(self._analyzer.read_all, timeout=timeout)

read_channel

read_channel(channel, *, timeout=None)

Return one channel's latest reading.

Source code in src/servomexlib/sync/analyzer.py
def read_channel(self, channel: ChannelId | str, *, timeout: float | None = None) -> Reading:
    """Return one channel's latest reading."""
    return self._portal.call(self._analyzer.read_channel, channel, timeout=timeout)

snapshot

snapshot()

Return the cached latest frame without any I/O.

Source code in src/servomexlib/sync/analyzer.py
def snapshot(self) -> Frame:
    """Return the cached latest frame without any I/O."""
    return self._analyzer.snapshot()

start_calibration

start_calibration(group, *, confirm=False, timeout=None)

Start autocalibration for group (Modbus only; requires confirm=True).

Source code in src/servomexlib/sync/analyzer.py
def start_calibration(
    self, group: int, *, confirm: bool = False, timeout: float | None = None
) -> None:
    """Start autocalibration for ``group`` (Modbus only; requires ``confirm=True``)."""
    self._portal.call(self._analyzer.start_calibration, group, confirm=confirm, timeout=timeout)

status

status(channel, *, timeout=None)

Return one channel's latest status.

Source code in src/servomexlib/sync/analyzer.py
def status(self, channel: ChannelId | str, *, timeout: float | None = None) -> ChannelStatus:
    """Return one channel's latest status."""
    return self._portal.call(self._analyzer.status, channel, timeout=timeout)

stop_calibration

stop_calibration(*, confirm=False, timeout=None)

Stop all autocalibration (Modbus only; requires confirm=True).

Source code in src/servomexlib/sync/analyzer.py
def stop_calibration(self, *, confirm: bool = False, timeout: float | None = None) -> None:
    """Stop all autocalibration (Modbus only; requires ``confirm=True``)."""
    self._portal.call(self._analyzer.stop_calibration, confirm=confirm, timeout=timeout)

stream

stream(*, mode=None, rate_hz=None)

Return a blocking iterator over the analyser's sample stream.

Source code in src/servomexlib/sync/analyzer.py
def stream(
    self, *, mode: StreamMode | None = None, rate_hz: float | None = None
) -> SyncStreamingSession:
    """Return a blocking iterator over the analyser's sample stream."""
    session = self._portal.call(_make_stream, self._analyzer, mode, rate_hz)
    return SyncStreamingSession(self._portal, session)

SyncManager

SyncManager(
    *,
    error_policy=ErrorPolicy.RAISE,
    backend="asyncio",
    portal=None,
)

Blocking facade over :class:~servomexlib.manager.ServomexManager.

Source code in src/servomexlib/sync/manager.py
def __init__(
    self,
    *,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    backend: str = "asyncio",
    portal: SyncPortal | None = None,
) -> None:
    self._owns_portal = portal is None
    self._portal = portal if portal is not None else SyncPortal(backend=backend)
    self._manager = ServomexManager(error_policy=error_policy)

closed property

closed

True once the underlying manager is closed.

names property

names

Insertion-ordered tuple of managed analyser names.

add

add(
    name,
    source,
    *,
    protocol=ProtocolKind.MODBUS_RTU,
    address=1,
    serial_settings=None,
    timeout=1.0,
)

Register an analyser; returns the underlying async :class:Analyzer.

Source code in src/servomexlib/sync/manager.py
def add(
    self,
    name: str,
    source: Analyzer | str | Transport,
    *,
    protocol: ProtocolKind | str = ProtocolKind.MODBUS_RTU,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
) -> Analyzer:
    """Register an analyser; returns the underlying async :class:`Analyzer`."""
    return self._portal.call(
        self._manager.add,
        name,
        source,
        protocol=ProtocolKind(protocol),
        address=address,
        serial_settings=serial_settings,
        timeout=timeout,
    )

get

get(name)

Return the analyser registered under name (no I/O).

Source code in src/servomexlib/sync/manager.py
def get(self, name: str) -> Analyzer:
    """Return the analyser registered under ``name`` (no I/O)."""
    return self._manager.get(name)

poll

poll(names=None, *, timeout=None)

Read one :class:Frame per (or named) analyser, keyed by name.

Source code in src/servomexlib/sync/manager.py
def poll(
    self, names: Sequence[str] | None = None, *, timeout: float | None = None
) -> Mapping[str, DeviceResult[Frame]]:
    """Read one :class:`Frame` per (or named) analyser, keyed by name."""
    return self._portal.call(self._manager.poll, names, timeout=timeout)

poll_samples

poll_samples(*, names=None, timeout=None)

Poll every (or named) analyser → flat samples.

Source code in src/servomexlib/sync/manager.py
def poll_samples(
    self, *, names: Sequence[str] | None = None, timeout: float | None = None
) -> list[Sample]:
    """Poll every (or named) analyser → flat samples."""
    return self._portal.call(self._manager.poll_samples, names=names, timeout=timeout)

remove

remove(name)

Unregister name, closing the shared transport on the last ref.

Source code in src/servomexlib/sync/manager.py
def remove(self, name: str) -> None:
    """Unregister ``name``, closing the shared transport on the last ref."""
    self._portal.call(self._manager.remove, name)

SyncPortal

SyncPortal(*, backend='asyncio')

Owns a background event loop and marshals async calls onto it.

Source code in src/servomexlib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    self._backend = backend
    self._cm: AbstractContextManager[anyio.from_thread.BlockingPortal] | None = None
    self._portal: anyio.from_thread.BlockingPortal | None = None

running property

running

Whether the backing event loop is currently running.

call

call(func, /, *args, **kwargs)

Run func(*args, **kwargs) on the loop thread and return its result.

The portal re-raises the concrete exception the coroutine raised, so sync callers catch the same :class:ServomexError subclasses as async ones.

Source code in src/servomexlib/sync/portal.py
def call(self, func: Callable[..., Awaitable[T]], /, *args: object, **kwargs: object) -> T:
    """Run ``func(*args, **kwargs)`` on the loop thread and return its result.

    The portal re-raises the concrete exception the coroutine raised, so sync
    callers catch the same :class:`ServomexError` subclasses as async ones.
    """
    target = functools.partial(func, *args, **kwargs)
    return self._require_portal().call(target)

wrap_async_context_manager

wrap_async_context_manager(cm)

Wrap an async context manager so its enter/exit run in one portal task.

call(cm.__aenter__) then call(cm.__aexit__) would run in different loop tasks and trip anyio's cancel-scope affinity check; this bridges them.

Source code in src/servomexlib/sync/portal.py
def wrap_async_context_manager(
    self, cm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Wrap an async context manager so its enter/exit run in one portal task.

    ``call(cm.__aenter__)`` then ``call(cm.__aexit__)`` would run in *different*
    loop tasks and trip anyio's cancel-scope affinity check; this bridges them.
    """
    return self._require_portal().wrap_async_context_manager(cm)

SyncSink

SyncSink(sink, *, portal=None)

Blocking adapter over an async :class:SampleSink.

Source code in src/servomexlib/sync/sinks.py
def __init__(self, sink: SampleSink, *, portal: SyncPortal | None = None) -> None:
    self._sink = sink
    self._owns_portal = portal is None
    self._portal = portal if portal is not None else SyncPortal()

close

close()

Close the backing sink.

Source code in src/servomexlib/sync/sinks.py
def close(self) -> None:
    """Close the backing sink."""
    self._portal.call(self._sink.close)

open

open()

Open the backing sink.

Source code in src/servomexlib/sync/sinks.py
def open(self) -> None:
    """Open the backing sink."""
    self._portal.call(self._sink.open)

write_many

write_many(samples)

Append samples to the backing sink.

Source code in src/servomexlib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the backing sink."""
    self._portal.call(self._sink.write_many, samples)

SyncStreamingSession

SyncStreamingSession(portal, session)

Blocking iterator over an async streaming session, marshalled via the portal.

Source code in src/servomexlib/sync/analyzer.py
def __init__(self, portal: SyncPortal, session: object) -> None:
    self._portal = portal
    self._session = session

record_to_sink

record_to_sink(
    source,
    sink,
    *,
    rate_hz,
    duration,
    names=None,
    timeout=None,
    batch_size=64,
    flush_interval=1.0,
    backend="asyncio",
    portal=None,
)

Record source into sink for duration seconds, blocking until done.

Parameters:

Name Type Description Default
source PollSource

An async :class:PollSource (an Analyzer or ServomexManager).

required
sink SampleSink

An async :class:SampleSink; opened and closed by this call.

required
rate_hz float

Poll cadence.

required
duration float

Acquisition seconds (required — the call blocks until it elapses).

required
names Sequence[str] | None

Subset of device names (manager only).

None
timeout float | None

Per-poll I/O ceiling.

None
batch_size int

Sink flush threshold in samples.

64
flush_interval float

Sink flush interval in seconds.

1.0
backend str

AnyIO backend for the private portal (ignored if portal given).

'asyncio'
portal SyncPortal | None

Reuse an existing portal instead of starting one.

None

Returns:

Type Description
AcquisitionSummary

The sink-side :class:AcquisitionSummary.

Source code in src/servomexlib/sync/recording.py
def record_to_sink(
    source: PollSource,
    sink: SampleSink,
    *,
    rate_hz: float,
    duration: float,
    names: Sequence[str] | None = None,
    timeout: float | None = None,
    batch_size: int = 64,
    flush_interval: float = 1.0,
    backend: str = "asyncio",
    portal: SyncPortal | None = None,
) -> AcquisitionSummary:
    """Record ``source`` into ``sink`` for ``duration`` seconds, blocking until done.

    Args:
        source: An async :class:`PollSource` (an ``Analyzer`` or ``ServomexManager``).
        sink: An async :class:`SampleSink`; opened and closed by this call.
        rate_hz: Poll cadence.
        duration: Acquisition seconds (required — the call blocks until it elapses).
        names: Subset of device names (manager only).
        timeout: Per-poll I/O ceiling.
        batch_size: Sink flush threshold in samples.
        flush_interval: Sink flush interval in seconds.
        backend: AnyIO backend for the private portal (ignored if ``portal`` given).
        portal: Reuse an existing portal instead of starting one.

    Returns:
        The sink-side :class:`AcquisitionSummary`.
    """
    owns_portal = portal is None
    active = portal if portal is not None else SyncPortal(backend=backend)
    if owns_portal:
        active.__enter__()
    try:
        return active.call(
            _run,
            source,
            sink,
            rate_hz,
            duration,
            names,
            timeout,
            batch_size,
            flush_interval,
        )
    finally:
        if owns_portal:
            active.__exit__(None, None, None)

sync_csv_sink

sync_csv_sink(path, *, portal=None)

A blocking CSV sink.

Source code in src/servomexlib/sync/sinks.py
def sync_csv_sink(path: str | Path, *, portal: SyncPortal | None = None) -> SyncSink:
    """A blocking CSV sink."""
    from servomexlib.sinks import CsvSink  # noqa: PLC0415

    return SyncSink(CsvSink(Path(path)), portal=portal)

sync_jsonl_sink

sync_jsonl_sink(path, *, portal=None)

A blocking JSONL sink.

Source code in src/servomexlib/sync/sinks.py
def sync_jsonl_sink(path: str | Path, *, portal: SyncPortal | None = None) -> SyncSink:
    """A blocking JSONL sink."""
    from servomexlib.sinks import JsonlSink  # noqa: PLC0415

    return SyncSink(JsonlSink(Path(path)), portal=portal)

sync_sqlite_sink

sync_sqlite_sink(path, *, table='samples', portal=None)

A blocking SQLite sink.

Source code in src/servomexlib/sync/sinks.py
def sync_sqlite_sink(
    path: str | Path, *, table: str = "samples", portal: SyncPortal | None = None
) -> SyncSink:
    """A blocking SQLite sink."""
    from servomexlib.sinks import SqliteSink  # noqa: PLC0415

    return SyncSink(SqliteSink(Path(path), table=table), portal=portal)