Skip to content

alicatlib.streaming

Sample type and record() — the absolute-target acquisition primitive. See Logging and acquisition for the user guide and Streaming for the port-level streaming runtime.

Public surface

alicatlib.streaming

Sample acquisition — record() emits typed Sample streams.

Public surface:

  • :class:Sample — one device poll with full timing provenance.
  • :func:record — absolute-cadence async context manager.
  • :class:Recording — wrapper carrying the stream + live summary + rate.
  • :class:OverflowPolicy — backpressure control knob.
  • :class:AcquisitionSummary — per-run counters (mutable, updated in place by the recorder).
  • :class:PollSource — Protocol the recorder accepts (satisfied by :class:~alicatlib.manager.AlicatManager).
  • :class:PollSourceAdapter — single-device adapter so a bare :class:~alicatlib.devices.base.Device can drive :func:record.

See docs/design.md §5.14.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    samples_emitted=0,
    samples_late=0,
    max_drift_ms=0.0,
)

Per-run summary owned by the recorder.

Mutability contract (per the cross-lib spec §M):

  • The recorder is the only writer. It updates counters in place during the run so progress polling (TUIs, dashboards) works without a separate API.
  • Consumers treat the summary as read-only.
  • :attr:finished_at is None while the recording is in flight and is set on context-manager exit.

Attributes:

Name Type Description
started_at datetime

Wall-clock at recorder entry.

finished_at datetime | None

Wall-clock at producer shutdown — None while the recording is in flight.

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream. Partial batches (some devices errored under ErrorPolicy.RETURN) still count as one emitted batch.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch).

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz indicate the device or consumer is saturating the schedule.

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples. The effective sample rate drops to the consumer's drain rate; samples_late accrues once the consumer catches up and the producer can check its schedule.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the sample that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch, then enqueue. Counted as late.

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

:class:~alicatlib.manager.AlicatManager satisfies this: its poll(names) returns a Mapping[str, DeviceResult[Reading]]. Using a Protocol keeps :func:record testable against a lightweight stub without pulling in the whole manager + transport stack.

poll async

poll(names=None)

Poll every named device (or all under management) concurrently.

Must return a mapping keyed by the manager-assigned device name. Successful polls carry the :class:Reading as .value; failed ones carry the :class:~alicatlib.errors.AlicatError as .error (per :class:~alicatlib.manager.ErrorPolicy.RETURN).

Source code in src/alicatlib/streaming/recorder.py
async def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Poll every named device (or all under management) concurrently.

    Must return a mapping keyed by the manager-assigned device name.
    Successful polls carry the :class:`Reading` as ``.value``;
    failed ones carry the :class:`~alicatlib.errors.AlicatError` as
    ``.error`` (per :class:`~alicatlib.manager.ErrorPolicy.RETURN`).
    """
    ...

PollSourceAdapter

PollSourceAdapter(name, device)

Wrap one :class:Device as a :class:PollSource for :func:record.

Capa's old _SingleDevicePollSource shim reinvented this; the adapter lives here so the wiring is one line at the call site::

adapter = PollSourceAdapter("fuel", device)
async with record(adapter, rate_hz=10) as recording:
    ...

The names filter is honoured per the cross-lib spec §E: when the caller passes a name set that does not include this device's name, poll() returns an empty mapping rather than polling anyway. The recorder always passes a complete name set in single-device mode so filtering is harmless; the empty-mapping behaviour is the correct cross-lib semantic.

Source code in src/alicatlib/streaming/__init__.py
def __init__(self, name: str, device: Device) -> None:
    self._name = name
    self._device = device

device property

device

The wrapped async :class:Device.

name property

name

The manager-style name this adapter publishes the device under.

poll async

poll(names=None)

Poll the wrapped device and return a single-entry mapping.

Source code in src/alicatlib/streaming/__init__.py
async def poll(
    self,
    names: Iterable[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Poll the wrapped device and return a single-entry mapping."""
    if names is not None and self._name not in set(names):
        return {}
    try:
        reading = await self._device.poll()
    except AlicatError as err:
        failure: DeviceResult[Reading] = DeviceResult(value=None, error=err)
        return {self._name: failure}
    return {self._name: DeviceResult.success(reading)}

