Skip to content

watlowlib.sync

Sync facade over the async core. Every async method has a sync parity routed through a SyncPortal (an anyio.from_thread.BlockingPortal). See Sync quickstart.

Public surface

watlowlib.sync

Sync facade — blocking wrappers over the async core.

The sync surface targets scripts, notebooks, and REPL use. The async core remains canonical; every sync facade routes coroutines through a :class:SyncPortal (an :class:anyio.from_thread.BlockingPortal wrapper) so the event loop runs on a background thread.

What ships here:

  • :class:SyncPortal — single dispatch primitive used by the rest of the sync facade.
  • :class:Watlow / :class:SyncController — sync mirror of :class:~watlowlib.devices.controller.Controller.
  • :class:SyncWatlowManager — sync mirror of :class:~watlowlib.manager.WatlowManager.
  • :func:record / :func:pipe — sync mirrors of the streaming primitives.
  • :class:SyncSinkAdapter + per-sink wrappers (SyncCsvSink, SyncJsonlSink, SyncSqliteSink, SyncInMemorySink, SyncParquetSink, SyncPostgresSink).

Design reference: docs/design.md §6 (sync portal).

SyncAsyncIterator

SyncAsyncIterator(portal, async_iter)

Blocking view over an async iterator, bound to a :class:SyncPortal.

Source code in src/watlowlib/sync/portal.py
def __init__(self, portal: SyncPortal, async_iter: AsyncIterator[T]) -> None:
    self._portal = portal
    self._aiter = async_iter
    self._closed = False

close

close()

Cancel the underlying async iterator if it exposes aclose.

Source code in src/watlowlib/sync/portal.py
def close(self) -> None:
    """Cancel the underlying async iterator if it exposes ``aclose``."""
    if self._closed:
        return
    self._closed = True
    if not self._portal.running:
        return
    aclose: Callable[[], Awaitable[Any]] | None = getattr(self._aiter, "aclose", None)
    if aclose is None:
        return
    with contextlib.suppress(Exception):
        self._portal.call(aclose)

SyncController

SyncController(controller, portal)

Blocking facade over :class:watlowlib.devices.controller.Controller.

Instances are produced by :meth:Watlow.open or yielded by the sync manager; users do not call this constructor directly.

Source code in src/watlowlib/sync/controller.py
def __init__(self, controller: Controller, portal: SyncPortal) -> None:
    self._ctl = controller
    self._portal = portal

loops property

loops

Cached loop count — passes through :attr:Controller.loops.

portal property

portal

The :class:SyncPortal this controller routes coroutines through.

session property

session

Underlying async :class:Session (advanced escape-hatch).

close

close()

Blocking :meth:Controller.aclose. Idempotent.

Source code in src/watlowlib/sync/controller.py
def close(self) -> None:
    """Blocking :meth:`Controller.aclose`. Idempotent."""
    if not self._portal.running:
        return
    self._portal.call(self._ctl.aclose)

identify

identify(*, timeout=None)

Blocking :meth:Controller.identify.

Source code in src/watlowlib/sync/controller.py
def identify(self, *, timeout: float | None = None) -> DeviceInfo:
    """Blocking :meth:`Controller.identify`."""
    return self._portal.call(self._ctl.identify, timeout=timeout)

loop

loop(n)

Return a sync sub-facade bound to loop n (1-indexed).

Source code in src/watlowlib/sync/controller.py
def loop(self, n: int) -> SyncControllerLoop:
    """Return a sync sub-facade bound to loop ``n`` (1-indexed)."""
    return SyncControllerLoop(self._ctl.loop(n), self._portal)

poll

poll(parameters, *, names=None, instances=(1,))

Blocking :meth:Controller.poll.

Source code in src/watlowlib/sync/controller.py
def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Blocking :meth:`Controller.poll`."""
    return self._portal.call(
        self._ctl.poll,
        parameters,
        names=names,
        instances=instances,
    )

read_parameter

read_parameter(name_or_id, *, instance=1, timeout=None)

Blocking :meth:Controller.read_parameter.

Source code in src/watlowlib/sync/controller.py
def read_parameter(
    self,
    name_or_id: str | int,
    *,
    instance: int = 1,
    timeout: float | None = None,
) -> ParameterEntry:
    """Blocking :meth:`Controller.read_parameter`."""
    return self._portal.call(
        self._ctl.read_parameter,
        name_or_id,
        instance=instance,
        timeout=timeout,
    )

read_pv

read_pv(*, instance=1, timeout=None)

Blocking :meth:Controller.read_pv.

Source code in src/watlowlib/sync/controller.py
def read_pv(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Blocking :meth:`Controller.read_pv`."""
    return self._portal.call(self._ctl.read_pv, instance=instance, timeout=timeout)

read_setpoint

read_setpoint(*, instance=1, timeout=None)

Blocking :meth:Controller.read_setpoint.

