alicatlib.streaming¶
Sample type and record() — the absolute-target acquisition
primitive. See Logging and acquisition for the user
guide and Streaming for the port-level streaming
runtime.
Public surface¶
alicatlib.streaming ¶
Sample acquisition — record() emits typed Sample streams.
Public surface:
- :class:Sample - one device poll with full timing provenance.
- :func:record - absolute-cadence async context manager.
- :class:OverflowPolicy - backpressure control knob.
- :class:AcquisitionSummary - per-run counters.
- :class:PollSource - Protocol the recorder accepts (satisfied by :class:~alicatlib.manager.AlicatManager).
See docs/design.md §5.14.
AcquisitionSummary dataclass ¶
Per-run summary emitted after record()'s CM exits.
Attributes:
| Name | Type | Description |
|---|---|---|
| started_at | datetime | Wall-clock at the first scheduled tick. |
| finished_at | datetime | Wall-clock at producer shutdown. |
| samples_emitted | int | Count of per-tick batches actually pushed onto the receive stream. Partial batches (some devices errored under ... |
| samples_late | int | Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). |
| max_drift_ms | float | Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching ... |
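To make "well under one period" concrete, the sketch below expresses a drift value as a fraction of the sampling period. The helper name drift_fraction is hypothetical and not part of alicatlib; it only assumes that the period is 1000 / rate_hz milliseconds.

```python
def drift_fraction(max_drift_ms: float, rate_hz: float) -> float:
    """Return the worst observed drift as a fraction of one sampling period."""
    if rate_hz <= 0:
        raise ValueError("rate_hz must be positive")
    period_ms = 1000.0 / rate_hz  # one period, in milliseconds
    return max_drift_ms / period_ms


# A 10 Hz run has a 100 ms period, so 5 ms of drift is 5% of a period.
print(drift_fraction(5.0, 10.0))
```

A value close to 1.0 would mean the worst batch landed almost a full period after its target.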
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK class-attribute instance-attribute ¶
Await the slow consumer. Default. Silent drops are surprising
in a data-acquisition setting, so the recorder blocks the producer
rather than quietly discarding samples. The effective sample rate
drops to the consumer's drain rate; samples_late accrues once
the consumer catches up and the producer can check its schedule.
DROP_NEWEST class-attribute instance-attribute ¶
Drop the sample that was about to be enqueued. Counted as late.
DROP_OLDEST class-attribute instance-attribute ¶
Evict the oldest queued batch, then enqueue. Counted as late.
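The difference between the two drop policies is easiest to see on a tiny bounded buffer. The following is a minimal sketch using collections.deque, not the recorder's actual implementation: the Policy enum and the offer helper are hypothetical stand-ins, and BLOCK is left out because it needs an awaiting producer rather than a synchronous call.

```python
from collections import deque
from enum import Enum


class Policy(Enum):  # hypothetical stand-in for OverflowPolicy
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


def offer(buffer: deque, capacity: int, batch, policy: Policy) -> bool:
    """Try to enqueue `batch`; return True if a batch was dropped."""
    if len(buffer) < capacity:
        buffer.append(batch)
        return False
    if policy is Policy.DROP_OLDEST:
        buffer.popleft()      # evict the oldest queued batch
        buffer.append(batch)  # then enqueue the new one
    # DROP_NEWEST: leave the buffer untouched and discard `batch`
    return True


newest = deque([1, 2])
oldest = deque([1, 2])
offer(newest, 2, 3, Policy.DROP_NEWEST)
offer(oldest, 2, 3, Policy.DROP_OLDEST)
print(list(newest), list(oldest))  # [1, 2] [2, 3]
```

In both cases the call reports a drop, which corresponds to the tick being counted as late.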
PollSource ¶
Bases: Protocol
Minimal shape the recorder needs from its dispatcher.
:class:~alicatlib.manager.AlicatManager satisfies this: its
poll(names) returns a Mapping[str, DeviceResult[DataFrame]].
Using a Protocol keeps :func:record testable against a lightweight
stub without pulling in the whole manager + transport stack.
poll async ¶
Poll every named device (or all under management) concurrently.
Must return a mapping keyed by the manager-assigned device name.
Successful polls carry the :class:DataFrame as .value;
failed ones carry the :class:~alicatlib.errors.AlicatError as
.error (per :class:~alicatlib.manager.ErrorPolicy.RETURN).
Source code in src/alicatlib/streaming/recorder.py
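A lightweight stub of the kind described above might look like the sketch below. It does not import alicatlib: StubResult is a hypothetical stand-in for DeviceResult (only the .value and .error attributes named above are modelled), StubSource is a hypothetical class, and the names=None default is an assumption about the poll signature.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Mapping, Optional, Sequence


@dataclass
class StubResult:
    """Hypothetical stand-in for DeviceResult: value on success, error on failure."""
    value: Any = None
    error: Optional[Exception] = None


class StubSource:
    """Hypothetical poll source that returns canned readings."""

    def __init__(self, readings: Mapping[str, Any]) -> None:
        self._readings = dict(readings)

    async def poll(
        self, names: Optional[Sequence[str]] = None
    ) -> Mapping[str, StubResult]:
        # Poll everything when no subset is given (assumed behaviour).
        selected = list(self._readings) if names is None else list(names)
        results = {}
        for name in selected:
            if name in self._readings:
                results[name] = StubResult(value=self._readings[name])
            else:
                # Unknown devices come back as errors instead of raising.
                results[name] = StubResult(error=KeyError(name))
        return results


results = asyncio.run(StubSource({"mfc1": 1.5}).poll(["mfc1", "mfc2"]))
print(results["mfc1"].value, type(results["mfc2"].error).__name__)
```

Whether an object like this satisfies the real Protocol depends on the exact poll signature in recorder.py, so treat it as a starting point for tests rather than a drop-in.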
Sample dataclass ¶
One device poll with full timing provenance.
Attributes:
| Name | Type | Description |
|---|---|---|
| device | str | The manager-assigned name (from ... |
| unit_id | str | Bus-level single-letter unit id of the polled device. Kept separate from ... |
| monotonic_ns | int | :func: ... |
| requested_at | datetime | Wall-clock ... |
| received_at | datetime | Wall-clock ... |
| midpoint_at | datetime | |
| latency_s | float | |
| frame | DataFrame | The :class: ... |
record async ¶
record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)
Record polled samples into a receive stream at an absolute cadence.
Usage::

    async with record(mgr, rate_hz=10, duration=60) as stream:
        async for batch in stream:
            process(batch)

The CM yields an async iterator of per-tick sample batches. Each
batch is a Mapping[name, Sample] — one entry per device that
polled successfully on that tick. Devices whose :class:DeviceResult
carries an error are omitted from that batch and logged at WARN.
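Because errored devices are simply absent from a batch, consumers that need to notice gaps have to compare each batch against the names they expect. The sketch below shows one way to do that. It runs against fake_stream, a hypothetical async generator standing in for the stream that record() yields, with plain floats in place of Sample objects.

```python
import asyncio
from typing import AsyncIterator, Dict, Mapping, Sequence


async def fake_stream() -> AsyncIterator[Mapping[str, float]]:
    """Hypothetical stand-in for the stream yielded by record()."""
    yield {"mfc1": 1.0, "mfc2": 2.0}
    yield {"mfc1": 1.1}  # mfc2 errored on this tick, so it is absent


async def count_missing(stream, expected: Sequence[str]) -> Dict[str, int]:
    """Count, per device, the ticks on which it was absent from the batch."""
    missing = {name: 0 for name in expected}
    async for batch in stream:
        for name in expected:
            if name not in batch:
                missing[name] += 1
    return missing


missing = asyncio.run(count_missing(fake_stream(), ["mfc1", "mfc2"]))
print(missing)  # {'mfc1': 0, 'mfc2': 1}
```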
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | PollSource | Any :class: ... | required |
| rate_hz | float | Target cadence. Absolute targets are computed ... | required |
| duration | float \| None | Total acquisition duration in seconds. | None |
| names | Sequence[str] \| None | Subset of device names to poll per tick. | None |
| overflow | OverflowPolicy | Backpressure policy when the receive-stream buffer is full. See :class: ... | BLOCK |
| buffer_size | int | Receive-stream capacity, in per-tick batches. | 64 |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[AsyncIterator[Mapping[str, Sample]]] | An async iterator of per-tick ... |
Raises:
| Type | Description |
|---|---|
| ValueError | If ... |
Source code in src/alicatlib/streaming/recorder.py
Sample model¶
alicatlib.streaming.sample ¶
Timed sample — one device reading with send/receive provenance.
A :class:Sample is what the recorder emits into its memory-object
stream. It pairs a :class:DataFrame (the measurement) with enough
timing to reconstruct the acquisition timeline after the fact:
monotonic_ns for drift analysis, requested_at /
received_at / midpoint_at for wall-clock provenance, and
latency_s for quick per-sample latency checks.
The midpoint is the best point-estimate of the acquisition instant on the device: halfway between when the poll byte left the host and when the full reply arrived. That's what downstream plots and correlations should use when aligning Alicat data against other sensor streams.
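The midpoint estimate can be illustrated with plain datetime arithmetic. This is a sketch of the idea only: the midpoint helper is hypothetical, and it is an assumption that midpoint_at and latency_s are derived from requested_at and received_at in exactly this way.

```python
from datetime import datetime, timedelta, timezone


def midpoint(requested_at: datetime, received_at: datetime) -> datetime:
    """Return the instant halfway between the send and receive timestamps."""
    return requested_at + (received_at - requested_at) / 2


sent = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
got = sent + timedelta(milliseconds=20)  # reply arrived 20 ms later

mid = midpoint(sent, got)
latency_s = (got - sent).total_seconds()
print(mid.isoformat(), latency_s)
```

With a 20 ms round trip the estimate lands 10 ms after the request was sent.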
Design reference: docs/design.md §5.14.
Recorder¶
alicatlib.streaming.recorder ¶
Absolute-target recorder — record() emits timed :class:Sample batches.
:func:record is the v1 acquisition primitive. It drives a
:class:~alicatlib.manager.AlicatManager (or any
:class:PollSource-shaped object — see below) at an absolute-target
cadence and publishes the polled :class:DataFrame values into an
:class:anyio.abc.ObjectReceiveStream as per-tick
Mapping[name, Sample] batches.
Key invariants (design §5.14):
- Absolute-target scheduling. Target times are computed from :func:anyio.current_time at record()-entry, not from a running monotonic; drift across cycles is bounded by one tick and never accumulates. anyio.sleep_until advances to the next target slot; overruns skip missed slots and increment samples_late.
- Structured concurrency. The producer task lives inside a create_task_group() strictly nested inside the async CM body. The CM yields the receive stream, user code iterates it, and on CM exit the task group is cancelled and joined before the CM returns. This matches AnyIO's own warning against yielding from inside a task group.
- Wall-clock provenance. datetime.now(UTC) is captured at the send/receive boundaries of each device's poll and attached to the emitted :class:Sample; it is used for sink timestamps, never for scheduling.
- Backpressure. buffer_size sets the memory-object stream capacity; :class:OverflowPolicy controls what happens when the producer wants to enqueue but the consumer is behind.
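The absolute-target rule in the first invariant can be sketched as a pure function: every target is derived from the start time and a slot index, so a slow tick shifts which slot comes next but never shifts the grid itself. The code below is an illustration under stated assumptions, not the recorder's code. next_slot is a hypothetical helper, and it works on integer nanoseconds instead of the anyio clock to keep the arithmetic exact.

```python
from typing import Tuple


def next_slot(
    start_ns: int, period_ns: int, now_ns: int, last_index: int
) -> Tuple[int, int, int]:
    """Pick the next absolute target after tick `last_index`.

    Returns (index, target_ns, skipped), where `skipped` counts the slots
    that were already in the past and therefore missed.
    """
    due = -(-(now_ns - start_ns) // period_ns)  # ceiling division on integers
    index = max(last_index + 1, due)
    skipped = index - (last_index + 1)
    return index, start_ns + index * period_ns, skipped


PERIOD = 100_000_000  # 10 Hz expressed in nanoseconds
print(next_slot(0, PERIOD, 50_000_000, 0))   # (1, 100000000, 0): on time
print(next_slot(0, PERIOD, 250_000_000, 0))  # (3, 300000000, 2): two slots missed
```

In the second call the producer finished tick 0 only at 250 ms, so slots 1 and 2 are skipped and the next target stays on the original 100 ms grid.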
The recorder consumes a :class:PollSource — a narrow Protocol the
:class:~alicatlib.manager.AlicatManager already satisfies (its
poll(names) signature matches). Kept as a Protocol so the
recorder is unit-testable against a lightweight stub without standing
up a full manager + transport pipeline.
Design reference: docs/design.md §5.14.