Skip to content

sartoriuslib.sync

Sync facade over the async core, wrapped through anyio.BlockingPortal. See Sync quickstart and Design §9.

Public surface

sartoriuslib.sync

Sync facade — anyio.BlockingPortal wrapper over the async core.

Async is canonical; the sync facade wraps it through :class:SyncPortal so scripts, notebooks, and REPL sessions can drive balances without await.

Surfaces:

  • Balance / manager — :class:Sartorius, :class:SyncBalance, :class:SyncSartoriusManager (+ :class:SyncBalanceManager alias, :class:ErrorPolicy / :class:DeviceResult re-exports).
  • Recording — :func:record, :func:pipe, :class:AcquisitionSummary, :class:OverflowPolicy.
  • Sinks — :class:SyncSinkAdapter + :class:SyncInMemorySink / :class:SyncCsvSink / :class:SyncJsonlSink / :class:SyncSqliteSink / :class:SyncParquetSink / :class:SyncPostgresSink (+ :class:PostgresConfig).
  • Portal primitives — :class:SyncPortal, :func:run_sync.

See docs/design.md §9 for the design.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at,
    samples_emitted,
    samples_late,
    max_drift_ms,
    target_total_samples=None,
)

Per-run summary emitted after record()'s CM exits.

Attributes:

Name Type Description
started_at datetime

Wall-clock at the first scheduled tick.

finished_at datetime

Wall-clock at producer shutdown.

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch).

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds.

target_total_samples int | None

Number of scheduled ticks for finite duration runs, or None for open-ended runs.

DeviceResult dataclass

DeviceResult(value, error, protocol=None)

Per-device result container — value or error, never both.

:attr:protocol is populated by :class:SartoriusManager from the balance's session so error samples from the :mod:~sartoriuslib.streaming layer can still record which protocol produced the failure. Non-manager :class:~sartoriuslib.streaming.PollSource stubs may leave it None.

ok property

ok

True when the balance produced a value (error is None).

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every balance's result and — if any call failed — raises an :class:ExceptionGroup containing the per-device exceptions after the task group joins. Under :attr:RETURN, each balance produces a :class:DeviceResult and the caller inspects .error per entry.

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples. The effective sample rate drops to the consumer's drain rate; samples_late accrues once the consumer catches up and the producer can check its schedule.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the sample that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch, then enqueue. Counted as late.

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout applied as a server setting. Defaults to 30 s.

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/sartoriuslib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"

Sartorius

Namespace for the sync balance entry point.

Use :meth:Sartorius.open as a context manager::

from sartoriuslib.sync import Sartorius

with Sartorius.open("/dev/ttyUSB0") as bal:
    print(bal.poll())

open staticmethod

open(
    port,
    *,
    protocol=None,
    serial_settings=None,
    timeout=1.0,
    src_sbn=1,
    dst_sbn=9,
    strict=False,
    identify=True,
    portal=None,
)

Open a sync :class:SyncBalance scoped to a with block.

Mirrors :func:sartoriuslib.open_device parameter-for- parameter (modulo the portal plumbing). The sync CM drives the async factory through a :class:SyncPortal; the portal is created per-call unless one is passed in via portal=.

Source code in src/sartoriuslib/sync/balance.py
@staticmethod
@contextmanager
def open(
    port: str | Transport,
    *,
    protocol: ProtocolKind | None = None,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
    src_sbn: int = 0x01,
    dst_sbn: int = 0x09,
    strict: bool = False,
    identify: bool = True,
    portal: SyncPortal | None = None,
) -> Generator[SyncBalance]:
    """Open a sync :class:`SyncBalance` scoped to a ``with`` block.

    Mirrors :func:`sartoriuslib.open_device` parameter-for-
    parameter (modulo the portal plumbing). The sync CM drives
    the async factory through a :class:`SyncPortal`; the portal
    is created per-call unless one is passed in via ``portal=``.
    """
    # Local import keeps the ProtocolKind value available at runtime
    # (the top-level import is guarded by TYPE_CHECKING).
    from sartoriuslib.protocol.base import ProtocolKind as _ProtocolKind  # noqa: PLC0415

    effective_protocol = protocol if protocol is not None else _ProtocolKind.XBPI

    with ExitStack() as stack:
        active_portal = portal if portal is not None else stack.enter_context(SyncPortal())
        balance = active_portal.call(
            open_device,
            port,
            protocol=effective_protocol,
            serial_settings=serial_settings,
            timeout=timeout,
            src_sbn=src_sbn,
            dst_sbn=dst_sbn,
            strict=strict,
            identify=identify,
        )
        try:
            yield wrap_balance(balance, active_portal)
        finally:
            # Close the underlying transport through the portal;
            # the Balance's aclose closes the transport it was
            # constructed against.
            active_portal.call(balance.aclose)

SyncBalance

SyncBalance(balance, portal)

Blocking facade over :class:sartoriuslib.devices.balance.Balance.

Instances are produced by :meth:Sartorius.open or yielded by the sync manager; users do not call this constructor directly.

Source code in src/sartoriuslib/sync/balance.py
def __init__(self, balance: Balance, portal: SyncPortal) -> None:
    self._bal = balance
    self._portal = portal

info property

info

Identity snapshot — passes through :attr:Balance.info.

portal property

portal

The :class:SyncPortal this balance routes coroutines through.

session property

session

Underlying async :class:Session (advanced escape-hatch).

capacity

capacity(area=0)

Blocking :meth:Balance.capacity.

Source code in src/sartoriuslib/sync/balance.py
def capacity(self, area: int = 0) -> Quantity:
    """Blocking :meth:`Balance.capacity`."""
    return self._portal.call(self._bal.capacity, area)

configure_protocol

configure_protocol(
    target,
    *,
    baudrate=None,
    parity=None,
    stopbits=None,
    timeout=None,
    confirm=False,
)

Blocking :meth:Balance.configure_protocol.

Source code in src/sartoriuslib/sync/balance.py
def configure_protocol(
    self,
    target: ProtocolKind,
    *,
    baudrate: int | None = None,
    parity: Parity | None = None,
    stopbits: StopBits | None = None,
    timeout: float | None = None,
    confirm: bool = False,
) -> DeviceInfo:
    """Blocking :meth:`Balance.configure_protocol`."""
    return self._portal.call(
        self._bal.configure_protocol,
        target,
        baudrate=baudrate,
        parity=parity,
        stopbits=stopbits,
        timeout=timeout,
        confirm=confirm,
    )

discover_temperature_sensors

discover_temperature_sensors(*, max_index=8)

Blocking :meth:Balance.discover_temperature_sensors.

Source code in src/sartoriuslib/sync/balance.py
def discover_temperature_sensors(self, *, max_index: int = 8) -> tuple[int, ...]:
    """Blocking :meth:`Balance.discover_temperature_sensors`."""
    return self._portal.call(
        self._bal.discover_temperature_sensors,
        max_index=max_index,
    )

get_auto_zero

get_auto_zero()

Blocking :meth:Balance.get_auto_zero.

Source code in src/sartoriuslib/sync/balance.py
def get_auto_zero(self) -> AutoZeroMode:
    """Blocking :meth:`Balance.get_auto_zero`."""
    return self._portal.call(self._bal.get_auto_zero)

get_display_unit

get_display_unit()

Blocking :meth:Balance.get_display_unit.

Source code in src/sartoriuslib/sync/balance.py
def get_display_unit(self) -> Unit:
    """Blocking :meth:`Balance.get_display_unit`."""
    return self._portal.call(self._bal.get_display_unit)

get_filter_mode

get_filter_mode()

Blocking :meth:Balance.get_filter_mode.

Source code in src/sartoriuslib/sync/balance.py
def get_filter_mode(self) -> FilterMode:
    """Blocking :meth:`Balance.get_filter_mode`."""
    return self._portal.call(self._bal.get_filter_mode)

get_isocal_mode

get_isocal_mode()

Blocking :meth:Balance.get_isocal_mode.

Source code in src/sartoriuslib/sync/balance.py
def get_isocal_mode(self) -> IsoCalMode:
    """Blocking :meth:`Balance.get_isocal_mode`."""
    return self._portal.call(self._bal.get_isocal_mode)

get_menu_access

get_menu_access()

Blocking :meth:Balance.get_menu_access.

Source code in src/sartoriuslib/sync/balance.py
def get_menu_access(self) -> MenuAccessMode:
    """Blocking :meth:`Balance.get_menu_access`."""
    return self._portal.call(self._bal.get_menu_access)

