Logging and acquisition¶
For continuous data capture, watlowlib provides a recorder that
samples a controller (or a WatlowManager of controllers) at a fixed
rate, plus first-party sinks that persist the resulting Sample
stream. The two halves connect through pipe(). See
Design §6.
Recorder¶
record(...) is an async
context manager. It schedules ticks on absolute targets (drift-free)
and yields a stream of Sample batches.
```python
import anyio

from watlowlib import WatlowManager, record


async def main() -> None:
    async with WatlowManager() as mgr:
        await mgr.add("oven_top", "/dev/ttyUSB0", address=1)
        await mgr.add("oven_bot", "/dev/ttyUSB0", address=2)

        async with record(
            mgr,
            parameters=["process_value", "setpoint"],
            rate_hz=2.0,
            duration=60.0,
        ) as stream:
            async for batch in stream:
                for sample in batch:
                    print(sample.device, sample.parameter, sample.value)


anyio.run(main)
```
record(...) accepts:

- A Controller for a single device.
- A WatlowManager for multi-device acquisition.
- A custom PollSource: the protocol the recorder drives for adapters and tests.
Required: parameters (list of canonical names or aliases) and
rate_hz. Optional: duration (seconds; None = unbounded),
overflow_policy. See Design §6 for the full scheduling
model.
Watlow polls a small group of parameters per device per tick, so a recorder tick produces N×M samples — one per (device, parameter) pair that succeeded. The shape is intentionally long-format so a SQLite file can carry mixed-vendor recordings (Watlow rows union with alicatlib rows under the same schema).
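As a back-of-envelope check, the example above (2 devices, 2 parameters, 2 Hz, 60 s) works out as follows; the numbers here are just arithmetic on the documented shape, assuming a clean run with no failed reads:

```python
# Expected sample count for the example recording above: 2 devices x 2
# parameters polled at 2 Hz for 60 s, with every (device, parameter)
# read succeeding on every tick.
devices, parameters = 2, 2
rate_hz, duration_s = 2.0, 60.0

samples_per_tick = devices * parameters  # one sample per (device, parameter) pair
ticks = int(rate_hz * duration_s)        # absolute-target ticks over the run
total_samples = ticks * samples_per_tick

print(total_samples)  # 480
```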
Sample¶
Sample wraps a single
parameter read with timing provenance.
| Field | Notes |
|---|---|
| device | Manager-assigned name (or controller label for solo recordings). |
| address | Bus address (Std Bus 1..16, Modbus 1..247). |
| protocol | ProtocolKind of the active session. |
| parameter | Canonical parameter name (e.g. process_value). |
| parameter_id | Registry parameter id. |
| instance | 1-indexed loop / channel selector. |
| value | Decoded scalar, or None on sensor-fail / overload. |
| unit | Display string for the unit, or None (v1 leaves this None for every PM parameter; the registry doesn't carry per-row units yet). |
| monotonic_ns | time.monotonic_ns at the read site (drift analysis only). |
| requested_at | UTC datetime just before the read leaves the host. |
| received_at | UTC datetime just after the reply is decoded. |
| midpoint_at | (requested_at + received_at) / 2; the preferred timestamp for plots. |
| latency_s | Poll round-trip seconds (precomputed). |
| raw | Wire payload that produced the value. Tabular sinks drop it. |
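The timing fields compose from the two wall-clock stamps; a minimal stdlib sketch of how midpoint_at and latency_s relate to requested_at and received_at (the 40 ms round trip is an invented illustrative value):

```python
from datetime import datetime, timedelta, timezone

# Illustrative read window: the reply arrives 40 ms after the request leaves.
requested_at = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
received_at = requested_at + timedelta(milliseconds=40)

# midpoint_at = (requested_at + received_at) / 2, i.e. halfway through the round trip
midpoint_at = requested_at + (received_at - requested_at) / 2
latency_s = (received_at - requested_at).total_seconds()

print(midpoint_at.isoformat(), latency_s)
```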
AcquisitionSummary¶
AcquisitionSummary is yielded by record() on
exit and returned by pipe().
| Field | Notes |
|---|---|
| started_at | Wall-clock at the first scheduled tick. |
| finished_at | Wall-clock at producer shutdown. |
| samples_emitted | Total samples pushed onto the stream (= ticks × parameters × devices on a clean run). |
| samples_late | Samples that missed their target slot or were dropped under the overflow policy. |
| max_drift_ms | Largest observed positive drift of an emitted batch. |
| target_total_samples | Scheduled sample count for finite-duration runs, or None for open-ended runs. |
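A post-run health check might compare these counters. Sketched here against a stand-in dataclass that only mirrors the field names used below (not the real class, and the numbers are invented):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SummaryStandIn:
    # Illustrative stand-in for the AcquisitionSummary fields used below.
    samples_emitted: int
    samples_late: int
    target_total_samples: Optional[int]


summary = SummaryStandIn(samples_emitted=480, samples_late=3, target_total_samples=480)

# Finite runs have a known target; open-ended runs report None.
complete = (
    summary.target_total_samples is None
    or summary.samples_emitted >= summary.target_total_samples
)
late_fraction = summary.samples_late / max(summary.samples_emitted, 1)
print(complete, late_fraction)
```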
Sinks¶
A sink persists Samples. The protocol is small:
```python
class SampleSink(Protocol):
    async def open(self) -> None: ...
    async def write_many(self, samples: Sequence[Sample]) -> None: ...
    async def close(self) -> None: ...
    # plus async context-manager dunder methods
```
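Conforming is cheap. A minimal hand-rolled sink that satisfies the protocol (a sketch, not part of watlowlib; it is typed against object because it never inspects the samples it stores):

```python
import asyncio
from typing import List, Sequence


class ListSink:
    """Minimal SampleSink-shaped sink: buffers everything in a list."""

    def __init__(self) -> None:
        self.rows: List[object] = []

    async def open(self) -> None:
        pass  # no resource to acquire

    async def write_many(self, samples: Sequence[object]) -> None:
        self.rows.extend(samples)

    async def close(self) -> None:
        pass  # nothing to flush

    async def __aenter__(self) -> "ListSink":
        await self.open()
        return self

    async def __aexit__(self, *exc: object) -> None:
        await self.close()


async def demo() -> ListSink:
    async with ListSink() as sink:
        await sink.write_many([1, 2, 3])
    return sink


sink = asyncio.run(demo())
print(len(sink.rows))  # 3
```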
First-party sinks all conform:
| Sink | Module | Backing | Needs extra |
|---|---|---|---|
| InMemorySink | sinks/memory.py | list (test/debug) | — |
| CsvSink | sinks/csv.py | stdlib csv | — |
| JsonlSink | sinks/jsonl.py | stdlib json | — |
| SqliteSink | sinks/sqlite.py | stdlib sqlite3 (WAL) | — |
| ParquetSink | sinks/parquet.py | pyarrow | watlowlib[parquet] |
| PostgresSink | sinks/postgres.py | asyncpg | watlowlib[postgres] |
Optional-extra sinks lazy-import their backend; importing
watlowlib.sinks.parquet without pyarrow installed raises
WatlowSinkDependencyError with the install hint, not an opaque
ImportError.
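The underlying pattern is easy to replicate for your own optional backends. A generic sketch, using a plain ImportError subclass as a stand-in for WatlowSinkDependencyError (the helper name and error class here are illustrative, not watlowlib's):

```python
import importlib
from types import ModuleType


class SinkDependencyError(ImportError):
    """Stand-in for a dependency error that carries an install hint."""


def require_backend(module: str, extra: str) -> ModuleType:
    # Lazy-import a backend, converting a bare ImportError into one
    # that tells the user which packaging extra to install.
    try:
        return importlib.import_module(module)
    except ImportError as exc:
        raise SinkDependencyError(
            f"{module} is not installed; try: pip install watlowlib[{extra}]"
        ) from exc


json_mod = require_backend("json", "jsonl")  # stdlib backend, always present
```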
pipe(stream, sink)¶
pipe(...) is the v1 acquisition
glue. It reads sample batches from the recorder, buffers up to
batch_size (or flush_interval seconds, whichever comes first), and
calls sink.write_many to flush. On stream exhaustion it drains any
remaining buffer and returns the AcquisitionSummary.
```python
async with (
    record(mgr, parameters=["process_value"], rate_hz=2.0, duration=60.0) as stream,
    SqliteSink("run.db") as sink,
):
    summary = await pipe(stream, sink, batch_size=64, flush_interval=1.0)

print(f"{summary.samples_emitted} samples, {summary.samples_late} late")
```
Row schema¶
sample_to_row(sample) is the
canonical flatten. Schema is locked on the first batch so mixed
success / error rows don't reshape the file.
| Column | Type | Notes |
|---|---|---|
| device | str | Manager-assigned name. |
| address | int | Bus address. |
| protocol | str | stdbus / modbus_rtu. |
| parameter | str | Canonical parameter name. |
| parameter_id | int | Registry parameter id. |
| instance | int | 1-indexed loop / channel selector. |
| value | float \| int \| str \| null | Coerced from Sample.value. Bools become "true"/"false" strings so SQLite type inference doesn't pin the column to INTEGER. |
| unit | str \| null | Display unit; v1 emits None for every PM parameter. |
| requested_at / received_at / midpoint_at | str (ISO 8601) | Wall-clock timing per read. |
| latency_s | float | Poll round-trip seconds. |
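The bool rule in the value column is worth pinning down. A sketch of that coercion (an illustrative helper, not watlowlib's actual function):

```python
from typing import Union

Cell = Union[float, int, str, None]


def coerce_value(value: object) -> Cell:
    """Flatten a decoded scalar into a sink-safe cell.

    Bools are stringified so SQLite's type inference doesn't see 0/1
    and pin the column to INTEGER; everything else passes through.
    """
    if isinstance(value, bool):  # check bool before int: bool is an int subclass
        return "true" if value else "false"
    if value is None or isinstance(value, (int, float, str)):
        return value
    return str(value)  # last-resort stringification for odd scalars


print(coerce_value(True), coerce_value(21.5), coerce_value(None))
```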
The Sample.raw payload is intentionally not in the row — bytes
don't fit cleanly into CSV / JSONL / SQLite affinities. Callers that
need raw consume Sample directly via
InMemorySink.
The schema is intentionally compatible with
alicatlib's sink schema
where the columns make sense across both libraries — see
examples/mixed_watlow_alicat_sqlite.py.
Backpressure¶
The recorder has a bounded internal queue. If a slow sink can't keep
up, the recorder applies the configured OverflowPolicy:

- OverflowPolicy.BLOCK (default): await the slow consumer. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than discarding samples. The effective sample rate falls to the consumer's drain rate, and samples_late accrues once the consumer catches up.
- OverflowPolicy.DROP_NEWEST: drop the batch about to be enqueued. Dropped samples are counted on samples_late.
Drops are counted on AcquisitionSummary.samples_late and emitted as
structured log events through _logging.get_logger("streaming").
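The DROP_NEWEST accounting can be pictured with a toy bounded queue; this is an illustrative sketch of the semantics, not watlowlib's implementation:

```python
from collections import deque
from typing import Deque, Sequence


class BoundedBatchQueue:
    """Toy bounded queue demonstrating DROP_NEWEST accounting."""

    def __init__(self, maxsize: int) -> None:
        self.batches: Deque[Sequence[int]] = deque()
        self.maxsize = maxsize
        self.samples_late = 0  # mirrors the samples_late counter

    def put_drop_newest(self, batch: Sequence[int]) -> bool:
        if len(self.batches) >= self.maxsize:
            self.samples_late += len(batch)  # the incoming batch is discarded
            return False
        self.batches.append(batch)
        return True


q = BoundedBatchQueue(maxsize=2)
q.put_drop_newest([1, 2])     # accepted
q.put_drop_newest([3, 4])     # accepted, queue now full
q.put_drop_newest([5, 6, 7])  # dropped: 3 samples counted late
print(len(q.batches), q.samples_late)  # 2 3
```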
SQLite specifics¶
SqliteSink defaults to journal mode WAL, synchronous=NORMAL, and
one BEGIN IMMEDIATE … COMMIT per write_many call. Schema is
locked on the first batch — later samples carrying an unknown column
are dropped with a one-shot WARN log rather than reshaping the file.
```sql
SELECT device, parameter, AVG(value) FROM samples
GROUP BY device, parameter
ORDER BY device, parameter;
```
The same query works on a file that mixes Watlow and Alicat rows.
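A quick way to try the query without hardware is to build a toy samples table with stdlib sqlite3; the column subset and readings below are invented for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE samples (device TEXT, parameter TEXT, value REAL)")
conn.executemany(
    "INSERT INTO samples VALUES (?, ?, ?)",
    [
        ("oven_top", "process_value", 100.0),
        ("oven_top", "process_value", 102.0),
        ("oven_bot", "process_value", 95.0),
    ],
)
rows = conn.execute(
    "SELECT device, parameter, AVG(value) FROM samples "
    "GROUP BY device, parameter ORDER BY device, parameter"
).fetchall()
print(rows)  # [('oven_bot', 'process_value', 95.0), ('oven_top', 'process_value', 101.0)]
```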
See also¶
- Streaming: solo Controller and manager recordings.
- Controllers: Controller and Reading shape.
- Sinks API: full reference.
- Testing: FakeTransport for sink unit tests.
- Design §6: scheduling, overflow, schema rationale.