servomexlib.streaming¶
Sample, StreamingSession, StreamMode, record(), and AcquisitionSummary.
servomexlib.streaming ¶
Streaming layer.
Two acquisition shapes share one :class:Sample model and :class:StreamingSession
interface: the passive AUTOPRINT subscribe (continuous broadcast) and the
drift-free POLL :func:record loop (Modbus). :class:PollSource is the narrow
contract the recorder drives.
AcquisitionSummary
dataclass
¶
AcquisitionSummary(
started_at,
finished_at=None,
samples_emitted=0,
samples_late=0,
max_drift_ms=0.0,
tick_duration_ms_p50=0.0,
tick_duration_ms_p99=0.0,
disconnects=0,
)
Per-run summary, owned and mutated by the recorder (sole writer).
Counters update in place during the run so progress-polling consumers see
live values; consumers treat it as read-only. finished_at and the
percentile fields are populated on context-manager exit.
Attributes:
| Name | Type | Description |
|---|---|---|
started_at |
datetime
|
Wall-clock at the first scheduled tick. |
finished_at |
datetime | None
|
Wall-clock at producer shutdown, or |
samples_emitted |
int
|
Per-tick batches pushed onto the stream (a tick whose reads all failed still counts as one emitted batch). |
samples_late |
int
|
Ticks that missed their slot (overrun, overflow drop, or a reconnect gap). |
max_drift_ms |
float
|
Largest positive drift of an emitted batch from its target. |
tick_duration_ms_p50 |
float
|
Median |
tick_duration_ms_p99 |
float
|
99th-percentile |
disconnects |
int
|
|
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks the response.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer (default). Silent drops are surprising in acquisition, so the recorder blocks rather than discarding.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the batch about to be enqueued. Counted as late.
DROP_OLDEST
class-attribute
instance-attribute
¶
Evict the oldest queued batch and enqueue the newest. For real-time monitoring where the latest reading matters most. Each eviction is late.
PollSource ¶
Bases: Protocol
Minimal shape the recorder needs from its dispatcher.
poll_samples
async
¶
Read this tick's samples across every channel (and managed device).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
names
|
Sequence[str] | None
|
Subset of device names to poll (manager only); |
None
|
timeout
|
float | None
|
Per-poll I/O ceiling, or |
None
|
Returns:
| Type | Description |
|---|---|
Sequence[Sample]
|
A flat sequence of :class: |
Source code in src/servomexlib/streaming/poll_source.py
Recording
dataclass
¶
Container yielded by :func:record — stream + live summary + rate.
Shares the cross-family shape (stream / summary / rate_hz) so
downstream consumers are vendor-agnostic. For servomexlib the payload is
Recording[Sequence[Sample]] — per-tick batches.
Attributes:
| Name | Type | Description |
|---|---|---|
stream |
AsyncIterator[T]
|
Async iterator of per-tick :class: |
summary |
AcquisitionSummary
|
Live :class: |
rate_hz |
float
|
The cadence captured at |
Sample
dataclass
¶
Sample(
device,
channel,
reading,
protocol,
monotonic_ns,
received_at,
requested_at=None,
latency_s=None,
metadata=_empty_metadata(),
error=None,
)
Long-format row — one channel reading with streaming provenance.
requested_at / latency_s are None in passive continuous mode (we
did not ask). error is set when a frame was dropped/corrupt; reading
is then None (the two are mutually exclusive) and channel is None
because a dropped frame is not tied to one channel.
StreamMode ¶
Bases: StrEnum
How a :meth:Analyzer.stream session sources samples.
AUTOPRINT is the inherited family member (sartorius SBI vocabulary),
reused verbatim for boundary harmony; for the 4100 it denotes a passive
unsolicited-broadcast subscribe.
StreamingSession ¶
Async-iterable context manager over one subscriber's :class:Sample stream.
Two backing shapes share this one interface:
- Passive
AUTOPRINT(continuous):receiveris fed by the client's already-running background loop;on_closeunsubscribes. - Active
POLL(Modbus): aproducercoroutine is supplied. It is run in a task group this session owns — started on__aenter__and cancelled on close — so the recorder's lifetime is strictly nested in the session.
Source code in src/servomexlib/streaming/stream_session.py
aclose
async
¶
Stop the owned producer (if any), unsubscribe, and close the stream.
Source code in src/servomexlib/streaming/stream_session.py
record
async
¶
record(
source,
*,
rate_hz,
duration=None,
names=None,
timeout=None,
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
auto_reconnect=False,
reconnect_factory=None,
)
Record polled samples into a receive stream at an absolute cadence.
Usage::
async with record(analyzer, rate_hz=2, duration=10) as rec:
async for batch in rec.stream:
for sample in batch:
print(sample.channel, sample.value)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source
|
PollSource
|
Any :class: |
required |
rate_hz
|
float
|
Target cadence; |
required |
duration
|
float | None
|
Total acquisition seconds, or |
None
|
names
|
Sequence[str] | None
|
Subset of device names to poll (manager only); |
None
|
timeout
|
float | None
|
Per-poll I/O ceiling passed to |
None
|
overflow
|
OverflowPolicy
|
Backpressure policy when the buffer is full. |
BLOCK
|
buffer_size
|
int
|
Receive-stream capacity, in per-tick batches. |
64
|
auto_reconnect
|
bool
|
Treat :class: |
False
|
reconnect_factory
|
Callable[[], Awaitable[PollSource]] | None
|
Rebuilds the source after a disconnect when supplied. |
None
|
Yields:
| Name | Type | Description |
|---|---|---|
A |
AsyncGenerator[Recording[Sequence[Sample]]]
|
class: |
Raises:
| Type | Description |
|---|---|
ValueError
|
|
Source code in src/servomexlib/streaming/recorder.py
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 | |