sartoriuslib.streaming¶
Sample, StreamingSession, StreamMode, record(),
AcquisitionSummary, OverflowPolicy, PollSource. See
"Streaming" and "Logging and acquisition".
Public surface¶
sartoriuslib.streaming ¶
Streaming + recording primitives. See design doc §10.
AcquisitionSummary
dataclass
¶
AcquisitionSummary(
started_at,
finished_at=None,
samples_emitted=0,
samples_late=0,
max_drift_ms=0.0,
target_total_samples=None,
)
Mutable acquisition totals owned by recorder / pipe drivers.
The recorder is the sole writer — counters update in place during the run so progress polling (TUIs, dashboards) works without a separate API. Consumers MUST treat the summary as read-only; mutating it from the consumer side is a contract violation that will produce wrong totals on shutdown.
finished_at is None while the producer is running and is
set on context-manager exit.
Unified spec §M: every sibling library follows the same mutability rule; the field set is library-specific.
Attributes:
| Name | Type | Description |
|---|---|---|
| started_at | datetime | Wall-clock at the first scheduled tick. |
| finished_at | datetime \| None | Wall-clock at producer shutdown ( |
| samples_emitted | int | Count of per-tick batches actually pushed onto the receive stream for recorder summaries, or individual samples handed to the sink for |
| samples_late | int | Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). |
| max_drift_ms | float | Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. |
| target_total_samples | int \| None | Number of scheduled ticks for finite duration recorder runs, or |
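Because the counters update in place, a progress display can be derived from the summary alone. The sketch below is a minimal illustration, assuming a local stand-in dataclass (`SummaryStandIn`) that mirrors the documented fields rather than importing the real class; the helper names `progress_fraction` and `is_running` are hypothetical. It only reads the summary, in line with the read-only contract for consumers.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class SummaryStandIn:
    """Local stand-in mirroring the documented AcquisitionSummary fields."""

    started_at: datetime
    finished_at: datetime | None = None
    samples_emitted: int = 0
    samples_late: int = 0
    max_drift_ms: float = 0.0
    target_total_samples: int | None = None


def progress_fraction(summary: SummaryStandIn) -> float | None:
    """Return emitted / target clamped to 1.0, or None when no target is set."""
    if not summary.target_total_samples:
        return None
    return min(1.0, summary.samples_emitted / summary.target_total_samples)


def is_running(summary: SummaryStandIn) -> bool:
    """finished_at stays None until the context manager exits."""
    return summary.finished_at is None


# Read-only use from the consumer side: never assign to the counters.
summary = SummaryStandIn(
    started_at=datetime.now(timezone.utc),
    samples_emitted=150,
    target_total_samples=600,
)
print(progress_fraction(summary), is_running(summary))  # 0.25 True
```

Late ticks are counted separately in samples_late, so on a run with drops this fraction may never reach 1.0.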
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer. Default. Silent drops are surprising
in a data-acquisition setting, so the recorder blocks the producer
rather than quietly discarding samples. The effective sample rate
drops to the consumer's drain rate; samples_late accrues once
the consumer catches up and the producer can check its schedule.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the batch that was about to be enqueued. Counted as late.
DROP_OLDEST
class-attribute
instance-attribute
¶
Evict the oldest queued batch, then enqueue. Counted as late.
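The three policies can be illustrated on a plain bounded queue. This is a synchronous sketch of the decision only, not the recorder's implementation: `Policy` is a local stand-in for the documented enum, and `offer` is a hypothetical helper. A real BLOCK producer would await the consumer instead of returning.

```python
from collections import deque
from enum import Enum


class Policy(Enum):
    """Local stand-in for the documented OverflowPolicy members."""

    BLOCK = "block"
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


def offer(buffer: deque, capacity: int, batch: object, policy: Policy) -> str:
    """Try to enqueue `batch` and report what happened to it."""
    if len(buffer) < capacity:
        buffer.append(batch)
        return "enqueued"
    if policy is Policy.DROP_NEWEST:
        return "dropped_newest"  # the incoming batch is discarded
    if policy is Policy.DROP_OLDEST:
        buffer.popleft()  # evict the oldest queued batch
        buffer.append(batch)
        return "evicted_oldest"
    return "would_block"  # a real producer would wait for the consumer here


buf = deque(["t0", "t1"])  # already full at capacity 2
print(offer(buf, 2, "t2", Policy.DROP_NEWEST), list(buf))  # dropped_newest ['t0', 't1']
print(offer(buf, 2, "t3", Policy.DROP_OLDEST), list(buf))  # evicted_oldest ['t1', 't3']
print(offer(buf, 2, "t4", Policy.BLOCK), list(buf))        # would_block ['t1', 't3']
```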
PollSource ¶
Bases: Protocol
Minimal shape the recorder needs from its dispatcher.
:class:~sartoriuslib.manager.SartoriusManager satisfies this: its
poll(names) returns a Mapping[str, DeviceResult[Reading]].
Using a Protocol keeps :func:record testable against a lightweight
stub without pulling in the whole manager + transport stack.
poll
async
¶
Poll every named balance (or all under management) concurrently.
Must return a mapping keyed by the manager-assigned device name.
Successful polls carry the :class:Reading as .value;
failed ones carry the :class:~sartoriuslib.errors.SartoriusError
as .error (per :class:~sartoriuslib.manager.ErrorPolicy.RETURN).
Source code in src/sartoriuslib/streaming/recorder.py
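A lightweight stub of the kind mentioned above might look like the following sketch. Everything here is a local assumption made so the example runs on its own: `ResultStandIn` stands in for DeviceResult (only the documented `.value` / `.error` attributes are modelled), `PollSourceLike` restates the documented `poll(names)` shape, and plain floats stand in for Reading objects.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Mapping, Protocol, Sequence


@dataclass(frozen=True)
class ResultStandIn:
    """Hypothetical stand-in for DeviceResult: a value or an error."""

    value: float | None = None
    error: Exception | None = None


class PollSourceLike(Protocol):
    """Local restatement of the documented shape: an async poll(names)."""

    async def poll(
        self, names: Sequence[str] | None = None
    ) -> Mapping[str, ResultStandIn]: ...


class StubSource:
    """Returns canned results; the device mapped to None always fails."""

    def __init__(self) -> None:
        self._devices: dict[str, float | None] = {"bal1": 12.5, "bal2": None}

    async def poll(
        self, names: Sequence[str] | None = None
    ) -> Mapping[str, ResultStandIn]:
        # names=None means "every device under management".
        wanted = list(self._devices) if names is None else [
            n for n in names if n in self._devices
        ]
        out: dict[str, ResultStandIn] = {}
        for name in wanted:
            grams = self._devices[name]
            if grams is None:
                out[name] = ResultStandIn(error=RuntimeError(f"{name}: no response"))
            else:
                out[name] = ResultStandIn(value=grams)
        return out


source: PollSourceLike = StubSource()  # structural typing: no inheritance needed
results = asyncio.run(source.poll())
print(sorted(results))
```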
PollSourceAdapter ¶
Wrap one :class:Balance as a :class:PollSource for :func:record.
Construction takes a name (the manager-style identifier the
sample carries downstream) and the :class:Balance to poll. Every
:meth:poll invocation returns either a single-entry mapping
containing the poll outcome wrapped in :class:DeviceResult, or an
empty mapping when names is supplied and does not include this
adapter's name.
Usage::

    adapter = PollSourceAdapter("bal1", balance)
    async with record(adapter, rate_hz=10) as recording:
        async for batch in recording.stream:
            ...
Source code in src/sartoriuslib/streaming/poll_source.py
poll
async
¶
Poll the wrapped balance and return a one-entry result mapping.
When names is supplied and excludes :attr:name, returns
an empty mapping (the consumer asked for a different device).
Otherwise returns {name: DeviceResult.success(reading)} on
success or {name: DeviceResult.failure(error)} on a typed
sartoriuslib failure.
Source code in src/sartoriuslib/streaming/poll_source.py
Recording
dataclass
¶
The context-manager payload returned by :func:record.
Bundles the per-tick stream, the live :class:AcquisitionSummary,
and the configured / observed rates so consumers can poll progress
without reaching into recorder internals. Cross-library spec §M:
every sibling library yields Recording[T] from its record
CM; T is what the recorder actually emits per tick (for
sartoriuslib that's Mapping[str, Sample]).
Attributes:
| Name | Type | Description |
|---|---|---|
| stream | StreamT | The async iterator the recorder publishes per-tick payloads into. Drain by |
| summary | AcquisitionSummary | Live :class: |
| rate_hz | float | Configured cadence the recorder is running at, as passed to :func: |
| observed_rate_hz | float \| None | Rolling mean inter-frame rate over the last 10 SBI autoprint frames. |
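One way to picture a rolling mean inter-frame rate is sketched below. It is an illustration under stated assumptions, not the library's code: the `RollingRate` class is hypothetical, the window is taken as the last 10 arrival timestamps (so 9 intervals), and None is returned until at least two frames have arrived.

```python
from __future__ import annotations

from collections import deque


class RollingRate:
    """Mean frame rate over the most recent `window` arrival timestamps."""

    def __init__(self, window: int = 10) -> None:
        # Older timestamps fall off the left automatically.
        self._arrivals_ns: deque[int] = deque(maxlen=window)

    def add(self, t_mono_ns: int) -> None:
        """Record the monotonic receive time of one frame."""
        self._arrivals_ns.append(t_mono_ns)

    def rate_hz(self) -> float | None:
        """Intervals per second across the window, or None if undefined."""
        if len(self._arrivals_ns) < 2:
            return None
        span_ns = self._arrivals_ns[-1] - self._arrivals_ns[0]
        if span_ns <= 0:
            return None
        intervals = len(self._arrivals_ns) - 1
        return intervals * 1e9 / span_ns


rate = RollingRate()
for i in range(12):
    rate.add(i * 100_000_000)  # one frame every 100 ms
print(rate.rate_hz())
```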
Sample
dataclass
¶
Sample(
device,
reading,
t_mono_ns,
t_utc,
requested_at,
received_at,
latency_s,
protocol,
t_midpoint_mono_ns=None,
metadata=_empty_metadata(),
error=None,
)
One balance poll with full timing provenance.
Attributes:
| Name | Type | Description |
|---|---|---|
| device | str | The manager-assigned name (from |
| reading | Reading \| None | The :class: |
| t_mono_ns | int | Canonical monotonic acquisition timestamp in nanoseconds since OS boot. The midpoint of the request / response monotonic timestamps for request/response polling; the receive-side monotonic for SBI autoprint frames. This is the join key downstream tooling correlates against sibling-library samples. |
| t_utc | datetime | Wall-clock acquisition instant (UTC, tz-aware) for the same moment :attr: |
| t_midpoint_mono_ns | int \| None | Optional integration-window midpoint in monotonic nanoseconds. |
| requested_at | datetime | Wall-clock |
| received_at | datetime | Wall-clock |
| latency_s | float | |
| protocol | ProtocolKind \| None | Which wire protocol produced this sample. Duplicates |
| metadata | Mapping[str, str] | Free-form per-sample annotations. Populated by |
| error | SartoriusError \| None | The :class: |
StreamingSession ¶
StreamingSession(
balance,
*,
rate_hz=None,
mode="poll",
temporary_autoprint=False,
confirm=False,
timeout=None,
)
Async context manager + iterator for one balance.
mode="poll" performs request/response polling at an absolute cadence.
mode="autoprint" consumes already-enabled SBI autoprint lines and
fails on entry if no line is available within timeout. The
temporary_autoprint=True path is reserved for a future persistent
SBI parameter-write flow and currently raises :class:NotImplementedError.
Source code in src/sartoriuslib/streaming/stream_session.py
record
async
¶
record(
source,
*,
rate_hz,
duration=None,
names=None,
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
)
Record polled samples into a receive stream at an absolute cadence.
Usage::

    async with record(mgr, rate_hz=10, duration=60) as recording:
        async for batch in recording.stream:
            process(batch)
    print(recording.summary.samples_emitted)
The CM yields a :class:Recording whose :attr:Recording.stream
is an async iterator of per-tick sample batches. Each batch is a
Mapping[name, Sample] — one entry per device that
participated on that tick. Successful polls produce a
:class:Sample carrying a :class:Reading; failed polls produce
a :class:Sample with reading=None and error set. The
bundled :attr:Recording.summary updates live during the run and
finalises (finished_at set) on CM exit.
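Since failed polls arrive in the same batch as successful ones, a consumer typically separates the two per tick. The sketch below shows one way to do that, assuming a local `SampleStandIn` that models only the documented device, reading and error fields (with a float in place of a Reading); `split_batch` is a hypothetical helper, not part of the library.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class SampleStandIn:
    """Stand-in with only the three documented fields this sketch needs."""

    device: str
    reading: float | None = None
    error: Exception | None = None


def split_batch(
    batch: Mapping[str, SampleStandIn],
) -> tuple[dict[str, float], dict[str, Exception]]:
    """Separate one per-tick batch into readings and errors, keyed by device."""
    readings: dict[str, float] = {}
    errors: dict[str, Exception] = {}
    for name, sample in batch.items():
        if sample.error is not None:
            errors[name] = sample.error  # reading is None for these samples
        elif sample.reading is not None:
            readings[name] = sample.reading
    return readings, errors


batch = {
    "bal1": SampleStandIn("bal1", reading=12.5),
    "bal2": SampleStandIn("bal2", error=TimeoutError("no response")),
}
ok, failed = split_batch(batch)
print(ok, list(failed))  # {'bal1': 12.5} ['bal2']
```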
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | PollSource | Any :class: | required |
| rate_hz | float | Target cadence. Absolute targets are computed | required |
| duration | float \| None | Total acquisition duration in seconds. Finite runs schedule | None |
| names | Sequence[str] \| None | Subset of device names to poll per tick. | None |
| overflow | OverflowPolicy | Backpressure policy when the receive-stream buffer is full. See :class: | BLOCK |
| buffer_size | int | Receive-stream capacity, in per-tick batches. | 64 |
Yields:
| Name | Type | Description |
|---|---|---|
| A | AsyncGenerator[Recording[Mapping[str, Sample], AsyncIterator[Mapping[str, Sample]]]] | class: |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
Source code in src/sartoriuslib/streaming/recorder.py
Sample¶
sartoriuslib.streaming.sample ¶
Timed sample — one balance reading with send/receive provenance.
A :class:Sample is what the recorder emits into its memory-object
stream. It pairs a :class:Reading with enough timing to reconstruct
the acquisition timeline. The unified cross-library timestamp contract
(see UNIFIED_API_HANDOFF.md §C) requires three canonical fields:
- :attr:t_mono_ns: monotonic acquisition timestamp (join key).
- :attr:t_utc: wall-clock acquisition instant. For request/response polling this is the midpoint between :attr:requested_at and :attr:received_at, the best point estimate of when the device produced the reading. For SBI autoprint it is :attr:received_at.
- :attr:t_midpoint_mono_ns: integration-window midpoint (monotonic). None for single polled / autoprint samples; reserved for sensors that emit values integrated over a known window.
Per-protocol I/O provenance (requested_at / received_at /
latency_s) is kept alongside so latency analysis and on-the-wire
debugging stay possible. :attr:metadata carries free-form annotations
(autoprint vs. poll mode) and is real data, not log spam.
reading is None when error is populated — the two fields
are mutually exclusive. Samples with error still carry the
timing fields so sinks can log the failed attempts with proper
wall-clock provenance.
Design reference: docs/design.md §10; unified spec
UNIFIED_API_HANDOFF.md §C.
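The midpoint rule for request/response polling can be worked through with a small example. This is a sketch of the arithmetic only; the function names `midpoint_mono_ns` and `midpoint_utc` are hypothetical and the timestamps are made up for illustration.

```python
from datetime import datetime, timedelta, timezone


def midpoint_mono_ns(sent_mono_ns: int, received_mono_ns: int) -> int:
    """Midpoint of the send/receive monotonic timestamps, in nanoseconds."""
    return sent_mono_ns + (received_mono_ns - sent_mono_ns) // 2


def midpoint_utc(requested_at: datetime, received_at: datetime) -> datetime:
    """Wall-clock midpoint between the request and the response."""
    return requested_at + (received_at - requested_at) / 2


# A poll sent at 12:00:00.000 UTC whose response arrives 40 ms later.
requested = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
received = requested + timedelta(milliseconds=40)

print(midpoint_utc(requested, received).isoformat())  # 2024-01-01T12:00:00.020000+00:00
print(midpoint_mono_ns(1_000_000_000, 1_040_000_000))  # 1020000000
```

For an SBI autoprint frame there is no request to average against, so the receive-side timestamps are used directly, as described above.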
Per-balance streaming session¶
sartoriuslib.streaming.stream_session ¶
Per-balance streaming session used by :meth:Balance.stream.
Recorder — record(), summary, overflow¶
sartoriuslib.streaming.recorder ¶
Absolute-target recorder — record() emits timed :class:Sample batches.
:func:record is the v1 acquisition primitive. It drives a
:class:~sartoriuslib.manager.SartoriusManager (or any
:class:PollSource-shaped object — see below) at an absolute-target
cadence and publishes the polled :class:~sartoriuslib.devices.models.Reading
values into an :class:anyio.abc.ObjectReceiveStream as per-tick
Mapping[name, Sample] batches.
Key invariants (design §10):
- Absolute-target scheduling. Target times are computed from :func:anyio.current_time at record()-entry, not from a running monotonic; drift across cycles is bounded by one tick and never accumulates. anyio.sleep_until advances to the next target slot; overruns skip missed slots and increment samples_late.
- Structured concurrency. The producer task lives inside a create_task_group() strictly nested inside the async CM body.
- Wall-clock provenance. datetime.now(UTC) is captured at the send/receive boundaries of each balance's poll and attached to the emitted :class:Sample.
- Backpressure. buffer_size sets the memory-object stream capacity; :class:OverflowPolicy controls what happens when the producer wants to enqueue but the consumer is behind.
- Errors are samples too. Under ErrorPolicy.RETURN on the source, per-device failures arrive as :class:Sample with reading=None and error set; they still carry wall-clock timing so sinks can log the failure. Under ErrorPolicy.RAISE a failed device's poll aborts the batch at the source layer before the recorder sees it.
The recorder consumes a :class:PollSource — a narrow Protocol the
:class:~sartoriuslib.manager.SartoriusManager already satisfies (its
poll(names) signature matches). Kept as a Protocol so the
recorder is unit-testable against a lightweight stub.
Design reference: docs/design.md §10.
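The absolute-target invariant comes down to deriving every deadline from the start time and a slot index, never from the previous wake-up. The sketch below shows that arithmetic in isolation, under the assumption that slots are numbered from 0 at entry; `target_time` and `next_slot` are hypothetical helpers, and the actual sleeping (anyio.sleep_until in the recorder) is left out so the example stays pure.

```python
from __future__ import annotations

import math


def target_time(start: float, rate_hz: float, slot: int) -> float:
    """Absolute deadline of `slot`, derived from the start time only."""
    return start + slot / rate_hz


def next_slot(
    start: float, rate_hz: float, now: float, last_slot: int
) -> tuple[int, int]:
    """Return (slot to wait for, number of slots skipped as late)."""
    # First slot whose deadline has not passed yet; the small epsilon
    # keeps a wake-up exactly on a deadline from rounding up a slot.
    due = math.ceil((now - start) * rate_hz - 1e-9)
    slot = max(last_slot + 1, due)
    return slot, slot - (last_slot + 1)


start, rate_hz = 100.0, 10.0

# On time: slot 0 finished at t=100.05, so slot 1 is next and nothing is late.
print(next_slot(start, rate_hz, now=100.05, last_slot=0))  # (1, 0)

# Overrun: slot 1 finished at t=100.37, so slots 2 and 3 were missed.
slot, late = next_slot(start, rate_hz, now=100.37, last_slot=1)
print(slot, late)  # 4 2
print(target_time(start, rate_hz, slot))
```

Because each deadline is recomputed from `start`, a slow tick delays only the ticks it overlaps; later slots land back on the original grid.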