nidaqlib.sinks

Data sinks for DaqReading / DaqSample / DaqBlock outputs.

Three Protocols (one per record shape) and two pipe drivers — design doc §14.1. The row-oriented sinks (CSV, JSONL) refuse :class:DaqBlock by default; pass accept_blocks=True to opt into per-sample scalarisation via :func:block_to_long_rows.
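
Quickstart: a minimal sketch of the row path. The DaqSample keyword arguments are assumed from :func:block_to_long_rows / :func:sample_to_row below; file names are placeholders.

import time
from datetime import datetime, timezone

import anyio

from nidaqlib.sinks.csv import CsvSink
from nidaqlib.tasks.models import DaqSample


async def main() -> None:
    sink = CsvSink("run_001.csv")
    await sink.open()
    try:
        samples = [
            DaqSample(
                device="Dev1",
                task="ai_volts",
                channel="Dev1/ai0",
                value=0.001 * i,
                acquired_at=datetime.now(timezone.utc),
                monotonic_ns=time.monotonic_ns(),
                unit="V",
                error=None,
            )
            for i in range(8)
        ]
        await sink.write_many(samples)  # first batch locks the CSV column order
    finally:
        await sink.close()


anyio.run(main)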

BlockSink

Bases: Protocol

Sink that consumes one :class:DaqBlock per call.

A block is already (n_channels, n_samples) — wrapping it in a sequence per call would burn allocations in the hot path. Sinks that need scalar rows opt in via :func:block_to_long_rows.

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/nidaqlib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/nidaqlib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/nidaqlib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource.

Source code in src/nidaqlib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource."""
    ...

write async

write(block)

Append one :class:DaqBlock as a row group / file segment / row.

Source code in src/nidaqlib/sinks/base.py
async def write(self, block: DaqBlock) -> None:
    """Append one :class:`DaqBlock` as a row group / file segment / row."""
    ...
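
For illustration, a minimal sketch of a class that satisfies the Protocol; the class name and counters are hypothetical and not part of nidaqlib.

from types import TracebackType

from nidaqlib.tasks.models import DaqBlock


class CountingBlockSink:
    """Hypothetical BlockSink: counts blocks and samples, writes nothing."""

    def __init__(self) -> None:
        self.blocks_seen = 0
        self.samples_seen = 0

    async def open(self) -> None:
        pass  # no backing resource to allocate

    async def close(self) -> None:
        pass  # nothing to flush; trivially idempotent

    async def __aenter__(self) -> "CountingBlockSink":
        await self.open()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        await self.close()

    async def write(self, block: DaqBlock) -> None:
        # A block is already (n_channels, n_samples); no per-call sequence wrapping.
        self.blocks_seen += 1
        self.samples_seen += len(block.channels) * block.samples_per_channel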

CsvSink

CsvSink(path, *, accept_blocks=False)

Append-only CSV writer with first-batch schema lock.

Parameters:

- path (str | Path, required): Destination file. Created or overwritten on :meth:open.
- accept_blocks (bool, default False): When True, :meth:write calls :func:block_to_long_rows and emits one CSV row per (channel, sample). The default False makes :meth:write raise :class:NIDaqSinkSchemaError.

Source code in src/nidaqlib/sinks/csv.py
def __init__(self, path: str | Path, *, accept_blocks: bool = False) -> None:
    self._path = Path(path)
    self._accept_blocks = accept_blocks
    self._file: TextIOWrapper | None = None
    self._writer: csv.DictWriter[str] | None = None
    self._columns: tuple[str, ...] | None = None
    self._unknown_warned: set[str] = set()

columns property

columns

Locked column order, or None before the first flush.

path property

path

Destination file path.

close async

close()

Flush and close the CSV file. Idempotent.

Source code in src/nidaqlib/sinks/csv.py
async def close(self) -> None:
    """Flush and close the CSV file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None
        self._writer = None

open async

open()

Open the CSV file for writing. Overwrites any existing file.

Source code in src/nidaqlib/sinks/csv.py
async def open(self) -> None:
    """Open the CSV file for writing. Overwrites any existing file."""
    if self._file is not None:
        return
    self._path.parent.mkdir(parents=True, exist_ok=True)
    self._file = self._path.open("w", encoding="utf-8", newline="")

write async

write(block)

Refuse blocks unless accept_blocks=True was set on construction.

With accept_blocks=True, per-(channel, sample) rows are emitted via :func:block_to_long_rows. The cost of this opt-in is up to n_channels * samples_per_channel rows per block.

Source code in src/nidaqlib/sinks/csv.py
async def write(self, block: DaqBlock) -> None:
    """Refuse blocks unless ``accept_blocks=True`` was set on construction.

    With ``accept_blocks=True``, per-(channel, sample) rows are emitted
    via :func:`block_to_long_rows`. The cost of this opt-in is up to
    ``n_channels * samples_per_channel`` rows per block.
    """
    if not self._accept_blocks:
        raise NIDaqSinkSchemaError(
            "CsvSink refuses DaqBlock by default — pass accept_blocks=True "
            "if you really want one CSV row per (channel, sample). At high "
            "rates this can produce gigabyte CSVs; consider ParquetSink."
        )
    if self._file is None:
        raise RuntimeError("CsvSink: write called before open()")
    rows = [sample_to_row(s) for s in block_to_long_rows(block)]
    self._write_rows(rows)

write_many async

write_many(items)

Append :class:DaqReading or :class:DaqSample rows.

Sniffs the first item's type to choose the row helper.

Source code in src/nidaqlib/sinks/csv.py
async def write_many(
    self,
    items: Sequence[DaqReading] | Sequence[DaqSample],
) -> None:
    """Append :class:`DaqReading` or :class:`DaqSample` rows.

    Sniffs the first item's type to choose the row helper.
    """
    if self._file is None:
        raise RuntimeError("CsvSink: write_many called before open()")
    if not items:
        return

    # Late import — avoid circulars.
    from nidaqlib.tasks.models import DaqReading, DaqSample  # noqa: PLC0415

    first = items[0]
    if isinstance(first, DaqReading):
        rows = [reading_to_row(item) for item in items]  # type: ignore[arg-type]
    elif isinstance(first, DaqSample):  # pyright: ignore[reportUnnecessaryIsInstance]
        rows = [sample_to_row(item) for item in items]  # type: ignore[arg-type]
    else:  # pragma: no cover - defensive
        raise NIDaqSinkSchemaError(
            f"CsvSink.write_many: unsupported record type {type(first).__name__}"
        )

    self._write_rows(rows)

InMemorySink

InMemorySink()

Collect every written record in a per-shape list.

:attr:readings / :attr:samples / :attr:blocks are appended to (never re-assigned). :meth:close does not clear the buffers — the point of this sink is post-run inspection.

Source code in src/nidaqlib/sinks/memory.py
def __init__(self) -> None:
    self._readings: list[DaqReading] = []
    self._samples: list[DaqSample] = []
    self._blocks: list[DaqBlock] = []
    self._open = False
    self._closed = False

blocks property

blocks

Captured :class:DaqBlock records, in write order.

is_open property

is_open

True once :meth:open has been called and close has not.

readings property

readings

Captured :class:DaqReading records, in write order.

samples property

samples

Captured :class:DaqSample records, in write order.

close async

close()

Flip the closed flag — buffers preserved for inspection.

Source code in src/nidaqlib/sinks/memory.py
async def close(self) -> None:
    """Flip the closed flag — buffers preserved for inspection."""
    self._closed = True

open async

open()

No backing resource — flips the open flag.

Source code in src/nidaqlib/sinks/memory.py
async def open(self) -> None:
    """No backing resource — flips the open flag."""
    self._open = True
    self._closed = False

write async

write(block)

Append one :class:DaqBlock to the block buffer.

Source code in src/nidaqlib/sinks/memory.py
async def write(self, block: DaqBlock) -> None:
    """Append one :class:`DaqBlock` to the block buffer."""
    if not self.is_open:
        raise RuntimeError("InMemorySink: write called before open()")
    self._blocks.append(block)

write_many async

write_many(items)

Append every item to the matching per-shape buffer.

Sniffs the first item's type to dispatch (the sequence is required to be homogeneous by the :class:SampleSink / :class:ReadingSink Protocols). An empty sequence is a no-op.

Source code in src/nidaqlib/sinks/memory.py
async def write_many(
    self,
    items: Sequence[DaqReading] | Sequence[DaqSample],
) -> None:
    """Append every item to the matching per-shape buffer.

    Sniffs the first item's type to dispatch (the sequence is required
    to be homogeneous by the :class:`SampleSink` / :class:`ReadingSink`
    Protocols). An empty sequence is a no-op.
    """
    if not self.is_open:
        raise RuntimeError("InMemorySink: write_many called before open()")
    if not items:
        return
    # Late import to avoid circular dependency at module load.
    from nidaqlib.tasks.models import DaqReading, DaqSample  # noqa: PLC0415

    first = items[0]
    if isinstance(first, DaqReading):
        self._readings.extend(items)  # type: ignore[arg-type]
    elif isinstance(first, DaqSample):  # pyright: ignore[reportUnnecessaryIsInstance]
        self._samples.extend(items)  # type: ignore[arg-type]
    else:  # pragma: no cover - defensive
        raise TypeError(
            f"InMemorySink.write_many expected DaqReading or DaqSample, "
            f"got {type(first).__name__}"
        )
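
A sketch of the intended test usage: feed pre-acquired records through the sink, close it, and assert on the preserved buffers.

from nidaqlib.sinks.memory import InMemorySink
from nidaqlib.tasks.models import DaqBlock


async def capture_blocks(blocks: list[DaqBlock]) -> InMemorySink:
    """Run blocks through an InMemorySink and hand it back for assertions."""
    sink = InMemorySink()
    await sink.open()
    for block in blocks:
        await sink.write(block)
    await sink.close()          # buffers survive close for post-run inspection
    assert not sink.is_open
    return sink                 # caller inspects sink.blocks in write order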

JsonlSink

JsonlSink(path, *, accept_blocks=False)

Append-only JSONL writer — one flattened record per line.

Parameters:

- path (str | Path, required): Destination file. Created or overwritten on :meth:open.
- accept_blocks (bool, default False): When True, :meth:write calls :func:block_to_long_rows. The default False makes :meth:write raise :class:NIDaqSinkSchemaError.

Source code in src/nidaqlib/sinks/jsonl.py
def __init__(self, path: str | Path, *, accept_blocks: bool = False) -> None:
    self._path = Path(path)
    self._accept_blocks = accept_blocks
    self._file: TextIOWrapper | None = None

path property

path

Destination file path.

close async

close()

Flush and close the JSONL file. Idempotent.

Source code in src/nidaqlib/sinks/jsonl.py
async def close(self) -> None:
    """Flush and close the JSONL file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None

open async

open()

Open the JSONL file for writing. Overwrites any existing file.

Source code in src/nidaqlib/sinks/jsonl.py
async def open(self) -> None:
    """Open the JSONL file for writing. Overwrites any existing file."""
    if self._file is not None:
        return
    self._path.parent.mkdir(parents=True, exist_ok=True)
    self._file = self._path.open("w", encoding="utf-8", newline="")

write async

write(block)

Refuse blocks unless accept_blocks=True.

Source code in src/nidaqlib/sinks/jsonl.py
async def write(self, block: DaqBlock) -> None:
    """Refuse blocks unless ``accept_blocks=True``."""
    if not self._accept_blocks:
        raise NIDaqSinkSchemaError(
            "JsonlSink refuses DaqBlock by default — pass accept_blocks=True "
            "to scalarise via block_to_long_rows."
        )
    if self._file is None:
        raise RuntimeError("JsonlSink: write called before open()")
    rows = [sample_to_row(s) for s in block_to_long_rows(block)]
    self._write_rows(rows)

write_many async

write_many(items)

Serialise each record as one JSON object per line.

Source code in src/nidaqlib/sinks/jsonl.py
async def write_many(
    self,
    items: Sequence[DaqReading] | Sequence[DaqSample],
) -> None:
    """Serialise each record as one JSON object per line."""
    if self._file is None:
        raise RuntimeError("JsonlSink: write_many called before open()")
    if not items:
        return

    from nidaqlib.tasks.models import DaqReading, DaqSample  # noqa: PLC0415

    first = items[0]
    if isinstance(first, DaqReading):
        rows = [reading_to_row(item) for item in items]  # type: ignore[arg-type]
    elif isinstance(first, DaqSample):  # pyright: ignore[reportUnnecessaryIsInstance]
        rows = [sample_to_row(item) for item in items]  # type: ignore[arg-type]
    else:  # pragma: no cover - defensive
        raise NIDaqSinkSchemaError(
            f"JsonlSink.write_many: unsupported record type {type(first).__name__}"
        )
    self._write_rows(rows)

ParquetSink

ParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
)

Parquet writer with first-write shape lock.

Parameters:

- path (str | Path, required): Destination Parquet file.
- compression (_Compression, default 'zstd'): Codec for every row group. zstd matches or beats snappy on speed with ~30% better ratios.
- use_dictionary (bool, default True): Dictionary encoding for string columns.
- row_group_size (int | None, default None): Optional maximum rows per row group. None lets pyarrow batch the whole call.

Source code in src/nidaqlib/sinks/parquet.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
) -> None:
    self._path = Path(path)
    self._compression: _Compression = compression
    self._use_dictionary = use_dictionary
    if row_group_size is not None and row_group_size < 1:
        raise ValueError(f"row_group_size must be >= 1 if set, got {row_group_size!r}")
    self._row_group_size = row_group_size
    self._schema = SchemaLock(sink_name="parquet", logger=_logger)
    self._shape: Literal["readings", "samples", "blocks"] | None = None
    self._pa: Any = None
    self._pq: Any = None
    self._arrow_schema: Any = None
    self._writer: Any = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first write.

compression property

compression

Configured compression codec.

path property

path

Destination Parquet file path.

shape property

shape

Locked record shape, or None before first write.

close async

close()

Flush the footer and close the writer. Idempotent.

Source code in src/nidaqlib/sinks/parquet.py
async def close(self) -> None:
    """Flush the footer and close the writer. Idempotent."""
    if self._writer is not None:
        try:
            self._writer.close()
        finally:
            self._writer = None
    self._pa = None
    self._pq = None
    _logger.info(
        "sinks.parquet.close",
        extra={"path": str(self._path), "rows_written": self._rows_written},
    )

open async

open()

Load pyarrow and create the parent directory.

The :class:pyarrow.parquet.ParquetWriter itself is opened lazily on the first write — we don't have the schema until then.

Source code in src/nidaqlib/sinks/parquet.py
async def open(self) -> None:
    """Load pyarrow and create the parent directory.

    The :class:`pyarrow.parquet.ParquetWriter` itself is opened lazily
    on the first write — we don't have the schema until then.
    """
    if self._pa is not None:
        return
    self._pa, self._pq = _load_pyarrow()
    self._path.parent.mkdir(parents=True, exist_ok=True)
    _logger.info(
        "sinks.parquet.open",
        extra={"path": str(self._path), "compression": self._compression},
    )

write async

write(block)

Append one :class:DaqBlock as a row group of long-format rows.

Long-format layout: one row per (channel, sample). The block_index / sample_index columns let consumers re-aggregate efficiently.

Source code in src/nidaqlib/sinks/parquet.py
async def write(self, block: DaqBlock) -> None:
    """Append one :class:`DaqBlock` as a row group of long-format rows.

    Long-format layout: one row per (channel, sample). The
    ``block_index`` / ``sample_index`` columns let consumers
    re-aggregate efficiently.
    """
    if self._pa is None:
        raise RuntimeError("ParquetSink: write called before open()")
    self._lock_or_check_shape("blocks")
    rows = list(_block_to_long_dict_rows(block))
    self._write_rows(rows)

write_many async

write_many(items)

Append :class:DaqReading or :class:DaqSample rows.

First call locks the schema and the record shape. Mixing shapes afterwards raises :class:NIDaqSinkSchemaError.

Source code in src/nidaqlib/sinks/parquet.py
async def write_many(
    self,
    items: Sequence[DaqReading] | Sequence[DaqSample],
) -> None:
    """Append :class:`DaqReading` or :class:`DaqSample` rows.

    First call locks the schema and the record shape. Mixing shapes
    afterwards raises :class:`NIDaqSinkSchemaError`.
    """
    if self._pa is None:
        raise RuntimeError("ParquetSink: write_many called before open()")
    if not items:
        return

    from nidaqlib.tasks.models import DaqReading, DaqSample  # noqa: PLC0415

    first = items[0]
    if isinstance(first, DaqReading):
        shape: Literal["readings", "samples", "blocks"] = "readings"
        rows = [reading_to_row(r) for r in items]  # type: ignore[arg-type]
    elif isinstance(first, DaqSample):  # pyright: ignore[reportUnnecessaryIsInstance]
        shape = "samples"
        rows = [sample_to_row(s) for s in items]  # type: ignore[arg-type]
    else:  # pragma: no cover - defensive
        raise NIDaqSinkSchemaError(
            f"ParquetSink.write_many: unsupported record type {type(first).__name__}"
        )
    self._lock_or_check_shape(shape)
    self._write_rows(rows)
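
A sketch of the shape lock in practice; the samples are assumed to come from the streaming layer.

from nidaqlib.sinks.parquet import ParquetSink
from nidaqlib.tasks.models import DaqSample


async def write_samples(samples: list[DaqSample]) -> None:
    sink = ParquetSink("samples.parquet", compression="zstd", row_group_size=50_000)
    await sink.open()                   # loads pyarrow; the writer opens lazily
    try:
        await sink.write_many(samples)  # first call locks schema and shape ("samples")
        # A subsequent sink.write(block) would raise NIDaqSinkSchemaError,
        # because the record shape is already locked to "samples".
    finally:
        await sink.close()              # flushes the Parquet footer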

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table_readings="readings",
    table_samples="samples",
    table_blocks="blocks",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_tables=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

- dsn (str | None): Full libpq-style connection string. Mutually exclusive with the discrete fields.
- host (str | None): Database host. Required if dsn is not set.
- port (int): Database port. Defaults to 5432.
- user (str | None): Database role.
- password (str | None): Role password. Never logged.
- database (str | None): Database name.
- schema (str): Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.
- table_readings (str): Table name for :class:DaqReading rows.
- table_samples (str): Table name for :class:DaqSample rows.
- table_blocks (str): Table name for :class:DaqBlock summary rows.
- pool_min_size (int): Minimum pool size. Defaults to 1.
- pool_max_size (int): Maximum pool size. Defaults to 4.
- statement_timeout_ms (int): statement_timeout server setting, in milliseconds. Defaults to 30000 (30 s).
- command_timeout_s (float): asyncpg's per-call command timeout, in seconds.
- create_tables (bool): If True, infer per-shape schemas from the first batch of each kind and run CREATE TABLE IF NOT EXISTS. If False (the default), require the tables to exist and lock per-shape schemas from information_schema.columns.
- use_copy (bool): If True (the default), bulk-write via asyncpg's binary COPY path. Disable to fall back to prepared executemany.

target

target()

Return a log-safe description: host:port/db.schema.

Source code in src/nidaqlib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description: ``host:port/db.schema``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}"
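
For illustration, the two equivalent ways to point the config at a database (hosts, roles, and passwords below are placeholders):

from nidaqlib.sinks.postgres import PostgresConfig

# Discrete fields ...
cfg = PostgresConfig(
    host="db.example.internal",
    user="daq_writer",
    password="change-me",     # never logged
    database="daq",
    schema="lab",
    create_tables=True,       # infer tables from the first batch of each shape
)

# ... or a single DSN (mutually exclusive with the discrete fields).
cfg_dsn = PostgresConfig(dsn="postgresql://daq_writer@db.example.internal:5432/daq")

print(cfg.target())      # db.example.internal:5432/daq.lab   (no credentials)
print(cfg_dsn.target())  # db.example.internal:5432/daq.public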

PostgresSink

PostgresSink(config)

Append-only Postgres writer for DAQ readings, samples, and block summaries.

One sink instance routes records to up to three tables, one per shape (readings / samples / blocks). Each table's column set is locked on first write (create_tables=True) or read on :meth:open from information_schema.columns (create_tables=False, the default).

Source code in src/nidaqlib/sinks/postgres.py
def __init__(self, config: PostgresConfig) -> None:
    self._config = config
    self._asyncpg: Any = None
    self._pool: Any = None
    self._readings = _TableState(
        table=config.table_readings,
        sink_name="postgres.readings",
    )
    self._samples = _TableState(
        table=config.table_samples,
        sink_name="postgres.samples",
    )
    self._blocks = _TableState(
        table=config.table_blocks,
        sink_name="postgres.blocks",
    )
    self._rows_written = 0

config property

config

The frozen :class:PostgresConfig passed in at construction.

close async

close()

Close the pool. Idempotent.

Source code in src/nidaqlib/sinks/postgres.py
async def close(self) -> None:
    """Close the pool. Idempotent."""
    if self._pool is None:
        return
    pool = self._pool
    self._pool = None
    try:
        await pool.close()
    finally:
        _logger.info(
            "sinks.postgres.close target=%s rows_written=%s",
            self._config.target(),
            self._rows_written,
        )
    self._asyncpg = None

open async

open()

Load asyncpg, open the pool, and (optionally) introspect tables.

Idempotent. When create_tables=False (the default), each target's columns are read on open and the per-shape schemas locked immediately. When create_tables=True the locks happen lazily on the first :meth:write_many / :meth:write of each shape.

Source code in src/nidaqlib/sinks/postgres.py
async def open(self) -> None:
    """Load asyncpg, open the pool, and (optionally) introspect tables.

    Idempotent. When ``create_tables=False`` (the default), each
    target's columns are read on open and the per-shape schemas
    locked immediately. When ``create_tables=True`` the locks
    happen lazily on the first :meth:`write_many` / :meth:`write`
    of each shape.
    """
    if self._pool is not None:
        return
    self._asyncpg = _load_asyncpg()
    cfg = self._config
    server_settings = {
        "application_name": "nidaqlib",
        "statement_timeout": str(int(cfg.statement_timeout_ms)),
    }
    try:
        if cfg.dsn is not None:
            self._pool = await self._asyncpg.create_pool(
                dsn=cfg.dsn,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
        else:
            self._pool = await self._asyncpg.create_pool(
                host=cfg.host,
                port=cfg.port,
                user=cfg.user,
                password=cfg.password,
                database=cfg.database,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
    except Exception as exc:
        raise NIDaqSinkWriteError(
            f"PostgresSink: failed to open pool for {cfg.target()}: {exc}",
        ) from exc

    _logger.info(
        "sinks.postgres.open target=%s pool_min=%s pool_max=%s use_copy=%s create_tables=%s",
        cfg.target(),
        cfg.pool_min_size,
        cfg.pool_max_size,
        cfg.use_copy,
        cfg.create_tables,
    )

    if not cfg.create_tables:
        for state in (self._readings, self._samples, self._blocks):
            await self._introspect_existing_table(state)

write async

write(block)

Append one :class:DaqBlock as a summary row.

Source code in src/nidaqlib/sinks/postgres.py
async def write(self, block: DaqBlock) -> None:
    """Append one :class:`DaqBlock` as a summary row."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write called before open()")
    await self._insert_rows([_block_summary_row(block)], state=self._blocks)

write_many async

write_many(items)

Append :class:DaqReading or :class:DaqSample rows.

Source code in src/nidaqlib/sinks/postgres.py
async def write_many(
    self,
    items: Sequence[DaqReading] | Sequence[DaqSample],
) -> None:
    """Append :class:`DaqReading` or :class:`DaqSample` rows."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write_many called before open()")
    if not items:
        return

    from nidaqlib.tasks.models import DaqReading, DaqSample  # noqa: PLC0415

    first = items[0]
    if isinstance(first, DaqReading):
        rows = [reading_to_row(r) for r in items]  # type: ignore[arg-type]
        await self._insert_rows(rows, state=self._readings)
    elif isinstance(first, DaqSample):  # pyright: ignore[reportUnnecessaryIsInstance]
        rows = [sample_to_row(s) for s in items]  # type: ignore[arg-type]
        await self._insert_rows(rows, state=self._samples)
    else:  # pragma: no cover — defensive
        raise NIDaqSinkWriteError(
            f"PostgresSink.write_many: unsupported record type {type(first).__name__}",
        )
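
A minimal write-path sketch reusing a config like the one above; the DaqSample keyword arguments are assumed from the conversion helpers on this page.

import time
from datetime import datetime, timezone

from nidaqlib.sinks.postgres import PostgresConfig, PostgresSink
from nidaqlib.tasks.models import DaqSample


async def log_one_sample(value: float) -> None:
    cfg = PostgresConfig(
        host="db.example.internal",
        user="daq_writer",
        password="change-me",
        database="daq",
        create_tables=True,
    )
    sink = PostgresSink(cfg)
    await sink.open()                    # opens the asyncpg pool
    try:
        sample = DaqSample(
            device="Dev1",
            task="ai_volts",
            channel="Dev1/ai0",
            value=value,
            acquired_at=datetime.now(timezone.utc),
            monotonic_ns=time.monotonic_ns(),
            unit="V",
            error=None,
        )
        await sink.write_many([sample])  # routed to the samples table
    finally:
        await sink.close()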

ReadingSink

Bases: Protocol

Sink that consumes :class:DaqReading sequences.

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/nidaqlib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/nidaqlib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/nidaqlib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource (file handle, DB conn, ...).

Source code in src/nidaqlib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource (file handle, DB conn, ...)."""
    ...

write_many async

write_many(items)

Append items to the sink.

Source code in src/nidaqlib/sinks/base.py
async def write_many(self, items: Sequence[DaqReading]) -> None:
    """Append ``items`` to the sink."""
    ...

SampleSink

Bases: Protocol

Sink that consumes :class:DaqSample sequences (one row per sample).

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/nidaqlib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/nidaqlib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/nidaqlib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource.

Source code in src/nidaqlib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource."""
    ...

write_many async

write_many(items)

Append items to the sink.

Source code in src/nidaqlib/sinks/base.py
async def write_many(self, items: Sequence[DaqSample]) -> None:
    """Append ``items`` to the sink."""
    ...

SqliteSink

SqliteSink(
    path,
    *,
    table_readings="readings",
    table_samples="samples",
    table_blocks="blocks",
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
)

Append-only SQLite writer with WAL journaling and per-table schema lock.

One sink instance routes records to up to three tables, one per shape (readings / samples / blocks). Each table's column set is locked on its first write; later writes project onto the locked schema and drop unknown columns with a one-shot WARN.

Parameters:

- path (str | Path, required): Destination SQLite file.
- table_readings (str, default 'readings'): Table name for :class:DaqReading rows.
- table_samples (str, default 'samples'): Table name for :class:DaqSample rows.
- table_blocks (str, default 'blocks'): Table name for :class:DaqBlock summary rows.
- journal_mode (_JournalMode, default 'WAL'): SQLite journal mode pragma. WAL is recommended.
- synchronous (_Synchronous, default 'NORMAL'): SQLite synchronous pragma. NORMAL balances durability and throughput.
- busy_timeout_ms (int, default 5000): SQLite busy-wait timeout, in milliseconds.

Source code in src/nidaqlib/sinks/sqlite.py
def __init__(
    self,
    path: str | Path,
    *,
    table_readings: str = "readings",
    table_samples: str = "samples",
    table_blocks: str = "blocks",
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
) -> None:
    self._path = Path(path)
    self._table_readings = _validate_identifier(table_readings, label="table_readings")
    self._table_samples = _validate_identifier(table_samples, label="table_samples")
    self._table_blocks = _validate_identifier(table_blocks, label="table_blocks")
    self._journal_mode: _JournalMode = journal_mode
    self._synchronous: _Synchronous = synchronous
    if busy_timeout_ms < 0:
        raise ValueError(f"busy_timeout_ms must be >= 0, got {busy_timeout_ms!r}")
    self._busy_timeout_ms = busy_timeout_ms
    self._conn: sqlite3.Connection | None = None
    # Per-table schema state. One :class:`SchemaLock` per table because
    # each carries a different row shape (readings / samples / blocks).
    self._lock_readings = SchemaLock(sink_name="sqlite.readings", logger=_logger)
    self._lock_samples = SchemaLock(sink_name="sqlite.samples", logger=_logger)
    self._lock_blocks = SchemaLock(sink_name="sqlite.blocks", logger=_logger)
    self._insert_readings: str | None = None
    self._insert_samples: str | None = None
    self._insert_blocks: str | None = None

path property

path

Destination SQLite file path.

close async

close()

Close the connection. Idempotent.

Source code in src/nidaqlib/sinks/sqlite.py
async def close(self) -> None:
    """Close the connection. Idempotent."""
    if self._conn is None:
        return
    conn = self._conn
    self._conn = None
    try:
        await run_sync(conn.close)
    finally:
        _logger.info("sinks.sqlite.close", extra={"path": str(self._path)})

open async

open()

Open the SQLite connection and apply pragmas.

Source code in src/nidaqlib/sinks/sqlite.py
async def open(self) -> None:
    """Open the SQLite connection and apply pragmas."""
    if self._conn is not None:
        return
    self._conn = await run_sync(self._connect_blocking)
    _logger.info(
        "sinks.sqlite.open",
        extra={"path": str(self._path), "journal_mode": self._journal_mode},
    )

write async

write(block)

Append one :class:DaqBlock as a summary row.

Source code in src/nidaqlib/sinks/sqlite.py
async def write(self, block: DaqBlock) -> None:
    """Append one :class:`DaqBlock` as a summary row."""
    if self._conn is None:
        raise RuntimeError("SqliteSink: write called before open()")
    await self._insert_rows(
        [_block_summary_row(block)],
        table=self._table_blocks,
        lock=self._lock_blocks,
        kind="blocks",
    )

write_many async

write_many(items)

Append :class:DaqReading or :class:DaqSample rows.

Source code in src/nidaqlib/sinks/sqlite.py
async def write_many(
    self,
    items: Sequence[DaqReading] | Sequence[DaqSample],
) -> None:
    """Append :class:`DaqReading` or :class:`DaqSample` rows."""
    if self._conn is None:
        raise RuntimeError("SqliteSink: write_many called before open()")
    if not items:
        return

    from nidaqlib.tasks.models import DaqReading, DaqSample  # noqa: PLC0415

    first = items[0]
    if isinstance(first, DaqReading):
        rows = [reading_to_row(r) for r in items]  # type: ignore[arg-type]
        await self._insert_rows(
            rows, table=self._table_readings, lock=self._lock_readings, kind="readings"
        )
    elif isinstance(first, DaqSample):  # pyright: ignore[reportUnnecessaryIsInstance]
        rows = [sample_to_row(s) for s in items]  # type: ignore[arg-type]
        await self._insert_rows(
            rows, table=self._table_samples, lock=self._lock_samples, kind="samples"
        )
    else:  # pragma: no cover - defensive
        raise NIDaqSinkWriteError(
            f"SqliteSink.write_many: unsupported record type {type(first).__name__}"
        )
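
A sketch of the block-summary path; the block is assumed to come from the streaming layer.

from nidaqlib.sinks.sqlite import SqliteSink
from nidaqlib.tasks.models import DaqBlock


async def log_block_summary(block: DaqBlock) -> None:
    """Append one summary row per block to a WAL-mode SQLite file."""
    sink = SqliteSink("run_001.sqlite3", journal_mode="WAL", busy_timeout_ms=2000)
    await sink.open()
    try:
        await sink.write(block)  # one summary row in the 'blocks' table
    finally:
        await sink.close()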

block_to_long_rows

block_to_long_rows(block)

Yield one :class:DaqSample per (channel, sample) in block.

Sample timestamps are reconstructed as task_started_at + (first_sample_index + k) / sample_rate_hz (design doc §8.7). Use this only when a row-oriented sink is the right target: the natural shape of a hardware-clocked block is rectangular, and fanning out 8 000 dataclass instances per second has a real cost.

Source code in src/nidaqlib/sinks/base.py
def block_to_long_rows(block: DaqBlock) -> Iterator[DaqSample]:
    """Yield one :class:`DaqSample` per (channel, sample) in ``block``.

    Sample timestamps reconstruct from
    ``task_started_at + (first_sample_index + k) / sample_rate_hz``
    (design doc §8.7). Use this only when a row-oriented sink is the right
    target — the natural shape of a hardware-clocked block is rectangular,
    and fanning out 8 000 dataclass instances per second has a real cost.
    """
    # Late import — DaqSample lives in the tasks package, importing it at
    # module top would create a cycle through nidaqlib.streaming on systems
    # that import nidaqlib.sinks first.
    from nidaqlib.tasks.models import DaqSample as _DaqSample  # noqa: PLC0415

    n_channels = len(block.channels)
    n_samples = block.samples_per_channel
    rate_hz = block.sample_rate_hz
    if rate_hz is None:
        # Fall back to the read window — non-clocked blocks have no truer
        # per-sample timestamp anyway. The receive timestamps span the
        # short read window and we space samples uniformly within it.
        span = (block.read_finished_at - block.read_started_at).total_seconds()
        dt = span / max(1, n_samples)
    else:
        dt = 1.0 / rate_hz

    for c in range(n_channels):
        ch_name = block.channels[c]
        unit = block.units.get(ch_name)
        for k in range(n_samples):
            absolute = block.first_sample_index + k
            elapsed_s = absolute * dt if rate_hz is not None else k * dt
            sample_at = block.task_started_at + timedelta(seconds=elapsed_s)
            mono = block.monotonic_ns + int(k * dt * 1e9)
            value = float(block.data[c, k])
            yield _DaqSample(
                device=block.device,
                task=block.task,
                channel=ch_name,
                value=value,
                acquired_at=sample_at,
                monotonic_ns=mono,
                unit=unit,
                error=block.error,
            )
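
A sketch of the fan-out cost; the block is assumed to come from a hardware-clocked read.

from nidaqlib.sinks.base import block_to_long_rows, sample_to_row
from nidaqlib.tasks.models import DaqBlock


def block_to_rows(block: DaqBlock) -> list[dict]:
    """Scalarise a block into flat row dicts, one per (channel, sample)."""
    rows = [sample_to_row(s) for s in block_to_long_rows(block)]
    # An 8-channel block with 1000 samples per channel yields 8000 rows.
    assert len(rows) == len(block.channels) * block.samples_per_channel
    return rows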

pipe async

pipe(stream, sink, *, batch_size=64, flush_interval_s=1.0)

Drain a row stream into a row-oriented sink with buffered flushes.

Reads records from stream and accumulates them into a list. A flush is triggered when either the buffer reaches batch_size or flush_interval_s elapses since the last flush. On stream exhaustion, the leftover buffer is flushed before returning.

Parameters:

- stream (AsyncIterator[DaqReading | DaqSample], required): Async iterator of records (typically the receive end of a :func:~nidaqlib.streaming.record_polled recorder).
- sink (ReadingSink | SampleSink, required): An open :class:ReadingSink or :class:SampleSink. The sink's write_many is awaited per flush.
- batch_size (int, default 64): Records per flush.
- flush_interval_s (float, default 1.0): Wall-clock seconds between flushes.

Returns:

- int: Total records actually handed to the sink.

Raises:

- ValueError: If batch_size < 1 or flush_interval_s <= 0.

Source code in src/nidaqlib/sinks/base.py
async def pipe(
    stream: AsyncIterator[DaqReading | DaqSample],
    sink: ReadingSink | SampleSink,
    *,
    batch_size: int = 64,
    flush_interval_s: float = 1.0,
) -> int:
    """Drain a row stream into a row-oriented sink with buffered flushes.

    Reads records from ``stream`` and accumulates them into a list. A
    flush is triggered when either the buffer reaches ``batch_size`` or
    ``flush_interval_s`` elapses since the last flush. On stream
    exhaustion, the leftover buffer is flushed before returning.

    Args:
        stream: Async iterator of records (typically the receive end of a
            :func:`~nidaqlib.streaming.record_polled` recorder).
        sink: An open :class:`ReadingSink` or :class:`SampleSink`. The
            sink's ``write_many`` is awaited per flush.
        batch_size: Records per flush.
        flush_interval_s: Wall-clock seconds between flushes.

    Returns:
        Total records actually handed to the sink.

    Raises:
        ValueError: ``batch_size < 1`` or ``flush_interval_s <= 0``.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval_s <= 0:
        raise ValueError(f"flush_interval_s must be > 0, got {flush_interval_s!r}")

    emitted = 0
    buffer: list[DaqReading | DaqSample] = []
    last_flush = anyio.current_time()

    async def _flush() -> None:
        nonlocal emitted
        if not buffer:
            return
        # Mypy can't prove that buffer is homogeneous (it's a union), but the
        # caller is responsible for feeding one type per pipe — sinks accept
        # the corresponding type only.
        await sink.write_many(buffer)  # type: ignore[arg-type]
        emitted += len(buffer)
        buffer.clear()

    async for record in stream:
        buffer.append(record)
        now = anyio.current_time()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval_s:
            await _flush()
            last_flush = now

    await _flush()
    _logger.info("sinks.pipe_done", extra={"records_emitted": emitted})
    return emitted
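
A sketch of the driver in use. The async generator below stands in for a real recorder such as record_polled; sink and batch settings are placeholders.

import time
from collections.abc import AsyncIterator
from datetime import datetime, timezone

import anyio

from nidaqlib.sinks.base import pipe
from nidaqlib.sinks.jsonl import JsonlSink
from nidaqlib.tasks.models import DaqSample


async def fake_stream(n: int) -> AsyncIterator[DaqSample]:
    """Stand-in for the receive end of a recorder."""
    for i in range(n):
        yield DaqSample(
            device="Dev1",
            task="ai_volts",
            channel="Dev1/ai0",
            value=float(i),
            acquired_at=datetime.now(timezone.utc),
            monotonic_ns=time.monotonic_ns(),
            unit="V",
            error=None,
        )
        await anyio.sleep(0.01)


async def main() -> None:
    sink = JsonlSink("run_001.jsonl")
    await sink.open()
    try:
        written = await pipe(fake_stream(500), sink, batch_size=128, flush_interval_s=0.5)
        print(f"records handed to sink: {written}")
    finally:
        await sink.close()


anyio.run(main)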

pipe_blocks async

pipe_blocks(stream, sink, *, flush_interval_s=None)

Drain a block stream into a :class:BlockSink.

No batching axis — blocks are already batched. flush_interval_s is accepted for API symmetry with :func:pipe but currently unused; sinks that need periodic flush can implement their own.

Returns:

- int: Total blocks written.

Source code in src/nidaqlib/sinks/base.py
async def pipe_blocks(
    stream: AsyncIterator[DaqBlock],
    sink: BlockSink,
    *,
    flush_interval_s: float | None = None,
) -> int:
    """Drain a block stream into a :class:`BlockSink`.

    No batching axis — blocks are already batched. ``flush_interval_s`` is
    accepted for API symmetry with :func:`pipe` but currently unused; sinks
    that need periodic flush can implement their own.

    Returns:
        Total blocks written.
    """
    del flush_interval_s  # reserved for future per-sink flush helpers
    emitted = 0
    async for block in stream:
        await sink.write(block)
        emitted += 1
    _logger.info("sinks.pipe_blocks_done", extra={"blocks_emitted": emitted})
    return emitted
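
And the block-shaped counterpart, assuming the block stream comes from the streaming layer:

from collections.abc import AsyncIterator

from nidaqlib.sinks.base import pipe_blocks
from nidaqlib.sinks.parquet import ParquetSink
from nidaqlib.tasks.models import DaqBlock


async def archive_blocks(blocks: AsyncIterator[DaqBlock]) -> int:
    sink = ParquetSink("run_001.parquet")
    await sink.open()
    try:
        return await pipe_blocks(blocks, sink)  # one sink.write() per block
    finally:
        await sink.close()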

reading_to_row

reading_to_row(reading)

Flatten a :class:DaqReading into a single row dict.

Layout:

  • device, task — join keys.
  • requested_at / received_at / midpoint_at — ISO 8601.
  • monotonic_ns — int.
  • elapsed_s — float seconds.
  • one column per channel (values keys), values flattened.
  • one <channel>_unit column per channel.
  • error_type / error_message — populated only on error rows.

The same row layout is used by every row-oriented sink.

Source code in src/nidaqlib/sinks/base.py
def reading_to_row(reading: DaqReading) -> dict[str, float | int | str | bool | None]:
    """Flatten a :class:`DaqReading` into a single row dict.

    Layout:

    - ``device``, ``task`` — join keys.
    - ``requested_at`` / ``received_at`` / ``midpoint_at`` — ISO 8601.
    - ``monotonic_ns`` — int.
    - ``elapsed_s`` — float seconds.
    - one column per channel (``values`` keys), values flattened.
    - one ``<channel>_unit`` column per channel.
    - ``error_type`` / ``error_message`` — populated only on error rows.

    The same row layout is used by every row-oriented sink.
    """
    row: dict[str, float | int | str | bool | None] = {
        "device": reading.device,
        "task": reading.task,
        "requested_at": reading.requested_at.isoformat(),
        "received_at": reading.received_at.isoformat(),
        "midpoint_at": reading.midpoint_at.isoformat(),
        "monotonic_ns": reading.monotonic_ns,
        "elapsed_s": reading.elapsed_s,
    }
    row.update(reading.values)
    row.update({f"{ch}_unit": unit for ch, unit in reading.units.items()})
    err = reading.error
    if err is not None:
        row["error_type"] = f"{type(err).__module__}.{type(err).__qualname__}"
        row["error_message"] = str(err)
    else:
        row["error_type"] = None
        row["error_message"] = None
    return row

sample_to_row

sample_to_row(sample)

Flatten a :class:DaqSample into a single row dict.

Source code in src/nidaqlib/sinks/base.py
def sample_to_row(sample: DaqSample) -> dict[str, float | int | str | bool | None]:
    """Flatten a :class:`DaqSample` into a single row dict."""
    row: dict[str, float | int | str | bool | None] = {
        "device": sample.device,
        "task": sample.task,
        "channel": sample.channel,
        "value": sample.value,
        "acquired_at": sample.acquired_at.isoformat(),
        "monotonic_ns": sample.monotonic_ns,
        "unit": sample.unit,
    }
    err = sample.error
    if err is not None:
        row["error_type"] = f"{type(err).__module__}.{type(err).__qualname__}"
        row["error_message"] = str(err)
    else:
        row["error_type"] = None
        row["error_message"] = None
    return row
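
For illustration, the flattened layout for one non-error sample (constructor keywords assumed from :func:block_to_long_rows above):

import time
from datetime import datetime, timezone

from nidaqlib.sinks.base import sample_to_row
from nidaqlib.tasks.models import DaqSample

row = sample_to_row(
    DaqSample(
        device="Dev1",
        task="ai_volts",
        channel="Dev1/ai0",
        value=1.234,
        acquired_at=datetime.now(timezone.utc),
        monotonic_ns=time.monotonic_ns(),
        unit="V",
        error=None,
    )
)
# {'device': 'Dev1', 'task': 'ai_volts', 'channel': 'Dev1/ai0', 'value': 1.234,
#  'acquired_at': '2025-…T…+00:00', 'monotonic_ns': …, 'unit': 'V',
#  'error_type': None, 'error_message': None}
print(row)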