Recording dataclass

Recording(stream, summary, rate_hz)

The object yielded by :func:record's async context manager.

Wraps the live receive stream, the (mutable) :class:AcquisitionSummary the recorder is updating in place, and the rate the recorder is running at. Consumers iterate via async for batch in recording (the instance delegates to :attr:stream), observe progress via :attr:summary, and read :attr:rate_hz for queue-sizing decisions.

Attributes:

Name Type Description
stream AsyncIterator[T]

Async iterator of per-tick batches (or whatever record type the lib emits). Typed via T so consumer code can stay strict.

summary AcquisitionSummary

Live :class:AcquisitionSummary — counters update in place during the run; consumers must not mutate it.

rate_hz float

The rate the recorder is running at, captured at entry. Useful for back-pressure sizing in wrappers.

__aiter__

__aiter__()

Delegate iteration to :attr:stream.

Lets async for batch in recording work without forcing callers to dereference recording.stream themselves — ergonomic for the common case, while keeping the typed attribute around for consumers that want to interleave reads with reading :attr:summary or :attr:rate_hz.

Source code in src/alicatlib/streaming/recorder.py
def __aiter__(self) -> AsyncIterator[T]:
    """Delegate iteration to :attr:`stream`.

    Lets ``async for batch in recording`` work without forcing
    callers to dereference ``recording.stream`` themselves —
    ergonomic for the common case, while keeping the typed
    attribute around for consumers that want to interleave reads
    with reading :attr:`summary` or :attr:`rate_hz`.
    """
    return self.stream.__aiter__()

Sample dataclass

Sample(
    device,
    unit_id,
    t_mono_ns,
    t_utc,
    requested_at,
    received_at,
    latency_s,
    reading,
    t_midpoint_mono_ns=None,
)

One device poll with full timing provenance.

Attributes:

Name Type Description
device str

The manager-assigned name (from AlicatManager.add). Stable downstream identifier that follows the value into sinks.

unit_id str

Bus-level single-letter unit id of the polled device. Kept separate from device so a user renaming the manager key doesn't lose the physical addressing context.

t_mono_ns int

:func:time.monotonic_ns at the acquisition midpoint. Canonical join key per the cross-lib §C contract; never displayed, since the absolute value has no calendar meaning.

t_utc datetime

Wall-clock datetime (UTC, tz-aware) for the acquisition midpoint — (requested_at + received_at) / 2. Use this when aligning Alicat samples against other sensor streams.

t_midpoint_mono_ns int | None

Optional monotonic-ns midpoint of an integration window. None for single polled samples (the common case); populated only when a sample summarises a multi-sample window.

requested_at datetime

Wall-clock datetime (UTC) captured just before the poll bytes leave the host. I/O-boundary provenance — keep alongside t_utc so callers can see the dispatch instant separately from the acquisition midpoint.

received_at datetime

Wall-clock datetime (UTC) captured just after the reply line is read. I/O-boundary provenance.

latency_s float

(received_at - requested_at).total_seconds() — precomputed for convenience; equivalent to received_at - requested_at but avoids the subtraction at every downstream call site.

reading Reading

The :class:Reading returned by the device's poll.

record async

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Record polled samples into a receive stream at an absolute cadence.

Usage::

async with record(mgr, rate_hz=10, duration=60) as recording:
    async for batch in recording.stream:
        process(batch)
    print(recording.summary.samples_emitted)

The CM yields a :class:Recording carrying the async iterator, the live :class:AcquisitionSummary, and the rate. Each batch on the stream is a Mapping[name, Sample] — one entry per device that polled successfully on that tick. Devices whose :class:DeviceResult carries an error are omitted from that batch and logged at WARN.

Parameters:

Name Type Description Default
source PollSource

Any :class:PollSource (typically an :class:~alicatlib.manager.AlicatManager).

