servomexlib.sinks

The SampleSink protocol and the first-party sinks (memory / CSV / JSONL / SQLite / Parquet / Postgres).

Sinks — durable destinations for streamed :class:Sample rows.

Provides the :class:SampleSink Protocol, :func:pipe (the recorder→sink driver), and :func:sample_to_row (the long-format flattener). The core sinks (memory, CSV, JSONL, SQLite) use only the standard library. :class:ParquetSink (the [parquet] extra) and :class:PostgresSink (the [postgres] extra) lazy-import their optional dependencies in :meth:open and raise :class:~servomexlib.errors.ServomexSinkDependencyError if the extra is missing, so importing this package never requires the extras.
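
The lazy-import behaviour described above might look roughly like the sketch below. It is not the library's code: `SinkDependencyError` and `load_optional` are hypothetical stand-ins for ServomexSinkDependencyError and the private loaders, used only to illustrate deferring an import until open time.

```python
import importlib
from types import ModuleType


class SinkDependencyError(ImportError):
    """Hypothetical stand-in for ServomexSinkDependencyError."""


def load_optional(module_name: str, extra: str) -> ModuleType:
    """Import ``module_name`` on demand, naming the missing extra on failure."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise SinkDependencyError(
            f"{module_name!r} is required; install the [{extra}] extra"
        ) from exc


# A stdlib module always resolves; a missing module raises the sink error.
json_module = load_optional("json", extra="none")
try:
    load_optional("definitely_not_installed_pkg", extra="parquet")
except SinkDependencyError as exc:
    message = str(exc)
```

Because the import happens inside a function rather than at module top level, importing the package itself never touches the optional dependency.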

CsvSink

CsvSink(path)

Single-run CSV writer with first-batch schema lock.

:meth:open truncates the destination; the first :meth:write_many writes a fresh header. Cross-run appending is intentionally not supported (a re-open with a different column shape would silently mismatch) — use :class:~servomexlib.sinks.jsonl.JsonlSink or :class:~servomexlib.sinks.sqlite.SqliteSink for append semantics.

Attributes:

path (Path): Destination file, created/overwritten on :meth:open.
columns (tuple[str, ...] | None): Locked column order after the first flush (None before).

Source code in src/servomexlib/sinks/csv.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None
    self._writer: csv.DictWriter[str] | None = None
    self._columns: tuple[str, ...] | None = None
    self._unknown_columns_warned: set[str] = set()

columns property

columns

The locked column order, or None if no batch has been flushed.

path property

path

Destination file path.

close async

close()

Flush and close the CSV file. Idempotent.

Source code in src/servomexlib/sinks/csv.py
async def close(self) -> None:
    """Flush and close the CSV file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None
        self._writer = None

open async

open()

Open the CSV file for writing (truncating). Idempotent.

Source code in src/servomexlib/sinks/csv.py
async def open(self) -> None:
    """Open the CSV file for writing (truncating). Idempotent."""
    if self._file is not None:
        return
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Append samples as CSV rows, locking the header on first call.

Source code in src/servomexlib/sinks/csv.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as CSV rows, locking the header on first call."""
    if self._file is None:
        raise RuntimeError("CsvSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]
    if self._writer is None:
        self._columns = tuple(rows[0].keys())
        self._writer = csv.DictWriter(self._file, fieldnames=list(self._columns))
        self._writer.writeheader()

    columns = self._columns
    assert columns is not None  # noqa: S101 — narrow for type checker

    for row in rows:
        for key in row.keys() - set(columns):
            if key not in self._unknown_columns_warned:
                self._unknown_columns_warned.add(key)
                _logger.warning(
                    "sinks.csv.unknown_column path=%s column=%s action=drop",
                    str(self._path),
                    key,
                )
        self._writer.writerow({k: row.get(k) for k in columns})
    self._file.flush()
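
The first-batch header lock shown above can be reproduced with the standard library alone. The sketch below is a simplified, synchronous illustration: `write_locked` is a hypothetical helper, plain dicts stand in for flattened samples, and the warning log is omitted.

```python
import csv
import io


def write_locked(batches):
    """Write batches of row dicts, locking columns to the first row seen."""
    out = io.StringIO(newline="")
    writer = None
    columns = ()
    for rows in batches:
        if not rows:
            continue
        if writer is None:
            # The first non-empty batch fixes the column order for the run.
            columns = tuple(rows[0].keys())
            writer = csv.DictWriter(out, fieldnames=list(columns))
            writer.writeheader()
        for row in rows:
            # Unknown keys are dropped; missing keys become empty cells.
            writer.writerow({k: row.get(k) for k in columns})
    return out.getvalue()


text = write_locked([
    [{"device": "a", "value": 1.0}],
    [{"device": "b", "value": 2.0, "extra": "dropped"}, {"device": "c"}],
])
```

Here the `extra` key from the second batch never reaches the file, and the row that lacks `value` is written with an empty cell.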

InMemorySink

InMemorySink()

Collect every written :class:Sample in a single list.

:attr:samples is appended to (never re-assigned), and :meth:close does not clear it — the point is post-run inspection.

Source code in src/servomexlib/sinks/memory.py
def __init__(self) -> None:
    self._samples: list[Sample] = []
    self._open = False
    self._closed = False

is_open property

is_open

True once :meth:open has been called and :meth:close has not.

samples property

samples

Captured samples, in write order.

close async

close()

Flip the closed flag — no I/O, buffer preserved for inspection.

Source code in src/servomexlib/sinks/memory.py
async def close(self) -> None:
    """Flip the closed flag — no I/O, buffer preserved for inspection."""
    self._closed = True

open async

open()

No backing resource — just flips the open flag.

Source code in src/servomexlib/sinks/memory.py
async def open(self) -> None:
    """No backing resource — just flips the open flag."""
    self._open = True
    self._closed = False

write_many async

write_many(samples)

Append every sample to the internal buffer.

Source code in src/servomexlib/sinks/memory.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append every sample to the internal buffer."""
    if not self.is_open:
        raise RuntimeError("InMemorySink: write_many called before open()")
    self._samples.extend(samples)

JsonlSink

JsonlSink(path)

Append-only JSONL writer — one flattened sample per line.

On-disk format is <sample-row-as-json>\n; read back with [json.loads(line) for line in f]. Re-opening the same path extends it.
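
A small round trip shows the append and read-back behaviour described above. This is a sketch using plain dicts and the standard library only; `append_rows` and `read_rows` are hypothetical helpers, not part of servomexlib.

```python
import json
import tempfile
from pathlib import Path


def append_rows(path, rows):
    # Mode "a" extends an existing file instead of truncating it.
    with path.open("a", encoding="utf-8", newline="") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False))
            f.write("\n")


def read_rows(path):
    with path.open(encoding="utf-8") as f:
        return [json.loads(line) for line in f]


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "samples.jsonl"
    append_rows(target, [{"device": "a", "value": 1.0}])
    append_rows(target, [{"device": "b", "value": None}])  # a second "run"
    rows_back = read_rows(target)
```

Rows from both writes come back in order, and `None` survives the trip as JSON `null`.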

Source code in src/servomexlib/sinks/jsonl.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None

path property

path

Destination file path.

close async

close()

Flush and close the JSONL file. Idempotent.

Source code in src/servomexlib/sinks/jsonl.py
async def close(self) -> None:
    """Flush and close the JSONL file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None

open async

open()

Open the JSONL file for writing in append mode. Idempotent.

Source code in src/servomexlib/sinks/jsonl.py
async def open(self) -> None:
    """Open the JSONL file for writing in append mode. Idempotent."""
    if self._file is not None:
        return
    self._file = self._path.open("a", encoding="utf-8", newline="")

write_many async

write_many(samples)

Serialise each sample as one JSON object per line.

Source code in src/servomexlib/sinks/jsonl.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Serialise each sample as one JSON object per line."""
    if self._file is None:
        raise RuntimeError("JsonlSink: write_many called before open()")
    if not samples:
        return
    for sample in samples:
        self._file.write(json.dumps(sample_to_row(sample), ensure_ascii=False))
        self._file.write("\n")
    self._file.flush()

ParquetSink

ParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
)

Append-only Parquet writer with first-batch schema lock.

Attributes:

path (Path): Destination Parquet file.
compression (_Compression): Codec applied to every row group.
columns (tuple[ColumnSpec, ...] | None): Locked columns in order, or None before first flush.

Source code in src/servomexlib/sinks/parquet.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
) -> None:
    self._path = Path(path)
    self._compression: _Compression = compression
    self._use_dictionary = use_dictionary
    if row_group_size is not None and row_group_size < 1:
        raise ValueError(f"row_group_size must be >= 1 if set, got {row_group_size!r}")
    self._row_group_size = row_group_size
    self._schema = SchemaLock(sink_name="parquet", logger=_logger)
    self._pa: Any = None
    self._pq: Any = None
    self._arrow_schema: Any = None
    self._writer: Any = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

compression property

compression

The configured compression codec.

path property

path

Destination Parquet file path.

close async

close()

Flush the footer and close the writer. Idempotent.

Source code in src/servomexlib/sinks/parquet.py
async def close(self) -> None:
    """Flush the footer and close the writer. Idempotent."""
    if self._writer is not None:
        try:
            self._writer.close()
        finally:
            self._writer = None
    self._pa = None
    self._pq = None
    _logger.info(
        "sinks.parquet.close path=%s rows_written=%s", str(self._path), self._rows_written
    )

open async

open()

Load pyarrow and create the parent directory. Idempotent.

The ParquetWriter itself opens lazily on the first :meth:write_many, when the concrete schema is known.

Source code in src/servomexlib/sinks/parquet.py
async def open(self) -> None:
    """Load pyarrow and create the parent directory. Idempotent.

    The ``ParquetWriter`` itself opens lazily on the first
    :meth:`write_many`, when the concrete schema is known.
    """
    if self._pa is not None:
        return
    self._pa, self._pq = _load_pyarrow()
    self._path.parent.mkdir(parents=True, exist_ok=True)
    _logger.info(
        "sinks.parquet.open path=%s compression=%s", str(self._path), self._compression
    )

write_many async

write_many(samples)

Append samples as one Parquet row group.

Source code in src/servomexlib/sinks/parquet.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as one Parquet row group."""
    if self._pa is None:
        raise RuntimeError("ParquetSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        self._schema.lock(rows)
        self._arrow_schema = self._build_arrow_schema()
        self._writer = self._open_writer()

    assert self._writer is not None  # noqa: S101 — narrow for type checker
    assert self._arrow_schema is not None  # noqa: S101

    projected = [self._schema.project(r) for r in rows]
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    arrays = {spec.name: [row[spec.name] for row in projected] for spec in columns}
    try:
        table = self._pa.Table.from_pydict(arrays, schema=self._arrow_schema)
        self._writer.write_table(table, row_group_size=self._row_group_size)
    except Exception as exc:
        raise ServomexSinkWriteError(
            f"ParquetSink: write failed for {self._path}: {exc}"
        ) from exc
    self._rows_written += len(projected)

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided (mutually exclusive). Credentials are never logged.

target

target()

Return a log-safe host:port/db.schema.table (no password).

Source code in src/servomexlib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe ``host:port/db.schema.table`` (no password)."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"
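
To see what the DSN branch produces, the same parsing steps can be run as a standalone function. The sketch below mirrors the listing above; `log_safe_target` and the example DSNs are hypothetical.

```python
from urllib.parse import urlparse


def log_safe_target(dsn, schema="public", table="samples", default_port=5432):
    """Build host:port/db.schema.table from a DSN, omitting credentials."""
    parsed = urlparse(dsn)
    host = parsed.hostname or "?"
    port = parsed.port or default_port
    db = (parsed.path or "/?").lstrip("/") or "?"
    return f"{host}:{port}/{db}.{schema}.{table}"


with_password = log_safe_target("postgresql://alice:s3cret@db.example.com:6543/plant")
no_port_or_db = log_safe_target("postgresql://db.example.com")
```

The first call yields `db.example.com:6543/plant.public.samples`, with neither user nor password. The second falls back to the default port and a `?` placeholder for the database.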

PostgresSink

PostgresSink(config)

Append-only Postgres writer using pooled asyncpg connections.

Attributes:

config (PostgresConfig): Frozen :class:PostgresConfig instance.
columns (tuple[ColumnSpec, ...] | None): Locked columns in order, or None before first flush.

Source code in src/servomexlib/sinks/postgres.py
def __init__(self, config: PostgresConfig) -> None:
    self._config = config
    self._schema = SchemaLock(sink_name="postgres", logger=_logger)
    self._asyncpg: Any = None
    self._pool: Any = None
    self._insert_sql: str | None = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

config property

config

The frozen :class:PostgresConfig passed in at construction.

close async

close()

Close the pool. Idempotent.

Source code in src/servomexlib/sinks/postgres.py
async def close(self) -> None:
    """Close the pool. Idempotent."""
    if self._pool is None:
        return
    pool = self._pool
    self._pool = None
    try:
        await pool.close()
    finally:
        _logger.info(
            "sinks.postgres.close target=%s rows_written=%s",
            self._config.target(),
            self._rows_written,
        )
    self._asyncpg = None

open async

open()

Load asyncpg, open the pool, and (optionally) introspect the table.

Source code in src/servomexlib/sinks/postgres.py
async def open(self) -> None:
    """Load asyncpg, open the pool, and (optionally) introspect the table."""
    if self._pool is not None:
        return
    self._asyncpg = _load_asyncpg()
    cfg = self._config
    server_settings = {
        "application_name": "servomexlib",
        "statement_timeout": str(int(cfg.statement_timeout_ms)),
    }
    try:
        if cfg.dsn is not None:
            self._pool = await self._asyncpg.create_pool(
                dsn=cfg.dsn,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
        else:
            self._pool = await self._asyncpg.create_pool(
                host=cfg.host,
                port=cfg.port,
                user=cfg.user,
                password=cfg.password,
                database=cfg.database,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
    except Exception as exc:
        raise ServomexSinkWriteError(
            f"PostgresSink: failed to open pool for {cfg.target()}: {exc}"
        ) from exc

    _logger.info(
        "sinks.postgres.open target=%s pool_min=%s pool_max=%s use_copy=%s create_table=%s",
        cfg.target(),
        cfg.pool_min_size,
        cfg.pool_max_size,
        cfg.use_copy,
        cfg.create_table,
    )

    if not cfg.create_table:
        await self._introspect_existing_table()

write_many async

write_many(samples)

Append samples — one COPY (or executemany) per call.

Source code in src/servomexlib/sinks/postgres.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` — one COPY (or executemany) per call."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        assert self._config.create_table  # noqa: S101
        self._schema.lock(rows)
        await self._create_table()
        self._insert_sql = self._build_insert_sql()

    columns = self._schema.columns
    assert columns is not None  # noqa: S101
    assert self._insert_sql is not None  # noqa: S101

    projected_tuples = [
        tuple(self._schema.project(row)[spec.name] for spec in columns) for row in rows
    ]

    try:
        if self._config.use_copy:
            await self._write_copy(projected_tuples, columns)
        else:
            await self._write_executemany(projected_tuples)
    except ServomexSinkWriteError:
        raise
    except Exception as exc:
        raise ServomexSinkWriteError(
            f"PostgresSink: write failed for {self._config.target()}: {exc}"
        ) from exc
    self._rows_written += len(projected_tuples)

SampleSink

Bases: Protocol

Minimal shape of an acquisition sink.

Concrete sinks own their storage handle and typically follow:

  1. await sink.open() — allocate file descriptors / DB connections.
  2. await sink.write_many(samples) — one or more times.
  3. await sink.close() — flush and release (idempotent).

The async context-manager methods give the async with sink: shape.
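
As an illustration of the lifecycle above, here is a minimal sketch of a class with the same method shape. `ListSink` is hypothetical and stores plain strings in place of :class:Sample objects. It assumes, as the docstrings below suggest, that `__aenter__` opens the sink and `__aexit__` closes it.

```python
import asyncio
from collections.abc import Sequence


class ListSink:
    """Hypothetical sink with the open / write_many / close shape."""

    def __init__(self) -> None:
        self.items: list[str] = []
        self.is_open = False

    async def open(self) -> None:
        self.is_open = True

    async def write_many(self, samples: Sequence[str]) -> None:
        if not self.is_open:
            raise RuntimeError("ListSink: write_many called before open()")
        self.items.extend(samples)

    async def close(self) -> None:
        self.is_open = False  # safe to call more than once

    async def __aenter__(self) -> "ListSink":
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()


async def demo() -> ListSink:
    sink = ListSink()
    async with sink:  # open() on entry, close() on exit
        await sink.write_many(["s1", "s2"])
        await sink.write_many(["s3"])
    return sink


finished = asyncio.run(demo())
```

Since SampleSink is a Protocol, a class like this would presumably satisfy it structurally without inheriting from it.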

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/servomexlib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/servomexlib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/servomexlib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource (file handle, DB conn, ...).

Source code in src/servomexlib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource (file handle, DB conn, ...)."""
    ...

write_many async

write_many(samples)

Append samples to the sink (Sequence so the sink sees len).

Source code in src/servomexlib/sinks/base.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink (``Sequence`` so the sink sees ``len``)."""
    ...

SqliteSink

SqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
)

Append-only SQLite writer with WAL journaling and first-batch schema lock.

Attributes:

path (Path): Destination SQLite file, created on :meth:open.
table (str): Target table name (validated).
columns (tuple[ColumnSpec, ...] | None): The locked :class:ColumnSpec tuple, or None before flush.

Source code in src/servomexlib/sinks/sqlite.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
) -> None:
    self._path = Path(path)
    self._table = _validate_identifier(table, label="table name")
    self._create_table = create_table
    self._journal_mode: _JournalMode = journal_mode
    self._synchronous: _Synchronous = synchronous
    if busy_timeout_ms < 0:
        raise ValueError(f"busy_timeout_ms must be >= 0, got {busy_timeout_ms!r}")
    self._busy_timeout_ms = busy_timeout_ms
    self._conn: sqlite3.Connection | None = None
    self._schema = SchemaLock(sink_name="sqlite", logger=_logger)
    self._insert_sql: str | None = None

columns property

columns

Locked columns in order, or None before first :meth:write_many.

path property

path

Destination SQLite file path.

table property

table

Target table name (validated).

close async

close()

Close the connection. Idempotent.

Source code in src/servomexlib/sinks/sqlite.py
async def close(self) -> None:
    """Close the connection. Idempotent."""
    if self._conn is None:
        return
    conn = self._conn
    self._conn = None
    try:
        await run_sync(conn.close)
    finally:
        _logger.info("sinks.sqlite.close path=%s table=%s", str(self._path), self._table)

open async

open()

Open the connection, apply PRAGMAs, and introspect the target. Idempotent.

Source code in src/servomexlib/sinks/sqlite.py
async def open(self) -> None:
    """Open the connection, apply PRAGMAs, and introspect the target. Idempotent."""
    if self._conn is not None:
        return
    self._conn = await run_sync(self._connect_blocking)
    _logger.info(
        "sinks.sqlite.open path=%s table=%s journal_mode=%s synchronous=%s",
        str(self._path),
        self._table,
        self._journal_mode,
        self._synchronous,
    )
    if not self._create_table:
        try:
            await run_sync(self._introspect_existing_table_blocking)
        except BaseException:
            conn = self._conn
            self._conn = None
            await run_sync(conn.close)
            raise

write_many async

write_many(samples)

Append samples as rows in a single transaction (parameterised).

Source code in src/servomexlib/sinks/sqlite.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as rows in a single transaction (parameterised)."""
    if self._conn is None:
        raise RuntimeError("SqliteSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked and self._create_table:
        self._schema.lock(rows)
        await run_sync(self._create_table_blocking)
        self._insert_sql = self._build_insert_sql()

    assert self._insert_sql is not None  # noqa: S101 — narrow for type checker
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    projected = [
        tuple(self._schema.project(row)[spec.name] for spec in columns) for row in rows
    ]
    await run_sync(self._executemany_blocking, projected)
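
The private helpers called above (`_connect_blocking`, `_executemany_blocking`) are not shown on this page. As a rough sketch of what "apply PRAGMAs" and "single transaction (parameterised)" could mean with the stdlib `sqlite3` module, assuming the default settings and a hypothetical three-column table:

```python
import sqlite3
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    conn = sqlite3.connect(Path(tmp) / "samples.db")
    try:
        # PRAGMAs matching the constructor defaults (WAL / NORMAL / 5000 ms).
        mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute("PRAGMA busy_timeout=5000")
        # Hypothetical reduced schema; the real columns come from the first batch.
        conn.execute("CREATE TABLE samples (device TEXT, value REAL, error TEXT)")
        with conn:  # commits on success, rolls back if the batch fails
            conn.executemany(
                "INSERT INTO samples (device, value, error) VALUES (?, ?, ?)",
                [("a", 20.9, None), ("a", None, "checksum mismatch")],
            )
        count = conn.execute("SELECT COUNT(*) FROM samples").fetchone()[0]
    finally:
        conn.close()
```

The `?` placeholders keep values out of the SQL text, and the `with conn:` block makes the whole batch succeed or fail together.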

pipe async

pipe(stream, sink, *, batch_size=64, flush_interval=1.0)

Drain stream into sink with buffered flushes.

Reads per-tick batches from the recorder and accumulates individual :class:Sample objects. A flush happens when the buffer reaches batch_size or flush_interval seconds have elapsed since the last flush, whichever comes first. On stream exhaustion any leftover buffer is flushed before returning.

Parameters:

stream (AsyncIterator[Sequence[Sample]], required): The async iterator yielded by :func:~servomexlib.streaming.record.
sink (SampleSink, required): Any :class:SampleSink. Must already be open.
batch_size (int, default 64): Buffer threshold in samples (not batches).
flush_interval (float, default 1.0): Time threshold in seconds between flushes.

Returns:

AcquisitionSummary: An :class:AcquisitionSummary with samples_emitted set to the count handed to the sink (the sink-side view; samples_late / drift stay 0).

Raises:

ValueError: If batch_size < 1 or flush_interval <= 0.

Source code in src/servomexlib/sinks/base.py
async def pipe(
    stream: AsyncIterator[Sequence[Sample]],
    sink: SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    r"""Drain ``stream`` into ``sink`` with buffered flushes.

    Reads per-tick batches from the recorder and accumulates individual
    :class:`Sample`\ s. A flush happens when the buffer reaches ``batch_size`` or
    ``flush_interval`` seconds have elapsed since the last flush, whichever first.
    On stream exhaustion any leftover buffer is flushed before returning.

    Args:
        stream: The async iterator yielded by :func:`~servomexlib.streaming.record`.
        sink: Any :class:`SampleSink`. Must already be open.
        batch_size: Buffer threshold in samples (not batches).
        flush_interval: Time threshold in seconds between flushes.

    Returns:
        An :class:`AcquisitionSummary` with ``samples_emitted`` set to the count
        handed to the sink (the sink-side view; ``samples_late`` / drift stay 0).

    Raises:
        ValueError: ``batch_size < 1`` or ``flush_interval <= 0``.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = anyio.current_time()

    async def _flush() -> None:
        nonlocal emitted
        if not buffer:
            return
        await sink.write_many(buffer)
        emitted += len(buffer)
        buffer.clear()

    async for batch in stream:
        buffer.extend(batch)
        now = anyio.current_time()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            await _flush()
            last_flush = now

    await _flush()
    finished_at = datetime.now(UTC)
    _logger.info(
        "sinks.pipe_done sink=%s samples_emitted=%s duration_s=%.3f",
        type(sink).__name__,
        emitted,
        (finished_at - started_at).total_seconds(),
    )
    return AcquisitionSummary(
        started_at=started_at, finished_at=finished_at, samples_emitted=emitted
    )
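
The flush rule is easier to follow on a toy stream. The sketch below is a simplified re-implementation that uses `asyncio` and `time.monotonic` instead of anyio. `ticks`, `drain`, and `collect` are hypothetical names, and integers stand in for samples.

```python
import asyncio
import time


async def ticks():
    # Hypothetical recorder: three per-tick batches of integer "samples".
    for batch in ([1, 2], [3], [4, 5]):
        yield batch


async def drain(stream, write_many, batch_size=3, flush_interval=60.0):
    """Buffer samples and flush on a size or time threshold."""
    buffer, flushed = [], []
    last_flush = time.monotonic()

    async def flush():
        if buffer:
            await write_many(list(buffer))
            flushed.append(len(buffer))
            buffer.clear()

    async for batch in stream:
        buffer.extend(batch)
        now = time.monotonic()
        # The threshold counts individual samples, not incoming batches.
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            await flush()
            last_flush = now
    await flush()  # leftover samples once the stream is exhausted
    return flushed


received = []


async def collect(samples):
    received.extend(samples)


flush_sizes = asyncio.run(drain(ticks(), collect))
```

With `batch_size=3`, the first flush fires after the second tick (three buffered samples) and the remaining two samples go out in the final flush, so `flush_sizes` is `[3, 2]`.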

sample_to_row

sample_to_row(sample)

Flatten a :class:Sample into a single row dict for tabular sinks.

Long-format schema (one row per channel read), stable across all in-tree sinks. Error rows (a dropped/corrupt frame — :attr:Sample.error set, :attr:Sample.reading None) carry None channel fields and a string error so a resync is still recorded rather than lost.

The sample's raw payload is intentionally not in the row: bytes do not fit CSV / JSONL / SQLite affinities. Callers needing raw consume the :class:Sample directly via :class:~servomexlib.sinks.memory.InMemorySink.

Source code in src/servomexlib/sinks/base.py
def sample_to_row(sample: Sample) -> dict[str, float | int | str | None]:
    """Flatten a :class:`Sample` into a single row dict for tabular sinks.

    Long-format schema (one row per channel read), stable across all in-tree
    sinks. Error rows (a dropped/corrupt frame — :attr:`Sample.error` set,
    :attr:`Sample.reading` ``None``) carry ``None`` channel fields and a string
    ``error`` so a resync is still recorded rather than lost.

    The sample's ``raw`` payload is intentionally **not** in the row: bytes do not
    fit CSV / JSONL / SQLite affinities. Callers needing ``raw`` consume the
    :class:`Sample` directly via :class:`~servomexlib.sinks.memory.InMemorySink`.
    """
    reading = sample.reading
    channel = sample.channel.value if sample.channel is not None else None
    status = reading.status if reading is not None else None
    return {
        "device": sample.device,
        "channel": channel,
        "kind": reading.kind.value if reading is not None else None,
        "name": reading.name if reading is not None else None,
        "value": reading.value if reading is not None else None,
        "unit": reading.unit.value if reading is not None else None,
        "ok": _coerce_bool(status.ok) if status is not None else None,
        "fault": _coerce_bool(status.fault) if status is not None else None,
        "maintenance": _coerce_bool(status.maintenance) if status is not None else None,
        "calibrating": _coerce_bool(status.calibrating) if status is not None else None,
        "warming_up": _coerce_bool(status.warming_up) if status is not None else None,
        "protocol": sample.protocol.value,
        "monotonic_ns": sample.monotonic_ns,
        "received_at": sample.received_at.isoformat(),
        "requested_at": (
            sample.requested_at.isoformat() if sample.requested_at is not None else None
        ),
        "latency_s": sample.latency_s,
        "error": str(sample.error) if sample.error is not None else None,
    }
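
Because error rows share the same keys as reading rows, a consumer can separate them with a simple filter on `error`. The rows below are hypothetical, use only a subset of the keys listed above, and carry made-up values.

```python
# Hypothetical rows using a subset of the keys that sample_to_row emits.
rows = [
    {"device": "analyser-1", "name": "O2", "value": 20.9, "error": None},
    {"device": "analyser-1", "name": None, "value": None, "error": "checksum mismatch"},
    {"device": "analyser-1", "name": "CO2", "value": 0.04, "error": None},
]


def split_rows(rows):
    """Separate successful readings from error rows."""
    readings = [r for r in rows if r["error"] is None]
    errors = [r for r in rows if r["error"] is not None]
    return readings, errors


readings, errors = split_rows(rows)
# Long format: one row per reading, so pivoting by name is a dict build.
latest = {r["name"]: r["value"] for r in readings}
```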