Sinks

Durable consumers of DaqBlock / DaqReading data. Every sink is an async context manager that satisfies at least one of the sink Protocols (ReadingSink, BlockSink, RawBlockSink). See the Sinks guide for choosing among them and the Raw logging guide for the loss-proof .dt-raw path.

Sink Protocol and helpers

dtollib.sinks.base

Sink Protocols, row helpers, and pipe drivers.

Three Protocols, one per input shape:

  • ReadingSink — accepts DaqReading sequences.
  • BlockSink — accepts one DaqBlock per call. No batching axis; a block is already (n_channels, n_samples).
  • RawBlockSink — accepts raw HBUF payloads (implemented by RawCountsSink).

Two drivers thread streams to sinks:

  • pipe — row-oriented, batched.
  • pipe_blocks — block-native, no batching axis.

Row helpers convert acquisition records into row dicts for the tabular sinks (SQLite / Parquet / Postgres):

  • reading_to_row — flatten a DaqReading into one row.
  • block_to_rows — explicit per-(channel, sample) scalarisation of a DaqBlock into row dicts. Distinct from dtollib.tasks.models.block_to_long_rows, which yields typed dtollib.tasks.models.DaqSample objects; this one yields plain dicts ready for a tabular sink. Neither driver calls it; sinks opt in explicitly (ParquetSink.write uses it for its long-format rows).

BlockSink

Bases: Protocol

Sink that consumes one DaqBlock per call.

A block is already (n_channels, n_samples); wrapping it in a sequence on every call would add an allocation to the hot path. Sinks that need scalar rows opt in via block_to_rows.

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/dtollib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/dtollib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/dtollib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource.

Source code in src/dtollib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource."""
    ...

write async

write(block)

Append one DaqBlock as a row group, file segment, or row, depending on the sink.

Source code in src/dtollib/sinks/base.py
async def write(self, block: DaqBlock) -> None:
    """Append one :class:`DaqBlock` as a row group / file segment / row."""
    ...
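
Because BlockSink is a Protocol, a class satisfies it structurally: no base class is needed, only the async methods listed above. The sketch below is a hypothetical in-memory sink (CountingBlockSink) meant only to show that shape; FakeBlock is an illustrative stand-in for DaqBlock, not the real model.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from types import TracebackType


@dataclass
class FakeBlock:
    """Hypothetical stand-in for DaqBlock: channel names plus a 2-D payload."""

    channels: list[str]
    data: list[list[float]]  # shape (n_channels, n_samples)


@dataclass
class CountingBlockSink:
    """Hypothetical in-memory sink matching the BlockSink method set."""

    blocks: list[FakeBlock] = field(default_factory=list)
    is_open: bool = False

    async def open(self) -> None:
        self.is_open = True

    async def close(self) -> None:
        # Idempotent, as the Protocol asks: closing twice is harmless.
        self.is_open = False

    async def __aenter__(self) -> CountingBlockSink:
        await self.open()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        await self.close()

    async def write(self, block: FakeBlock) -> None:
        if not self.is_open:
            raise RuntimeError("write before open()")
        # One call per block: the block is stored as-is, not wrapped in a list.
        self.blocks.append(block)


async def main() -> CountingBlockSink:
    async with CountingBlockSink() as sink:
        for i in range(3):
            await sink.write(FakeBlock(["ai0", "ai1"], [[float(i)] * 4, [0.0] * 4]))
    return sink


sink = asyncio.run(main())
print(len(sink.blocks), sink.is_open)  # 3 False
```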

RawBlockSink

Bases: Protocol

Sink that consumes raw HBUF payloads; RawCountsSink is the only implementation.

Distinct from BlockSink because the raw-counts path carries a fundamentally different payload (bytes from olDmGetBufferPtr) than the converted DaqBlock. Typing them as separate Protocols prevents accidental miswiring at sink-attach time.

Design reference: docs/design.md §15.1.

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/dtollib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/dtollib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/dtollib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource.

Source code in src/dtollib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource."""
    ...

write_raw async

write_raw(block)

Append one raw-counts payload — typically block.raw_codes.

Source code in src/dtollib/sinks/base.py
async def write_raw(self, block: DaqBlock) -> None:
    """Append one raw-counts payload — typically ``block.raw_codes``."""
    ...
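
To see how separate Protocols catch miswiring, the sketch below mirrors only the distinguishing method of each as a runtime-checkable Protocol. The mirrors (WritesBlocks, WritesRaw) and RawOnlySink are hypothetical; the real Protocols are primarily for static type checking, where the same mismatch would be reported before the code runs.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class WritesBlocks(Protocol):
    """Hypothetical mirror of BlockSink's distinguishing method."""

    async def write(self, block: Any) -> None: ...


@runtime_checkable
class WritesRaw(Protocol):
    """Hypothetical mirror of RawBlockSink's distinguishing method."""

    async def write_raw(self, block: Any) -> None: ...


class RawOnlySink:
    """Hypothetical raw-counts sink: exposes write_raw and nothing else."""

    def __init__(self) -> None:
        self.payloads: list[bytes] = []

    async def write_raw(self, block: Any) -> None:
        self.payloads.append(bytes(block))


def attach_block_sink(sink: object) -> None:
    # isinstance against a runtime_checkable Protocol only checks method presence.
    if not isinstance(sink, WritesBlocks):
        raise TypeError(f"{type(sink).__name__} is not a block sink")


raw = RawOnlySink()
print(isinstance(raw, WritesRaw), isinstance(raw, WritesBlocks))  # True False
try:
    attach_block_sink(raw)
except TypeError as err:
    print(err)  # RawOnlySink is not a block sink
```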

ReadingSink

Bases: Protocol

Sink that consumes DaqReading sequences.

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/dtollib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/dtollib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/dtollib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource (file handle, DB conn, ...).

Source code in src/dtollib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource (file handle, DB conn, ...)."""
    ...

write_many async

write_many(items)

Append items to the sink.

Source code in src/dtollib/sinks/base.py
async def write_many(self, items: Sequence[DaqReading]) -> None:
    """Append ``items`` to the sink."""
    ...
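
A minimal ReadingSink sketch, assuming nothing beyond the methods listed above. The hypothetical TextReadingSink writes one text line per reading into an in-memory buffer, rejects write_many before open(), and makes close() safe to call twice. FakeReading is an illustrative stand-in for DaqReading.

```python
from __future__ import annotations

import asyncio
import io
from collections.abc import Sequence
from dataclasses import dataclass
from types import TracebackType


@dataclass(frozen=True)
class FakeReading:
    """Hypothetical stand-in for DaqReading: a timestamp and one value per channel."""

    t_mono_ns: int
    values: dict[str, float]


class TextReadingSink:
    """Hypothetical ReadingSink that writes one text line per reading."""

    def __init__(self) -> None:
        self.buffer: io.StringIO | None = None
        self.lines: list[str] = []

    async def open(self) -> None:
        if self.buffer is None:  # opening an already-open sink is a no-op
            self.buffer = io.StringIO()

    async def close(self) -> None:
        if self.buffer is None:  # idempotent: a second close does nothing
            return
        self.lines = self.buffer.getvalue().splitlines()
        self.buffer.close()
        self.buffer = None

    async def __aenter__(self) -> TextReadingSink:
        await self.open()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        await self.close()

    async def write_many(self, items: Sequence[FakeReading]) -> None:
        if self.buffer is None:
            raise RuntimeError("write_many before open()")
        for r in items:
            cols = ",".join(f"{ch}={v}" for ch, v in sorted(r.values.items()))
            self.buffer.write(f"{r.t_mono_ns},{cols}\n")


async def main() -> TextReadingSink:
    sink = TextReadingSink()
    async with sink:
        await sink.write_many([FakeReading(1, {"ai0": 0.5}), FakeReading(2, {"ai0": 0.75})])
    await sink.close()  # safe: close is idempotent
    return sink


sink = asyncio.run(main())
print(sink.lines)  # ['1,ai0=0.5', '2,ai0=0.75']
```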

block_to_rows

block_to_rows(block)

Unroll a DaqBlock into one row per (channel, sample).

Per-sample timestamps are reconstructed from block.t_mono_ns, block.t_utc, and block.block_period_ns; sample_index is offset from block.first_sample_index. When the block has no clock period, samples are spaced uniformly within the read window (period = read-window span // samples_per_channel).

Each row carries:

  • device, task, channel — join keys.
  • block_index, sample_index — block- and task-level indices.
  • t_mono_ns — reconstructed monotonic nanoseconds for this sample.
  • t_utc — reconstructed wall-clock (ISO 8601) for this sample.
  • value — the scalar sample value.
  • unit — engineering unit for the channel (or None).
  • error_type / error_message — populated only on error blocks.

Distinct from dtollib.tasks.models.block_to_long_rows (which yields typed dtollib.tasks.models.DaqSample objects); this one yields plain dicts ready for a tabular sink.

Source code in src/dtollib/sinks/base.py
def block_to_rows(block: DaqBlock) -> list[dict[str, float | int | str | bool | None]]:
    """Unroll a :class:`DaqBlock` into one row per ``(channel, sample)``.

    Per-sample timestamps reconstruct from ``block.t_mono_ns``,
    ``block.block_period_ns``, and ``block.first_sample_index``. When the
    block has no clock period, samples are spaced uniformly within the read
    window.

    Each row carries:

    - ``device``, ``task``, ``channel`` — join keys.
    - ``block_index``, ``sample_index`` — block- and task-level indices.
    - ``t_mono_ns`` — reconstructed monotonic nanoseconds for this sample.
    - ``t_utc`` — reconstructed wall-clock (ISO 8601) for this sample.
    - ``value`` — the scalar sample value.
    - ``unit`` — engineering unit for the channel (or ``None``).
    - ``error_type`` / ``error_message`` — populated only on error blocks.

    Distinct from :func:`~dtollib.tasks.models.block_to_long_rows` (which
    yields typed :class:`~dtollib.tasks.models.DaqSample` objects); this one
    yields plain dicts ready for a tabular sink.
    """
    from datetime import timedelta  # noqa: PLC0415

    n_channels = len(block.channels)
    n_samples = block.samples_per_channel
    period_ns = block.block_period_ns
    if period_ns is None:
        span_ns = int((block.read_finished_at - block.read_started_at).total_seconds() * 1e9)
        period_ns = span_ns // max(1, n_samples)

    err = block.error
    err_type = f"{type(err).__module__}.{type(err).__qualname__}" if err is not None else None
    err_msg = str(err) if err is not None else None

    rows: list[dict[str, float | int | str | bool | None]] = []
    for c in range(n_channels):
        ch_name = block.channels[c]
        unit = block.units.get(ch_name)
        for k in range(n_samples):
            absolute = block.first_sample_index + k
            sample_t_mono_ns = block.t_mono_ns + k * period_ns
            sample_t_utc = block.t_utc + timedelta(microseconds=(k * period_ns) / 1_000)
            rows.append(
                {
                    "device": block.device,
                    "task": block.task,
                    "channel": ch_name,
                    "block_index": block.block_index,
                    "sample_index": absolute,
                    "t_mono_ns": sample_t_mono_ns,
                    "t_utc": sample_t_utc.isoformat(),
                    "value": float(block.data[c, k]),
                    "unit": unit,
                    "error_type": err_type,
                    "error_message": err_msg,
                }
            )
    return rows
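
The timestamp arithmetic in block_to_rows can be checked by hand. The sketch below repeats only that arithmetic on plain values (sample_times is a hypothetical helper, and the numbers are made up): with no hardware clock period, a 10 ms read window holding 4 samples per channel gives a fallback period of 10_000_000 // 4 = 2_500_000 ns, and sample k lands at t_mono_ns + k * period_ns.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def sample_times(
    t_mono_ns: int,
    t_utc: datetime,
    n_samples: int,
    block_period_ns: int | None,
    read_started_at: datetime,
    read_finished_at: datetime,
) -> list[tuple[int, str]]:
    """Hypothetical helper repeating block_to_rows' timestamp reconstruction."""
    period_ns = block_period_ns
    if period_ns is None:
        # Fallback: spread the samples uniformly across the read window.
        span_ns = int((read_finished_at - read_started_at).total_seconds() * 1e9)
        period_ns = span_ns // max(1, n_samples)
    out = []
    for k in range(n_samples):
        mono = t_mono_ns + k * period_ns
        utc = t_utc + timedelta(microseconds=(k * period_ns) / 1_000)
        out.append((mono, utc.isoformat()))
    return out


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
times = sample_times(
    t_mono_ns=1_000_000_000,
    t_utc=start,
    n_samples=4,
    block_period_ns=None,
    read_started_at=start,
    read_finished_at=start + timedelta(milliseconds=10),
)
for mono, utc in times:
    print(mono, utc)
# 1000000000 2024-01-01T00:00:00+00:00
# 1002500000 2024-01-01T00:00:00.002500+00:00
# 1005000000 2024-01-01T00:00:00.005000+00:00
# 1007500000 2024-01-01T00:00:00.007500+00:00
```

In this example the first sample sits at the start of the window and the last one a full period before its end (7.5 ms, not 10 ms), because the spacing is span // n_samples.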

pipe async

pipe(stream, sink, *, batch_size=64, flush_interval_s=1.0)

Drain a DaqReading stream into a row-oriented sink with buffered flushes.

Reads records from stream and accumulates them into a list. A flush is triggered when the buffer reaches batch_size, or when a record arrives at least flush_interval_s after the last flush. The interval is only checked as records arrive, so a stalled stream keeps its partial buffer until the next record or the end of the stream. On stream exhaustion the leftover buffer is flushed before returning.

Parameters:

  • stream (AsyncIterator[DaqReading], required): Async iterator of DaqReading records.
  • sink (ReadingSink, required): An open ReadingSink. Its write_many is awaited once per flush.
  • batch_size (int, default 64): Maximum records per flush; reaching it triggers a flush.
  • flush_interval_s (float, default 1.0): Seconds since the last flush, measured on the event-loop clock, after which the next arriving record triggers a flush.

Returns:

  • int: Total records actually handed to the sink.

Raises:

  • ValueError: batch_size < 1 or flush_interval_s <= 0.

Source code in src/dtollib/sinks/base.py
async def pipe(
    stream: AsyncIterator[DaqReading],
    sink: ReadingSink,
    *,
    batch_size: int = 64,
    flush_interval_s: float = 1.0,
) -> int:
    """Drain a :class:`DaqReading` stream into a row-oriented sink with buffered flushes.

    Reads records from ``stream`` and accumulates them into a list. A flush
    is triggered when either the buffer reaches ``batch_size`` or
    ``flush_interval_s`` elapses since the last flush. On stream exhaustion
    the leftover buffer is flushed before returning.

    Args:
        stream: Async iterator of :class:`DaqReading` records.
        sink: An open :class:`ReadingSink`. The sink's ``write_many`` is
            awaited per flush.
        batch_size: Records per flush.
        flush_interval_s: Wall-clock seconds between flushes.

    Returns:
        Total records actually handed to the sink.

    Raises:
        ValueError: ``batch_size < 1`` or ``flush_interval_s <= 0``.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval_s <= 0:
        raise ValueError(f"flush_interval_s must be > 0, got {flush_interval_s!r}")

    emitted = 0
    buffer: list[DaqReading] = []
    last_flush = anyio.current_time()

    async def _flush() -> None:
        nonlocal emitted
        if not buffer:
            return
        await sink.write_many(buffer)
        emitted += len(buffer)
        buffer.clear()

    async for record in stream:
        buffer.append(record)
        now = anyio.current_time()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval_s:
            await _flush()
            last_flush = now

    await _flush()
    _logger.info("sinks.pipe_done", extra={"records_emitted": emitted})
    return emitted
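
The flush policy of pipe is easy to reproduce outside dtollib. The sketch below is a stdlib-only mirror of that policy, not the library function: asyncio's loop clock stands in for anyio.current_time(), and pipe_sketch, ListSink, and numbers are hypothetical names. With batch_size=4 and a long interval, ten records arrive as batches of 4, 4, and 2, the last one flushed when the stream ends.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Sequence


class ListSink:
    """Hypothetical sink that records the size of every write_many call."""

    def __init__(self) -> None:
        self.batches: list[int] = []

    async def write_many(self, items: Sequence[int]) -> None:
        self.batches.append(len(items))


async def pipe_sketch(
    stream: AsyncIterator[int],
    sink: ListSink,
    *,
    batch_size: int = 64,
    flush_interval_s: float = 1.0,
) -> int:
    loop = asyncio.get_running_loop()
    emitted = 0
    buffer: list[int] = []
    last_flush = loop.time()
    async for record in stream:
        buffer.append(record)
        now = loop.time()
        # Size trigger, or time trigger (checked only when a record arrives).
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval_s:
            await sink.write_many(buffer)
            emitted += len(buffer)
            buffer.clear()
            last_flush = now
    if buffer:  # flush the remainder on stream exhaustion
        await sink.write_many(buffer)
        emitted += len(buffer)
    return emitted


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


sink = ListSink()
total = asyncio.run(pipe_sketch(numbers(10), sink, batch_size=4, flush_interval_s=60.0))
print(total, sink.batches)  # 10 [4, 4, 2]
```

One consequence of this design, also visible in pipe itself: write_many receives the buffer list, which is cleared as soon as the call returns, so a sink must finish with the items (or copy them) before returning.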

pipe_blocks async

pipe_blocks(stream, sink, *, flush_interval_s=None)

Drain a block stream into a BlockSink.

No batching axis: blocks are already batched. flush_interval_s is accepted for API symmetry with pipe but currently unused; sinks that need a periodic flush implement their own.

Returns:

  • int: Total blocks written.

Source code in src/dtollib/sinks/base.py
async def pipe_blocks(
    stream: AsyncIterator[DaqBlock],
    sink: BlockSink,
    *,
    flush_interval_s: float | None = None,
) -> int:
    """Drain a block stream into a :class:`BlockSink`.

    No batching axis — blocks are already batched. ``flush_interval_s`` is
    accepted for API symmetry with :func:`pipe` but currently unused; sinks
    that need a periodic flush implement their own.

    Returns:
        Total blocks written.
    """
    del flush_interval_s  # reserved for future per-sink flush helpers
    emitted = 0
    async for block in stream:
        await sink.write(block)
        emitted += 1
    _logger.info("sinks.pipe_blocks_done", extra={"blocks_emitted": emitted})
    return emitted
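
Because pipe_blocks ignores flush_interval_s, a sink that buffers internally has to decide for itself when to flush. One hypothetical approach, sketched below, compares a monotonic clock against the last flush on every write(). BufferedBlockSink, its injectable clock argument, and drain are illustrative names, not part of dtollib, and blocks are plain strings here.

```python
from __future__ import annotations

import asyncio
import time
from collections.abc import AsyncIterator, Callable


class BufferedBlockSink:
    """Hypothetical block sink that buffers and flushes on its own timer."""

    def __init__(self, flush_interval_s: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._interval = flush_interval_s
        self._clock = clock
        self._pending: list[str] = []
        self._last_flush = clock()
        self.flushed: list[list[str]] = []

    def _flush(self) -> None:
        if self._pending:
            self.flushed.append(self._pending)
            self._pending = []
        self._last_flush = self._clock()

    async def write(self, block: str) -> None:
        self._pending.append(block)
        # The timer is checked on each write, like pipe's interval check.
        if self._clock() - self._last_flush >= self._interval:
            self._flush()

    async def close(self) -> None:
        self._flush()  # never drop buffered blocks on shutdown


async def drain(stream: AsyncIterator[str], sink: BufferedBlockSink) -> int:
    """Same loop shape as pipe_blocks: one write() per block."""
    emitted = 0
    async for block in stream:
        await sink.write(block)
        emitted += 1
    return emitted


async def main() -> BufferedBlockSink:
    ticks = iter([0.0, 0.5, 1.0, 1.0, 1.2, 1.2])  # fake monotonic readings
    sink = BufferedBlockSink(flush_interval_s=1.0, clock=lambda: next(ticks))

    async def blocks() -> AsyncIterator[str]:
        for name in ("b0", "b1", "b2"):
            yield name

    await drain(blocks(), sink)
    await sink.close()
    return sink


sink = asyncio.run(main())
print(sink.flushed)  # [['b0', 'b1'], ['b2']]
```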

reading_to_row

reading_to_row(reading)

Flatten a DaqReading into a single row dict.

Layout:

  • device, task — join keys.
  • t_mono_ns — int, canonical monotonic join key.
  • t_utc — ISO 8601, wall-clock acquisition midpoint.
  • t_midpoint_mono_ns — int or None (integration-window midpoint).
  • requested_at / received_at — ISO 8601, I/O provenance.
  • latency_s — float seconds.
  • one column per channel (values keys), values flattened.
  • one <channel>_unit column per channel.
  • error_type / error_message — populated only on error rows.

The same row layout is used by every tabular sink (SQLite, Parquet, Postgres).

Source code in src/dtollib/sinks/base.py
def reading_to_row(reading: DaqReading) -> dict[str, float | int | str | bool | None]:
    """Flatten a :class:`DaqReading` into a single row dict.

    Layout:

    - ``device``, ``task`` — join keys.
    - ``t_mono_ns`` — int, canonical monotonic join key.
    - ``t_utc`` — ISO 8601, wall-clock acquisition midpoint.
    - ``t_midpoint_mono_ns`` — int or None (integration-window midpoint).
    - ``requested_at`` / ``received_at`` — ISO 8601, I/O provenance.
    - ``latency_s`` — float seconds.
    - one column per channel (``values`` keys), values flattened.
    - one ``<channel>_unit`` column per channel.
    - ``error_type`` / ``error_message`` — populated only on error rows.

    The same row layout is used by every row-oriented (tabular) sink.
    """
    row: dict[str, float | int | str | bool | None] = {
        "device": reading.device,
        "task": reading.task,
        "t_mono_ns": reading.t_mono_ns,
        "t_utc": reading.t_utc.isoformat(),
        "t_midpoint_mono_ns": reading.t_midpoint_mono_ns,
        "requested_at": reading.requested_at.isoformat(),
        "received_at": reading.received_at.isoformat(),
        "latency_s": reading.latency_s,
    }
    row.update(reading.values)
    row.update({f"{ch}_unit": unit for ch, unit in reading.units.items()})
    err = reading.error
    if err is not None:
        row["error_type"] = f"{type(err).__module__}.{type(err).__qualname__}"
        row["error_message"] = str(err)
    else:
        row["error_type"] = None
        row["error_message"] = None
    return row
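
To make the layout concrete, the sketch below flattens a hypothetical two-channel reading with the same steps as reading_to_row: fixed columns first, then one column per channel, then one <channel>_unit column per channel, then the error pair. FakeReading is an illustrative stand-in carrying only the fields the function reads, and flatten is a hypothetical copy of the logic above; all values are invented.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from datetime import datetime, timezone


@dataclass
class FakeReading:
    """Hypothetical stand-in carrying the fields reading_to_row reads."""

    device: str
    task: str
    t_mono_ns: int
    t_utc: datetime
    t_midpoint_mono_ns: int | None
    requested_at: datetime
    received_at: datetime
    latency_s: float
    values: dict[str, float]
    units: dict[str, str]
    error: BaseException | None = None


def flatten(r: FakeReading) -> dict[str, object]:
    row: dict[str, object] = {
        "device": r.device,
        "task": r.task,
        "t_mono_ns": r.t_mono_ns,
        "t_utc": r.t_utc.isoformat(),
        "t_midpoint_mono_ns": r.t_midpoint_mono_ns,
        "requested_at": r.requested_at.isoformat(),
        "received_at": r.received_at.isoformat(),
        "latency_s": r.latency_s,
    }
    row.update(r.values)  # one column per channel
    row.update({f"{ch}_unit": u for ch, u in r.units.items()})
    err = r.error
    row["error_type"] = f"{type(err).__module__}.{type(err).__qualname__}" if err is not None else None
    row["error_message"] = str(err) if err is not None else None
    return row


ts = datetime(2024, 1, 1, tzinfo=timezone.utc)
reading = FakeReading("dev0", "task0", 42, ts, None, ts, ts, 0.003,
                      {"ai0": 1.25, "ai1": -0.5}, {"ai0": "V", "ai1": "V"})
row = flatten(reading)
print(list(row)[8:])
# ['ai0', 'ai1', 'ai0_unit', 'ai1_unit', 'error_type', 'error_message']

failed = flatten(replace(reading, error=ValueError("overrun")))
print(failed["error_type"], failed["error_message"])  # builtins.ValueError overrun
```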

CSV

dtollib.sinks.csv

CSV sink — one row per DaqReading, with optional block support.

Refuses DaqBlock by default to prevent accidental 1-GB CSVs at 10 kHz × 8 channels. Pass accept_blocks=True to enable per-sample explosion via block_to_long_rows.

Design reference: docs/design.md §15.1.
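
The 1-GB figure is a back-of-envelope number. The sketch below reproduces the arithmetic under an assumed average of 100 bytes per CSV row; the real width depends on device and task names, timestamp formatting, and value precision. At 10 kHz on each of 8 channels, per-sample explosion produces 80,000 rows per second, so a file passes 10^9 bytes in about two minutes.

```python
SAMPLE_RATE_HZ = 10_000
N_CHANNELS = 8
BYTES_PER_ROW = 100  # assumption: average width of one per-sample CSV row

rows_per_s = SAMPLE_RATE_HZ * N_CHANNELS  # one row per (channel, sample)
bytes_per_s = rows_per_s * BYTES_PER_ROW
seconds_to_1_gb = 1_000_000_000 / bytes_per_s

print(rows_per_s, bytes_per_s, seconds_to_1_gb)  # 80000 8000000 125.0
```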

CsvSink

CsvSink(path, *, accept_blocks=False)

Write DaqReading rows (and optionally per-sample block rows) to CSV.

Parameters:

  • path (Path | str, required): Output file path.
  • accept_blocks (bool, default False): When True, write(block) explodes the block into per-(channel, sample) rows. When False, block writes raise DtolSinkError.

Source code in src/dtollib/sinks/csv.py
def __init__(self, path: Path | str, *, accept_blocks: bool = False) -> None:
    self._path = Path(path)
    self._accept_blocks = accept_blocks
    self._fh: Any = None
    self._writer: Any = None
    self._header_written = False

close async

close()

Flush and close the file. Idempotent.

Source code in src/dtollib/sinks/csv.py
async def close(self) -> None:
    """Flush and close the file. Idempotent."""
    if self._fh is not None:
        self._fh.flush()
        self._fh.close()
        self._fh = None
        self._writer = None

open async

open()

Open the file for writing, truncating any existing content.

Source code in src/dtollib/sinks/csv.py
async def open(self) -> None:
    """Open the file for writing."""
    if self._fh is not None:
        return
    self._fh = self._path.open("w", newline="", encoding="utf-8")
    self._writer = csv.writer(self._fh)

write async

write(block)

Block path — refused by default; explodes per-sample when enabled.

Source code in src/dtollib/sinks/csv.py
async def write(self, block: DaqBlock) -> None:
    """Block path — refused by default; explodes per-sample when enabled."""
    if not self._accept_blocks:
        raise DtolSinkError(
            "CsvSink refuses blocks by default; pass accept_blocks=True "
            "(see docs/design.md §15.1 — prevents 1-GB CSVs)",
            context=ErrorContext(operation="CsvSink.write"),
        )
    if self._writer is None:
        raise DtolSinkError(
            "CsvSink: write before open()",
            context=ErrorContext(operation="CsvSink.write"),
        )
    from dtollib.tasks.models import block_to_long_rows  # noqa: PLC0415

    for sample in block_to_long_rows(block):
        self._write_row(sample.to_dict())

write_many async

write_many(items)

Append every reading as one CSV row.

Source code in src/dtollib/sinks/csv.py
async def write_many(self, items: Sequence[DaqReading]) -> None:
    """Append every reading as one CSV row."""
    if self._writer is None:
        raise DtolSinkError(
            "CsvSink: write_many before open()",
            context=ErrorContext(operation="CsvSink.write_many"),
        )
    for reading in items:
        row = reading.to_dict()
        self._write_row(row)
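
The _write_row helper and the _header_written flag it presumably uses are not shown on this page. The sketch below shows one plausible lazy-header approach, assuming the first row's keys become the header and later rows are projected onto it; LazyHeaderCsv is a hypothetical name, and dtollib's real helper may behave differently.

```python
from __future__ import annotations

import csv
import io
from typing import Any, TextIO


class LazyHeaderCsv:
    """Hypothetical lazy-header writer; not dtollib's actual _write_row."""

    def __init__(self, fh: TextIO) -> None:
        self._writer = csv.writer(fh)
        self._header: list[str] | None = None

    def write_row(self, row: dict[str, Any]) -> None:
        if self._header is None:
            # The first row fixes the column order and emits the header line.
            self._header = list(row)
            self._writer.writerow(self._header)
        # Later rows are projected onto the header; missing keys become "".
        self._writer.writerow([row.get(col, "") for col in self._header])


buf = io.StringIO()
w = LazyHeaderCsv(buf)
w.write_row({"t_mono_ns": 1, "ai0": 0.5})
w.write_row({"t_mono_ns": 2, "ai0": 0.75, "extra": "dropped"})
print(buf.getvalue().splitlines())  # ['t_mono_ns,ai0', '1,0.5', '2,0.75']
```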

JSONL

dtollib.sinks.jsonl

JSONL sink — one JSON object per DaqReading (or per sample for blocks).

Refuses DaqBlock by default; accept_blocks=True enables per-sample explosion via block_to_long_rows.

Design reference: docs/design.md §15.1.

JsonlSink

JsonlSink(path, *, accept_blocks=False)

Write one JSON object per line.

Parameters:

  • path (Path | str, required): Output file path.
  • accept_blocks (bool, default False): When True, write(block) explodes the block via block_to_long_rows. When False, block writes raise DtolSinkError.

Source code in src/dtollib/sinks/jsonl.py
def __init__(self, path: Path | str, *, accept_blocks: bool = False) -> None:
    self._path = Path(path)
    self._accept_blocks = accept_blocks
    self._fh: Any = None

close async

close()

Flush and close the file. Idempotent.

Source code in src/dtollib/sinks/jsonl.py
async def close(self) -> None:
    """Flush and close the file. Idempotent."""
    if self._fh is not None:
        self._fh.flush()
        self._fh.close()
        self._fh = None

open async

open()

Open the file for writing, truncating any existing content.

Source code in src/dtollib/sinks/jsonl.py
async def open(self) -> None:
    """Open the file for writing."""
    if self._fh is not None:
        return
    self._fh = self._path.open("w", encoding="utf-8")

write async

write(block)

Block path — refused by default; explodes per-sample when enabled.

Source code in src/dtollib/sinks/jsonl.py
async def write(self, block: DaqBlock) -> None:
    """Block path — refused by default; explodes per-sample when enabled."""
    if not self._accept_blocks:
        raise DtolSinkError(
            "JsonlSink refuses blocks by default; pass accept_blocks=True",
            context=ErrorContext(operation="JsonlSink.write"),
        )
    if self._fh is None:
        raise DtolSinkError(
            "JsonlSink: write before open()",
            context=ErrorContext(operation="JsonlSink.write"),
        )
    from dtollib.tasks.models import block_to_long_rows  # noqa: PLC0415

    for sample in block_to_long_rows(block):
        self._fh.write(json.dumps(sample.to_dict(), default=str))
        self._fh.write("\n")

write_many async

write_many(items)

Append every reading as one JSON object on its own line.

Source code in src/dtollib/sinks/jsonl.py
async def write_many(self, items: Sequence[DaqReading]) -> None:
    """Append every reading as one JSON object on its own line."""
    if self._fh is None:
        raise DtolSinkError(
            "JsonlSink: write_many before open()",
            context=ErrorContext(operation="JsonlSink.write_many"),
        )
    for reading in items:
        self._fh.write(json.dumps(reading.to_dict(), default=str))
        self._fh.write("\n")
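
json.dumps(..., default=str) falls back to str() for any value the json module cannot encode natively. If to_dict() leaves datetime objects in place (this page does not say), they are serialised with str(), which uses a space rather than "T" as the date/time separator. The stdlib-only sketch below, with an invented record, shows a resulting line and one way a reader could restore the timestamp.

```python
import json
from datetime import datetime, timezone

record = {"device": "dev0", "t_utc": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc), "ai0": 0.5}

# default=str is only consulted for values json cannot encode, such as datetime.
line = json.dumps(record, default=str)
print(line)
# {"device": "dev0", "t_utc": "2024-01-01 12:00:00+00:00", "ai0": 0.5}

# Reading back: one json.loads per line; timestamps must be restored explicitly.
parsed = json.loads(line)
t = datetime.fromisoformat(parsed["t_utc"])  # accepts the space separator
print(t == record["t_utc"])  # True
```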

Parquet

dtollib.sinks.parquet

Parquet sink — pyarrow, one row group per block, zstd by default.

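The preferred sink for hardware-clocked acquisition. One row group per write call (one block), so a crash mid-run loses at most the current block of row-group data. Note that the Parquet footer is written only by close(), so standard readers will not open a file from a run that crashed before close().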

Shape-locking. The first call to either write or write_many locks the schema. Mixing record shapes after the first write raises dtollib.errors.DtolSinkSchemaError.

Block layout (long format): one row per (channel, sample), with t_mono_ns / t_utc per row reconstructed via dtollib.sinks.base.block_to_rows.

pyarrow is an optional dependency behind dtollib[parquet]; the import is deferred to open, so instantiating the sink succeeds on bare-core installs.

Design reference: docs/design.md §15.1.

ParquetSink

ParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
)

Parquet writer with first-write shape lock.

Parameters:

  • path (str | Path, required): Destination Parquet file.
  • compression (_Compression, default 'zstd'): Codec for every row group. zstd matches or beats snappy on speed with ~30% better ratios.
  • use_dictionary (bool, default True): Dictionary encoding for string columns.
  • row_group_size (int | None, default None): Optional maximum rows per row group; must be >= 1 if set. None lets pyarrow batch the whole call.
Source code in src/dtollib/sinks/parquet.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
) -> None:
    self._path = Path(path)
    self._compression: _Compression = compression
    self._use_dictionary = use_dictionary
    if row_group_size is not None and row_group_size < 1:
        raise ValueError(f"row_group_size must be >= 1 if set, got {row_group_size!r}")
    self._row_group_size = row_group_size
    self._schema = SchemaLock(sink_name="parquet", logger=_logger)
    self._shape: Literal["readings", "blocks"] | None = None
    self._pa: Any = None
    self._pq: Any = None
    self._arrow_schema: Any = None
    self._writer: Any = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first write.

compression property

compression

Configured compression codec.

path property

path

Destination Parquet file path.

shape property

shape

Locked record shape, or None before first write.

close async

close()

Flush the footer and close the writer. Idempotent.

Source code in src/dtollib/sinks/parquet.py
async def close(self) -> None:
    """Flush the footer and close the writer. Idempotent."""
    if self._writer is not None:
        try:
            self._writer.close()
        finally:
            self._writer = None
    self._pa = None
    self._pq = None
    _logger.info(
        "sinks.parquet.close",
        extra={"path": str(self._path), "rows_written": self._rows_written},
    )

open async

open()

Load pyarrow and create the parent directory.

The pyarrow.parquet.ParquetWriter itself is opened lazily on the first write, because the schema is not known until then.

Source code in src/dtollib/sinks/parquet.py
async def open(self) -> None:
    """Load pyarrow and create the parent directory.

    The :class:`pyarrow.parquet.ParquetWriter` itself is opened lazily
    on the first write — we don't have the schema until then.
    """
    if self._pa is not None:
        return
    self._pa, self._pq = _load_pyarrow()
    self._path.parent.mkdir(parents=True, exist_ok=True)
    _logger.info(
        "sinks.parquet.open",
        extra={"path": str(self._path), "compression": self._compression},
    )

write async

write(block)

Append one DaqBlock as a row group of long-format rows.

Long-format layout: one row per (channel, sample). The block_index / sample_index columns let consumers re-aggregate efficiently.

Source code in src/dtollib/sinks/parquet.py
async def write(self, block: DaqBlock) -> None:
    """Append one :class:`DaqBlock` as a row group of long-format rows.

    Long-format layout: one row per (channel, sample). The
    ``block_index`` / ``sample_index`` columns let consumers
    re-aggregate efficiently.
    """
    if self._pa is None:
        raise RuntimeError("ParquetSink: write called before open()")
    self._lock_or_check_shape("blocks")
    self._write_rows(block_to_rows(block))

write_many async

write_many(items)

Append DaqReading rows.

The first call locks the schema and the record shape. Mixing shapes afterwards raises dtollib.errors.DtolSinkSchemaError.

Source code in src/dtollib/sinks/parquet.py
async def write_many(self, items: Sequence[DaqReading]) -> None:
    """Append :class:`DaqReading` rows.

    First call locks the schema and the record shape. Mixing shapes
    afterwards raises :class:`~dtollib.errors.DtolSinkSchemaError`.
    """
    if self._pa is None:
        raise RuntimeError("ParquetSink: write_many called before open()")
    if not items:
        return
    self._lock_or_check_shape("readings")
    self._write_rows([reading_to_row(r) for r in items])
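
The first-write shape lock can be pictured as a small state machine. The sketch below is hypothetical (ShapeLock and ShapeMismatchError are illustrative names; dtollib's SchemaLock, _lock_or_check_shape, and DtolSinkSchemaError are not shown on this page). The first call records the shape, repeats of the same shape pass, and a different shape raises.

```python
from __future__ import annotations

from typing import Literal

Shape = Literal["readings", "blocks"]


class ShapeMismatchError(Exception):
    """Hypothetical stand-in for DtolSinkSchemaError."""


class ShapeLock:
    """Hypothetical first-write shape lock mirroring ParquetSink's contract."""

    def __init__(self) -> None:
        self.shape: Shape | None = None  # None until the first write

    def lock_or_check(self, shape: Shape) -> None:
        if self.shape is None:
            self.shape = shape  # the first write decides the file's shape
        elif self.shape != shape:
            raise ShapeMismatchError(f"sink locked to {self.shape!r}, got {shape!r}")


lock = ShapeLock()
lock.lock_or_check("blocks")
lock.lock_or_check("blocks")  # same shape: accepted
try:
    lock.lock_or_check("readings")
except ShapeMismatchError as err:
    print(err)  # sink locked to 'blocks', got 'readings'
```

The lock matters because the two shapes produce different columns: reading rows carry one column per channel, while block rows carry channel and value columns. A single Parquet file has only one schema.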

SQLite

dtollib.sinks.sqlite

SQLite sink — stdlib sqlite3 + WAL, parameterised executemany.

Accepts DaqReading via write_many and DaqBlock via write, storing one summary row per block (no scalarisation). Readings and block summaries go to separate tables, readings and blocks by default; override via the table_* arguments.

The sqlite3 driver is synchronous; calls go through anyio.to_thread.run_sync so the event loop stays responsive.

Best-practice defaults:

  • journal_mode=WAL + synchronous=NORMAL.
  • busy_timeout=5000 ms.
  • One transaction (BEGIN IMMEDIATE, then COMMIT) per write_many / write call.
  • SQL identifiers validated against ^[A-Za-z_][A-Za-z0-9_]{0,62}$; values always parameterised.

Design reference: docs/design.md §15.1.
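
The defaults above map onto plain sqlite3 calls. The sketch below is a stdlib-only approximation, not SqliteSink's code. It applies the three pragmas, validates identifiers against the same regex, wraps one executemany in BEGIN IMMEDIATE / COMMIT, and uses asyncio.to_thread in place of anyio.to_thread.run_sync. The connect and insert_rows helpers and the table schema are hypothetical.

```python
from __future__ import annotations

import asyncio
import re
import sqlite3
import tempfile
from pathlib import Path

_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]{0,62}$")


def connect(path: Path) -> sqlite3.Connection:
    # isolation_level=None: autocommit mode, so BEGIN/COMMIT are issued explicitly.
    conn = sqlite3.connect(path, isolation_level=None, check_same_thread=False)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.execute("PRAGMA busy_timeout=5000")
    return conn


def insert_rows(conn: sqlite3.Connection, table: str, rows: list[dict[str, object]]) -> None:
    # Identifiers are validated; values only ever travel as ? parameters.
    for name in [table, *rows[0]]:
        if not _IDENT.fullmatch(name):
            raise ValueError(f"invalid identifier: {name!r}")
    cols = list(rows[0])
    col_sql = ", ".join(f'"{c}"' for c in cols)
    placeholders = ", ".join(["?"] * len(cols))
    sql = f'INSERT INTO "{table}" ({col_sql}) VALUES ({placeholders})'
    conn.execute("BEGIN IMMEDIATE")  # take the write lock up front
    try:
        conn.executemany(sql, [tuple(r[c] for c in cols) for r in rows])
        conn.execute("COMMIT")
    except BaseException:
        conn.execute("ROLLBACK")
        raise


async def main() -> tuple[int, str]:
    with tempfile.TemporaryDirectory() as tmp:
        conn = await asyncio.to_thread(connect, Path(tmp) / "daq.sqlite")
        try:
            await asyncio.to_thread(conn.execute, 'CREATE TABLE "readings" (t_mono_ns INTEGER, ai0 REAL)')
            rows = [{"t_mono_ns": i, "ai0": i * 0.5} for i in range(3)]
            await asyncio.to_thread(insert_rows, conn, "readings", rows)
            (count,) = conn.execute('SELECT COUNT(*) FROM "readings"').fetchone()
            mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
        finally:
            conn.close()
    return count, mode


result = asyncio.run(main())
print(result)  # (3, 'wal')
```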

SqliteSink

SqliteSink(
    path,
    *,
    table_readings="readings",
    table_blocks="blocks",
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
)

Append-only SQLite writer with WAL journaling and per-table schema lock.

One sink instance routes records to up to two tables, one per shape (readings / blocks). Each table's column set is locked on its first write; later writes project onto the locked schema and drop unknown columns with a one-shot WARN.

Parameters:

  • path (str | Path, required): Destination SQLite file.
  • table_readings (str, default 'readings'): Table name for DaqReading rows. Validated as an SQL identifier.
  • table_blocks (str, default 'blocks'): Table name for DaqBlock summary rows. Validated as an SQL identifier.
  • journal_mode (_JournalMode, default 'WAL'): SQLite journal mode pragma. WAL is recommended.
  • synchronous (_Synchronous, default 'NORMAL'): SQLite synchronous pragma. NORMAL balances durability and throughput.
  • busy_timeout_ms (int, default 5000): SQLite busy-wait, in milliseconds. Must be >= 0.
Source code in src/dtollib/sinks/sqlite.py
def __init__(
    self,
    path: str | Path,
    *,
    table_readings: str = "readings",
    table_blocks: str = "blocks",
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
) -> None:
    self._path = Path(path)
    self._table_readings = _validate_identifier(table_readings, label="table_readings")
    self._table_blocks = _validate_identifier(table_blocks, label="table_blocks")
    self._journal_mode: _JournalMode = journal_mode
    self._synchronous: _Synchronous = synchronous
    if busy_timeout_ms < 0:
        raise ValueError(f"busy_timeout_ms must be >= 0, got {busy_timeout_ms!r}")
    self._busy_timeout_ms = busy_timeout_ms
    self._conn: sqlite3.Connection | None = None
    self._lock_readings = SchemaLock(sink_name="sqlite.readings", logger=_logger)
    self._lock_blocks = SchemaLock(sink_name="sqlite.blocks", logger=_logger)
    self._insert_readings: str | None = None
    self._insert_blocks: str | None = None

path property

path

Destination SQLite file path.

close async

close()

Close the connection. Idempotent.

Source code in src/dtollib/sinks/sqlite.py
async def close(self) -> None:
    """Close the connection. Idempotent."""
    if self._conn is None:
        return
    conn = self._conn
    self._conn = None
    try:
        await run_sync(conn.close)
    finally:
        _logger.info("sinks.sqlite.close", extra={"path": str(self._path)})

open async

open()

Open the SQLite connection and apply pragmas.

Source code in src/dtollib/sinks/sqlite.py
async def open(self) -> None:
    """Open the SQLite connection and apply pragmas."""
    if self._conn is not None:
        return
    self._conn = await run_sync(self._connect_blocking)
    _logger.info(
        "sinks.sqlite.open",
        extra={"path": str(self._path), "journal_mode": self._journal_mode},
    )

write async

write(block)

Append one DaqBlock as a summary row.

Source code in src/dtollib/sinks/sqlite.py
async def write(self, block: DaqBlock) -> None:
    """Append one :class:`DaqBlock` as a summary row."""
    if self._conn is None:
        raise RuntimeError("SqliteSink: write called before open()")
    await self._insert_rows(
        [_block_summary_row(block)],
        table=self._table_blocks,
        lock=self._lock_blocks,
        kind="blocks",
    )

write_many async

write_many(items)

Append DaqReading rows.

Source code in src/dtollib/sinks/sqlite.py
async def write_many(self, items: Sequence[DaqReading]) -> None:
    """Append :class:`DaqReading` rows."""
    if self._conn is None:
        raise RuntimeError("SqliteSink: write_many called before open()")
    if not items:
        return
    rows = [reading_to_row(r) for r in items]
    await self._insert_rows(
        rows, table=self._table_readings, lock=self._lock_readings, kind="readings"
    )

Postgres

dtollib.sinks.postgres

PostgreSQL sink — asyncpg, COPY by default, parameterised fallback.

PostgresSink writes DaqReading rows via write_many and DaqBlock summary rows via write, routing each shape to its own table (readings / blocks by default). asyncpg is an optional dependency behind dtollib[postgres]; the import is deferred to open, so instantiation works on bare-core installs and dtollib.errors.DtolSinkDependencyError is raised only when the user actually tries to open a connection.

Best-practice defaults baked in:

  • Binary COPY via asyncpg.Connection.copy_records_to_table. COPY is ~5-10x faster than parameterised INSERT for batches and is the recommended asyncpg bulk-ingest path. Callers on managed Postgres without COPY privileges can set PostgresConfig.use_copy to False to fall back to a prepared executemany.
  • Connection pool via asyncpg.create_pool. The pool lifetime equals the sink lifetime; each batch acquires a connection, writes, and releases it.
  • Identifier validation on schema and every table name (strict regex). Every value passes through $N placeholders — never string-formatted into SQL.
  • Credential scrubbing — log lines that reference the connection use PostgresConfig.target, which never includes the password.
  • statement_timeout applied as a server setting so a wedged query cannot block the acquisition loop forever.

Schema handling mirrors the other tabular sinks. With create_tables=False (the default), :meth:open reads each target table's columns from information_schema.columns and locks the per-shape schema to that set. With create_tables=True, each per-shape schema is instead inferred from the first batch of that shape, and CREATE TABLE IF NOT EXISTS runs for it.
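The two schema paths can be sketched as below. The introspection query is standard information_schema SQL; the Python-to-Postgres type map, the policy that unknown keys are rejected while missing keys become NULL, and the names infer_create_table / project_row are all assumptions for illustration, not dtollib's actual inference rules.

```python
from __future__ import annotations

import datetime as dt
from collections.abc import Mapping, Sequence
from typing import Any

# create_tables=False: run as conn.fetch(INTROSPECT_SQL, schema, table) on open.
INTROSPECT_SQL = (
    "SELECT column_name FROM information_schema.columns "
    "WHERE table_schema = $1 AND table_name = $2 ORDER BY ordinal_position"
)

# ASSUMED type map for create_tables=True; bool must precede int (bool subclasses int).
_PG_TYPES: tuple[tuple[type, str], ...] = (
    (bool, "BOOLEAN"),
    (int, "BIGINT"),
    (float, "DOUBLE PRECISION"),
    (str, "TEXT"),
    (dt.datetime, "TIMESTAMPTZ"),
)


def infer_create_table(
    schema: str, table: str, rows: Sequence[Mapping[str, Any]]
) -> tuple[list[str], str]:
    """Lock the column set from the first batch and build matching DDL."""
    columns = list(rows[0])
    defs = []
    for col in columns:
        # First non-NULL value decides the type; all-NULL columns fall back to TEXT.
        sample = next((r[col] for r in rows if r.get(col) is not None), None)
        pg_type = next((t for py, t in _PG_TYPES if isinstance(sample, py)), "TEXT")
        defs.append(f'"{col}" {pg_type}')
    ddl = f'CREATE TABLE IF NOT EXISTS "{schema}"."{table}" ({", ".join(defs)})'
    return columns, ddl


def project_row(row: Mapping[str, Any], locked: Sequence[str]) -> tuple[Any, ...]:
    """Order a row by the locked columns; unknown keys are rejected."""
    extra = set(row) - set(locked)
    if extra:
        raise ValueError(f"columns not in locked schema: {sorted(extra)}")
    return tuple(row.get(c) for c in locked)  # missing keys become NULL
```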

Design reference: docs/design.md §15.1.

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table_readings="readings",
    table_blocks="blocks",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_tables=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

  • dsn (str | None): Full connection string in libpq URI form (postgres://user:password@host:port/database). Mutually exclusive with the discrete fields.
  • host (str | None): Database host. Required if dsn is not set.
  • port (int): Database port. Defaults to 5432.
  • user (str | None): Database role.
  • password (str | None): Role password. Never logged.
  • database (str | None): Database name.
  • schema (str): Target schema. Defaults to "public". Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.
  • table_readings (str): Table name for :class:DaqReading rows. Defaults to "readings".
  • table_blocks (str): Table name for :class:DaqBlock summary rows. Defaults to "blocks".
  • pool_min_size (int): Minimum pool size. Defaults to 1.
  • pool_max_size (int): Maximum pool size. Defaults to 4.
  • statement_timeout_ms (int): Value of the server-side statement_timeout setting, in milliseconds. Defaults to 30000 (30 s).
  • command_timeout_s (float): asyncpg's per-call command timeout, in seconds. Defaults to 10.0.
  • create_tables (bool): If True, infer each per-shape schema from the first batch of that shape and run CREATE TABLE IF NOT EXISTS. If False (the default), require the tables to exist and lock the per-shape schemas from information_schema.columns.
  • use_copy (bool): If True (the default), bulk-write via asyncpg's binary COPY path. Set to False to fall back to a prepared executemany.
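A minimal sketch of the checks these attributes imply, assuming validation runs at construction time. PgTarget is a hypothetical stand-in covering a subset of the fields, not the real PostgresConfig; the pool-size rule mirrors asyncpg's own requirement that max_size > 0 and 0 <= min_size <= max_size.

```python
from __future__ import annotations

import re
from dataclasses import dataclass

_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,62}")


@dataclass(frozen=True)
class PgTarget:
    """Hypothetical subset of PostgresConfig, used only to illustrate the checks."""

    dsn: str | None = None
    host: str | None = None
    user: str | None = None
    database: str | None = None
    schema: str = "public"
    pool_min_size: int = 1
    pool_max_size: int = 4

    def __post_init__(self) -> None:
        discrete = (self.host, self.user, self.database)
        if self.dsn is not None:
            if any(v is not None for v in discrete):
                raise ValueError("dsn is mutually exclusive with host/user/database")
        elif any(v is None for v in discrete):
            raise ValueError("provide dsn, or all of host, user and database")
        if not _IDENT.fullmatch(self.schema):
            raise ValueError(f"invalid schema identifier: {self.schema!r}")
        # asyncpg itself requires max_size > 0 and 0 <= min_size <= max_size.
        if self.pool_max_size <= 0 or not 0 <= self.pool_min_size <= self.pool_max_size:
            raise ValueError("pool sizes must satisfy 0 <= min <= max, max > 0")
```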

target

target()

Return a log-safe URI describing the connection target.

Source code in src/dtollib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe URI describing the connection target."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
        scheme = parsed.scheme or "postgres"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
        scheme = "postgres"
    return urlunparse((scheme, f"{host}:{port}", f"/{db}.{self.schema}", "", "", ""))
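For a URI-form dsn, target() keeps the scheme, host, port and database, appends the schema, and drops the user:password@ part. The helper below copies the DSN branch into a standalone function (log_safe_target is a hypothetical name) so the output can be checked without dtollib.

```python
from urllib.parse import urlparse, urlunparse


def log_safe_target(dsn: str, *, default_port: int = 5432, schema: str = "public") -> str:
    """Standalone copy of the DSN branch of PostgresConfig.target()."""
    parsed = urlparse(dsn)
    host = parsed.hostname or "?"
    port = parsed.port or default_port
    db = (parsed.path or "/?").lstrip("/") or "?"
    scheme = parsed.scheme or "postgres"
    # The userinfo (user:password@) is never copied into the result.
    return urlunparse((scheme, f"{host}:{port}", f"/{db}.{schema}", "", "", ""))


print(log_safe_target("postgresql://daq:s3cret@db.lab:6543/acq"))
# postgresql://db.lab:6543/acq.public
```

urlparse only understands URI-form strings: a keyword/value DSN such as "host=db password=..." would end up entirely in parsed.path, so passing dsn in URI form also matters for log safety.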

PostgresSink

PostgresSink(config)

Append-only Postgres writer for DAQ readings and block summaries.

One sink instance routes records to up to two tables, one per shape (readings / blocks). Each table's column set is locked on first write (create_tables=True) or read on :meth:open from information_schema.columns (create_tables=False, the default).

Source code in src/dtollib/sinks/postgres.py
def __init__(self, config: PostgresConfig) -> None:
    self._config = config
    self._asyncpg: Any = None
    self._pool: Any = None
    self._readings = _TableState(
        table=config.table_readings,
        sink_name="postgres.readings",
    )
    self._blocks = _TableState(
        table=config.table_blocks,
        sink_name="postgres.blocks",
    )
    self._rows_written = 0

config property

config

The frozen :class:PostgresConfig passed in at construction.

close async

close()

Close the pool. Idempotent.

Source code in src/dtollib/sinks/postgres.py
async def close(self) -> None:
    """Close the pool. Idempotent."""
    if self._pool is None:
        return
    pool = self._pool
    self._pool = None
    try:
        await pool.close()
    finally:
        _logger.info(
            "sinks.postgres.close target=%s rows_written=%s",
            self._config.target(),
            self._rows_written,
        )
    self._asyncpg = None

open async

open()

Load asyncpg, open the pool, and (optionally) introspect tables.

Idempotent. When create_tables=False (the default), each target's columns are read on open and the per-shape schemas locked immediately. When create_tables=True the locks happen lazily on the first :meth:write_many / :meth:write of each shape.

Source code in src/dtollib/sinks/postgres.py
async def open(self) -> None:
    """Load asyncpg, open the pool, and (optionally) introspect tables.

    Idempotent. When ``create_tables=False`` (the default), each
    target's columns are read on open and the per-shape schemas
    locked immediately. When ``create_tables=True`` the locks
    happen lazily on the first :meth:`write_many` / :meth:`write`
    of each shape.
    """
    if self._pool is not None:
        return
    self._asyncpg = _load_asyncpg()
    cfg = self._config
    server_settings = {
        "application_name": "dtollib",
        "statement_timeout": str(int(cfg.statement_timeout_ms)),
    }
    try:
        if cfg.dsn is not None:
            self._pool = await self._asyncpg.create_pool(
                dsn=cfg.dsn,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
        else:
            self._pool = await self._asyncpg.create_pool(
                host=cfg.host,
                port=cfg.port,
                user=cfg.user,
                password=cfg.password,
                database=cfg.database,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
    except Exception as exc:
        raise DtolSinkWriteError(
            f"PostgresSink: failed to open pool for {cfg.target()}: {exc}",
        ) from exc

    _logger.info(
        "sinks.postgres.open target=%s pool_min=%s pool_max=%s use_copy=%s create_tables=%s",
        cfg.target(),
        cfg.pool_min_size,
        cfg.pool_max_size,
        cfg.use_copy,
        cfg.create_tables,
    )

    if not cfg.create_tables:
        for state in (self._readings, self._blocks):
            await self._introspect_existing_table(state)

write async

write(block)

Append one :class:DaqBlock as a summary row.

Source code in src/dtollib/sinks/postgres.py
async def write(self, block: DaqBlock) -> None:
    """Append one :class:`DaqBlock` as a summary row."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write called before open()")
    await self._insert_rows([_block_summary_row(block)], state=self._blocks)

write_many async

write_many(items)

Append :class:DaqReading rows.

Source code in src/dtollib/sinks/postgres.py
async def write_many(self, items: Sequence[DaqReading]) -> None:
    """Append :class:`DaqReading` rows."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write_many called before open()")
    if not items:
        return
    rows = [reading_to_row(r) for r in items]
    await self._insert_rows(rows, state=self._readings)

In-memory

dtollib.sinks.memory

In-memory sink — collects records in lists for tests and notebooks.

:class:InMemorySink satisfies the :class:~dtollib.sinks.base.ReadingSink and :class:~dtollib.sinks.base.BlockSink Protocols. Useful for unit tests, REPL exploration, and short-run captures.

InMemorySink

InMemorySink()

Collect every written record in a per-shape list.

:attr:readings / :attr:blocks are appended to (never re-assigned). :meth:close does not clear the buffers — the point of this sink is post-run inspection.

Source code in src/dtollib/sinks/memory.py
def __init__(self) -> None:
    self._readings: list[DaqReading] = []
    self._blocks: list[DaqBlock] = []
    self._open = False
    self._closed = False

blocks property

blocks

Captured :class:DaqBlock records, in write order.

is_open property

is_open

True once :meth:open has been called and :meth:close has not been called since.

readings property

readings

Captured :class:DaqReading records, in write order.

close async

close()

Flip the closed flag — buffers preserved for inspection.

Source code in src/dtollib/sinks/memory.py
async def close(self) -> None:
    """Flip the closed flag — buffers preserved for inspection."""
    self._closed = True

open async

open()

No backing resource — flips the open flag.

Source code in src/dtollib/sinks/memory.py
async def open(self) -> None:
    """No backing resource — flips the open flag."""
    self._open = True
    self._closed = False

write async

write(block)

Append one :class:DaqBlock to the block buffer.

Source code in src/dtollib/sinks/memory.py
async def write(self, block: DaqBlock) -> None:
    """Append one :class:`DaqBlock` to the block buffer."""
    if not self.is_open:
        raise RuntimeError("InMemorySink: write called before open()")
    self._blocks.append(block)

write_many async

write_many(items)

Append every :class:DaqReading to the readings buffer.

Source code in src/dtollib/sinks/memory.py
async def write_many(self, items: Sequence[DaqReading]) -> None:
    """Append every :class:`DaqReading` to the readings buffer."""
    if not self.is_open:
        raise RuntimeError("InMemorySink: write_many called before open()")
    if not items:
        return
    self._readings.extend(items)
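The intended test pattern is to write inside async with and inspect the buffers after the block exits. Because the real class needs dtollib types, the sketch uses ListSink, a hypothetical stand-in that copies InMemorySink's documented lifecycle (writing before open raises, close keeps the buffers) for plain Python values.

```python
from __future__ import annotations

import asyncio
from collections.abc import Sequence
from types import TracebackType
from typing import Any


class ListSink:
    """Hypothetical stand-in mirroring InMemorySink's lifecycle (not dtollib's class)."""

    def __init__(self) -> None:
        self.readings: list[Any] = []
        self._open = False
        self._closed = False

    @property
    def is_open(self) -> bool:
        return self._open and not self._closed

    async def open(self) -> None:
        self._open, self._closed = True, False

    async def close(self) -> None:
        self._closed = True  # buffers are deliberately kept

    async def __aenter__(self) -> ListSink:
        await self.open()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        await self.close()

    async def write_many(self, items: Sequence[Any]) -> None:
        if not self.is_open:
            raise RuntimeError("ListSink: write_many called before open()")
        self.readings.extend(items)


async def run_capture() -> ListSink:
    async with ListSink() as sink:
        await sink.write_many([1.0, 2.0])
        await sink.write_many([3.0])
    return sink  # still inspectable after close


sink = asyncio.run(run_capture())
print(sink.readings, sink.is_open)  # [1.0, 2.0, 3.0] False
```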

Raw counts (.dt-raw)

dtollib.sinks.raw_counts

:class:RawCountsSink — TDMS-equivalent durable raw-data logger.

The single sink unique to dtollib. Writes the raw int16/int32 buffer data (not the volt-converted floats) plus a JSON file header and a per-chunk JSON record header to a .dt-raw v2 file.

Why a custom format instead of TDMS: it needs no third-party dependency, its length-prefixed framing is dead-simple to parse in NumPy / MATLAB / any tool (each chunk payload is a plain array readable with np.fromfile at the right offset), and it faithfully preserves what the SDK gave us.

Threading: writes from the drainer thread (the §12.3.2 bridge attaches this sink BEFORE the async stream). Consumer back-pressure does not stop the file from growing.

Design reference: docs/design.md §15.2.
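The ordering that keeps the file growing under back-pressure can be sketched with the standard library: the drainer hands each block to the raw observer synchronously, and only then offers it to the live stream. drain is a hypothetical function, and dropping blocks when the live queue is full is an assumption chosen to make the decoupling visible; this page does not say how the real §12.3.2 bridge handles a full consumer.

```python
from __future__ import annotations

import queue
import threading
from collections.abc import Callable, Iterable
from typing import Any


def drain(
    blocks: Iterable[Any],
    raw_observer: Callable[[Any], None],
    live: queue.Queue[Any],
) -> int:
    """Hypothetical drainer loop: durable write first, live hand-off second.

    Returns the number of blocks the live stream missed.
    """
    missed = 0
    for block in blocks:
        raw_observer(block)  # synchronous append; never waits on the consumer
        try:
            live.put_nowait(block)
        except queue.Full:
            missed += 1  # ASSUMPTION: a full live queue drops, the file does not
    return missed


written: list[int] = []
live_q: queue.Queue[int] = queue.Queue(maxsize=2)  # nobody consumes: maximal back-pressure
worker = threading.Thread(target=drain, args=(range(10), written.append, live_q))
worker.start()
worker.join()
print(len(written), live_q.qsize())  # 10 2
```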

RAW_FORMAT_VERSION module-attribute

RAW_FORMAT_VERSION = 2

.dt-raw format version — bumped from v1 for per-chunk framing.

RawCountsSink

RawCountsSink(path, *, file_metadata=None)

Writes :class:DaqBlock.raw_codes to a .dt-raw v2 file.

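File layout (per docs/design.md §15.2):

    file         = file_header_len:uint32
                 + file_header_json:bytes
                 + (chunk_record)*

    chunk_record = chunk_header_len:uint32
                 + chunk_header_json:bytes
                 + chunk_payload:bytes

In write_raw, chunk_header_len is packed little-endian (struct format "<I"), and chunk_payload holds only the valid samples: the C-order (channel-major) bytes of raw_codes[:, :valid_samples].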

write_raw is declared async but contains no await points: its file I/O is blocking and runs synchronously on whichever thread calls it. The §12.3.2 callback bridge attaches this sink as a passive observer that runs on the drainer thread, so consumer slowness on the async path does not slow file growth.

Parameters:

  • path (Path | str, required): Output file path. :meth:open opens it with mode "wb", creating the file or overwriting an existing one.
  • file_metadata (dict[str, Any] | None, default None): Optional dict merged into the file header JSON. The sink adds format_version, dtype (inferred from the first block's raw_codes dtype), and bookkeeping fields.

Source code in src/dtollib/sinks/raw_counts.py
def __init__(
    self,
    path: Path | str,
    *,
    file_metadata: dict[str, Any] | None = None,
) -> None:
    self._path = Path(path)
    self._metadata = dict(file_metadata or {})
    self._fh: Any = None
    self._seq: int = 0
    self._header_written: bool = False
    self._dtype: str | None = None
    self._closed: bool = False

path property

path

The output file path.

close async

close()

Flush and close the file. Idempotent.

Source code in src/dtollib/sinks/raw_counts.py
async def close(self) -> None:
    """Flush and close the file. Idempotent."""
    if self._fh is not None and not self._closed:
        self._fh.flush()
        self._fh.close()
        self._closed = True

open async

open()

Open the file for binary writing. Idempotent.

Source code in src/dtollib/sinks/raw_counts.py
async def open(self) -> None:
    """Open the file for binary writing. Idempotent."""
    if self._fh is not None:
        return
    try:
        self._fh = self._path.open("wb")
    except OSError as exc:
        raise DtolSinkError(
            f"RawCountsSink: failed to open {self._path}: {exc}",
            context=ErrorContext(operation="RawCountsSink.open"),
        ) from exc
    self._closed = False

write_raw async

write_raw(block)

Append one block's raw-counts payload to the file.

Lazily writes the file header on the first call (when the dtype is known from the block).

Source code in src/dtollib/sinks/raw_counts.py
async def write_raw(self, block: DaqBlock) -> None:
    """Append one block's raw-counts payload to the file.

    Lazily writes the file header on the first call (when the dtype
    is known from the block).
    """
    if self._fh is None:
        raise DtolSinkError(
            "RawCountsSink: write_raw before open()",
            context=ErrorContext(operation="RawCountsSink.write_raw"),
        )
    codes = block.raw_codes
    if codes is None:
        raise DtolSinkWriteError(
            "RawCountsSink: block has no raw_codes (RawLogging not configured?)",
            context=ErrorContext(operation="RawCountsSink.write_raw"),
        )
    if not self._header_written:
        self._dtype = str(codes.dtype)
        self._write_file_header(block, codes)
        self._header_written = True

    flags: list[str] = []
    if block.error is not None:
        flags.append("overrun_marker")
    if block.samples_per_channel < codes.shape[1]:
        flags.append("partial")

    chunk_header = {
        "seq": self._seq,
        "event_kind": "buffer_done" if not flags else "buffer_done_with_flags",
        "first_sample_index": block.first_sample_index,
        "valid_samples": int(block.samples_per_channel),
        "buffer_capacity": int(codes.shape[1]),
        "t_mono_ns": int(block.t_mono_ns),
        "t_utc": block.t_utc.isoformat(),
        "flags": flags,
    }
    header_bytes = json.dumps(chunk_header).encode("utf-8")
    self._fh.write(struct.pack("<I", len(header_bytes)))
    self._fh.write(header_bytes)
    # Payload: only the VALID samples; the buffer's full capacity may be
    # larger. tobytes() serialises in C order (channel-major) even for a
    # strided view; ascontiguousarray just makes that copy explicit.
    payload = np.ascontiguousarray(codes[:, : block.samples_per_channel]).tobytes()
    self._fh.write(payload)
    self._fh.flush()
    self._seq += 1
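Because the payload is the C-order bytes of codes[:, :valid_samples], it is channel-major: all valid samples of channel 0, then channel 1, and so on. A standalone NumPy check of that round trip (the shapes are made up for illustration):

```python
import numpy as np

# Made-up shapes: 3 channels, capacity 8, only the first 5 samples valid.
n_channels, capacity, valid = 3, 8, 5
buffer = np.arange(n_channels * capacity, dtype=np.int16).reshape(n_channels, capacity)

# Writer side, as in write_raw: keep the valid samples, serialise in C order.
payload = np.ascontiguousarray(buffer[:, :valid]).tobytes()
assert payload == buffer[:, :valid].tobytes()  # tobytes() alone gives the same bytes
assert len(payload) == n_channels * valid * buffer.itemsize  # 3 * 5 * 2 = 30

# Reader side: bytes are channel-major, so reshape to (n_channels, valid).
decoded = np.frombuffer(payload, dtype=np.int16).reshape(n_channels, valid)
print(decoded[1])  # [ 8  9 10 11 12]
```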