alicatlib.streaming¶
Sample type and record() — the absolute-target acquisition
primitive. See Logging and acquisition for the user
guide and Streaming for the port-level streaming
runtime.
Public surface¶
alicatlib.streaming ¶
Sample acquisition — record() emits typed Sample streams.
Public surface:
- :class:Sample - one device poll with full timing provenance.
- :func:record - absolute-cadence async context manager.
- :class:Recording - wrapper carrying the stream, the live summary, and the rate.
- :class:OverflowPolicy - backpressure control knob.
- :class:AcquisitionSummary - per-run counters (mutable, updated in place by the recorder).
- :class:PollSource - Protocol the recorder accepts (satisfied by :class:~alicatlib.manager.AlicatManager).
- :class:PollSourceAdapter - single-device adapter so a bare :class:~alicatlib.devices.base.Device can drive :func:record.
See docs/design.md §5.14.
AcquisitionSummary
dataclass
¶
AcquisitionSummary(
started_at,
finished_at=None,
samples_emitted=0,
samples_late=0,
max_drift_ms=0.0,
)
Per-run summary owned by the recorder.
Mutability contract (per the cross-lib spec §M):
- The recorder is the only writer. It updates counters in place during the run so progress polling (TUIs, dashboards) works without a separate API.
- Consumers treat the summary as read-only.
- :attr:finished_at is None while the recording is in flight and is set on context-manager exit.
Attributes:
| Name | Type | Description |
|---|---|---|
| started_at | datetime | Wall-clock at recorder entry. |
| finished_at | datetime \| None | Wall-clock at producer shutdown; None while the recording is in flight. |
| samples_emitted | int | Count of per-tick batches actually pushed onto the receive stream. Partial batches (some devices errored under … |
| samples_late | int | Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). |
| max_drift_ms | float | Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching … |
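The in-place update contract can be pictured with a stand-in dataclass. This is a minimal sketch, not the library's implementation: `SummarySketch` and `fake_recorder` are hypothetical names that only mirror the documented field names, and the loop stands in for the real producer.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class SummarySketch:
    """Hypothetical stand-in mirroring the documented AcquisitionSummary fields."""

    started_at: datetime
    finished_at: datetime | None = None
    samples_emitted: int = 0
    samples_late: int = 0
    max_drift_ms: float = 0.0


async def fake_recorder(summary: SummarySketch, ticks: int) -> None:
    # The recorder is the only writer: it mutates the shared instance in place.
    for _ in range(ticks):
        await asyncio.sleep(0)
        summary.samples_emitted += 1
    # Set once, when the run ends.
    summary.finished_at = datetime.now(timezone.utc)


async def main() -> SummarySketch:
    summary = SummarySketch(started_at=datetime.now(timezone.utc))
    assert summary.finished_at is None  # still "in flight" from a reader's view
    await fake_recorder(summary, ticks=3)
    return summary
```

A dashboard would hold a reference to the same object and read the counters between ticks, without ever writing to it.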
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer. Default. Silent drops are surprising
in a data-acquisition setting, so the recorder blocks the producer
rather than quietly discarding samples. The effective sample rate
drops to the consumer's drain rate; samples_late accrues once
the consumer catches up and the producer can check its schedule.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the batch that was about to be enqueued. Counted as late.
DROP_OLDEST
class-attribute
instance-attribute
¶
Evict the oldest queued batch, then enqueue. Counted as late.
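The three policies can be sketched against a bounded `asyncio.Queue`. This is an illustration under stated assumptions, not the recorder's code: `PolicySketch`, `enqueue`, and `demo` are hypothetical names, and the real recorder publishes into an AnyIO memory-object stream rather than an asyncio queue.

```python
import asyncio
from enum import Enum


class PolicySketch(Enum):
    """Hypothetical mirror of the three documented policies."""

    BLOCK = "block"
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


async def enqueue(queue: asyncio.Queue, batch: object, policy: PolicySketch) -> bool:
    """Enqueue one batch; return True if a batch was dropped (counted as late)."""
    if policy is PolicySketch.BLOCK:
        await queue.put(batch)  # waits until the consumer frees a slot
        return False
    if not queue.full():
        queue.put_nowait(batch)
        return False
    if policy is PolicySketch.DROP_OLDEST:
        queue.get_nowait()  # evict the oldest queued batch
        queue.put_nowait(batch)
    # DROP_NEWEST: the incoming batch is simply discarded.
    return True


async def demo(policy: PolicySketch) -> tuple[list[int], int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    dropped = 0
    for batch in range(4):  # nobody is consuming, so the queue fills after 2
        dropped += await enqueue(queue, batch, policy)
    remaining = [queue.get_nowait() for _ in range(queue.qsize())]
    return remaining, dropped
```

`demo` should only be called with one of the drop policies: with `BLOCK` and no consumer, the third `put` would wait forever, which is exactly the backpressure the default policy is meant to apply.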
PollSource ¶
Bases: Protocol
Minimal shape the recorder needs from its dispatcher.
:class:~alicatlib.manager.AlicatManager satisfies this: its
poll(names) returns a Mapping[str, DeviceResult[Reading]].
Using a Protocol keeps :func:record testable against a lightweight
stub without pulling in the whole manager + transport stack.
poll
async
¶
Poll every named device (or all under management) concurrently.
Must return a mapping keyed by the manager-assigned device name.
Successful polls carry the :class:Reading as .value;
failed ones carry the :class:~alicatlib.errors.AlicatError as
.error (per :class:~alicatlib.manager.ErrorPolicy.RETURN).
Source code in src/alicatlib/streaming/recorder.py
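A lightweight stub of the kind the Protocol is meant to allow might look like the sketch below. The names `ResultSketch`, `PollSourceSketch`, and `StubSource` are hypothetical, the exact `poll` signature (an optional `names` argument) is an assumption, and readings are plain floats rather than real Reading objects.

```python
import asyncio
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Any, Optional, Protocol


@dataclass
class ResultSketch:
    """Hypothetical stand-in for DeviceResult: either a value or an error."""

    value: Any = None
    error: Optional[Exception] = None


class PollSourceSketch(Protocol):
    """Assumed shape: one async poll() keyed by device name."""

    async def poll(
        self, names: Optional[Sequence[str]] = None
    ) -> Mapping[str, ResultSketch]: ...


class StubSource:
    """Test double that satisfies the Protocol structurally (no inheritance)."""

    def __init__(self, readings: dict[str, float]) -> None:
        self._readings = readings

    async def poll(
        self, names: Optional[Sequence[str]] = None
    ) -> Mapping[str, ResultSketch]:
        # Poll everything when no subset is requested.
        wanted = self._readings if names is None else {n: self._readings[n] for n in names}
        return {name: ResultSketch(value=v) for name, v in wanted.items()}


async def poll_once(source: PollSourceSketch) -> Mapping[str, ResultSketch]:
    return await source.poll()
```

Because the Protocol is structural, `StubSource` never imports or subclasses anything from the manager or transport layers.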
PollSourceAdapter ¶
Wrap one :class:Device as a :class:PollSource for :func:record.
Capa's old _SingleDevicePollSource shim reinvented this; the
adapter lives here so the wiring is one line at the call site::
    adapter = PollSourceAdapter("fuel", device)
    async with record(adapter, rate_hz=10) as recording:
        ...
The names filter is honoured per the cross-lib spec §E: when
the caller passes a name set that does not include this device's
name, poll() returns an empty mapping rather than polling
anyway. The recorder always passes a complete name set in
single-device mode so filtering is harmless; the empty-mapping
behaviour is the correct cross-lib semantic.
Source code in src/alicatlib/streaming/__init__.py
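The names-filter behaviour described above can be sketched as follows. `FakeDevice` and `AdapterSketch` are hypothetical stand-ins, and the mapping values are plain floats for brevity; this is not the adapter's actual source.

```python
import asyncio
from collections.abc import Mapping, Sequence
from typing import Optional


class FakeDevice:
    """Hypothetical device whose poll() returns a canned reading."""

    def __init__(self) -> None:
        self.polls = 0

    async def poll(self) -> float:
        self.polls += 1
        return 12.5


class AdapterSketch:
    """Hypothetical single-device adapter illustrating the names filter."""

    def __init__(self, name: str, device: FakeDevice) -> None:
        self._name = name
        self._device = device

    async def poll(self, names: Optional[Sequence[str]] = None) -> Mapping[str, float]:
        if names is not None and self._name not in names:
            return {}  # filtered out: the device is not polled at all
        return {self._name: await self._device.poll()}
```

Returning an empty mapping, rather than polling anyway, keeps the adapter's behaviour aligned with what a multi-device source would do for a name it does not manage.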
poll
async
¶
Poll the wrapped device and return a single-entry mapping.
Source code in src/alicatlib/streaming/__init__.py
Recording
dataclass
¶
The object yielded by :func:record's async context manager.
Wraps the live receive stream, the (mutable) :class:AcquisitionSummary
the recorder is updating in place, and the rate the recorder is
running at. Consumers iterate via async for batch in recording
(the instance delegates to :attr:stream), observe progress via
:attr:summary, and read :attr:rate_hz for queue-sizing decisions.
Attributes:
| Name | Type | Description |
|---|---|---|
| stream | AsyncIterator[T] | Async iterator of per-tick batches (or whatever record type the lib emits). Typed via … |
| summary | AcquisitionSummary | Live :class: … |
| rate_hz | float | The rate the recorder is running at, captured at entry. Useful for back-pressure sizing in wrappers. |
__aiter__ ¶
Delegate iteration to :attr:stream.
Lets async for batch in recording work without forcing
callers to dereference recording.stream themselves —
ergonomic for the common case, while keeping the typed
attribute around for consumers that want to interleave reads
with reading :attr:summary or :attr:rate_hz.
Source code in src/alicatlib/streaming/recorder.py
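The delegation pattern can be shown with a small generic dataclass. `RecordingSketch`, `ticks`, and `collect` are hypothetical names; the sketch only demonstrates how forwarding `__aiter__` makes the wrapper itself iterable.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass
class RecordingSketch(Generic[T]):
    """Hypothetical wrapper: iteration is forwarded to .stream."""

    stream: AsyncIterator[T]
    rate_hz: float

    def __aiter__(self) -> AsyncIterator[T]:
        # Hand back the underlying stream's iterator unchanged.
        return self.stream.__aiter__()


async def ticks(n: int) -> AsyncIterator[int]:
    # Stand-in for the receive stream: yields n fake batches.
    for i in range(n):
        yield i


async def collect() -> list[int]:
    recording = RecordingSketch(stream=ticks(3), rate_hz=10.0)
    # Iterating the wrapper is equivalent to iterating recording.stream.
    return [batch async for batch in recording]
```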
Sample
dataclass
¶
Sample(
device,
unit_id,
t_mono_ns,
t_utc,
requested_at,
received_at,
latency_s,
reading,
t_midpoint_mono_ns=None,
)
One device poll with full timing provenance.
Attributes:
| Name | Type | Description |
|---|---|---|
| device | str | The manager-assigned name (from … |
| unit_id | str | Bus-level single-letter unit id of the polled device. Kept separate from … |
| t_mono_ns | int | :func: … |
| t_utc | datetime | Wall-clock … |
| t_midpoint_mono_ns | int \| None | Optional monotonic-ns midpoint of an integration window. |
| requested_at | datetime | Wall-clock … |
| received_at | datetime | Wall-clock … |
| latency_s | float | |
| reading | Reading | The :class: … |
record
async
¶
record(
source,
*,
rate_hz,
duration=None,
names=None,
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
)
Record polled samples into a receive stream at an absolute cadence.
Usage::
    async with record(mgr, rate_hz=10, duration=60) as recording:
        async for batch in recording.stream:
            process(batch)
    print(recording.summary.samples_emitted)
The CM yields a :class:Recording carrying the async iterator, the
live :class:AcquisitionSummary, and the rate. Each batch on the
stream is a Mapping[name, Sample] — one entry per device that
polled successfully on that tick. Devices whose :class:DeviceResult
carries an error are omitted from that batch and logged at WARN.
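The "omit errored devices, log at WARN" rule can be sketched as a small filter over one tick's results. `ResultSketch` and `build_batch` are hypothetical names, and the sketch keeps the raw value where the real recorder would wrap it in a Sample with timing fields.

```python
import logging
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Optional

log = logging.getLogger("recorder_sketch")


@dataclass
class ResultSketch:
    """Hypothetical stand-in for DeviceResult: either a value or an error."""

    value: Any = None
    error: Optional[Exception] = None


def build_batch(results: Mapping[str, ResultSketch]) -> dict[str, Any]:
    """Keep successful polls; log and omit devices that errored on this tick."""
    batch: dict[str, Any] = {}
    for name, result in results.items():
        if result.error is not None:
            log.warning("poll failed for %s: %s", name, result.error)
            continue
        batch[name] = result.value
    return batch


tick = {
    "a": ResultSketch(value=1.0),
    "b": ResultSketch(error=TimeoutError("no reply")),
}
batch = build_batch(tick)  # only "a" survives; "b" is logged and dropped
```

Consumers should therefore not assume every device name appears in every batch.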
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | PollSource | Any :class: … | required |
| rate_hz | float | Target cadence. Absolute targets are computed … | required |
| duration | float \| None | Total acquisition duration in seconds. | None |
| names | Sequence[str] \| None | Subset of device names to poll per tick. | None |
| overflow | OverflowPolicy | Backpressure policy when the receive-stream buffer is full. See :class: … | BLOCK |
| buffer_size | int | Receive-stream capacity, in per-tick batches. | 64 |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[Recording[Mapping[str, Sample]]] | A :class:Recording carrying the async iterator, the live :class:AcquisitionSummary, and the rate. |
Raises:
| Type | Description |
|---|---|
| ValueError | If … |
Source code in src/alicatlib/streaming/recorder.py
Sample model¶
alicatlib.streaming.sample ¶
Timed sample — one device reading with send/receive provenance.
A :class:Sample is what the recorder emits into its memory-object
stream. It pairs a :class:Reading (the measurement) with enough
timing to reconstruct the acquisition timeline after the fact:
t_mono_ns and t_utc are the canonical join keys per the
cross-lib contract (§C), and requested_at / received_at /
latency_s are the I/O-boundary provenance fields.
t_utc is the best point-estimate of the acquisition instant on
the device: halfway between when the poll byte left the host and when
the full reply arrived. That's what downstream plots and correlations
should use when aligning Alicat data against other sensor streams.
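The midpoint estimate described above is simple datetime arithmetic. The sketch below is illustrative only: `midpoint_utc` and `round_trip_s` are hypothetical names, and it does not claim to show how the library derives latency_s.

```python
from datetime import datetime, timedelta, timezone


def midpoint_utc(requested_at: datetime, received_at: datetime) -> datetime:
    """Return the instant halfway between the send and receive timestamps."""
    return requested_at + (received_at - requested_at) / 2


# Example: a poll that took 40 ms end to end.
requested = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
received = requested + timedelta(milliseconds=40)

t_utc = midpoint_utc(requested, received)  # 20 ms after the request left
round_trip_s = (received - requested).total_seconds()  # illustrative only
```

Using the midpoint assumes the outbound and inbound legs take roughly the same time, which is why it is a point estimate rather than an exact acquisition instant.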
Design reference: docs/design.md §5.14.
Recorder¶
alicatlib.streaming.recorder ¶
Absolute-target recorder — record() emits timed :class:Sample batches.
:func:record is the v1 acquisition primitive. It drives a
:class:~alicatlib.manager.AlicatManager (or any
:class:PollSource-shaped object — see below) at an absolute-target
cadence and publishes the polled :class:Reading values into an
:class:anyio.abc.ObjectReceiveStream as per-tick
Mapping[name, Sample] batches.
Key invariants (design §5.14):
- Absolute-target scheduling. Target times are computed from :func:anyio.current_time at record()-entry, not from a running monotonic; drift across cycles is bounded by one tick and never accumulates. anyio.sleep_until advances to the next target slot; overruns skip missed slots and increment samples_late.
- Structured concurrency. The producer task lives inside a create_task_group() strictly nested inside the async CM body. The CM yields the receive stream, user code iterates it, and on CM exit the task group is cancelled and joined before the CM returns. This matches AnyIO's own warning against yielding from inside a task group.
- Wall-clock provenance. datetime.now(UTC) is captured at the send/receive boundaries of each device's poll and attached to the emitted :class:Sample; it is used for sink timestamps, never for scheduling.
- Backpressure. buffer_size sets the memory-object stream capacity; :class:OverflowPolicy controls what happens when the producer wants to enqueue but the consumer is behind.
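The absolute-target arithmetic behind the first invariant can be sketched as pure functions. This is one plausible reading of "skip missed slots", not the recorder's code: `next_slot` and `target_time` are hypothetical helpers, and plain floats stand in for the clock values the recorder reads from anyio.current_time.

```python
import math


def target_time(t0: float, period: float, index: int) -> float:
    """Absolute target for slot `index`, always measured from the entry time t0."""
    return t0 + index * period


def next_slot(t0: float, period: float, now: float, last_index: int) -> tuple[int, int]:
    """Return (index of the next target slot, number of slots skipped).

    Because every target is t0 + k * period, a late tick does not push
    later targets back, so timing error never accumulates.
    """
    due = last_index + 1
    # First slot whose target time has not already passed. The small epsilon
    # keeps a target that is exactly "now" from being skipped by rounding.
    upcoming = max(due, math.ceil((now - t0) / period - 1e-9))
    return upcoming, upcoming - due
```

With a 0.5 s period starting at t0 = 0, a producer that finishes slot 1 at t = 1.6 has already missed the targets at 1.0 and 1.5, so it would sleep until slot 4 at t = 2.0 and count two late ticks.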
The recorder consumes a :class:PollSource — a narrow Protocol the
:class:~alicatlib.manager.AlicatManager already satisfies (its
poll(names) signature matches). Kept as a Protocol so the
recorder is unit-testable against a lightweight stub without standing
up a full manager + transport pipeline.
Design reference: docs/design.md §5.14.