Synchronous facade for :mod:`servomexlib`.

Wraps the async core behind an anyio blocking portal so scripts, notebooks,
and the REPL can use the library without ``async``/``await``::

    from servomexlib.sync import Servomex

    with Servomex.open("COM11", protocol="continuous") as anz:
        print(anz.poll())

:class:`SyncManager` mirrors :class:`~servomexlib.manager.ServomexManager`,
:class:`SyncSink` mirrors the sinks, and :func:`record_to_sink` runs a blocking
acquisition into a sink.
Servomex
Entry namespace for the sync facade — :meth:open mirrors open_device.
SyncAnalyzer
SyncAnalyzer(
portal, analyzer, managed, *, owns_portal=True
)
Blocking mirror of :class:~servomexlib.devices.analyzer.Analyzer.
Source code in src/servomexlib/sync/analyzer.py
def __init__(
    self,
    portal: SyncPortal,
    analyzer: Analyzer,
    managed: AbstractContextManager[Analyzer],
    *,
    owns_portal: bool = True,
) -> None:
    """Store the collaborators this blocking facade forwards to.

    Args:
        portal: Portal used to marshal calls onto the event loop.
        analyzer: The async analyzer whose methods are mirrored.
        managed: Context-manager handle kept alongside the analyzer.
        owns_portal: Stored unchanged as ``_owns_portal``; defaults to ``True``.
    """
    # Plain attribute stores; nothing is opened or entered here.
    self._owns_portal = owns_portal
    self._managed = managed
    self._analyzer = analyzer
    self._portal = portal
|
capabilities
property
The active client's capability set.
dropped_frames
property
Count of frames dropped for parse/checksum failures.
info
property
The cached :class:DeviceInfo, if identify has run.
protocol
property
The active wire protocol.
analyser_status
analyser_status(*, timeout=None)
Return the analyser-level status.
Source code in src/servomexlib/sync/analyzer.py
def analyser_status(self, *, timeout: float | None = None) -> AnalyserStatus:
    """Return the analyser-level status.

    Forwards to the async analyzer's ``analyser_status`` through the portal,
    passing ``timeout`` along as a keyword argument.
    """
    fetch = self._analyzer.analyser_status
    return self._portal.call(fetch, timeout=timeout)
|
calibration_status
calibration_status(group=1, *, timeout=None)
Return autocalibration progress for group (Modbus only).
Source code in src/servomexlib/sync/analyzer.py
def calibration_status(
    self, group: int = 1, *, timeout: float | None = None
) -> CalibrationProgress:
    """Return autocalibration progress for ``group`` (Modbus only).

    ``group`` is passed positionally and ``timeout`` by keyword to the async
    analyzer's ``calibration_status`` via the portal.
    """
    query = self._analyzer.calibration_status
    progress = self._portal.call(query, group, timeout=timeout)
    return progress
|
identify
identify(*, timeout=None)
Return device identity, caching it.
Source code in src/servomexlib/sync/analyzer.py
def identify(self, *, timeout: float | None = None) -> DeviceInfo:
    """Return device identity, caching it.

    Runs the async analyzer's ``identify`` on the portal's loop and returns
    its result; ``timeout`` is forwarded by keyword.
    """
    probe = self._analyzer.identify
    return self._portal.call(probe, timeout=timeout)
|
open
classmethod
open(
port,
*,
protocol=ProtocolKind.AUTO,
address=1,
serial_settings=None,
timeout=1.0,
identify=True,
backend="asyncio",
portal=None,
)
Open an analyser synchronously and return an entered :class:SyncAnalyzer.
Pass portal to share an existing loop (e.g. one already hosting an
in-process fake); otherwise a private background loop is started and torn
down with the analyzer.
Source code in src/servomexlib/sync/analyzer.py
@classmethod
def open(
    cls,
    port: str | Transport,
    *,
    protocol: ProtocolKind | str = ProtocolKind.AUTO,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
    identify: bool = True,
    backend: str = "asyncio",
    portal: SyncPortal | None = None,
) -> SyncAnalyzer:
    """Open an analyser synchronously and return an entered :class:`SyncAnalyzer`.

    Pass ``portal`` to share an existing loop (e.g. one already hosting an
    in-process fake); otherwise a private background loop is started and torn
    down with the analyzer.

    Args:
        port: Port name or transport handed to ``open_device``.
        protocol: A :class:`ProtocolKind` or its string value.
        address: Forwarded to ``open_device``.
        serial_settings: Forwarded to ``open_device``.
        timeout: Forwarded to ``open_device``.
        identify: Forwarded to ``open_device``.
        backend: Backend name for the private portal (unused when ``portal``
            is supplied).
        portal: Existing portal to reuse instead of starting one.

    Returns:
        A :class:`SyncAnalyzer` whose underlying analyzer context is entered.
    """
    # Normalise first so an invalid protocol fails before any loop is started.
    kind = ProtocolKind(protocol)

    if portal is None:
        owns_portal = True
        active = SyncPortal(backend=backend)
        active.__enter__()
    else:
        owns_portal = False
        active = portal

    try:
        analyzer = active.call(
            open_device,
            port,
            protocol=kind,
            address=address,
            serial_settings=serial_settings,
            timeout=timeout,
            identify=identify,
        )
        # Use the portal's wrapper so __aenter__ and __aexit__ run in one task.
        managed = active.wrap_async_context_manager(analyzer)
        managed.__enter__()
    except BaseException:
        # Only tear down a loop this call started; a shared portal is left alone.
        if owns_portal:
            active.__exit__(None, None, None)
        raise

    return cls(active, analyzer, managed, owns_portal=owns_portal)
