servomexlib.streaming

Sample, StreamingSession, StreamMode, record(), and AcquisitionSummary.

Streaming layer.

Two acquisition shapes share one Sample model and one StreamingSession interface: the passive AUTOPRINT subscription (continuous broadcast) and the drift-free POLL loop driven by record() (Modbus). PollSource is the narrow contract the recorder drives.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    samples_emitted=0,
    samples_late=0,
    max_drift_ms=0.0,
    tick_duration_ms_p50=0.0,
    tick_duration_ms_p99=0.0,
    disconnects=0,
)

Per-run summary, owned and mutated by the recorder (sole writer).

Counters update in place during the run so progress-polling consumers see live values; consumers treat it as read-only. finished_at and the percentile fields are populated on context-manager exit.

Attributes:

Name Type Description
started_at datetime

Wall-clock at the first scheduled tick.

finished_at datetime | None

Wall-clock at producer shutdown, or None while running.

samples_emitted int

Per-tick batches pushed onto the stream (a tick whose reads all failed still counts as one emitted batch).

samples_late int

Ticks that missed their slot (overrun, overflow drop, or a reconnect gap).

max_drift_ms float

Largest positive drift of an emitted batch from its target.

tick_duration_ms_p50 float

Median poll_samples duration (set on exit).

tick_duration_ms_p99 float

99th-percentile poll_samples duration (set on exit).

disconnects int

ServomexConnectionError events absorbed under auto_reconnect; 0 when it was off.

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks the response.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer (default). Silent drops are surprising in acquisition, so the recorder blocks rather than discarding.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the batch about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch and enqueue the newest. Intended for real-time monitoring, where the latest reading matters most. Each eviction is counted as late.
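To make the three responses concrete, here is a small sketch that applies them to a bounded `asyncio.Queue`. It is an illustration under stated assumptions, not the recorder's implementation (which uses a memory object stream): `Policy`, `offer`, and `demo` are hypothetical names, and the returned drop count stands in for the "late" counter.

```python
import asyncio
from enum import Enum


class Policy(Enum):
    """Hypothetical stand-in with the same member values as OverflowPolicy."""

    BLOCK = "block"
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


async def offer(queue: asyncio.Queue, batch: object, policy: Policy) -> int:
    """Enqueue a batch under the policy; return how many batches were dropped."""
    if policy is Policy.BLOCK:
        await queue.put(batch)  # waits until the consumer frees a slot
        return 0
    if not queue.full():
        queue.put_nowait(batch)
        return 0
    if policy is Policy.DROP_NEWEST:
        return 1  # discard the batch that was about to be enqueued
    queue.get_nowait()  # DROP_OLDEST: evict the head of the queue
    queue.put_nowait(batch)
    return 1


async def demo(policy: Policy) -> tuple[list[int], int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    dropped = 0
    for batch in range(4):  # nothing consumes, so the queue fills after two
        dropped += await offer(queue, batch, policy)
    remaining = [queue.get_nowait() for _ in range(queue.qsize())]
    return remaining, dropped


newest = asyncio.run(demo(Policy.DROP_NEWEST))
oldest = asyncio.run(demo(Policy.DROP_OLDEST))
print(newest)  # ([0, 1], 2)
print(oldest)  # ([2, 3], 2)
```

BLOCK is left out of the demo on purpose: with no consumer draining the queue, the third `put` would wait forever, which is exactly the behaviour the default policy chooses over a silent drop.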

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

poll_samples async

poll_samples(*, names=None, timeout=None)

Read this tick's samples across every channel (and managed device).

Parameters:

Name Type Description Default
names Sequence[str] | None

Subset of device names to poll (manager only); None polls everything. A solo analyser ignores this.

None
timeout float | None

Per-poll I/O ceiling, or None for the source default.

None

Returns:

Type Description
Sequence[Sample]

A flat sequence of Sample objects; empty when every read failed.

Source code in src/servomexlib/streaming/poll_source.py
async def poll_samples(
    self, *, names: Sequence[str] | None = None, timeout: float | None = None
) -> Sequence[Sample]:
    """Read this tick's samples across every channel (and managed device).

    Args:
        names: Subset of device names to poll (manager only); ``None`` polls
            everything. A solo analyser ignores this.
        timeout: Per-poll I/O ceiling, or ``None`` for the source default.

    Returns:
        A flat sequence of :class:`Sample` — empty when every read failed.
    """
    ...

Recording dataclass

Recording(stream, summary, rate_hz)

Container yielded by record(): the stream, a live summary, and the rate.

Shares the cross-family shape (stream / summary / rate_hz) so downstream consumers are vendor-agnostic. For servomexlib the payload is Recording[Sequence[Sample]] — per-tick batches.

Attributes:

Name Type Description
stream AsyncIterator[T]

Async iterator of per-tick Sample batches.

summary AcquisitionSummary

Live AcquisitionSummary (the recorder mutates it, the consumer reads it); summary.finished_at is set on context-manager exit.

rate_hz float

The cadence captured at record() entry.

Sample dataclass

Sample(
    device,
    channel,
    reading,
    protocol,
    monotonic_ns,
    received_at,
    requested_at=None,
    latency_s=None,
    metadata=_empty_metadata(),
    error=None,
)

Long-format row — one channel reading with streaming provenance.

requested_at / latency_s are None in passive continuous mode (we did not ask). error is set when a frame was dropped/corrupt; reading is then None (the two are mutually exclusive) and channel is None because a dropped frame is not tied to one channel.

StreamMode

Bases: StrEnum

How an Analyzer.stream session sources samples.

AUTOPRINT is the member inherited from the wider library family (Sartorius SBI vocabulary), reused verbatim so the interface stays consistent across families; for the 4100 it denotes a passive subscription to unsolicited broadcasts.

StreamingSession

StreamingSession(
    receiver, *, mode, on_close=None, producer=None
)

Async-iterable context manager over one subscriber's Sample stream.

Two backing shapes share this one interface:

  • Passive AUTOPRINT (continuous): receiver is fed by the client's already-running background loop; on_close unsubscribes.
  • Active POLL (Modbus): a producer coroutine is supplied. It is run in a task group this session owns — started on __aenter__ and cancelled on close — so the recorder's lifetime is strictly nested in the session.
Source code in src/servomexlib/streaming/stream_session.py
def __init__(
    self,
    receiver: MemoryObjectReceiveStream[Sample],
    *,
    mode: StreamMode,
    on_close: Callable[[], None] | None = None,
    producer: Callable[[], Awaitable[None]] | None = None,
) -> None:
    self._receiver = receiver
    self._mode = mode
    self._on_close = on_close
    self._producer = producer
    self._task_group: TaskGroup | None = None

mode property

mode

The mode this session is streaming in.

aclose async

aclose()

Stop the owned producer (if any), unsubscribe, and close the stream.

Source code in src/servomexlib/streaming/stream_session.py
async def aclose(self) -> None:
    """Stop the owned producer (if any), unsubscribe, and close the stream."""
    if self._task_group is not None:
        self._task_group.cancel_scope.cancel()
        await self._task_group.__aexit__(None, None, None)
        self._task_group = None
    if self._on_close is not None:
        self._on_close()
        self._on_close = None
    await self._receiver.aclose()

record async

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    timeout=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    auto_reconnect=False,
    reconnect_factory=None,
)

Record polled samples into a receive stream at an absolute cadence.

Usage:

async with record(analyzer, rate_hz=2, duration=10) as rec:
    async for batch in rec.stream:
        for sample in batch:
            print(sample.channel, sample.reading)

Parameters:

Name Type Description Default
source PollSource

Any PollSource (an Analyzer or a ServomexManager).

required
rate_hz float

Target cadence; target[n] = start + n / rate_hz. Must be > 0.

required
duration float | None

Total acquisition seconds, or None for "until the caller exits the CM".

None
names Sequence[str] | None

Subset of device names to poll (manager only); None polls all.

None
timeout float | None

Per-poll I/O ceiling passed to source.poll_samples.

None
overflow OverflowPolicy

Backpressure policy when the buffer is full.

BLOCK
buffer_size int

Receive-stream capacity, in per-tick batches.

64
auto_reconnect bool

Treat ServomexConnectionError from the source as a transient drop: log, back off, optionally rebuild via reconnect_factory, and keep going (missed ticks count as late).

False
reconnect_factory Callable[[], Awaitable[PollSource]] | None

Rebuilds the source after a disconnect when supplied.

None

Yields:

Type Description
AsyncGenerator[Recording[Sequence[Sample]]]

A Recording[Sequence[Sample]].

Raises:

Type Description
ValueError

rate_hz <= 0, duration <= 0, or buffer_size < 1.
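The formula target[n] = start + n / rate_hz is what keeps the cadence drift-free: every target is computed from the start time, so a slow poll delays one tick without shifting the ones after it. The sketch below contrasts that with a naive "poll, then sleep one period" loop. The function names are hypothetical, the 20 ms poll cost is an invented figure, and the tick count follows the max(1, round(duration * rate_hz)) rule from the source listing.

```python
def absolute_targets(start: float, rate_hz: float, duration: float) -> list[float]:
    """Tick targets on an absolute schedule: target[n] = start + n / rate_hz."""
    total_ticks = max(1, round(duration * rate_hz))
    return [start + n / rate_hz for n in range(total_ticks)]


def naive_targets(start: float, rate_hz: float, ticks: int, work_s: float) -> list[float]:
    """Tick times of a 'poll, then sleep(period)' loop; poll time accumulates."""
    period = 1.0 / rate_hz
    t = start
    times = []
    for _ in range(ticks):
        times.append(t)
        t += work_s + period  # each poll pushes every later tick back
    return times


targets = absolute_targets(start=0.0, rate_hz=2, duration=10)
naive = naive_targets(start=0.0, rate_hz=2, ticks=len(targets), work_s=0.02)

print(len(targets))                 # 20 ticks
print(targets[-1])                  # last target on the absolute schedule
print(naive[-1] - targets[-1])      # accumulated lag of the naive loop
```

With these numbers the naive loop is roughly 0.38 s behind by the last tick, while the absolute schedule's last target stays at 9.5 s regardless of how long individual polls took.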

Source code in src/servomexlib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    timeout: float | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    auto_reconnect: bool = False,
    reconnect_factory: Callable[[], Awaitable[PollSource]] | None = None,
) -> AsyncGenerator[Recording[Sequence[Sample]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(analyzer, rate_hz=2, duration=10) as rec:
            async for batch in rec.stream:
                for sample in batch:
                    print(sample.channel, sample.reading)

    Args:
        source: Any :class:`PollSource` (an :class:`Analyzer` or a
            :class:`ServomexManager`).
        rate_hz: Target cadence; ``target[n] = start + n / rate_hz``. Must be > 0.
        duration: Total acquisition seconds, or ``None`` for "until the caller
            exits the CM".
        names: Subset of device names to poll (manager only); ``None`` polls all.
        timeout: Per-poll I/O ceiling passed to ``source.poll_samples``.
        overflow: Backpressure policy when the buffer is full.
        buffer_size: Receive-stream capacity, in per-tick batches.
        auto_reconnect: Treat :class:`ServomexConnectionError` from the source as
            a transient drop: log, back off, optionally rebuild via
            ``reconnect_factory``, and keep going (missed ticks count late).
        reconnect_factory: Rebuilds the source after a disconnect when supplied.

    Yields:
        A :class:`Recording[Sequence[Sample]]`.

    Raises:
        ValueError: ``rate_hz <= 0``, ``duration <= 0``, or ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Sequence[Sample]](
        max_buffer_size=buffer_size,
    )
    # Producer-side clone for DROP_OLDEST eviction, off the consumer's iterator.
    drop_rx = receive_stream.clone()

    started_at = datetime.now(UTC)
    summary = AcquisitionSummary(started_at=started_at)
    tick_durations_ms: list[float] = []
    _logger.info(
        "recorder.start rate_hz=%s duration_s=%s overflow=%s buffer_size=%s names=%s",
        rate_hz,
        duration,
        overflow.value,
        buffer_size,
        list(names) if names is not None else None,
    )

    try:
        async with anyio.create_task_group() as tg, receive_stream:

            async def _producer_entrypoint() -> None:
                await _run_producer(
                    source,
                    send_stream,
                    drop_rx,
                    names,
                    timeout,
                    period,
                    total_ticks,
                    overflow,
                    summary,
                    tick_durations_ms,
                    auto_reconnect=auto_reconnect,
                    reconnect_factory=reconnect_factory,
                )

            tg.start_soon(_producer_entrypoint)
            try:
                yield Recording(stream=receive_stream, summary=summary, rate_hz=rate_hz)
            finally:
                tg.cancel_scope.cancel()
    except BaseExceptionGroup as eg:
        # A lone producer failure (e.g. ServomexConnectionError without
        # auto_reconnect) surfaces as a single-member group from the task group;
        # collapse it so callers catch the concrete error, not the wrapper.
        raise _collapse(eg) from None
    finally:
        summary.finished_at = datetime.now(UTC)
        summary.tick_duration_ms_p50, summary.tick_duration_ms_p99 = _tick_percentiles(
            tick_durations_ms
        )
        _logger.info(
            "recorder.stop emitted=%s late=%s max_drift_ms=%.3f tick_p50_ms=%.3f tick_p99_ms=%.3f",
            summary.samples_emitted,
            summary.samples_late,
            summary.max_drift_ms,
            summary.tick_duration_ms_p50,
            summary.tick_duration_ms_p99,
        )
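The listing calls a private helper, _tick_percentiles, whose body is not shown on this page. One plausible way to derive the two summary fields from the collected durations is sketched below; it is a guess at the behaviour, not the library's code. The name `tick_percentiles`, the nearest-rank method for p99, and the (0.0, 0.0) result for an empty run (chosen to match the dataclass defaults) are all assumptions.

```python
import statistics


def tick_percentiles(durations_ms: list[float]) -> tuple[float, float]:
    """Return (p50, p99) of tick durations; (0.0, 0.0) when nothing was recorded."""
    if not durations_ms:
        return 0.0, 0.0
    ordered = sorted(durations_ms)
    p50 = statistics.median(ordered)
    # Nearest-rank p99: 1-based rank ceil(0.99 * n), in integer arithmetic.
    rank = (99 * len(ordered) + 99) // 100
    return p50, ordered[rank - 1]


empty = tick_percentiles([])
hundred = tick_percentiles([float(i) for i in range(1, 101)])
single = tick_percentiles([4.2])
print(empty, hundred, single)
```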