get_tare_behavior

get_tare_behavior()

Blocking :meth:Balance.get_tare_behavior.

Source code in src/sartoriuslib/sync/balance.py
def get_tare_behavior(self) -> TareBehavior:
    """Blocking :meth:`Balance.get_tare_behavior`."""
    return self._portal.call(self._bal.get_tare_behavior)

identify

identify()

Blocking :meth:Balance.identify.

Source code in src/sartoriuslib/sync/balance.py
def identify(self) -> DeviceInfo:
    """Blocking :meth:`Balance.identify`."""
    return self._portal.call(self._bal.identify)

increment

increment(area=0)

Blocking :meth:Balance.increment.

Source code in src/sartoriuslib/sync/balance.py
def increment(self, area: int = 0) -> Quantity:
    """Blocking :meth:`Balance.increment`."""
    return self._portal.call(self._bal.increment, area)

internal_adjust

internal_adjust(*, cal_type=None, confirm=False)

Blocking :meth:Balance.internal_adjust.

Source code in src/sartoriuslib/sync/balance.py
def internal_adjust(
    self,
    *,
    cal_type: int | None = None,
    confirm: bool = False,
) -> None:
    """Blocking :meth:`Balance.internal_adjust`."""
    self._portal.call(
        self._bal.internal_adjust,
        cal_type=cal_type,
        confirm=confirm,
    )

last_cal_record

last_cal_record()

Blocking :meth:Balance.last_cal_record.

Source code in src/sartoriuslib/sync/balance.py
def last_cal_record(self) -> CalRecord:
    """Blocking :meth:`Balance.last_cal_record`."""
    return self._portal.call(self._bal.last_cal_record)

poll

poll()

Blocking :meth:Balance.poll.

Source code in src/sartoriuslib/sync/balance.py
def poll(self) -> Reading:
    """Blocking :meth:`Balance.poll`."""
    return self._portal.call(self._bal.poll)

raw_sbi

raw_sbi(
    command, *, confirm=False, timeout=None, expect_lines=1
)

Blocking :meth:Balance.raw_sbi.

Source code in src/sartoriuslib/sync/balance.py
def raw_sbi(
    self,
    command: bytes | str,
    *,
    confirm: bool = False,
    timeout: float | None = None,
    expect_lines: int = 1,
) -> SbiReply:
    """Blocking :meth:`Balance.raw_sbi`."""
    return self._portal.call(
        self._bal.raw_sbi,
        command,
        confirm=confirm,
        timeout=timeout,
        expect_lines=expect_lines,
    )

raw_xbpi

raw_xbpi(opcode, args=b'', *, confirm=False, timeout=None)

Blocking :meth:Balance.raw_xbpi.