|
poll
poll(*, wait_fresh=False, timeout=None)
Return one frame (all channels + analyser status).
Source code in src/servomexlib/sync/analyzer.py
def poll(self, *, wait_fresh: bool = False, timeout: float | None = None) -> Frame:
    """Return one frame (all channels + analyser status).

    Both ``wait_fresh`` and ``timeout`` are forwarded by keyword to the async
    analyzer's ``poll`` through the portal.
    """
    options = {"wait_fresh": wait_fresh, "timeout": timeout}
    return self._portal.call(self._analyzer.poll, **options)
|
read_all
read_all(*, timeout=None)
Return every channel's latest reading keyed by id.
Source code in src/servomexlib/sync/analyzer.py
def read_all(self, *, timeout: float | None = None) -> dict[ChannelId, Reading]:
    """Return every channel's latest reading keyed by id.

    Delegates to the async analyzer's ``read_all`` via the portal.
    """
    reader = self._analyzer.read_all
    readings = self._portal.call(reader, timeout=timeout)
    return readings
|
read_channel
read_channel(channel, *, timeout=None)
Return one channel's latest reading.
Source code in src/servomexlib/sync/analyzer.py
def read_channel(self, channel: ChannelId | str, *, timeout: float | None = None) -> Reading:
    """Return one channel's latest reading.

    ``channel`` is passed positionally and ``timeout`` by keyword to the
    async analyzer's ``read_channel`` via the portal.
    """
    reader = self._analyzer.read_channel
    return self._portal.call(reader, channel, timeout=timeout)
|
snapshot
Return the cached latest frame without any I/O.
Source code in src/servomexlib/sync/analyzer.py
def snapshot(self) -> Frame:
    """Return the cached latest frame without any I/O.

    Calls the analyzer's ``snapshot`` directly; the portal is not involved.
    """
    latest = self._analyzer.snapshot()
    return latest
|
start_calibration
start_calibration(group, *, confirm=False, timeout=None)
Start autocalibration for group (Modbus only; requires confirm=True).
Source code in src/servomexlib/sync/analyzer.py
def start_calibration(
    self, group: int, *, confirm: bool = False, timeout: float | None = None
) -> None:
    """Start autocalibration for ``group`` (Modbus only; requires ``confirm=True``).

    ``confirm`` is forwarded unchanged; this wrapper does not check it itself.
    The portal call's result is discarded.
    """
    starter = self._analyzer.start_calibration
    self._portal.call(starter, group, confirm=confirm, timeout=timeout)
|
status
status(channel, *, timeout=None)
Return one channel's latest status.
Source code in src/servomexlib/sync/analyzer.py
def status(self, channel: ChannelId | str, *, timeout: float | None = None) -> ChannelStatus:
    """Return one channel's latest status.

    Delegates to the async analyzer's ``status`` via the portal, with
    ``channel`` positional and ``timeout`` by keyword.
    """
    lookup = self._analyzer.status
    return self._portal.call(lookup, channel, timeout=timeout)
|
stop_calibration
stop_calibration(*, confirm=False, timeout=None)
Stop all autocalibration (Modbus only; requires confirm=True).
Source code in src/servomexlib/sync/analyzer.py
def stop_calibration(self, *, confirm: bool = False, timeout: float | None = None) -> None:
    """Stop all autocalibration (Modbus only; requires ``confirm=True``).

    ``confirm`` is forwarded unchanged; this wrapper does not check it itself.
    The portal call's result is discarded.
    """
    stopper = self._analyzer.stop_calibration
    self._portal.call(stopper, confirm=confirm, timeout=timeout)