required
rate_hz float

Target cadence. Absolute targets are computed target[n] = start + n * (1 / rate_hz). Must be > 0.

required
duration float | None

Total acquisition duration in seconds. None means "until the caller exits the CM".

None
names Sequence[str] | None

Subset of device names to poll per tick. None polls everything the source manages.

None
overflow OverflowPolicy

Backpressure policy when the receive-stream buffer is full. See :class:OverflowPolicy.

BLOCK
buffer_size int

Receive-stream capacity, in per-tick batches. 64 mirrors the design default.

64

Yields:

Name Type Description
A AsyncGenerator[Recording[Mapping[str, Sample]]]

class:Recording of per-tick Mapping[device_name, Sample].

Raises:

Type Description
ValueError

If rate_hz <= 0 or duration <= 0 or buffer_size < 1.

Source code in src/alicatlib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> AsyncGenerator[Recording[Mapping[str, Sample]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(mgr, rate_hz=10, duration=60) as recording:
            async for batch in recording.stream:
                process(batch)
            print(recording.summary.samples_emitted)

    The CM yields a :class:`Recording` carrying the async iterator, the
    live :class:`AcquisitionSummary`, and the rate. Each batch on the
    stream is a ``Mapping[name, Sample]`` — one entry per device that
    polled successfully on that tick. Devices whose :class:`DeviceResult`
    carries an error are omitted from that batch and logged at WARN.

    Args:
        source: Any :class:`PollSource` (typically an
            :class:`~alicatlib.manager.AlicatManager`).
        rate_hz: Target cadence. Absolute targets are computed
            ``target[n] = start + n * (1 / rate_hz)``. Must be > 0.
        duration: Total acquisition duration in seconds. ``None``
            means "until the caller exits the CM".
        names: Subset of device names to poll per tick. ``None`` polls
            everything the source manages.
        overflow: Backpressure policy when the receive-stream buffer
            is full. See :class:`OverflowPolicy`.
        buffer_size: Receive-stream capacity, in per-tick batches.
            ``64`` mirrors the design default.

    Yields:
        A :class:`Recording` of per-tick ``Mapping[device_name, Sample]``.

    Raises:
        ValueError: If ``rate_hz <= 0`` or ``duration <= 0`` or
            ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    if overflow is OverflowPolicy.DROP_OLDEST:
        # Fail at call site (not deep inside the producer task) so the
        # exception type doesn't come back wrapped in an ExceptionGroup.
        raise NotImplementedError(
            "OverflowPolicy.DROP_OLDEST is not yet implemented; use BLOCK "
            "or DROP_NEWEST for now (design §5.14).",
        )

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Mapping[str, Sample]](
        max_buffer_size=buffer_size,
    )
    started_at = datetime.now(UTC)
    summary = AcquisitionSummary(started_at=started_at)
    recording: Recording[Mapping[str, Sample]] = Recording(
        stream=receive_stream,
        summary=summary,
        rate_hz=rate_hz,
    )

    _logger.info(
        "recorder.start",
        extra={
            "rate_hz": rate_hz,
            "duration_s": duration,
            "overflow": overflow.value,
            "buffer_size": buffer_size,
            "names": list(names) if names is not None else None,
        },
    )

    async with anyio.create_task_group() as tg, receive_stream:
        tg.start_soon(
            _run_producer,
            source,
            send_stream,
            period,
            total_ticks,
            names,
            overflow,
            summary,
        )
        try:
            yield recording
        finally:
            # Cancel + drain before the CM returns — producer lifetime
            # is strictly nested inside the ``async with`` per §5.14.
            tg.cancel_scope.cancel()

    summary.finished_at = datetime.now(UTC)
    _logger.info(
        "recorder.stop",
        extra={
            "samples_emitted": summary.samples_emitted,
            "samples_late": summary.samples_late,
            "max_drift_ms": summary.max_drift_ms,
            "duration_s": (summary.finished_at - started_at).total_seconds(),
        },
    )

Sample model

alicatlib.streaming.sample

Timed sample — one device reading with send/receive provenance.

A :class:Sample is what the recorder emits into its memory-object stream. It pairs a :class:Reading (the measurement) with enough timing to reconstruct the acquisition timeline after the fact: t_mono_ns and t_utc are the canonical join keys per the cross-lib contract (§C), and requested_at / received_at / latency_s are the I/O-boundary provenance fields.

t_utc is the best point-estimate of the acquisition instant on the device: halfway between when the poll byte left the host and when the full reply arrived. That's what downstream plots and correlations should use when aligning Alicat data against other sensor streams.

Design reference: docs/design.md §5.14.

Sample dataclass

Sample(
    device,
    unit_id,
    t_mono_ns,
    t_utc,
    requested_at,
    received_at,
    latency_s,
    reading,
    t_midpoint_mono_ns=None,
)

One device poll with full timing provenance.

Attributes:

Name Type Description
device str

The manager-assigned name (from AlicatManager.add). Stable downstream identifier that follows the value into sinks.

unit_id str

Bus-level single-letter unit id of the polled device. Kept separate from device so a user renaming the manager key doesn't lose the physical addressing context.

t_mono_ns int

:func:time.monotonic_ns at the acquisition midpoint. Canonical join key per the cross-lib §C contract; never displayed, since the absolute value has no calendar meaning.

t_utc datetime

Wall-clock datetime (UTC, tz-aware) for the acquisition midpoint — (requested_at + received_at) / 2. Use this when aligning Alicat samples against other sensor streams.

t_midpoint_mono_ns int | None

Optional monotonic-ns midpoint of an integration window. None for single polled samples (the common case); populated only when a sample summarises a multi-sample window.

requested_at datetime

Wall-clock datetime (UTC) captured just before the poll bytes leave the host. I/O-boundary provenance — keep alongside t_utc so callers can see the dispatch instant separately from the acquisition midpoint.

received_at datetime

Wall-clock datetime (UTC) captured just after the reply line is read. I/O-boundary provenance.

latency_s float

(received_at - requested_at).total_seconds() — precomputed for convenience; equivalent to received_at - requested_at but avoids the subtraction at every downstream call site.

reading Reading

The :class:Reading returned by the device's poll.

Recorder

alicatlib.streaming.recorder

Absolute-target recorder — record() emits timed :class:Sample batches.

:func:record is the v1 acquisition primitive. It drives a :class:~alicatlib.manager.AlicatManager (or any :class:PollSource-shaped object — see below) at an absolute-target cadence and publishes the polled :class:Reading values into an :class:anyio.abc.ObjectReceiveStream as per-tick Mapping[name, Sample] batches.

Key invariants (design §5.14):

  • Absolute-target scheduling. Target times are computed from :func:anyio.current_time at record()-entry, not from a running monotonic; drift across cycles is bounded by one tick and never accumulates. anyio.sleep_until advances to the next target slot; overruns skip missed slots and increment samples_late.
  • Structured concurrency. The producer task lives inside a create_task_group() strictly nested inside the async CM body. The CM yields the receive stream, user code iterates it, and on CM exit the task group is cancelled and joined before the CM returns. This matches AnyIO's own warning against yielding from inside a task group.
  • Wall-clock provenance. datetime.now(UTC) is captured at the send/receive boundaries of each device's poll and attached to the emitted :class:Sample — used for sink timestamps, never for scheduling.
  • Backpressure. buffer_size sets the memory-object stream capacity; :class:OverflowPolicy controls what happens when the producer wants to enqueue but the consumer is behind.

The recorder consumes a :class:PollSource — a narrow Protocol the :class:~alicatlib.manager.AlicatManager already satisfies (its poll(names) signature matches). Kept as a Protocol so the recorder is unit-testable against a lightweight stub without standing up a full manager + transport pipeline.

Design reference: docs/design.md §5.14.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    samples_emitted=0,
    samples_late=0,
    max_drift_ms=0.0,
)

