alicatlib.sync

Sync facade over the async core. Every public async name has a synchronous counterpart. See the Sync quickstart for usage and Design §5.16 for the portal-based architecture.

Public surface

alicatlib.sync

Sync facade over the async core.

Async is canonical; the sync facade wraps it through :class:SyncPortal so scripts, notebooks, and REPL sessions can drive devices without await.

Surfaces:

  • Device / manager — :class:Alicat, :class:SyncDevice, :class:SyncFlowMeter, :class:SyncFlowController, :class:SyncPressureMeter, :class:SyncPressureController, :class:SyncAlicatManager (+ :class:ErrorPolicy / :class:DeviceResult re-exports).
  • Recording — :func:record, :func:pipe, :class:AcquisitionSummary, :class:OverflowPolicy.
  • Sinks — :class:SyncSinkAdapter + :class:SyncInMemorySink / :class:SyncCsvSink / :class:SyncJsonlSink / :class:SyncSqliteSink / :class:SyncParquetSink / :class:SyncPostgresSink (+ :class:PostgresConfig re-export).
  • Discovery — :func:list_serial_ports, :func:probe, :func:find_devices, :class:DiscoveryResult, :data:DEFAULT_DISCOVERY_BAUDRATES.
  • Portal primitives — :class:SyncPortal, :func:run_sync.

See docs/design.md §5.16 for the design.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at,
    samples_emitted,
    samples_late,
    max_drift_ms,
)

Per-run summary emitted after record()'s CM exits.

Attributes:

Name Type Description
started_at datetime

Wall-clock at the first scheduled tick.

finished_at datetime

Wall-clock at producer shutdown.

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream. Partial batches (some devices errored under ErrorPolicy.RETURN) still count as one emitted batch.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch).

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz indicate the device or consumer is saturating the schedule.
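These fields support a simple post-run health check. A minimal sketch using a stand-in dataclass (the real AcquisitionSummary is importable from alicatlib.sync; the half-period threshold below is an illustrative choice, not a library constant):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass(frozen=True)
class Summary:  # stand-in mirroring AcquisitionSummary's documented fields
    started_at: datetime
    finished_at: datetime
    samples_emitted: int
    samples_late: int
    max_drift_ms: float

def is_healthy(s: Summary, rate_hz: float) -> bool:
    # "Well under one period" interpreted here as < 50% of 1000 / rate_hz.
    period_ms = 1000.0 / rate_hz
    return s.samples_late == 0 and s.max_drift_ms < 0.5 * period_ms

start = datetime(2024, 1, 1)
s = Summary(start, start + timedelta(seconds=10),
            samples_emitted=100, samples_late=0, max_drift_ms=12.0)
healthy = is_healthy(s, rate_hz=10.0)  # period is 100 ms; 12 ms drift passes
```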

Alicat

Namespace for the sync device entry point.

Use :meth:Alicat.open as a context manager:

from alicatlib.sync import Alicat
with Alicat.open("/dev/ttyUSB0") as dev:  # doctest: +SKIP
    print(dev.poll())

open staticmethod

open(
    port,
    *,
    unit_id="A",
    serial=None,
    timeout=0.5,
    recover_from_stream=True,
    model_hint=None,
    assume_capabilities=Capability.NONE,
    assume_media=None,
    portal=None,
)

Open a sync :class:SyncDevice scoped to a with block.

Mirrors :func:alicatlib.devices.factory.open_device parameter for parameter. The returned sync CM drives the async factory through a :class:SyncPortal; the portal is created per-call unless a pre-existing one is passed in via portal=.

Passing a :class:Transport or :class:AlicatProtocolClient is advanced: the caller is responsible for ensuring the object was constructed inside the portal's event loop (or is loop-agnostic). The common case — passing a str port path — creates the transport inside the portal and avoids that concern.

Source code in src/alicatlib/sync/device.py
@staticmethod
@contextmanager
def open(
    port: str | Transport | AlicatProtocolClient,
    *,
    unit_id: str = "A",
    serial: SerialSettings | None = None,
    timeout: float = 0.5,
    recover_from_stream: bool = True,
    model_hint: str | None = None,
    assume_capabilities: Capability = Capability.NONE,
    assume_media: Medium | None = None,
    portal: SyncPortal | None = None,
) -> Generator[SyncDevice]:
    """Open a sync :class:`SyncDevice` scoped to a ``with`` block.

    Mirrors :func:`alicatlib.devices.factory.open_device` parameter
    for parameter. The returned sync CM drives the async factory
    through a :class:`SyncPortal`; the portal is created per-call
    unless a pre-existing one is passed in via ``portal=``.

    Passing a :class:`Transport` or
    :class:`AlicatProtocolClient` is advanced: the caller is
    responsible for ensuring the object was constructed inside the
    portal's event loop (or is loop-agnostic). The common case —
    passing a ``str`` port path — creates the transport inside the
    portal and avoids that concern.
    """
    with ExitStack() as stack:
        active_portal = portal if portal is not None else stack.enter_context(SyncPortal())
        async_cm = open_device(
            port,
            unit_id=unit_id,
            serial=serial,
            timeout=timeout,
            recover_from_stream=recover_from_stream,
            model_hint=model_hint,
            assume_capabilities=assume_capabilities,
            assume_media=assume_media,
        )
        async_device = stack.enter_context(active_portal.wrap_async_context_manager(async_cm))
        yield wrap_device(async_device, active_portal)

DeviceResult dataclass

DeviceResult(value, error)

Per-device result container — value or error, never both.

The union is encoded as two optional fields (rather than an Either / Result ADT) so mypy's narrowing on ok reads cleanly at call sites without pattern matching.

Attributes:

Name Type Description
value T | None

The successful result, or None if the call failed.

error AlicatError | None

The captured :class:~alicatlib.errors.AlicatError, or None if the call succeeded.

ok property

ok

True when the device produced a value (error is None).

DiscoveryResult dataclass

DiscoveryResult(port, unit_id, baudrate, info, error)

Outcome of a single :func:probe attempt.

Exactly one of :attr:info / :attr:error is populated — ok results carry a fully-identified :class:DeviceInfo, failed ones carry the typed :class:AlicatError from the identification pipeline. The :attr:ok convenience lets callers filter without hasattr.

ok property

ok

Whether identification completed successfully.

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every device's result and — if any call failed — raises an :class:ExceptionGroup containing the per-device exceptions after the task group joins. Under :attr:RETURN, each device produces a :class:DeviceResult and the caller inspects .error per entry.

Design reference: docs/design.md §5.13.

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples. The effective sample rate drops to the consumer's drain rate; samples_late accrues once the consumer catches up and the producer can check its schedule.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the sample that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch, then enqueue. Counted as late.
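The three policies differ only in what happens when the buffer is full. A toy model of the enqueue step, using a plain deque in place of the recorder's internal stream (function and parameter names here are illustrative, not library API):

```python
from collections import deque

def enqueue(buf: deque, item, policy: str, maxsize: int) -> bool:
    """Toy overflow step. True means the batch landed on time;
    False means it is counted toward samples_late."""
    if len(buf) < maxsize:
        buf.append(item)
        return True
    if policy == "drop_newest":
        return False           # discard the incoming batch
    if policy == "drop_oldest":
        buf.popleft()          # evict the oldest queued batch
        buf.append(item)
        return False
    raise ValueError(f"unknown policy: {policy}")

buf = deque()
on_time = [enqueue(buf, i, "drop_oldest", maxsize=2) for i in range(3)]
# The third batch evicted batch 0, so the buffer holds batches 1 and 2.
```

BLOCK has no analogue here because it never discards: the producer simply waits for the consumer to drain.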

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    connect_timeout_s=30.0,
    close_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout applied as a server setting. Defaults to 30 s.

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

connect_timeout_s float

Cap on initial pool establishment in :meth:PostgresSink.open. A misconfigured DSN must not be able to wedge open() indefinitely — defaults to 30 s.

close_timeout_s float

Cap on :meth:PostgresSink.close's wait for in-flight queries to drain. Defaults to 10 s; the pool is then forcibly torn down so shutdown can't hang.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.
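The schema/table identifier pattern keeps those names safe to interpolate into SQL. A quick standalone check of what it accepts (the helper name here is hypothetical; only the regex is quoted from the docs above):

```python
import re

# Pattern quoted in the schema/table attribute docs: 63 chars max,
# leading letter or underscore, then letters, digits, underscores.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,62}")

def is_valid_ident(name: str) -> bool:
    # fullmatch: the entire name must match, not just a prefix.
    return _IDENT.fullmatch(name) is not None

ok_plain = is_valid_ident("samples")
bad_digit = is_valid_ident("2024_runs")       # leading digit rejected
bad_dot = is_valid_ident("public.samples")    # dot rejected; pass schema separately
```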

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/alicatlib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"
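As a standalone illustration, the target() logic above can be reproduced with only urllib.parse; note how the user and password components of the DSN never reach the output, which is what makes the description log-safe:

```python
from urllib.parse import urlparse

def target(dsn=None, host=None, port=5432, database=None,
           schema="public", table="samples"):
    # Standalone re-implementation of the documented target() behavior.
    if dsn is not None:
        parsed = urlparse(dsn)
        h = parsed.hostname or "?"
        p = parsed.port or port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        h, p, db = host or "?", port, database or "?"
    return f"{h}:{p}/{db}.{schema}.{table}"

# Credentials in the DSN are parsed but deliberately omitted:
desc = target(dsn="postgres://user:secret@db.local:5433/metrics")
```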

SyncAlicatManager

SyncAlicatManager(
    *, error_policy=ErrorPolicy.RAISE, portal=None
)

Blocking facade over :class:alicatlib.manager.AlicatManager.

Example

with SyncAlicatManager() as mgr:  # doctest: +SKIP
    mgr.add("fuel", "/dev/ttyUSB0")
    mgr.add("air", "/dev/ttyUSB1")
    frames = mgr.poll()

Parameters:

Name Type Description Default
error_policy ErrorPolicy

Forwarded to :class:AlicatManager.

RAISE
portal SyncPortal | None

Optional pre-built :class:SyncPortal to share an event-loop thread with other sync facades. Default is a per-instance portal created on __enter__.

None
Source code in src/alicatlib/sync/manager.py
def __init__(
    self,
    *,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    portal: SyncPortal | None = None,
) -> None:
    self._error_policy = error_policy
    self._portal_override = portal
    self._stack: ExitStack | None = None
    self._portal: SyncPortal | None = None
    self._mgr: AlicatManager | None = None
    self._wrapped: dict[str, SyncDevice] = {}
    self._entered = False

closed property

closed

True once :meth:close or __exit__ has run.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed device names.

portal property

portal

The :class:SyncPortal this manager's coroutines run on.

__enter__

__enter__()

Start the portal, build the async manager, enter its CM.

Source code in src/alicatlib/sync/manager.py
def __enter__(self) -> Self:
    """Start the portal, build the async manager, enter its CM."""
    if self._entered:
        raise RuntimeError("SyncAlicatManager is not reusable after exit")
    self._entered = True
    stack = ExitStack()
    try:
        portal = (
            self._portal_override
            if self._portal_override is not None
            else stack.enter_context(SyncPortal())
        )
        mgr = AlicatManager(error_policy=self._error_policy)
        stack.enter_context(portal.wrap_async_context_manager(mgr))
        self._portal = portal
        self._mgr = mgr
        self._stack = stack
    except BaseException:
        stack.close()
        self._portal = None
        self._mgr = None
        raise
    return self

__exit__

__exit__(exc_type, exc, tb)

Close the managed devices + portal (if owned).

Source code in src/alicatlib/sync/manager.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the managed devices + portal (if owned)."""
    stack, self._stack = self._stack, None
    self._wrapped.clear()
    self._mgr = None
    self._portal = None
    if stack is not None:
        stack.__exit__(exc_type, exc, tb)

add

add(name, source, *, unit_id='A', serial=None, timeout=0.5)

Blocking :meth:AlicatManager.add.

Accepts a :class:SyncDevice as source in addition to the async shapes — the wrapper is unwrapped to the underlying :class:Device before delegation, matching the manager's "pre-built device, no lifecycle ownership" contract.

Source code in src/alicatlib/sync/manager.py
def add(
    self,
    name: str,
    source: SyncDevice | Device | str | Transport | AlicatProtocolClient,
    *,
    unit_id: str = "A",
    serial: SerialSettings | None = None,
    timeout: float = 0.5,
) -> SyncDevice:
    """Blocking :meth:`AlicatManager.add`.

    Accepts a :class:`SyncDevice` as ``source`` in addition to the
    async shapes — the wrapper is unwrapped to the underlying
    :class:`Device` before delegation, matching the manager's
    "pre-built device, no lifecycle ownership" contract.
    """
    mgr = self._require_mgr()
    async_source: Device | str | Transport | AlicatProtocolClient = unwrap_sync_device(source)
    async_device = self.portal.call(
        mgr.add,
        name,
        async_source,
        unit_id=unit_id,
        serial=serial,
        timeout=timeout,
    )
    wrapped = wrap_device(async_device, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

close

close()

Blocking :meth:AlicatManager.close — idempotent.

Source code in src/alicatlib/sync/manager.py
def close(self) -> None:
    """Blocking :meth:`AlicatManager.close` — idempotent."""
    self._wrapped.clear()
    mgr = self._mgr
    if mgr is None:
        return
    portal = self._portal
    if portal is None:
        return
    portal.call(mgr.close)

execute

execute(command, requests_by_name)

Blocking :meth:AlicatManager.execute.

Source code in src/alicatlib/sync/manager.py
def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    requests_by_name: Mapping[str, Req],
) -> Mapping[str, DeviceResult[Resp]]:
    """Blocking :meth:`AlicatManager.execute`."""
    mgr = self._require_mgr()
    return self.portal.call(mgr.execute, command, requests_by_name)

get

get(name)

Return the sync wrapper for the device registered under name.

The async manager's :meth:AlicatManager.get is already synchronous — this method caches the sync wrapper so repeated get calls return the same :class:SyncDevice instance per device name.

Source code in src/alicatlib/sync/manager.py
def get(self, name: str) -> SyncDevice:
    """Return the sync wrapper for the device registered under ``name``.

    The async manager's :meth:`AlicatManager.get` is already
    synchronous — this method caches the sync wrapper so repeated
    ``get`` calls return the same :class:`SyncDevice` instance per
    device name.
    """
    cached = self._wrapped.get(name)
    if cached is not None:
        return cached
    mgr = self._require_mgr()
    async_device = mgr.get(name)  # raises AlicatValidationError on unknown
    wrapped = wrap_device(async_device, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

poll

poll(names=None)

Blocking :meth:AlicatManager.poll.

Source code in src/alicatlib/sync/manager.py
def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[DataFrame]]:
    """Blocking :meth:`AlicatManager.poll`."""
    mgr = self._require_mgr()
    return self.portal.call(mgr.poll, names)

remove

remove(name)

Blocking :meth:AlicatManager.remove.

Source code in src/alicatlib/sync/manager.py
def remove(self, name: str) -> None:
    """Blocking :meth:`AlicatManager.remove`."""
    mgr = self._require_mgr()
    self._wrapped.pop(name, None)
    self.portal.call(mgr.remove, name)

request

request(statistics, names=None, *, averaging_ms=1)

Blocking :meth:AlicatManager.request.

Source code in src/alicatlib/sync/manager.py
def request(
    self,
    statistics: Sequence[Statistic | str],
    names: Sequence[str] | None = None,
    *,
    averaging_ms: int = 1,
) -> Mapping[str, DeviceResult[MeasurementSet]]:
    """Blocking :meth:`AlicatManager.request`."""
    mgr = self._require_mgr()
    return self.portal.call(
        mgr.request,
        statistics,
        names,
        averaging_ms=averaging_ms,
    )

SyncCsvSink

SyncCsvSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.csv.CsvSink.

Source code in src/alicatlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(CsvSink(path), portal=portal)

SyncDevice

SyncDevice(async_device, portal)

Blocking facade over :class:alicatlib.devices.base.Device.

Instances are produced by :meth:Alicat.open; users do not call this constructor directly. Every public method delegates to the underlying :class:Device through a :class:SyncPortal.

Source code in src/alicatlib/sync/device.py
def __init__(self, async_device: Device, portal: SyncPortal) -> None:
    self._dev = async_device
    self._portal = portal

info property

info

Identity snapshot — identical to :attr:Device.info.

portal property

portal

The :class:SyncPortal this device runs its coroutines on.

session property

session

Underlying async :class:Session for advanced escape-hatch use.

Calling coroutine methods on the returned session requires the caller to route them through :attr:portal.

unit_id property

unit_id

Validated single-letter unit id this device is addressed by.

__enter__

__enter__()

Nesting convenience — matches :meth:Device.__aenter__.

Source code in src/alicatlib/sync/device.py
def __enter__(self) -> Self:
    """Nesting convenience — matches :meth:`Device.__aenter__`."""
    return self

__exit__

__exit__(exc_type, exc, tb)

Close the device on exit — session close is idempotent.

Source code in src/alicatlib/sync/device.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the device on exit — session close is idempotent."""
    del exc_type, exc, tb
    self.close()

analog_output_source

analog_output_source(
    channel=_ANALOG_OUTPUT_PRIMARY,
    value=None,
    unit_code=None,
)

Blocking :meth:Device.analog_output_source.

Default channel is :attr:AnalogOutputChannel.PRIMARY — same default as the async side, bound at module load via the module-level alias so sync-parity signature comparison sees the same sentinel object.

Source code in src/alicatlib/sync/device.py
def analog_output_source(
    self,
    channel: AnalogOutputChannel = _ANALOG_OUTPUT_PRIMARY,
    value: int | None = None,
    unit_code: int | None = None,
) -> AnalogOutputSourceSetting:
    """Blocking :meth:`Device.analog_output_source`.

    Default channel is :attr:`AnalogOutputChannel.PRIMARY` — same
    default as the async side, bound at module load via the
    module-level alias so sync-parity signature comparison sees
    the same sentinel object.
    """
    return self._portal.call(self._dev.analog_output_source, channel, value, unit_code)

average_timing

average_timing(statistic_code, averaging_ms=None)

Blocking :meth:Device.average_timing.

Source code in src/alicatlib/sync/device.py
def average_timing(
    self,
    statistic_code: int,
    averaging_ms: int | None = None,
) -> AverageTimingSetting:
    """Blocking :meth:`Device.average_timing`."""
    return self._portal.call(self._dev.average_timing, statistic_code, averaging_ms)

blink_display

blink_display(duration_s=None)

Blocking :meth:Device.blink_display.

Source code in src/alicatlib/sync/device.py
def blink_display(self, duration_s: int | None = None) -> BlinkDisplayState:
    """Blocking :meth:`Device.blink_display`."""
    return self._portal.call(self._dev.blink_display, duration_s)

close

close()

Release the underlying session — idempotent.

Prefer with Alicat.open(...) as dev: over calling this by hand; the outer context manager owns transport lifecycle.

Source code in src/alicatlib/sync/device.py
def close(self) -> None:
    """Release the underlying session — idempotent.

    Prefer ``with Alicat.open(...) as dev:`` over calling this by
    hand; the outer context manager owns transport lifecycle.
    """
    self._portal.call(self._dev.close)

engineering_units

engineering_units(
    statistic,
    unit=None,
    *,
    apply_to_group=False,
    override_special_rules=False,
)

Blocking :meth:Device.engineering_units.

Source code in src/alicatlib/sync/device.py
def engineering_units(
    self,
    statistic: Statistic | str,
    unit: Unit | int | str | None = None,
    *,
    apply_to_group: bool = False,
    override_special_rules: bool = False,
) -> UnitSetting:
    """Blocking :meth:`Device.engineering_units`."""
    return self._portal.call(
        self._dev.engineering_units,
        statistic,
        unit,
        apply_to_group=apply_to_group,
        override_special_rules=override_special_rules,
    )

execute

execute(command, request)

Blocking :meth:Device.execute.

Source code in src/alicatlib/sync/device.py
def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    request: Req,
) -> Resp:
    """Blocking :meth:`Device.execute`."""
    return self._portal.call(self._dev.execute, command, request)

full_scale

full_scale(statistic)

Blocking :meth:Device.full_scale.

Source code in src/alicatlib/sync/device.py
def full_scale(self, statistic: Statistic | str) -> FullScaleValue:
    """Blocking :meth:`Device.full_scale`."""
    return self._portal.call(self._dev.full_scale, statistic)

gas

gas(gas=None, *, save=None)

Blocking :meth:Device.gas.

Source code in src/alicatlib/sync/device.py
def gas(
    self,
    gas: Gas | str | None = None,
    *,
    save: bool | None = None,
) -> GasState:
    """Blocking :meth:`Device.gas`."""
    return self._portal.call(self._dev.gas, gas, save=save)

gas_list

gas_list()

Blocking :meth:Device.gas_list.

Source code in src/alicatlib/sync/device.py
def gas_list(self) -> Mapping[int, str]:
    """Blocking :meth:`Device.gas_list`."""
    return self._portal.call(self._dev.gas_list)

lock_display

lock_display()

Blocking :meth:Device.lock_display.

Source code in src/alicatlib/sync/device.py
def lock_display(self) -> DisplayLockResult:
    """Blocking :meth:`Device.lock_display`."""
    return self._portal.call(self._dev.lock_display)

poll

poll()

Blocking :meth:Device.poll.

Source code in src/alicatlib/sync/device.py
def poll(self) -> DataFrame:
    """Blocking :meth:`Device.poll`."""
    return self._portal.call(self._dev.poll)

power_up_tare

power_up_tare(enable=None)

Blocking :meth:Device.power_up_tare.

Source code in src/alicatlib/sync/device.py
def power_up_tare(self, enable: bool | None = None) -> PowerUpTareState:
    """Blocking :meth:`Device.power_up_tare`."""
    return self._portal.call(self._dev.power_up_tare, enable)

request

request(statistics, *, averaging_ms=1)

Blocking :meth:Device.request.

Source code in src/alicatlib/sync/device.py
def request(
    self,
    statistics: Sequence[Statistic | str],
    *,
    averaging_ms: int = 1,
) -> MeasurementSet:
    """Blocking :meth:`Device.request`."""
    return self._portal.call(
        self._dev.request,
        statistics,
        averaging_ms=averaging_ms,
    )

stp_ntp_pressure

stp_ntp_pressure(mode, pressure=None, unit_code=None)

Blocking :meth:Device.stp_ntp_pressure.

Source code in src/alicatlib/sync/device.py
def stp_ntp_pressure(
    self,
    mode: StpNtpMode,
    pressure: float | None = None,
    unit_code: int | None = None,
) -> StpNtpPressureSetting:
    """Blocking :meth:`Device.stp_ntp_pressure`."""
    return self._portal.call(self._dev.stp_ntp_pressure, mode, pressure, unit_code)

stp_ntp_temperature

stp_ntp_temperature(mode, temperature=None, unit_code=None)

Blocking :meth:Device.stp_ntp_temperature.

Source code in src/alicatlib/sync/device.py
def stp_ntp_temperature(
    self,
    mode: StpNtpMode,
    temperature: float | None = None,
    unit_code: int | None = None,
) -> StpNtpTemperatureSetting:
    """Blocking :meth:`Device.stp_ntp_temperature`."""
    return self._portal.call(self._dev.stp_ntp_temperature, mode, temperature, unit_code)

stream

stream(
    *,
    rate_ms=None,
    strict=False,
    overflow=None,
    buffer_size=256,
)

Blocking :meth:Device.stream — returns a sync context manager.

The returned :class:SyncStreamingSession is both a sync context manager and a sync iterator; use it as::

with sync_dev.stream(rate_ms=50) as stream:
    for frame in stream:
        process(frame)
Source code in src/alicatlib/sync/device.py
def stream(
    self,
    *,
    rate_ms: int | None = None,
    strict: bool = False,
    overflow: OverflowPolicy | None = None,
    buffer_size: int = 256,
) -> SyncStreamingSession:
    """Blocking :meth:`Device.stream` — returns a sync context manager.

    The returned :class:`SyncStreamingSession` is both a sync
    context manager and a sync iterator; use it as::

        with sync_dev.stream(rate_ms=50) as stream:
            for frame in stream:
                process(frame)
    """
    async_stream = self._dev.stream(
        rate_ms=rate_ms,
        strict=strict,
        overflow=overflow,
        buffer_size=buffer_size,
    )
    return SyncStreamingSession(async_stream, self._portal)

tare_absolute_pressure

tare_absolute_pressure()

Blocking :meth:Device.tare_absolute_pressure.

Source code in src/alicatlib/sync/device.py
def tare_absolute_pressure(self) -> TareResult:
    """Blocking :meth:`Device.tare_absolute_pressure`."""
    return self._portal.call(self._dev.tare_absolute_pressure)

tare_flow

tare_flow()

Blocking :meth:Device.tare_flow.

Source code in src/alicatlib/sync/device.py
def tare_flow(self) -> TareResult:
    """Blocking :meth:`Device.tare_flow`."""
    return self._portal.call(self._dev.tare_flow)

tare_gauge_pressure

tare_gauge_pressure()

Blocking :meth:Device.tare_gauge_pressure.

Source code in src/alicatlib/sync/device.py
def tare_gauge_pressure(self) -> TareResult:
    """Blocking :meth:`Device.tare_gauge_pressure`."""
    return self._portal.call(self._dev.tare_gauge_pressure)

totalizer_config

totalizer_config(
    totalizer=_TOTALIZER_FIRST,
    *,
    flow_statistic_code=None,
    mode=None,
    limit_mode=None,
    digits=None,
    decimal_place=None,
)

Blocking :meth:Device.totalizer_config.

Source code in src/alicatlib/sync/device.py
def totalizer_config(
    self,
    totalizer: TotalizerId = _TOTALIZER_FIRST,
    *,
    flow_statistic_code: int | None = None,
    mode: TotalizerMode | None = None,
    limit_mode: TotalizerLimitMode | None = None,
    digits: int | None = None,
    decimal_place: int | None = None,
) -> TotalizerConfig:
    """Blocking :meth:`Device.totalizer_config`."""
    return self._portal.call(
        self._dev.totalizer_config,
        totalizer,
        flow_statistic_code=flow_statistic_code,
        mode=mode,
        limit_mode=limit_mode,
        digits=digits,
        decimal_place=decimal_place,
    )

totalizer_reset

totalizer_reset(
    totalizer=_TOTALIZER_FIRST, *, confirm=False
)

Blocking :meth:Device.totalizer_reset — destructive; requires confirm=True.

Source code in src/alicatlib/sync/device.py
def totalizer_reset(
    self,
    totalizer: TotalizerId = _TOTALIZER_FIRST,
    *,
    confirm: bool = False,
) -> TotalizerResetResult:
    """Blocking :meth:`Device.totalizer_reset` — destructive; requires ``confirm=True``."""
    return self._portal.call(self._dev.totalizer_reset, totalizer, confirm=confirm)

totalizer_reset_peak

totalizer_reset_peak(
    totalizer=_TOTALIZER_FIRST, *, confirm=False
)

Blocking :meth:Device.totalizer_reset_peak — destructive.

Source code in src/alicatlib/sync/device.py
def totalizer_reset_peak(
    self,
    totalizer: TotalizerId = _TOTALIZER_FIRST,
    *,
    confirm: bool = False,
) -> TotalizerResetResult:
    """Blocking :meth:`Device.totalizer_reset_peak` — destructive."""
    return self._portal.call(self._dev.totalizer_reset_peak, totalizer, confirm=confirm)

totalizer_save

totalizer_save(enable=None, *, save=None)

Blocking :meth:Device.totalizer_save.

Source code in src/alicatlib/sync/device.py
def totalizer_save(
    self,
    enable: bool | None = None,
    *,
    save: bool | None = None,
) -> TotalizerSaveState:
    """Blocking :meth:`Device.totalizer_save`."""
    return self._portal.call(self._dev.totalizer_save, enable, save=save)

unlock_display

unlock_display()

Blocking :meth:Device.unlock_display.

Source code in src/alicatlib/sync/device.py
def unlock_display(self) -> DisplayLockResult:
    """Blocking :meth:`Device.unlock_display`."""
    return self._portal.call(self._dev.unlock_display)

user_data

user_data(slot, value=None)

Blocking :meth:Device.user_data.

Source code in src/alicatlib/sync/device.py
def user_data(self, slot: int, value: str | None = None) -> UserDataSetting:
    """Blocking :meth:`Device.user_data`."""
    return self._portal.call(self._dev.user_data, slot, value)

zero_band

zero_band(zero_band=None)

Blocking :meth:Device.zero_band.

Source code in src/alicatlib/sync/device.py
def zero_band(self, zero_band: float | None = None) -> ZeroBandSetting:
    """Blocking :meth:`Device.zero_band`."""
    return self._portal.call(self._dev.zero_band, zero_band)

SyncFlowController

SyncFlowController(async_device, portal)

Bases: SyncFlowMeter, _SyncControllerMixin

Flow-controller facade — adds the shared controller surface.

Source code in src/alicatlib/sync/device.py
def __init__(self, async_device: Device, portal: SyncPortal) -> None:
    self._dev = async_device
    self._portal = portal

SyncFlowMeter

SyncFlowMeter(async_device, portal)

Bases: SyncDevice

Flow-meter tag — empty pass-through, mirrors :class:FlowMeter.

Source code in src/alicatlib/sync/device.py
def __init__(self, async_device: Device, portal: SyncPortal) -> None:
    self._dev = async_device
    self._portal = portal

SyncInMemorySink

SyncInMemorySink(*, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.memory.InMemorySink.

Source code in src/alicatlib/sync/sinks.py
def __init__(self, *, portal: SyncPortal | None = None) -> None:
    super().__init__(InMemorySink(), portal=portal)

samples property

samples

Captured samples — proxy for :attr:InMemorySink.samples.

SyncJsonlSink

SyncJsonlSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.jsonl.JsonlSink.

Source code in src/alicatlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(JsonlSink(path), portal=portal)

SyncParquetSink

SyncParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.parquet.ParquetSink.

Requires the alicatlib[parquet] extra — the dependency check runs on :meth:open, same as the async sink.

Source code in src/alicatlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        ParquetSink(
            path,
            compression=compression,
            use_dictionary=use_dictionary,
            row_group_size=row_group_size,
        ),
        portal=portal,
    )

SyncPortal

SyncPortal(*, backend='asyncio')

Per-context wrapper around :class:anyio.from_thread.BlockingPortal.

Example

with SyncPortal() as portal:
    result = portal.call(some_async_func, arg1, arg2)

Parameters:

Name Type Description Default
backend str

AnyIO backend to run on. Defaults to "asyncio"; the sync facade does not expose trio-specific features, so there is no reason to change this unless the surrounding process already runs a trio loop.

'asyncio'
Source code in src/alicatlib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    self._backend = backend
    self._cm: AbstractContextManager[BlockingPortal] | None = None
    self._portal: BlockingPortal | None = None
    self._entered = False
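For intuition, the pattern :class:SyncPortal wraps can be sketched with the standard library alone: a background thread runs an event loop, and a blocking call() submits coroutines to it via run_coroutine_threadsafe. MiniPortal and double below are illustrative stand-ins, not alicatlib's implementation (which delegates to anyio's BlockingPortal):

```python
import asyncio
import threading

class MiniPortal:
    """Hypothetical stdlib-only sketch of the portal idea."""

    def __enter__(self):
        # Run an event loop on a background thread for the portal's lifetime.
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        return self

    def __exit__(self, *exc):
        # Stop the loop from outside its thread, then join and close.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    def call(self, func, *args, **kwargs):
        # Submit the coroutine to the background loop and block for its result.
        fut = asyncio.run_coroutine_threadsafe(func(*args, **kwargs), self._loop)
        return fut.result()


async def double(x: int) -> int:
    return x * 2


with MiniPortal() as portal:
    result = portal.call(double, 21)

print(result)  # 42
```

The real class adds the non-reusability guard and single-member ExceptionGroup unwrapping shown in the source above.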

running property

running

True between :meth:__enter__ and :meth:__exit__.

__enter__

__enter__()

Start the portal's background thread and event loop.

Source code in src/alicatlib/sync/portal.py
def __enter__(self) -> Self:
    """Start the portal's background thread and event loop."""
    if self._entered:
        raise RuntimeError("SyncPortal is not reusable after exit")
    self._entered = True
    cm = start_blocking_portal(self._backend)
    self._portal = cm.__enter__()
    self._cm = cm
    return self

__exit__

__exit__(exc_type, exc, tb)

Stop the portal and join its thread.

Source code in src/alicatlib/sync/portal.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Stop the portal and join its thread."""
    cm, self._cm = self._cm, None
    self._portal = None
    if cm is not None:
        cm.__exit__(exc_type, exc, tb)

call

call(func, *args, **kwargs)

Run func(*args, **kwargs) on the portal's event loop.

Single-member :class:ExceptionGroup wrappers are stripped so callers can catch concrete exception types.

Source code in src/alicatlib/sync/portal.py
def call[**P, T](
    self,
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run ``func(*args, **kwargs)`` on the portal's event loop.

    Single-member :class:`ExceptionGroup` wrappers are stripped so
    callers can catch concrete exception types.
    """
    portal = self._require_portal()
    bound: Callable[[], Awaitable[T]] = (
        partial(func, *args, **kwargs) if kwargs else partial(func, *args)
    )
    try:
        return portal.call(bound)
    except Exception as exc:
        unwrapped = _unwrap_single_group(exc)
        if unwrapped is exc:
            raise
        raise unwrapped from None

wrap_async_context_manager

wrap_async_context_manager(acm)

Present an async context manager as a sync context manager.

Source code in src/alicatlib/sync/portal.py
def wrap_async_context_manager[T](
    self, acm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Present an async context manager as a sync context manager."""
    return self._require_portal().wrap_async_context_manager(acm)

wrap_async_iter

wrap_async_iter(async_iter)

Present an async iterator as a blocking, closeable iterator.

The returned object is iterable (for x in it: ...) and supports :meth:close / context-manager use so outer wrappers can cancel the producer on early exit.

Source code in src/alicatlib/sync/portal.py
def wrap_async_iter[T](self, async_iter: AsyncIterator[T]) -> SyncAsyncIterator[T]:
    """Present an async iterator as a blocking, closeable iterator.

    The returned object is iterable (``for x in it: ...``) and
    supports :meth:`close` / context-manager use so outer wrappers
    can cancel the producer on early exit.
    """
    self._require_portal()
    return SyncAsyncIterator(self, async_iter)

SyncPostgresSink

SyncPostgresSink(config, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.postgres.PostgresSink.

Requires the alicatlib[postgres] extra — dependency check runs on :meth:open.

Source code in src/alicatlib/sync/sinks.py
def __init__(
    self,
    config: PostgresConfig,
    *,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(PostgresSink(config), portal=portal)

SyncPressureController

SyncPressureController(async_device, portal)

Bases: SyncPressureMeter, _SyncControllerMixin

Pressure-controller facade — inherits the shared controller surface.

Source code in src/alicatlib/sync/device.py
def __init__(self, async_device: Device, portal: SyncPortal) -> None:
    self._dev = async_device
    self._portal = portal

SyncPressureMeter

SyncPressureMeter(async_device, portal)

Bases: SyncDevice

Pressure-meter tag — empty pass-through, mirrors :class:PressureMeter.

Source code in src/alicatlib/sync/device.py
def __init__(self, async_device: Device, portal: SyncPortal) -> None:
    self._dev = async_device
    self._portal = portal

SyncSinkAdapter

SyncSinkAdapter(async_sink, *, portal=None)

Generic sync wrapper around any :class:SampleSink.

Subclasses typically only override :meth:__init__ to build the matching async sink with sink-specific parameters and hand it to this base class. The portal / open / write / close / context-manager plumbing is shared.

Parameters:

Name Type Description Default
async_sink SampleSink

Already-constructed async sink.

required
portal SyncPortal | None

Optional pre-built :class:SyncPortal to share an event-loop thread. Default is a per-instance portal created on __enter__.

None
Source code in src/alicatlib/sync/sinks.py
def __init__(
    self,
    async_sink: SampleSink,
    *,
    portal: SyncPortal | None = None,
) -> None:
    self._async_sink = async_sink
    self._portal_override = portal
    self._portal: SyncPortal | None = None
    self._stack: ExitStack | None = None
    self._entered = False
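The subclass contract ("typically only override __init__") can be sketched with stubs; every name here is a hypothetical stand-in for the real SampleSink / SyncSinkAdapter pair:

```python
class StubAsyncSink:
    """Stand-in for an async SampleSink."""

    def __init__(self, path):
        self.path = path

    async def open(self): ...
    async def write_many(self, samples): ...
    async def close(self): ...


class StubSyncAdapter:
    """Stand-in base: owns portal / open / write / close plumbing."""

    def __init__(self, async_sink, *, portal=None):
        self._async_sink = async_sink
        self._portal_override = portal


class StubSyncCsvSink(StubSyncAdapter):
    # Only __init__ differs: build the matching async sink with its
    # sink-specific parameters, then hand it to the base class.
    def __init__(self, path, *, portal=None):
        super().__init__(StubAsyncSink(path), portal=portal)


sink = StubSyncCsvSink("out.csv")
print(type(sink._async_sink).__name__)  # StubAsyncSink
```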

async_sink property

async_sink

The wrapped async :class:SampleSink — advanced escape hatch.

portal property

portal

Active :class:SyncPortal (raises if outside with block).

__enter__

__enter__()

Start the portal, open the async sink.

Source code in src/alicatlib/sync/sinks.py
def __enter__(self) -> Self:
    """Start the portal, open the async sink."""
    if self._entered:
        raise RuntimeError("SyncSinkAdapter is not reusable after exit")
    self._entered = True
    stack = ExitStack()
    try:
        self._portal = (
            self._portal_override
            if self._portal_override is not None
            else stack.enter_context(SyncPortal())
        )
        self.open()
        self._stack = stack
    except BaseException:
        stack.close()
        self._portal = None
        raise
    return self

__exit__

__exit__(exc_type, exc, tb)

Close the sink and, if owned, stop the portal.

Source code in src/alicatlib/sync/sinks.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink and, if owned, stop the portal."""
    stack, self._stack = self._stack, None
    try:
        self.close()
    finally:
        self._portal = None
        if stack is not None:
            stack.__exit__(exc_type, exc, tb)

close

close()

Blocking :meth:SampleSink.close — idempotent.

A no-op if the sink never reached :meth:open (portal absent).

Source code in src/alicatlib/sync/sinks.py
def close(self) -> None:
    """Blocking :meth:`SampleSink.close` — idempotent.

    A no-op if the sink never reached :meth:`open` (portal absent).
    """
    portal = self._portal
    if portal is None:
        return
    portal.call(self._async_sink.close)

open

open()

Blocking :meth:SampleSink.open.

Source code in src/alicatlib/sync/sinks.py
def open(self) -> None:
    """Blocking :meth:`SampleSink.open`."""
    self.portal.call(self._async_sink.open)

write_many

write_many(samples)

Blocking :meth:SampleSink.write_many.

Source code in src/alicatlib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Blocking :meth:`SampleSink.write_many`."""
    self.portal.call(self._async_sink.write_many, samples)

SyncSqliteSink

SyncSqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.sqlite.SqliteSink.

Source code in src/alicatlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        SqliteSink(
            path,
            table=table,
            create_table=create_table,
            journal_mode=journal_mode,
            synchronous=synchronous,
            busy_timeout_ms=busy_timeout_ms,
        ),
        portal=portal,
    )

find_devices

find_devices(
    ports=None,
    *,
    unit_ids=("A",),
    baudrates=DEFAULT_DISCOVERY_BAUDRATES,
    timeout=0.2,
    max_concurrency=8,
    stop_on_first_hit=False,
    portal=None,
)

Blocking :func:alicatlib.devices.discovery.find_devices.

Source code in src/alicatlib/sync/discovery.py
def find_devices(
    ports: Iterable[str] | None = None,
    *,
    unit_ids: Sequence[str] = ("A",),
    baudrates: Sequence[int] = DEFAULT_DISCOVERY_BAUDRATES,
    timeout: float = 0.2,
    max_concurrency: int = 8,
    stop_on_first_hit: bool = False,
    portal: SyncPortal | None = None,
) -> tuple[DiscoveryResult, ...]:
    """Blocking :func:`alicatlib.devices.discovery.find_devices`."""
    port_list = None if ports is None else list(ports)
    if portal is not None:
        return portal.call(
            async_find_devices,
            port_list,
            unit_ids=unit_ids,
            baudrates=baudrates,
            timeout=timeout,
            max_concurrency=max_concurrency,
            stop_on_first_hit=stop_on_first_hit,
        )
    with SyncPortal() as owned:
        return owned.call(
            async_find_devices,
            port_list,
            unit_ids=unit_ids,
            baudrates=baudrates,
            timeout=timeout,
            max_concurrency=max_concurrency,
            stop_on_first_hit=stop_on_first_hit,
        )

list_serial_ports

list_serial_ports(*, portal=None)

Blocking :func:alicatlib.devices.discovery.list_serial_ports.

Source code in src/alicatlib/sync/discovery.py
def list_serial_ports(*, portal: SyncPortal | None = None) -> list[str]:
    """Blocking :func:`alicatlib.devices.discovery.list_serial_ports`."""
    if portal is not None:
        return portal.call(async_list_serial_ports)
    return run_sync(async_list_serial_ports)

pipe

pipe(
    stream,
    sink,
    *,
    batch_size=64,
    flush_interval=1.0,
    portal=None,
)

Sync :func:alicatlib.sinks.pipe.

Drains a sync iterator of per-tick batches into sink with the same buffered-flush semantics as the async driver: flush when the buffer reaches batch_size samples or flush_interval seconds have passed since the last flush.

sink may be a :class:SyncSinkAdapter (already open) or a raw async :class:SampleSink. In the async case a portal must be reachable — either passed explicitly or borrowed from the stream when it is a portal-backed :class:SyncAsyncIterator — so writes can be dispatched.

Time thresholds use :func:time.monotonic (wall-clock-ish, independent of the portal's event loop) because the sink cares about persistence freshness, not scheduling precision.

The returned :class:AcquisitionSummary carries samples_emitted (count actually handed to the sink); the samples_late and max_drift_ms fields stay at zero — those are recorder-layer concepts and the recorder logs its own values via the alicatlib.streaming logger on CM exit.

Source code in src/alicatlib/sync/recording.py
def pipe(
    stream: Iterator[Mapping[str, Sample]],
    sink: SyncSinkAdapter | SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
    portal: SyncPortal | None = None,
) -> AcquisitionSummary:
    """Sync :func:`alicatlib.sinks.pipe`.

    Drains a sync iterator of per-tick batches into ``sink`` with the
    same buffered-flush semantics as the async driver: flush when the
    buffer reaches ``batch_size`` samples or ``flush_interval`` seconds
    have passed since the last flush.

    ``sink`` may be a :class:`SyncSinkAdapter` (already open) or a raw
    async :class:`SampleSink`. In the async case a ``portal`` must be
    reachable — either passed explicitly or borrowed from the stream
    when it is a portal-backed :class:`SyncAsyncIterator` — so writes
    can be dispatched.

    Time thresholds use :func:`time.monotonic` (wall-clock-ish,
    independent of the portal's event loop) because the sink cares
    about persistence freshness, not scheduling precision.

    The returned :class:`AcquisitionSummary` carries ``samples_emitted``
    (count actually handed to the sink); the ``samples_late`` and
    ``max_drift_ms`` fields stay at zero — those are recorder-layer
    concepts and the recorder logs its own values via the
    ``alicatlib.streaming`` logger on CM exit.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    if isinstance(sink, SyncSinkAdapter):
        flush = sink.write_many
    else:
        resolved: SyncPortal | None = portal
        if resolved is None and isinstance(stream, SyncAsyncIterator):
            resolved = stream._portal  # pyright: ignore[reportPrivateUsage]
        if resolved is None:
            raise RuntimeError(
                "pipe: passing an async SampleSink requires a portal — "
                "wrap the sink in a SyncSinkAdapter or pass portal=.",
            )
        async_sink = sink
        active: SyncPortal = resolved

        def flush(samples: Sequence[Sample]) -> None:
            active.call(async_sink.write_many, samples)

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = time.monotonic()

    for batch in stream:
        buffer.extend(batch.values())
        now = time.monotonic()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            flush(buffer)
            emitted += len(buffer)
            buffer.clear()
            last_flush = now

    if buffer:
        flush(buffer)
        emitted += len(buffer)
        buffer.clear()

    finished_at = datetime.now(UTC)
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )
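The flush thresholds can be exercised standalone. This condensed drain() mirrors the buffered-flush rule in the source above (it is a demonstration, not alicatlib code):

```python
import time

def drain(stream, flush, *, batch_size=64, flush_interval=1.0):
    """Flush when the buffer reaches batch_size samples or when
    flush_interval seconds have elapsed since the last flush."""
    buffer, emitted = [], 0
    last_flush = time.monotonic()
    for batch in stream:
        buffer.extend(batch.values())
        now = time.monotonic()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            flush(list(buffer))
            emitted += len(buffer)
            buffer.clear()
            last_flush = now
    if buffer:  # final partial flush on stream exhaustion
        flush(list(buffer))
        emitted += len(buffer)
    return emitted

flushes = []
batches = [{"A": 1, "B": 2}, {"A": 3, "B": 4}, {"A": 5, "B": 6}]
# batch_size=4 triggers after the second 2-device batch; the long
# flush_interval keeps the time threshold out of the picture.
emitted = drain(iter(batches), flushes.append, batch_size=4, flush_interval=60.0)
print(emitted, [len(f) for f in flushes])  # 6 [4, 2]
```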

probe

probe(
    port,
    *,
    unit_id="A",
    baudrate=19200,
    timeout=0.2,
    portal=None,
)

Blocking :func:alicatlib.devices.discovery.probe.

Source code in src/alicatlib/sync/discovery.py
def probe(
    port: str,
    *,
    unit_id: str = "A",
    baudrate: int = 19200,
    timeout: float = 0.2,
    portal: SyncPortal | None = None,
) -> DiscoveryResult:
    """Blocking :func:`alicatlib.devices.discovery.probe`."""
    if portal is not None:
        return portal.call(
            async_probe,
            port,
            unit_id=unit_id,
            baudrate=baudrate,
            timeout=timeout,
        )
    with SyncPortal() as owned:
        return owned.call(
            async_probe,
            port,
            unit_id=unit_id,
            baudrate=baudrate,
            timeout=timeout,
        )

record

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    portal=None,
)

Sync :func:alicatlib.streaming.record.

The yielded iterator is a blocking bridge to the recorder's receive stream. Breaking out of the loop or exiting the with cancels the producer task.

If source is a :class:SyncAlicatManager, its portal is reused — the recorder and manager must share an event loop. Pass portal= to override; pass a raw :class:AlicatManager and the recorder owns its own portal.

Source code in src/alicatlib/sync/recording.py
@contextmanager
def record(
    source: SyncAlicatManager | PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    portal: SyncPortal | None = None,
) -> Generator[Iterator[Mapping[str, Sample]]]:
    """Sync :func:`alicatlib.streaming.record`.

    The yielded iterator is a blocking bridge to the recorder's
    receive stream. Breaking out of the loop or exiting the ``with``
    cancels the producer task.

    If ``source`` is a :class:`SyncAlicatManager`, its portal is
    reused — the recorder and manager must share an event loop.
    Pass ``portal=`` to override; pass a raw :class:`AlicatManager`
    and the recorder owns its own portal.
    """
    poll_source = _resolve_poll_source(source)
    with ExitStack() as stack:
        active_portal = _resolve_portal(portal, source, None) or stack.enter_context(SyncPortal())
        async_cm = async_record(
            poll_source,
            rate_hz=rate_hz,
            duration=duration,
            names=names,
            overflow=overflow,
            buffer_size=buffer_size,
        )
        async_stream = stack.enter_context(active_portal.wrap_async_context_manager(async_cm))
        sync_iter = stack.enter_context(active_portal.wrap_async_iter(async_stream))
        yield sync_iter

run_sync

run_sync(func, *args, **kwargs)

Run one coroutine in a throwaway :class:SyncPortal.

Suitable for short-lived operations — the discovery helpers, for example — where the portal thread's start/stop cost is acceptable. For repeated calls, reuse a long-lived :class:SyncPortal.

Source code in src/alicatlib/sync/portal.py
def run_sync[**P, T](
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run one coroutine in a throwaway :class:`SyncPortal`.

    Suitable for short-lived operations — the discovery helpers, for
    example — where the portal thread's start/stop cost is acceptable.
    For repeated calls, reuse a long-lived :class:`SyncPortal`.
    """
    with SyncPortal() as portal:
        return portal.call(func, *args, **kwargs)
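For one-shot calls the shape is comparable to asyncio.run — build a loop, run one coroutine, tear everything down — plus the portal's background thread. A rough stdlib analogue (fetch_ports is a made-up coroutine, not the real discovery helper):

```python
import asyncio

async def fetch_ports() -> list[str]:
    await asyncio.sleep(0)      # stand-in for real discovery I/O
    return ["/dev/ttyUSB0"]

# One coroutine, one throwaway loop; for repeated calls, reuse a
# long-lived SyncPortal instead of paying this setup cost each time.
ports = asyncio.run(fetch_ports())
print(ports)  # ['/dev/ttyUSB0']
```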

Device facades

alicatlib.sync.device

Sync device facade — portal-driven wrapper over :class:Device.

Each :class:SyncDevice holds a reference to an async :class:~alicatlib.devices.base.Device and a :class:~alicatlib.sync.portal.SyncPortal; every public method is a one-liner that hands the underlying coroutine to the portal. The subclass tree mirrors the async side so isinstance checks work the same way:

  • :class:SyncDevice — base, all shared methods.
  • :class:SyncFlowMeter — tag only (pass-through, design §5.9).
  • :class:SyncFlowController — adds setpoint, setpoint_source, loop_control_variable.
  • :class:SyncPressureMeter — tag only. :class:SyncPressureController — shared controller surface via the mixin; a pressure-specific controller surface is planned future work.

The :class:Alicat namespace exposes an Alicat.open(...) context manager that drives the async :func:~alicatlib.devices.factory.open_device through the portal. By default each Alicat.open call owns its own portal; callers that need several contexts to share an event loop can pass portal= to reuse a long-lived :class:SyncPortal.

Design reference: docs/design.md §5.16.

Alicat

Namespace for the sync device entry point.

Use :meth:Alicat.open as a context manager:

from alicatlib.sync import Alicat

with Alicat.open("/dev/ttyUSB0") as dev:  # doctest: +SKIP
    print(dev.poll())

open staticmethod

open(
    port,
    *,
    unit_id="A",
    serial=None,
    timeout=0.5,
    recover_from_stream=True,
    model_hint=None,
    assume_capabilities=Capability.NONE,
    assume_media=None,
    portal=None,
)

Open a sync :class:SyncDevice scoped to a with block.

Mirrors :func:alicatlib.devices.factory.open_device parameter for parameter. The returned sync CM drives the async factory through a :class:SyncPortal; the portal is created per-call unless a pre-existing one is passed in via portal=.

Passing a :class:Transport or :class:AlicatProtocolClient is advanced: the caller is responsible for ensuring the object was constructed inside the portal's event loop (or is loop-agnostic). The common case — passing a str port path — creates the transport inside the portal and avoids that concern.

Source code in src/alicatlib/sync/device.py
@staticmethod
@contextmanager
def open(
    port: str | Transport | AlicatProtocolClient,
    *,
    unit_id: str = "A",
    serial: SerialSettings | None = None,
    timeout: float = 0.5,
    recover_from_stream: bool = True,
    model_hint: str | None = None,
    assume_capabilities: Capability = Capability.NONE,
    assume_media: Medium | None = None,
    portal: SyncPortal | None = None,
) -> Generator[SyncDevice]:
    """Open a sync :class:`SyncDevice` scoped to a ``with`` block.

    Mirrors :func:`alicatlib.devices.factory.open_device` parameter
    for parameter. The returned sync CM drives the async factory
    through a :class:`SyncPortal`; the portal is created per-call
    unless a pre-existing one is passed in via ``portal=``.

    Passing a :class:`Transport` or
    :class:`AlicatProtocolClient` is advanced: the caller is
    responsible for ensuring the object was constructed inside the
    portal's event loop (or is loop-agnostic). The common case —
    passing a ``str`` port path — creates the transport inside the
    portal and avoids that concern.
    """
    with ExitStack() as stack:
        active_portal = portal if portal is not None else stack.enter_context(SyncPortal())
        async_cm = open_device(
            port,
            unit_id=unit_id,
            serial=serial,
            timeout=timeout,
            recover_from_stream=recover_from_stream,
            model_hint=model_hint,
            assume_capabilities=assume_capabilities,
            assume_media=assume_media,
        )
        async_device = stack.enter_context(active_portal.wrap_async_context_manager(async_cm))
        yield wrap_device(async_device, active_portal)

SyncDevice

SyncDevice(async_device, portal)

Blocking facade over :class:alicatlib.devices.base.Device.

Instances are produced by :meth:Alicat.open; users do not call this constructor directly. Every public method delegates to the underlying :class:Device through a :class:SyncPortal.

Source code in src/alicatlib/sync/device.py
def __init__(self, async_device: Device, portal: SyncPortal) -> None:
    self._dev = async_device
    self._portal = portal

info property

info

Identity snapshot — identical to :attr:Device.info.

portal property

portal

The :class:SyncPortal this device runs its coroutines on.

session property

session

Underlying async :class:Session for advanced escape-hatch use.

Calling coroutine methods on the returned session requires the caller to route them through :attr:portal.

unit_id property

unit_id

Validated single-letter unit id this device is addressed by.

__enter__

__enter__()

Nesting convenience — matches :meth:Device.__aenter__.

Source code in src/alicatlib/sync/device.py
def __enter__(self) -> Self:
    """Nesting convenience — matches :meth:`Device.__aenter__`."""
    return self

__exit__

__exit__(exc_type, exc, tb)

Close the device on exit — session close is idempotent.

Source code in src/alicatlib/sync/device.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the device on exit — session close is idempotent."""
    del exc_type, exc, tb
    self.close()

analog_output_source

analog_output_source(
    channel=_ANALOG_OUTPUT_PRIMARY,
    value=None,
    unit_code=None,
)

Blocking :meth:Device.analog_output_source.

Default channel is :attr:AnalogOutputChannel.PRIMARY — same default as the async side, bound at module load via the module-level alias so sync-parity signature comparison sees the same sentinel object.

Source code in src/alicatlib/sync/device.py
def analog_output_source(
    self,
    channel: AnalogOutputChannel = _ANALOG_OUTPUT_PRIMARY,
    value: int | None = None,
    unit_code: int | None = None,
) -> AnalogOutputSourceSetting:
    """Blocking :meth:`Device.analog_output_source`.

    Default channel is :attr:`AnalogOutputChannel.PRIMARY` — same
    default as the async side, bound at module load via the
    module-level alias so sync-parity signature comparison sees
    the same sentinel object.
    """
    return self._portal.call(self._dev.analog_output_source, channel, value, unit_code)

average_timing

average_timing(statistic_code, averaging_ms=None)

Blocking :meth:Device.average_timing.

Source code in src/alicatlib/sync/device.py
def average_timing(
    self,
    statistic_code: int,
    averaging_ms: int | None = None,
) -> AverageTimingSetting:
    """Blocking :meth:`Device.average_timing`."""
    return self._portal.call(self._dev.average_timing, statistic_code, averaging_ms)
blink_display

blink_display(duration_s=None)

Blocking :meth:Device.blink_display.

Source code in src/alicatlib/sync/device.py
def blink_display(self, duration_s: int | None = None) -> BlinkDisplayState:
    """Blocking :meth:`Device.blink_display`."""
    return self._portal.call(self._dev.blink_display, duration_s)

close

close()

Release the underlying session — idempotent.

Prefer with Alicat.open(...) as dev: over calling this by hand; the outer context manager owns transport lifecycle.

Source code in src/alicatlib/sync/device.py
def close(self) -> None:
    """Release the underlying session — idempotent.

    Prefer ``with Alicat.open(...) as dev:`` over calling this by
    hand; the outer context manager owns transport lifecycle.
    """
    self._portal.call(self._dev.close)

engineering_units

engineering_units(
    statistic,
    unit=None,
    *,
    apply_to_group=False,
    override_special_rules=False,
)

Blocking :meth:Device.engineering_units.

Source code in src/alicatlib/sync/device.py
def engineering_units(
    self,
    statistic: Statistic | str,
    unit: Unit | int | str | None = None,
    *,
    apply_to_group: bool = False,
    override_special_rules: bool = False,
) -> UnitSetting:
    """Blocking :meth:`Device.engineering_units`."""
    return self._portal.call(
        self._dev.engineering_units,
        statistic,
        unit,
        apply_to_group=apply_to_group,
        override_special_rules=override_special_rules,
    )

execute

execute(command, request)

Blocking :meth:Device.execute.

Source code in src/alicatlib/sync/device.py
def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    request: Req,
) -> Resp:
    """Blocking :meth:`Device.execute`."""
    return self._portal.call(self._dev.execute, command, request)

full_scale

full_scale(statistic)

Blocking :meth:Device.full_scale.

Source code in src/alicatlib/sync/device.py
def full_scale(self, statistic: Statistic | str) -> FullScaleValue:
    """Blocking :meth:`Device.full_scale`."""
    return self._portal.call(self._dev.full_scale, statistic)

gas

gas(gas=None, *, save=None)

Blocking :meth:Device.gas.

Source code in src/alicatlib/sync/device.py
def gas(
    self,
    gas: Gas | str | None = None,
    *,
    save: bool | None = None,
) -> GasState:
    """Blocking :meth:`Device.gas`."""
    return self._portal.call(self._dev.gas, gas, save=save)

gas_list

gas_list()

Blocking :meth:Device.gas_list.

Source code in src/alicatlib/sync/device.py
def gas_list(self) -> Mapping[int, str]:
    """Blocking :meth:`Device.gas_list`."""
    return self._portal.call(self._dev.gas_list)

lock_display

lock_display()

Blocking :meth:Device.lock_display.

Source code in src/alicatlib/sync/device.py
def lock_display(self) -> DisplayLockResult:
    """Blocking :meth:`Device.lock_display`."""
    return self._portal.call(self._dev.lock_display)

poll

poll()

Blocking :meth:Device.poll.

Source code in src/alicatlib/sync/device.py
def poll(self) -> DataFrame:
    """Blocking :meth:`Device.poll`."""
    return self._portal.call(self._dev.poll)

power_up_tare

power_up_tare(enable=None)

Blocking :meth:Device.power_up_tare.

Source code in src/alicatlib/sync/device.py
def power_up_tare(self, enable: bool | None = None) -> PowerUpTareState:
    """Blocking :meth:`Device.power_up_tare`."""
    return self._portal.call(self._dev.power_up_tare, enable)

request

request(statistics, *, averaging_ms=1)

Blocking :meth:Device.request.

Source code in src/alicatlib/sync/device.py
def request(
    self,
    statistics: Sequence[Statistic | str],
    *,
    averaging_ms: int = 1,
) -> MeasurementSet:
    """Blocking :meth:`Device.request`."""
    return self._portal.call(
        self._dev.request,
        statistics,
        averaging_ms=averaging_ms,
    )

stp_ntp_pressure

stp_ntp_pressure(mode, pressure=None, unit_code=None)

Blocking :meth:Device.stp_ntp_pressure.

Source code in src/alicatlib/sync/device.py
def stp_ntp_pressure(
    self,
    mode: StpNtpMode,
    pressure: float | None = None,
    unit_code: int | None = None,
) -> StpNtpPressureSetting:
    """Blocking :meth:`Device.stp_ntp_pressure`."""
    return self._portal.call(self._dev.stp_ntp_pressure, mode, pressure, unit_code)

stp_ntp_temperature

stp_ntp_temperature(mode, temperature=None, unit_code=None)

Blocking :meth:Device.stp_ntp_temperature.

Source code in src/alicatlib/sync/device.py
def stp_ntp_temperature(
    self,
    mode: StpNtpMode,
    temperature: float | None = None,
    unit_code: int | None = None,
) -> StpNtpTemperatureSetting:
    """Blocking :meth:`Device.stp_ntp_temperature`."""
    return self._portal.call(self._dev.stp_ntp_temperature, mode, temperature, unit_code)

stream

stream(
    *,
    rate_ms=None,
    strict=False,
    overflow=None,
    buffer_size=256,
)

Blocking :meth:Device.stream — returns a sync context manager.

The returned :class:SyncStreamingSession is both a sync context manager and a sync iterator; use it as::

with sync_dev.stream(rate_ms=50) as stream:
    for frame in stream:
        process(frame)
Source code in src/alicatlib/sync/device.py
def stream(
    self,
    *,
    rate_ms: int | None = None,
    strict: bool = False,
    overflow: OverflowPolicy | None = None,
    buffer_size: int = 256,
) -> SyncStreamingSession:
    """Blocking :meth:`Device.stream` — returns a sync context manager.

    The returned :class:`SyncStreamingSession` is both a sync
    context manager and a sync iterator; use it as::

        with sync_dev.stream(rate_ms=50) as stream:
            for frame in stream:
                process(frame)
    """
    async_stream = self._dev.stream(
        rate_ms=rate_ms,
        strict=strict,
        overflow=overflow,
        buffer_size=buffer_size,
    )
    return SyncStreamingSession(async_stream, self._portal)

tare_absolute_pressure

tare_absolute_pressure()

Blocking :meth:Device.tare_absolute_pressure.

Source code in src/alicatlib/sync/device.py
def tare_absolute_pressure(self) -> TareResult:
    """Blocking :meth:`Device.tare_absolute_pressure`."""
    return self._portal.call(self._dev.tare_absolute_pressure)

tare_flow

tare_flow()

Blocking :meth:Device.tare_flow.

Source code in src/alicatlib/sync/device.py
def tare_flow(self) -> TareResult:
    """Blocking :meth:`Device.tare_flow`."""
    return self._portal.call(self._dev.tare_flow)

tare_gauge_pressure

tare_gauge_pressure()

Blocking :meth:Device.tare_gauge_pressure.

Source code in src/alicatlib/sync/device.py
def tare_gauge_pressure(self) -> TareResult:
    """Blocking :meth:`Device.tare_gauge_pressure`."""
    return self._portal.call(self._dev.tare_gauge_pressure)

totalizer_config

totalizer_config(
    totalizer=_TOTALIZER_FIRST,
    *,
    flow_statistic_code=None,
    mode=None,
    limit_mode=None,
    digits=None,
    decimal_place=None,
)

Blocking :meth:Device.totalizer_config.

Source code in src/alicatlib/sync/device.py
def totalizer_config(
    self,
    totalizer: TotalizerId = _TOTALIZER_FIRST,
    *,
    flow_statistic_code: int | None = None,
    mode: TotalizerMode | None = None,
    limit_mode: TotalizerLimitMode | None = None,
    digits: int | None = None,
    decimal_place: int | None = None,
) -> TotalizerConfig:
    """Blocking :meth:`Device.totalizer_config`."""
    return self._portal.call(
        self._dev.totalizer_config,
        totalizer,
        flow_statistic_code=flow_statistic_code,
        mode=mode,
        limit_mode=limit_mode,
        digits=digits,
        decimal_place=decimal_place,
    )

totalizer_reset

totalizer_reset(
    totalizer=_TOTALIZER_FIRST, *, confirm=False
)

Blocking :meth:Device.totalizer_reset — destructive; requires confirm=True.

Source code in src/alicatlib/sync/device.py
def totalizer_reset(
    self,
    totalizer: TotalizerId = _TOTALIZER_FIRST,
    *,
    confirm: bool = False,
) -> TotalizerResetResult:
    """Blocking :meth:`Device.totalizer_reset` — destructive; requires ``confirm=True``."""
    return self._portal.call(self._dev.totalizer_reset, totalizer, confirm=confirm)

totalizer_reset_peak

totalizer_reset_peak(
    totalizer=_TOTALIZER_FIRST, *, confirm=False
)

Blocking :meth:Device.totalizer_reset_peak — destructive.

Source code in src/alicatlib/sync/device.py
def totalizer_reset_peak(
    self,
    totalizer: TotalizerId = _TOTALIZER_FIRST,
    *,
    confirm: bool = False,
) -> TotalizerResetResult:
    """Blocking :meth:`Device.totalizer_reset_peak` — destructive."""
    return self._portal.call(self._dev.totalizer_reset_peak, totalizer, confirm=confirm)

totalizer_save

totalizer_save(enable=None, *, save=None)

Blocking :meth:Device.totalizer_save.

Source code in src/alicatlib/sync/device.py
def totalizer_save(
    self,
    enable: bool | None = None,
    *,
    save: bool | None = None,
) -> TotalizerSaveState:
    """Blocking :meth:`Device.totalizer_save`."""
    return self._portal.call(self._dev.totalizer_save, enable, save=save)

unlock_display

unlock_display()

Blocking :meth:Device.unlock_display.

Source code in src/alicatlib/sync/device.py
def unlock_display(self) -> DisplayLockResult:
    """Blocking :meth:`Device.unlock_display`."""
    return self._portal.call(self._dev.unlock_display)

user_data

user_data(slot, value=None)

Blocking :meth:Device.user_data.

Source code in src/alicatlib/sync/device.py
def user_data(self, slot: int, value: str | None = None) -> UserDataSetting:
    """Blocking :meth:`Device.user_data`."""
    return self._portal.call(self._dev.user_data, slot, value)

zero_band

zero_band(zero_band=None)

Blocking :meth:Device.zero_band.

Source code in src/alicatlib/sync/device.py
def zero_band(self, zero_band: float | None = None) -> ZeroBandSetting:
    """Blocking :meth:`Device.zero_band`."""
    return self._portal.call(self._dev.zero_band, zero_band)

SyncFlowController

SyncFlowController(async_device, portal)

Bases: SyncFlowMeter, _SyncControllerMixin

Flow-controller facade — adds the shared controller surface.

Source code in src/alicatlib/sync/device.py
def __init__(self, async_device: Device, portal: SyncPortal) -> None:
    self._dev = async_device
    self._portal = portal

SyncFlowMeter

SyncFlowMeter(async_device, portal)

Bases: SyncDevice

Flow-meter tag — empty pass-through, mirrors :class:FlowMeter.

Source code in src/alicatlib/sync/device.py
def __init__(self, async_device: Device, portal: SyncPortal) -> None:
    self._dev = async_device
    self._portal = portal

SyncPressureController

SyncPressureController(async_device, portal)

Bases: SyncPressureMeter, _SyncControllerMixin

Pressure-controller facade — inherits the shared controller surface.

Source code in src/alicatlib/sync/device.py
def __init__(self, async_device: Device, portal: SyncPortal) -> None:
    self._dev = async_device
    self._portal = portal

SyncPressureMeter

SyncPressureMeter(async_device, portal)

Bases: SyncDevice

Pressure-meter tag — empty pass-through, mirrors :class:PressureMeter.

Source code in src/alicatlib/sync/device.py
def __init__(self, async_device: Device, portal: SyncPortal) -> None:
    self._dev = async_device
    self._portal = portal

SyncStreamingSession

SyncStreamingSession(async_stream, portal)

Blocking view over :class:StreamingSession.

Wraps the async streaming context so sync callers see a plain with / for loop::

with sync_dev.stream(rate_ms=50) as stream:
    for frame in stream:
        process(frame)

Enter/exit and next() are routed through the device's :class:SyncPortal; the portal dispatches one coroutine at a time onto its event-loop thread, so the underlying producer task keeps running in the background while the sync consumer polls for frames.

Source code in src/alicatlib/sync/device.py
def __init__(
    self,
    async_stream: StreamingSession,
    portal: SyncPortal,
) -> None:
    self._async = async_stream
    self._portal = portal
    self._cm: AbstractContextManager[StreamingSession] | None = None

dropped_frames property

dropped_frames

Mirror of :attr:StreamingSession.dropped_frames.

__enter__

__enter__()

Enter streaming mode on the async side.

Uses :meth:SyncPortal.wrap_async_context_manager rather than routing __aenter__/__aexit__ through portal.call: portal.call wraps each call in a fresh CancelScope, which conflicts with :meth:StreamingSession.__aenter__ entering a long-lived task group that outlives the entry call. Hardware validation (2026-04-17) surfaced the resulting "cancel scope that isn't the current task's" RuntimeError on real streaming hardware; wrap_async_context_manager lets anyio manage the portal-side scope for the whole CM lifetime instead.

Re-enters are rejected the same way as :meth:StreamingSession.__aenter__ — one streaming context per instance.

Source code in src/alicatlib/sync/device.py
def __enter__(self) -> Self:
    """Enter streaming mode on the async side.

    Uses :meth:`SyncPortal.wrap_async_context_manager` rather than
    routing ``__aenter__``/``__aexit__`` through ``portal.call``:
    ``portal.call`` wraps each call in a fresh ``CancelScope``,
    which conflicts with :meth:`StreamingSession.__aenter__`
    entering a long-lived task group that outlives the entry call.
    Hardware validation (2026-04-17) surfaced the resulting
    "cancel scope that isn't the current task's" ``RuntimeError``
    on real streaming hardware; ``wrap_async_context_manager``
    lets anyio manage the portal-side scope for the whole CM
    lifetime instead.

    Re-enters are rejected the same way as
    :meth:`StreamingSession.__aenter__` — one streaming context
    per instance.
    """
    self._cm = self._portal.wrap_async_context_manager(self._async)
    self._cm.__enter__()
    return self

__exit__

__exit__(exc_type, exc, tb)

Exit streaming mode — always sends stop-stream via the portal.

Source code in src/alicatlib/sync/device.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Exit streaming mode — always sends stop-stream via the portal."""
    if self._cm is None:
        return
    try:
        self._cm.__exit__(exc_type, exc, tb)
    finally:
        self._cm = None

__next__

__next__()

Block until the next frame, or :class:StopIteration on close.

Source code in src/alicatlib/sync/device.py
def __next__(self) -> DataFrame:
    """Block until the next frame, or :class:`StopIteration` on close."""
    try:
        return self._portal.call(self._async.__anext__)
    except StopAsyncIteration:
        raise StopIteration from None

unwrap_sync_device

unwrap_sync_device(source)

Return the async :class:Device inside source if it is wrapped.

Package-private: used by :meth:SyncAlicatManager.add so callers can hand in a previously-wrapped :class:SyncDevice.

Source code in src/alicatlib/sync/device.py
def unwrap_sync_device[T](source: T | SyncDevice) -> T | Device:
    """Return the async :class:`Device` inside ``source`` if it is wrapped.

    Package-private: used by :meth:`SyncAlicatManager.add` so callers
    can hand in a previously-wrapped :class:`SyncDevice`.
    """
    if isinstance(source, SyncDevice):
        return source._dev  # pyright: ignore[reportPrivateUsage]
    return source

wrap_device

wrap_device(async_device, portal)

Pick the correct :class:SyncDevice subclass for async_device.

Order is most-specific first — :class:FlowController is also a :class:FlowMeter, so the check must precede the meter branch.

Package-private: consumed by :mod:alicatlib.sync.manager and :class:Alicat. Not part of the public API.

Source code in src/alicatlib/sync/device.py
def wrap_device(async_device: Device, portal: SyncPortal) -> SyncDevice:
    """Pick the correct :class:`SyncDevice` subclass for ``async_device``.

    Order is most-specific first — :class:`FlowController` is also a
    :class:`FlowMeter`, so the check must precede the meter branch.

    Package-private: consumed by :mod:`alicatlib.sync.manager` and
    :class:`Alicat`. Not part of the public API.
    """
    if isinstance(async_device, FlowController):
        return SyncFlowController(async_device, portal)
    if isinstance(async_device, PressureController):
        return SyncPressureController(async_device, portal)
    if isinstance(async_device, FlowMeter):
        return SyncFlowMeter(async_device, portal)
    if isinstance(async_device, PressureMeter):
        return SyncPressureMeter(async_device, portal)
    return SyncDevice(async_device, portal)
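
The most-specific-first ordering above matters because of the class hierarchy. A minimal sketch with toy stand-in classes (names mirror the real ones, but these are illustrative, not imports from alicatlib):

```python
# Toy hierarchy mirroring the dispatch problem: a FlowController IS-A
# FlowMeter, so checking FlowMeter first would shadow the controller branch.
class FlowMeter: ...
class FlowController(FlowMeter): ...

def pick_wrapper(dev: object) -> str:
    # Most-specific class first — swap these two checks and every
    # FlowController would be mis-tagged as a plain meter.
    if isinstance(dev, FlowController):
        return "controller"
    if isinstance(dev, FlowMeter):
        return "meter"
    return "plain"

print(pick_wrapper(FlowController()))  # controller
print(pick_wrapper(FlowMeter()))       # meter
```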

Manager

alicatlib.sync.manager

Sync manager facade — portal-driven wrapper over :class:AlicatManager.

:class:SyncAlicatManager wraps the async :class:~alicatlib.manager.AlicatManager through a :class:~alicatlib.sync.portal.SyncPortal. Every coroutine method on the async manager becomes a blocking method here; the synchronous :meth:AlicatManager.get stays synchronous and delegates directly.

Lifecycle mirrors the async side: the class is a with context manager. By default each instance owns its own portal; callers that need several facades to share one event loop can pass portal= to reuse a long-lived :class:SyncPortal.

Design reference: docs/design.md §5.13 and §5.16.
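
To make the portal-ownership model concrete, here is a hypothetical minimal analogue of the pattern: one event-loop thread, with a blocking `call()` dispatching coroutines onto it. alicatlib's :class:SyncPortal is anyio-based and richer (context-manager wrapping, cancel-scope handling); this asyncio sketch only illustrates the shape:

```python
import asyncio
import threading

class MiniPortal:
    """Hypothetical sketch of the portal pattern — NOT alicatlib's
    SyncPortal. One event-loop thread; sync callers block on call()."""

    def __enter__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    def call(self, fn, *args, **kwargs):
        # Submit the coroutine to the loop thread; block for its result.
        fut = asyncio.run_coroutine_threadsafe(fn(*args, **kwargs), self._loop)
        return fut.result()

async def double(x: int) -> int:
    await asyncio.sleep(0)
    return 2 * x

with MiniPortal() as portal:
    print(portal.call(double, 21))  # 42
```

Sharing one portal between several facades (the `portal=` parameter below) is the same idea: all their coroutines land on a single loop thread instead of each facade spinning up its own.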

DeviceResult dataclass

DeviceResult(value, error)

Per-device result container — value or error, never both.

The union is encoded as two optional fields (rather than an Either / Result ADT) so mypy's narrowing on ok reads cleanly at call sites without pattern matching.

Attributes:

Name Type Description
value T | None

The successful result, or None if the call failed.

error AlicatError | None

The captured :class:~alicatlib.errors.AlicatError, or None if the call succeeded.

ok property

ok

True when the device produced a value (error is None).

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every device's result and — if any call failed — raises an :class:ExceptionGroup containing the per-device exceptions after the task group joins. Under :attr:RETURN, each device produces a :class:DeviceResult and the caller inspects .error per entry.

Design reference: docs/design.md §5.13.

SyncAlicatManager

SyncAlicatManager(
    *, error_policy=ErrorPolicy.RAISE, portal=None
)

Blocking facade over :class:alicatlib.manager.AlicatManager.

Example

with SyncAlicatManager() as mgr:  # doctest: +SKIP
    mgr.add("fuel", "/dev/ttyUSB0")
    mgr.add("air", "/dev/ttyUSB1")
    frames = mgr.poll()

Parameters:

Name Type Description Default
error_policy ErrorPolicy

Forwarded to :class:AlicatManager.

RAISE
portal SyncPortal | None

Optional pre-built :class:SyncPortal to share an event-loop thread with other sync facades. Default is a per-instance portal created on __enter__.

None
Source code in src/alicatlib/sync/manager.py
def __init__(
    self,
    *,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    portal: SyncPortal | None = None,
) -> None:
    self._error_policy = error_policy
    self._portal_override = portal
    self._stack: ExitStack | None = None
    self._portal: SyncPortal | None = None
    self._mgr: AlicatManager | None = None
    self._wrapped: dict[str, SyncDevice] = {}
    self._entered = False

closed property

closed

True once :meth:close or __exit__ has run.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed device names.

portal property

portal

The :class:SyncPortal this manager's coroutines run on.

__enter__

__enter__()

Start the portal, build the async manager, enter its CM.

Source code in src/alicatlib/sync/manager.py
def __enter__(self) -> Self:
    """Start the portal, build the async manager, enter its CM."""
    if self._entered:
        raise RuntimeError("SyncAlicatManager is not reusable after exit")
    self._entered = True
    stack = ExitStack()
    try:
        portal = (
            self._portal_override
            if self._portal_override is not None
            else stack.enter_context(SyncPortal())
        )
        mgr = AlicatManager(error_policy=self._error_policy)
        stack.enter_context(portal.wrap_async_context_manager(mgr))
        self._portal = portal
        self._mgr = mgr
        self._stack = stack
    except BaseException:
        stack.close()
        self._portal = None
        self._mgr = None
        raise
    return self

__exit__

__exit__(exc_type, exc, tb)

Close the managed devices + portal (if owned).

Source code in src/alicatlib/sync/manager.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the managed devices + portal (if owned)."""
    stack, self._stack = self._stack, None
    self._wrapped.clear()
    self._mgr = None
    self._portal = None
    if stack is not None:
        stack.__exit__(exc_type, exc, tb)

add

add(name, source, *, unit_id='A', serial=None, timeout=0.5)

Blocking :meth:AlicatManager.add.

Accepts a :class:SyncDevice as source in addition to the async shapes — the wrapper is unwrapped to the underlying :class:Device before delegation, matching the manager's "pre-built device, no lifecycle ownership" contract.

Source code in src/alicatlib/sync/manager.py
def add(
    self,
    name: str,
    source: SyncDevice | Device | str | Transport | AlicatProtocolClient,
    *,
    unit_id: str = "A",
    serial: SerialSettings | None = None,
    timeout: float = 0.5,
) -> SyncDevice:
    """Blocking :meth:`AlicatManager.add`.

    Accepts a :class:`SyncDevice` as ``source`` in addition to the
    async shapes — the wrapper is unwrapped to the underlying
    :class:`Device` before delegation, matching the manager's
    "pre-built device, no lifecycle ownership" contract.
    """
    mgr = self._require_mgr()
    async_source: Device | str | Transport | AlicatProtocolClient = unwrap_sync_device(source)
    async_device = self.portal.call(
        mgr.add,
        name,
        async_source,
        unit_id=unit_id,
        serial=serial,
        timeout=timeout,
    )
    wrapped = wrap_device(async_device, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

close

close()

Blocking :meth:AlicatManager.close — idempotent.

Source code in src/alicatlib/sync/manager.py
def close(self) -> None:
    """Blocking :meth:`AlicatManager.close` — idempotent."""
    self._wrapped.clear()
    mgr = self._mgr
    if mgr is None:
        return
    portal = self._portal
    if portal is None:
        return
    portal.call(mgr.close)

execute

execute(command, requests_by_name)

Blocking :meth:AlicatManager.execute.

Source code in src/alicatlib/sync/manager.py
def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    requests_by_name: Mapping[str, Req],
) -> Mapping[str, DeviceResult[Resp]]:
    """Blocking :meth:`AlicatManager.execute`."""
    mgr = self._require_mgr()
    return self.portal.call(mgr.execute, command, requests_by_name)

get

get(name)

Return the sync wrapper for the device registered under name.

The async manager's :meth:AlicatManager.get is already synchronous — this method caches the sync wrapper so repeated get calls return the same :class:SyncDevice instance per device name.

Source code in src/alicatlib/sync/manager.py
def get(self, name: str) -> SyncDevice:
    """Return the sync wrapper for the device registered under ``name``.

    The async manager's :meth:`AlicatManager.get` is already
    synchronous — this method caches the sync wrapper so repeated
    ``get`` calls return the same :class:`SyncDevice` instance per
    device name.
    """
    cached = self._wrapped.get(name)
    if cached is not None:
        return cached
    mgr = self._require_mgr()
    async_device = mgr.get(name)  # raises AlicatValidationError on unknown
    wrapped = wrap_device(async_device, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

poll

poll(names=None)

Blocking :meth:AlicatManager.poll.

Source code in src/alicatlib/sync/manager.py
def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[DataFrame]]:
    """Blocking :meth:`AlicatManager.poll`."""
    mgr = self._require_mgr()
    return self.portal.call(mgr.poll, names)

remove

remove(name)

Blocking :meth:AlicatManager.remove.

Source code in src/alicatlib/sync/manager.py
def remove(self, name: str) -> None:
    """Blocking :meth:`AlicatManager.remove`."""
    mgr = self._require_mgr()
    self._wrapped.pop(name, None)
    self.portal.call(mgr.remove, name)

request

request(statistics, names=None, *, averaging_ms=1)

Blocking :meth:AlicatManager.request.

Source code in src/alicatlib/sync/manager.py
def request(
    self,
    statistics: Sequence[Statistic | str],
    names: Sequence[str] | None = None,
    *,
    averaging_ms: int = 1,
) -> Mapping[str, DeviceResult[MeasurementSet]]:
    """Blocking :meth:`AlicatManager.request`."""
    mgr = self._require_mgr()
    return self.portal.call(
        mgr.request,
        statistics,
        names,
        averaging_ms=averaging_ms,
    )

Recording

alicatlib.sync.recording

Sync wrappers for :func:alicatlib.streaming.record and :func:alicatlib.sinks.pipe.

:func:record — sync context manager wrapping the async recorder. The produced iterator is blocking; on CM exit the underlying async task group is cancelled and joined by the portal.

:func:pipe — sync drain loop matching :func:alicatlib.sinks.pipe's batch / time flush semantics. Rebuilt in sync-land rather than wrapping the async driver so buffering stays under sync control and the time threshold uses :func:time.monotonic, not :func:anyio.current_time.

Both entry points accept :class:SyncAlicatManager / :class:SyncSinkAdapter instances — internally they reach for the wrapped async objects so the recorder / sink plumbing runs on the shared portal.

Design reference: docs/design.md §5.14, §5.15, §5.16.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at,
    samples_emitted,
    samples_late,
    max_drift_ms,
)

Per-run summary emitted after record()'s CM exits.

Attributes:

Name Type Description
started_at datetime

Wall-clock at the first scheduled tick.

finished_at datetime

Wall-clock at producer shutdown.

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream. Partial batches (some devices errored under ErrorPolicy.RETURN) still count as one emitted batch.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch).

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz indicate the device or consumer is saturating the schedule.
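
The late/drift bookkeeping can be sketched as plain arithmetic over an absolute-target schedule. This is an assumed reconstruction from the field descriptions above (the "late" threshold of one full period is my assumption, not a documented constant):

```python
def summarize(targets_s: list[float], emits_s: list[float], rate_hz: float):
    """Assumed sketch: drift is emit-time minus absolute target; a batch
    counts as late here when it lands a full period past its slot."""
    period = 1.0 / rate_hz
    late = 0
    max_drift_ms = 0.0
    for target, emit in zip(targets_s, emits_s):
        drift = emit - target
        max_drift_ms = max(max_drift_ms, drift * 1000.0)
        if drift >= period:
            late += 1
    return late, max_drift_ms

# 10 Hz schedule (100 ms period): the third batch overran by 120 ms.
targets = [0.0, 0.1, 0.2, 0.3]
emits = [0.001, 0.102, 0.320, 0.301]
late, drift = summarize(targets, emits, rate_hz=10.0)
print(late, round(drift, 1))  # 1 120.0
```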

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples. The effective sample rate drops to the consumer's drain rate; samples_late accrues once the consumer catches up and the producer can check its schedule.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the sample that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch, then enqueue. Counted as late.
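
The two drop variants can be sketched against a bounded buffer (a toy stand-in for the receive stream; `"block"` is omitted because it awaits the consumer rather than mutating the buffer):

```python
from collections import deque

def enqueue(buf: deque, item, policy: str) -> bool:
    """Toy sketch of the drop policies on a full buffer.
    Returns True if `item` made it into the buffer."""
    if len(buf) < buf.maxlen:
        buf.append(item)
        return True
    if policy == "drop_newest":
        return False          # discard the incoming sample
    if policy == "drop_oldest":
        buf.popleft()         # evict the oldest queued batch, then enqueue
        buf.append(item)
        return True
    raise NotImplementedError("'block' awaits the consumer instead")

buf = deque([1, 2, 3], maxlen=3)
print(enqueue(buf, 4, "drop_newest"), list(buf))  # False [1, 2, 3]
print(enqueue(buf, 4, "drop_oldest"), list(buf))  # True [2, 3, 4]
```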

pipe

pipe(
    stream,
    sink,
    *,
    batch_size=64,
    flush_interval=1.0,
    portal=None,
)

Sync :func:alicatlib.sinks.pipe.

Drains a sync iterator of per-tick batches into sink with the same buffered-flush semantics as the async driver: flush when the buffer reaches batch_size samples or flush_interval seconds have passed since the last flush.

sink may be a :class:SyncSinkAdapter (already open) or a raw async :class:SampleSink. In the async case a portal must be reachable — either passed explicitly or derived from a :class:SyncSinkAdapter — so writes can be dispatched.

Time thresholds use :func:time.monotonic (wall-clock-ish, independent of the portal's event loop) because the sink cares about persistence freshness, not scheduling precision.

The returned :class:AcquisitionSummary carries samples_emitted (count actually handed to the sink); the samples_late and max_drift_ms fields stay at zero — those are recorder-layer concepts and the recorder logs its own values via the alicatlib.streaming logger on CM exit.

Source code in src/alicatlib/sync/recording.py
def pipe(
    stream: Iterator[Mapping[str, Sample]],
    sink: SyncSinkAdapter | SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
    portal: SyncPortal | None = None,
) -> AcquisitionSummary:
    """Sync :func:`alicatlib.sinks.pipe`.

    Drains a sync iterator of per-tick batches into ``sink`` with the
    same buffered-flush semantics as the async driver: flush when the
    buffer reaches ``batch_size`` samples or ``flush_interval`` seconds
    have passed since the last flush.

    ``sink`` may be a :class:`SyncSinkAdapter` (already open) or a raw
    async :class:`SampleSink`. In the async case a ``portal`` must be
    reachable — either passed explicitly or derived from a
    :class:`SyncSinkAdapter` — so writes can be dispatched.

    Time thresholds use :func:`time.monotonic` (wall-clock-ish,
    independent of the portal's event loop) because the sink cares
    about persistence freshness, not scheduling precision.

    The returned :class:`AcquisitionSummary` carries ``samples_emitted``
    (count actually handed to the sink); the ``samples_late`` and
    ``max_drift_ms`` fields stay at zero — those are recorder-layer
    concepts and the recorder logs its own values via the
    ``alicatlib.streaming`` logger on CM exit.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    if isinstance(sink, SyncSinkAdapter):
        flush = sink.write_many
    else:
        resolved: SyncPortal | None = portal
        if resolved is None and isinstance(stream, SyncAsyncIterator):
            resolved = stream._portal  # pyright: ignore[reportPrivateUsage]
        if resolved is None:
            raise RuntimeError(
                "pipe: passing an async SampleSink requires a portal — "
                "wrap the sink in a SyncSinkAdapter or pass portal=.",
            )
        async_sink = sink
        active: SyncPortal = resolved

        def flush(samples: Sequence[Sample]) -> None:
            active.call(async_sink.write_many, samples)

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = time.monotonic()

    for batch in stream:
        buffer.extend(batch.values())
        now = time.monotonic()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            flush(buffer)
            emitted += len(buffer)
            buffer.clear()
            last_flush = now

    if buffer:
        flush(buffer)
        emitted += len(buffer)
        buffer.clear()

    finished_at = datetime.now(UTC)
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )
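
The buffered-flush semantics are easy to see in isolation. A self-contained toy rendition (plain lists stand in for the per-tick sample batches, and a list-append stands in for the sink):

```python
import time

def mini_pipe(stream, write_many, *, batch_size=64, flush_interval=1.0):
    """Toy rendition of the flush loop above: flush when the buffer
    reaches batch_size samples or flush_interval seconds have passed."""
    emitted = 0
    buffer = []
    last_flush = time.monotonic()
    for batch in stream:
        buffer.extend(batch)
        now = time.monotonic()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            write_many(list(buffer))
            emitted += len(buffer)
            buffer.clear()
            last_flush = now
    if buffer:                        # final partial flush on stream end
        write_many(list(buffer))
        emitted += len(buffer)
    return emitted

flushes: list[list[int]] = []
# Six 1-sample batches, batch_size=4: one size-4 flush, one final size-2.
n = mini_pipe(([i] for i in range(6)), flushes.append, batch_size=4, flush_interval=60.0)
print(n, [len(f) for f in flushes])  # 6 [4, 2]
```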

record

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    portal=None,
)

Sync :func:alicatlib.streaming.record.

The yielded iterator is a blocking bridge to the recorder's receive stream. Breaking out of the loop or exiting the with cancels the producer task.

If source is a :class:SyncAlicatManager, its portal is reused — the recorder and manager must share an event loop. Pass portal= to override; pass a raw :class:AlicatManager and the recorder owns its own portal.

Source code in src/alicatlib/sync/recording.py
@contextmanager
def record(
    source: SyncAlicatManager | PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    portal: SyncPortal | None = None,
) -> Generator[Iterator[Mapping[str, Sample]]]:
    """Sync :func:`alicatlib.streaming.record`.

    The yielded iterator is a blocking bridge to the recorder's
    receive stream. Breaking out of the loop or exiting the ``with``
    cancels the producer task.

    If ``source`` is a :class:`SyncAlicatManager`, its portal is
    reused — the recorder and manager must share an event loop.
    Pass ``portal=`` to override; pass a raw :class:`AlicatManager`
    and the recorder owns its own portal.
    """
    poll_source = _resolve_poll_source(source)
    with ExitStack() as stack:
        active_portal = _resolve_portal(portal, source, None) or stack.enter_context(SyncPortal())
        async_cm = async_record(
            poll_source,
            rate_hz=rate_hz,
            duration=duration,
            names=names,
            overflow=overflow,
            buffer_size=buffer_size,
        )
        async_stream = stack.enter_context(active_portal.wrap_async_context_manager(async_cm))
        sync_iter = stack.enter_context(active_portal.wrap_async_iter(async_stream))
        yield sync_iter

Sinks

alicatlib.sync.sinks

Sync wrappers for :mod:alicatlib.sinks.

Every in-tree sink has a one-to-one sync counterpart. All of them share :class:SyncSinkAdapter: the per-sink subclass only constructs the matching async sink with its own parameters and hands it to the adapter, which owns the portal + open/write/close plumbing.

Sinks follow the same portal-ownership pattern as the rest of the sync facade — each wrapper creates a throwaway :class:SyncPortal on __enter__ unless the caller passes one in. Pass a shared portal when the sink must share an event loop with a :class:SyncAlicatManager or :func:record, otherwise the sink's writes run on a different loop than the data producer.

Design reference: docs/design.md §5.15 and §5.16.

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    connect_timeout_s=30.0,
    close_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout applied as a server setting. Defaults to 30 s.

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

connect_timeout_s float

Cap on initial pool establishment in :meth:PostgresSink.open. A misconfigured DSN must not be able to wedge open() indefinitely — defaults to 30 s.

close_timeout_s float

Cap on :meth:PostgresSink.close's wait for in-flight queries to drain. Defaults to 10 s; the pool is then forcibly torn down so shutdown can't hang.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.
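
The identifier rule quoted for schema / table can be exercised directly. A sketch of the same pattern (the helper name is illustrative; the 63-character cap matches Postgres's identifier length limit):

```python
import re

# The pattern quoted above: a letter or underscore, then up to 62 more
# word characters — 63 characters total.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,62}")

def check_ident(name: str) -> str:
    """Illustrative guard: reject anything that can't be safely
    interpolated as a schema or table name."""
    if not _IDENT.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name

print(check_ident("samples"))      # samples
# check_ident("drop table; --")    # raises ValueError
```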

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/alicatlib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"
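
A standalone replay of the DSN branch above shows what the log-safe string looks like — note the password embedded in the DSN never reaches the output (function name and defaults here are illustrative):

```python
from urllib.parse import urlparse

def log_safe_target(dsn: str, schema: str = "public", table: str = "samples") -> str:
    # Mirrors the DSN branch of target(): hostname, port, and database
    # only — credentials in the DSN are dropped.
    parsed = urlparse(dsn)
    host = parsed.hostname or "?"
    port = parsed.port or 5432
    db = (parsed.path or "/?").lstrip("/") or "?"
    return f"{host}:{port}/{db}.{schema}.{table}"

print(log_safe_target("postgres://alice:s3cret@db.internal:6432/metrics"))
# db.internal:6432/metrics.public.samples
```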

SyncCsvSink

SyncCsvSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.csv.CsvSink.

Source code in src/alicatlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(CsvSink(path), portal=portal)

SyncInMemorySink

SyncInMemorySink(*, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.memory.InMemorySink.

Source code in src/alicatlib/sync/sinks.py
def __init__(self, *, portal: SyncPortal | None = None) -> None:
    super().__init__(InMemorySink(), portal=portal)

samples property

samples

Captured samples — proxy for :attr:InMemorySink.samples.

SyncJsonlSink

SyncJsonlSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.jsonl.JsonlSink.

Source code in src/alicatlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(JsonlSink(path), portal=portal)

SyncParquetSink

SyncParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.parquet.ParquetSink.

Requires the alicatlib[parquet] extra — the dependency check runs on :meth:open, same as the async sink.

Source code in src/alicatlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        ParquetSink(
            path,
            compression=compression,
            use_dictionary=use_dictionary,
            row_group_size=row_group_size,
        ),
        portal=portal,
    )

SyncPostgresSink

SyncPostgresSink(config, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.postgres.PostgresSink.

Requires the alicatlib[postgres] extra — dependency check runs on :meth:open.

Source code in src/alicatlib/sync/sinks.py
def __init__(
    self,
    config: PostgresConfig,
    *,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(PostgresSink(config), portal=portal)

SyncSampleSink

Bases: Protocol

Sync shape of an acquisition sink.

Mirrors :class:~alicatlib.sinks.base.SampleSink — same method names, no await. Every concrete wrapper in this module satisfies this Protocol.

__enter__

__enter__()

Open the sink and return self.

Source code in src/alicatlib/sync/sinks.py
def __enter__(self) -> Self:
    """Open the sink and return self."""
    ...

__exit__

__exit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/alicatlib/sync/sinks.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close

close()

Flush and release the backing resource — idempotent.

Source code in src/alicatlib/sync/sinks.py
def close(self) -> None:
    """Flush and release the backing resource — idempotent."""
    ...

open

open()

Allocate the sink's backing resource.

Source code in src/alicatlib/sync/sinks.py
def open(self) -> None:
    """Allocate the sink's backing resource."""
    ...

write_many

write_many(samples)

Append samples to the sink.

Source code in src/alicatlib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink."""
    ...
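Because this is a structural Protocol, any object with the right method names satisfies it — no inheritance needed. A toy list-backed sink illustrating the shape (the class names below are hypothetical; only the method signatures mirror the documented Protocol):

```python
from collections.abc import Sequence
from typing import Protocol, runtime_checkable

@runtime_checkable
class SinkShape(Protocol):
    # Structural stand-in for SyncSampleSink: same method names, no await.
    def open(self) -> None: ...
    def close(self) -> None: ...
    def write_many(self, samples: Sequence[object]) -> None: ...

class ListSink:
    """Minimal conforming sink — buffers samples in a plain list."""
    def __init__(self) -> None:
        self.rows: list[object] = []
    def open(self) -> None: ...
    def close(self) -> None: ...
    def write_many(self, samples: Sequence[object]) -> None:
        self.rows.extend(samples)
    def __enter__(self) -> "ListSink":
        self.open()
        return self
    def __exit__(self, *exc: object) -> None:
        self.close()

with ListSink() as sink:
    sink.write_many([1, 2, 3])
```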

SyncSinkAdapter

SyncSinkAdapter(async_sink, *, portal=None)

Generic sync wrapper around any :class:SampleSink.

Subclasses typically only override :meth:__init__ to build the matching async sink with sink-specific parameters and hand it to this base class. The portal / open / write / close / context-manager plumbing is shared.

Parameters:

Name Type Description Default
async_sink SampleSink

Already-constructed async sink.

required
portal SyncPortal | None

Optional pre-built :class:SyncPortal to share an event-loop thread. Default is a per-instance portal created on __enter__.

None
Source code in src/alicatlib/sync/sinks.py
def __init__(
    self,
    async_sink: SampleSink,
    *,
    portal: SyncPortal | None = None,
) -> None:
    self._async_sink = async_sink
    self._portal_override = portal
    self._portal: SyncPortal | None = None
    self._stack: ExitStack | None = None
    self._entered = False

async_sink property

async_sink

The wrapped async :class:SampleSink — advanced escape hatch.

portal property

portal

Active :class:SyncPortal (raises if outside with block).

__enter__

__enter__()

Start the portal, open the async sink.

Source code in src/alicatlib/sync/sinks.py
def __enter__(self) -> Self:
    """Start the portal, open the async sink."""
    if self._entered:
        raise RuntimeError("SyncSinkAdapter is not reusable after exit")
    self._entered = True
    stack = ExitStack()
    try:
        self._portal = (
            self._portal_override
            if self._portal_override is not None
            else stack.enter_context(SyncPortal())
        )
        self.open()
        self._stack = stack
    except BaseException:
        stack.close()
        self._portal = None
        raise
    return self

__exit__

__exit__(exc_type, exc, tb)

Close the sink and, if owned, stop the portal.

Source code in src/alicatlib/sync/sinks.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink and, if owned, stop the portal."""
    stack, self._stack = self._stack, None
    try:
        self.close()
    finally:
        self._portal = None
        if stack is not None:
            stack.__exit__(exc_type, exc, tb)

close

close()

Blocking :meth:SampleSink.close — idempotent.

A no-op if the sink never reached :meth:open (portal absent).

Source code in src/alicatlib/sync/sinks.py
def close(self) -> None:
    """Blocking :meth:`SampleSink.close` — idempotent.

    A no-op if the sink never reached :meth:`open` (portal absent).
    """
    portal = self._portal
    if portal is None:
        return
    portal.call(self._async_sink.close)

open

open()

Blocking :meth:SampleSink.open.

Source code in src/alicatlib/sync/sinks.py
def open(self) -> None:
    """Blocking :meth:`SampleSink.open`."""
    self.portal.call(self._async_sink.open)

write_many

write_many(samples)

Blocking :meth:SampleSink.write_many.

Source code in src/alicatlib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Blocking :meth:`SampleSink.write_many`."""
    self.portal.call(self._async_sink.write_many, samples)

SyncSqliteSink

SyncSqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~alicatlib.sinks.sqlite.SqliteSink.

Source code in src/alicatlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        SqliteSink(
            path,
            table=table,
            create_table=create_table,
            journal_mode=journal_mode,
            synchronous=synchronous,
            busy_timeout_ms=busy_timeout_ms,
        ),
        portal=portal,
    )

Discovery

alicatlib.sync.discovery

Sync wrappers for :mod:alicatlib.devices.discovery.

One-shot discovery primitives — each call creates, drives, and tears down its own :class:SyncPortal unless the caller passes one in. That matches the async helpers' "fire and forget" shape (open a port, probe, close).

Design reference: docs/design.md §5.12 and §5.16.

DiscoveryResult dataclass

DiscoveryResult(port, unit_id, baudrate, info, error)

Outcome of a single :func:probe attempt.

Exactly one of :attr:info / :attr:error is populated — ok results carry a fully-identified :class:DeviceInfo, failed ones carry the typed :class:AlicatError from the identification pipeline. The :attr:ok convenience lets callers filter without hasattr.

ok property

ok

Whether identification completed successfully.

find_devices

find_devices(
    ports=None,
    *,
    unit_ids=("A",),
    baudrates=DEFAULT_DISCOVERY_BAUDRATES,
    timeout=0.2,
    max_concurrency=8,
    stop_on_first_hit=False,
    portal=None,
)

Blocking :func:alicatlib.devices.discovery.find_devices.

Source code in src/alicatlib/sync/discovery.py
def find_devices(
    ports: Iterable[str] | None = None,
    *,
    unit_ids: Sequence[str] = ("A",),
    baudrates: Sequence[int] = DEFAULT_DISCOVERY_BAUDRATES,
    timeout: float = 0.2,
    max_concurrency: int = 8,
    stop_on_first_hit: bool = False,
    portal: SyncPortal | None = None,
) -> tuple[DiscoveryResult, ...]:
    """Blocking :func:`alicatlib.devices.discovery.find_devices`."""
    port_list = None if ports is None else list(ports)
    if portal is not None:
        return portal.call(
            async_find_devices,
            port_list,
            unit_ids=unit_ids,
            baudrates=baudrates,
            timeout=timeout,
            max_concurrency=max_concurrency,
            stop_on_first_hit=stop_on_first_hit,
        )
    with SyncPortal() as owned:
        return owned.call(
            async_find_devices,
            port_list,
            unit_ids=unit_ids,
            baudrates=baudrates,
            timeout=timeout,
            max_concurrency=max_concurrency,
            stop_on_first_hit=stop_on_first_hit,
        )

list_serial_ports

list_serial_ports(*, portal=None)

Blocking :func:alicatlib.devices.discovery.list_serial_ports.

Source code in src/alicatlib/sync/discovery.py
def list_serial_ports(*, portal: SyncPortal | None = None) -> list[str]:
    """Blocking :func:`alicatlib.devices.discovery.list_serial_ports`."""
    if portal is not None:
        return portal.call(async_list_serial_ports)
    return run_sync(async_list_serial_ports)

probe

probe(
    port,
    *,
    unit_id="A",
    baudrate=19200,
    timeout=0.2,
    portal=None,
)

Blocking :func:alicatlib.devices.discovery.probe.

Source code in src/alicatlib/sync/discovery.py
def probe(
    port: str,
    *,
    unit_id: str = "A",
    baudrate: int = 19200,
    timeout: float = 0.2,
    portal: SyncPortal | None = None,
) -> DiscoveryResult:
    """Blocking :func:`alicatlib.devices.discovery.probe`."""
    if portal is not None:
        return portal.call(
            async_probe,
            port,
            unit_id=unit_id,
            baudrate=baudrate,
            timeout=timeout,
        )
    with SyncPortal() as owned:
        return owned.call(
            async_probe,
            port,
            unit_id=unit_id,
            baudrate=baudrate,
            timeout=timeout,
        )

Portal primitives

alicatlib.sync.portal

Blocking portal primitive — sync access to the async core.

:class:SyncPortal wraps :func:anyio.from_thread.start_blocking_portal so the rest of the sync facade (device, manager, recording, sinks, discovery) can share one dispatch primitive.

Shape:

  • Lifecycle is a plain with block. Each portal owns one background event-loop thread; the portal closes when the block exits. Portals are one-shot — re-entering after exit raises.
  • call(func, *args, **kwargs) runs a coroutine. kwargs are bound through :func:functools.partial because :meth:anyio.from_thread.BlockingPortal.call only accepts positional arguments.
  • Single-member :class:ExceptionGroup instances are unwrapped. Our async core runs inside task groups (manager, recorder, factory), so AnyIO occasionally rewraps a single exception into a group. Unwrap so callers see the concrete :class:~alicatlib.errors.AlicatError subclass they branch on. Multi-member groups pass through unchanged — those carry real aggregate failures (design §5.13).
  • wrap_async_context_manager delegates to the portal's own helper — no extra behaviour, but exposed through :class:SyncPortal so callers reach for one surface.
  • wrap_async_iter bridges async iteration. The returned :class:SyncAsyncIterator is both iterable and closeable; outer sync CMs (e.g. sync.record()) call :meth:close on exit to cancel the producer promptly.

Design reference: docs/design.md §5.16.
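The architecture can be approximated with the standard library alone: a background thread owns an event loop, and sync callers submit coroutines to it. A hedged sketch of that shape — not the real class, which delegates to anyio's BlockingPortal and adds the ExceptionGroup unwrapping described above:

```python
import asyncio
import threading
from functools import partial

class MiniPortal:
    """Toy blocking portal: one background thread, one event loop."""

    def __enter__(self) -> "MiniPortal":
        self._loop = asyncio.new_event_loop()
        self._ready = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        self._ready.wait()  # don't return until the loop is running
        return self

    def _run(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.call_soon(self._ready.set)
        self._loop.run_forever()

    def call(self, func, *args, **kwargs):
        # kwargs bound through partial, mirroring the documented shape.
        coro = partial(func, *args, **kwargs)()
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def __exit__(self, *exc: object) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

async def double(x: int) -> int:
    await asyncio.sleep(0)
    return 2 * x

with MiniPortal() as portal:
    print(portal.call(double, 21))  # → 42
```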

SyncAsyncIterator

SyncAsyncIterator(portal, async_iter)

Blocking view over an async iterator, bound to a :class:SyncPortal.

Source code in src/alicatlib/sync/portal.py
def __init__(self, portal: SyncPortal, async_iter: AsyncIterator[T]) -> None:
    self._portal = portal
    self._aiter = async_iter
    self._closed = False

close

close()

Cancel the underlying async iterator if it exposes aclose.

Safe to call more than once. Cleanup failures are swallowed — close runs on teardown paths where re-raising masks the real failure.

Source code in src/alicatlib/sync/portal.py
def close(self) -> None:
    """Cancel the underlying async iterator if it exposes ``aclose``.

    Safe to call more than once. Cleanup failures are swallowed —
    ``close`` runs on teardown paths where re-raising masks the
    real failure.
    """
    if self._closed:
        return
    self._closed = True
    if not self._portal.running:
        return
    aclose: Callable[[], Awaitable[Any]] | None = getattr(self._aiter, "aclose", None)
    if aclose is None:
        return
    with contextlib.suppress(Exception):
        self._portal.call(aclose)

SyncPortal

SyncPortal(*, backend='asyncio')

Per-context wrapper around :class:anyio.from_thread.BlockingPortal.

Example

    with SyncPortal() as portal:
        result = portal.call(some_async_func, arg1, arg2)

Parameters:

Name Type Description Default
backend str

AnyIO backend to run on. Defaults to "asyncio"; the sync facade does not expose trio-specific features, so there is no reason to change this unless the surrounding process already runs a trio loop.

'asyncio'
Source code in src/alicatlib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    self._backend = backend
    self._cm: AbstractContextManager[BlockingPortal] | None = None
    self._portal: BlockingPortal | None = None
    self._entered = False

running property

running

True between :meth:__enter__ and :meth:__exit__.

__enter__

__enter__()

Start the portal's background thread and event loop.

Source code in src/alicatlib/sync/portal.py
def __enter__(self) -> Self:
    """Start the portal's background thread and event loop."""
    if self._entered:
        raise RuntimeError("SyncPortal is not reusable after exit")
    self._entered = True
    cm = start_blocking_portal(self._backend)
    self._portal = cm.__enter__()
    self._cm = cm
    return self

__exit__

__exit__(exc_type, exc, tb)

Stop the portal and join its thread.

Source code in src/alicatlib/sync/portal.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Stop the portal and join its thread."""
    cm, self._cm = self._cm, None
    self._portal = None
    if cm is not None:
        cm.__exit__(exc_type, exc, tb)

call

call(func, *args, **kwargs)

Run func(*args, **kwargs) on the portal's event loop.

Single-member :class:ExceptionGroup wrappers are stripped so callers can catch concrete exception types.

Source code in src/alicatlib/sync/portal.py
def call[**P, T](
    self,
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run ``func(*args, **kwargs)`` on the portal's event loop.

    Single-member :class:`ExceptionGroup` wrappers are stripped so
    callers can catch concrete exception types.
    """
    portal = self._require_portal()
    bound: Callable[[], Awaitable[T]] = (
        partial(func, *args, **kwargs) if kwargs else partial(func, *args)
    )
    try:
        return portal.call(bound)
    except Exception as exc:
        unwrapped = _unwrap_single_group(exc)
        if unwrapped is exc:
            raise
        raise unwrapped from None

wrap_async_context_manager

wrap_async_context_manager(acm)

Present an async context manager as a sync context manager.

Source code in src/alicatlib/sync/portal.py
def wrap_async_context_manager[T](
    self, acm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Present an async context manager as a sync context manager."""
    return self._require_portal().wrap_async_context_manager(acm)

wrap_async_iter

wrap_async_iter(async_iter)

Present an async iterator as a blocking, closeable iterator.

The returned object is iterable (for x in it: ...) and supports :meth:close / context-manager use so outer wrappers can cancel the producer on early exit.

Source code in src/alicatlib/sync/portal.py
def wrap_async_iter[T](self, async_iter: AsyncIterator[T]) -> SyncAsyncIterator[T]:
    """Present an async iterator as a blocking, closeable iterator.

    The returned object is iterable (``for x in it: ...``) and
    supports :meth:`close` / context-manager use so outer wrappers
    can cancel the producer on early exit.
    """
    self._require_portal()
    return SyncAsyncIterator(self, async_iter)
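The bridge can be imitated with plain asyncio: drive `__anext__` step by step from sync code, and call `aclose` on early exit so the producer is cancelled promptly. A stdlib sketch of the pattern — not the library's implementation, which routes every step through the shared portal thread instead of owning a loop:

```python
import asyncio
from collections.abc import AsyncIterator

async def ticks(n: int) -> AsyncIterator[int]:
    for i in range(n):
        await asyncio.sleep(0)
        yield i

def take_sync(agen: AsyncIterator[int], limit: int) -> list[int]:
    """Pull up to `limit` items synchronously, then close the producer."""
    loop = asyncio.new_event_loop()
    try:
        out: list[int] = []
        for _ in range(limit):
            try:
                out.append(loop.run_until_complete(agen.__anext__()))
            except StopAsyncIteration:
                break
        loop.run_until_complete(agen.aclose())  # prompt cancel on early exit
        return out
    finally:
        loop.close()

print(take_sync(ticks(10), 3))  # → [0, 1, 2]
```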

run_sync

run_sync(func, *args, **kwargs)

Run one coroutine in a throwaway :class:SyncPortal.

Suitable for short-lived operations — the discovery helpers, for example — where the portal thread's start/stop cost is acceptable. For repeated calls, reuse a long-lived :class:SyncPortal.

Source code in src/alicatlib/sync/portal.py
def run_sync[**P, T](
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run one coroutine in a throwaway :class:`SyncPortal`.

    Suitable for short-lived operations — the discovery helpers, for
    example — where the portal thread's start/stop cost is acceptable.
    For repeated calls, reuse a long-lived :class:`SyncPortal`.
    """
    with SyncPortal() as portal:
        return portal.call(func, *args, **kwargs)
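Functionally, the throwaway-portal shape reduces to "bind kwargs, run one coroutine, tear everything down". A stdlib approximation — minus the background thread and the ExceptionGroup unwrapping the real portal performs:

```python
import asyncio
from functools import partial

def run_once(func, *args, **kwargs):
    # One fresh event loop per call: fine for one-shot helpers,
    # wasteful for anything invoked repeatedly.
    return asyncio.run(partial(func, *args, **kwargs)())

async def add(a: int, b: int = 0) -> int:
    return a + b

print(run_once(add, 2, b=40))  # → 42
```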