|
stream
stream(*, mode=None, rate_hz=None)
Return a blocking iterator over the analyser's sample stream.
Source code in src/servomexlib/sync/analyzer.py
def stream(
    self, *, mode: StreamMode | None = None, rate_hz: float | None = None
) -> SyncStreamingSession:
    """Return a blocking iterator over the analyser's sample stream.

    The async session is built on the portal's loop by ``_make_stream`` and
    then wrapped in a :class:`SyncStreamingSession` bound to the same portal.
    """
    portal = self._portal
    async_session = portal.call(_make_stream, self._analyzer, mode, rate_hz)
    return SyncStreamingSession(portal, async_session)
|
SyncManager
SyncManager(
*,
error_policy=ErrorPolicy.RAISE,
backend="asyncio",
portal=None,
)
Blocking facade over :class:~servomexlib.manager.ServomexManager.
Source code in src/servomexlib/sync/manager.py
def __init__(
    self,
    *,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    backend: str = "asyncio",
    portal: SyncPortal | None = None,
) -> None:
    """Create the wrapped async manager and pick the portal to drive it.

    Args:
        error_policy: Passed to :class:`ServomexManager`.
        backend: Backend name for a private portal; unused when ``portal``
            is supplied.
        portal: Existing portal to reuse. When omitted, a new
            :class:`SyncPortal` is constructed and recorded as owned.
    """
    if portal is None:
        # No portal supplied: construct a private one and record ownership.
        self._owns_portal = True
        self._portal = SyncPortal(backend=backend)
    else:
        self._owns_portal = False
        self._portal = portal
    self._manager = ServomexManager(error_policy=error_policy)
|
closed
property
True once the underlying manager is closed.
names
property
Insertion-ordered tuple of managed analyser names.
add
add(
name,
source,
*,
protocol=ProtocolKind.MODBUS_RTU,
address=1,
serial_settings=None,
timeout=1.0,
)
Register an analyser; returns the underlying async :class:Analyzer.
Source code in src/servomexlib/sync/manager.py
def add(
    self,
    name: str,
    source: Analyzer | str | Transport,
    *,
    protocol: ProtocolKind | str = ProtocolKind.MODBUS_RTU,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
) -> Analyzer:
    """Register an analyser; returns the underlying async :class:`Analyzer`.

    ``protocol`` may be a :class:`ProtocolKind` or its string value; it is
    normalised with ``ProtocolKind(...)`` before being forwarded. All other
    arguments are passed through to the async manager's ``add`` unchanged.
    """
    forwarded = {
        "protocol": ProtocolKind(protocol),
        "address": address,
        "serial_settings": serial_settings,
        "timeout": timeout,
    }
    return self._portal.call(self._manager.add, name, source, **forwarded)
|
get
Return the analyser registered under name (no I/O).
Source code in src/servomexlib/sync/manager.py
def get(self, name: str) -> Analyzer:
    """Return the analyser registered under ``name`` (no I/O).

    Calls the wrapped manager's ``get`` directly, without going through the
    portal.
    """
    registered = self._manager.get(name)
    return registered
|
poll
poll(names=None, *, timeout=None)
Read one :class:Frame per managed (or named) analyser, keyed by name.
Source code in src/servomexlib/sync/manager.py
def poll(
    self, names: Sequence[str] | None = None, *, timeout: float | None = None
) -> Mapping[str, DeviceResult[Frame]]:
    """Read one :class:`Frame` per managed (or named) analyser, keyed by name.

    ``names`` is passed positionally and ``timeout`` by keyword to the async
    manager's ``poll`` via the portal.
    """
    poller = self._manager.poll
    results = self._portal.call(poller, names, timeout=timeout)
    return results
|
poll_samples
poll_samples(*, names=None, timeout=None)
Poll every (or named) analyser → flat samples.
Source code in src/servomexlib/sync/manager.py
def poll_samples(
    self, *, names: Sequence[str] | None = None, timeout: float | None = None
) -> list[Sample]:
    """Poll every (or named) analyser → flat samples.

    Both ``names`` and ``timeout`` are forwarded by keyword to the async
    manager's ``poll_samples`` via the portal.
    """
    sampler = self._manager.poll_samples
    return self._portal.call(sampler, names=names, timeout=timeout)
|
remove
Unregister name, closing the shared transport on the last ref.
Source code in src/servomexlib/sync/manager.py
def remove(self, name: str) -> None:
    """Unregister ``name``, closing the shared transport on the last ref.

    Runs the async manager's ``remove`` on the portal's loop; the result is
    discarded.
    """
    unregister = self._manager.remove
    self._portal.call(unregister, name)