Source code in src/sartoriuslib/sync/balance.py
def raw_xbpi(
    self,
    opcode: int,
    args: bytes = b"",
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> XbpiFrame:
    """Blocking :meth:`Balance.raw_xbpi`."""
    return self._portal.call(
        self._bal.raw_xbpi,
        opcode,
        args,
        confirm=confirm,
        timeout=timeout,
    )

read_gross

read_gross(*, hires=0)

Blocking :meth:Balance.read_gross.

Source code in src/sartoriuslib/sync/balance.py
def read_gross(self, *, hires: int = 0) -> Reading:
    """Blocking :meth:`Balance.read_gross`."""
    return self._portal.call(self._bal.read_gross, hires=hires)

read_net

read_net(*, hires=0)

Blocking :meth:Balance.read_net.

Source code in src/sartoriuslib/sync/balance.py
def read_net(self, *, hires: int = 0) -> Reading:
    """Blocking :meth:`Balance.read_net`."""
    return self._portal.call(self._bal.read_net, hires=hires)

read_parameter

read_parameter(index)

Blocking :meth:Balance.read_parameter.

Source code in src/sartoriuslib/sync/balance.py
def read_parameter(self, index: int) -> ParameterEntry:
    """Blocking :meth:`Balance.read_parameter`."""
    return self._portal.call(self._bal.read_parameter, index)

read_tare_value

read_tare_value()

Blocking :meth:Balance.read_tare_value.

Source code in src/sartoriuslib/sync/balance.py
def read_tare_value(self) -> Reading:
    """Blocking :meth:`Balance.read_tare_value`."""
    return self._portal.call(self._bal.read_tare_value)

refresh_sbi_autoprint_state

refresh_sbi_autoprint_state(*, timeout=None)

Blocking :meth:Balance.refresh_sbi_autoprint_state.

Source code in src/sartoriuslib/sync/balance.py
def refresh_sbi_autoprint_state(self, *, timeout: float | None = None) -> bool:
    """Blocking :meth:`Balance.refresh_sbi_autoprint_state`."""
    return self._portal.call(
        self._bal.refresh_sbi_autoprint_state,
        timeout=timeout,
    )

reload_menu

reload_menu(*, confirm=False)

Blocking :meth:Balance.reload_menu.

Source code in src/sartoriuslib/sync/balance.py
def reload_menu(self, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.reload_menu`."""
    self._portal.call(self._bal.reload_menu, confirm=confirm)

save_menu

save_menu(*, confirm=False)

Blocking :meth:Balance.save_menu.

Source code in src/sartoriuslib/sync/balance.py
def save_menu(self, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.save_menu`."""
    self._portal.call(self._bal.save_menu, confirm=confirm)

set_auto_zero

set_auto_zero(mode, *, confirm=False)

Blocking :meth:Balance.set_auto_zero.

Source code in src/sartoriuslib/sync/balance.py
def set_auto_zero(self, mode: AutoZeroMode | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_auto_zero`."""
    self._portal.call(self._bal.set_auto_zero, mode, confirm=confirm)

set_baud_rate

set_baud_rate(
    wire_code,
    *,
    baudrate,
    parity=None,
    stopbits=None,
    timeout=None,
    confirm=False,
)

Blocking :meth:Balance.set_baud_rate.

Source code in src/sartoriuslib/sync/balance.py
def set_baud_rate(
    self,
    wire_code: int,
    *,
    baudrate: int,
    parity: Parity | None = None,
    stopbits: StopBits | None = None,
    timeout: float | None = None,
    confirm: bool = False,
) -> DeviceInfo:
    """Blocking :meth:`Balance.set_baud_rate`."""
    return self._portal.call(
        self._bal.set_baud_rate,
        wire_code,
        baudrate=baudrate,
        parity=parity,
        stopbits=stopbits,
        timeout=timeout,
        confirm=confirm,
    )

set_display_unit

set_display_unit(unit, *, confirm=False)

Blocking :meth:Balance.set_display_unit.

Source code in src/sartoriuslib/sync/balance.py
def set_display_unit(self, unit: Unit | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_display_unit`."""
    self._portal.call(self._bal.set_display_unit, unit, confirm=confirm)

set_filter_mode

set_filter_mode(mode, *, confirm=False)

Blocking :meth:Balance.set_filter_mode.

Source code in src/sartoriuslib/sync/balance.py
def set_filter_mode(self, mode: FilterMode | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_filter_mode`."""
    self._portal.call(self._bal.set_filter_mode, mode, confirm=confirm)

set_isocal_mode

set_isocal_mode(mode, *, confirm=False)

Blocking :meth:Balance.set_isocal_mode.

Source code in src/sartoriuslib/sync/balance.py
def set_isocal_mode(self, mode: IsoCalMode | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_isocal_mode`."""
    self._portal.call(self._bal.set_isocal_mode, mode, confirm=confirm)

set_menu_access

set_menu_access(mode, *, confirm=False)

Blocking :meth:Balance.set_menu_access.

Source code in src/sartoriuslib/sync/balance.py
def set_menu_access(self, mode: MenuAccessMode | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_menu_access`."""
    self._portal.call(self._bal.set_menu_access, mode, confirm=confirm)

set_tare_behavior

set_tare_behavior(mode, *, confirm=False)

Blocking :meth:Balance.set_tare_behavior.

Source code in src/sartoriuslib/sync/balance.py
def set_tare_behavior(self, mode: TareBehavior | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_tare_behavior`."""
    self._portal.call(self._bal.set_tare_behavior, mode, confirm=confirm)

status

status()

Blocking :meth:Balance.status.

Source code in src/sartoriuslib/sync/balance.py
def status(self) -> BalanceStatus:
    """Blocking :meth:`Balance.status`."""
    return self._portal.call(self._bal.status)

tare

tare()

Blocking :meth:Balance.tare.

Source code in src/sartoriuslib/sync/balance.py
def tare(self) -> None:
    """Blocking :meth:`Balance.tare`."""
    self._portal.call(self._bal.tare)

temperature

temperature(sensor=0)

Blocking :meth:Balance.temperature.

Source code in src/sartoriuslib/sync/balance.py
def temperature(self, sensor: int = 0) -> TemperatureReading:
    """Blocking :meth:`Balance.temperature`."""
    return self._portal.call(self._bal.temperature, sensor)

write_parameter

write_parameter(index, value, *, confirm=False)

Blocking :meth:Balance.write_parameter.

Source code in src/sartoriuslib/sync/balance.py
def write_parameter(self, index: int, value: int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.write_parameter`."""
    self._portal.call(self._bal.write_parameter, index, value, confirm=confirm)

write_sbn_address

write_sbn_address(
    sbn,
    *,
    update_session_dst=False,
    timeout=None,
    confirm=False,
)

Blocking :meth:Balance.write_sbn_address.

Source code in src/sartoriuslib/sync/balance.py
def write_sbn_address(
    self,
    sbn: int,
    *,
    update_session_dst: bool = False,
    timeout: float | None = None,
    confirm: bool = False,
) -> int:
    """Blocking :meth:`Balance.write_sbn_address`."""
    return self._portal.call(
        self._bal.write_sbn_address,
        sbn,
        update_session_dst=update_session_dst,
        timeout=timeout,
        confirm=confirm,
    )

zero

zero()

Blocking :meth:Balance.zero.

Source code in src/sartoriuslib/sync/balance.py
def zero(self) -> None:
    """Blocking :meth:`Balance.zero`."""
    self._portal.call(self._bal.zero)

SyncCsvSink

SyncCsvSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.csv.CsvSink.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(CsvSink(path), portal=portal)

SyncInMemorySink

SyncInMemorySink(*, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.memory.InMemorySink.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(self, *, portal: SyncPortal | None = None) -> None:
    super().__init__(InMemorySink(), portal=portal)

samples property

samples

Captured samples — proxy for :attr:InMemorySink.samples.

SyncJsonlSink

SyncJsonlSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.jsonl.JsonlSink.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(JsonlSink(path), portal=portal)

SyncParquetSink

SyncParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.parquet.ParquetSink.

Requires the sartoriuslib[parquet] extra — the dependency check runs on :meth:open, same as the async sink.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        ParquetSink(
            path,
            compression=compression,
            use_dictionary=use_dictionary,
            row_group_size=row_group_size,
        ),
        portal=portal,
    )

SyncPortal

SyncPortal(*, backend='asyncio')

Per-context wrapper around :class:anyio.from_thread.BlockingPortal.

Example

with SyncPortal() as portal: # doctest: +SKIP ... result = portal.call(some_async_func, arg1, arg2)

Source code in src/sartoriuslib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    self._backend = backend
    self._cm: AbstractContextManager[BlockingPortal] | None = None
    self._portal: BlockingPortal | None = None
    self._entered = False

running property

running

True between :meth:__enter__ and :meth:__exit__.

call

call(func, *args, **kwargs)

Run func(*args, **kwargs) on the portal's event loop.

Single-member :class:ExceptionGroup wrappers are stripped.

Source code in src/sartoriuslib/sync/portal.py
def call[**P, T](
    self,
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run ``func(*args, **kwargs)`` on the portal's event loop.

    Single-member :class:`ExceptionGroup` wrappers are stripped.
    """
    portal = self._require_portal()
    bound: Callable[[], Awaitable[T]] = (
        partial(func, *args, **kwargs) if kwargs else partial(func, *args)
    )
    try:
        return portal.call(bound)
    except Exception as exc:
        unwrapped = _unwrap_single_group(exc)
        if unwrapped is exc:
            raise
        raise unwrapped from None

wrap_async_context_manager

wrap_async_context_manager(acm)

Present an async context manager as a sync context manager.

Source code in src/sartoriuslib/sync/portal.py
def wrap_async_context_manager[T](
    self, acm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Present an async context manager as a sync context manager."""
    return self._require_portal().wrap_async_context_manager(acm)

wrap_async_iter

wrap_async_iter(async_iter)

Present an async iterator as a blocking, closeable iterator.

Source code in src/sartoriuslib/sync/portal.py
def wrap_async_iter[T](self, async_iter: AsyncIterator[T]) -> SyncAsyncIterator[T]:
    """Present an async iterator as a blocking, closeable iterator."""
    self._require_portal()
    return SyncAsyncIterator(self, async_iter)

SyncPostgresSink

SyncPostgresSink(config, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.postgres.PostgresSink.

Requires the sartoriuslib[postgres] extra — dependency check runs on :meth:open.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(
    self,
    config: PostgresConfig,
    *,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(PostgresSink(config), portal=portal)

SyncSartoriusManager

SyncSartoriusManager(
    *, error_policy=ErrorPolicy.RAISE, portal=None
)

Blocking facade over :class:sartoriuslib.manager.SartoriusManager.

Source code in src/sartoriuslib/sync/manager.py
def __init__(
    self,
    *,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    portal: SyncPortal | None = None,
) -> None:
    self._error_policy = error_policy
    self._portal_override = portal
    self._stack: ExitStack | None = None
    self._portal: SyncPortal | None = None
    self._mgr: SartoriusManager | None = None
    self._wrapped: dict[str, SyncBalance] = {}
    self._entered = False

closed property

closed

True once :meth:close or __exit__ has run.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed balance names.

portal property

portal

The :class:SyncPortal this manager's coroutines run on.

add

add(
    name,
    source,
    *,
    protocol=None,
    serial_settings=None,
    timeout=1.0,
    src_sbn=1,
    dst_sbn=9,
    strict=False,
    identify=True,
)

Blocking :meth:SartoriusManager.add.

Accepts a :class:SyncBalance as source in addition to the async shapes — the wrapper is unwrapped to the underlying :class:Balance before delegation.

Source code in src/sartoriuslib/sync/manager.py
def add(
    self,
    name: str,
    source: SyncBalance | Balance | str | Transport,
    *,
    protocol: ProtocolKind | None = None,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
    src_sbn: int = 0x01,
    dst_sbn: int = 0x09,
    strict: bool = False,
    identify: bool = True,
) -> SyncBalance:
    """Blocking :meth:`SartoriusManager.add`.

    Accepts a :class:`SyncBalance` as ``source`` in addition to
    the async shapes — the wrapper is unwrapped to the underlying
    :class:`Balance` before delegation.
    """
    from sartoriuslib.protocol.base import ProtocolKind as _ProtocolKind  # noqa: PLC0415

    effective_protocol = protocol if protocol is not None else _ProtocolKind.XBPI
    mgr = self._require_mgr()
    async_source: Balance | str | Transport = unwrap_sync_balance(source)
    async_balance = self.portal.call(
        mgr.add,
        name,
        async_source,
        protocol=effective_protocol,
        serial_settings=serial_settings,
        timeout=timeout,
        src_sbn=src_sbn,
        dst_sbn=dst_sbn,
        strict=strict,
        identify=identify,
    )
    wrapped = wrap_balance(async_balance, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

close

close()

Blocking :meth:SartoriusManager.close — idempotent.

Source code in src/sartoriuslib/sync/manager.py
def close(self) -> None:
    """Blocking :meth:`SartoriusManager.close` — idempotent."""
    self._wrapped.clear()
    mgr = self._mgr
    if mgr is None:
        return
    portal = self._portal
    if portal is None:
        return
    portal.call(mgr.close)

execute

execute(command, requests_by_name)

Blocking :meth:SartoriusManager.execute.

Source code in src/sartoriuslib/sync/manager.py
def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    requests_by_name: Mapping[str, Req],
) -> Mapping[str, DeviceResult[Resp]]:
    """Blocking :meth:`SartoriusManager.execute`."""
    mgr = self._require_mgr()
    return self.portal.call(mgr.execute, command, requests_by_name)

get

get(name)

Return the sync wrapper for the balance registered under name.

Source code in src/sartoriuslib/sync/manager.py
def get(self, name: str) -> SyncBalance:
    """Return the sync wrapper for the balance registered under ``name``."""
    cached = self._wrapped.get(name)
    if cached is not None:
        return cached
    mgr = self._require_mgr()
    async_balance = mgr.get(name)
    wrapped = wrap_balance(async_balance, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

poll

poll(names=None)

Blocking :meth:SartoriusManager.poll.

Source code in src/sartoriuslib/sync/manager.py
def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Blocking :meth:`SartoriusManager.poll`."""
    mgr = self._require_mgr()
    return self.portal.call(mgr.poll, names)

remove

remove(name)

Blocking :meth:SartoriusManager.remove.

Source code in src/sartoriuslib/sync/manager.py
def remove(self, name: str) -> None:
    """Blocking :meth:`SartoriusManager.remove`."""
    mgr = self._require_mgr()
    self._wrapped.pop(name, None)
    self.portal.call(mgr.remove, name)

SyncSinkAdapter

SyncSinkAdapter(async_sink, *, portal=None)

Generic sync wrapper around any :class:SampleSink.

Subclasses typically only override :meth:__init__ to build the matching async sink with sink-specific parameters and hand it to this base class.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(
    self,
    async_sink: SampleSink,
    *,
    portal: SyncPortal | None = None,
) -> None:
    self._async_sink = async_sink
    self._portal_override = portal
    self._portal: SyncPortal | None = None
    self._stack: ExitStack | None = None
    self._entered = False

async_sink property

async_sink

The wrapped async :class:SampleSink — escape hatch.

portal property

portal

Active :class:SyncPortal (raises if outside with block).

close

close()

Blocking :meth:SampleSink.close — idempotent.

Source code in src/sartoriuslib/sync/sinks.py
def close(self) -> None:
    """Blocking :meth:`SampleSink.close` — idempotent."""
    portal = self._portal
    if portal is None:
        return
    portal.call(self._async_sink.close)

open

open()

Blocking :meth:SampleSink.open.

Source code in src/sartoriuslib/sync/sinks.py
def open(self) -> None:
    """Blocking :meth:`SampleSink.open`."""
    self.portal.call(self._async_sink.open)

write_many

write_many(samples)

Blocking :meth:SampleSink.write_many.

Source code in src/sartoriuslib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Blocking :meth:`SampleSink.write_many`."""
    self.portal.call(self._async_sink.write_many, samples)

SyncSqliteSink

SyncSqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.sqlite.SqliteSink.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        SqliteSink(
            path,
            table=table,
            create_table=create_table,
            journal_mode=journal_mode,
            synchronous=synchronous,
            busy_timeout_ms=busy_timeout_ms,
        ),
        portal=portal,
    )

pipe

pipe(
    stream,
    sink,
    *,
    batch_size=64,
    flush_interval=1.0,
    portal=None,
)

Sync :func:sartoriuslib.sinks.pipe.

Source code in src/sartoriuslib/sync/recording.py
def pipe(
    stream: Iterator[Mapping[str, Sample]],
    sink: SyncSinkAdapter | SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
    portal: SyncPortal | None = None,
) -> AcquisitionSummary:
    """Sync :func:`sartoriuslib.sinks.pipe`."""
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    if isinstance(sink, SyncSinkAdapter):
        flush = sink.write_many
    else:
        resolved: SyncPortal | None = portal
        if resolved is None and isinstance(stream, SyncAsyncIterator):
            resolved = stream._portal  # pyright: ignore[reportPrivateUsage]
        if resolved is None:
            raise RuntimeError(
                "pipe: passing an async SampleSink requires a portal — "
                "wrap the sink in a SyncSinkAdapter or pass portal=.",
            )
        async_sink = sink
        active: SyncPortal = resolved

        def flush(samples: Sequence[Sample]) -> None:
            active.call(async_sink.write_many, samples)

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = time.monotonic()

    for batch in stream:
        buffer.extend(batch.values())
        now = time.monotonic()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            flush(buffer)
            emitted += len(buffer)
            buffer.clear()
            last_flush = now

    if buffer:
        flush(buffer)
        emitted += len(buffer)
        buffer.clear()

    finished_at = datetime.now(UTC)
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )

record

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    portal=None,
)

Sync :func:sartoriuslib.streaming.record.

If source is a :class:SyncSartoriusManager, its portal is reused — the recorder and manager must share an event loop. Pass portal= to override.

Source code in src/sartoriuslib/sync/recording.py
@contextmanager
def record(
    source: SyncSartoriusManager | PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    portal: SyncPortal | None = None,
) -> Generator[Iterator[Mapping[str, Sample]]]:
    """Sync :func:`sartoriuslib.streaming.record`.

    If ``source`` is a :class:`SyncSartoriusManager`, its portal is
    reused — the recorder and manager must share an event loop. Pass
    ``portal=`` to override.
    """
    poll_source = _resolve_poll_source(source)
    with ExitStack() as stack:
        active_portal = _resolve_portal(portal, source, None) or stack.enter_context(SyncPortal())
        async_cm = async_record(
            poll_source,
            rate_hz=rate_hz,
            duration=duration,
            names=names,
            overflow=overflow,
            buffer_size=buffer_size,
        )
        async_stream = stack.enter_context(active_portal.wrap_async_context_manager(async_cm))
        sync_iter = stack.enter_context(active_portal.wrap_async_iter(async_stream))
        yield sync_iter

run_sync

run_sync(func, *args, **kwargs)

Run one coroutine in a throwaway :class:SyncPortal.

Source code in src/sartoriuslib/sync/portal.py
def run_sync[**P, T](
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run one coroutine in a throwaway :class:`SyncPortal`."""
    with SyncPortal() as portal:
        return portal.call(func, *args, **kwargs)

Portal

sartoriuslib.sync.portal

Blocking portal primitive — sync access to the async core.

:class:SyncPortal wraps :func:anyio.from_thread.start_blocking_portal so the rest of the sync facade (balance, manager, recording, sinks) can share one dispatch primitive.

Shape:

  • Lifecycle is a plain with block. Each portal owns one background event-loop thread; the portal closes when the block exits. Portals are one-shot — re-entering after exit raises.
  • call(func, *args, **kwargs) runs a coroutine. kwargs are bound through :func:functools.partial because :meth:anyio.from_thread.BlockingPortal.call only accepts positional arguments.
  • Single-member :class:ExceptionGroup s are unwrapped. The async core runs inside task groups (manager, recorder), so AnyIO occasionally rewraps a single exception into a group. Unwrap so callers see the concrete :class:~sartoriuslib.errors.SartoriusError subclass they branch on. Aggregates with two or more exceptions stay as :class:ExceptionGroup, so sync callers under manager ErrorPolicy.RAISE handle one-failure and multi-failure cases with different exception shapes.
  • wrap_async_context_manager delegates to the portal's helper.
  • wrap_async_iter bridges async iteration. The returned :class:SyncAsyncIterator is both iterable and closeable.

Design reference: docs/design.md §9.

SyncAsyncIterator

SyncAsyncIterator(portal, async_iter)

Blocking view over an async iterator, bound to a :class:SyncPortal.

Source code in src/sartoriuslib/sync/portal.py
def __init__(self, portal: SyncPortal, async_iter: AsyncIterator[T]) -> None:
    self._portal = portal
    self._aiter = async_iter
    self._closed = False

close

close()

Cancel the underlying async iterator if it exposes aclose.

Source code in src/sartoriuslib/sync/portal.py
def close(self) -> None:
    """Cancel the underlying async iterator if it exposes ``aclose``."""
    if self._closed:
        return
    self._closed = True
    if not self._portal.running:
        return
    aclose: Callable[[], Awaitable[Any]] | None = getattr(self._aiter, "aclose", None)
    if aclose is None:
        return
    with contextlib.suppress(Exception):
        self._portal.call(aclose)

SyncPortal

SyncPortal(*, backend='asyncio')

Per-context wrapper around :class:anyio.from_thread.BlockingPortal.

Example

with SyncPortal() as portal: # doctest: +SKIP ... result = portal.call(some_async_func, arg1, arg2)

Source code in src/sartoriuslib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    self._backend = backend
    self._cm: AbstractContextManager[BlockingPortal] | None = None
    self._portal: BlockingPortal | None = None
    self._entered = False

running property

running

True between :meth:__enter__ and :meth:__exit__.

call

call(func, *args, **kwargs)

Run func(*args, **kwargs) on the portal's event loop.

Single-member :class:ExceptionGroup wrappers are stripped.

Source code in src/sartoriuslib/sync/portal.py
def call[**P, T](
    self,
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run ``func(*args, **kwargs)`` on the portal's event loop.

    Single-member :class:`ExceptionGroup` wrappers are stripped.
    """
    portal = self._require_portal()
    bound: Callable[[], Awaitable[T]] = (
        partial(func, *args, **kwargs) if kwargs else partial(func, *args)
    )
    try:
        return portal.call(bound)
    except Exception as exc:
        unwrapped = _unwrap_single_group(exc)
        if unwrapped is exc:
            raise
        raise unwrapped from None

wrap_async_context_manager

wrap_async_context_manager(acm)

Present an async context manager as a sync context manager.

Source code in src/sartoriuslib/sync/portal.py
def wrap_async_context_manager[T](
    self, acm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Present an async context manager as a sync context manager."""
    return self._require_portal().wrap_async_context_manager(acm)

wrap_async_iter

wrap_async_iter(async_iter)

Present an async iterator as a blocking, closeable iterator.

Source code in src/sartoriuslib/sync/portal.py
def wrap_async_iter[T](self, async_iter: AsyncIterator[T]) -> SyncAsyncIterator[T]:
    """Present an async iterator as a blocking, closeable iterator."""
    self._require_portal()
    return SyncAsyncIterator(self, async_iter)

run_sync

run_sync(func, *args, **kwargs)

Run one coroutine in a throwaway :class:SyncPortal.

Source code in src/sartoriuslib/sync/portal.py
def run_sync[**P, T](
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run one coroutine in a throwaway :class:`SyncPortal`."""
    with SyncPortal() as portal:
        return portal.call(func, *args, **kwargs)

Sartorius / SyncBalance

sartoriuslib.sync.balance

Sync balance facade — portal-driven wrapper over :class:Balance.

Each :class:SyncBalance holds a reference to an async :class:~sartoriuslib.devices.balance.Balance and a :class:~sartoriuslib.sync.portal.SyncPortal; every public method is a one-liner that hands the underlying coroutine to the portal.

The :class:Sartorius namespace exposes a Sartorius.open(...) context manager that drives the async :func:~sartoriuslib.devices.factory.open_device through the portal.

Design reference: docs/design.md §9.

Sartorius

Namespace for the sync balance entry point.

Use :meth:Sartorius.open as a context manager::

from sartoriuslib.sync import Sartorius

with Sartorius.open("/dev/ttyUSB0") as bal:
    print(bal.poll())

open staticmethod

open(
    port,
    *,
    protocol=None,
    serial_settings=None,
    timeout=1.0,
    src_sbn=1,
    dst_sbn=9,
    strict=False,
    identify=True,
    portal=None,
)

Open a sync :class:SyncBalance scoped to a with block.

Mirrors :func:sartoriuslib.open_device parameter-for- parameter (modulo the portal plumbing). The sync CM drives the async factory through a :class:SyncPortal; the portal is created per-call unless one is passed in via portal=.

Source code in src/sartoriuslib/sync/balance.py
@staticmethod
@contextmanager
def open(
    port: str | Transport,
    *,
    protocol: ProtocolKind | None = None,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
    src_sbn: int = 0x01,
    dst_sbn: int = 0x09,
    strict: bool = False,
    identify: bool = True,
    portal: SyncPortal | None = None,
) -> Generator[SyncBalance]:
    """Open a sync :class:`SyncBalance` scoped to a ``with`` block.

    Mirrors :func:`sartoriuslib.open_device` parameter-for-
    parameter (modulo the portal plumbing). The sync CM drives
    the async factory through a :class:`SyncPortal`; the portal
    is created per-call unless one is passed in via ``portal=``.
    """
    # Local import keeps the ProtocolKind value available at runtime
    # (the top-level import is guarded by TYPE_CHECKING).
    from sartoriuslib.protocol.base import ProtocolKind as _ProtocolKind  # noqa: PLC0415

    effective_protocol = protocol if protocol is not None else _ProtocolKind.XBPI

    with ExitStack() as stack:
        active_portal = portal if portal is not None else stack.enter_context(SyncPortal())
        balance = active_portal.call(
            open_device,
            port,
            protocol=effective_protocol,
            serial_settings=serial_settings,
            timeout=timeout,
            src_sbn=src_sbn,
            dst_sbn=dst_sbn,
            strict=strict,
            identify=identify,
        )
        try:
            yield wrap_balance(balance, active_portal)
        finally:
            # Close the underlying transport through the portal;
            # the Balance's aclose closes the transport it was
            # constructed against.
            active_portal.call(balance.aclose)

SyncBalance

SyncBalance(balance, portal)

Blocking facade over :class:sartoriuslib.devices.balance.Balance.

Instances are produced by :meth:Sartorius.open or yielded by the sync manager; users do not call this constructor directly.

Source code in src/sartoriuslib/sync/balance.py
def __init__(self, balance: Balance, portal: SyncPortal) -> None:
    self._bal = balance
    self._portal = portal

info property

info

Identity snapshot — passes through :attr:Balance.info.

portal property

portal

The :class:SyncPortal this balance routes coroutines through.

session property

session

Underlying async :class:Session (advanced escape-hatch).

capacity

capacity(area=0)

Blocking :meth:Balance.capacity.

Source code in src/sartoriuslib/sync/balance.py
def capacity(self, area: int = 0) -> Quantity:
    """Blocking :meth:`Balance.capacity`."""
    return self._portal.call(self._bal.capacity, area)

configure_protocol

configure_protocol(
    target,
    *,
    baudrate=None,
    parity=None,
    stopbits=None,
    timeout=None,
    confirm=False,
)

Blocking :meth:Balance.configure_protocol.

Source code in src/sartoriuslib/sync/balance.py
def configure_protocol(
    self,
    target: ProtocolKind,
    *,
    baudrate: int | None = None,
    parity: Parity | None = None,
    stopbits: StopBits | None = None,
    timeout: float | None = None,
    confirm: bool = False,
) -> DeviceInfo:
    """Blocking :meth:`Balance.configure_protocol`."""
    return self._portal.call(
        self._bal.configure_protocol,
        target,
        baudrate=baudrate,
        parity=parity,
        stopbits=stopbits,
        timeout=timeout,
        confirm=confirm,
    )

discover_temperature_sensors

discover_temperature_sensors(*, max_index=8)

Blocking :meth:Balance.discover_temperature_sensors.

Source code in src/sartoriuslib/sync/balance.py
def discover_temperature_sensors(self, *, max_index: int = 8) -> tuple[int, ...]:
    """Blocking :meth:`Balance.discover_temperature_sensors`."""
    return self._portal.call(
        self._bal.discover_temperature_sensors,
        max_index=max_index,
    )

get_auto_zero

get_auto_zero()

Blocking :meth:Balance.get_auto_zero.

Source code in src/sartoriuslib/sync/balance.py
def get_auto_zero(self) -> AutoZeroMode:
    """Blocking :meth:`Balance.get_auto_zero`."""
    return self._portal.call(self._bal.get_auto_zero)

get_display_unit

get_display_unit()

Blocking :meth:Balance.get_display_unit.

Source code in src/sartoriuslib/sync/balance.py
def get_display_unit(self) -> Unit:
    """Blocking :meth:`Balance.get_display_unit`."""
    return self._portal.call(self._bal.get_display_unit)

get_filter_mode

get_filter_mode()

Blocking :meth:Balance.get_filter_mode.

Source code in src/sartoriuslib/sync/balance.py
def get_filter_mode(self) -> FilterMode:
    """Blocking :meth:`Balance.get_filter_mode`."""
    return self._portal.call(self._bal.get_filter_mode)

get_isocal_mode

get_isocal_mode()

Blocking :meth:Balance.get_isocal_mode.

Source code in src/sartoriuslib/sync/balance.py
def get_isocal_mode(self) -> IsoCalMode:
    """Blocking :meth:`Balance.get_isocal_mode`."""
    return self._portal.call(self._bal.get_isocal_mode)

get_menu_access

get_menu_access()

Blocking :meth:Balance.get_menu_access.

Source code in src/sartoriuslib/sync/balance.py
def get_menu_access(self) -> MenuAccessMode:
    """Blocking :meth:`Balance.get_menu_access`."""
    return self._portal.call(self._bal.get_menu_access)

get_tare_behavior

get_tare_behavior()

Blocking :meth:Balance.get_tare_behavior.

Source code in src/sartoriuslib/sync/balance.py
def get_tare_behavior(self) -> TareBehavior:
    """Blocking :meth:`Balance.get_tare_behavior`."""
    return self._portal.call(self._bal.get_tare_behavior)

identify

identify()

Blocking :meth:Balance.identify.

Source code in src/sartoriuslib/sync/balance.py
def identify(self) -> DeviceInfo:
    """Blocking :meth:`Balance.identify`."""
    return self._portal.call(self._bal.identify)

increment

increment(area=0)

Blocking :meth:Balance.increment.

Source code in src/sartoriuslib/sync/balance.py
def increment(self, area: int = 0) -> Quantity:
    """Blocking :meth:`Balance.increment`."""
    return self._portal.call(self._bal.increment, area)

internal_adjust

internal_adjust(*, cal_type=None, confirm=False)

Blocking :meth:Balance.internal_adjust.

Source code in src/sartoriuslib/sync/balance.py
def internal_adjust(
    self,
    *,
    cal_type: int | None = None,
    confirm: bool = False,
) -> None:
    """Blocking :meth:`Balance.internal_adjust`."""
    self._portal.call(
        self._bal.internal_adjust,
        cal_type=cal_type,
        confirm=confirm,
    )

last_cal_record

last_cal_record()

Blocking :meth:Balance.last_cal_record.

Source code in src/sartoriuslib/sync/balance.py
def last_cal_record(self) -> CalRecord:
    """Blocking :meth:`Balance.last_cal_record`."""
    return self._portal.call(self._bal.last_cal_record)

poll

poll()

Blocking :meth:Balance.poll.

Source code in src/sartoriuslib/sync/balance.py
def poll(self) -> Reading:
    """Blocking :meth:`Balance.poll`."""
    return self._portal.call(self._bal.poll)

raw_sbi

raw_sbi(
    command, *, confirm=False, timeout=None, expect_lines=1
)

Blocking :meth:Balance.raw_sbi.

Source code in src/sartoriuslib/sync/balance.py
def raw_sbi(
    self,
    command: bytes | str,
    *,
    confirm: bool = False,
    timeout: float | None = None,
    expect_lines: int = 1,
) -> SbiReply:
    """Blocking :meth:`Balance.raw_sbi`."""
    return self._portal.call(
        self._bal.raw_sbi,
        command,
        confirm=confirm,
        timeout=timeout,
        expect_lines=expect_lines,
    )

raw_xbpi

raw_xbpi(opcode, args=b'', *, confirm=False, timeout=None)

Blocking :meth:Balance.raw_xbpi.

Source code in src/sartoriuslib/sync/balance.py
def raw_xbpi(
    self,
    opcode: int,
    args: bytes = b"",
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> XbpiFrame:
    """Blocking :meth:`Balance.raw_xbpi`."""
    return self._portal.call(
        self._bal.raw_xbpi,
        opcode,
        args,
        confirm=confirm,
        timeout=timeout,
    )

read_gross

read_gross(*, hires=0)

Blocking :meth:Balance.read_gross.

Source code in src/sartoriuslib/sync/balance.py
def read_gross(self, *, hires: int = 0) -> Reading:
    """Blocking :meth:`Balance.read_gross`."""
    return self._portal.call(self._bal.read_gross, hires=hires)

read_net

read_net(*, hires=0)

Blocking :meth:Balance.read_net.

Source code in src/sartoriuslib/sync/balance.py
def read_net(self, *, hires: int = 0) -> Reading:
    """Blocking :meth:`Balance.read_net`."""
    return self._portal.call(self._bal.read_net, hires=hires)

read_parameter

read_parameter(index)

Blocking :meth:Balance.read_parameter.

Source code in src/sartoriuslib/sync/balance.py
def read_parameter(self, index: int) -> ParameterEntry:
    """Blocking :meth:`Balance.read_parameter`."""
    return self._portal.call(self._bal.read_parameter, index)

read_tare_value

read_tare_value()

Blocking :meth:Balance.read_tare_value.

Source code in src/sartoriuslib/sync/balance.py
def read_tare_value(self) -> Reading:
    """Blocking :meth:`Balance.read_tare_value`."""
    return self._portal.call(self._bal.read_tare_value)

refresh_sbi_autoprint_state

refresh_sbi_autoprint_state(*, timeout=None)

Blocking :meth:Balance.refresh_sbi_autoprint_state.

Source code in src/sartoriuslib/sync/balance.py
def refresh_sbi_autoprint_state(self, *, timeout: float | None = None) -> bool:
    """Blocking :meth:`Balance.refresh_sbi_autoprint_state`."""
    return self._portal.call(
        self._bal.refresh_sbi_autoprint_state,
        timeout=timeout,
    )

reload_menu

reload_menu(*, confirm=False)

Blocking :meth:Balance.reload_menu.

Source code in src/sartoriuslib/sync/balance.py
def reload_menu(self, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.reload_menu`."""
    self._portal.call(self._bal.reload_menu, confirm=confirm)

save_menu

save_menu(*, confirm=False)

Blocking :meth:Balance.save_menu.

Source code in src/sartoriuslib/sync/balance.py
def save_menu(self, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.save_menu`."""
    self._portal.call(self._bal.save_menu, confirm=confirm)

set_auto_zero

set_auto_zero(mode, *, confirm=False)

Blocking :meth:Balance.set_auto_zero.

Source code in src/sartoriuslib/sync/balance.py
def set_auto_zero(self, mode: AutoZeroMode | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_auto_zero`."""
    self._portal.call(self._bal.set_auto_zero, mode, confirm=confirm)

set_baud_rate

set_baud_rate(
    wire_code,
    *,
    baudrate,
    parity=None,
    stopbits=None,
    timeout=None,
    confirm=False,
)

Blocking :meth:Balance.set_baud_rate.

Source code in src/sartoriuslib/sync/balance.py
def set_baud_rate(
    self,
    wire_code: int,
    *,
    baudrate: int,
    parity: Parity | None = None,
    stopbits: StopBits | None = None,
    timeout: float | None = None,
    confirm: bool = False,
) -> DeviceInfo:
    """Blocking :meth:`Balance.set_baud_rate`."""
    return self._portal.call(
        self._bal.set_baud_rate,
        wire_code,
        baudrate=baudrate,
        parity=parity,
        stopbits=stopbits,
        timeout=timeout,
        confirm=confirm,
    )

set_display_unit

set_display_unit(unit, *, confirm=False)

Blocking :meth:Balance.set_display_unit.

Source code in src/sartoriuslib/sync/balance.py
def set_display_unit(self, unit: Unit | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_display_unit`."""
    self._portal.call(self._bal.set_display_unit, unit, confirm=confirm)

set_filter_mode

set_filter_mode(mode, *, confirm=False)

Blocking :meth:Balance.set_filter_mode.

Source code in src/sartoriuslib/sync/balance.py
def set_filter_mode(self, mode: FilterMode | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_filter_mode`."""
    self._portal.call(self._bal.set_filter_mode, mode, confirm=confirm)

set_isocal_mode

set_isocal_mode(mode, *, confirm=False)

Blocking :meth:Balance.set_isocal_mode.

Source code in src/sartoriuslib/sync/balance.py
def set_isocal_mode(self, mode: IsoCalMode | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_isocal_mode`."""
    self._portal.call(self._bal.set_isocal_mode, mode, confirm=confirm)

set_menu_access

set_menu_access(mode, *, confirm=False)

Blocking :meth:Balance.set_menu_access.

Source code in src/sartoriuslib/sync/balance.py
def set_menu_access(self, mode: MenuAccessMode | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_menu_access`."""
    self._portal.call(self._bal.set_menu_access, mode, confirm=confirm)

set_tare_behavior

set_tare_behavior(mode, *, confirm=False)

Blocking :meth:Balance.set_tare_behavior.

Source code in src/sartoriuslib/sync/balance.py
def set_tare_behavior(self, mode: TareBehavior | str | int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.set_tare_behavior`."""
    self._portal.call(self._bal.set_tare_behavior, mode, confirm=confirm)

status

status()

Blocking :meth:Balance.status.

Source code in src/sartoriuslib/sync/balance.py
def status(self) -> BalanceStatus:
    """Blocking :meth:`Balance.status`."""
    return self._portal.call(self._bal.status)

tare

tare()

Blocking :meth:Balance.tare.

Source code in src/sartoriuslib/sync/balance.py
def tare(self) -> None:
    """Blocking :meth:`Balance.tare`."""
    self._portal.call(self._bal.tare)

temperature

temperature(sensor=0)

Blocking :meth:Balance.temperature.

Source code in src/sartoriuslib/sync/balance.py
def temperature(self, sensor: int = 0) -> TemperatureReading:
    """Blocking :meth:`Balance.temperature`."""
    return self._portal.call(self._bal.temperature, sensor)

write_parameter

write_parameter(index, value, *, confirm=False)

Blocking :meth:Balance.write_parameter.

Source code in src/sartoriuslib/sync/balance.py
def write_parameter(self, index: int, value: int, *, confirm: bool = False) -> None:
    """Blocking :meth:`Balance.write_parameter`."""
    self._portal.call(self._bal.write_parameter, index, value, confirm=confirm)

write_sbn_address

write_sbn_address(
    sbn,
    *,
    update_session_dst=False,
    timeout=None,
    confirm=False,
)

Blocking :meth:Balance.write_sbn_address.

Source code in src/sartoriuslib/sync/balance.py
def write_sbn_address(
    self,
    sbn: int,
    *,
    update_session_dst: bool = False,
    timeout: float | None = None,
    confirm: bool = False,
) -> int:
    """Blocking :meth:`Balance.write_sbn_address`."""
    return self._portal.call(
        self._bal.write_sbn_address,
        sbn,
        update_session_dst=update_session_dst,
        timeout=timeout,
        confirm=confirm,
    )

zero

zero()

Blocking :meth:Balance.zero.

Source code in src/sartoriuslib/sync/balance.py
def zero(self) -> None:
    """Blocking :meth:`Balance.zero`."""
    self._portal.call(self._bal.zero)

unwrap_sync_balance

unwrap_sync_balance(source)

Return the async :class:Balance inside source if wrapped.

Package-private helper used by :class:SyncSartoriusManager.

Source code in src/sartoriuslib/sync/balance.py
def unwrap_sync_balance[T](source: T | SyncBalance) -> T | Balance:
    """Return the async :class:`Balance` inside ``source`` if wrapped.

    Package-private helper used by :class:`SyncSartoriusManager`.
    """
    if isinstance(source, SyncBalance):
        return source._bal  # pyright: ignore[reportPrivateUsage]
    return source

wrap_balance

wrap_balance(balance, portal)

Return a :class:SyncBalance wrapping balance on portal.

Package-private helper used by :class:SyncSartoriusManager.

Source code in src/sartoriuslib/sync/balance.py
def wrap_balance(balance: Balance, portal: SyncPortal) -> SyncBalance:
    """Return a :class:`SyncBalance` wrapping ``balance`` on ``portal``.

    Package-private helper used by :class:`SyncSartoriusManager`.
    """
    return SyncBalance(balance, portal)

SyncSartoriusManager

sartoriuslib.sync.manager

Sync manager facade — portal-driven wrapper over :class:SartoriusManager.

:class:SyncSartoriusManager wraps the async :class:~sartoriuslib.manager.SartoriusManager through a :class:~sartoriuslib.sync.portal.SyncPortal. Every coroutine method becomes a blocking method here; the synchronous :meth:get stays synchronous and delegates directly.

Lifecycle mirrors the async side: the class is a with context manager. By default each instance owns its own portal; callers that need several facades to share one event loop can pass portal= to reuse a long-lived :class:SyncPortal.

Design reference: docs/design.md §9 and §11.

DeviceResult dataclass

DeviceResult(value, error, protocol=None)

Per-device result container — value or error, never both.

:attr:protocol is populated by :class:SartoriusManager from the balance's session so error samples from the :mod:~sartoriuslib.streaming layer can still record which protocol produced the failure. Non-manager :class:~sartoriuslib.streaming.PollSource stubs may leave it None.

ok property

ok

True when the balance produced a value (error is None).

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every balance's result and — if any call failed — raises an :class:ExceptionGroup containing the per-device exceptions after the task group joins. Under :attr:RETURN, each balance produces a :class:DeviceResult and the caller inspects .error per entry.

SyncSartoriusManager

SyncSartoriusManager(
    *, error_policy=ErrorPolicy.RAISE, portal=None
)

Blocking facade over :class:sartoriuslib.manager.SartoriusManager.

Source code in src/sartoriuslib/sync/manager.py
def __init__(
    self,
    *,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    portal: SyncPortal | None = None,
) -> None:
    self._error_policy = error_policy
    self._portal_override = portal
    self._stack: ExitStack | None = None
    self._portal: SyncPortal | None = None
    self._mgr: SartoriusManager | None = None
    self._wrapped: dict[str, SyncBalance] = {}
    self._entered = False

closed property

closed

True once :meth:close or __exit__ has run.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed balance names.

portal property

portal

The :class:SyncPortal this manager's coroutines run on.

add

add(
    name,
    source,
    *,
    protocol=None,
    serial_settings=None,
    timeout=1.0,
    src_sbn=1,
    dst_sbn=9,
    strict=False,
    identify=True,
)

Blocking :meth:SartoriusManager.add.

Accepts a :class:SyncBalance as source in addition to the async shapes — the wrapper is unwrapped to the underlying :class:Balance before delegation.

Source code in src/sartoriuslib/sync/manager.py
def add(
    self,
    name: str,
    source: SyncBalance | Balance | str | Transport,
    *,
    protocol: ProtocolKind | None = None,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
    src_sbn: int = 0x01,
    dst_sbn: int = 0x09,
    strict: bool = False,
    identify: bool = True,
) -> SyncBalance:
    """Blocking :meth:`SartoriusManager.add`.

    Accepts a :class:`SyncBalance` as ``source`` in addition to
    the async shapes — the wrapper is unwrapped to the underlying
    :class:`Balance` before delegation.
    """
    from sartoriuslib.protocol.base import ProtocolKind as _ProtocolKind  # noqa: PLC0415

    effective_protocol = protocol if protocol is not None else _ProtocolKind.XBPI
    mgr = self._require_mgr()
    async_source: Balance | str | Transport = unwrap_sync_balance(source)
    async_balance = self.portal.call(
        mgr.add,
        name,
        async_source,
        protocol=effective_protocol,
        serial_settings=serial_settings,
        timeout=timeout,
        src_sbn=src_sbn,
        dst_sbn=dst_sbn,
        strict=strict,
        identify=identify,
    )
    wrapped = wrap_balance(async_balance, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

close

close()

Blocking :meth:SartoriusManager.close — idempotent.

Source code in src/sartoriuslib/sync/manager.py
def close(self) -> None:
    """Blocking :meth:`SartoriusManager.close` — idempotent."""
    self._wrapped.clear()
    mgr = self._mgr
    if mgr is None:
        return
    portal = self._portal
    if portal is None:
        return
    portal.call(mgr.close)

execute

execute(command, requests_by_name)

Blocking :meth:SartoriusManager.execute.

Source code in src/sartoriuslib/sync/manager.py
def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    requests_by_name: Mapping[str, Req],
) -> Mapping[str, DeviceResult[Resp]]:
    """Blocking :meth:`SartoriusManager.execute`."""
    mgr = self._require_mgr()
    return self.portal.call(mgr.execute, command, requests_by_name)

get

get(name)

Return the sync wrapper for the balance registered under name.

Source code in src/sartoriuslib/sync/manager.py
def get(self, name: str) -> SyncBalance:
    """Return the sync wrapper for the balance registered under ``name``."""
    cached = self._wrapped.get(name)
    if cached is not None:
        return cached
    mgr = self._require_mgr()
    async_balance = mgr.get(name)
    wrapped = wrap_balance(async_balance, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

poll

poll(names=None)

Blocking :meth:SartoriusManager.poll.

Source code in src/sartoriuslib/sync/manager.py
def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Blocking :meth:`SartoriusManager.poll`."""
    mgr = self._require_mgr()
    return self.portal.call(mgr.poll, names)

remove

remove(name)

Blocking :meth:SartoriusManager.remove.

Source code in src/sartoriuslib/sync/manager.py
def remove(self, name: str) -> None:
    """Blocking :meth:`SartoriusManager.remove`."""
    mgr = self._require_mgr()
    self._wrapped.pop(name, None)
    self.portal.call(mgr.remove, name)

Sync recording

sartoriuslib.sync.recording

Sync wrappers for :func:sartoriuslib.streaming.record and :func:sartoriuslib.sinks.pipe.

:func:record — sync context manager wrapping the async recorder. The produced iterator is blocking; on CM exit the underlying async task group is cancelled and joined by the portal.

:func:pipe — sync drain loop matching :func:sartoriuslib.sinks.pipe's batch / time flush semantics. Rebuilt in sync-land rather than wrapping the async driver so buffering stays under sync control and the time threshold uses :func:time.monotonic, not :func:anyio.current_time.

Design reference: docs/design.md §10.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at,
    samples_emitted,
    samples_late,
    max_drift_ms,
    target_total_samples=None,
)

Per-run summary emitted after record()'s CM exits.

Attributes:

Name Type Description
started_at datetime

Wall-clock at the first scheduled tick.

finished_at datetime

Wall-clock at producer shutdown.

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch).

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds.

target_total_samples int | None

Number of scheduled ticks for finite duration runs, or None for open-ended runs.

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples. The effective sample rate drops to the consumer's drain rate; samples_late accrues once the consumer catches up and the producer can check its schedule.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the sample that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch, then enqueue. Counted as late.

pipe

pipe(
    stream,
    sink,
    *,
    batch_size=64,
    flush_interval=1.0,
    portal=None,
)

Sync :func:sartoriuslib.sinks.pipe.

Source code in src/sartoriuslib/sync/recording.py
def pipe(
    stream: Iterator[Mapping[str, Sample]],
    sink: SyncSinkAdapter | SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
    portal: SyncPortal | None = None,
) -> AcquisitionSummary:
    """Sync :func:`sartoriuslib.sinks.pipe`."""
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    if isinstance(sink, SyncSinkAdapter):
        flush = sink.write_many
    else:
        resolved: SyncPortal | None = portal
        if resolved is None and isinstance(stream, SyncAsyncIterator):
            resolved = stream._portal  # pyright: ignore[reportPrivateUsage]
        if resolved is None:
            raise RuntimeError(
                "pipe: passing an async SampleSink requires a portal — "
                "wrap the sink in a SyncSinkAdapter or pass portal=.",
            )
        async_sink = sink
        active: SyncPortal = resolved

        def flush(samples: Sequence[Sample]) -> None:
            active.call(async_sink.write_many, samples)

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = time.monotonic()

    for batch in stream:
        buffer.extend(batch.values())
        now = time.monotonic()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            flush(buffer)
            emitted += len(buffer)
            buffer.clear()
            last_flush = now

    if buffer:
        flush(buffer)
        emitted += len(buffer)
        buffer.clear()

    finished_at = datetime.now(UTC)
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )

record

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    portal=None,
)

Sync :func:sartoriuslib.streaming.record.

If source is a :class:SyncSartoriusManager, its portal is reused — the recorder and manager must share an event loop. Pass portal= to override.

Source code in src/sartoriuslib/sync/recording.py
@contextmanager
def record(
    source: SyncSartoriusManager | PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    portal: SyncPortal | None = None,
) -> Generator[Iterator[Mapping[str, Sample]]]:
    """Sync :func:`sartoriuslib.streaming.record`.

    If ``source`` is a :class:`SyncSartoriusManager`, its portal is
    reused — the recorder and manager must share an event loop. Pass
    ``portal=`` to override.
    """
    poll_source = _resolve_poll_source(source)
    with ExitStack() as stack:
        active_portal = _resolve_portal(portal, source, None) or stack.enter_context(SyncPortal())
        async_cm = async_record(
            poll_source,
            rate_hz=rate_hz,
            duration=duration,
            names=names,
            overflow=overflow,
            buffer_size=buffer_size,
        )
        async_stream = stack.enter_context(active_portal.wrap_async_context_manager(async_cm))
        sync_iter = stack.enter_context(active_portal.wrap_async_iter(async_stream))
        yield sync_iter

Sync sinks

sartoriuslib.sync.sinks

Sync wrappers for :mod:sartoriuslib.sinks.

Every in-tree sink has a one-to-one sync counterpart. All of them share :class:SyncSinkAdapter: the per-sink subclass only constructs the matching async sink with its own parameters and hands it to the adapter, which owns the portal + open/write/close plumbing.

Sinks follow the same portal-ownership pattern as the rest of the sync facade — each wrapper creates a throwaway :class:SyncPortal on __enter__ unless the caller passes one in. Pass a shared portal when the sink must share an event loop with a :class:SyncSartoriusManager or :func:record, otherwise the sink's writes run on a different loop than the data producer.

Design reference: docs/design.md §10.

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout applied as a server setting. Defaults to 30 s.

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/sartoriuslib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"

SyncCsvSink

SyncCsvSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.csv.CsvSink.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(CsvSink(path), portal=portal)

SyncInMemorySink

SyncInMemorySink(*, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.memory.InMemorySink.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(self, *, portal: SyncPortal | None = None) -> None:
    super().__init__(InMemorySink(), portal=portal)

samples property

samples

Captured samples — proxy for :attr:InMemorySink.samples.

SyncJsonlSink

SyncJsonlSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.jsonl.JsonlSink.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(JsonlSink(path), portal=portal)

SyncParquetSink

SyncParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.parquet.ParquetSink.

Requires the sartoriuslib[parquet] extra — the dependency check runs on :meth:open, same as the async sink.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        ParquetSink(
            path,
            compression=compression,
            use_dictionary=use_dictionary,
            row_group_size=row_group_size,
        ),
        portal=portal,
    )

SyncPostgresSink

SyncPostgresSink(config, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.postgres.PostgresSink.

Requires the sartoriuslib[postgres] extra — dependency check runs on :meth:open.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(
    self,
    config: PostgresConfig,
    *,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(PostgresSink(config), portal=portal)

SyncSampleSink

Bases: Protocol

Sync shape of an acquisition sink.

Mirrors :class:~sartoriuslib.sinks.base.SampleSink — same method names, no await. Every concrete wrapper in this module satisfies this Protocol.

__enter__

__enter__()

Open the sink and return self.

Source code in src/sartoriuslib/sync/sinks.py
def __enter__(self) -> Self:
    """Open the sink and return self."""
    ...

__exit__

__exit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/sartoriuslib/sync/sinks.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close

close()

Flush and release the backing resource — idempotent.

Source code in src/sartoriuslib/sync/sinks.py
def close(self) -> None:
    """Flush and release the backing resource — idempotent."""
    ...

open

open()

Allocate the sink's backing resource.

Source code in src/sartoriuslib/sync/sinks.py
def open(self) -> None:
    """Allocate the sink's backing resource."""
    ...

write_many

write_many(samples)

Append samples to the sink.

Source code in src/sartoriuslib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink."""
    ...

SyncSinkAdapter

SyncSinkAdapter(async_sink, *, portal=None)

Generic sync wrapper around any :class:SampleSink.

Subclasses typically only override :meth:__init__ to build the matching async sink with sink-specific parameters and hand it to this base class.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(
    self,
    async_sink: SampleSink,
    *,
    portal: SyncPortal | None = None,
) -> None:
    self._async_sink = async_sink
    self._portal_override = portal
    self._portal: SyncPortal | None = None
    self._stack: ExitStack | None = None
    self._entered = False

async_sink property

async_sink

The wrapped async :class:SampleSink — escape hatch.

portal property

portal

Active :class:SyncPortal (raises if outside with block).

close

close()

Blocking :meth:SampleSink.close — idempotent.

Source code in src/sartoriuslib/sync/sinks.py
def close(self) -> None:
    """Blocking :meth:`SampleSink.close` — idempotent."""
    portal = self._portal
    if portal is None:
        return
    portal.call(self._async_sink.close)

open

open()

Blocking :meth:SampleSink.open.

Source code in src/sartoriuslib/sync/sinks.py
def open(self) -> None:
    """Blocking :meth:`SampleSink.open`."""
    self.portal.call(self._async_sink.open)

write_many

write_many(samples)

Blocking :meth:SampleSink.write_many.

Source code in src/sartoriuslib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Blocking :meth:`SampleSink.write_many`."""
    self.portal.call(self._async_sink.write_many, samples)

SyncSqliteSink

SyncSqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~sartoriuslib.sinks.sqlite.SqliteSink.

Source code in src/sartoriuslib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        SqliteSink(
            path,
            table=table,
            create_table=create_table,
            journal_mode=journal_mode,
            synchronous=synchronous,
            busy_timeout_ms=busy_timeout_ms,
        ),
        portal=portal,
    )