sartoriuslib.streaming

Sample, StreamingSession, StreamMode, record(), AcquisitionSummary, OverflowPolicy, PollSource. See Streaming and Logging and acquisition.

Public surface

sartoriuslib.streaming

Streaming + recording primitives. See design doc §10.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    samples_emitted=0,
    samples_late=0,
    max_drift_ms=0.0,
    target_total_samples=None,
)

Mutable acquisition totals owned by recorder / pipe drivers.

The recorder is the sole writer — counters update in place during the run so progress polling (TUIs, dashboards) works without a separate API. Consumers MUST treat the summary as read-only; mutating it from the consumer side is a contract violation that will produce wrong totals on shutdown.

finished_at is None while the producer is running and is set on context-manager exit.

Unified spec §M: every sibling library follows the same mutability rule; the field set is library-specific.

Attributes:

Name Type Description
started_at datetime

Wall-clock at the first scheduled tick.

finished_at datetime | None

Wall-clock at producer shutdown (None until then).

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream for recorder summaries, or individual samples handed to the sink for pipe() summaries.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch).

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds.

target_total_samples int | None

Number of scheduled ticks for finite duration recorder runs, or None for open-ended runs and pipe() summaries.
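A consumer can turn the live counters into a progress figure by reading the summary and never writing to it. The sketch below is illustrative only: `SummaryView` is a hypothetical stand-in that mirrors the documented field names so the snippet runs without the library, and `progress_fraction` is not a sartoriuslib function. It assumes that emitted and late ticks together account for the scheduled ticks handled so far.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class SummaryView:
    """Hypothetical stand-in mirroring the documented AcquisitionSummary fields."""

    started_at: datetime
    finished_at: datetime | None = None
    samples_emitted: int = 0
    samples_late: int = 0
    max_drift_ms: float = 0.0
    target_total_samples: int | None = None


def progress_fraction(summary: SummaryView) -> float | None:
    """Return the handled share of scheduled ticks, or None for open-ended runs.

    Read-only: the function never assigns to the summary.
    """
    if summary.target_total_samples is None:
        return None
    # Assumption: a tick is either emitted or counted as late.
    handled = summary.samples_emitted + summary.samples_late
    return min(1.0, handled / summary.target_total_samples)


snapshot = SummaryView(
    started_at=datetime.now(timezone.utc),
    samples_emitted=45,
    samples_late=5,
    target_total_samples=100,
)
print(progress_fraction(snapshot))
```

Returning None for open-ended runs keeps the caller from dividing by a target that does not exist.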

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples. The effective sample rate drops to the consumer's drain rate; samples_late accrues once the consumer catches up and the producer can check its schedule.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the batch that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

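Evict the oldest queued batch, then enqueue. Counted as late. Not yet implemented: record() currently raises NotImplementedError when called with this policy, so use BLOCK or DROP_NEWEST.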
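The difference between blocking and dropping can be seen with a bounded queue. This is a sketch under stated assumptions, not the recorder's code: it uses the standard-library asyncio.Queue in place of the anyio memory-object stream the recorder uses, and `Policy`, `offer` and `demo` are hypothetical names.

```python
from __future__ import annotations

import asyncio
from enum import Enum


class Policy(Enum):
    """Hypothetical mirror of two documented OverflowPolicy values."""

    BLOCK = "block"
    DROP_NEWEST = "drop_newest"


async def offer(queue: asyncio.Queue, batch: dict, policy: Policy) -> bool:
    """Try to enqueue one batch; return False if it was dropped."""
    if policy is Policy.BLOCK:
        # Waits until the consumer frees a slot, so nothing is lost.
        await queue.put(batch)
        return True
    try:
        queue.put_nowait(batch)
        return True
    except asyncio.QueueFull:
        # DROP_NEWEST: discard the batch that did not fit.
        return False


async def demo() -> list[bool]:
    """Offer three batches to a two-slot queue that nobody drains."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    return [await offer(queue, {"tick": n}, Policy.DROP_NEWEST) for n in range(3)]


print(asyncio.run(demo()))
```

With no consumer draining the queue, the third offer is rejected. Under BLOCK the same call would wait instead, which is why the effective rate falls to the consumer's drain rate.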

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

:class:~sartoriuslib.manager.SartoriusManager satisfies this: its poll(names) returns a Mapping[str, DeviceResult[Reading]]. Using a Protocol keeps :func:record testable against a lightweight stub without pulling in the whole manager + transport stack.

poll async

poll(names=None)

Poll every named balance (or all under management) concurrently.

Must return a mapping keyed by the manager-assigned device name. Successful polls carry the :class:Reading as .value; failed ones carry the :class:~sartoriuslib.errors.SartoriusError as .error (per :class:~sartoriuslib.manager.ErrorPolicy.RETURN).

Source code in src/sartoriuslib/streaming/recorder.py
async def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Poll every named balance (or all under management) concurrently.

    Must return a mapping keyed by the manager-assigned device name.
    Successful polls carry the :class:`Reading` as ``.value``;
    failed ones carry the :class:`~sartoriuslib.errors.SartoriusError`
    as ``.error`` (per :class:`~sartoriuslib.manager.ErrorPolicy.RETURN`).
    """
    ...
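A stub with the same poll(names) shape might look like the following. It is a sketch only: `StubResult` is a hypothetical stand-in for DeviceResult (only the `.value` and `.error` attributes described above are modelled), and `StubSource` and `demo` are illustrative names. A real recorder test would return the library's own DeviceResult objects.

```python
from __future__ import annotations

import asyncio
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class StubResult:
    """Hypothetical stand-in for DeviceResult: value on success, error on failure."""

    value: Any = None
    error: Exception | None = None


class StubSource:
    """Lightweight object exposing the poll(names) shape described above."""

    def __init__(self, readings: Mapping[str, Any]) -> None:
        self._readings = dict(readings)

    async def poll(self, names: Sequence[str] | None = None) -> Mapping[str, StubResult]:
        # names=None means "everything under management".
        wanted = list(self._readings) if names is None else names
        return {
            name: StubResult(value=self._readings[name])
            for name in wanted
            if name in self._readings
        }


def demo() -> list[str]:
    """Poll a subset; unknown names are simply absent from the result."""
    source = StubSource({"bal1": 12.5, "bal2": 7.25})
    return sorted(asyncio.run(source.poll(["bal2", "missing"])))


print(demo())
```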

PollSourceAdapter

PollSourceAdapter(name, device)

Wrap one :class:Balance as a :class:PollSource for :func:record.

Construction takes a name (the manager-style identifier the sample carries downstream) and the :class:Balance to poll. Every :meth:poll invocation returns either a single-entry mapping containing the poll outcome wrapped in :class:DeviceResult, or an empty mapping when names is supplied and does not include this adapter's name.

Usage::

    adapter = PollSourceAdapter("bal1", balance)
    async with record(adapter, rate_hz=10) as recording:
        async for batch in recording.stream:
            ...

Source code in src/sartoriuslib/streaming/poll_source.py
def __init__(self, name: str, device: Balance) -> None:
    self._name = name
    self._device = device

device property

device

The wrapped :class:Balance.

name property

name

The manager-style identifier this adapter publishes samples under.

poll async

poll(names=None)

Poll the wrapped balance and return a one-entry result mapping.

When names is supplied and excludes :attr:name, returns an empty mapping (the consumer asked for a different device). Otherwise returns {name: DeviceResult.success(reading)} on success or {name: DeviceResult.failure(error)} on a typed sartoriuslib failure.

Source code in src/sartoriuslib/streaming/poll_source.py
async def poll(
    self,
    names: Iterable[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Poll the wrapped balance and return a one-entry result mapping.

    When ``names`` is supplied and excludes :attr:`name`, returns
    an empty mapping (the consumer asked for a different device).
    Otherwise returns ``{name: DeviceResult.success(reading)}`` on
    success or ``{name: DeviceResult.failure(error)}`` on a typed
    sartoriuslib failure.
    """
    if names is not None and self._name not in set(names):
        return {}
    try:
        reading = await self._device.poll()
    except SartoriusError as exc:
        return {self._name: DeviceResult.failure(exc)}
    return {self._name: DeviceResult.success(reading)}

Recording dataclass

Recording(stream, summary, rate_hz, observed_rate_hz=None)

The context-manager payload returned by :func:record.

Bundles the per-tick stream, the live :class:AcquisitionSummary, and the configured / observed rates so consumers can poll progress without reaching into recorder internals. Cross-library spec §M: every sibling library yields Recording[T] from its record CM; T is what the recorder actually emits per tick (for sartoriuslib that's Mapping[str, Sample]).

Attributes:

Name Type Description
stream StreamT

The async iterator the recorder publishes per-tick payloads into. Drain by async for batch in recording.stream.

summary AcquisitionSummary

Live :class:AcquisitionSummary. Mutates in place; summary.finished_at is set on CM exit.

rate_hz float

Configured cadence the recorder is running at, as passed to :func:record.

observed_rate_hz float | None

Rolling mean inter-frame rate over the last 10 SBI autoprint frames. None until the buffer fills, and None for non-autoprint runs.
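One way to compute a rolling rate that stays None until its buffer fills is sketched below. The library's exact calculation is not shown on this page, so treat this as an assumption: the rate is taken as the number of intervals divided by the time spanned by the last 10 frame timestamps, and `RollingRate` is a hypothetical name.

```python
from __future__ import annotations

from collections import deque


class RollingRate:
    """Rolling mean frame rate over the most recent `window` timestamps."""

    def __init__(self, window: int = 10) -> None:
        self._window = window
        self._stamps: deque[float] = deque(maxlen=window)

    def add(self, t_seconds: float) -> float | None:
        """Record one frame time; return the rate in Hz once the buffer is full."""
        self._stamps.append(t_seconds)
        if len(self._stamps) < self._window:
            return None
        span = self._stamps[-1] - self._stamps[0]
        if span <= 0:
            return None
        # window timestamps bound (window - 1) inter-frame intervals.
        return (self._window - 1) / span


rate = RollingRate()
observed = [rate.add(n * 0.5) for n in range(10)]
print(observed[-1])
```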

Sample dataclass

Sample(
    device,
    reading,
    t_mono_ns,
    t_utc,
    requested_at,
    received_at,
    latency_s,
    protocol,
    t_midpoint_mono_ns=None,
    metadata=_empty_metadata(),
    error=None,
)

One balance poll with full timing provenance.

Attributes:

Name Type Description
device str

The manager-assigned name (from SartoriusManager.add). Stable downstream identifier that follows the value into sinks.

reading Reading | None

The :class:Reading decoded from the balance's reply. None when the poll failed — inspect :attr:error.

t_mono_ns int

Canonical monotonic acquisition timestamp in nanoseconds since OS boot. The midpoint of the request / response monotonic timestamps for request/response polling; the receive-side monotonic for SBI autoprint frames. This is the join key downstream tooling correlates against sibling-library samples.

t_utc datetime

Wall-clock acquisition instant (UTC, tz-aware) for the same moment :attr:t_mono_ns records. For poll: midpoint of :attr:requested_at and :attr:received_at. For autoprint: :attr:received_at.

t_midpoint_mono_ns int | None

Optional integration-window midpoint in monotonic nanoseconds. None for single polled or autoprint samples (Sartorius balances do not expose integration semantics); reserved for forward compatibility with sensors that do.

requested_at datetime

Wall-clock datetime (UTC) captured just before the poll bytes leave the host. None for autoprint samples where the host did not send a request.

received_at datetime

Wall-clock datetime (UTC) captured just after the reply is read.

latency_s float

(received_at - requested_at).total_seconds() — precomputed for convenience. 0.0 for autoprint samples.

protocol ProtocolKind | None

Which wire protocol produced this sample. Duplicates reading.protocol on successful polls and preserves the value for error rows where reading is None. None only when an error-path sample arrives from a :class:PollSource that did not supply a protocol hint — in practice the manager always supplies one.

metadata Mapping[str, str]

Free-form per-sample annotations. Populated by stream(mode=...) to record which streaming mode produced the sample ("poll" or "autoprint").

error SartoriusError | None

The :class:SartoriusError captured on a failed poll, or None on success.
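For request/response polling, the canonical timestamps are midpoints of the request and response instants. The arithmetic can be sketched as follows. `midpoint_timing` and its parameter names are hypothetical and not part of the library, which may round differently.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def midpoint_timing(
    requested_at: datetime,
    received_at: datetime,
    requested_mono_ns: int,
    received_mono_ns: int,
) -> tuple[int, datetime, float]:
    """Return (t_mono_ns, t_utc, latency_s) for one request/response poll."""
    # Integer midpoint keeps the monotonic value in whole nanoseconds.
    t_mono_ns = requested_mono_ns + (received_mono_ns - requested_mono_ns) // 2
    # Wall-clock midpoint of the same two instants.
    t_utc = requested_at + (received_at - requested_at) / 2
    latency_s = (received_at - requested_at).total_seconds()
    return t_mono_ns, t_utc, latency_s


sent = datetime(2024, 1, 1, tzinfo=timezone.utc)
got = sent + timedelta(milliseconds=40)
print(midpoint_timing(sent, got, 1_000_000_000, 1_040_000_000))
```

The midpoint is used because the device produced the reading somewhere between the two host-side captures, and neither end alone is a better estimate.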

StreamingSession

StreamingSession(
    balance,
    *,
    rate_hz=None,
    mode="poll",
    temporary_autoprint=False,
    confirm=False,
    timeout=None,
)

Async context manager + iterator for one balance.

mode="poll" performs request/response polling at an absolute cadence. mode="autoprint" consumes already-enabled SBI autoprint lines and fails on entry if no line is available within timeout. The temporary_autoprint=True path is reserved for a future persistent SBI parameter-write flow and currently raises :class:NotImplementedError.

Source code in src/sartoriuslib/streaming/stream_session.py
def __init__(
    self,
    balance: Balance,
    *,
    rate_hz: float | None = None,
    mode: StreamMode = "poll",
    temporary_autoprint: bool = False,
    confirm: bool = False,
    timeout: float | None = None,
) -> None:
    if mode not in ("poll", "autoprint"):
        raise ValueError(f"unknown stream mode {mode!r}")
    if mode == "poll" and (rate_hz is None or rate_hz <= 0):
        raise ValueError("poll stream requires rate_hz > 0")
    self._balance = balance
    self._rate_hz = rate_hz
    self._mode: StreamMode = mode
    self._temporary_autoprint = temporary_autoprint
    self._confirm = confirm
    self._timeout = timeout
    self._entered = False
    self._tick = 0
    self._start: float = 0.0
    self._pending: Sample | None = None

record async

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Record polled samples into a receive stream at an absolute cadence.

Usage::

    async with record(mgr, rate_hz=10, duration=60) as recording:
        async for batch in recording.stream:
            process(batch)
        print(recording.summary.samples_emitted)

The CM yields a :class:Recording whose :attr:Recording.stream is an async iterator of per-tick sample batches. Each batch is a Mapping[name, Sample] — one entry per device that participated on that tick. Successful polls produce a :class:Sample carrying a :class:Reading; failed polls produce a :class:Sample with reading=None and error set. The bundled :attr:Recording.summary updates live during the run and finalises (finished_at set) on CM exit.

Parameters:

Name Type Description Default
source PollSource

Any :class:PollSource (typically a :class:~sartoriuslib.manager.SartoriusManager).

required
rate_hz float

Target cadence. Absolute targets are computed target[n] = start + n * (1 / rate_hz). Must be > 0.

required
duration float | None

Total acquisition duration in seconds. Finite runs schedule max(1, round(duration * rate_hz)) ticks, so even a very short duration schedules one tick. If the producer overruns or the overflow policy drops a batch, samples_emitted may be lower than this target and the missed/dropped ticks are counted on samples_late. None means "until the caller exits the CM".

None
names Sequence[str] | None

Subset of device names to poll per tick. None polls everything the source manages.

None
overflow OverflowPolicy

Backpressure policy when the receive-stream buffer is full. See :class:OverflowPolicy.

BLOCK
buffer_size int

Receive-stream capacity, in per-tick batches.

64

Yields:

Type Description
AsyncGenerator[Recording[Mapping[str, Sample], AsyncIterator[Mapping[str, Sample]]]]

A :class:Recording parameterised on Mapping[str, Sample].

Raises:

Type Description
ValueError

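If rate_hz <= 0, or duration is not None and duration <= 0, or buffer_size < 1.

NotImplementedError

If overflow is OverflowPolicy.DROP_OLDEST, which is not yet implemented.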
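The tick count and the absolute targets follow directly from rate_hz and duration. The helpers below restate that arithmetic as a standalone sketch. `scheduled_ticks` and `target_time` are hypothetical names, and the validation mirrors the checks in the source listing that follows.

```python
from __future__ import annotations


def scheduled_ticks(duration: float | None, rate_hz: float) -> int | None:
    """Number of ticks a finite run schedules; None for open-ended runs."""
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is None:
        return None
    if duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    # At least one tick, even when duration * rate_hz rounds to zero.
    return max(1, round(duration * rate_hz))


def target_time(start: float, n: int, rate_hz: float) -> float:
    """Absolute target for tick n: start + n * (1 / rate_hz)."""
    return start + n * (1.0 / rate_hz)


print(scheduled_ticks(60, 10), target_time(100.0, 4, 2.0))
```

Because every target is derived from the same start value, a late tick does not push later targets back.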

Source code in src/sartoriuslib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> AsyncGenerator[Recording[Mapping[str, Sample], AsyncIterator[Mapping[str, Sample]]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(mgr, rate_hz=10, duration=60) as recording:
            async for batch in recording.stream:
                process(batch)
            print(recording.summary.samples_emitted)

    The CM yields a :class:`Recording` whose :attr:`Recording.stream`
    is an async iterator of per-tick sample batches. Each batch is a
    ``Mapping[name, Sample]`` — one entry per device that
    participated on that tick. Successful polls produce a
    :class:`Sample` carrying a :class:`Reading`; failed polls produce
    a :class:`Sample` with ``reading=None`` and ``error`` set. The
    bundled :attr:`Recording.summary` updates live during the run and
    finalises (``finished_at`` set) on CM exit.

    Args:
        source: Any :class:`PollSource` (typically a
            :class:`~sartoriuslib.manager.SartoriusManager`).
        rate_hz: Target cadence. Absolute targets are computed
            ``target[n] = start + n * (1 / rate_hz)``. Must be > 0.
        duration: Total acquisition duration in seconds. Finite runs
            schedule ``round(duration * rate_hz)`` ticks. If the
            producer overruns or the overflow policy drops a batch,
            ``samples_emitted`` may be lower than this target and the
            missed/dropped ticks are counted on ``samples_late``.
            ``None`` means "until the caller exits the CM".
        names: Subset of device names to poll per tick. ``None`` polls
            everything the source manages.
        overflow: Backpressure policy when the receive-stream buffer
            is full. See :class:`OverflowPolicy`.
        buffer_size: Receive-stream capacity, in per-tick batches.

    Yields:
        A :class:`Recording` parameterised on ``Mapping[str, Sample]``.

    Raises:
        ValueError: If ``rate_hz <= 0`` or ``duration <= 0`` or
            ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    if overflow is OverflowPolicy.DROP_OLDEST:
        # Fail at call site (not deep inside the producer task) so the
        # exception type doesn't come back wrapped in an ExceptionGroup.
        raise NotImplementedError(
            "OverflowPolicy.DROP_OLDEST is not yet implemented; use BLOCK "
            "or DROP_NEWEST for now (design §10).",
        )

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Mapping[str, Sample]](
        max_buffer_size=buffer_size,
    )

    started_at = datetime.now(UTC)
    summary = AcquisitionSummary(
        started_at=started_at,
        target_total_samples=total_ticks,
    )
    recording: Recording[Mapping[str, Sample], AsyncIterator[Mapping[str, Sample]]] = Recording(
        stream=receive_stream,
        summary=summary,
        rate_hz=rate_hz,
    )
    _logger.info(
        "recorder.start",
        extra={
            "rate_hz": rate_hz,
            "duration_s": duration,
            "overflow": overflow.value,
            "buffer_size": buffer_size,
            "names": list(names) if names is not None else None,
        },
    )

    async with anyio.create_task_group() as tg, receive_stream:
        tg.start_soon(
            _run_producer,
            source,
            send_stream,
            period,
            total_ticks,
            names,
            overflow,
            summary,
        )
        try:
            yield recording
        finally:
            tg.cancel_scope.cancel()

    finished_at = datetime.now(UTC)
    summary.finished_at = finished_at
    _logger.info(
        "recorder.stop",
        extra={
            "samples_emitted": summary.samples_emitted,
            "samples_late": summary.samples_late,
            "max_drift_ms": summary.max_drift_ms,
            "target_total_samples": summary.target_total_samples,
            "duration_s": (finished_at - started_at).total_seconds(),
        },
    )

Sample

sartoriuslib.streaming.sample

Timed sample — one balance reading with send/receive provenance.

A :class:Sample is what the recorder emits into its memory-object stream. It pairs a :class:Reading with enough timing to reconstruct the acquisition timeline. The unified cross-library timestamp contract (see UNIFIED_API_HANDOFF.md §C) requires three canonical fields:

  • :attr:t_mono_ns — monotonic acquisition timestamp (join key).
  • :attr:t_utc — wall-clock acquisition instant. For request/response polling this is the midpoint between :attr:requested_at and :attr:received_at — the best point estimate of when the device produced the reading. For SBI autoprint it is :attr:received_at.
  • :attr:t_midpoint_mono_ns — integration-window midpoint (monotonic). None for single polled / autoprint samples; reserved for sensors that emit values integrated over a known window.

Per-protocol I/O provenance (requested_at / received_at / latency_s) is kept alongside so latency analysis and on-the-wire debugging stay possible. :attr:metadata carries free-form annotations (autoprint vs. poll mode) and is real data, not log spam.

reading is None when error is populated — the two fields are mutually exclusive. Samples with error still carry the timing fields so sinks can log the failed attempts with proper wall-clock provenance.

Design reference: docs/design.md §10; unified spec UNIFIED_API_HANDOFF.md §C.
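Since reading and error are mutually exclusive, a consumer can split each per-tick batch into successes and failures. A minimal sketch, assuming only the three fields shown: `SampleView` is a hypothetical stand-in for Sample, and `split_batch` is not a library function.

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SampleView:
    """Hypothetical stand-in exposing three of the documented Sample fields."""

    device: str
    reading: Any | None = None
    error: Exception | None = None


def split_batch(
    batch: Mapping[str, SampleView],
) -> tuple[dict[str, Any], dict[str, Exception]]:
    """Partition one per-tick batch into readings and errors, keyed by device."""
    readings: dict[str, Any] = {}
    failures: dict[str, Exception] = {}
    for name, sample in batch.items():
        if sample.error is not None:
            failures[name] = sample.error
        else:
            readings[name] = sample.reading
    return readings, failures


batch = {
    "bal1": SampleView("bal1", reading=12.5),
    "bal2": SampleView("bal2", error=TimeoutError("no reply")),
}
print(split_batch(batch))
```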

Per-balance streaming session

sartoriuslib.streaming.stream_session

Per-balance streaming session used by :meth:Balance.stream.

Recorder — record(), summary, overflow

sartoriuslib.streaming.recorder

Absolute-target recorder — record() emits timed :class:Sample batches.

:func:record is the v1 acquisition primitive. It drives a :class:~sartoriuslib.manager.SartoriusManager (or any :class:PollSource-shaped object — see below) at an absolute-target cadence and publishes the polled :class:~sartoriuslib.devices.models.Reading values into an :class:anyio.abc.ObjectReceiveStream as per-tick Mapping[name, Sample] batches.

Key invariants (design §10):

  • Absolute-target scheduling. Target times are computed from :func:anyio.current_time at record()-entry, not from a running monotonic; drift across cycles is bounded by one tick and never accumulates. anyio.sleep_until advances to the next target slot; overruns skip missed slots and increment samples_late.
  • Structured concurrency. The producer task lives inside a create_task_group() strictly nested inside the async CM body.
  • Wall-clock provenance. datetime.now(UTC) is captured at the send/receive boundaries of each balance's poll and attached to the emitted :class:Sample.
  • Backpressure. buffer_size sets the memory-object stream capacity; :class:OverflowPolicy controls what happens when the producer wants to enqueue but the consumer is behind.
  • Errors are samples too. Under ErrorPolicy.RETURN on the source, per-device failures arrive as :class:Sample with reading=None and error set — they still carry wall-clock timing so sinks can log the failure. Under ErrorPolicy.RAISE a failed device's poll aborts the batch at the source layer before the recorder sees it.

The recorder consumes a :class:PollSource — a narrow Protocol the :class:~sartoriuslib.manager.SartoriusManager already satisfies (its poll(names) signature matches). Kept as a Protocol so the recorder is unit-testable against a lightweight stub.

Design reference: docs/design.md §10.
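The slot-skipping rule from the absolute-target invariant can be sketched as a pure function. This illustrates the idea under stated assumptions and is not the producer's code: `next_slot` is a hypothetical name, times are plain floats in seconds, and a slot whose target equals the current time is treated as still reachable.

```python
from __future__ import annotations

import math


def next_slot(now: float, start: float, period: float, tick: int) -> tuple[int, int]:
    """Return (next_tick, skipped) after finishing work for `tick`.

    next_tick is the first later slot whose target start + n * period has
    not already passed; skipped is how many slots were missed on the way.
    """
    candidate = tick + 1
    # Smallest tick index whose absolute target is at or after `now`.
    earliest = math.ceil((now - start) / period)
    chosen = max(candidate, earliest)
    return chosen, chosen - candidate


# On time: tick 2 (target 11.0) finished at 11.2, so slot 3 (11.5) is next.
print(next_slot(11.2, 10.0, 0.5, 2))
# Overrun: finished at 13.1, so slots 3..6 are missed and slot 7 (13.5) is next.
print(next_slot(13.1, 10.0, 0.5, 2))
```

Each skipped slot would be added to samples_late. Because the chosen target is still computed from start, the overrun delays nothing beyond the slots it consumed.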