|
SyncPortal
SyncPortal(*, backend='asyncio')
Owns a background event loop and marshals async calls onto it.
Source code in src/servomexlib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    """Record the backend name; no portal is created here.

    ``_cm`` and ``_portal`` both start as ``None``.
    """
    self._portal: anyio.from_thread.BlockingPortal | None = None
    self._cm: AbstractContextManager[anyio.from_thread.BlockingPortal] | None = None
    self._backend = backend
|
running
property
Whether the backing event loop is currently running.
call
call(func, /, *args, **kwargs)
Run func(*args, **kwargs) on the loop thread and return its result.
The portal re-raises the concrete exception the coroutine raised, so sync
callers catch the same :class:ServomexError subclasses as async ones.
Source code in src/servomexlib/sync/portal.py
def call(self, func: Callable[..., Awaitable[T]], /, *args: object, **kwargs: object) -> T:
    """Run ``func(*args, **kwargs)`` on the loop thread and return its result.

    The portal re-raises the concrete exception the coroutine raised, so sync
    callers catch the same :class:`ServomexError` subclasses as async ones.
    """
    # Bind the arguments up front so the portal receives a zero-argument callable.
    bound = functools.partial(func, *args, **kwargs)
    portal = self._require_portal()
    return portal.call(bound)
|
wrap_async_context_manager
wrap_async_context_manager(cm)
Wrap an async context manager so its enter/exit run in one portal task.
call(cm.__aenter__) then call(cm.__aexit__) would run in different
loop tasks and trip anyio's cancel-scope affinity check; this bridges them.
Source code in src/servomexlib/sync/portal.py
def wrap_async_context_manager(
    self, cm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Wrap an async context manager so its enter/exit run in one portal task.

    ``call(cm.__aenter__)`` then ``call(cm.__aexit__)`` would run in *different*
    loop tasks and trip anyio's cancel-scope affinity check; this bridges them.
    """
    portal = self._require_portal()
    wrapped = portal.wrap_async_context_manager(cm)
    return wrapped
|
SyncSink
SyncSink(sink, *, portal=None)
Blocking adapter over an async :class:SampleSink.
Source code in src/servomexlib/sync/sinks.py
def __init__(self, sink: SampleSink, *, portal: SyncPortal | None = None) -> None:
    """Bind the async sink and choose the portal used to drive it.

    Args:
        sink: The async sink being adapted.
        portal: Existing portal to reuse. When omitted, a default
            :class:`SyncPortal` is constructed and recorded as owned.
    """
    self._sink = sink
    if portal is None:
        self._owns_portal = True
        self._portal = SyncPortal()
    else:
        self._owns_portal = False
        self._portal = portal
|
close
Close the backing sink.
Source code in src/servomexlib/sync/sinks.py
def close(self) -> None:
    """Close the backing sink.

    Runs the sink's ``close`` on the portal's loop; the result is discarded.
    """
    closer = self._sink.close
    self._portal.call(closer)
|
open
Open the backing sink.
Source code in src/servomexlib/sync/sinks.py
def open(self) -> None:
    """Open the backing sink.

    Runs the sink's ``open`` on the portal's loop; the result is discarded.
    """
    opener = self._sink.open
    self._portal.call(opener)
|
write_many
Append samples to the backing sink.
Source code in src/servomexlib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the backing sink.

    ``samples`` is handed to the sink's ``write_many`` unchanged via the
    portal; the result is discarded.
    """
    writer = self._sink.write_many
    self._portal.call(writer, samples)
|
SyncStreamingSession
SyncStreamingSession(portal, session)
Blocking iterator over an async streaming session, marshalled via the portal.
Source code in src/servomexlib/sync/analyzer.py
def __init__(self, portal: SyncPortal, session: object) -> None:
    """Keep the portal and the async session this iterator marshals through.

    Args:
        portal: Portal used to run the session's async operations.
        session: The async streaming session being wrapped.
    """
    self._session = session
    self._portal = portal
|
record_to_sink
record_to_sink(
source,
sink,
*,
rate_hz,
duration,
names=None,
timeout=None,
batch_size=64,
flush_interval=1.0,
backend="asyncio",
portal=None,
)
Record source into sink for duration seconds, blocking until done.
Parameters:
| Name |
Type |
Description |
Default |
source
|
PollSource
|
An async :class:PollSource (an Analyzer or ServomexManager).
|
required
|
sink
|
SampleSink
|
An async :class:SampleSink; opened and closed by this call.
|
required
|
rate_hz
|
float
|
Poll cadence.
|
required
|
duration
|
float
|
Acquisition seconds (required — the call blocks until it elapses).
|
required
|
names
|
Sequence[str] | None
|
Subset of device names (manager only).
|
None
|
timeout
|
float | None
|
Per-poll I/O ceiling.
|
None
|
batch_size
|
int
|
Sink flush threshold in samples.
|
64
|
flush_interval
|
float
|
Sink flush interval in seconds.
|
1.0
|
backend
|
str
|
AnyIO backend for the private portal (ignored if portal given).
|
'asyncio'
|
portal
|
SyncPortal | None
|
Reuse an existing portal instead of starting one.
|
None
|
Returns:
The sink-side :class:`AcquisitionSummary`.
Source code in src/servomexlib/sync/recording.py
def record_to_sink(
    source: PollSource,
    sink: SampleSink,
    *,
    rate_hz: float,
    duration: float,
    names: Sequence[str] | None = None,
    timeout: float | None = None,
    batch_size: int = 64,
    flush_interval: float = 1.0,
    backend: str = "asyncio",
    portal: SyncPortal | None = None,
) -> AcquisitionSummary:
    """Record ``source`` into ``sink`` for ``duration`` seconds, blocking until done.

    Args:
        source: An async :class:`PollSource` (an ``Analyzer`` or ``ServomexManager``).
        sink: An async :class:`SampleSink`; opened and closed by this call.
        rate_hz: Poll cadence.
        duration: Acquisition seconds (required — the call blocks until it elapses).
        names: Subset of device names (manager only).
        timeout: Per-poll I/O ceiling.
        batch_size: Sink flush threshold in samples.
        flush_interval: Sink flush interval in seconds.
        backend: AnyIO backend for the private portal (ignored if ``portal`` given).
        portal: Reuse an existing portal instead of starting one.

    Returns:
        The sink-side :class:`AcquisitionSummary`.
    """
    if portal is None:
        # No portal supplied: start a private one that this call also tears down.
        owns_portal = True
        active = SyncPortal(backend=backend)
        active.__enter__()
    else:
        owns_portal = False
        active = portal

    run_args = (
        source,
        sink,
        rate_hz,
        duration,
        names,
        timeout,
        batch_size,
        flush_interval,
    )
    try:
        summary = active.call(_run, *run_args)
    finally:
        # A caller-supplied portal is left running for its owner to close.
        if owns_portal:
            active.__exit__(None, None, None)
    return summary
|
sync_csv_sink
sync_csv_sink(path, *, portal=None)
A blocking CSV sink.
Source code in src/servomexlib/sync/sinks.py
def sync_csv_sink(path: str | Path, *, portal: SyncPortal | None = None) -> SyncSink:
    """A blocking CSV sink.

    Builds a ``CsvSink`` for ``Path(path)`` and wraps it in a :class:`SyncSink`,
    forwarding ``portal``.
    """
    # Imported here rather than at module level (lint rule silenced on purpose).
    from servomexlib.sinks import CsvSink  # noqa: PLC0415

    backing = CsvSink(Path(path))
    return SyncSink(backing, portal=portal)
|
sync_jsonl_sink
sync_jsonl_sink(path, *, portal=None)
A blocking JSONL sink.
Source code in src/servomexlib/sync/sinks.py
def sync_jsonl_sink(path: str | Path, *, portal: SyncPortal | None = None) -> SyncSink:
    """A blocking JSONL sink.

    Builds a ``JsonlSink`` for ``Path(path)`` and wraps it in a
    :class:`SyncSink`, forwarding ``portal``.
    """
    # Imported here rather than at module level (lint rule silenced on purpose).
    from servomexlib.sinks import JsonlSink  # noqa: PLC0415

    backing = JsonlSink(Path(path))
    return SyncSink(backing, portal=portal)
|
sync_sqlite_sink
sync_sqlite_sink(path, *, table='samples', portal=None)
A blocking SQLite sink.
Source code in src/servomexlib/sync/sinks.py
def sync_sqlite_sink(
    path: str | Path, *, table: str = "samples", portal: SyncPortal | None = None
) -> SyncSink:
    """A blocking SQLite sink.

    Builds a ``SqliteSink`` for ``Path(path)`` using ``table`` and wraps it in
    a :class:`SyncSink`, forwarding ``portal``.
    """
    # Imported here rather than at module level (lint rule silenced on purpose).
    from servomexlib.sinks import SqliteSink  # noqa: PLC0415

    backing = SqliteSink(Path(path), table=table)
    return SyncSink(backing, portal=portal)
|