Streaming mode¶
Streaming mode is a port-level state transition, not a
request/response command. When a device enters streaming, it stops
responding to prompts, overwrites its unit-id letter on the wire with
@ (or a space on 10v20 — see below), and pushes data frames
continuously at a configured rate until stopped. The
StreamingSession runtime
owns that mode transition, the producer loop, and the teardown
contract.
See Design §5.8 for the authoritative architecture and §15.3 for the hardware-driven corrections that shaped the implementation.
Opening a stream¶
async with open_device("/dev/ttyUSB0") as dev:
async with dev.stream(rate_ms=50) as stream:
async for frame in stream:
print(frame.get_float("Mass_Flow"))
Device.stream(...) returns
a StreamingSession — both an async context manager and an async
iterator. The context's lifetime is the streaming session's lifetime:
enter sends start-stream, exit sends stop-stream, and a crashed
consumer never leaves the device flooding the bus because teardown
runs under __aexit__ even when the body raises.
Parameters¶
| Parameter | Default | Purpose |
|---|---|---|
rate_ms |
None |
If set, configures NCS before entering streaming mode. 0 is "as-fast-as-possible"; distinct from None ("leave device at its current rate"). Firmware-gated at V10 >= 10v05 by the STREAMING_RATE command. |
strict |
False |
When True, AlicatParseError from a malformed frame propagates out of __anext__ and tears the stream down. When False, the error is logged at WARN and the producer continues. |
overflow |
DROP_OLDEST |
Buffer backpressure policy — see below. |
buffer_size |
256 |
Bounded producer/consumer buffer depth. 256 frames at 50 ms is ~13 s of backlog. |
Mode transition¶
Entering streaming mode runs a four-step sequence under the port lock (devices/streaming.py:169):
- Cached format. Lazy-probe
??D*if the session doesn't have a cachedDataFrameFormat. Streaming has to parse every frame, so a missing format is a hard error before any producer starts. - Optional
NCSrate. Runs as a normal request/response command before the streaming latch flips, so the dispatch gate still allows it. - Atomic latch + start-stream write. Acquires the port lock,
verifies the client isn't already streaming, flips
AlicatProtocolClient.is_streaming, writes{unit_id}@ @\rdirectly to the transport, releases the lock. Holding the lock across the latch + write is what makes the transition atomic with respect to other sessions on the same client. - Producer task. Starts inside a task group whose lifetime matches the CM. Cancelled and joined on exit.
The is_streaming latch¶
AlicatProtocolClient.is_streaming is the dispatch gate. While it's
set, every Session.execute / poll / request call on any
session sharing this client's port fails fast with
AlicatStreamingModeError — measured at 0.088 ms with zero tx on
real hardware. One streamer per port is a hard invariant; attempting
to enter a second StreamingSession while one is active raises
immediately without touching the wire.
The latch is cleared in __aexit__ after the stop-stream write
and drain, so the next poll() on the session cannot race the
device's shutdown of its continuous push.
Overflow policy¶
OverflowPolicy controls
what the producer does when the bounded buffer is full:
| Policy | Behaviour |
|---|---|
DROP_OLDEST (default) |
Evict the oldest queued frame, then enqueue. Latest-data-wins — the right default for high-rate telemetry where staleness matters more than completeness. |
DROP_NEWEST |
Drop the frame that was about to be enqueued. Keeps the oldest queued frame; useful when you want to preserve a coherent window. |
BLOCK |
Await the slow consumer. Valid but risks the OS-level serial buffer dropping bytes if the consumer stays behind for long — the kernel tty buffer is finite, and the device keeps pushing. |
StreamingSession.dropped_frames counts frames the producer had to
discard under DROP_OLDEST / DROP_NEWEST. Available after the CM
exits.
Parse-error handling¶
The producer parses each line through the session's cached
DataFrameFormat. Malformed frames are handled by
the strict flag:
strict=False(default) — logs a WARN with the raw bytes and continues. A single bad frame on a flaky RS-485 bus doesn't kill the stream.strict=True— records the error, cancels the producer, and re-raises theAlicatParseErrorout of the consumer's__anext__so theasync forloop surfaces the real exception.
Transport failures (port yanked, process shutdown) always tear the
stream down — no knob to disable that. The error propagates through
__anext__ so the consumer sees the cause.
Wire-shape normalisation¶
The primer says a streaming device "changes its unit-id letter to
@". Empirically on 10v20, the letter is dropped entirely,
leaving a leading space. The producer normalises either form back to
the request/response shape by prepending the session's unit id —
self._session.unit_id.encode("ascii") + stripped[1:] — so the
single DataFrameFormat.parse path handles both. See
devices/streaming.py:394
for the exact dispatch.
Stop-stream and recovery¶
__aexit__ is load-bearing (streaming.py:241):
- Cancel the producer task group and close the send side of the
buffer so any pending
__anext__receivesStopAsyncIterationor the re-raised strict-mode error. - Under the port lock, write
@@ {unit_id}\rand drain with a 100 ms idle window. If the transport is already torn down (AlicatTransportErrorduring the write), the exception is suppressed — the device is the caller's problem at that point, but the latch still needs clearing. - Always clear the streaming latch, even if the stop-stream write failed. Leaving it set would permanently brick the client for request/response use.
Stale-stream recovery on open¶
If a prior process left a device streaming,
open_device detects
this during the identification pipeline's passive sniff. The factory
issues the stop-stream bytes directly (bypassing the session layer
because the session doesn't exist yet) and drains before VE runs.
The passive sniff and the post-stop drain are both capped at
256 bytes — the uncapped form deadlocks open_device against a
device continuously streaming at its 50 ms default rate, because the
bus never goes idle for the 100 ms window the read needs to return.
See design §15.3 for the hardware-day diagnosis.
Sync streaming¶
SyncDevice.stream(...) returns
a SyncStreamingSession — a
sync context manager and a sync iterator:
The sync wrapper enters and exits the underlying async
StreamingSession via SyncPortal.wrap_async_context_manager, not
portal.call(__aenter__). portal.call wraps each call in its own
CancelScope; StreamingSession.__aenter__ enters a long-lived task
group that outlives the entry call, so the nested scope hierarchy
becomes inconsistent at exit and raises
RuntimeError: Attempted to exit a cancel scope that isn't the current
task's current cancel scope on real hardware.
wrap_async_context_manager lets anyio own the portal-side scope for
the full CM lifetime, which is the fix. Design §15.3 has the full
narrative.
Streaming vs. record()¶
Two primitives, different use cases:
| Primitive | Timing source | Use when |
|---|---|---|
StreamingSession |
Device-driven; frames arrive when the device sends them | Highest rates (device's NCS-configured cadence), one device per port |
record() |
Host-driven absolute-target scheduler over poll() |
Multi-device acquisition, cadence chosen by host, sink integration via pipe() |
Both produce DataFrame values; both honour overflow policies; both
integrate with sinks (streaming via the iterator + user code;
record() via pipe()). The streaming runtime is the right choice
for high-rate single-device capture; record() is the right choice
for everything else, especially multi-device runs.
See logging.md for the recorder side and the sink ecosystem.