
sartoriuslib.streaming

Sample, StreamingSession, StreamMode, record(), AcquisitionSummary, OverflowPolicy, PollSource. See also "Streaming" and "Logging and acquisition".

Public surface

sartoriuslib.streaming

Streaming + recording primitives. See design doc §10.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at,
    samples_emitted,
    samples_late,
    max_drift_ms,
    target_total_samples=None,
)

Per-run summary emitted after record()'s CM exits.

Attributes:

started_at (datetime): Wall-clock time at the first scheduled tick.

finished_at (datetime): Wall-clock time at producer shutdown.

samples_emitted (int): Count of per-tick batches actually pushed onto the receive stream.

samples_late (int): Count of ticks that missed their target slot (the producer overran the previous tick, or the overflow policy dropped the batch).

max_drift_ms (float): Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds.

target_total_samples (int | None): Number of scheduled ticks for finite-duration runs, or None for open-ended runs.
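As an illustration of how these fields relate, the sketch below computes the fraction of scheduled ticks that were emitted. `SummaryLike` and `completeness` are hypothetical names used only here; `SummaryLike` is a stand-in that mirrors the documented attributes and is not the library class.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class SummaryLike:
    """Hypothetical stand-in mirroring the documented AcquisitionSummary fields."""

    started_at: datetime
    finished_at: datetime
    samples_emitted: int
    samples_late: int
    max_drift_ms: float
    target_total_samples: int | None = None


def completeness(summary: SummaryLike) -> float | None:
    """Fraction of scheduled ticks that were emitted; None for open-ended runs."""
    if summary.target_total_samples is None:
        return None
    return summary.samples_emitted / summary.target_total_samples


# Made-up numbers for a 60 s run at 10 Hz in which 6 ticks were missed.
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
summary = SummaryLike(
    started_at=start,
    finished_at=start + timedelta(seconds=60),
    samples_emitted=594,
    samples_late=6,
    max_drift_ms=12.5,
    target_total_samples=600,
)
ratio = completeness(summary)
open_ended = completeness(
    SummaryLike(start, start, samples_emitted=10, samples_late=0, max_drift_ms=0.0)
)
```

For an open-ended run target_total_samples is None, so no ratio can be formed and the helper returns None.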

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples. The effective sample rate drops to the consumer's drain rate; samples_late accrues once the consumer catches up and the producer can check its schedule.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the sample that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

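Evict the oldest queued batch, then enqueue. Counted as late. Not yet implemented: record() currently raises NotImplementedError at the call site when this policy is passed, so use BLOCK or DROP_NEWEST for now.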
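The difference between the two drop policies can be shown on a plain bounded queue. This is a minimal synchronous sketch of the documented semantics, not the recorder's implementation; `Policy`, `offer`, and `simulate` are hypothetical names, and BLOCK is deliberately not modelled because it requires a running consumer.

```python
from __future__ import annotations

from collections import deque
from enum import Enum


class Policy(Enum):
    """Hypothetical mirror of the documented OverflowPolicy values."""

    BLOCK = "block"
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


def offer(buffer: deque, capacity: int, batch: object, policy: Policy) -> bool:
    """Enqueue batch if possible; return True when a batch was dropped."""
    if len(buffer) < capacity:
        buffer.append(batch)
        return False
    if policy is Policy.DROP_NEWEST:
        return True  # discard the incoming batch, queue unchanged
    if policy is Policy.DROP_OLDEST:
        buffer.popleft()  # evict the oldest queued batch
        buffer.append(batch)
        return True
    raise RuntimeError("BLOCK waits for the consumer; not modelled in this sketch")


def simulate(policy: Policy, batches: list, capacity: int) -> tuple[list, int]:
    """Push batches with no consumer draining; return (queue contents, drops)."""
    buffer: deque = deque()
    dropped = 0
    for batch in batches:
        if offer(buffer, capacity, batch, policy):
            dropped += 1
    return list(buffer), dropped


newest = simulate(Policy.DROP_NEWEST, [1, 2, 3], capacity=2)
oldest = simulate(Policy.DROP_OLDEST, [1, 2, 3], capacity=2)
```

With a capacity of two and three batches offered, DROP_NEWEST keeps the first two batches while DROP_OLDEST keeps the last two; both count one dropped batch.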

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

:class:`~sartoriuslib.manager.SartoriusManager` satisfies this: its poll(names) returns a Mapping[str, DeviceResult[Reading]]. Using a Protocol keeps :func:`record` testable against a lightweight stub without pulling in the whole manager + transport stack.

poll async

poll(names=None)

Poll every named balance (or all under management) concurrently.

Must return a mapping keyed by the manager-assigned device name. Successful polls carry the :class:`Reading` as .value; failed ones carry the :class:`~sartoriuslib.errors.SartoriusError` as .error (per :class:`~sartoriuslib.manager.ErrorPolicy.RETURN`).

Source code in src/sartoriuslib/streaming/recorder.py
async def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Poll every named balance (or all under management) concurrently.

    Must return a mapping keyed by the manager-assigned device name.
    Successful polls carry the :class:`Reading` as ``.value``;
    failed ones carry the :class:`~sartoriuslib.errors.SartoriusError`
    as ``.error`` (per :class:`~sartoriuslib.manager.ErrorPolicy.RETURN`).
    """
    ...
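A lightweight stub of the kind the Protocol is meant to allow might look like the sketch below. `FakeReading`, `FakeResult`, and `StubSource` are hypothetical stand-ins for illustration; the real Reading and DeviceResult types are not reproduced here, and only the documented poll(names=None) shape and the .value / .error convention are assumed.

```python
from __future__ import annotations

import asyncio
from collections.abc import Mapping, Sequence
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeReading:
    """Hypothetical stand-in for Reading."""

    value_g: float


@dataclass(frozen=True)
class FakeResult:
    """Hypothetical stand-in for DeviceResult: value on success, error on failure."""

    value: FakeReading | None = None
    error: Exception | None = None


class StubSource:
    """Structurally matches the documented async poll(names=None) method."""

    def __init__(self, readings: dict[str, float], failing: set[str] | None = None) -> None:
        self._readings = readings
        self._failing = failing or set()

    async def poll(self, names: Sequence[str] | None = None) -> Mapping[str, FakeResult]:
        # None means "poll everything this source manages".
        selected = list(self._readings) if names is None else list(names)
        out: dict[str, FakeResult] = {}
        for name in selected:
            if name in self._failing:
                out[name] = FakeResult(error=TimeoutError(f"{name}: no reply"))
            else:
                out[name] = FakeResult(value=FakeReading(self._readings[name]))
        return out


stub = StubSource({"bal_a": 12.5, "bal_b": 3.0}, failing={"bal_b"})
results = asyncio.run(stub.poll())
subset = asyncio.run(stub.poll(["bal_a"]))
```

Because the Protocol is structural, a class like this needs no inheritance from the library; it only has to expose a compatible poll coroutine.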

Sample dataclass

Sample(
    device,
    reading,
    requested_at,
    received_at,
    midpoint_at,
    monotonic_ns,
    elapsed_s,
    protocol,
    metadata=_empty_metadata(),
    error=None,
)

One balance poll with full timing provenance.

Attributes:

device (str): The manager-assigned name (from SartoriusManager.add). Stable downstream identifier that follows the value into sinks.

reading (Reading | None): The :class:`Reading` decoded from the balance's reply. None when the poll failed; inspect :attr:`error`.

requested_at (datetime): Wall-clock datetime (UTC) captured just before the poll bytes leave the host.

received_at (datetime): Wall-clock datetime (UTC) captured just after the reply is read.

midpoint_at (datetime): (requested_at + received_at) / 2, the design-preferred point estimate of the sample instant.

monotonic_ns (int): :func:`time.monotonic_ns` at the read site. Used for scheduling and drift analysis only; never displayed, since the absolute value has no calendar meaning.

elapsed_s (float): (received_at - requested_at).total_seconds(), precomputed for convenience.

protocol (ProtocolKind | None): Which wire protocol produced this sample. Duplicates reading.protocol on successful polls and preserves the value for error rows where reading is None. None only when an error-path sample arrives from a :class:`PollSource` that did not supply a protocol hint; in practice the manager always supplies one.

metadata (Mapping[str, str]): Free-form per-sample annotations. Populated by Phase-7 stream(mode=...) to record which streaming mode produced the sample ("poll", "autoprint", "autoprint+temporary").

error (SartoriusError | None): The :class:`SartoriusError` captured on a failed poll, or None on success.
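The relationship between the three wall-clock fields and elapsed_s can be worked through with the standard library alone. The timestamps below are made-up values for illustration; how the library itself computes the midpoint is not shown in this page.

```python
from datetime import datetime, timedelta, timezone

# Made-up timestamps: the reply arrives 40 ms after the request is sent.
requested_at = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
received_at = requested_at + timedelta(milliseconds=40)

# Two datetimes cannot be added directly, so the midpoint is expressed as
# requested_at plus half of the round-trip interval.
round_trip = received_at - requested_at
midpoint_at = requested_at + round_trip / 2

# Per-sample latency in seconds, as described for elapsed_s.
elapsed_s = round_trip.total_seconds()
```

Here midpoint_at falls 20 ms after requested_at and elapsed_s is 0.04.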

StreamingSession

StreamingSession(
    balance,
    *,
    rate_hz=None,
    mode="poll",
    temporary_autoprint=False,
    confirm=False,
    timeout=None,
)

Async context manager + iterator for one balance.

mode="poll" performs request/response polling at an absolute cadence. mode="autoprint" consumes already-enabled SBI autoprint lines and fails on entry if no line is available within timeout.

Source code in src/sartoriuslib/streaming/stream_session.py
def __init__(
    self,
    balance: Balance,
    *,
    rate_hz: float | None = None,
    mode: StreamMode = "poll",
    temporary_autoprint: bool = False,
    confirm: bool = False,
    timeout: float | None = None,
) -> None:
    if mode not in ("poll", "autoprint"):
        raise ValueError(f"unknown stream mode {mode!r}")
    if mode == "poll" and (rate_hz is None or rate_hz <= 0):
        raise ValueError("poll stream requires rate_hz > 0")
    self._balance = balance
    self._rate_hz = rate_hz
    self._mode: StreamMode = mode
    self._temporary_autoprint = temporary_autoprint
    self._confirm = confirm
    self._timeout = timeout
    self._entered = False
    self._tick = 0
    self._start: float = 0.0
    self._pending: Sample | None = None

record async

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Record polled samples into a receive stream at an absolute cadence.

Usage::

    async with record(mgr, rate_hz=10, duration=60) as stream:
        async for batch in stream:
            process(batch)

The context manager yields an async iterator of per-tick sample batches. Each batch is a Mapping[name, Sample] with one entry per device that participated on that tick. Successful polls produce a :class:`Sample` carrying a :class:`Reading`; failed polls produce a :class:`Sample` with reading=None and error set.
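Since every batch can mix successful and failed samples, a consumer typically separates the two before processing. The sketch below shows one way to do that; `SampleLike` and `split_batch` are hypothetical names, and `SampleLike` carries only the three fields needed here, with a plain float standing in for Reading.

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class SampleLike:
    """Hypothetical, reduced stand-in for Sample (device, reading, error only)."""

    device: str
    reading: float | None
    error: Exception | None = None


def split_batch(
    batch: Mapping[str, SampleLike],
) -> tuple[dict[str, float], dict[str, Exception]]:
    """Separate one per-tick batch into readings and errors, keyed by device."""
    ok: dict[str, float] = {}
    failed: dict[str, Exception] = {}
    for name, sample in batch.items():
        if sample.error is not None:
            failed[name] = sample.error
        elif sample.reading is not None:
            ok[name] = sample.reading
    return ok, failed


batch = {
    "bal_a": SampleLike("bal_a", reading=12.5),
    "bal_b": SampleLike("bal_b", reading=None, error=TimeoutError("no reply")),
}
ok, failed = split_batch(batch)
```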

Parameters:

source (PollSource, required): Any :class:`PollSource` (typically a :class:`~sartoriuslib.manager.SartoriusManager`).

rate_hz (float, required): Target cadence in Hz. Absolute targets are computed as target[n] = start + n * (1 / rate_hz). Must be > 0.

duration (float | None, default None): Total acquisition duration in seconds. Finite runs schedule max(1, round(duration * rate_hz)) ticks, so at least one tick is always scheduled. If the producer overruns or the overflow policy drops a batch, samples_emitted may be lower than this target, and the missed or dropped ticks are counted in samples_late. None means "until the caller exits the context manager".

names (Sequence[str] | None, default None): Subset of device names to poll per tick. None polls everything the source manages.

overflow (OverflowPolicy, default BLOCK): Backpressure policy when the receive-stream buffer is full. See :class:`OverflowPolicy`.

buffer_size (int, default 64): Receive-stream capacity, in per-tick batches. Must be >= 1.

Yields:

AsyncGenerator[AsyncIterator[Mapping[str, Sample]]]: An async iterator of per-tick Mapping[device_name, Sample].

Raises:

ValueError: If rate_hz <= 0, duration <= 0, or buffer_size < 1.

NotImplementedError: If overflow is OverflowPolicy.DROP_OLDEST, which is not yet implemented.
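The tick count and absolute targets described for rate_hz and duration can be reproduced in a few lines. This sketch uses the period and tick-count expressions from the source listing; the helper name `schedule`, the zero-based tick index, and the default start of 0.0 are assumptions made for illustration.

```python
from __future__ import annotations


def schedule(rate_hz: float, duration: float, start: float = 0.0) -> list[float]:
    """Return the absolute target time of every scheduled tick."""
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration <= 0:
        raise ValueError(f"duration must be > 0, got {duration!r}")
    period = 1.0 / rate_hz
    # Same expression as the recorder: never fewer than one tick.
    total_ticks = max(1, round(duration * rate_hz))
    # Each target depends only on start and n, so rounding error in one
    # tick is not carried into the next.
    return [start + n * period for n in range(total_ticks)]


targets = schedule(rate_hz=10, duration=0.5)
short_run = schedule(rate_hz=1, duration=0.2)
```

The second call shows the lower bound: round(0.2) is 0, yet one tick is still scheduled.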

Source code in src/sartoriuslib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> AsyncGenerator[AsyncIterator[Mapping[str, Sample]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(mgr, rate_hz=10, duration=60) as stream:
            async for batch in stream:
                process(batch)

    The CM yields an async iterator of per-tick sample batches. Each
    batch is a ``Mapping[name, Sample]`` — one entry per device that
    participated on that tick. Successful polls produce a
    :class:`Sample` carrying a :class:`Reading`; failed polls produce
    a :class:`Sample` with ``reading=None`` and ``error`` set.

    Args:
        source: Any :class:`PollSource` (typically a
            :class:`~sartoriuslib.manager.SartoriusManager`).
        rate_hz: Target cadence. Absolute targets are computed
            ``target[n] = start + n * (1 / rate_hz)``. Must be > 0.
        duration: Total acquisition duration in seconds. Finite runs
            schedule ``round(duration * rate_hz)`` ticks. If the
            producer overruns or the overflow policy drops a batch,
            ``samples_emitted`` may be lower than this target and the
            missed/dropped ticks are counted on ``samples_late``.
            ``None`` means "until the caller exits the CM".
        names: Subset of device names to poll per tick. ``None`` polls
            everything the source manages.
        overflow: Backpressure policy when the receive-stream buffer
            is full. See :class:`OverflowPolicy`.
        buffer_size: Receive-stream capacity, in per-tick batches.

    Yields:
        An async iterator of per-tick ``Mapping[device_name, Sample]``.

    Raises:
        ValueError: If ``rate_hz <= 0`` or ``duration <= 0`` or
            ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    if overflow is OverflowPolicy.DROP_OLDEST:
        # Fail at call site (not deep inside the producer task) so the
        # exception type doesn't come back wrapped in an ExceptionGroup.
        raise NotImplementedError(
            "OverflowPolicy.DROP_OLDEST is not yet implemented; use BLOCK "
            "or DROP_NEWEST for now (design §10).",
        )

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Mapping[str, Sample]](
        max_buffer_size=buffer_size,
    )
    stats = _RunStats()

    started_at = datetime.now(UTC)
    _logger.info(
        "recorder.start",
        extra={
            "rate_hz": rate_hz,
            "duration_s": duration,
            "overflow": overflow.value,
            "buffer_size": buffer_size,
            "names": list(names) if names is not None else None,
        },
    )

    async with anyio.create_task_group() as tg, receive_stream:
        tg.start_soon(
            _run_producer,
            source,
            send_stream,
            period,
            total_ticks,
            names,
            overflow,
            stats,
        )
        try:
            yield receive_stream
        finally:
            tg.cancel_scope.cancel()

    finished_at = datetime.now(UTC)
    summary = AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=stats.emitted,
        samples_late=stats.late,
        max_drift_ms=stats.max_drift_ms,
        target_total_samples=total_ticks,
    )
    _logger.info(
        "recorder.stop",
        extra={
            "samples_emitted": summary.samples_emitted,
            "samples_late": summary.samples_late,
            "max_drift_ms": summary.max_drift_ms,
            "target_total_samples": summary.target_total_samples,
            "duration_s": (finished_at - started_at).total_seconds(),
        },
    )

Sample

sartoriuslib.streaming.sample

Timed sample — one balance reading with send/receive provenance.

A :class:Sample is what the recorder emits into its memory-object stream. It pairs a :class:Reading with enough timing to reconstruct the acquisition timeline: monotonic_ns for drift analysis, requested_at / received_at / midpoint_at for wall-clock provenance, and elapsed_s for per-sample latency checks.

The midpoint is the best point-estimate of the acquisition instant on the device: halfway between when the poll bytes left the host and when the full reply arrived. Downstream plots and correlations should use this field when aligning balance data against other sensor streams.

reading is None when error is populated — the two fields are mutually exclusive. Samples with error still carry the timing fields so sinks can log the failed attempts with proper wall-clock provenance.

Design reference: docs/design.md §10.


Per-balance streaming session

sartoriuslib.streaming.stream_session

Per-balance streaming session used by :meth:Balance.stream.


Recorder — record(), summary, overflow

sartoriuslib.streaming.recorder

Absolute-target recorder — record() emits timed :class:Sample batches.

:func:`record` is the v1 acquisition primitive. It drives a :class:`~sartoriuslib.manager.SartoriusManager` (or any :class:`PollSource`-shaped object, see below) at an absolute-target cadence and publishes the polled :class:`~sartoriuslib.devices.models.Reading` values into an :class:`anyio.abc.ObjectReceiveStream` as per-tick Mapping[name, Sample] batches.

Key invariants (design §10):

  • Absolute-target scheduling. Target times are computed from the :func:`anyio.current_time` value captured at record() entry, not from a running monotonic reading taken each cycle, so drift across cycles is bounded by one tick and never accumulates. anyio.sleep_until advances to the next target slot; overruns skip missed slots and increment samples_late.
  • Structured concurrency. The producer task lives inside a create_task_group() strictly nested inside the async CM body.
  • Wall-clock provenance. datetime.now(UTC) is captured at the send/receive boundaries of each balance's poll and attached to the emitted :class:Sample.
  • Backpressure. buffer_size sets the memory-object stream capacity; :class:OverflowPolicy controls what happens when the producer wants to enqueue but the consumer is behind.
  • Errors are samples too. Under ErrorPolicy.RETURN on the source, per-device failures arrive as :class:Sample with reading=None and error set — they still carry wall-clock timing so sinks can log the failure. Under ErrorPolicy.RAISE a failed device's poll aborts the batch at the source layer before the recorder sees it.

The recorder consumes a :class:PollSource — a narrow Protocol the :class:~sartoriuslib.manager.SartoriusManager already satisfies (its poll(names) signature matches). Kept as a Protocol so the recorder is unit-testable against a lightweight stub.

Design reference: docs/design.md §10.
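The absolute-target invariant can be illustrated with a small pure function that decides which slot to wait for next. This is a sketch of the idea only: the producer's actual loop is not shown in this page, and `target_time` and `next_slot` are hypothetical names. Clock values are passed in explicitly so the logic can be checked without sleeping.

```python
from __future__ import annotations

import math


def target_time(start: float, period: float, tick: int) -> float:
    """Absolute target of a tick; depends only on start, never on prior ticks."""
    return start + tick * period


def next_slot(now: float, start: float, period: float, last_tick: int) -> tuple[int, int]:
    """Return (next tick to wait for, number of slots skipped as late)."""
    candidate = last_tick + 1
    if target_time(start, period, candidate) >= now:
        return candidate, 0  # on schedule: wait for the very next slot
    # Overrun: jump to the first slot whose target has not yet passed.
    tick = math.ceil((now - start) / period)
    return tick, tick - candidate


# Schedule with a 0.5 s period that started at clock value 100.0.
on_time = next_slot(now=100.2, start=100.0, period=0.5, last_tick=0)
overrun = next_slot(now=101.7, start=100.0, period=0.5, last_tick=0)
```

In the overrun case the slots for ticks 1 to 3 have already passed, so the function resumes at tick 4 (target 102.0) and reports three skipped slots, which corresponds to the ticks the recorder would count in samples_late.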

