watlowlib.streaming¶
Sample, record(), OverflowPolicy, AcquisitionSummary, and the
PollSource Protocol that the recorder drives. See
Streaming and Logging and acquisition.
Public surface¶
watlowlib.streaming ¶
Streaming primitives — :func:record + :class:Sample.
The streaming layer drives a :class:PollSource (a
:class:~watlowlib.devices.controller.Controller or
:class:~watlowlib.manager.WatlowManager) at an absolute-target
cadence and publishes :class:Sample batches into an async receive
stream. Pair with :func:watlowlib.sinks.pipe to drain into a
:class:~watlowlib.sinks.SampleSink.
Design reference: docs/design.md §6.
AcquisitionSummary
dataclass
¶
AcquisitionSummary(
started_at,
finished_at,
samples_emitted,
samples_late,
max_drift_ms,
disconnects=0,
)
Per-run summary emitted after record()'s CM exits.
Attributes:
| Name | Type | Description |
|---|---|---|
started_at |
datetime
|
Wall-clock at the first scheduled tick. |
finished_at |
datetime
|
Wall-clock at producer shutdown. |
samples_emitted |
int
|
Count of per-tick batches actually pushed onto the receive stream. A tick that produced zero samples (every device errored) still counts as one emitted batch. |
samples_late |
int
|
Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). Auto-reconnect ticks also count as late. |
max_drift_ms |
float
|
Largest observed positive drift of an emitted
batch relative to its absolute target, in milliseconds.
A healthy run stays well under one period; values
approaching |
disconnects |
int
|
Count of WatlowConnectionError events the
producer absorbed under |
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the batch that was about to be enqueued. Counted as late.
PollSource ¶
Bases: Protocol
Minimal shape the recorder needs from its dispatcher.
Both :class:~watlowlib.devices.controller.Controller (solo) and
:class:~watlowlib.manager.WatlowManager (multi-device) satisfy
this Protocol. Using a Protocol keeps :func:record testable
against a lightweight stub without standing up a full controller +
transport pipeline.
The contract is intentionally narrow: per call, return a flat
:class:~collections.abc.Sequence of :class:Sample\ s — one
per (device, parameter) read that succeeded. Failed reads are
dropped from the batch and logged by the source; the recorder
never sees them.
poll
async
¶
Read every parameters × instances combination on every device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
parameters
|
Sequence[str | int]
|
Parameter names or registry IDs. |
required |
names
|
Sequence[str] | None
|
Subset of device names to poll (Manager-only;
Controller ignores). |
None
|
instances
|
Sequence[int]
|
1-indexed loop / channel numbers per device.
Single-loop devices use |
(1,)
|
Returns:
| Type | Description |
|---|---|
Sequence[Sample]
|
A flat :class: |
Sequence[Sample]
|
every poll failed. |
Source code in src/watlowlib/streaming/recorder.py
Sample
dataclass
¶
Sample(
device,
address,
protocol,
parameter,
parameter_id,
instance,
value,
unit,
monotonic_ns,
requested_at,
received_at,
midpoint_at,
latency_s,
raw,
)
One parameter read with full timing provenance.
Attributes:
| Name | Type | Description |
|---|---|---|
device |
str
|
Manager-assigned name (or controller label for solo recordings). Stable downstream identifier that follows the value into sinks. |
address |
int
|
Bus address of the polled device. Std Bus 1..16, Modbus RTU 1..247. |
protocol |
ProtocolKind
|
Wire protocol that decoded this read. Set from the session's protocol kind, not the reading metadata, so a mixed-protocol recording records the source per row. |
parameter |
str
|
Canonical parameter name (e.g. |
parameter_id |
int
|
Registry parameter id (e.g. |
instance |
int
|
1-indexed loop / channel selector used for the read. |
value |
float | int | str | bool | None
|
The decoded scalar. |
unit |
str | None
|
Display string for the value's unit, or |
monotonic_ns |
int
|
:func: |
requested_at |
datetime
|
Wall-clock |
received_at |
datetime
|
Wall-clock |
midpoint_at |
datetime
|
|
latency_s |
float
|
|
raw |
bytes
|
The wire payload that produced the value. Available for diagnostics; tabular sinks drop it. |
record
async
¶
record(
source,
*,
parameters,
rate_hz,
duration=None,
names=None,
instances=(1,),
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
auto_reconnect=False,
reconnect_factory=None,
)
Record polled samples into a receive stream at an absolute cadence.
Usage::
async with record(
controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
) as stream:
async for batch in stream:
for sample in batch:
print(sample.parameter, sample.value)
The CM yields an async iterator of per-tick :class:Sample batches.
Each batch is a flat :class:Sequence — one entry per (device,
parameter, instance) read that succeeded. Failed reads are dropped
by the source and logged at WARN.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source
|
PollSource
|
Any :class: |
required |
parameters
|
Sequence[str | int]
|
Parameter names or registry IDs to poll each tick. |
required |
rate_hz
|
float
|
Target cadence. Absolute targets are computed
|
required |
duration
|
float | None
|
Total acquisition duration in seconds. |
None
|
names
|
Sequence[str] | None
|
Subset of device names to poll per tick. |
None
|
instances
|
Sequence[int]
|
1-indexed loop / channel numbers per device. Single-
loop devices use |
(1,)
|
overflow
|
OverflowPolicy
|
Backpressure policy when the receive-stream buffer
is full. See :class: |
BLOCK
|
buffer_size
|
int
|
Receive-stream capacity, in per-tick batches. |
64
|
auto_reconnect
|
bool
|
When |
False
|
reconnect_factory
|
Callable[[], Awaitable[PollSource]] | None
|
When supplied alongside |
None
|
Yields:
| Type | Description |
|---|---|
AsyncGenerator[AsyncIterator[Sequence[Sample]]]
|
An async iterator of per-tick :class: |
Raises:
| Type | Description |
|---|---|
ValueError
|
|
Source code in src/watlowlib/streaming/recorder.py
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 | |
Sample¶
watlowlib.streaming.sample ¶
Timed sample — one parameter read with send/receive provenance.
A :class:Sample is the unit the recorder emits into its memory-object
stream. Watlow polls a small group of parameters per device per tick
(unlike Alicat, which returns one wide DataFrame per poll), so a
recorder tick produces N×M samples — one per (device, parameter) pair
that succeeded — each one carrying:
midpoint_at— best point-estimate of the on-device acquisition instant (halfway between request and reply). Use this for aligning Watlow values against other sensor streams.monotonic_ns— :func:time.monotonic_nsat the read boundary, for drift analysis only (no calendar meaning).raw— the wire payload that produced the value. Available for diagnostics; tabular sinks drop it.
The shape is deliberately long-format (one row per parameter) so the SQLite cross-vendor test can union Watlow rows with Alicat rows under one schema.
Design reference: docs/design.md §6.
Sample
dataclass
¶
Sample(
device,
address,
protocol,
parameter,
parameter_id,
instance,
value,
unit,
monotonic_ns,
requested_at,
received_at,
midpoint_at,
latency_s,
raw,
)
One parameter read with full timing provenance.
Attributes:
| Name | Type | Description |
|---|---|---|
device |
str
|
Manager-assigned name (or controller label for solo recordings). Stable downstream identifier that follows the value into sinks. |
address |
int
|
Bus address of the polled device. Std Bus 1..16, Modbus RTU 1..247. |
protocol |
ProtocolKind
|
Wire protocol that decoded this read. Set from the session's protocol kind, not the reading metadata, so a mixed-protocol recording records the source per row. |
parameter |
str
|
Canonical parameter name (e.g. |
parameter_id |
int
|
Registry parameter id (e.g. |
instance |
int
|
1-indexed loop / channel selector used for the read. |
value |
float | int | str | bool | None
|
The decoded scalar. |
unit |
str | None
|
Display string for the value's unit, or |
monotonic_ns |
int
|
:func: |
requested_at |
datetime
|
Wall-clock |
received_at |
datetime
|
Wall-clock |
midpoint_at |
datetime
|
|
latency_s |
float
|
|
raw |
bytes
|
The wire payload that produced the value. Available for diagnostics; tabular sinks drop it. |
Recorder¶
watlowlib.streaming.recorder ¶
Absolute-target recorder — record() emits timed :class:Sample batches.
:func:record is the v1 acquisition primitive. It drives a
:class:PollSource (an opened :class:~watlowlib.devices.controller.Controller
or a :class:~watlowlib.manager.WatlowManager) at an absolute-target
cadence and publishes the polled :class:Sample values into an
:class:anyio.abc.ObjectReceiveStream as per-tick batches.
Key invariants:
- Absolute-target scheduling. Target times are computed from
:func:
anyio.current_timeatrecord()-entry, not from a running monotonic; drift across cycles is bounded by one tick and never accumulates.anyio.sleep_untiladvances to the next target slot; overruns skip missed slots and incrementsamples_late. - Structured concurrency. The producer task lives strictly inside the async CM body. The CM yields the receive stream, user code iterates it, and on CM exit the task group is cancelled and joined before the CM returns.
- Wall-clock provenance.
datetime.now(UTC)is captured at the send/receive boundaries of each device's poll and attached to the emitted :class:Sample— used for sink timestamps, never for scheduling. - Backpressure.
buffer_sizesets the memory-object stream capacity; :class:OverflowPolicycontrols what happens when the producer wants to enqueue but the consumer is behind.
The recorder consumes a :class:PollSource — a narrow Protocol both
:class:~watlowlib.devices.controller.Controller and
:class:~watlowlib.manager.WatlowManager satisfy. Kept as a Protocol
so the recorder is unit-testable against a lightweight stub.
Design reference: docs/design.md §6.
AcquisitionSummary
dataclass
¶
AcquisitionSummary(
started_at,
finished_at,
samples_emitted,
samples_late,
max_drift_ms,
disconnects=0,
)
Per-run summary emitted after record()'s CM exits.
Attributes:
| Name | Type | Description |
|---|---|---|
started_at |
datetime
|
Wall-clock at the first scheduled tick. |
finished_at |
datetime
|
Wall-clock at producer shutdown. |
samples_emitted |
int
|
Count of per-tick batches actually pushed onto the receive stream. A tick that produced zero samples (every device errored) still counts as one emitted batch. |
samples_late |
int
|
Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). Auto-reconnect ticks also count as late. |
max_drift_ms |
float
|
Largest observed positive drift of an emitted
batch relative to its absolute target, in milliseconds.
A healthy run stays well under one period; values
approaching |
disconnects |
int
|
Count of WatlowConnectionError events the
producer absorbed under |
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the batch that was about to be enqueued. Counted as late.
PollSource ¶
Bases: Protocol
Minimal shape the recorder needs from its dispatcher.
Both :class:~watlowlib.devices.controller.Controller (solo) and
:class:~watlowlib.manager.WatlowManager (multi-device) satisfy
this Protocol. Using a Protocol keeps :func:record testable
against a lightweight stub without standing up a full controller +
transport pipeline.
The contract is intentionally narrow: per call, return a flat
:class:~collections.abc.Sequence of :class:Sample\ s — one
per (device, parameter) read that succeeded. Failed reads are
dropped from the batch and logged by the source; the recorder
never sees them.
poll
async
¶
Read every parameters × instances combination on every device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
parameters
|
Sequence[str | int]
|
Parameter names or registry IDs. |
required |
names
|
Sequence[str] | None
|
Subset of device names to poll (Manager-only;
Controller ignores). |
None
|
instances
|
Sequence[int]
|
1-indexed loop / channel numbers per device.
Single-loop devices use |
(1,)
|
Returns:
| Type | Description |
|---|---|
Sequence[Sample]
|
A flat :class: |
Sequence[Sample]
|
every poll failed. |
Source code in src/watlowlib/streaming/recorder.py
record
async
¶
record(
source,
*,
parameters,
rate_hz,
duration=None,
names=None,
instances=(1,),
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
auto_reconnect=False,
reconnect_factory=None,
)
Record polled samples into a receive stream at an absolute cadence.
Usage::
async with record(
controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
) as stream:
async for batch in stream:
for sample in batch:
print(sample.parameter, sample.value)
The CM yields an async iterator of per-tick :class:Sample batches.
Each batch is a flat :class:Sequence — one entry per (device,
parameter, instance) read that succeeded. Failed reads are dropped
by the source and logged at WARN.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source
|
PollSource
|
Any :class: |
required |
parameters
|
Sequence[str | int]
|
Parameter names or registry IDs to poll each tick. |
required |
rate_hz
|
float
|
Target cadence. Absolute targets are computed
|
required |
duration
|
float | None
|
Total acquisition duration in seconds. |
None
|
names
|
Sequence[str] | None
|
Subset of device names to poll per tick. |
None
|
instances
|
Sequence[int]
|
1-indexed loop / channel numbers per device. Single-
loop devices use |
(1,)
|
overflow
|
OverflowPolicy
|
Backpressure policy when the receive-stream buffer
is full. See :class: |
BLOCK
|
buffer_size
|
int
|
Receive-stream capacity, in per-tick batches. |
64
|
auto_reconnect
|
bool
|
When |
False
|
reconnect_factory
|
Callable[[], Awaitable[PollSource]] | None
|
When supplied alongside |
None
|
Yields:
| Type | Description |
|---|---|
AsyncGenerator[AsyncIterator[Sequence[Sample]]]
|
An async iterator of per-tick :class: |
Raises:
| Type | Description |
|---|---|
ValueError
|
|
Source code in src/watlowlib/streaming/recorder.py
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 | |