sartoriuslib.streaming¶
Sample, StreamingSession, StreamMode, record(),
AcquisitionSummary, OverflowPolicy, PollSource. See
Streaming and Logging and acquisition.
Public surface¶
sartoriuslib.streaming ¶
Streaming + recording primitives. See design doc §10.
AcquisitionSummary
dataclass
¶
AcquisitionSummary(
started_at,
finished_at,
samples_emitted,
samples_late,
max_drift_ms,
target_total_samples=None,
)
Per-run summary emitted after record()'s CM exits.
Attributes:
| Name | Type | Description |
|---|---|---|
| started_at | datetime | Wall-clock at the first scheduled tick. |
| finished_at | datetime | Wall-clock at producer shutdown. |
| samples_emitted | int | Count of per-tick batches actually pushed onto the receive stream. |
| samples_late | int | Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). |
| max_drift_ms | float | Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. |
| target_total_samples | int \| None | Number of scheduled ticks for finite duration runs, or … |
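As an illustration of how these fields might be combined after a run, the sketch below derives a completion ratio and an effective rate. `SummaryLike`, `completion_ratio`, and `effective_rate_hz` are hypothetical names introduced here; only the field names come from the signature above, and the sketch assumes `target_total_samples` is left at its default of `None` when no tick count is known.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class SummaryLike:
    """Hypothetical stand-in using the same field names as AcquisitionSummary."""

    started_at: datetime
    finished_at: datetime
    samples_emitted: int
    samples_late: int
    max_drift_ms: float
    target_total_samples: Optional[int] = None


def completion_ratio(summary: SummaryLike) -> Optional[float]:
    """Emitted batches divided by scheduled ticks; None when no target is known."""
    if not summary.target_total_samples:
        return None
    return summary.samples_emitted / summary.target_total_samples


def effective_rate_hz(summary: SummaryLike) -> float:
    """Emitted batches per second of wall-clock run time."""
    seconds = (summary.finished_at - summary.started_at).total_seconds()
    return summary.samples_emitted / seconds if seconds > 0 else 0.0


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
demo = SummaryLike(start, start + timedelta(seconds=60), 570, 30, 4.2, 600)
print(completion_ratio(demo), effective_rate_hz(demo))
```

A ratio well below 1.0 together with a non-zero `samples_late` would suggest the consumer or the poll itself is slower than the requested cadence.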
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer. Default. Silent drops are surprising
in a data-acquisition setting, so the recorder blocks the producer
rather than quietly discarding samples. The effective sample rate
drops to the consumer's drain rate; samples_late accrues once
the consumer catches up and the producer can check its schedule.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the sample that was about to be enqueued. Counted as late.
DROP_OLDEST
class-attribute
instance-attribute
¶
Evict the oldest queued batch, then enqueue. Counted as late.
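The three policies can be pictured with a bounded queue. The following is a simplified synchronous model, not the recorder's code: `Policy` and `offer` are hypothetical names, and `BLOCK` is represented only by a return value because a real producer would await the consumer at that point.

```python
from collections import deque
from enum import Enum


class Policy(Enum):
    """Hypothetical mirror of the three OverflowPolicy members."""

    BLOCK = "block"
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


def offer(buffer: deque, capacity: int, batch, policy: Policy) -> str:
    """Try to enqueue one batch and report what happened."""
    if len(buffer) < capacity:
        buffer.append(batch)
        return "enqueued"
    if policy is Policy.DROP_NEWEST:
        # The incoming batch is discarded; the queue is left untouched.
        return "dropped_newest"
    if policy is Policy.DROP_OLDEST:
        # Evict the oldest queued batch, then enqueue the new one.
        buffer.popleft()
        buffer.append(batch)
        return "evicted_oldest"
    # BLOCK: nothing is lost; a real producer would wait here for space.
    return "would_block"


queue = deque(["t0", "t1"])
print(offer(queue, 2, "t2", Policy.DROP_OLDEST), list(queue))
```

With `DROP_OLDEST` the consumer always sees the freshest data at the cost of gaps in the past; with `DROP_NEWEST` the queued history stays contiguous and the gap appears at the present.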
PollSource ¶
Bases: Protocol
Minimal shape the recorder needs from its dispatcher.
:class:~sartoriuslib.manager.SartoriusManager satisfies this: its
poll(names) returns a Mapping[str, DeviceResult[Reading]].
Using a Protocol keeps :func:record testable against a lightweight
stub without pulling in the whole manager + transport stack.
poll
async
¶
Poll every named balance (or all under management) concurrently.
Must return a mapping keyed by the manager-assigned device name.
Successful polls carry the :class:Reading as .value;
failed ones carry the :class:~sartoriuslib.errors.SartoriusError
as .error (per :class:~sartoriuslib.manager.ErrorPolicy.RETURN).
Source code in src/sartoriuslib/streaming/recorder.py
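A lightweight stub of the kind mentioned above might look like the sketch below. `FakeResult`, `PollSourceLike`, and `StubSource` are hypothetical stand-ins: the real `DeviceResult` and `Reading` types are not reproduced here, and the `names=None` default meaning "poll everything" is an assumption made for the example.

```python
import asyncio
from dataclasses import dataclass
from typing import Mapping, Optional, Protocol, Sequence


@dataclass(frozen=True)
class FakeResult:
    """Hypothetical stand-in for DeviceResult: one of value/error is set."""

    value: Optional[float] = None
    error: Optional[Exception] = None


class PollSourceLike(Protocol):
    """Assumed shape: an async poll(names) returning a name-keyed mapping."""

    async def poll(
        self, names: Optional[Sequence[str]] = None
    ) -> Mapping[str, FakeResult]: ...


class StubSource:
    """In-memory source with fixed readings; selected devices always fail."""

    def __init__(self, readings: Mapping[str, float], failing: Sequence[str] = ()) -> None:
        self._readings = dict(readings)
        self._failing = set(failing)

    async def poll(self, names: Optional[Sequence[str]] = None) -> Mapping[str, FakeResult]:
        selected = list(names) if names is not None else list(self._readings)
        results = {}
        for name in selected:
            if name in self._failing:
                # Failures are returned as values rather than raised.
                results[name] = FakeResult(error=RuntimeError(f"{name} timed out"))
            else:
                results[name] = FakeResult(value=self._readings[name])
        return results


stub: PollSourceLike = StubSource({"bal_a": 1.0, "bal_b": 2.0}, failing=["bal_b"])
print(asyncio.run(stub.poll()))
```

Because the stub satisfies the protocol structurally, no inheritance from a library base class is needed.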
Sample
dataclass
¶
Sample(
device,
reading,
requested_at,
received_at,
midpoint_at,
monotonic_ns,
elapsed_s,
protocol,
metadata=_empty_metadata(),
error=None,
)
One balance poll with full timing provenance.
Attributes:
| Name | Type | Description |
|---|---|---|
| device | str | The manager-assigned name (from … |
| reading | Reading \| None | The :class: … |
| requested_at | datetime | Wall-clock … |
| received_at | datetime | Wall-clock … |
| midpoint_at | datetime | |
| monotonic_ns | int | :func: … |
| elapsed_s | float | |
| protocol | ProtocolKind \| None | Which wire protocol produced this sample. Duplicates … |
| metadata | Mapping[str, str] | Free-form per-sample annotations. Populated by Phase-7 … |
| error | SartoriusError \| None | The :class: … |
StreamingSession ¶
StreamingSession(
balance,
*,
rate_hz=None,
mode="poll",
temporary_autoprint=False,
confirm=False,
timeout=None,
)
Async context manager + iterator for one balance.
mode="poll" performs request/response polling at an absolute cadence.
mode="autoprint" consumes already-enabled SBI autoprint lines and
fails on entry if no line is available within timeout.
Source code in src/sartoriuslib/streaming/stream_session.py
record
async
¶
record(
source,
*,
rate_hz,
duration=None,
names=None,
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
)
Record polled samples into a receive stream at an absolute cadence.
Usage::

    async with record(mgr, rate_hz=10, duration=60) as stream:
        async for batch in stream:
            process(batch)

The CM yields an async iterator of per-tick sample batches. Each
batch is a Mapping[name, Sample] — one entry per device that
participated on that tick. Successful polls produce a
:class:Sample carrying a :class:Reading; failed polls produce
a :class:Sample with reading=None and error set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | PollSource | Any :class: … | required |
| rate_hz | float | Target cadence. Absolute targets are computed … | required |
| duration | float \| None | Total acquisition duration in seconds. Finite runs schedule … | None |
| names | Sequence[str] \| None | Subset of device names to poll per tick. | None |
| overflow | OverflowPolicy | Backpressure policy when the receive-stream buffer is full. See :class: … | BLOCK |
| buffer_size | int | Receive-stream capacity, in per-tick batches. | 64 |

Yields:

| Type | Description |
|---|---|
| AsyncGenerator[AsyncIterator[Mapping[str, Sample]]] | An async iterator of per-tick … |

Raises:

| Type | Description |
|---|---|
| ValueError | If … |
Source code in src/sartoriuslib/streaming/recorder.py
Sample¶
sartoriuslib.streaming.sample ¶
Timed sample — one balance reading with send/receive provenance.
A :class:Sample is what the recorder emits into its memory-object
stream. It pairs a :class:Reading with enough timing to reconstruct
the acquisition timeline: monotonic_ns for drift analysis,
requested_at / received_at / midpoint_at for wall-clock
provenance, and elapsed_s for per-sample latency checks.
The midpoint is the best point-estimate of the acquisition instant on the device: halfway between when the poll bytes left the host and when the full reply arrived. Downstream plots and correlations should use this field when aligning balance data against other sensor streams.
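The midpoint described above is simple datetime arithmetic. The helper below is a minimal sketch with a hypothetical name (`midpoint`); it shows the calculation only and is not taken from the library source.

```python
from datetime import datetime, timedelta, timezone


def midpoint(requested_at: datetime, received_at: datetime) -> datetime:
    """Return the instant halfway between send and receive."""
    # Adding half the round-trip time avoids averaging raw timestamps.
    return requested_at + (received_at - requested_at) / 2


sent = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
received = sent + timedelta(milliseconds=40)
print(midpoint(sent, received).isoformat())
```

For a 40 ms round trip the estimate lands 20 ms after the request, so the worst-case alignment error against another sensor stream is half the round-trip time.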
reading is None when error is populated — the two fields
are mutually exclusive. Samples with error still carry the
timing fields so sinks can log the failed attempts with proper
wall-clock provenance.
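Because `reading` and `error` are mutually exclusive, a consumer can partition each per-tick batch on `error` alone. The sketch below uses a hypothetical `SampleLike` stand-in that carries only the three fields needed for the illustration, with a plain float in place of a `Reading`.

```python
from dataclasses import dataclass
from typing import Dict, Mapping, Optional, Tuple


@dataclass(frozen=True)
class SampleLike:
    """Hypothetical stand-in carrying only the fields used in this example."""

    device: str
    reading: Optional[float]
    error: Optional[Exception] = None


def split_batch(
    batch: Mapping[str, SampleLike],
) -> Tuple[Dict[str, SampleLike], Dict[str, SampleLike]]:
    """Separate one per-tick batch into successful and failed samples."""
    ok = {name: s for name, s in batch.items() if s.error is None}
    failed = {name: s for name, s in batch.items() if s.error is not None}
    return ok, failed


batch = {
    "bal_a": SampleLike("bal_a", 12.5),
    "bal_b": SampleLike("bal_b", None, TimeoutError("no reply")),
}
ok, failed = split_batch(batch)
print(sorted(ok), sorted(failed))
```

A sink could write the `ok` samples to its data table and log the `failed` ones with their timing fields, which failed samples still carry.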
Design reference: docs/design.md §10.
Sample
dataclass
¶
Sample(
device,
reading,
requested_at,
received_at,
midpoint_at,
monotonic_ns,
elapsed_s,
protocol,
metadata=_empty_metadata(),
error=None,
)
One balance poll with full timing provenance.
Attributes:
| Name | Type | Description |
|---|---|---|
| device | str | The manager-assigned name (from … |
| reading | Reading \| None | The :class: … |
| requested_at | datetime | Wall-clock … |
| received_at | datetime | Wall-clock … |
| midpoint_at | datetime | |
| monotonic_ns | int | :func: … |
| elapsed_s | float | |
| protocol | ProtocolKind \| None | Which wire protocol produced this sample. Duplicates … |
| metadata | Mapping[str, str] | Free-form per-sample annotations. Populated by Phase-7 … |
| error | SartoriusError \| None | The :class: … |
Per-balance streaming session¶
sartoriuslib.streaming.stream_session ¶
Per-balance streaming session used by :meth:Balance.stream.
StreamingSession ¶
StreamingSession(
balance,
*,
rate_hz=None,
mode="poll",
temporary_autoprint=False,
confirm=False,
timeout=None,
)
Async context manager + iterator for one balance.
mode="poll" performs request/response polling at an absolute cadence.
mode="autoprint" consumes already-enabled SBI autoprint lines and
fails on entry if no line is available within timeout.
Source code in src/sartoriuslib/streaming/stream_session.py
Recorder — record(), summary, overflow¶
sartoriuslib.streaming.recorder ¶
Absolute-target recorder — record() emits timed :class:Sample batches.
:func:record is the v1 acquisition primitive. It drives a
:class:~sartoriuslib.manager.SartoriusManager (or any
:class:PollSource-shaped object — see below) at an absolute-target
cadence and publishes the polled :class:~sartoriuslib.devices.models.Reading
values into an :class:anyio.abc.ObjectReceiveStream as per-tick
Mapping[name, Sample] batches.
Key invariants (design §10):
- Absolute-target scheduling. Target times are computed from :func:anyio.current_time at record()-entry, not from a running monotonic; drift across cycles is bounded by one tick and never accumulates. anyio.sleep_until advances to the next target slot; overruns skip missed slots and increment samples_late.
- Structured concurrency. The producer task lives inside a create_task_group() strictly nested inside the async CM body.
- Wall-clock provenance. datetime.now(UTC) is captured at the send/receive boundaries of each balance's poll and attached to the emitted :class:Sample.
- Backpressure. buffer_size sets the memory-object stream capacity; :class:OverflowPolicy controls what happens when the producer wants to enqueue but the consumer is behind.
- Errors are samples too. Under ErrorPolicy.RETURN on the source, per-device failures arrive as :class:Sample with reading=None and error set; they still carry wall-clock timing so sinks can log the failure. Under ErrorPolicy.RAISE a failed device's poll aborts the batch at the source layer before the recorder sees it.
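The absolute-target rule in the first invariant can be sketched as pure arithmetic. This is an illustration under stated assumptions, not the recorder's implementation: `target_time` and `next_slot` are hypothetical helpers, and clock values are plain floats in seconds as returned by a monotonic clock.

```python
import math
from typing import Tuple


def target_time(start: float, period: float, index: int) -> float:
    """Absolute deadline of slot `index`, always derived from the start time."""
    return start + index * period


def next_slot(start: float, period: float, last_index: int, now: float) -> Tuple[int, int]:
    """Return (next_index, skipped) given the last slot served and the current time."""
    # Earliest slot whose deadline has not already passed.
    due = math.ceil((now - start) / period)
    next_index = max(last_index + 1, due)
    # Slots between the last one served and the next one are counted as missed.
    skipped = next_index - (last_index + 1)
    return next_index, skipped


start, period = 100.0, 0.5
print(next_slot(start, period, last_index=0, now=100.2))
print(next_slot(start, period, last_index=1, now=101.7))
```

Because every deadline is `start + index * period` rather than "previous wake-up plus period", a late tick delays only itself; the following deadline is unaffected, which is why drift stays bounded instead of accumulating.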
The recorder consumes a :class:PollSource — a narrow Protocol the
:class:~sartoriuslib.manager.SartoriusManager already satisfies (its
poll(names) signature matches). Kept as a Protocol so the
recorder is unit-testable against a lightweight stub.
Design reference: docs/design.md §10.
AcquisitionSummary
dataclass
¶
AcquisitionSummary(
started_at,
finished_at,
samples_emitted,
samples_late,
max_drift_ms,
target_total_samples=None,
)
Per-run summary emitted after record()'s CM exits.
Attributes:
| Name | Type | Description |
|---|---|---|
| started_at | datetime | Wall-clock at the first scheduled tick. |
| finished_at | datetime | Wall-clock at producer shutdown. |
| samples_emitted | int | Count of per-tick batches actually pushed onto the receive stream. |
| samples_late | int | Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). |
| max_drift_ms | float | Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. |
| target_total_samples | int \| None | Number of scheduled ticks for finite duration runs, or … |
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer. Default. Silent drops are surprising
in a data-acquisition setting, so the recorder blocks the producer
rather than quietly discarding samples. The effective sample rate
drops to the consumer's drain rate; samples_late accrues once
the consumer catches up and the producer can check its schedule.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the sample that was about to be enqueued. Counted as late.
DROP_OLDEST
class-attribute
instance-attribute
¶
Evict the oldest queued batch, then enqueue. Counted as late.
PollSource ¶
Bases: Protocol
Minimal shape the recorder needs from its dispatcher.
:class:~sartoriuslib.manager.SartoriusManager satisfies this: its
poll(names) returns a Mapping[str, DeviceResult[Reading]].
Using a Protocol keeps :func:record testable against a lightweight
stub without pulling in the whole manager + transport stack.
poll
async
¶
Poll every named balance (or all under management) concurrently.
Must return a mapping keyed by the manager-assigned device name.
Successful polls carry the :class:Reading as .value;
failed ones carry the :class:~sartoriuslib.errors.SartoriusError
as .error (per :class:~sartoriuslib.manager.ErrorPolicy.RETURN).
Source code in src/sartoriuslib/streaming/recorder.py
record
async
¶
record(
source,
*,
rate_hz,
duration=None,
names=None,
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
)
Record polled samples into a receive stream at an absolute cadence.
Usage::

    async with record(mgr, rate_hz=10, duration=60) as stream:
        async for batch in stream:
            process(batch)

The CM yields an async iterator of per-tick sample batches. Each
batch is a Mapping[name, Sample] — one entry per device that
participated on that tick. Successful polls produce a
:class:Sample carrying a :class:Reading; failed polls produce
a :class:Sample with reading=None and error set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | PollSource | Any :class: … | required |
| rate_hz | float | Target cadence. Absolute targets are computed … | required |
| duration | float \| None | Total acquisition duration in seconds. Finite runs schedule … | None |
| names | Sequence[str] \| None | Subset of device names to poll per tick. | None |
| overflow | OverflowPolicy | Backpressure policy when the receive-stream buffer is full. See :class: … | BLOCK |
| buffer_size | int | Receive-stream capacity, in per-tick batches. | 64 |

Yields:

| Type | Description |
|---|---|
| AsyncGenerator[AsyncIterator[Mapping[str, Sample]]] | An async iterator of per-tick … |

Raises:

| Type | Description |
|---|---|
| ValueError | If … |
Source code in src/sartoriuslib/streaming/recorder.py