Source code in src/watlowlib/sync/controller.py
def read_setpoint(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Blocking :meth:`Controller.read_setpoint`."""
    return self._portal.call(self._ctl.read_setpoint, instance=instance, timeout=timeout)

set_setpoint

set_setpoint(
    value, *, instance=1, confirm=False, timeout=None
)

Blocking :meth:Controller.set_setpoint.

Source code in src/watlowlib/sync/controller.py
def set_setpoint(
    self,
    value: float,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Blocking :meth:`Controller.set_setpoint`."""
    return self._portal.call(
        self._ctl.set_setpoint,
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )

write_parameter

write_parameter(
    name_or_id,
    value,
    *,
    instance=1,
    confirm=False,
    timeout=None,
)

Blocking :meth:Controller.write_parameter.

Source code in src/watlowlib/sync/controller.py
def write_parameter(
    self,
    name_or_id: str | int,
    value: float | int | str,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> ParameterEntry:
    """Blocking :meth:`Controller.write_parameter`."""
    return self._portal.call(
        self._ctl.write_parameter,
        name_or_id,
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )

SyncControllerLoop

SyncControllerLoop(async_loop, portal)

Blocking view over a single control loop (mirror of :class:ControllerLoop).

Returned by :meth:SyncController.loop; never instantiated directly. Lifetime is bound to the parent :class:SyncController and its portal — closing the controller is the only cleanup needed.

Source code in src/watlowlib/sync/controller.py
def __init__(self, async_loop: ControllerLoop, portal: SyncPortal) -> None:
    self._loop = async_loop
    self._portal = portal

number property

number

The 1-indexed loop number this view binds.

read_alarms

read_alarms()

Blocking :meth:ControllerLoop.read_alarms.

Source code in src/watlowlib/sync/controller.py
def read_alarms(self) -> AlarmState:
    """Blocking :meth:`ControllerLoop.read_alarms`."""
    return self._portal.call(self._loop.read_alarms)

read_output

read_output()

Blocking :meth:ControllerLoop.read_output.

Source code in src/watlowlib/sync/controller.py
def read_output(self) -> Reading:
    """Blocking :meth:`ControllerLoop.read_output`."""
    return self._portal.call(self._loop.read_output)

read_pid

read_pid()

Blocking :meth:ControllerLoop.read_pid.

Source code in src/watlowlib/sync/controller.py
def read_pid(self) -> PidGains:
    """Blocking :meth:`ControllerLoop.read_pid`."""
    return self._portal.call(self._loop.read_pid)

read_pv

read_pv(*, timeout=None)

Blocking :meth:ControllerLoop.read_pv.

Source code in src/watlowlib/sync/controller.py
def read_pv(self, *, timeout: float | None = None) -> Reading:
    """Blocking :meth:`ControllerLoop.read_pv`."""
    return self._portal.call(self._loop.read_pv, timeout=timeout)

read_setpoint

read_setpoint(*, timeout=None)

Blocking :meth:ControllerLoop.read_setpoint.

Source code in src/watlowlib/sync/controller.py
def read_setpoint(self, *, timeout: float | None = None) -> Reading:
    """Blocking :meth:`ControllerLoop.read_setpoint`."""
    return self._portal.call(self._loop.read_setpoint, timeout=timeout)

set_setpoint

set_setpoint(value, *, confirm=False, timeout=None)

Blocking :meth:ControllerLoop.set_setpoint.

Source code in src/watlowlib/sync/controller.py
def set_setpoint(
    self,
    value: float,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Blocking :meth:`ControllerLoop.set_setpoint`."""
    return self._portal.call(
        self._loop.set_setpoint,
        value,
        confirm=confirm,
        timeout=timeout,
    )

write_pid

write_pid(gains, *, confirm=False)

Blocking :meth:ControllerLoop.write_pid.

Source code in src/watlowlib/sync/controller.py
def write_pid(self, gains: PidGains, *, confirm: bool = False) -> PidGains:
    """Blocking :meth:`ControllerLoop.write_pid`."""
    return self._portal.call(self._loop.write_pid, gains, confirm=confirm)

SyncCsvSink

SyncCsvSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.csv.CsvSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(CsvSink(path), portal=portal)

SyncInMemorySink

SyncInMemorySink(*, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.memory.InMemorySink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, *, portal: SyncPortal | None = None) -> None:
    super().__init__(InMemorySink(), portal=portal)

samples property

samples

Captured samples — proxy for :attr:InMemorySink.samples.

SyncJsonlSink

SyncJsonlSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.jsonl.JsonlSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(JsonlSink(path), portal=portal)

SyncParquetSink

SyncParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.parquet.ParquetSink.

Requires the watlowlib[parquet] extra — the dependency check runs on :meth:open, same as the async sink.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        ParquetSink(
            path,
            compression=compression,
            use_dictionary=use_dictionary,
            row_group_size=row_group_size,
        ),
        portal=portal,
    )

SyncPortal

SyncPortal(*, backend='asyncio')

Per-context wrapper around :class:anyio.from_thread.BlockingPortal.

Example

with SyncPortal() as portal: # doctest: +SKIP ... result = portal.call(some_async_func, arg1, arg2)

Source code in src/watlowlib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    self._backend = backend
    self._cm: AbstractContextManager[BlockingPortal] | None = None
    self._portal: BlockingPortal | None = None
    self._entered = False

running property

running

True between :meth:__enter__ and :meth:__exit__.

call

call(func, *args, **kwargs)

Run func(*args, **kwargs) on the portal's event loop.

Single-member :class:ExceptionGroup wrappers are stripped.

Source code in src/watlowlib/sync/portal.py
def call[**P, T](
    self,
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run ``func(*args, **kwargs)`` on the portal's event loop.

    Single-member :class:`ExceptionGroup` wrappers are stripped.
    """
    portal = self._require_portal()
    bound: Callable[[], Awaitable[T]] = (
        partial(func, *args, **kwargs) if kwargs else partial(func, *args)
    )
    try:
        return portal.call(bound)
    except Exception as exc:
        unwrapped = _unwrap_single_group(exc)
        if unwrapped is exc:
            raise
        raise unwrapped from None

wrap_async_context_manager

wrap_async_context_manager(acm)

Present an async context manager as a sync context manager.

Source code in src/watlowlib/sync/portal.py
def wrap_async_context_manager[T](
    self, acm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Present an async context manager as a sync context manager."""
    return self._require_portal().wrap_async_context_manager(acm)

wrap_async_iter

wrap_async_iter(async_iter)

Present an async iterator as a blocking, closeable iterator.

Source code in src/watlowlib/sync/portal.py
def wrap_async_iter[T](self, async_iter: AsyncIterator[T]) -> SyncAsyncIterator[T]:
    """Present an async iterator as a blocking, closeable iterator."""
    self._require_portal()
    return SyncAsyncIterator(self, async_iter)

SyncPostgresSink

SyncPostgresSink(config, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.postgres.PostgresSink.

Requires the watlowlib[postgres] extra — dependency check runs on :meth:open.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    config: PostgresConfig,
    *,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(PostgresSink(config), portal=portal)

SyncSinkAdapter

SyncSinkAdapter(async_sink, *, portal=None)

Generic sync wrapper around any :class:SampleSink.

Subclasses typically only override :meth:__init__ to build the matching async sink with sink-specific parameters and hand it to this base class.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    async_sink: SampleSink,
    *,
    portal: SyncPortal | None = None,
) -> None:
    self._async_sink = async_sink
    self._portal_override = portal
    self._portal: SyncPortal | None = None
    self._stack: ExitStack | None = None
    self._entered = False

async_sink property

async_sink

The wrapped async :class:SampleSink — escape hatch.

portal property

portal

Active :class:SyncPortal (raises if outside with block).

close

close()

Blocking :meth:SampleSink.close — idempotent.

Source code in src/watlowlib/sync/sinks.py
def close(self) -> None:
    """Blocking :meth:`SampleSink.close` — idempotent."""
    portal = self._portal
    if portal is None:
        return
    portal.call(self._async_sink.close)

open

open()

Blocking :meth:SampleSink.open.

Source code in src/watlowlib/sync/sinks.py
def open(self) -> None:
    """Blocking :meth:`SampleSink.open`."""
    self.portal.call(self._async_sink.open)

write_many

write_many(samples)

Blocking :meth:SampleSink.write_many.

Source code in src/watlowlib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Blocking :meth:`SampleSink.write_many`."""
    self.portal.call(self._async_sink.write_many, samples)

SyncSqliteSink

SyncSqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.sqlite.SqliteSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        SqliteSink(
            path,
            table=table,
            create_table=create_table,
            journal_mode=journal_mode,
            synchronous=synchronous,
            busy_timeout_ms=busy_timeout_ms,
        ),
        portal=portal,
    )

SyncWatlowManager

SyncWatlowManager(
    *, error_policy=ErrorPolicy.RAISE, portal=None
)

Blocking facade over :class:watlowlib.manager.WatlowManager.

Source code in src/watlowlib/sync/manager.py
def __init__(
    self,
    *,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    portal: SyncPortal | None = None,
) -> None:
    self._error_policy = error_policy
    self._portal_override = portal
    self._stack: ExitStack | None = None
    self._portal: SyncPortal | None = None
    self._mgr: WatlowManager | None = None
    self._wrapped: dict[str, SyncController] = {}
    self._entered = False

closed property

closed

True once :meth:close or __exit__ has run.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed controller names.

portal property

portal

The :class:SyncPortal this manager's coroutines run on.

add

add(
    name,
    source,
    *,
    protocol=ProtocolKind.STDBUS,
    address=1,
    serial_settings=None,
    family=ControllerFamily.UNKNOWN,
)

Blocking :meth:WatlowManager.add.

Accepts a :class:SyncController as source in addition to the async shapes — the wrapper is unwrapped to the underlying :class:Controller before delegation.

Source code in src/watlowlib/sync/manager.py
def add(
    self,
    name: str,
    source: SyncController | Controller | str | Transport,
    *,
    protocol: ProtocolKind = ProtocolKind.STDBUS,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    family: ControllerFamily = ControllerFamily.UNKNOWN,
) -> SyncController:
    """Blocking :meth:`WatlowManager.add`.

    Accepts a :class:`SyncController` as ``source`` in addition to
    the async shapes — the wrapper is unwrapped to the underlying
    :class:`Controller` before delegation.
    """
    mgr = self._require_mgr()
    async_source: Controller | str | Transport = unwrap_sync_controller(source)
    async_controller = self.portal.call(
        mgr.add,
        name,
        async_source,
        protocol=protocol,
        address=address,
        serial_settings=serial_settings,
        family=family,
    )
    wrapped = wrap_controller(async_controller, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

close

close()

Blocking :meth:WatlowManager.close — idempotent.

Source code in src/watlowlib/sync/manager.py
def close(self) -> None:
    """Blocking :meth:`WatlowManager.close` — idempotent."""
    self._wrapped.clear()
    mgr = self._mgr
    if mgr is None:
        return
    portal = self._portal
    if portal is None:
        return
    portal.call(mgr.close)

execute_each

execute_each(op, names=None)

Blocking :meth:WatlowManager.execute_each.

op receives the async :class:Controller so existing coroutines compose. If you have a sync helper, wrap it in an async stub or run it on the portal yourself.

Source code in src/watlowlib/sync/manager.py
def execute_each[T](
    self,
    op: Callable[[Controller], Awaitable[T]],
    names: Sequence[str] | None = None,
) -> dict[str, DeviceResult[T]]:
    """Blocking :meth:`WatlowManager.execute_each`.

    ``op`` receives the **async** :class:`Controller` so existing
    coroutines compose. If you have a sync helper, wrap it in an
    async stub or run it on the portal yourself.
    """
    mgr = self._require_mgr()
    return self.portal.call(mgr.execute_each, op, names)

get

get(name)

Return the sync wrapper for the controller registered under name.

Source code in src/watlowlib/sync/manager.py
def get(self, name: str) -> SyncController:
    """Return the sync wrapper for the controller registered under ``name``."""
    cached = self._wrapped.get(name)
    if cached is not None:
        return cached
    mgr = self._require_mgr()
    async_controller = mgr.get(name)
    wrapped = wrap_controller(async_controller, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

poll

poll(parameters, *, names=None, instances=(1,))

Blocking :meth:WatlowManager.poll.

Source code in src/watlowlib/sync/manager.py
def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Blocking :meth:`WatlowManager.poll`."""
    mgr = self._require_mgr()
    return self.portal.call(
        mgr.poll,
        parameters,
        names=names,
        instances=instances,
    )

record_to_sink

record_to_sink(
    *,
    parameters,
    rate_hz,
    duration=None,
    sink,
    names=None,
    instances=(1,),
    overflow=None,
    buffer_size=64,
    batch_size=64,
    flush_interval=1.0,
)

Record polled samples directly into a sink — one-call convenience.

Combines :func:watlowlib.sync.record and :func:watlowlib.sync.pipe into a single blocking call. The manager's portal is reused for both legs so the recorder and the sink share an event loop. sink may be either a :class:SyncSinkAdapter (preferred, opened externally) or a bare async :class:SampleSink — in the latter case this method opens the sink against the manager's portal and closes it after the recording finishes.

Returns the :class:AcquisitionSummary from :func:watlowlib.sync.pipe.

Source code in src/watlowlib/sync/manager.py
def record_to_sink(
    self,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    sink: SyncSinkAdapter | SampleSink,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy | None = None,
    buffer_size: int = 64,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    """Record polled samples directly into a sink — one-call convenience.

    Combines :func:`watlowlib.sync.record` and
    :func:`watlowlib.sync.pipe` into a single blocking call. The
    manager's portal is reused for both legs so the recorder and
    the sink share an event loop. ``sink`` may be either a
    :class:`SyncSinkAdapter` (preferred, opened externally) or a
    bare async :class:`SampleSink` — in the latter case this
    method opens the sink against the manager's portal and closes
    it after the recording finishes.

    Returns the :class:`AcquisitionSummary` from
    :func:`watlowlib.sync.pipe`.
    """
    # Lazy imports — sink machinery pulls heavy deps (anyio sink
    # primitives, sqlite, etc.) and we want the surface importable
    # without that until the user reaches for streaming.
    from watlowlib.streaming.recorder import OverflowPolicy as _OverflowPolicy  # noqa: PLC0415
    from watlowlib.sync.recording import pipe, record  # noqa: PLC0415
    from watlowlib.sync.sinks import SyncSinkAdapter  # noqa: PLC0415

    self._require_mgr()
    active_overflow = overflow if overflow is not None else _OverflowPolicy.BLOCK
    portal = self.portal

    with ExitStack() as stack:
        sink_for_pipe: SyncSinkAdapter | SampleSink
        if isinstance(sink, SyncSinkAdapter):
            # Caller-owned sync wrapper — no open / close here.
            sink_for_pipe = sink
        else:
            # Bare async sink — wrap on this manager's portal so it
            # shares the recorder's event loop, and own the
            # open/close lifecycle through the ExitStack.
            wrapped = SyncSinkAdapter(sink, portal=portal)
            stack.enter_context(wrapped)
            sink_for_pipe = wrapped

        stream = stack.enter_context(
            record(
                self,
                parameters=parameters,
                rate_hz=rate_hz,
                duration=duration,
                names=names,
                instances=instances,
                overflow=active_overflow,
                buffer_size=buffer_size,
                portal=portal,
            ),
        )
        return pipe(
            stream,
            sink_for_pipe,
            batch_size=batch_size,
            flush_interval=flush_interval,
            portal=portal,
        )

remove

remove(name)

Blocking :meth:WatlowManager.remove.

Source code in src/watlowlib/sync/manager.py
def remove(self, name: str) -> None:
    """Blocking :meth:`WatlowManager.remove`."""
    mgr = self._require_mgr()
    self._wrapped.pop(name, None)
    self.portal.call(mgr.remove, name)

Watlow

Namespace for the sync controller entry point.

Use :meth:Watlow.open as a context manager::

from watlowlib.sync import Watlow

with Watlow.open("/dev/ttyUSB0") as ctl:
    print(ctl.read_pv())

open staticmethod

open(
    port,
    *,
    protocol=None,
    address=1,
    serial_settings=None,
    portal=None,
)

Open a sync :class:SyncController scoped to a with block.

Mirrors :func:watlowlib.open_device parameter-for-parameter (modulo the portal plumbing). The sync CM drives the async factory through a :class:SyncPortal; the portal is created per-call unless one is passed in via portal=.

Source code in src/watlowlib/sync/controller.py
@staticmethod
@contextmanager
def open(
    port: str,
    *,
    protocol: ProtocolKind | None = None,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    portal: SyncPortal | None = None,
) -> Generator[SyncController]:
    """Open a sync :class:`SyncController` scoped to a ``with`` block.

    Mirrors :func:`watlowlib.open_device` parameter-for-parameter
    (modulo the portal plumbing). The sync CM drives the async
    factory through a :class:`SyncPortal`; the portal is created
    per-call unless one is passed in via ``portal=``.
    """
    effective_protocol = protocol if protocol is not None else ProtocolKind.STDBUS

    with ExitStack() as stack:
        active_portal = portal if portal is not None else stack.enter_context(SyncPortal())
        controller = active_portal.call(
            open_device,
            port,
            protocol=effective_protocol,
            address=address,
            serial_settings=serial_settings,
        )
        # ``open_device`` returns a controller that may or may not
        # already be open: AUTO returned by the detector is open;
        # STDBUS / MODBUS_RTU need ``__aenter__`` to run open().
        # ``Controller.__aenter__`` short-circuits when already open
        # and returns ``self``; calling it through the portal here
        # gives us the same lifecycle as ``async with`` does.
        active_portal.call(controller.__aenter__)
        try:
            yield wrap_controller(controller, active_portal)
        finally:
            # Close the underlying transport through the portal.
            active_portal.call(controller.aclose)

pipe

pipe(
    stream,
    sink,
    *,
    batch_size=64,
    flush_interval=1.0,
    portal=None,
)

Sync :func:watlowlib.sinks.pipe.

Source code in src/watlowlib/sync/recording.py
def pipe(
    stream: Iterator[Sequence[Sample]],
    sink: SyncSinkAdapter | SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
    portal: SyncPortal | None = None,
) -> AcquisitionSummary:
    """Sync :func:`watlowlib.sinks.pipe`."""
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    if isinstance(sink, SyncSinkAdapter):
        flush = sink.write_many
    else:
        resolved: SyncPortal | None = portal
        if resolved is None and isinstance(stream, SyncAsyncIterator):
            resolved = stream._portal  # pyright: ignore[reportPrivateUsage]
        if resolved is None:
            raise RuntimeError(
                "pipe: passing an async SampleSink requires a portal — "
                "wrap the sink in a SyncSinkAdapter or pass portal=.",
            )
        async_sink = sink
        active: SyncPortal = resolved

        def flush(samples: Sequence[Sample]) -> None:
            active.call(async_sink.write_many, samples)

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = time.monotonic()

    for batch in stream:
        buffer.extend(batch)
        now = time.monotonic()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            flush(buffer)
            emitted += len(buffer)
            buffer.clear()
            last_flush = now

    if buffer:
        flush(buffer)
        emitted += len(buffer)
        buffer.clear()

    finished_at = datetime.now(UTC)
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )

record

record(
    source,
    *,
    parameters,
    rate_hz,
    duration=None,
    names=None,
    instances=(1,),
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    portal=None,
)

Sync :func:watlowlib.streaming.record.

If source is a :class:SyncWatlowManager, its portal is reused — the recorder and manager must share an event loop. Pass portal= to override.

Source code in src/watlowlib/sync/recording.py
@contextmanager
def record(
    source: SyncWatlowManager | PollSource,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    portal: SyncPortal | None = None,
) -> Generator[Iterator[Sequence[Sample]]]:
    """Sync :func:`watlowlib.streaming.record`.

    If ``source`` is a :class:`SyncWatlowManager`, its portal is
    reused — the recorder and manager must share an event loop. Pass
    ``portal=`` to override.
    """
    poll_source = _resolve_poll_source(source)
    with ExitStack() as stack:
        active_portal = _resolve_portal(portal, source, None) or stack.enter_context(SyncPortal())
        async_cm = async_record(
            poll_source,
            parameters=parameters,
            rate_hz=rate_hz,
            duration=duration,
            names=names,
            instances=instances,
            overflow=overflow,
            buffer_size=buffer_size,
        )
        async_stream = stack.enter_context(active_portal.wrap_async_context_manager(async_cm))
        sync_iter = stack.enter_context(active_portal.wrap_async_iter(async_stream))
        yield sync_iter

run_sync

run_sync(func, *args, **kwargs)

Run one coroutine in a throwaway :class:SyncPortal.

Source code in src/watlowlib/sync/portal.py
def run_sync[**P, T](
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run one coroutine in a throwaway :class:`SyncPortal`."""
    with SyncPortal() as portal:
        return portal.call(func, *args, **kwargs)

Portal

watlowlib.sync.portal

Blocking portal primitive — sync access to the async core.

:class:SyncPortal wraps :func:anyio.from_thread.start_blocking_portal so the rest of the sync facade (controller, manager, recording, sinks) can share one dispatch primitive.

Shape:

  • Lifecycle is a plain with block. Each portal owns one background event-loop thread; the portal closes when the block exits. Portals are one-shot — re-entering after exit raises.
  • call(func, *args, **kwargs) runs a coroutine. kwargs are bound through :func:functools.partial because :meth:anyio.from_thread.BlockingPortal.call only accepts positional arguments.
  • Single-member :class:ExceptionGroup s are unwrapped. The async core runs inside task groups (manager, recorder), so AnyIO occasionally rewraps a single exception into a group. Unwrap so callers see the concrete :class:~watlowlib.errors.WatlowError subclass they branch on. Aggregates with two or more exceptions stay as :class:ExceptionGroup, so sync callers under manager ErrorPolicy.RAISE handle one-failure and multi-failure cases with different exception shapes.
  • wrap_async_context_manager delegates to the portal's helper.
  • wrap_async_iter bridges async iteration. The returned :class:SyncAsyncIterator is both iterable and closeable.

Design reference: docs/design.md §6.

SyncAsyncIterator

SyncAsyncIterator(portal, async_iter)

Blocking view over an async iterator, bound to a :class:SyncPortal.

Source code in src/watlowlib/sync/portal.py
def __init__(self, portal: SyncPortal, async_iter: AsyncIterator[T]) -> None:
    self._portal = portal
    self._aiter = async_iter
    self._closed = False

close

close()

Cancel the underlying async iterator if it exposes aclose.

Source code in src/watlowlib/sync/portal.py
def close(self) -> None:
    """Cancel the underlying async iterator if it exposes ``aclose``."""
    if self._closed:
        return
    self._closed = True
    if not self._portal.running:
        return
    aclose: Callable[[], Awaitable[Any]] | None = getattr(self._aiter, "aclose", None)
    if aclose is None:
        return
    with contextlib.suppress(Exception):
        self._portal.call(aclose)

SyncPortal

SyncPortal(*, backend='asyncio')

Per-context wrapper around :class:anyio.from_thread.BlockingPortal.

Example

with SyncPortal() as portal: # doctest: +SKIP ... result = portal.call(some_async_func, arg1, arg2)

Source code in src/watlowlib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    self._backend = backend
    self._cm: AbstractContextManager[BlockingPortal] | None = None
    self._portal: BlockingPortal | None = None
    self._entered = False

running property

running

True between :meth:__enter__ and :meth:__exit__.

call

call(func, *args, **kwargs)

Run func(*args, **kwargs) on the portal's event loop.

Single-member :class:ExceptionGroup wrappers are stripped.

Source code in src/watlowlib/sync/portal.py
def call[**P, T](
    self,
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run ``func(*args, **kwargs)`` on the portal's event loop.

    Single-member :class:`ExceptionGroup` wrappers are stripped.
    """
    portal = self._require_portal()
    bound: Callable[[], Awaitable[T]] = (
        partial(func, *args, **kwargs) if kwargs else partial(func, *args)
    )
    try:
        return portal.call(bound)
    except Exception as exc:
        unwrapped = _unwrap_single_group(exc)
        if unwrapped is exc:
            raise
        raise unwrapped from None

wrap_async_context_manager

wrap_async_context_manager(acm)

Present an async context manager as a sync context manager.

Source code in src/watlowlib/sync/portal.py
def wrap_async_context_manager[T](
    self, acm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Present an async context manager as a sync context manager."""
    return self._require_portal().wrap_async_context_manager(acm)

wrap_async_iter

wrap_async_iter(async_iter)

Present an async iterator as a blocking, closeable iterator.

Source code in src/watlowlib/sync/portal.py
def wrap_async_iter[T](self, async_iter: AsyncIterator[T]) -> SyncAsyncIterator[T]:
    """Present an async iterator as a blocking, closeable iterator."""
    self._require_portal()
    return SyncAsyncIterator(self, async_iter)

run_sync

run_sync(func, *args, **kwargs)

Run one coroutine in a throwaway :class:SyncPortal.

Source code in src/watlowlib/sync/portal.py
def run_sync[**P, T](
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run one coroutine in a throwaway :class:`SyncPortal`."""
    with SyncPortal() as portal:
        return portal.call(func, *args, **kwargs)

Controller

watlowlib.sync.controller

Sync controller facade — portal-driven wrapper over :class:Controller.

Each :class:SyncController holds a reference to an async :class:~watlowlib.devices.controller.Controller and a :class:~watlowlib.sync.portal.SyncPortal; every public method is a one-liner that hands the underlying coroutine to the portal.

The :class:Watlow namespace exposes a Watlow.open(...) context manager that drives the async :func:~watlowlib.devices.factory.open_device through the portal.

Design reference: docs/design.md §6.

SyncController

SyncController(controller, portal)

Blocking facade over :class:watlowlib.devices.controller.Controller.

Instances are produced by :meth:Watlow.open or yielded by the sync manager; users do not call this constructor directly.

Source code in src/watlowlib/sync/controller.py
def __init__(self, controller: Controller, portal: SyncPortal) -> None:
    self._ctl = controller
    self._portal = portal

loops property

loops

Cached loop count — passes through :attr:Controller.loops.

portal property

portal

The :class:SyncPortal this controller routes coroutines through.

session property

session

Underlying async :class:Session (advanced escape-hatch).

close

close()

Blocking :meth:Controller.aclose. Idempotent.

Source code in src/watlowlib/sync/controller.py
def close(self) -> None:
    """Blocking :meth:`Controller.aclose`. Idempotent."""
    if not self._portal.running:
        return
    self._portal.call(self._ctl.aclose)

identify

identify(*, timeout=None)

Blocking :meth:Controller.identify.

Source code in src/watlowlib/sync/controller.py
def identify(self, *, timeout: float | None = None) -> DeviceInfo:
    """Blocking :meth:`Controller.identify`."""
    return self._portal.call(self._ctl.identify, timeout=timeout)

loop

loop(n)

Return a sync sub-facade bound to loop n (1-indexed).

Source code in src/watlowlib/sync/controller.py
def loop(self, n: int) -> SyncControllerLoop:
    """Return a sync sub-facade bound to loop ``n`` (1-indexed)."""
    return SyncControllerLoop(self._ctl.loop(n), self._portal)

poll

poll(parameters, *, names=None, instances=(1,))

Blocking :meth:Controller.poll.

Source code in src/watlowlib/sync/controller.py
def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Blocking :meth:`Controller.poll`."""
    return self._portal.call(
        self._ctl.poll,
        parameters,
        names=names,
        instances=instances,
    )

read_parameter

read_parameter(name_or_id, *, instance=1, timeout=None)

Blocking :meth:Controller.read_parameter.

Source code in src/watlowlib/sync/controller.py
def read_parameter(
    self,
    name_or_id: str | int,
    *,
    instance: int = 1,
    timeout: float | None = None,
) -> ParameterEntry:
    """Blocking :meth:`Controller.read_parameter`."""
    return self._portal.call(
        self._ctl.read_parameter,
        name_or_id,
        instance=instance,
        timeout=timeout,
    )

read_pv

read_pv(*, instance=1, timeout=None)

Blocking :meth:Controller.read_pv.

Source code in src/watlowlib/sync/controller.py
def read_pv(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Blocking :meth:`Controller.read_pv`."""
    return self._portal.call(self._ctl.read_pv, instance=instance, timeout=timeout)

read_setpoint

read_setpoint(*, instance=1, timeout=None)

Blocking :meth:Controller.read_setpoint.

Source code in src/watlowlib/sync/controller.py
def read_setpoint(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Blocking :meth:`Controller.read_setpoint`."""
    return self._portal.call(self._ctl.read_setpoint, instance=instance, timeout=timeout)

set_setpoint

set_setpoint(
    value, *, instance=1, confirm=False, timeout=None
)

Blocking :meth:Controller.set_setpoint.

Source code in src/watlowlib/sync/controller.py
def set_setpoint(
    self,
    value: float,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Blocking :meth:`Controller.set_setpoint`."""
    return self._portal.call(
        self._ctl.set_setpoint,
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )

write_parameter

write_parameter(
    name_or_id,
    value,
    *,
    instance=1,
    confirm=False,
    timeout=None,
)

Blocking :meth:Controller.write_parameter.

Source code in src/watlowlib/sync/controller.py
def write_parameter(
    self,
    name_or_id: str | int,
    value: float | int | str,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> ParameterEntry:
    """Blocking :meth:`Controller.write_parameter`."""
    return self._portal.call(
        self._ctl.write_parameter,
        name_or_id,
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )

SyncControllerLoop

SyncControllerLoop(async_loop, portal)

Blocking view over a single control loop (mirror of :class:ControllerLoop).

Returned by :meth:SyncController.loop; never instantiated directly. Lifetime is bound to the parent :class:SyncController and its portal — closing the controller is the only cleanup needed.

Source code in src/watlowlib/sync/controller.py
def __init__(self, async_loop: ControllerLoop, portal: SyncPortal) -> None:
    self._loop = async_loop
    self._portal = portal

number property

number

The 1-indexed loop number this view binds.

read_alarms

read_alarms()

Blocking :meth:ControllerLoop.read_alarms.

Source code in src/watlowlib/sync/controller.py
def read_alarms(self) -> AlarmState:
    """Blocking :meth:`ControllerLoop.read_alarms`."""
    return self._portal.call(self._loop.read_alarms)

read_output

read_output()

Blocking :meth:ControllerLoop.read_output.

Source code in src/watlowlib/sync/controller.py
def read_output(self) -> Reading:
    """Blocking :meth:`ControllerLoop.read_output`."""
    return self._portal.call(self._loop.read_output)

read_pid

read_pid()

Blocking :meth:ControllerLoop.read_pid.

Source code in src/watlowlib/sync/controller.py
def read_pid(self) -> PidGains:
    """Blocking :meth:`ControllerLoop.read_pid`."""
    return self._portal.call(self._loop.read_pid)

read_pv

read_pv(*, timeout=None)

Blocking :meth:ControllerLoop.read_pv.

Source code in src/watlowlib/sync/controller.py
def read_pv(self, *, timeout: float | None = None) -> Reading:
    """Blocking :meth:`ControllerLoop.read_pv`."""
    return self._portal.call(self._loop.read_pv, timeout=timeout)

read_setpoint

read_setpoint(*, timeout=None)

Blocking :meth:ControllerLoop.read_setpoint.

Source code in src/watlowlib/sync/controller.py
def read_setpoint(self, *, timeout: float | None = None) -> Reading:
    """Blocking :meth:`ControllerLoop.read_setpoint`."""
    return self._portal.call(self._loop.read_setpoint, timeout=timeout)

set_setpoint

set_setpoint(value, *, confirm=False, timeout=None)

Blocking :meth:ControllerLoop.set_setpoint.

Source code in src/watlowlib/sync/controller.py
def set_setpoint(
    self,
    value: float,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Blocking :meth:`ControllerLoop.set_setpoint`."""
    return self._portal.call(
        self._loop.set_setpoint,
        value,
        confirm=confirm,
        timeout=timeout,
    )

write_pid

write_pid(gains, *, confirm=False)

Blocking :meth:ControllerLoop.write_pid.

Source code in src/watlowlib/sync/controller.py
def write_pid(self, gains: PidGains, *, confirm: bool = False) -> PidGains:
    """Blocking :meth:`ControllerLoop.write_pid`."""
    return self._portal.call(self._loop.write_pid, gains, confirm=confirm)

Watlow

Namespace for the sync controller entry point.

Use :meth:Watlow.open as a context manager::

from watlowlib.sync import Watlow

with Watlow.open("/dev/ttyUSB0") as ctl:
    print(ctl.read_pv())

open staticmethod

open(
    port,
    *,
    protocol=None,
    address=1,
    serial_settings=None,
    portal=None,
)

Open a sync :class:SyncController scoped to a with block.

Mirrors :func:watlowlib.open_device parameter-for-parameter (modulo the portal plumbing). The sync CM drives the async factory through a :class:SyncPortal; the portal is created per-call unless one is passed in via portal=.

Source code in src/watlowlib/sync/controller.py
@staticmethod
@contextmanager
def open(
    port: str,
    *,
    protocol: ProtocolKind | None = None,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    portal: SyncPortal | None = None,
) -> Generator[SyncController]:
    """Open a sync :class:`SyncController` scoped to a ``with`` block.

    Mirrors :func:`watlowlib.open_device` parameter-for-parameter
    (modulo the portal plumbing). The sync CM drives the async
    factory through a :class:`SyncPortal`; the portal is created
    per-call unless one is passed in via ``portal=``.
    """
    effective_protocol = protocol if protocol is not None else ProtocolKind.STDBUS

    with ExitStack() as stack:
        active_portal = portal if portal is not None else stack.enter_context(SyncPortal())
        controller = active_portal.call(
            open_device,
            port,
            protocol=effective_protocol,
            address=address,
            serial_settings=serial_settings,
        )
        # ``open_device`` returns a controller that may or may not
        # already be open: AUTO returned by the detector is open;
        # STDBUS / MODBUS_RTU need ``__aenter__`` to run open().
        # ``Controller.__aenter__`` short-circuits when already open
        # and returns ``self``; calling it through the portal here
        # gives us the same lifecycle as ``async with`` does.
        active_portal.call(controller.__aenter__)
        try:
            yield wrap_controller(controller, active_portal)
        finally:
            # Close the underlying transport through the portal.
            active_portal.call(controller.aclose)

unwrap_sync_controller

unwrap_sync_controller(source)

Return the async :class:Controller inside source if wrapped.

Package-private helper used by :class:SyncWatlowManager.

Source code in src/watlowlib/sync/controller.py
def unwrap_sync_controller[T](source: T | SyncController) -> T | Controller:
    """Return the async :class:`Controller` inside ``source`` if wrapped.

    Package-private helper used by :class:`SyncWatlowManager`.
    """
    if isinstance(source, SyncController):
        return source._ctl  # pyright: ignore[reportPrivateUsage]
    return source

wrap_controller

wrap_controller(controller, portal)

Return a :class:SyncController wrapping controller on portal.

Package-private helper used by :class:SyncWatlowManager.

Source code in src/watlowlib/sync/controller.py
def wrap_controller(controller: Controller, portal: SyncPortal) -> SyncController:
    """Return a :class:`SyncController` wrapping ``controller`` on ``portal``.

    Package-private helper used by :class:`SyncWatlowManager`.
    """
    return SyncController(controller, portal)

Manager

watlowlib.sync.manager

Sync manager facade — portal-driven wrapper over :class:WatlowManager.

:class:SyncWatlowManager wraps the async :class:~watlowlib.manager.WatlowManager through a :class:~watlowlib.sync.portal.SyncPortal. Every coroutine method becomes a blocking method here; the synchronous :meth:get stays synchronous and delegates directly.

Lifecycle mirrors the async side: the class is a with context manager. By default each instance owns its own portal; callers that need several facades to share one event loop can pass portal= to reuse a long-lived :class:SyncPortal.

Design reference: docs/design.md §6.

DeviceResult dataclass

DeviceResult(value, error, protocol=None)

Per-device result container — value or error, never both.

:attr:protocol is populated from the controller's session so error rows from the streaming layer can still record which protocol produced the failure.

ok property

ok

True when the controller produced a value (error is None).

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every controller's result and — if any call failed — raises an :class:ExceptionGroup containing the per-device exceptions after the task group joins. Under :attr:RETURN, each controller produces a :class:DeviceResult and the caller inspects .error per entry.

SyncWatlowManager

SyncWatlowManager(
    *, error_policy=ErrorPolicy.RAISE, portal=None
)

Blocking facade over :class:watlowlib.manager.WatlowManager.

Source code in src/watlowlib/sync/manager.py
def __init__(
    self,
    *,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    portal: SyncPortal | None = None,
) -> None:
    self._error_policy = error_policy
    self._portal_override = portal
    self._stack: ExitStack | None = None
    self._portal: SyncPortal | None = None
    self._mgr: WatlowManager | None = None
    self._wrapped: dict[str, SyncController] = {}
    self._entered = False

closed property

closed

True once :meth:close or __exit__ has run.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed controller names.

portal property

portal

The :class:SyncPortal this manager's coroutines run on.

add

add(
    name,
    source,
    *,
    protocol=ProtocolKind.STDBUS,
    address=1,
    serial_settings=None,
    family=ControllerFamily.UNKNOWN,
)

Blocking :meth:WatlowManager.add.

Accepts a :class:SyncController as source in addition to the async shapes — the wrapper is unwrapped to the underlying :class:Controller before delegation.

Source code in src/watlowlib/sync/manager.py
def add(
    self,
    name: str,
    source: SyncController | Controller | str | Transport,
    *,
    protocol: ProtocolKind = ProtocolKind.STDBUS,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    family: ControllerFamily = ControllerFamily.UNKNOWN,
) -> SyncController:
    """Blocking :meth:`WatlowManager.add`.

    Accepts a :class:`SyncController` as ``source`` in addition to
    the async shapes — the wrapper is unwrapped to the underlying
    :class:`Controller` before delegation.
    """
    mgr = self._require_mgr()
    async_source: Controller | str | Transport = unwrap_sync_controller(source)
    async_controller = self.portal.call(
        mgr.add,
        name,
        async_source,
        protocol=protocol,
        address=address,
        serial_settings=serial_settings,
        family=family,
    )
    wrapped = wrap_controller(async_controller, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

close

close()

Blocking :meth:WatlowManager.close — idempotent.

Source code in src/watlowlib/sync/manager.py
def close(self) -> None:
    """Blocking :meth:`WatlowManager.close` — idempotent."""
    self._wrapped.clear()
    mgr = self._mgr
    if mgr is None:
        return
    portal = self._portal
    if portal is None:
        return
    portal.call(mgr.close)

execute_each

execute_each(op, names=None)

Blocking :meth:WatlowManager.execute_each.

op receives the async :class:Controller so existing coroutines compose. If you have a sync helper, wrap it in an async stub or run it on the portal yourself.

Source code in src/watlowlib/sync/manager.py
def execute_each[T](
    self,
    op: Callable[[Controller], Awaitable[T]],
    names: Sequence[str] | None = None,
) -> dict[str, DeviceResult[T]]:
    """Blocking :meth:`WatlowManager.execute_each`.

    ``op`` receives the **async** :class:`Controller` so existing
    coroutines compose. If you have a sync helper, wrap it in an
    async stub or run it on the portal yourself.
    """
    mgr = self._require_mgr()
    return self.portal.call(mgr.execute_each, op, names)

get

get(name)

Return the sync wrapper for the controller registered under name.

Source code in src/watlowlib/sync/manager.py
def get(self, name: str) -> SyncController:
    """Return the sync wrapper for the controller registered under ``name``."""
    cached = self._wrapped.get(name)
    if cached is not None:
        return cached
    mgr = self._require_mgr()
    async_controller = mgr.get(name)
    wrapped = wrap_controller(async_controller, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

poll

poll(parameters, *, names=None, instances=(1,))

Blocking :meth:WatlowManager.poll.

Source code in src/watlowlib/sync/manager.py
def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Blocking :meth:`WatlowManager.poll`."""
    mgr = self._require_mgr()
    return self.portal.call(
        mgr.poll,
        parameters,
        names=names,
        instances=instances,
    )

record_to_sink

record_to_sink(
    *,
    parameters,
    rate_hz,
    duration=None,
    sink,
    names=None,
    instances=(1,),
    overflow=None,
    buffer_size=64,
    batch_size=64,
    flush_interval=1.0,
)

Record polled samples directly into a sink — one-call convenience.

Combines :func:watlowlib.sync.record and :func:watlowlib.sync.pipe into a single blocking call. The manager's portal is reused for both legs so the recorder and the sink share an event loop. sink may be either a :class:SyncSinkAdapter (preferred, opened externally) or a bare async :class:SampleSink — in the latter case this method opens the sink against the manager's portal and closes it after the recording finishes.

Returns the :class:AcquisitionSummary from :func:watlowlib.sync.pipe.

Source code in src/watlowlib/sync/manager.py
def record_to_sink(
    self,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    sink: SyncSinkAdapter | SampleSink,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy | None = None,
    buffer_size: int = 64,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    """Record polled samples directly into a sink — one-call convenience.

    Combines :func:`watlowlib.sync.record` and
    :func:`watlowlib.sync.pipe` into a single blocking call. The
    manager's portal is reused for both legs so the recorder and
    the sink share an event loop. ``sink`` may be either a
    :class:`SyncSinkAdapter` (preferred, opened externally) or a
    bare async :class:`SampleSink` — in the latter case this
    method opens the sink against the manager's portal and closes
    it after the recording finishes.

    Returns the :class:`AcquisitionSummary` from
    :func:`watlowlib.sync.pipe`.
    """
    # Lazy imports — sink machinery pulls heavy deps (anyio sink
    # primitives, sqlite, etc.) and we want the surface importable
    # without that until the user reaches for streaming.
    from watlowlib.streaming.recorder import OverflowPolicy as _OverflowPolicy  # noqa: PLC0415
    from watlowlib.sync.recording import pipe, record  # noqa: PLC0415
    from watlowlib.sync.sinks import SyncSinkAdapter  # noqa: PLC0415

    self._require_mgr()
    active_overflow = overflow if overflow is not None else _OverflowPolicy.BLOCK
    portal = self.portal

    with ExitStack() as stack:
        sink_for_pipe: SyncSinkAdapter | SampleSink
        if isinstance(sink, SyncSinkAdapter):
            # Caller-owned sync wrapper — no open / close here.
            sink_for_pipe = sink
        else:
            # Bare async sink — wrap on this manager's portal so it
            # shares the recorder's event loop, and own the
            # open/close lifecycle through the ExitStack.
            wrapped = SyncSinkAdapter(sink, portal=portal)
            stack.enter_context(wrapped)
            sink_for_pipe = wrapped

        stream = stack.enter_context(
            record(
                self,
                parameters=parameters,
                rate_hz=rate_hz,
                duration=duration,
                names=names,
                instances=instances,
                overflow=active_overflow,
                buffer_size=buffer_size,
                portal=portal,
            ),
        )
        return pipe(
            stream,
            sink_for_pipe,
            batch_size=batch_size,
            flush_interval=flush_interval,
            portal=portal,
        )

remove

remove(name)

Blocking :meth:WatlowManager.remove.

Source code in src/watlowlib/sync/manager.py
def remove(self, name: str) -> None:
    """Blocking :meth:`WatlowManager.remove`."""
    mgr = self._require_mgr()
    self._wrapped.pop(name, None)
    self.portal.call(mgr.remove, name)

Recording

watlowlib.sync.recording

Sync wrappers for :func:watlowlib.streaming.record and :func:watlowlib.sinks.pipe.

:func:record — sync context manager wrapping the async recorder. The produced iterator is blocking; on CM exit the underlying async task group is cancelled and joined by the portal.

:func:pipe — sync drain loop matching :func:watlowlib.sinks.pipe's batch / time flush semantics. Rebuilt in sync-land rather than wrapping the async driver so buffering stays under sync control and the time threshold uses :func:time.monotonic, not :func:anyio.current_time.

Design reference: docs/design.md §6.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at,
    samples_emitted,
    samples_late,
    max_drift_ms,
    disconnects=0,
)

Per-run summary emitted after record()'s CM exits.

Attributes:

Name Type Description
started_at datetime

Wall-clock at the first scheduled tick.

finished_at datetime

Wall-clock at producer shutdown.

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream. A tick that produced zero samples (every device errored) still counts as one emitted batch.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). Auto-reconnect ticks also count as late.

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz indicate the device or consumer is saturating the schedule.

disconnects int

Count of WatlowConnectionError events the producer absorbed under auto_reconnect=True. Always 0 when auto_reconnect was off.

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the batch that was about to be enqueued. Counted as late.

pipe

pipe(
    stream,
    sink,
    *,
    batch_size=64,
    flush_interval=1.0,
    portal=None,
)

Sync :func:watlowlib.sinks.pipe.

Source code in src/watlowlib/sync/recording.py
def pipe(
    stream: Iterator[Sequence[Sample]],
    sink: SyncSinkAdapter | SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
    portal: SyncPortal | None = None,
) -> AcquisitionSummary:
    """Sync :func:`watlowlib.sinks.pipe`."""
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    if isinstance(sink, SyncSinkAdapter):
        flush = sink.write_many
    else:
        resolved: SyncPortal | None = portal
        if resolved is None and isinstance(stream, SyncAsyncIterator):
            resolved = stream._portal  # pyright: ignore[reportPrivateUsage]
        if resolved is None:
            raise RuntimeError(
                "pipe: passing an async SampleSink requires a portal — "
                "wrap the sink in a SyncSinkAdapter or pass portal=.",
            )
        async_sink = sink
        active: SyncPortal = resolved

        def flush(samples: Sequence[Sample]) -> None:
            active.call(async_sink.write_many, samples)

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = time.monotonic()

    for batch in stream:
        buffer.extend(batch)
        now = time.monotonic()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            flush(buffer)
            emitted += len(buffer)
            buffer.clear()
            last_flush = now

    if buffer:
        flush(buffer)
        emitted += len(buffer)
        buffer.clear()

    finished_at = datetime.now(UTC)
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )

record

record(
    source,
    *,
    parameters,
    rate_hz,
    duration=None,
    names=None,
    instances=(1,),
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    portal=None,
)

Sync :func:watlowlib.streaming.record.

If source is a :class:SyncWatlowManager, its portal is reused — the recorder and manager must share an event loop. Pass portal= to override.

Source code in src/watlowlib/sync/recording.py
@contextmanager
def record(
    source: SyncWatlowManager | PollSource,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    portal: SyncPortal | None = None,
) -> Generator[Iterator[Sequence[Sample]]]:
    """Sync :func:`watlowlib.streaming.record`.

    If ``source`` is a :class:`SyncWatlowManager`, its portal is
    reused — the recorder and manager must share an event loop. Pass
    ``portal=`` to override.
    """
    poll_source = _resolve_poll_source(source)
    with ExitStack() as stack:
        active_portal = _resolve_portal(portal, source, None) or stack.enter_context(SyncPortal())
        async_cm = async_record(
            poll_source,
            parameters=parameters,
            rate_hz=rate_hz,
            duration=duration,
            names=names,
            instances=instances,
            overflow=overflow,
            buffer_size=buffer_size,
        )
        async_stream = stack.enter_context(active_portal.wrap_async_context_manager(async_cm))
        sync_iter = stack.enter_context(active_portal.wrap_async_iter(async_stream))
        yield sync_iter

Sinks

watlowlib.sync.sinks

Sync wrappers for :mod:watlowlib.sinks.

Every in-tree sink has a one-to-one sync counterpart. All of them share :class:SyncSinkAdapter: the per-sink subclass only constructs the matching async sink with its own parameters and hands it to the adapter, which owns the portal + open/write/close plumbing.

Sinks follow the same portal-ownership pattern as the rest of the sync facade — each wrapper creates a throwaway :class:SyncPortal on __enter__ unless the caller passes one in. Pass a shared portal when the sink must share an event loop with a :class:SyncWatlowManager or :func:record, otherwise the sink's writes run on a different loop than the data producer.

Design reference: docs/design.md §6.

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout applied as a server setting. Defaults to 30 s.

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/watlowlib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"

SyncCsvSink

SyncCsvSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.csv.CsvSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(CsvSink(path), portal=portal)

SyncInMemorySink

SyncInMemorySink(*, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.memory.InMemorySink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, *, portal: SyncPortal | None = None) -> None:
    super().__init__(InMemorySink(), portal=portal)

samples property

samples

Captured samples — proxy for :attr:InMemorySink.samples.

SyncJsonlSink

SyncJsonlSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.jsonl.JsonlSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(JsonlSink(path), portal=portal)

SyncParquetSink

SyncParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.parquet.ParquetSink.

Requires the watlowlib[parquet] extra — the dependency check runs on :meth:open, same as the async sink.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        ParquetSink(
            path,
            compression=compression,
            use_dictionary=use_dictionary,
            row_group_size=row_group_size,
        ),
        portal=portal,
    )

SyncPostgresSink

SyncPostgresSink(config, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.postgres.PostgresSink.

Requires the watlowlib[postgres] extra — dependency check runs on :meth:open.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    config: PostgresConfig,
    *,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(PostgresSink(config), portal=portal)

SyncSampleSink

Bases: Protocol

Sync shape of an acquisition sink.

Mirrors :class:~watlowlib.sinks.base.SampleSink — same method names, no await. Every concrete wrapper in this module satisfies this Protocol.

__enter__

__enter__()

Open the sink and return self.

Source code in src/watlowlib/sync/sinks.py
def __enter__(self) -> Self:
    """Open the sink and return self."""
    ...

__exit__

__exit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/watlowlib/sync/sinks.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close

close()

Flush and release the backing resource — idempotent.

Source code in src/watlowlib/sync/sinks.py
def close(self) -> None:
    """Flush and release the backing resource — idempotent."""
    ...

open

open()

Allocate the sink's backing resource.

Source code in src/watlowlib/sync/sinks.py
def open(self) -> None:
    """Allocate the sink's backing resource."""
    ...

write_many

write_many(samples)

Append samples to the sink.

Source code in src/watlowlib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink."""
    ...

SyncSinkAdapter

SyncSinkAdapter(async_sink, *, portal=None)

Generic sync wrapper around any :class:SampleSink.

Subclasses typically only override :meth:__init__ to build the matching async sink with sink-specific parameters and hand it to this base class.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    async_sink: SampleSink,
    *,
    portal: SyncPortal | None = None,
) -> None:
    self._async_sink = async_sink
    self._portal_override = portal
    self._portal: SyncPortal | None = None
    self._stack: ExitStack | None = None
    self._entered = False

async_sink property

async_sink

The wrapped async :class:SampleSink — escape hatch.

portal property

portal

Active :class:SyncPortal (raises if outside with block).

close

close()

Blocking :meth:SampleSink.close — idempotent.

Source code in src/watlowlib/sync/sinks.py
def close(self) -> None:
    """Blocking :meth:`SampleSink.close` — idempotent."""
    portal = self._portal
    if portal is None:
        return
    portal.call(self._async_sink.close)

open

open()

Blocking :meth:SampleSink.open.

Source code in src/watlowlib/sync/sinks.py
def open(self) -> None:
    """Blocking :meth:`SampleSink.open`."""
    self.portal.call(self._async_sink.open)

write_many

write_many(samples)

Blocking :meth:SampleSink.write_many.

Source code in src/watlowlib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Blocking :meth:`SampleSink.write_many`."""
    self.portal.call(self._async_sink.write_many, samples)

SyncSqliteSink

SyncSqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.sqlite.SqliteSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        SqliteSink(
            path,
            table=table,
            create_table=create_table,
            journal_mode=journal_mode,
            synchronous=synchronous,
            busy_timeout_ms=busy_timeout_ms,
        ),
        portal=portal,
    )