Per-run summary owned by the recorder.

Mutability contract (per the cross-lib spec §M):

  • The recorder is the only writer. It updates counters in place during the run so progress polling (TUIs, dashboards) works without a separate API.
  • Consumers treat the summary as read-only.
  • :attr:finished_at is None while the recording is in flight and is set on context-manager exit.

Attributes:

Name Type Description
started_at datetime

Wall-clock at recorder entry.

finished_at datetime | None

Wall-clock at producer shutdown — None while the recording is in flight.

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream. Partial batches (some devices errored under ErrorPolicy.RETURN) still count as one emitted batch.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch).

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz indicate the device or consumer is saturating the schedule.

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples. The effective sample rate drops to the consumer's drain rate; samples_late accrues once the consumer catches up and the producer can check its schedule.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the sample that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch, then enqueue. Counted as late.

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

:class:~alicatlib.manager.AlicatManager satisfies this: its poll(names) returns a Mapping[str, DeviceResult[Reading]]. Using a Protocol keeps :func:record testable against a lightweight stub without pulling in the whole manager + transport stack.

poll async

poll(names=None)

Poll every named device (or all under management) concurrently.

Must return a mapping keyed by the manager-assigned device name. Successful polls carry the :class:Reading as .value; failed ones carry the :class:~alicatlib.errors.AlicatError as .error (per :class:~alicatlib.manager.ErrorPolicy.RETURN).

