
Logging and acquisition

For continuous data capture, watlowlib provides a recorder that samples a controller (or a WatlowManager of controllers) at a fixed rate, plus first-party sinks that persist the resulting Sample stream. The two halves connect through pipe(). See Design §6.

Recorder

record(...) is an async context manager. It schedules ticks on absolute targets (drift-free) and yields a stream of Sample batches.

import anyio
from watlowlib import WatlowManager, record

async def main() -> None:
    async with WatlowManager() as mgr:
        await mgr.add("oven_top", "/dev/ttyUSB0", address=1)
        await mgr.add("oven_bot", "/dev/ttyUSB0", address=2)
        async with record(
            mgr,
            parameters=["process_value", "setpoint"],
            rate_hz=2.0,
            duration=60.0,
        ) as stream:
            async for batch in stream:
                for sample in batch:
                    print(sample.device, sample.parameter, sample.value)

anyio.run(main)

record(...) accepts:

  • A Controller for a single device.
  • A WatlowManager for multi-device acquisition.
  • A custom PollSource — the protocol the recorder drives for adapters and tests.

Required: parameters (list of canonical names or aliases) and rate_hz. Optional: duration (seconds; None = unbounded), overflow, buffer_size, and auto_reconnect. See Design §6 for the full scheduling model.
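
To make "absolute targets" concrete, here is a minimal synchronous sketch of drift-free tick scheduling. It is not the recorder's implementation; `tick_targets` and `run_ticks` are hypothetical names, and the real recorder is asynchronous. The point is that every target is computed as start + k × period rather than "now + period", so one slow tick does not shift all later ticks.

```python
import time


def tick_targets(start_ns: int, rate_hz: float, count: int) -> list[int]:
    """Absolute monotonic targets: start + k * period (hypothetical helper)."""
    period_ns = round(1e9 / rate_hz)
    return [start_ns + k * period_ns for k in range(count)]


def run_ticks(rate_hz: float, count: int, work) -> list[float]:
    """Call work() at each absolute target; return per-tick drift in ms."""
    start_ns = time.monotonic_ns()
    drifts_ms = []
    for target_ns in tick_targets(start_ns, rate_hz, count):
        remaining_ns = target_ns - time.monotonic_ns()
        if remaining_ns > 0:
            # Sleep until the target, not for a fixed period.
            time.sleep(remaining_ns / 1e9)
        # Positive drift means the tick started after its target slot.
        drifts_ms.append((time.monotonic_ns() - target_ns) / 1e6)
        work()
    return drifts_ms
```

In this sketch, the largest entry of the returned list plays the role that max_drift_ms plays in the recorder summary.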

Watlow polls a small group of parameters per device per tick, so with N devices and M parameters a recorder tick produces up to N×M samples, one per (device, parameter) pair that succeeded. The shape is intentionally long-format so a SQLite file can carry mixed-vendor recordings (Watlow rows union with alicatlib rows under the same schema).
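
As an illustration of the long-format shape, the sketch below uses plain tuples as stand-ins for samples (the values are made up, and `to_wide` is a hypothetical helper, not part of the library). Two devices and two parameters give four rows for one tick, and a wide view can be rebuilt on demand.

```python
from collections import defaultdict

# Hypothetical long-format rows for one tick: (device, parameter, value).
tick = [
    ("oven_top", "process_value", 181.2),
    ("oven_top", "setpoint", 180.0),
    ("oven_bot", "process_value", 176.9),
    ("oven_bot", "setpoint", 180.0),
]


def to_wide(rows):
    """Pivot long-format rows into {device: {parameter: value}}."""
    wide = defaultdict(dict)
    for device, parameter, value in rows:
        wide[device][parameter] = value
    return dict(wide)


wide = to_wide(tick)
```

A pair that fails to read simply contributes no row, which is why a tick yields at most N×M samples rather than a fixed-width record with holes.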

Sample

Sample wraps a single parameter read with timing provenance.

| Field | Notes |
|---|---|
| device | Manager-assigned name (or controller label for solo recordings). |
| address | Bus address (Std Bus 1..16, Modbus 1..247). |
| protocol | ProtocolKind of the active session. |
| parameter | Canonical parameter name (e.g. process_value). |
| parameter_id | Registry parameter id. |
| instance | 1-indexed loop / channel selector. |
| value | Decoded scalar, or None on sensor-fail / overload. |
| unit | Display string for the unit, or None (v1 leaves this None for every PM parameter; the registry doesn't carry per-row units yet). |
| t_mono_ns | time.monotonic_ns midpoint of the round-trip. Canonical join key across streams. |
| t_utc | (requested_at + received_at) / 2; preferred timestamp for plots, tz-aware UTC. |
| t_midpoint_mono_ns | Optional integration-window midpoint in monotonic ns. None for single polled reads. |
| requested_at | UTC datetime just before the read leaves the host. |
| received_at | UTC datetime just after the reply is decoded. |
| latency_s | Poll round-trip seconds (precomputed). |
| raw | Wire payload that produced the value. Tabular sinks drop it. |
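
The timing fields are related by simple arithmetic. The sketch below shows one way to compute them with the standard library; the function names are hypothetical and the timestamps are made up. Python datetimes cannot be added together, so the midpoint is written as the request time plus half the round-trip.

```python
from datetime import datetime, timedelta, timezone


def midpoint_utc(requested_at: datetime, received_at: datetime) -> datetime:
    """Wall-clock midpoint of a round-trip (what t_utc is described as)."""
    return requested_at + (received_at - requested_at) / 2


def latency_s(requested_at: datetime, received_at: datetime) -> float:
    """Round-trip duration in seconds."""
    return (received_at - requested_at).total_seconds()


def midpoint_mono_ns(sent_ns: int, done_ns: int) -> int:
    """Monotonic midpoint in integer nanoseconds."""
    return sent_ns + (done_ns - sent_ns) // 2


requested = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
received = requested + timedelta(milliseconds=40)
t_utc = midpoint_utc(requested, received)
```

Here t_utc lands 20 ms after the request, and the latency is 0.04 s.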

AcquisitionSummary

AcquisitionSummary is the shared summary shape used by the recorder log event and returned by pipe().

| Field | Notes |
|---|---|
| started_at | Wall-clock at the first scheduled tick. |
| finished_at | Wall-clock at producer shutdown. |
| samples_emitted | For pipe(): samples handed to the sink. For recorder logs: per-tick batches pushed onto the stream. |
| samples_late | Recorder-side ticks that missed their target slot or were dropped under the overflow policy; pipe() returns 0 for this field. |
| max_drift_ms | Largest observed positive drift of an emitted batch. |
| disconnects | Watlow connection drops absorbed under auto_reconnect=True; pipe() returns 0. |

Sinks

A sink persists Samples. The protocol is small:

class SampleSink(Protocol):
    async def open(self) -> None: ...
    async def write_many(self, samples: Sequence[Sample]) -> None: ...
    async def close(self) -> None: ...
    # plus async context-manager dunder methods
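
A custom sink only has to provide these methods. The sketch below is self-contained: `SampleSinkLike` is a local stand-in that mirrors the three methods shown above (it is not imported from watlowlib), and `ListSink` is a hypothetical sink that collects samples in a list.

```python
import asyncio
from collections.abc import Sequence
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SampleSinkLike(Protocol):
    """Local stand-in mirroring the three sink methods (assumption)."""

    async def open(self) -> None: ...
    async def write_many(self, samples: Sequence[Any]) -> None: ...
    async def close(self) -> None: ...


class ListSink:
    """Hypothetical sink that appends every sample to a list."""

    def __init__(self) -> None:
        self.rows: list[Any] = []
        self.is_open = False

    async def open(self) -> None:
        self.is_open = True

    async def write_many(self, samples: Sequence[Any]) -> None:
        if not self.is_open:
            raise RuntimeError("sink is not open")
        self.rows.extend(samples)

    async def close(self) -> None:
        self.is_open = False

    # Async context-manager methods delegate to open() / close().
    async def __aenter__(self) -> "ListSink":
        await self.open()
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        await self.close()


async def demo() -> list[Any]:
    async with ListSink() as sink:
        await sink.write_many(["a", "b"])
        await sink.write_many(["c"])
    return sink.rows
```

Because the protocol is structural, the sink does not need to inherit from anything; having the right methods is enough.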

First-party sinks all conform:

| Sink | Module | Backing | Needs extra |
|---|---|---|---|
| InMemorySink | sinks/memory.py | list (test/debug) | |
| CsvSink | sinks/csv.py | stdlib csv | |
| JsonlSink | sinks/jsonl.py | stdlib json | |
| SqliteSink | sinks/sqlite.py | stdlib sqlite3 (WAL) | |
| ParquetSink | sinks/parquet.py | pyarrow | watlowlib[parquet] |
| PostgresSink | sinks/postgres.py | asyncpg | watlowlib[postgres] |

Optional-extra sinks lazy-import their backend; importing watlowlib.sinks.parquet without pyarrow installed raises WatlowSinkDependencyError with the install hint, not an opaque ImportError.
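
The lazy-import pattern can be sketched as follows. This is an illustration, not the library's code: `SinkDependencyError` and `require` are hypothetical names standing in for whatever watlowlib uses internally, and the pip command in the message is an assumed form of the install hint.

```python
import importlib


class SinkDependencyError(Exception):
    """Hypothetical stand-in for the library's dependency error."""


def require(module_name: str, extra: str):
    """Import a backend on demand, or fail with an actionable message."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise SinkDependencyError(
            f"{module_name!r} is required for this sink; "
            f"install it with: pip install 'watlowlib[{extra}]'"
        ) from exc
```

Calling such a helper inside the sink module, rather than importing the backend at package import time, keeps the base install free of the optional dependency.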

pipe(stream, sink)

pipe(...) is the v1 acquisition glue. It reads sample batches from the recorder, buffers up to batch_size (or flush_interval seconds, whichever comes first), and calls sink.write_many to flush. On stream exhaustion it drains any remaining buffer and returns the AcquisitionSummary.

async with (
    record(mgr, parameters=["process_value"], rate_hz=2.0, duration=60.0) as stream,
    SqliteSink("run.db") as sink,
):
    summary = await pipe(stream, sink, batch_size=64, flush_interval=1.0)
print(f"{summary.samples_emitted} samples, {summary.samples_late} late")
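
The buffering behavior described above can be approximated with the sketch below. It is a simplified stand-in, not pipe() itself: `pipe_sketch` and `fake_stream` are hypothetical, the threshold is assumed to count samples, and buffer age is only checked when a batch arrives (a fuller implementation would presumably also flush on a timer).

```python
import asyncio
import time


async def pipe_sketch(stream, write_many, batch_size=64, flush_interval=1.0):
    """Buffer samples; flush on size or age; drain on exhaustion."""
    buffer = []
    last_flush = time.monotonic()
    emitted = 0

    async def flush():
        nonlocal buffer, last_flush, emitted
        if buffer:
            await write_many(buffer)
            emitted += len(buffer)
            buffer = []
        last_flush = time.monotonic()

    async for batch in stream:
        buffer.extend(batch)
        too_big = len(buffer) >= batch_size
        too_old = time.monotonic() - last_flush >= flush_interval
        if too_big or too_old:
            await flush()
    await flush()  # drain whatever is left when the stream ends
    return emitted


async def fake_stream(n_batches, per_batch):
    """Hypothetical stand-in for the recorder stream."""
    for i in range(n_batches):
        yield [(i, j) for j in range(per_batch)]


async def demo():
    writes = []

    async def write_many(samples):
        writes.append(len(samples))

    emitted = await pipe_sketch(
        fake_stream(5, 2), write_many, batch_size=4, flush_interval=60.0
    )
    return emitted, writes
```

With five batches of two samples and batch_size=4, the sketch writes twice at the size threshold and once more on the final drain.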

Row schema

sample_to_row(sample) is the canonical flattening function: it turns one Sample into one flat row. The schema is locked on the first batch so mixed success / error rows don't reshape the file.

| Column | Type | Notes |
|---|---|---|
| device | str | Manager-assigned name. |
| address | int | Bus address. |
| protocol | str | stdbus / modbus_rtu. |
| parameter | str | Canonical parameter name. |
| parameter_id | int | Registry parameter id. |
| instance | int | 1-indexed loop / channel selector. |
| value | float, int, str, or null | Coerced from Sample.value. Bools become "true"/"false" strings so SQLite type inference doesn't pin the column to INTEGER. |
| unit | str or null | Display unit; v1 emits None for every PM parameter. |
| t_mono_ns | int | Monotonic-ns join key. |
| t_utc | str (ISO 8601) | Acquisition-instant wall clock. |
| requested_at / received_at | str (ISO 8601) | I/O provenance per read. |
| latency_s | float | Poll round-trip seconds. |

The Sample.raw payload is intentionally not in the row — bytes don't fit cleanly into CSV / JSONL / SQLite affinities. Callers that need raw consume Sample directly via InMemorySink.
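
The coercion rules in the table can be sketched as below. This is not sample_to_row; `coerce_value` and `flatten` are hypothetical helpers, and a plain dict stands in for a Sample. The one subtle point is that bool must be tested before int, because bool is a subclass of int in Python.

```python
from datetime import datetime, timezone


def coerce_value(value):
    """Map a decoded value to float, int, str, or None."""
    if value is None:
        return None
    if isinstance(value, bool):  # must come before the int check
        return "true" if value else "false"
    if isinstance(value, (int, float, str)):
        return value
    return str(value)


def flatten(sample: dict) -> dict:
    """Flatten a dict stand-in for a Sample into a tabular row."""
    row = {k: v for k, v in sample.items() if k != "raw"}  # drop raw bytes
    row["value"] = coerce_value(sample["value"])
    for key in ("t_utc", "requested_at", "received_at"):
        row[key] = sample[key].isoformat()
    return row


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
row = flatten({
    "device": "oven_top", "parameter": "process_value", "value": True,
    "t_utc": now, "requested_at": now, "received_at": now, "raw": b"\x01",
})
```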

The schema is intentionally compatible with alicatlib's sink schema where the columns make sense across both libraries — see examples/mixed_watlow_alicat_sqlite.py.

Backpressure

The recorder has a bounded internal queue. If a slow sink can't keep up, the recorder applies the configured OverflowPolicy:

  • OverflowPolicy.BLOCK — await the slow consumer (default). Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than discarding samples. Effective sample rate drops to the consumer's drain rate; samples_late accrues once the consumer catches up.
  • OverflowPolicy.DROP_NEWEST — drop the batch about to be enqueued. Counted on samples_late.
  • OverflowPolicy.DROP_OLDEST — evict the oldest queued batch and enqueue the newest. Counted on samples_late.

Drops are counted on the recorder summary's samples_late and emitted as structured log events through _logging.get_logger("streaming").
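
The two dropping policies can be illustrated with a bounded deque. This is a sketch under assumptions, not the recorder's queue: `Policy` and `offer` are hypothetical names, and the blocking policy is left out because it amounts to awaiting a put on a bounded async queue instead of dropping anything.

```python
from collections import deque
from enum import Enum


class Policy(Enum):
    """Hypothetical mirror of the two dropping policies."""

    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


def offer(queue: deque, batch, maxsize: int, policy: Policy) -> int:
    """Try to enqueue a batch; return how many batches were dropped."""
    if len(queue) < maxsize:
        queue.append(batch)
        return 0
    if policy is Policy.DROP_NEWEST:
        return 1  # discard the incoming batch, keep the queue as is
    queue.popleft()  # DROP_OLDEST: evict the head, then enqueue
    queue.append(batch)
    return 1


newest, oldest = deque(), deque()
dropped_newest = sum(offer(newest, b, 2, Policy.DROP_NEWEST) for b in (1, 2, 3))
dropped_oldest = sum(offer(oldest, b, 2, Policy.DROP_OLDEST) for b in (1, 2, 3))
```

Both policies drop exactly one batch here, but DROP_NEWEST keeps batches 1 and 2 while DROP_OLDEST keeps 2 and 3. The choice is between preserving history and preserving freshness.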

SQLite specifics

SqliteSink defaults to journal mode WAL, synchronous=NORMAL, and one BEGIN IMMEDIATE ... COMMIT transaction per write_many call. Schema is locked on the first batch; later samples carrying an unknown column are dropped with a one-shot WARN log rather than reshaping the file.

SELECT device, parameter, AVG(value) FROM samples
GROUP BY device, parameter
ORDER BY device, parameter;

The same query works on a file that mixes Watlow and Alicat rows.
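
For readers who want to see these settings in plain sqlite3, here is a minimal sketch. It does not reproduce SqliteSink: the three-column table, `open_db`, and `write_batch` are assumptions made for illustration, and the inserted values are made up. It sets the same pragmas, wraps each batch in one explicit transaction, and runs the aggregate query shown above.

```python
import os
import sqlite3
import tempfile


def open_db(path: str) -> sqlite3.Connection:
    # isolation_level=None: autocommit mode, so transactions are explicit.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS samples (device TEXT, parameter TEXT, value)"
    )
    return conn


def write_batch(conn: sqlite3.Connection, rows) -> None:
    """One BEGIN IMMEDIATE ... COMMIT transaction per batch."""
    conn.execute("BEGIN IMMEDIATE")
    try:
        conn.executemany("INSERT INTO samples VALUES (?, ?, ?)", rows)
    except Exception:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        conn = open_db(os.path.join(tmp, "run.db"))
        try:
            write_batch(conn, [
                ("oven_top", "process_value", 180.0),
                ("oven_top", "process_value", 182.0),
                ("oven_bot", "process_value", 176.0),
            ])
            mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
            result = conn.execute(
                "SELECT device, parameter, AVG(value) FROM samples "
                "GROUP BY device, parameter ORDER BY device, parameter"
            ).fetchall()
        finally:
            conn.close()  # close before the temp directory is removed
    return mode, result
```

A file on disk is used because WAL mode does not apply to in-memory databases.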

See also

  • Streaming: solo Controller and manager recordings.
  • Controllers: Controller and Reading shape.
  • Sinks API: full reference.
  • Testing: FakeTransport for sink unit tests.
  • Design §6: scheduling, overflow, schema rationale.