watlowlib.streaming

Sample, record(), OverflowPolicy, AcquisitionSummary, and the PollSource Protocol that the recorder drives. See also: Streaming; Logging and acquisition.

Public surface

watlowlib.streaming

Streaming primitives — :func:record + :class:Sample.

The streaming layer drives a :class:PollSource (a :class:~watlowlib.devices.controller.Controller or :class:~watlowlib.manager.WatlowManager) at an absolute-target cadence and publishes :class:Sample batches into an async receive stream. Pair with :func:watlowlib.sinks.pipe to drain into a :class:~watlowlib.sinks.SampleSink.

Design reference: docs/design.md §6.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at,
    samples_emitted,
    samples_late,
    max_drift_ms,
    disconnects=0,
)

Per-run summary emitted after record()'s CM exits.

Attributes:

Name Type Description
started_at datetime

Wall-clock at the first scheduled tick.

finished_at datetime

Wall-clock at producer shutdown.

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream. A tick that produced zero samples (every device errored) still counts as one emitted batch.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). Auto-reconnect ticks also count as late.

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz indicate the device or consumer is saturating the schedule.

disconnects int

Count of WatlowConnectionError events the producer absorbed under auto_reconnect=True. Always 0 when auto_reconnect was off.
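
As a rough illustration of reading max_drift_ms against the schedule, the sketch below expresses the worst drift as a fraction of one period. The helper names period_ms and drift_fraction are hypothetical and not part of watlowlib; they only restate the 1000 / rate_hz arithmetic described above.

```python
def period_ms(rate_hz: float) -> float:
    """Length of one tick period in milliseconds."""
    if rate_hz <= 0:
        raise ValueError("rate_hz must be > 0")
    return 1000.0 / rate_hz


def drift_fraction(max_drift_ms: float, rate_hz: float) -> float:
    """Worst observed drift as a fraction of one period (hypothetical helper)."""
    return max_drift_ms / period_ms(rate_hz)


# At 2 Hz one period is 500 ms, so 25 ms of drift is 5% of a period.
print(drift_fraction(25.0, 2.0))
```

A fraction close to 1.0 corresponds to the "approaching 1000 / rate_hz" case; where to draw an alarm threshold is left to the caller.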

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the batch that was about to be enqueued. Counted as late.
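
The difference between the two policies can be sketched with a bounded queue. This example uses the standard-library asyncio.Queue purely as a stand-in for the recorder's receive stream, and the enqueue helper is hypothetical; it illustrates the behaviour described above, not the recorder's actual code.

```python
import asyncio


async def enqueue(queue: asyncio.Queue, batch: list, policy: str) -> bool:
    """Enqueue one batch; return False if it was dropped.

    ``policy`` mirrors the enum values 'block' and 'drop_newest'.
    """
    if policy == "block":
        await queue.put(batch)   # waits until the consumer frees a slot
        return True
    try:
        queue.put_nowait(batch)  # never waits
        return True
    except asyncio.QueueFull:
        return False             # the batch about to be enqueued is discarded


async def demo() -> tuple[list[bool], int]:
    # maxsize=2 stands in for buffer_size=2; nothing consumes in this demo.
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    results = [await enqueue(queue, [n], "drop_newest") for n in range(4)]
    return results, queue.qsize()


results, size = asyncio.run(demo())
print(results, size)
```

With no consumer draining the queue, the first two batches fit and the next two are dropped. Under the 'block' policy the third call would instead wait for a free slot.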

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

Both :class:~watlowlib.devices.controller.Controller (solo) and :class:~watlowlib.manager.WatlowManager (multi-device) satisfy this Protocol. Using a Protocol keeps :func:record testable against a lightweight stub without standing up a full controller + transport pipeline.

The contract is intentionally narrow: per call, return a flat :class:~collections.abc.Sequence of :class:Sample objects, one per (device, parameter, instance) read that succeeded. Failed reads are dropped from the batch and logged by the source; the recorder never sees them.

poll async

poll(parameters, *, names=None, instances=(1,))

Read every parameters × instances combination on every device.

Parameters:

Name Type Description Default
parameters Sequence[str | int]

Parameter names or registry IDs.

required
names Sequence[str] | None

Subset of device names to poll (Manager-only; Controller ignores). None polls everything the source manages.

None
instances Sequence[int]

1-indexed loop / channel numbers per device. Single-loop devices use (1,) (the default).

(1,)

Returns:

Type Description
Sequence[Sample]

A flat :class:Sequence of :class:Sample. Empty when every poll failed.

Source code in src/watlowlib/streaming/recorder.py
async def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> Sequence[Sample]:
    """Read every ``parameters`` × ``instances`` combination on every device.

    Args:
        parameters: Parameter names or registry IDs.
        names: Subset of device names to poll (Manager-only;
            Controller ignores). ``None`` polls everything the
            source manages.
        instances: 1-indexed loop / channel numbers per device.
            Single-loop devices use ``(1,)`` (the default).

    Returns:
        A flat :class:`Sequence` of :class:`Sample`. Empty when
        every poll failed.
    """
    ...

Sample dataclass

Sample(
    device,
    address,
    protocol,
    parameter,
    parameter_id,
    instance,
    value,
    unit,
    monotonic_ns,
    requested_at,
    received_at,
    midpoint_at,
    latency_s,
    raw,
)

One parameter read with full timing provenance.

Attributes:

Name Type Description
device str

Manager-assigned name (or controller label for solo recordings). Stable downstream identifier that follows the value into sinks.

address int

Bus address of the polled device. Std Bus 1..16, Modbus RTU 1..247.

protocol ProtocolKind

Wire protocol that decoded this read. Set from the session's protocol kind, not the reading metadata, so a mixed-protocol recording records the source per row.

parameter str

Canonical parameter name (e.g. "process_value").

parameter_id int

Registry parameter id (e.g. 4001).

instance int

1-indexed loop / channel selector used for the read.

value float | int | str | bool | None

The decoded scalar. None when the device reported the value as unavailable (sensor-fail, overload, ...).

unit str | None

Display string for the value's unit, or None if the registry carries no unit metadata for the parameter. v1 leaves this None for every PM parameter because the registry does not yet carry per-row units.

monotonic_ns int

:func:time.monotonic_ns at the read site, roughly the midpoint of send/receive. Used for scheduling / drift analysis only — never displayed.

requested_at datetime

Wall-clock datetime (UTC) captured just before the read leaves the host.

received_at datetime

Wall-clock datetime (UTC) captured just after the reply is decoded.

midpoint_at datetime

requested_at + (received_at - requested_at) / 2, the midpoint of the request/reply window and the preferred point estimate of the sample instant. Use this when aligning Watlow samples against other sensor streams.

latency_s float

(received_at - requested_at).total_seconds() — precomputed for convenience.

raw bytes

The wire payload that produced the value. Available for diagnostics; tabular sinks drop it.
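
The relationship between the timing fields can be sketched with plain datetime arithmetic. The helper timing_fields is hypothetical and the timestamps are made up for illustration; the sketch only shows how midpoint_at and latency_s follow from requested_at and received_at.

```python
from datetime import datetime, timedelta, timezone


def timing_fields(requested_at: datetime, received_at: datetime) -> tuple[datetime, float]:
    """Return (midpoint_at, latency_s) for one read."""
    elapsed = received_at - requested_at
    # Two datetimes cannot be added, so offset the start by half the gap.
    midpoint_at = requested_at + elapsed / 2
    return midpoint_at, elapsed.total_seconds()


requested = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
received = requested + timedelta(milliseconds=40)
midpoint, latency = timing_fields(requested, received)
print(midpoint.isoformat(), latency)
```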

record async

record(
    source,
    *,
    parameters,
    rate_hz,
    duration=None,
    names=None,
    instances=(1,),
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    auto_reconnect=False,
    reconnect_factory=None,
)

Record polled samples into a receive stream at an absolute cadence.

Usage::

async with record(
    controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
) as stream:
    async for batch in stream:
        for sample in batch:
            print(sample.parameter, sample.value)

The CM yields an async iterator of per-tick :class:Sample batches. Each batch is a flat :class:Sequence — one entry per (device, parameter, instance) read that succeeded. Failed reads are dropped by the source and logged at WARN.

Parameters:

Name Type Description Default
source PollSource

Any :class:PollSource (a :class:Controller or a :class:WatlowManager).

required
parameters Sequence[str | int]

Parameter names or registry IDs to poll each tick.

required
rate_hz float

Target cadence. Absolute targets are computed target[n] = start + n * (1 / rate_hz). Must be > 0.

required
duration float | None

Total acquisition duration in seconds. None means "until the caller exits the CM".

None
names Sequence[str] | None

Subset of device names to poll per tick. None polls everything the source manages. Ignored for solo controllers.

None
instances Sequence[int]

1-indexed loop / channel numbers per device. Single-loop devices use (1,).

(1,)
overflow OverflowPolicy

Backpressure policy when the receive-stream buffer is full. See :class:OverflowPolicy.

BLOCK
buffer_size int

Receive-stream capacity, in per-tick batches.

64
auto_reconnect bool

When True, treat :class:WatlowConnectionError raised by source.poll as a transient transport drop rather than a fatal error. The producer logs recorder.disconnected, waits per the backoff schedule, and either rebuilds the source via reconnect_factory (if supplied) or simply retries the same source.poll on the next tick. samples_late ticks up for each tick missed during the gap.

False
reconnect_factory Callable[[], Awaitable[PollSource]] | None

When supplied alongside auto_reconnect, invoked to rebuild the :class:PollSource after a disconnect. Useful when the source's transport needs to be re-opened explicitly (e.g. a fresh :func:watlowlib.open_device call). The returned source replaces source for subsequent ticks. Without a factory, the recorder relies on source.poll itself to recover (which works for callers that wrap their own transport-reopen logic inside poll).

None

Yields:

Type Description
AsyncGenerator[AsyncIterator[Sequence[Sample]]]

An async iterator of per-tick :class:Sample batches.

Raises:

Type Description
ValueError

rate_hz <= 0, duration <= 0, buffer_size < 1, or parameters is empty.

Source code in src/watlowlib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    auto_reconnect: bool = False,
    reconnect_factory: Callable[[], Awaitable[PollSource]] | None = None,
) -> AsyncGenerator[AsyncIterator[Sequence[Sample]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(
            controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
        ) as stream:
            async for batch in stream:
                for sample in batch:
                    print(sample.parameter, sample.value)

    The CM yields an async iterator of per-tick :class:`Sample` batches.
    Each batch is a flat :class:`Sequence` — one entry per (device,
    parameter, instance) read that succeeded. Failed reads are dropped
    by the source and logged at WARN.

    Args:
        source: Any :class:`PollSource` (a :class:`Controller` or a
            :class:`WatlowManager`).
        parameters: Parameter names or registry IDs to poll each tick.
        rate_hz: Target cadence. Absolute targets are computed
            ``target[n] = start + n * (1 / rate_hz)``. Must be > 0.
        duration: Total acquisition duration in seconds. ``None``
            means "until the caller exits the CM".
        names: Subset of device names to poll per tick. ``None`` polls
            everything the source manages. Ignored for solo controllers.
        instances: 1-indexed loop / channel numbers per device.
            Single-loop devices use ``(1,)``.
        overflow: Backpressure policy when the receive-stream buffer
            is full. See :class:`OverflowPolicy`.
        buffer_size: Receive-stream capacity, in per-tick batches.
        auto_reconnect: When ``True``, treat
            :class:`WatlowConnectionError` raised by ``source.poll``
            as a transient transport drop rather than a fatal error.
            The producer logs ``recorder.disconnected``, waits per the
            backoff schedule, and either rebuilds the source via
            ``reconnect_factory`` (if supplied) or simply retries the
            same ``source.poll`` on the next tick. ``samples_late``
            ticks up for each tick missed during the gap.
        reconnect_factory: When supplied alongside ``auto_reconnect``,
            invoked to rebuild the :class:`PollSource` after a
            disconnect. Useful when the source's transport needs to be
            re-opened explicitly (e.g. a fresh
            :func:`watlowlib.open_device` call). The returned source
            replaces ``source`` for subsequent ticks. Without a
            factory, the recorder relies on ``source.poll`` itself to
            recover (which works for callers that wrap their own
            transport-reopen logic inside ``poll``).

    Yields:
        An async iterator of per-tick :class:`Sample` batches.

    Raises:
        ValueError: ``rate_hz <= 0``, ``duration <= 0``,
            ``buffer_size < 1``, or ``parameters`` is empty.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    if not parameters:
        raise ValueError("parameters must be a non-empty sequence")

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Sequence[Sample]](
        max_buffer_size=buffer_size,
    )
    stats = _RunStats()

    started_at = datetime.now(UTC)
    _logger.info(
        "recorder.start rate_hz=%s duration_s=%s overflow=%s buffer_size=%s names=%s",
        rate_hz,
        duration,
        overflow.value,
        buffer_size,
        list(names) if names is not None else None,
    )

    async with anyio.create_task_group() as tg, receive_stream:

        async def _producer_entrypoint() -> None:
            await _run_producer(
                source,
                send_stream,
                tuple(parameters),
                tuple(instances),
                names,
                period,
                total_ticks,
                overflow,
                stats,
                auto_reconnect=auto_reconnect,
                reconnect_factory=reconnect_factory,
            )

        tg.start_soon(_producer_entrypoint)
        try:
            yield receive_stream
        finally:
            # Cancel + drain before the CM returns — producer lifetime
            # is strictly nested inside the ``async with``.
            tg.cancel_scope.cancel()

    finished_at = datetime.now(UTC)
    summary = AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=stats.emitted,
        samples_late=stats.late,
        max_drift_ms=stats.max_drift_ms,
        disconnects=stats.disconnects,
    )
    _logger.info(
        "recorder.stop emitted=%s late=%s max_drift_ms=%.3f duration_s=%.3f",
        summary.samples_emitted,
        summary.samples_late,
        summary.max_drift_ms,
        (finished_at - started_at).total_seconds(),
    )

Sample

watlowlib.streaming.sample

Timed sample — one parameter read with send/receive provenance.

A :class:Sample is the unit the recorder emits into its memory-object stream. Watlow polls a small group of parameters per device per tick (unlike Alicat, which returns one wide DataFrame per poll), so a recorder tick produces N×M samples — one per (device, parameter) pair that succeeded — each one carrying:

  • midpoint_at — best point-estimate of the on-device acquisition instant (halfway between request and reply). Use this for aligning Watlow values against other sensor streams.
  • monotonic_ns — :func:time.monotonic_ns at the read boundary, for drift analysis only (no calendar meaning).
  • raw — the wire payload that produced the value. Available for diagnostics; tabular sinks drop it.

The shape is deliberately long-format (one row per parameter) so the SQLite cross-vendor test can union Watlow rows with Alicat rows under one schema.
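
A minimal sketch of what long-format storage looks like, using the standard-library sqlite3 module. The table name, the column subset, and the values are illustrative assumptions; the schema used by the actual sinks is not shown on this page.

```python
import sqlite3

# Hypothetical column subset with made-up values, one row per parameter read.
rows = [
    ("oven", "process_value", 1, 151.2, "2024-01-01T12:00:00.020+00:00"),
    ("oven", "setpoint", 1, 150.0, "2024-01-01T12:00:00.060+00:00"),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE samples ("
    "device TEXT, parameter TEXT, instance INTEGER, value REAL, midpoint_at TEXT)"
)
conn.executemany("INSERT INTO samples VALUES (?, ?, ?, ?, ?)", rows)

# Long format: polling an extra parameter adds rows, never columns.
latest = dict(
    conn.execute("SELECT parameter, value FROM samples WHERE device = ?", ("oven",))
)
print(latest)
conn.close()
```

Because every row has the same columns regardless of which parameter it holds, rows from another vendor's devices can share the table without a schema change.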

Design reference: docs/design.md §6.

Recorder

watlowlib.streaming.recorder

Absolute-target recorder — record() emits timed :class:Sample batches.

:func:record is the v1 acquisition primitive. It drives a :class:PollSource (an opened :class:~watlowlib.devices.controller.Controller or a :class:~watlowlib.manager.WatlowManager) at an absolute-target cadence and publishes the polled :class:Sample values into an :class:anyio.abc.ObjectReceiveStream as per-tick batches.

Key invariants:

  • Absolute-target scheduling. Target times are computed from the :func:anyio.current_time reading taken at record() entry (target[n] = start + n * period), not from a running monotonic offset carried over from the previous tick, so drift across cycles is bounded by one tick and never accumulates. anyio.sleep_until advances to the next target slot; overruns skip missed slots and increment samples_late.
  • Structured concurrency. The producer task lives strictly inside the async CM body. The CM yields the receive stream, user code iterates it, and on CM exit the task group is cancelled and joined before the CM returns.
  • Wall-clock provenance. datetime.now(UTC) is captured at the send/receive boundaries of each device's poll and attached to the emitted :class:Sample — used for sink timestamps, never for scheduling.
  • Backpressure. buffer_size sets the memory-object stream capacity; :class:OverflowPolicy controls what happens when the producer wants to enqueue but the consumer is behind.
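
The absolute-target invariant above can be sketched as a pure function that picks the next slot from the fixed start time. The name next_slot is hypothetical, and the exact skipping rule (for example, how a target exactly equal to the current time is treated) is an assumption rather than the recorder's documented behaviour.

```python
import math


def next_slot(start: float, period: float, now: float, last_index: int) -> tuple[int, float, int]:
    """Pick the next tick after ``last_index`` whose target is not already past.

    Returns (index, target_time, skipped_slots).
    """
    candidate = last_index + 1
    # Earliest index whose target start + n * period is >= now.
    earliest_future = math.ceil((now - start) / period)
    index = max(candidate, earliest_future)
    return index, start + index * period, index - candidate


# On time: tick 0 finished at t=0.1 s with a 0.5 s period.
print(next_slot(0.0, 0.5, 0.1, 0))
# Overrun: tick 1 took until t=1.7 s, so slots 2 and 3 are skipped.
print(next_slot(0.0, 0.5, 1.7, 1))
```

Because every target is derived from start rather than from the previous tick's actual finish time, a slow tick costs skipped slots but does not shift the later targets.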

The recorder consumes a :class:PollSource — a narrow Protocol both :class:~watlowlib.devices.controller.Controller and :class:~watlowlib.manager.WatlowManager satisfy. Kept as a Protocol so the recorder is unit-testable against a lightweight stub.

Design reference: docs/design.md §6.