Source code in src/alicatlib/streaming/recorder.py
async def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[Reading]]:
    """Poll every named device (or all under management) concurrently.

    Must return a mapping keyed by the manager-assigned device name.
    Successful polls carry the :class:`Reading` as ``.value``;
    failed ones carry the :class:`~alicatlib.errors.AlicatError` as
    ``.error`` (per :class:`~alicatlib.manager.ErrorPolicy.RETURN`).
    """
    ...

Recording dataclass

Recording(stream, summary, rate_hz)

The object yielded by :func:record's async context manager.

Wraps the live receive stream, the (mutable) :class:AcquisitionSummary the recorder is updating in place, and the rate the recorder is running at. Consumers iterate via async for batch in recording (the instance delegates to :attr:stream), observe progress via :attr:summary, and read :attr:rate_hz for queue-sizing decisions.

Attributes:

Name Type Description
stream AsyncIterator[T]

Async iterator of per-tick batches (or whatever record type the lib emits). Typed via T so consumer code can stay strict.

summary AcquisitionSummary

Live :class:AcquisitionSummary — counters update in place during the run; consumers must not mutate it.

rate_hz float

The rate the recorder is running at, captured at entry. Useful for back-pressure sizing in wrappers.

__aiter__

__aiter__()

Delegate iteration to :attr:stream.

Lets async for batch in recording work without forcing callers to dereference recording.stream themselves — ergonomic for the common case, while keeping the typed attribute around for consumers that want to interleave reads with reading :attr:summary or :attr:rate_hz.

Source code in src/alicatlib/streaming/recorder.py
def __aiter__(self) -> AsyncIterator[T]:
    """Delegate iteration to :attr:`stream`.

    Lets ``async for batch in recording`` work without forcing
    callers to dereference ``recording.stream`` themselves —
    ergonomic for the common case, while keeping the typed
    attribute around for consumers that want to interleave reads
    with reading :attr:`summary` or :attr:`rate_hz`.
    """
    return self.stream.__aiter__()

record async

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Record polled samples into a receive stream at an absolute cadence.

Usage::

async with record(mgr, rate_hz=10, duration=60) as recording:
    async for batch in recording.stream:
        process(batch)
    print(recording.summary.samples_emitted)

The CM yields a :class:Recording carrying the async iterator, the live :class:AcquisitionSummary, and the rate. Each batch on the stream is a Mapping[name, Sample] — one entry per device that polled successfully on that tick. Devices whose :class:DeviceResult carries an error are omitted from that batch and logged at WARN.

Parameters:

Name Type Description Default
source PollSource

