alicatlib.streaming

Sample type and record() — the absolute-target acquisition primitive. See Logging and acquisition for the user guide and Streaming for the port-level streaming runtime.

Public surface

alicatlib.streaming

Sample acquisition — record() emits typed Sample streams.

Public surface:

  • Sample: one device poll with full timing provenance.
  • record: absolute-cadence async context manager.
  • OverflowPolicy: backpressure control knob.
  • AcquisitionSummary: per-run counters.
  • PollSource: the Protocol the recorder accepts (satisfied by alicatlib.manager.AlicatManager).

See docs/design.md §5.14.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at,
    samples_emitted,
    samples_late,
    max_drift_ms,
)

Per-run summary built when the record() context manager exits and logged as the recorder.stop event.

Attributes:

  • started_at (datetime): Wall-clock time (UTC) captured when record() starts, just before the producer task is launched.
  • finished_at (datetime): Wall-clock time (UTC) captured after the producer task has been cancelled and joined.
  • samples_emitted (int): Number of per-tick batches actually pushed onto the receive stream. A partial batch (some devices errored under ErrorPolicy.RETURN) still counts as one emitted batch.
  • samples_late (int): Number of ticks that missed their target slot, either because the producer overran the previous tick or because the overflow policy dropped the batch.
  • max_drift_ms (float): Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz (one period, in milliseconds) mean the device or the consumer is saturating the schedule.
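The drift threshold above is easiest to read as a fraction of one period. A minimal sketch, assuming a hypothetical SummaryLike dataclass with the same fields as AcquisitionSummary (the same numbers appear in the recorder.stop log event); drift_fraction, is_saturated and the 0.8 threshold are illustrative choices, not alicatlib APIs.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class SummaryLike:
    """Hypothetical stand-in carrying the same fields as AcquisitionSummary."""

    started_at: datetime
    finished_at: datetime
    samples_emitted: int
    samples_late: int
    max_drift_ms: float


def drift_fraction(summary: SummaryLike, rate_hz: float) -> float:
    """Worst observed drift as a fraction of one period (1000 / rate_hz ms)."""
    period_ms = 1000.0 / rate_hz
    return summary.max_drift_ms / period_ms


def is_saturated(summary: SummaryLike, rate_hz: float, threshold: float = 0.8) -> bool:
    # The 0.8 threshold is an arbitrary illustration, not a library default.
    return drift_fraction(summary, rate_hz) >= threshold


# Example: 25 ms of worst-case drift at 10 Hz uses a quarter of the 100 ms period.
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
run = SummaryLike(t0, t0, samples_emitted=600, samples_late=3, max_drift_ms=25.0)
```

The same 25 ms drift would saturate a 40 Hz run, whose period is only 25 ms.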

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

The default: the producer awaits the slow consumer. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples. The effective sample rate falls to the consumer's drain rate, and samples_late accrues for the missed slots once the producer unblocks and checks its schedule.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the batch that was about to be enqueued. The dropped tick counts toward samples_late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch, then enqueue. Counted as late. Not yet implemented: record() currently raises NotImplementedError when this policy is passed.
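The three policies can be sketched with a bounded asyncio.Queue standing in for the anyio memory object stream. This illustrates the documented semantics under that assumption and is not the recorder's implementation; Policy and enqueue are hypothetical names, and the DROP_OLDEST branch shows the documented intent even though record() currently rejects that policy.

```python
import asyncio
from enum import Enum


class Policy(Enum):
    """Hypothetical mirror of OverflowPolicy's values."""

    BLOCK = "block"
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


async def enqueue(queue: asyncio.Queue, batch: object, policy: Policy) -> bool:
    """Offer one batch to a bounded queue.

    Returns True if the batch was enqueued without loss and False if a batch
    was dropped (which the recorder would count as late).
    """
    if policy is Policy.BLOCK:
        await queue.put(batch)  # suspends until the consumer frees a slot
        return True
    try:
        queue.put_nowait(batch)
        return True
    except asyncio.QueueFull:
        if policy is Policy.DROP_NEWEST:
            return False  # discard the incoming batch
        queue.get_nowait()  # DROP_OLDEST: evict the oldest queued batch...
        queue.put_nowait(batch)  # ...then enqueue the new one
        return False
```

With BLOCK the producer simply waits, which is why the effective rate falls to the consumer's drain rate instead of losing data.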

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

alicatlib.manager.AlicatManager satisfies this: its poll(names) returns a Mapping[str, DeviceResult[DataFrame]]. Using a Protocol keeps record testable against a lightweight stub without pulling in the whole manager and transport stack.

poll async

poll(names=None)

Poll every named device (or all under management) concurrently.

Must return a mapping keyed by the manager-assigned device name. Successful polls carry the DataFrame as .value; failed ones carry the alicatlib.errors.AlicatError as .error (per alicatlib.manager.ErrorPolicy.RETURN).

Source code in src/alicatlib/streaming/recorder.py
async def poll(
    self,
    names: Sequence[str] | None = None,
) -> Mapping[str, DeviceResult[DataFrame]]:
    """Poll every named device (or all under management) concurrently.

    Must return a mapping keyed by the manager-assigned device name.
    Successful polls carry the :class:`DataFrame` as ``.value``;
    failed ones carry the :class:`~alicatlib.errors.AlicatError` as
    ``.error`` (per :class:`~alicatlib.manager.ErrorPolicy.RETURN`).
    """
    ...

Sample dataclass

Sample(
    device,
    unit_id,
    monotonic_ns,
    requested_at,
    received_at,
    midpoint_at,
    latency_s,
    frame,
)

One device poll with full timing provenance.

Attributes:

  • device (str): The manager-assigned name (from AlicatManager.add). A stable downstream identifier that follows the value into sinks.
  • unit_id (str): Bus-level single-letter unit id of the polled device. Kept separate from device so that renaming the manager key does not lose the physical addressing context.
  • monotonic_ns (int): The time.monotonic_ns() reading taken at the read site. Used only for scheduling and drift analysis and never displayed, since the absolute value has no calendar meaning.
  • requested_at (datetime): Wall-clock time (UTC) captured just before the poll bytes leave the host.
  • received_at (datetime): Wall-clock time (UTC) captured just after the reply line is read.
  • midpoint_at (datetime): requested_at + (received_at - requested_at) / 2, the design-preferred point estimate of the sample instant. Use this when aligning Alicat samples against other sensor streams.
  • latency_s (float): (received_at - requested_at).total_seconds(), precomputed so downstream call sites do not have to subtract the two datetimes and convert the resulting timedelta to seconds.
  • frame (DataFrame): The DataFrame returned by the device's poll.
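Python does not define adding two datetime objects, so the midpoint is computed by stepping half the elapsed interval from the request time. A minimal sketch of how midpoint_at and latency_s follow from the two captures; timing_fields is a hypothetical helper, not part of alicatlib, and the inputs are assumed to be UTC-aware datetimes.

```python
from datetime import datetime, timedelta, timezone


def timing_fields(requested_at: datetime, received_at: datetime) -> tuple[datetime, float]:
    """Return (midpoint_at, latency_s) for one poll."""
    elapsed = received_at - requested_at  # a timedelta
    # datetime + datetime is undefined, so step half the elapsed interval from the request.
    midpoint_at = requested_at + elapsed / 2
    latency_s = elapsed.total_seconds()
    return midpoint_at, latency_s


# Example: a 40 ms round trip puts the midpoint 20 ms after the request.
t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
mid, lat = timing_fields(t0, t0 + timedelta(milliseconds=40))
```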

record async

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Record polled samples into a receive stream at an absolute cadence.

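Usage:

from alicatlib.streaming import record

async def acquire(mgr):
    # mgr: an AlicatManager (or any PollSource) with its devices already added
    async with record(mgr, rate_hz=10, duration=60) as stream:
        async for batch in stream:
            for name, sample in batch.items():
                print(name, sample.midpoint_at, sample.frame)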

The context manager yields an async iterator of per-tick sample batches. Each batch is a Mapping[name, Sample] with one entry per device that polled successfully on that tick. Devices whose DeviceResult carries an error are omitted from that batch and logged at WARN.
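Since errored devices are simply absent from a batch, a consumer that needs to track gaps can compare each batch against the names it asked for. A sketch under that assumption; missing_devices and tally_gaps are hypothetical helpers, and the example batches hold plain values rather than Sample objects.

```python
from collections import Counter
from collections.abc import Iterable, Mapping


def missing_devices(batch: Mapping[str, object], expected: Iterable[str]) -> list[str]:
    """Requested device names absent from one tick's batch, sorted for stable output."""
    return sorted(set(expected).difference(batch))


def tally_gaps(batches: Iterable[Mapping[str, object]], expected: Iterable[str]) -> Counter[str]:
    """Count, per device, how many ticks it was missing from."""
    names = list(expected)  # materialise once; `expected` may be a one-shot iterator
    gaps: Counter[str] = Counter()
    for batch in batches:
        gaps.update(missing_devices(batch, names))
    return gaps
```

The WARN log already records each failure; a tally like this is only useful when the consumer itself needs per-device gap counts.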

Parameters:

  • source (PollSource, required): Any PollSource, typically an alicatlib.manager.AlicatManager.
  • rate_hz (float, required): Target cadence in hertz. Absolute targets are computed as target[n] = start + n * (1 / rate_hz). Must be > 0.
  • duration (float | None, default None): Total acquisition duration in seconds; a finite run covers max(1, round(duration * rate_hz)) ticks. None means "until the caller exits the context manager". Must be > 0 when given.
  • names (Sequence[str] | None, default None): Subset of device names to poll on each tick. None polls every device the source manages.
  • overflow (OverflowPolicy, default OverflowPolicy.BLOCK): Backpressure policy when the receive-stream buffer is full. See OverflowPolicy; DROP_OLDEST currently raises NotImplementedError.
  • buffer_size (int, default 64): Receive-stream capacity, in per-tick batches. 64 mirrors the design default. Must be >= 1.

Yields:

  • AsyncIterator[Mapping[str, Sample]]: the receive end of an anyio memory object stream, iterating per-tick Mapping[device_name, Sample] batches.

Raises:

  • ValueError: If rate_hz <= 0, duration <= 0, or buffer_size < 1.
  • NotImplementedError: If overflow is OverflowPolicy.DROP_OLDEST, which is not yet implemented.

Source code in src/alicatlib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> AsyncGenerator[AsyncIterator[Mapping[str, Sample]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(mgr, rate_hz=10, duration=60) as stream:
            async for batch in stream:
                process(batch)

    The CM yields an async iterator of per-tick sample batches. Each
    batch is a ``Mapping[name, Sample]`` — one entry per device that
    polled successfully on that tick. Devices whose :class:`DeviceResult`
    carries an error are omitted from that batch and logged at WARN.

    Args:
        source: Any :class:`PollSource` (typically an
            :class:`~alicatlib.manager.AlicatManager`).
        rate_hz: Target cadence. Absolute targets are computed
            ``target[n] = start + n * (1 / rate_hz)``. Must be > 0.
        duration: Total acquisition duration in seconds. ``None``
            means "until the caller exits the CM".
        names: Subset of device names to poll per tick. ``None`` polls
            everything the source manages.
        overflow: Backpressure policy when the receive-stream buffer
            is full. See :class:`OverflowPolicy`.
        buffer_size: Receive-stream capacity, in per-tick batches.
            ``64`` mirrors the design default.

    Yields:
        An async iterator of per-tick ``Mapping[device_name, Sample]``.

    Raises:
        ValueError: If ``rate_hz <= 0`` or ``duration <= 0`` or
            ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    if overflow is OverflowPolicy.DROP_OLDEST:
        # Fail at call site (not deep inside the producer task) so the
        # exception type doesn't come back wrapped in an ExceptionGroup.
        raise NotImplementedError(
            "OverflowPolicy.DROP_OLDEST is not yet implemented; use BLOCK "
            "or DROP_NEWEST for now (design §5.14).",
        )

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Mapping[str, Sample]](
        max_buffer_size=buffer_size,
    )
    stats = _RunStats()

    started_at = datetime.now(UTC)
    _logger.info(
        "recorder.start",
        extra={
            "rate_hz": rate_hz,
            "duration_s": duration,
            "overflow": overflow.value,
            "buffer_size": buffer_size,
            "names": list(names) if names is not None else None,
        },
    )

    async with anyio.create_task_group() as tg, receive_stream:
        tg.start_soon(
            _run_producer,
            source,
            send_stream,
            period,
            total_ticks,
            names,
            overflow,
            stats,
        )
        try:
            yield receive_stream
        finally:
            # Cancel + drain before the CM returns — producer lifetime
            # is strictly nested inside the ``async with`` per §5.14.
            tg.cancel_scope.cancel()

    finished_at = datetime.now(UTC)
    summary = AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=stats.emitted,
        samples_late=stats.late,
        max_drift_ms=stats.max_drift_ms,
    )
    _logger.info(
        "recorder.stop",
        extra={
            "samples_emitted": summary.samples_emitted,
            "samples_late": summary.samples_late,
            "max_drift_ms": summary.max_drift_ms,
            "duration_s": (finished_at - started_at).total_seconds(),
        },
    )

Sample model

alicatlib.streaming.sample

Timed sample — one device reading with send/receive provenance.

A Sample is what the recorder emits into its memory-object stream. It pairs a DataFrame (the measurement) with enough timing to reconstruct the acquisition timeline after the fact: monotonic_ns for drift analysis, requested_at / received_at / midpoint_at for wall-clock provenance, and latency_s for quick per-sample latency checks.

The midpoint is the best point-estimate of the acquisition instant on the device: halfway between when the poll byte left the host and when the full reply arrived. That's what downstream plots and correlations should use when aligning Alicat data against other sensor streams.

Design reference: docs/design.md §5.14.


Recorder

alicatlib.streaming.recorder

Absolute-target recorder: record() emits timed Sample batches.

record() is the v1 acquisition primitive. It drives an alicatlib.manager.AlicatManager (or any PollSource-shaped object; see below) at an absolute-target cadence and publishes the polled DataFrame values into an anyio.abc.ObjectReceiveStream as per-tick Mapping[name, Sample] batches.

Key invariants (design §5.14):

  • Absolute-target scheduling. Target times are computed from anyio.current_time() read once at record() entry, not by stepping from the previous wake-up time, so drift is bounded by one tick and never accumulates across cycles. anyio.sleep_until advances to the next target slot; overruns skip the missed slots and increment samples_late.
  • Structured concurrency. The producer task lives inside a create_task_group() strictly nested inside the async CM body. The CM yields the receive stream, user code iterates it, and on CM exit the task group is cancelled and joined before the CM returns. This matches AnyIO's own warning against yielding from inside a task group.
  • Wall-clock provenance. datetime.now(UTC) is captured at the send/receive boundaries of each device's poll and attached to the emitted Sample; it is used for sink timestamps, never for scheduling.
  • Backpressure. buffer_size sets the memory-object stream capacity; OverflowPolicy controls what happens when the producer wants to enqueue but the consumer is behind.
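The scheduling rule in the first bullet can be sketched with plain asyncio, using loop.time() and asyncio.sleep as stand-ins for anyio.current_time() and anyio.sleep_until(). next_index and run_schedule are hypothetical names; this is not the recorder's producer (_run_producer), whose body is not shown on this page.

```python
import asyncio
import math
from collections.abc import Awaitable, Callable


def next_index(now: float, start: float, period: float, done: int) -> int:
    """Index of the next target slot that is not already in the past.

    `done` is the index of the tick that just finished; any slots between it
    and the returned index were missed (overrun) and count as late.
    """
    return max(done + 1, math.ceil((now - start) / period))


async def run_schedule(poll: Callable[[], Awaitable[None]], rate_hz: float, ticks: int) -> int:
    """Call poll() on absolute targets start + n * period; return the number of skipped slots."""
    loop = asyncio.get_running_loop()
    period = 1.0 / rate_hz
    start = loop.time()  # sampled once; every target derives from it
    late = 0
    n = 0
    while n < ticks:
        delay = start + n * period - loop.time()
        if delay > 0:
            await asyncio.sleep(delay)  # stand-in for anyio.sleep_until(target)
        await poll()
        due = next_index(loop.time(), start, period, n)
        late += min(due, ticks) - (n + 1)  # only count missed slots inside the run
        n = due
    return late
```

A poll that overruns several periods jumps straight to the next future slot instead of firing a burst of catch-up polls, which is what keeps drift bounded.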

The recorder consumes a PollSource, a narrow Protocol that alicatlib.manager.AlicatManager already satisfies (its poll(names) signature matches). It is kept as a Protocol so the recorder can be unit-tested against a lightweight stub without standing up a full manager and transport pipeline.

Design reference: docs/design.md §5.14.