Any :class:PollSource (typically an :class:~alicatlib.manager.AlicatManager).

required
rate_hz float

Target cadence. Absolute targets are computed target[n] = start + n * (1 / rate_hz). Must be > 0.

required
duration float | None

Total acquisition duration in seconds. None means "until the caller exits the CM".

None
names Sequence[str] | None

Subset of device names to poll per tick. None polls everything the source manages.

None
overflow OverflowPolicy

Backpressure policy when the receive-stream buffer is full. See :class:OverflowPolicy.

BLOCK
buffer_size int

Receive-stream capacity, in per-tick batches. 64 mirrors the design default.

64

Yields:

Name Type Description
A AsyncGenerator[Recording[Mapping[str, Sample]]]

class:Recording of per-tick Mapping[device_name, Sample].

Raises:

Type Description
ValueError

If rate_hz <= 0 or duration <= 0 or buffer_size < 1.

Source code in src/alicatlib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> AsyncGenerator[Recording[Mapping[str, Sample]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(mgr, rate_hz=10, duration=60) as recording:
            async for batch in recording.stream:
                process(batch)
            print(recording.summary.samples_emitted)

    The CM yields a :class:`Recording` carrying the async iterator, the
    live :class:`AcquisitionSummary`, and the rate. Each batch on the
    stream is a ``Mapping[name, Sample]`` — one entry per device that
    polled successfully on that tick. Devices whose :class:`DeviceResult`
    carries an error are omitted from that batch and logged at WARN.

    Args:
        source: Any :class:`PollSource` (typically an
            :class:`~alicatlib.manager.AlicatManager`).
        rate_hz: Target cadence. Absolute targets are computed
            ``target[n] = start + n * (1 / rate_hz)``. Must be > 0.
        duration: Total acquisition duration in seconds. ``None``
            means "until the caller exits the CM".
        names: Subset of device names to poll per tick. ``None`` polls
            everything the source manages.
        overflow: Backpressure policy when the receive-stream buffer
            is full. See :class:`OverflowPolicy`.
        buffer_size: Receive-stream capacity, in per-tick batches.
            ``64`` mirrors the design default.

    Yields:
        A :class:`Recording` of per-tick ``Mapping[device_name, Sample]``.

    Raises:
        ValueError: If ``rate_hz <= 0`` or ``duration <= 0`` or
            ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    if overflow is OverflowPolicy.DROP_OLDEST:
        # Fail at call site (not deep inside the producer task) so the
        # exception type doesn't come back wrapped in an ExceptionGroup.
        raise NotImplementedError(
            "OverflowPolicy.DROP_OLDEST is not yet implemented; use BLOCK "
            "or DROP_NEWEST for now (design §5.14).",
        )

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Mapping[str, Sample]](
        max_buffer_size=buffer_size,
    )
    started_at = datetime.now(UTC)
    summary = AcquisitionSummary(started_at=started_at)
    recording: Recording[Mapping[str, Sample]] = Recording(
        stream=receive_stream,
        summary=summary,
        rate_hz=rate_hz,
    )

    _logger.info(
        "recorder.start",
        extra={
            "rate_hz": rate_hz,
            "duration_s": duration,
            "overflow": overflow.value,
            "buffer_size": buffer_size,
            "names": list(names) if names is not None else None,
        },
    )

    async with anyio.create_task_group() as tg, receive_stream:
        tg.start_soon(
            _run_producer,
            source,
            send_stream,
            period,
            total_ticks,
            names,
            overflow,
            summary,
        )
        try:
            yield recording
        finally:
            # Cancel + drain before the CM returns — producer lifetime
            # is strictly nested inside the ``async with`` per §5.14.
            tg.cancel_scope.cancel()

    summary.finished_at = datetime.now(UTC)
    _logger.info(
        "recorder.stop",
        extra={
            "samples_emitted": summary.samples_emitted,
            "samples_late": summary.samples_late,
            "max_drift_ms": summary.max_drift_ms,
            "duration_s": (summary.finished_at - started_at).total_seconds(),
        },
    )