watlowlib.sinks

The SampleSink Protocol, the pipe() glue, the sample_to_row() flattener, and the first-party sinks (InMemory / CSV / JSONL / SQLite / Parquet / Postgres). See Logging and acquisition.

Public surface

watlowlib.sinks

Sample sinks — drop-in destinations for :func:watlowlib.sinks.pipe.

Every sink satisfies the :class:SampleSink Protocol so the recorder glue (:func:pipe) can drain into any of them. Stdlib-only sinks (:class:InMemorySink, :class:JsonlSink, :class:CsvSink, :class:SqliteSink) ship in the core install. Heavier backends (:class:ParquetSink, :class:PostgresSink) ship behind the watlowlib[parquet] / watlowlib[postgres] extras — the modules import on bare-core installs (so from watlowlib.sinks import ParquetSink works) and the dependency check is deferred to :meth:open.

Design reference: docs/design.md §6.
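The open → write_many → close sequence every sink follows can be sketched with the stdlib alone. ListSink and fake_stream below are hypothetical stand-ins (plain dicts play the role of :class:Sample), not watlowlib classes — the point is only the call sequence pipe() relies on:

```python
import asyncio
from collections.abc import AsyncIterator, Sequence

# Hypothetical stand-ins: a dict plays the role of a Sample, and ListSink
# mimics the open/write_many/close lifecycle every SampleSink follows.
Sample = dict


class ListSink:
    def __init__(self) -> None:
        self.rows: list[Sample] = []
        self._open = False

    async def open(self) -> None:
        self._open = True            # allocate the backing resource

    async def write_many(self, samples: Sequence[Sample]) -> None:
        if not self._open:
            raise RuntimeError("write_many called before open()")
        self.rows.extend(samples)    # append-only, one batch at a time

    async def close(self) -> None:
        self._open = False           # flush + release; idempotent


async def fake_stream() -> AsyncIterator[Sequence[Sample]]:
    # Stand-in for the recorder: yields one batch of samples per tick.
    for tick in range(3):
        yield [{"parameter": "analog_input_value", "value": 21.5 + tick}]


async def main() -> list[Sample]:
    sink = ListSink()
    await sink.open()
    try:
        async for batch in fake_stream():   # what pipe() does, minus buffering
            await sink.write_many(batch)
    finally:
        await sink.close()
    return sink.rows


rows = asyncio.run(main())
print(len(rows))  # 3
```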

CsvSink

CsvSink(path)

Single-run CSV writer with first-batch schema lock.

Each :meth:open truncates the destination and writes a fresh header on the first :meth:write_many. Cross-run appending is intentionally not supported: the column set is inferred from the first batch and locked for the run, and a re-open against an existing file with a different column shape would silently produce a CSV with mismatched columns. For append semantics, use :class:~watlowlib.sinks.jsonl.JsonlSink (no schema to coordinate) or :class:~watlowlib.sinks.sqlite.SqliteSink (schema captured in the table).

Attributes:

Name Type Description
path Path

Destination file. Created or overwritten on :meth:open.

columns tuple[str, ...] | None

Locked column order after the first :meth:write_many. None before the first flush.

Source code in src/watlowlib/sinks/csv.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None
    self._writer: csv.DictWriter[str] | None = None
    self._columns: tuple[str, ...] | None = None
    self._unknown_columns_warned: set[str] = set()

columns property

columns

The locked column order, or None if no batch has been flushed.

path property

path

Destination file path.

close async

close()

Flush and close the CSV file. Idempotent.

Source code in src/watlowlib/sinks/csv.py
async def close(self) -> None:
    """Flush and close the CSV file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None
        self._writer = None

open async

open()

Open the CSV file for writing.

Truncates any existing file: the first :meth:write_many will write a fresh header row. Idempotent on already-open sinks.

Source code in src/watlowlib/sinks/csv.py
async def open(self) -> None:
    """Open the CSV file for writing.

    Truncates any existing file: the first :meth:`write_many` will
    write a fresh header row. Idempotent on already-open sinks.
    """
    if self._file is not None:
        return
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Append samples as CSV rows.

On first call, infers the column set from the first sample and writes the header. Subsequent calls validate each row's keys against that locked set — unknown keys are dropped with a one-shot WARN log per unseen key.

Source code in src/watlowlib/sinks/csv.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as CSV rows.

    On first call, infers the column set from the first sample and
    writes the header. Subsequent calls validate each row's keys
    against that locked set — unknown keys are dropped with a
    one-shot WARN log per unseen key.
    """
    if self._file is None:
        raise RuntimeError("CsvSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]
    if self._writer is None:
        self._columns = tuple(rows[0].keys())
        self._writer = csv.DictWriter(self._file, fieldnames=list(self._columns))
        self._writer.writeheader()

    columns = self._columns
    assert columns is not None  # noqa: S101 — narrow for type checker

    for row in rows:
        unknown = row.keys() - set(columns)
        for key in unknown:
            if key not in self._unknown_columns_warned:
                self._unknown_columns_warned.add(key)
                _logger.warning(
                    "sinks.csv.unknown_column path=%s column=%s action=drop",
                    str(self._path),
                    key,
                )
        filtered = {k: row.get(k) for k in columns}
        self._writer.writerow(filtered)
    self._file.flush()
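The first-batch schema lock described above can be illustrated with stdlib csv.DictWriter alone (the row contents here are made up for the sketch): the first row fixes the column set, and later rows are projected onto it so unseen keys never widen the header.

```python
import csv
import io

# Stdlib-only sketch of the first-batch schema lock (not the watlowlib code):
# the first row fixes the column set; later rows are projected onto it and
# keys unseen in the first batch are dropped.
buf = io.StringIO()
rows = [
    {"device": "oven", "parameter": "analog_input_value", "value": 21.5},
    {"device": "oven", "parameter": "analog_input_value", "value": 21.6,
     "surprise": "dropped"},          # key unseen in the first batch
]
columns = tuple(rows[0].keys())       # lock on the first batch
writer = csv.DictWriter(buf, fieldnames=list(columns))
writer.writeheader()
for row in rows:
    writer.writerow({k: row.get(k) for k in columns})  # project onto the lock

print(buf.getvalue().splitlines()[0])  # device,parameter,value
```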

InMemorySink

InMemorySink()

Collect every written :class:Sample in a single list.

:attr:samples is appended to (never re-assigned) so callers can hold a reference across the sink's lifecycle. :meth:close does not clear the buffer — the point of this sink is post-run inspection.

Source code in src/watlowlib/sinks/memory.py
def __init__(self) -> None:
    self._samples: list[Sample] = []
    self._open = False
    self._closed = False

is_open property

is_open

True once :meth:open has been called and :meth:close has not.

samples property

samples

Captured samples, in write order.

close async

close()

Flip the closed flag — no I/O, buffer preserved for inspection.

Source code in src/watlowlib/sinks/memory.py
async def close(self) -> None:
    """Flip the closed flag — no I/O, buffer preserved for inspection."""
    self._closed = True

open async

open()

No backing resource — just flips the open flag.

Source code in src/watlowlib/sinks/memory.py
async def open(self) -> None:
    """No backing resource — just flips the open flag."""
    self._open = True
    self._closed = False

write_many async

write_many(samples)

Append every sample to the internal buffer.

Source code in src/watlowlib/sinks/memory.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append every sample to the internal buffer."""
    if not self.is_open:
        raise RuntimeError("InMemorySink: write_many called before open()")
    self._samples.extend(samples)

JsonlSink

JsonlSink(path)

Append-only JSONL writer — one flattened sample per line.

The on-disk format is <sample-row-as-json>\n per sample; reading back is just [json.loads(line) for line in f]. No header, no schema declaration, no framing overhead beyond the newline. Opening the same path twice extends the file — useful for resumable acquisitions and crash-restart scripts.

Source code in src/watlowlib/sinks/jsonl.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None

path property

path

Destination file path.

close async

close()

Flush and close the JSONL file. Idempotent.

Source code in src/watlowlib/sinks/jsonl.py
async def close(self) -> None:
    """Flush and close the JSONL file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None

open async

open()

Open the JSONL file for writing in append mode.

Pre-existing content is preserved; new samples are appended. Idempotent on already-open sinks.

Source code in src/watlowlib/sinks/jsonl.py
async def open(self) -> None:
    """Open the JSONL file for writing in append mode.

    Pre-existing content is preserved; new samples are appended.
    Idempotent on already-open sinks.
    """
    if self._file is not None:
        return
    self._file = self._path.open("a", encoding="utf-8", newline="")

write_many async

write_many(samples)

Serialise each sample as one JSON object per line.

Source code in src/watlowlib/sinks/jsonl.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Serialise each sample as one JSON object per line."""
    if self._file is None:
        raise RuntimeError("JsonlSink: write_many called before open()")
    if not samples:
        return
    for sample in samples:
        row = sample_to_row(sample)
        self._file.write(json.dumps(row, ensure_ascii=False))
        self._file.write("\n")
    self._file.flush()
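The append-on-reopen behaviour and the one-liner read-back can be demonstrated with the stdlib (the path and row contents below are illustrative):

```python
import json
import pathlib
import tempfile

# Sketch of the JsonlSink on-disk format: one JSON object per line, opened
# in append mode, so a second "run" against the same path extends the file.
path = pathlib.Path(tempfile.mkdtemp()) / "run.jsonl"
for run in range(2):                   # re-opening appends, never truncates
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps({"run": run, "value": 21.5 + run}) + "\n")

# Read-back really is a one-liner: no header or framing to skip.
with path.open(encoding="utf-8") as f:
    records = [json.loads(line) for line in f]
print(len(records))  # 2
```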

ParquetSink

ParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
)

Append-only Parquet writer with first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination Parquet file.

compression _Compression

Codec applied to every row group.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first flush.

Source code in src/watlowlib/sinks/parquet.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
) -> None:
    self._path = Path(path)
    self._compression: _Compression = compression
    self._use_dictionary = use_dictionary
    if row_group_size is not None and row_group_size < 1:
        raise ValueError(
            f"row_group_size must be >= 1 if set, got {row_group_size!r}",
        )
    self._row_group_size = row_group_size
    self._schema = SchemaLock(sink_name="parquet", logger=_logger)
    self._pa: Any = None
    self._pq: Any = None
    self._arrow_schema: Any = None
    self._writer: Any = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

compression property

compression

The configured compression codec.

path property

path

Destination Parquet file path.

close async

close()

Flush the footer and close the writer. Idempotent.

Source code in src/watlowlib/sinks/parquet.py
async def close(self) -> None:
    """Flush the footer and close the writer. Idempotent."""
    if self._writer is not None:
        try:
            self._writer.close()
        finally:
            self._writer = None
    self._pa = None
    self._pq = None
    _logger.info(
        "sinks.parquet.close path=%s rows_written=%s",
        str(self._path),
        self._rows_written,
    )

open async

open()

Load pyarrow and create the parent directory. Idempotent.

The actual :class:pyarrow.parquet.ParquetWriter is opened lazily on the first :meth:write_many call, because the writer requires a concrete schema — which we don't have until the first batch is inspected.

Source code in src/watlowlib/sinks/parquet.py
async def open(self) -> None:
    """Load pyarrow and create the parent directory. Idempotent.

    The actual :class:`pyarrow.parquet.ParquetWriter` is opened
    lazily on the first :meth:`write_many` call, because the
    writer requires a concrete schema — which we don't have until
    the first batch is inspected.
    """
    if self._pa is not None:
        return
    self._pa, self._pq = _load_pyarrow()
    self._path.parent.mkdir(parents=True, exist_ok=True)
    _logger.info(
        "sinks.parquet.open path=%s compression=%s",
        str(self._path),
        self._compression,
    )

write_many async

write_many(samples)

Append samples as one Parquet row group.

On first call: infers the schema from the batch, locks it, constructs the matching :mod:pyarrow schema, and opens the underlying :class:~pyarrow.parquet.ParquetWriter.

Subsequent calls project each row onto the locked schema and append the rows as a new row group. Unknown columns are dropped with one-shot WARN (handled by :class:SchemaLock).

Source code in src/watlowlib/sinks/parquet.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as one Parquet row group.

    On first call: infers the schema from the batch, locks it,
    constructs the matching :mod:`pyarrow` schema, and opens the
    underlying :class:`~pyarrow.parquet.ParquetWriter`.

    Subsequent calls project each row onto the locked schema and
    append the rows as a new row group. Unknown columns are
    dropped with one-shot WARN (handled by :class:`SchemaLock`).
    """
    if self._pa is None:
        raise RuntimeError("ParquetSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        self._schema.lock(rows)
        self._arrow_schema = self._build_arrow_schema()
        self._writer = self._open_writer()

    assert self._writer is not None  # noqa: S101 — narrow for type checker
    assert self._arrow_schema is not None  # noqa: S101

    projected = [self._schema.project(r) for r in rows]
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    arrays = {spec.name: [row[spec.name] for row in projected] for spec in columns}

    try:
        table = self._pa.Table.from_pydict(arrays, schema=self._arrow_schema)
        self._writer.write_table(
            table,
            row_group_size=self._row_group_size,
        )
    except Exception as exc:
        raise WatlowSinkWriteError(
            f"ParquetSink: write failed for {self._path}: {exc}",
        ) from exc
    self._rows_written += len(projected)

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout applied as a server setting. Defaults to 30 s.

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/watlowlib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"
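What target() produces for a DSN can be reproduced with stdlib urlparse alone — note that the password component never reaches the output (the DSN and schema/table names below are illustrative):

```python
from urllib.parse import urlparse

# Stdlib sketch of the log-safe target string: pull host/port/db out of the
# DSN and drop the credentials so the result is safe to log.
dsn = "postgres://user:s3cret@db.example.com:5433/plantdata"
parsed = urlparse(dsn)
host = parsed.hostname or "?"
port = parsed.port or 5432
db = (parsed.path or "/?").lstrip("/") or "?"
target = f"{host}:{port}/{db}.public.samples"
print(target)  # db.example.com:5433/plantdata.public.samples
```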

PostgresSink

PostgresSink(config)

Append-only Postgres writer using pooled asyncpg connections.

Attributes:

Name Type Description
config PostgresConfig

Frozen :class:PostgresConfig instance.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first :meth:write_many.

Source code in src/watlowlib/sinks/postgres.py
def __init__(self, config: PostgresConfig) -> None:
    self._config = config
    self._schema = SchemaLock(sink_name="postgres", logger=_logger)
    self._asyncpg: Any = None
    self._pool: Any = None
    self._insert_sql: str | None = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

config property

config

The frozen :class:PostgresConfig passed in at construction.

close async

close()

Close the pool. Idempotent.

Source code in src/watlowlib/sinks/postgres.py
async def close(self) -> None:
    """Close the pool. Idempotent."""
    if self._pool is None:
        return
    pool = self._pool
    self._pool = None
    try:
        await pool.close()
    finally:
        _logger.info(
            "sinks.postgres.close target=%s rows_written=%s",
            self._config.target(),
            self._rows_written,
        )
    self._asyncpg = None

open async

open()

Load asyncpg, open the pool, and (optionally) introspect the table.

Idempotent. When create_table=False (the default), the target's columns are read on open and the schema is locked immediately. When create_table=True the lock happens lazily on the first :meth:write_many.

Source code in src/watlowlib/sinks/postgres.py
async def open(self) -> None:
    """Load asyncpg, open the pool, and (optionally) introspect the table.

    Idempotent. When ``create_table=False`` (the default), the
    target's columns are read on open and the schema is locked
    immediately. When ``create_table=True`` the lock happens
    lazily on the first :meth:`write_many`.
    """
    if self._pool is not None:
        return
    self._asyncpg = _load_asyncpg()
    cfg = self._config
    server_settings = {
        "application_name": "watlowlib",
        "statement_timeout": str(int(cfg.statement_timeout_ms)),
    }
    try:
        if cfg.dsn is not None:
            self._pool = await self._asyncpg.create_pool(
                dsn=cfg.dsn,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
        else:
            self._pool = await self._asyncpg.create_pool(
                host=cfg.host,
                port=cfg.port,
                user=cfg.user,
                password=cfg.password,
                database=cfg.database,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
    except Exception as exc:
        raise WatlowSinkWriteError(
            f"PostgresSink: failed to open pool for {cfg.target()}: {exc}",
        ) from exc

    _logger.info(
        "sinks.postgres.open target=%s pool_min=%s pool_max=%s use_copy=%s create_table=%s",
        cfg.target(),
        cfg.pool_min_size,
        cfg.pool_max_size,
        cfg.use_copy,
        cfg.create_table,
    )

    if not cfg.create_table:
        await self._introspect_existing_table()

write_many async

write_many(samples)

Append samples — one COPY (or executemany) per call.

Source code in src/watlowlib/sinks/postgres.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` — one COPY (or executemany) per call."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        assert self._config.create_table  # noqa: S101
        self._schema.lock(rows)
        await self._create_table()
        self._insert_sql = self._build_insert_sql()

    columns = self._schema.columns
    assert columns is not None  # noqa: S101
    assert self._insert_sql is not None  # noqa: S101

    projected_tuples: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected_tuples.append(tuple(fields[spec.name] for spec in columns))

    try:
        if self._config.use_copy:
            await self._write_copy(projected_tuples, columns)
        else:
            await self._write_executemany(projected_tuples)
    except WatlowSinkWriteError:
        raise
    except Exception as exc:
        raise WatlowSinkWriteError(
            f"PostgresSink: write failed for {self._config.target()}: {exc}",
        ) from exc
    self._rows_written += len(projected_tuples)

SampleSink

Bases: Protocol

Minimal shape of an acquisition sink.

Sinks own their storage handle lifecycle. Concrete implementations typically follow this call sequence:

  1. await sink.open() — allocate file descriptors, DB connections, etc. Safe to call again on an already-open sink.
  2. await sink.write_many(samples) — one or more times. samples is a :class:~collections.abc.Sequence so the sink knows the full batch up front (CSV column inference, Parquet row groups, parameterised inserts).
  3. await sink.close() — flush and release the handle. Idempotent.

The async context-manager methods provide an async with sink: shape for the common case of "open → write → close" in one block.
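Because SampleSink is a Protocol, any class with the right method shape satisfies it structurally — no inheritance required. A cut-down, hypothetical sketch with typing.Protocol (MiniSink and NullSink are not watlowlib names):

```python
from collections.abc import Sequence
from typing import Protocol, runtime_checkable


@runtime_checkable
class MiniSink(Protocol):
    # Hypothetical cut-down of the SampleSink shape (context-manager
    # methods omitted); dicts stand in for Sample.
    async def open(self) -> None: ...
    async def write_many(self, samples: Sequence[dict]) -> None: ...
    async def close(self) -> None: ...


class NullSink:
    # Structural match: satisfies MiniSink without subclassing it.
    async def open(self) -> None:
        pass

    async def write_many(self, samples: Sequence[dict]) -> None:
        pass

    async def close(self) -> None:
        pass


print(isinstance(NullSink(), MiniSink))  # True
```

runtime_checkable isinstance only verifies that the methods exist, not their signatures — static type checkers do the full structural check.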

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/watlowlib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/watlowlib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/watlowlib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource (file handle, DB conn, ...).

Source code in src/watlowlib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource (file handle, DB conn, ...)."""
    ...

write_many async

write_many(samples)

Append samples to the sink.

Sequence (not Iterable) because every in-tree sink wants len() — CSV schema inference, batched parameterised inserts, Parquet row-group bookkeeping.

Source code in src/watlowlib/sinks/base.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink.

    ``Sequence`` (not ``Iterable``) because every in-tree sink wants
    ``len()`` — CSV schema inference, batched parameterised inserts,
    Parquet row-group bookkeeping.
    """
    ...

SqliteSink

SqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
)

Append-only SQLite writer with WAL journaling and first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination SQLite file. Created on :meth:open.

table str

Target table name.

columns tuple[ColumnSpec, ...] | None

The locked :class:ColumnSpec tuple, or None before the first flush.

Source code in src/watlowlib/sinks/sqlite.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
) -> None:
    self._path = Path(path)
    self._table = _validate_identifier(table, label="table name")
    self._create_table = create_table
    self._journal_mode: _JournalMode = journal_mode
    self._synchronous: _Synchronous = synchronous
    if busy_timeout_ms < 0:
        raise ValueError(f"busy_timeout_ms must be >= 0, got {busy_timeout_ms!r}")
    self._busy_timeout_ms = busy_timeout_ms
    self._conn: sqlite3.Connection | None = None
    self._schema = SchemaLock(sink_name="sqlite", logger=_logger)
    self._insert_sql: str | None = None

columns property

columns

Locked columns in order, or None before first :meth:write_many.

path property

path

Destination SQLite file path.

table property

table

Target table name (validated).

close async

close()

Close the connection. Idempotent.

Source code in src/watlowlib/sinks/sqlite.py
async def close(self) -> None:
    """Close the connection. Idempotent."""
    if self._conn is None:
        return
    conn = self._conn
    self._conn = None
    try:
        await run_sync(conn.close)
    finally:
        _logger.info(
            "sinks.sqlite.close path=%s table=%s",
            str(self._path),
            self._table,
        )

open async

open()

Open the SQLite connection, apply PRAGMAs, and introspect the target.

Idempotent: calling :meth:open on an already-open sink is a no-op. Runs in a worker thread because sqlite3.connect and PRAGMA execution are blocking I/O.

Source code in src/watlowlib/sinks/sqlite.py
async def open(self) -> None:
    """Open the SQLite connection, apply PRAGMAs, and introspect the target.

    Idempotent: calling :meth:`open` on an already-open sink is a
    no-op. Runs in a worker thread because ``sqlite3.connect`` and
    ``PRAGMA`` execution are blocking I/O.
    """
    if self._conn is not None:
        return
    self._conn = await run_sync(self._connect_blocking)
    _logger.info(
        "sinks.sqlite.open path=%s table=%s journal_mode=%s synchronous=%s",
        str(self._path),
        self._table,
        self._journal_mode,
        self._synchronous,
    )
    if not self._create_table:
        try:
            await run_sync(self._introspect_existing_table_blocking)
        except BaseException:
            # Introspection raised (most commonly WatlowSinkSchemaError on
            # a missing table). Release the connection so we don't leak a
            # resource on a failed open.
            conn = self._conn
            self._conn = None
            await run_sync(conn.close)
            raise

write_many async

write_many(samples)

Append samples as rows in a single transaction.

On the first call (when create_table=True), infers the schema from the batch and runs CREATE TABLE IF NOT EXISTS. Subsequent calls insert directly. All values pass through ? placeholders — never string-formatted into SQL.

Source code in src/watlowlib/sinks/sqlite.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as rows in a single transaction.

    On the first call (when ``create_table=True``), infers the
    schema from the batch and runs ``CREATE TABLE IF NOT EXISTS``.
    Subsequent calls insert directly. All values pass through
    ``?`` placeholders — never string-formatted into SQL.
    """
    if self._conn is None:
        raise RuntimeError("SqliteSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked and self._create_table:
        # else (create_table=False): _introspect_existing_table_blocking
        # already ran in open().
        self._schema.lock(rows)
        await run_sync(self._create_table_blocking)
        self._insert_sql = self._build_insert_sql()

    assert self._insert_sql is not None  # noqa: S101 — narrow for type checker
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    projected: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected.append(tuple(fields[spec.name] for spec in columns))

    await run_sync(self._executemany_blocking, projected)
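The value-handling rule above — identifiers validated up front, values only ever through ? placeholders — looks like this with bare stdlib sqlite3 (table and column names here are illustrative):

```python
import sqlite3

# Stdlib sketch of the SQLite write path: the table/column identifiers are
# fixed strings, while every *value* travels through "?" placeholders and
# is never string-formatted into the SQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS samples (parameter TEXT, value REAL)")
rows = [("analog_input_value", 21.5), ("analog_input_value", 21.6)]
conn.executemany("INSERT INTO samples (parameter, value) VALUES (?, ?)", rows)
conn.commit()
count = conn.execute("SELECT COUNT(*) FROM samples").fetchone()[0]
print(count)  # 2
```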

pipe async

pipe(stream, sink, *, batch_size=64, flush_interval=1.0)

Drain stream into sink with buffered flushes.

Reads per-tick batches from the recorder and accumulates the individual :class:Sample objects into a list. A flush happens when either threshold is first crossed:

  • the buffer reaches batch_size samples, or
  • flush_interval seconds have elapsed since the last flush.

The time-based check fires on every incoming batch, so the actual inter-flush latency is bounded below by the recorder's tick period: effective_flush_period ≈ max(flush_interval, 1 / rate_hz). For low-rate acquisitions (rate_hz < 1 / flush_interval) the recorder cadence dominates; for high-rate acquisitions the configured flush_interval dominates. Either way, on stream exhaustion any leftover buffer is flushed before the summary is returned.

The samples_late / max_drift_ms fields on the returned summary stay at zero — those are recorder-layer concepts. The recorder emits its own summary on CM exit; this summary is the sink-side view.

Parameters:

Name Type Description Default
stream AsyncIterator[Sequence[Sample]]

The async iterator yielded by :func:~watlowlib.streaming.record.

required
sink SampleSink

Any :class:SampleSink. Must already be open.

required
batch_size int

Buffer threshold in samples (not batches).

64
flush_interval float

Time threshold in seconds between flushes.

1.0

Returns:

Type Description
AcquisitionSummary

An :class:AcquisitionSummary with samples_emitted set to the count actually handed to the sink.

Raises:

Type Description
ValueError

batch_size < 1 or flush_interval <= 0.

Source code in src/watlowlib/sinks/base.py
async def pipe(
    stream: AsyncIterator[Sequence[Sample]],
    sink: SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    r"""Drain ``stream`` into ``sink`` with buffered flushes.

    Reads per-tick batches from the recorder and accumulates the
    individual :class:`Sample`\ s into a list. A flush happens when
    either threshold is first crossed:

    - the buffer reaches ``batch_size`` samples, or
    - ``flush_interval`` seconds have elapsed since the last flush.

    The time-based check fires on every incoming batch, so the actual
    inter-flush latency is bounded below by the recorder's tick
    period: ``effective_flush_period ≈ max(flush_interval,
    1 / rate_hz)``. For low-rate acquisitions (rate_hz < 1 / flush_interval)
    the recorder cadence dominates; for high-rate acquisitions the
    configured ``flush_interval`` dominates. Either way, on stream
    exhaustion any leftover buffer is flushed before the summary is
    returned.

    The ``samples_late`` / ``max_drift_ms`` fields on the returned
    summary stay at zero — those are recorder-layer concepts. The
    recorder emits its own summary on CM exit; this summary is the
    sink-side view.

    Args:
        stream: The async iterator yielded by
            :func:`~watlowlib.streaming.record`.
        sink: Any :class:`SampleSink`. Must already be open.
        batch_size: Buffer threshold in samples (not batches).
        flush_interval: Time threshold in seconds between flushes.

    Returns:
        An :class:`AcquisitionSummary` with ``samples_emitted`` set to
        the count actually handed to the sink.

    Raises:
        ValueError: ``batch_size < 1`` or ``flush_interval <= 0``.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = anyio.current_time()

    async def _flush() -> None:
        nonlocal emitted
        if not buffer:
            return
        await sink.write_many(buffer)
        emitted += len(buffer)
        buffer.clear()

    async for batch in stream:
        buffer.extend(batch)
        now = anyio.current_time()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            await _flush()
            last_flush = now

    await _flush()
    finished_at = datetime.now(UTC)
    _logger.info(
        "sinks.pipe_done sink=%s samples_emitted=%s duration_s=%.3f",
        type(sink).__name__,
        emitted,
        (finished_at - started_at).total_seconds(),
    )
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )

sample_to_row

sample_to_row(sample)

Flatten a :class:Sample into a single row dict for tabular sinks.

Long-format schema (one row per parameter read), stable across all in-tree sinks:

  • device — manager-assigned name (or controller transport label).
  • address — bus address.
  • protocol — wire protocol that produced the read (string).
  • parameter — canonical parameter name.
  • parameter_id — registry parameter id.
  • instance — 1-indexed loop / channel selector.
  • value — decoded value, coerced to a sink-friendly scalar (bools become "true" / "false" strings so SQLite type inference doesn't pin the column to INTEGER for the run).
  • unit — display string, or None when the registry doesn't carry per-parameter unit metadata.
  • requested_at / received_at / midpoint_at — ISO 8601 strings.
  • latency_s — poll round-trip in seconds.

The sample's raw payload is intentionally not in the row: bytes don't fit cleanly into CSV / JSONL / SQLite affinities, and tabular sinks are for time-series queries, not byte-level diagnostics. Callers that need raw consume :class:Sample directly via :class:~watlowlib.sinks.memory.InMemorySink.

Source code in src/watlowlib/sinks/base.py
def sample_to_row(sample: Sample) -> dict[str, float | int | str | None]:
    """Flatten a :class:`Sample` into a single row dict for tabular sinks.

    Long-format schema (one row per parameter read), stable across all
    in-tree sinks:

    - ``device`` — manager-assigned name (or controller transport label).
    - ``address`` — bus address.
    - ``protocol`` — wire protocol that produced the read (string).
    - ``parameter`` — canonical parameter name.
    - ``parameter_id`` — registry parameter id.
    - ``instance`` — 1-indexed loop / channel selector.
    - ``value`` — decoded value, coerced to a sink-friendly scalar
      (bools become ``"true"`` / ``"false"`` strings so SQLite type
      inference doesn't pin the column to INTEGER for the run).
    - ``unit`` — display string, or ``None`` when the registry doesn't
      carry per-parameter unit metadata.
    - ``requested_at`` / ``received_at`` / ``midpoint_at`` — ISO 8601
      strings.
    - ``latency_s`` — poll round-trip in seconds.

    The sample's ``raw`` payload is intentionally **not** in the row:
    bytes don't fit cleanly into CSV / JSONL / SQLite affinities, and
    tabular sinks are for time-series queries, not byte-level
    diagnostics. Callers that need ``raw`` consume :class:`Sample`
    directly via :class:`~watlowlib.sinks.memory.InMemorySink`.
    """
    raw_value = sample.value
    coerced: float | int | str | None
    if isinstance(raw_value, bool):
        # Coerce before the int-isinstance check below; bool is an int.
        coerced = "true" if raw_value else "false"
    elif isinstance(raw_value, int | float | str):
        coerced = raw_value
    else:
        # raw_value is None — Sample.value's type rules out anything else.
        coerced = None

    return {
        "device": sample.device,
        "address": sample.address,
        "protocol": sample.protocol.value,
        "parameter": sample.parameter,
        "parameter_id": sample.parameter_id,
        "instance": sample.instance,
        "value": coerced,
        "unit": sample.unit,
        "requested_at": sample.requested_at.isoformat(),
        "received_at": sample.received_at.isoformat(),
        "midpoint_at": sample.midpoint_at.isoformat(),
        "latency_s": sample.latency_s,
    }

Base Protocol + pipe()

watlowlib.sinks.base

Sink Protocol, sample_to_row flattener, and the :func:pipe driver.

A :class:SampleSink is the minimal shape the recorder's downstream consumer needs: :meth:open, :meth:write_many, :meth:close, and the matching async context-manager methods. The in-tree sinks (:class:~watlowlib.sinks.memory.InMemorySink, :class:~watlowlib.sinks.csv.CsvSink, :class:~watlowlib.sinks.jsonl.JsonlSink, :class:~watlowlib.sinks.sqlite.SqliteSink) all satisfy this Protocol; third-party sinks (Parquet, Postgres, Kafka, ...) can slot in without touching library code.

:func:pipe is the v1 acquisition glue. It reads per-tick batches out of the recorder's receive stream, buffers them up to batch_size (or flush_interval seconds, whichever comes first), and calls sink.write_many to flush. On stream exhaustion it drains any remaining buffer and returns an :class:AcquisitionSummary with samples_emitted reflecting the count actually handed to the sink.
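The buffering behaviour can be sketched with a simplified stand-in (plain ints instead of :class:Sample, size threshold only; the real :func:pipe also flushes on a time threshold and returns an :class:AcquisitionSummary):

```python
import asyncio
from collections.abc import AsyncIterator, Sequence


async def fake_stream() -> AsyncIterator[Sequence[int]]:
    # Stand-in for the recorder's per-tick batches.
    for batch in ([1, 2], [3, 4], [5]):
        yield batch


async def mini_pipe(stream, write_many, *, batch_size: int = 4) -> int:
    """Simplified sketch of the pipe() buffering loop."""
    emitted = 0
    buffer: list[int] = []
    async for batch in stream:
        buffer.extend(batch)
        if len(buffer) >= batch_size:       # size threshold crossed: flush
            await write_many(list(buffer))
            emitted += len(buffer)
            buffer.clear()
    if buffer:                              # drain leftovers on exhaustion
        await write_many(list(buffer))
        emitted += len(buffer)
    return emitted


flushes: list[list[int]] = []


async def sink_write(rows: list[int]) -> None:
    flushes.append(rows)


print(asyncio.run(mini_pipe(fake_stream(), sink_write)))  # 5
print(flushes)  # [[1, 2, 3, 4], [5]]
```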

Design reference: docs/design.md §6.

SampleSink

Bases: Protocol

Minimal shape of an acquisition sink.

Sinks own their storage handle lifecycle. Concrete implementations typically follow this call sequence:

  1. await sink.open() — allocate file descriptors, DB connections, etc. Safe to call again on an already-open sink.
  2. await sink.write_many(samples) — one or more times. samples is a :class:~collections.abc.Sequence so the sink knows the full batch up front (CSV column inference, Parquet row groups, parameterised inserts).
  3. await sink.close() — flush and release the handle. Idempotent.

The async context-manager methods provide an async with sink: shape for the common case of "open → write → close" in one block.
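Because the Protocol is structural, a third-party sink needs no watlowlib import to satisfy it. A minimal sketch (the `ListSink` class is hypothetical, and plain objects stand in for :class:Sample):

```python
import asyncio
from collections.abc import Sequence


class ListSink:
    """Hypothetical third-party sink following the open/write_many/close
    call sequence plus the async context-manager shape."""

    def __init__(self) -> None:
        self.rows: list[object] = []
        self._open = False

    async def open(self) -> None:
        # Step 1: allocate resources. Safe to call again on an open sink.
        self._open = True

    async def write_many(self, samples: Sequence[object]) -> None:
        # Step 2: called one or more times with a full batch.
        if not self._open:
            raise RuntimeError("write_many called before open()")
        self.rows.extend(samples)

    async def close(self) -> None:
        # Step 3: flush and release. Idempotent.
        self._open = False

    async def __aenter__(self) -> "ListSink":
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()


async def demo() -> list[object]:
    async with ListSink() as sink:
        await sink.write_many([1, 2])
        await sink.write_many([3])
    return sink.rows


print(asyncio.run(demo()))  # [1, 2, 3]
```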

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/watlowlib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/watlowlib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/watlowlib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource (file handle, DB conn, ...).

Source code in src/watlowlib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource (file handle, DB conn, ...)."""
    ...

write_many async

write_many(samples)

Append samples to the sink.

Sequence (not Iterable) because every in-tree sink wants len() — CSV schema inference, batched parameterised inserts, Parquet row-group bookkeeping.

Source code in src/watlowlib/sinks/base.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink.

    ``Sequence`` (not ``Iterable``) because every in-tree sink wants
    ``len()`` — CSV schema inference, batched parameterised inserts,
    Parquet row-group bookkeeping.
    """
    ...

pipe async

pipe(stream, sink, *, batch_size=64, flush_interval=1.0)

Drain stream into sink with buffered flushes.

Reads per-tick batches from the recorder and accumulates the individual :class:Sample objects into a list. A flush happens when either threshold is first crossed:

  • the buffer reaches batch_size samples, or
  • flush_interval seconds have elapsed since the last flush.

The time-based check fires on every incoming batch, so the actual inter-flush latency is bounded below by the recorder's tick period: effective_flush_period ≈ max(flush_interval, 1 / rate_hz). For low-rate acquisitions (rate_hz < 1 / flush_interval) the recorder cadence dominates; for high-rate acquisitions the configured flush_interval dominates. Either way, on stream exhaustion any leftover buffer is flushed before the summary is returned.

The samples_late / max_drift_ms fields on the returned summary stay at zero — those are recorder-layer concepts. The recorder emits its own summary on CM exit; this summary is the sink-side view.

Parameters:

Name Type Description Default
stream AsyncIterator[Sequence[Sample]]

The async iterator yielded by :func:~watlowlib.streaming.record.

required
sink SampleSink

Any :class:SampleSink. Must already be open.

required
batch_size int

Buffer threshold in samples (not batches).

64
flush_interval float

Time threshold in seconds between flushes.

1.0

Returns:

Type Description
AcquisitionSummary

An :class:AcquisitionSummary with samples_emitted set to the count actually handed to the sink.

Raises:

Type Description
ValueError

batch_size < 1 or flush_interval <= 0.

Source code in src/watlowlib/sinks/base.py
async def pipe(
    stream: AsyncIterator[Sequence[Sample]],
    sink: SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    r"""Drain ``stream`` into ``sink`` with buffered flushes.

    Reads per-tick batches from the recorder and accumulates the
    individual :class:`Sample`\ s into a list. A flush happens when
    either threshold is first crossed:

    - the buffer reaches ``batch_size`` samples, or
    - ``flush_interval`` seconds have elapsed since the last flush.

    The time-based check fires on every incoming batch, so the actual
    inter-flush latency is bounded below by the recorder's tick
    period: ``effective_flush_period ≈ max(flush_interval,
    1 / rate_hz)``. For low-rate acquisitions (rate_hz < 1 / flush_interval)
    the recorder cadence dominates; for high-rate acquisitions the
    configured ``flush_interval`` dominates. Either way, on stream
    exhaustion any leftover buffer is flushed before the summary is
    returned.

    The ``samples_late`` / ``max_drift_ms`` fields on the returned
    summary stay at zero — those are recorder-layer concepts. The
    recorder emits its own summary on CM exit; this summary is the
    sink-side view.

    Args:
        stream: The async iterator yielded by
            :func:`~watlowlib.streaming.record`.
        sink: Any :class:`SampleSink`. Must already be open.
        batch_size: Buffer threshold in samples (not batches).
        flush_interval: Time threshold in seconds between flushes.

    Returns:
        An :class:`AcquisitionSummary` with ``samples_emitted`` set to
        the count actually handed to the sink.

    Raises:
        ValueError: ``batch_size < 1`` or ``flush_interval <= 0``.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = anyio.current_time()

    async def _flush() -> None:
        nonlocal emitted
        if not buffer:
            return
        await sink.write_many(buffer)
        emitted += len(buffer)
        buffer.clear()

    async for batch in stream:
        buffer.extend(batch)
        now = anyio.current_time()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            await _flush()
            last_flush = now

    await _flush()
    finished_at = datetime.now(UTC)
    _logger.info(
        "sinks.pipe_done sink=%s samples_emitted=%s duration_s=%.3f",
        type(sink).__name__,
        emitted,
        (finished_at - started_at).total_seconds(),
    )
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )

sample_to_row

sample_to_row(sample)

Flatten a :class:Sample into a single row dict for tabular sinks.

Long-format schema (one row per parameter read), stable across all in-tree sinks:

  • device — manager-assigned name (or controller transport label).
  • address — bus address.
  • protocol — wire protocol that produced the read (string).
  • parameter — canonical parameter name.
  • parameter_id — registry parameter id.
  • instance — 1-indexed loop / channel selector.
  • value — decoded value, coerced to a sink-friendly scalar (bools become "true" / "false" strings so SQLite type inference doesn't pin the column to INTEGER for the run).
  • unit — display string, or None when the registry doesn't carry per-parameter unit metadata.
  • requested_at / received_at / midpoint_at — ISO 8601 strings.
  • latency_s — poll round-trip in seconds.

The sample's raw payload is intentionally not in the row: bytes don't fit cleanly into CSV / JSONL / SQLite affinities, and tabular sinks are for time-series queries, not byte-level diagnostics. Callers that need raw consume :class:Sample directly via :class:~watlowlib.sinks.memory.InMemorySink.

Source code in src/watlowlib/sinks/base.py
def sample_to_row(sample: Sample) -> dict[str, float | int | str | None]:
    """Flatten a :class:`Sample` into a single row dict for tabular sinks.

    Long-format schema (one row per parameter read), stable across all
    in-tree sinks:

    - ``device`` — manager-assigned name (or controller transport label).
    - ``address`` — bus address.
    - ``protocol`` — wire protocol that produced the read (string).
    - ``parameter`` — canonical parameter name.
    - ``parameter_id`` — registry parameter id.
    - ``instance`` — 1-indexed loop / channel selector.
    - ``value`` — decoded value, coerced to a sink-friendly scalar
      (bools become ``"true"`` / ``"false"`` strings so SQLite type
      inference doesn't pin the column to INTEGER for the run).
    - ``unit`` — display string, or ``None`` when the registry doesn't
      carry per-parameter unit metadata.
    - ``requested_at`` / ``received_at`` / ``midpoint_at`` — ISO 8601
      strings.
    - ``latency_s`` — poll round-trip in seconds.

    The sample's ``raw`` payload is intentionally **not** in the row:
    bytes don't fit cleanly into CSV / JSONL / SQLite affinities, and
    tabular sinks are for time-series queries, not byte-level
    diagnostics. Callers that need ``raw`` consume :class:`Sample`
    directly via :class:`~watlowlib.sinks.memory.InMemorySink`.
    """
    raw_value = sample.value
    coerced: float | int | str | None
    if isinstance(raw_value, bool):
        # Coerce before the int-isinstance check below; bool is an int.
        coerced = "true" if raw_value else "false"
    elif isinstance(raw_value, int | float | str):
        coerced = raw_value
    else:
        # raw_value is None — Sample.value's type rules out anything else.
        coerced = None

    return {
        "device": sample.device,
        "address": sample.address,
        "protocol": sample.protocol.value,
        "parameter": sample.parameter,
        "parameter_id": sample.parameter_id,
        "instance": sample.instance,
        "value": coerced,
        "unit": sample.unit,
        "requested_at": sample.requested_at.isoformat(),
        "received_at": sample.received_at.isoformat(),
        "midpoint_at": sample.midpoint_at.isoformat(),
        "latency_s": sample.latency_s,
    }

In-memory sink

watlowlib.sinks.memory

In-memory sink — collects :class:Sample objects in a list for tests.

The one bit of value over "write your own list" is that :class:InMemorySink satisfies the :class:SampleSink Protocol, so acquisition tests can run the same pipe() call path a production sink uses and then inspect the captured samples after the fact.

InMemorySink

InMemorySink()

Collect every written :class:Sample in a single list.

:attr:samples is appended to (never re-assigned) so callers can hold a reference across the sink's lifecycle. :meth:close does not clear the buffer — the point of this sink is post-run inspection.

Source code in src/watlowlib/sinks/memory.py
def __init__(self) -> None:
    self._samples: list[Sample] = []
    self._open = False
    self._closed = False

is_open property

is_open

True once :meth:open has been called and :meth:close has not.

samples property

samples

Captured samples, in write order.

close async

close()

Flip the closed flag — no I/O, buffer preserved for inspection.

Source code in src/watlowlib/sinks/memory.py
async def close(self) -> None:
    """Flip the closed flag — no I/O, buffer preserved for inspection."""
    self._closed = True

open async

open()

No backing resource — just flips the open flag.

Source code in src/watlowlib/sinks/memory.py
async def open(self) -> None:
    """No backing resource — just flips the open flag."""
    self._open = True
    self._closed = False

write_many async

write_many(samples)

Append every sample to the internal buffer.

Source code in src/watlowlib/sinks/memory.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append every sample to the internal buffer."""
    if not self.is_open:
        raise RuntimeError("InMemorySink: write_many called before open()")
    self._samples.extend(samples)

CSV sink

watlowlib.sinks.csv

CSV sink — stdlib :mod:csv, schema locked at first batch.

:class:CsvSink writes one row per :class:Sample. The column order is fixed the first time :meth:write_many is called — inferred from the first sample's :func:sample_to_row output — and stays stable for the rest of the run. Unknown columns that appear in later samples are dropped with a WARN log rather than silently reshaping the file.

Stdlib-only — the core install pulls in no CSV dependencies.
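The first-batch lock reduces to a plain :class:csv.DictWriter with fixed fieldnames. A stdlib-only sketch of the same projection (sample dicts stand in for :func:sample_to_row output):

```python
import csv
import io

rows = [
    {"device": "oven", "value": 21.5},
    {"device": "oven", "value": 22.0, "extra": "dropped"},  # unknown key
]

buf = io.StringIO()
# Lock the column set from the first row, as CsvSink does on first flush.
columns = tuple(rows[0].keys())
writer = csv.DictWriter(buf, fieldnames=list(columns))
writer.writeheader()
for row in rows:
    # Project each row onto the locked columns; unknown keys are dropped
    # (CsvSink additionally emits a one-shot WARN per unseen key).
    writer.writerow({k: row.get(k) for k in columns})

print(buf.getvalue())
```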

CsvSink

CsvSink(path)

Single-run CSV writer with first-batch schema lock.

Each :meth:open truncates the destination and writes a fresh header on the first :meth:write_many. Cross-run appending is intentionally not supported: the column set is inferred from the first batch and locked for the run, and a re-open against an existing file with a different column shape would silently produce a CSV with mismatched columns. For append semantics, use :class:~watlowlib.sinks.jsonl.JsonlSink (no schema to coordinate) or :class:~watlowlib.sinks.sqlite.SqliteSink (schema captured in the table).

Attributes:

Name Type Description
path Path

Destination file. Created or overwritten on :meth:open.

columns tuple[str, ...] | None

Locked column order after the first :meth:write_many. None before the first flush.

Source code in src/watlowlib/sinks/csv.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None
    self._writer: csv.DictWriter[str] | None = None
    self._columns: tuple[str, ...] | None = None
    self._unknown_columns_warned: set[str] = set()

columns property

columns

The locked column order, or None if no batch has been flushed.

path property

path

Destination file path.

close async

close()

Flush and close the CSV file. Idempotent.

Source code in src/watlowlib/sinks/csv.py
async def close(self) -> None:
    """Flush and close the CSV file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None
        self._writer = None

open async

open()

Open the CSV file for writing.

Truncates any existing file: the first :meth:write_many will write a fresh header row. Idempotent on already-open sinks.

Source code in src/watlowlib/sinks/csv.py
async def open(self) -> None:
    """Open the CSV file for writing.

    Truncates any existing file: the first :meth:`write_many` will
    write a fresh header row. Idempotent on already-open sinks.
    """
    if self._file is not None:
        return
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Append samples as CSV rows.

On first call, infers the column set from the first sample and writes the header. Subsequent calls validate each row's keys against that locked set — unknown keys are dropped with a one-shot WARN log per unseen key.

Source code in src/watlowlib/sinks/csv.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as CSV rows.

    On first call, infers the column set from the first sample and
    writes the header. Subsequent calls validate each row's keys
    against that locked set — unknown keys are dropped with a
    one-shot WARN log per unseen key.
    """
    if self._file is None:
        raise RuntimeError("CsvSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]
    if self._writer is None:
        self._columns = tuple(rows[0].keys())
        self._writer = csv.DictWriter(self._file, fieldnames=list(self._columns))
        self._writer.writeheader()

    columns = self._columns
    assert columns is not None  # noqa: S101 — narrow for type checker

    for row in rows:
        unknown = row.keys() - set(columns)
        for key in unknown:
            if key not in self._unknown_columns_warned:
                self._unknown_columns_warned.add(key)
                _logger.warning(
                    "sinks.csv.unknown_column path=%s column=%s action=drop",
                    str(self._path),
                    key,
                )
        filtered = {k: row.get(k) for k in columns}
        self._writer.writerow(filtered)
    self._file.flush()

JSONL sink

watlowlib.sinks.jsonl

JSONL sink — stdlib :mod:json, one object per line, no schema lock.

:class:JsonlSink writes one JSON object per :class:Sample. Unlike :class:~watlowlib.sinks.csv.CsvSink, it doesn't lock a schema — each row stands alone, so a hot-plugged device whose row format adds an extra field simply emits a wider object without affecting earlier or later rows. Because there's no header to coordinate, the sink opens in append mode: pointing it at an existing file extends that file rather than overwriting it. Use a fresh path per run if you want isolated outputs.

Stdlib-only — the core install pulls in no JSON dependencies.

JsonlSink

JsonlSink(path)

Append-only JSONL writer — one flattened sample per line.

The on-disk format is <sample-row-as-json>\n per sample; reading back is just [json.loads(line) for line in f]. No header, no schema declaration, no framing overhead beyond the newline. Opening the sink against the same path twice extends the file — useful for resumable acquisitions and crash-restart scripts.
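A stdlib-only sketch of that append-and-read-back roundtrip (the path and row dicts are illustrative):

```python
import json
import tempfile
from pathlib import Path

path = Path(tempfile.mkdtemp()) / "run.jsonl"

# Two "runs" against the same path: append mode extends the file.
runs = (
    [{"parameter": "temp", "value": 21.5}],
    [{"parameter": "temp", "value": 22.0}],
)
for run in runs:
    with path.open("a", encoding="utf-8") as f:
        for row in run:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")

# Reading back is one json.loads per line, no header to coordinate.
with path.open(encoding="utf-8") as f:
    rows = [json.loads(line) for line in f]

print(rows)
```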

Source code in src/watlowlib/sinks/jsonl.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None

path property

path

Destination file path.

close async

close()

Flush and close the JSONL file. Idempotent.

Source code in src/watlowlib/sinks/jsonl.py
async def close(self) -> None:
    """Flush and close the JSONL file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None

open async

open()

Open the JSONL file for writing in append mode.

Pre-existing content is preserved; new samples are appended. Idempotent on already-open sinks.

Source code in src/watlowlib/sinks/jsonl.py
async def open(self) -> None:
    """Open the JSONL file for writing in append mode.

    Pre-existing content is preserved; new samples are appended.
    Idempotent on already-open sinks.
    """
    if self._file is not None:
        return
    self._file = self._path.open("a", encoding="utf-8", newline="")

write_many async

write_many(samples)

Serialise each sample as one JSON object per line.

Source code in src/watlowlib/sinks/jsonl.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Serialise each sample as one JSON object per line."""
    if self._file is None:
        raise RuntimeError("JsonlSink: write_many called before open()")
    if not samples:
        return
    for sample in samples:
        row = sample_to_row(sample)
        self._file.write(json.dumps(row, ensure_ascii=False))
        self._file.write("\n")
    self._file.flush()

SQLite sink

watlowlib.sinks.sqlite

SQLite sink — stdlib :mod:sqlite3 + WAL, parameterised executemany.

:class:SqliteSink writes one row per :class:Sample into a local SQLite file. The sink is core (no extra required) because sqlite3 ships with the Python standard library.

The sqlite3 driver is synchronous; the sink calls it through :func:anyio.to_thread.run_sync so the event loop stays responsive. Because every write hops into a worker thread anyway, there's no advantage to taking on aiosqlite as a dependency — stdlib delivers the same latency profile with one fewer package.
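The same thread-hop can be sketched with stdlib :func:asyncio.to_thread standing in for anyio.to_thread.run_sync (an assumption for the sake of a dependency-free example; the library itself uses anyio):

```python
import asyncio
import sqlite3
import threading


def blocking_query(path: str) -> tuple[int, str]:
    # Blocking sqlite3 call; returns the worker thread's name so we can
    # confirm the event-loop thread never executed it.
    conn = sqlite3.connect(path)
    try:
        (one,) = conn.execute("SELECT 1").fetchone()
    finally:
        conn.close()
    return one, threading.current_thread().name


async def main() -> tuple[int, str]:
    # asyncio.to_thread plays the role of anyio.to_thread.run_sync here:
    # the loop stays responsive while sqlite3 blocks in a worker thread.
    return await asyncio.to_thread(blocking_query, ":memory:")


value, worker = asyncio.run(main())
print(value, worker != threading.main_thread().name)
```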

Best-practice defaults baked in:

  • journal_mode=WAL + synchronous=NORMAL — recommended for write-heavy workloads; durable against crashes, significantly faster than the default.
  • busy_timeout=5000 ms so brief lock contention retries transparently instead of raising OperationalError.
  • One BEGIN IMMEDIATE / COMMIT transaction per write_many, so a batch of N rows is one fsync rather than N.
  • SQL identifiers (table name) validated against ^[A-Za-z_][A-Za-z0-9_]{0,62}$. Values are always passed as ? parameters — never string-formatted.

Schema evolution mirrors the other tabular sinks: column set locked on the first batch (via :class:~watlowlib.sinks._schema.SchemaLock), unknown columns dropped with a one-shot WARN. When create_table=True (default), the table is created on first batch from the inferred :class:ColumnSpec list. When create_table=False, the target table's columns are read from PRAGMA table_info and used as the schema.
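The PRAGMA defaults, identifier validation, and one-transaction-per-batch insert can be sketched with bare :mod:sqlite3 (the `validate_identifier` helper and table shape are illustrative, not the sink's internals):

```python
import re
import sqlite3
import tempfile
from pathlib import Path


def validate_identifier(name: str) -> str:
    # Same regex shape as the sink's table-name guard; values never go here.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]{0,62}", name):
        raise ValueError(f"invalid SQL identifier: {name!r}")
    return name


path = Path(tempfile.mkdtemp()) / "acq.db"
# isolation_level=None: autocommit mode, explicit transaction control.
conn = sqlite3.connect(path, isolation_level=None)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("PRAGMA busy_timeout=5000")

table = validate_identifier("samples")
conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (parameter TEXT, value REAL)")

batch = [("temp", 21.5), ("temp", 22.0), ("power", 0.4)]
conn.execute("BEGIN IMMEDIATE")  # one transaction per batch...
conn.executemany(
    f"INSERT INTO {table} (parameter, value) VALUES (?, ?)",
    batch,  # ...values always bound as ? parameters
)
conn.commit()  # ...so N rows cost one fsync, not N

print(conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0])  # 3
conn.close()
```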

SqliteSink

SqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
)

Append-only SQLite writer with WAL journaling and first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination SQLite file. Created on :meth:open.

table str

Target table name.

columns tuple[ColumnSpec, ...] | None

The locked :class:ColumnSpec tuple, or None before the first flush.

Source code in src/watlowlib/sinks/sqlite.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
) -> None:
    self._path = Path(path)
    self._table = _validate_identifier(table, label="table name")
    self._create_table = create_table
    self._journal_mode: _JournalMode = journal_mode
    self._synchronous: _Synchronous = synchronous
    if busy_timeout_ms < 0:
        raise ValueError(f"busy_timeout_ms must be >= 0, got {busy_timeout_ms!r}")
    self._busy_timeout_ms = busy_timeout_ms
    self._conn: sqlite3.Connection | None = None
    self._schema = SchemaLock(sink_name="sqlite", logger=_logger)
    self._insert_sql: str | None = None

columns property

columns

Locked columns in order, or None before first :meth:write_many.

path property

path

Destination SQLite file path.

table property

table

Target table name (validated).

close async

close()

Close the connection. Idempotent.

Source code in src/watlowlib/sinks/sqlite.py
async def close(self) -> None:
    """Close the connection. Idempotent."""
    if self._conn is None:
        return
    conn = self._conn
    self._conn = None
    try:
        await run_sync(conn.close)
    finally:
        _logger.info(
            "sinks.sqlite.close path=%s table=%s",
            str(self._path),
            self._table,
        )

open async

open()

Open the SQLite connection, apply PRAGMAs, and introspect the target.

Idempotent: calling :meth:open on an already-open sink is a no-op. Runs in a worker thread because sqlite3.connect and PRAGMA execution are blocking I/O.

Source code in src/watlowlib/sinks/sqlite.py
async def open(self) -> None:
    """Open the SQLite connection, apply PRAGMAs, and introspect the target.

    Idempotent: calling :meth:`open` on an already-open sink is a
    no-op. Runs in a worker thread because ``sqlite3.connect`` and
    ``PRAGMA`` execution are blocking I/O.
    """
    if self._conn is not None:
        return
    self._conn = await run_sync(self._connect_blocking)
    _logger.info(
        "sinks.sqlite.open path=%s table=%s journal_mode=%s synchronous=%s",
        str(self._path),
        self._table,
        self._journal_mode,
        self._synchronous,
    )
    if not self._create_table:
        try:
            await run_sync(self._introspect_existing_table_blocking)
        except BaseException:
            # Introspection raised (most commonly WatlowSinkSchemaError on
            # a missing table). Release the connection so we don't leak a
            # resource on a failed open.
            conn = self._conn
            self._conn = None
            await run_sync(conn.close)
            raise

write_many async

write_many(samples)

Append samples as rows in a single transaction.

On the first call (when create_table=True), infers the schema from the batch and runs CREATE TABLE IF NOT EXISTS. Subsequent calls insert directly. All values pass through ? placeholders — never string-formatted into SQL.

Source code in src/watlowlib/sinks/sqlite.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as rows in a single transaction.

    On the first call (when ``create_table=True``), infers the
    schema from the batch and runs ``CREATE TABLE IF NOT EXISTS``.
    Subsequent calls insert directly. All values pass through
    ``?`` placeholders — never string-formatted into SQL.
    """
    if self._conn is None:
        raise RuntimeError("SqliteSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked and self._create_table:
        # else (create_table=False): _introspect_existing_table_blocking
        # already ran in open().
        self._schema.lock(rows)
        await run_sync(self._create_table_blocking)
        self._insert_sql = self._build_insert_sql()

    assert self._insert_sql is not None  # noqa: S101 — narrow for type checker
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    projected: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected.append(tuple(fields[spec.name] for spec in columns))

    await run_sync(self._executemany_blocking, projected)

Parquet sink (watlowlib[parquet])

watlowlib.sinks.parquet

Parquet sink — :mod:pyarrow, schema locked, zstd by default.

:class:ParquetSink writes one row per :class:Sample into a single Parquet file. pyarrow is an optional dependency behind watlowlib[parquet]; the import is deferred to :meth:open so instantiating the sink succeeds on bare-core installs and :class:~watlowlib.errors.WatlowSinkDependencyError is raised only when the user actually tries to open the file.
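The deferred-check pattern itself is easy to sketch without pyarrow (the `DeferredDepSink` class is hypothetical, and RuntimeError stands in for :class:~watlowlib.errors.WatlowSinkDependencyError to keep the sketch stdlib-only):

```python
import asyncio
import importlib


class DeferredDepSink:
    """The constructor never imports the heavy backend, so instantiation
    works on bare-core installs; the failure surfaces at open()."""

    def __init__(self, module_name: str) -> None:
        self._module_name = module_name
        self._mod = None

    async def open(self) -> None:
        try:
            # Dependency check deferred to here, mirroring ParquetSink.open.
            self._mod = importlib.import_module(self._module_name)
        except ModuleNotFoundError as exc:
            # The real sink raises a dedicated error naming the extra to
            # install (e.g. watlowlib[parquet]).
            raise RuntimeError(
                f"optional backend {self._module_name!r} is not installed"
            ) from exc


DeferredDepSink("surely_not_installed_pkg")  # constructing is always fine

ok = DeferredDepSink("json")  # stands in for pyarrow on a full install
asyncio.run(ok.open())        # import succeeds only at open()
print(ok._mod is not None)    # True
```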

Best-practice defaults baked in:

  • zstd compression. It matches or beats snappy on write/read speed with ~30% better ratios and is fully supported across pyarrow >= 2, Spark, DuckDB, Polars, and pandas >= 1.3. Snappy and gzip remain available for compatibility with readers that don't support zstd.
  • Dictionary encoding on for string columns (pyarrow default; surfaced as a knob so callers that know their cardinality is high can disable).
  • One row group per :meth:write_many. Aligns durability with batch cadence — a crash mid-run loses at most the current batch. Callers that want fewer, bigger row groups can pass row_group_size.

Schema evolution mirrors the other tabular sinks: the column set is locked on the first batch (via :class:~watlowlib.sinks._schema.SchemaLock). Unknown columns that appear in later batches are dropped with a one-shot WARN. Adding a new column mid-file would require rewriting the whole file, so it is deliberately not supported.

Durability caveat: Parquet files are not readable until the footer is flushed on :meth:close. If the process is killed mid-run you will get a file with no usable footer. The recommended shutdown path is the recorder's structured exit, which always reaches the sink's async-context-manager __aexit__ and runs :meth:close.

Design reference: docs/design.md §6.

ParquetSink

ParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
)

Append-only Parquet writer with first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination Parquet file.

compression _Compression

Codec applied to every row group.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first flush.

Source code in src/watlowlib/sinks/parquet.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
) -> None:
    self._path = Path(path)
    self._compression: _Compression = compression
    self._use_dictionary = use_dictionary
    if row_group_size is not None and row_group_size < 1:
        raise ValueError(
            f"row_group_size must be >= 1 if set, got {row_group_size!r}",
        )
    self._row_group_size = row_group_size
    self._schema = SchemaLock(sink_name="parquet", logger=_logger)
    self._pa: Any = None
    self._pq: Any = None
    self._arrow_schema: Any = None
    self._writer: Any = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

compression property

compression

The configured compression codec.

path property

path

Destination Parquet file path.

close async

close()

Flush the footer and close the writer. Idempotent.

Source code in src/watlowlib/sinks/parquet.py
async def close(self) -> None:
    """Flush the footer and close the writer. Idempotent."""
    if self._writer is not None:
        try:
            self._writer.close()
        finally:
            self._writer = None
    self._pa = None
    self._pq = None
    _logger.info(
        "sinks.parquet.close path=%s rows_written=%s",
        str(self._path),
        self._rows_written,
    )

open async

open()

Load pyarrow and create the parent directory. Idempotent.

The actual :class:pyarrow.parquet.ParquetWriter is opened lazily on the first :meth:write_many call, because the writer requires a concrete schema — which we don't have until the first batch is inspected.

Source code in src/watlowlib/sinks/parquet.py
async def open(self) -> None:
    """Load pyarrow and create the parent directory. Idempotent.

    The actual :class:`pyarrow.parquet.ParquetWriter` is opened
    lazily on the first :meth:`write_many` call, because the
    writer requires a concrete schema — which we don't have until
    the first batch is inspected.
    """
    if self._pa is not None:
        return
    self._pa, self._pq = _load_pyarrow()
    self._path.parent.mkdir(parents=True, exist_ok=True)
    _logger.info(
        "sinks.parquet.open path=%s compression=%s",
        str(self._path),
        self._compression,
    )

write_many async

write_many(samples)

Append samples as one Parquet row group.

On first call: infers the schema from the batch, locks it, constructs the matching :mod:pyarrow schema, and opens the underlying :class:~pyarrow.parquet.ParquetWriter.

Subsequent calls project each row onto the locked schema and append the rows as a new row group. Unknown columns are dropped with one-shot WARN (handled by :class:SchemaLock).

Source code in src/watlowlib/sinks/parquet.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as one Parquet row group.

    On first call: infers the schema from the batch, locks it,
    constructs the matching :mod:`pyarrow` schema, and opens the
    underlying :class:`~pyarrow.parquet.ParquetWriter`.

    Subsequent calls project each row onto the locked schema and
    append the rows as a new row group. Unknown columns are
    dropped with one-shot WARN (handled by :class:`SchemaLock`).
    """
    if self._pa is None:
        raise RuntimeError("ParquetSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        self._schema.lock(rows)
        self._arrow_schema = self._build_arrow_schema()
        self._writer = self._open_writer()

    assert self._writer is not None  # noqa: S101 — narrow for type checker
    assert self._arrow_schema is not None  # noqa: S101

    projected = [self._schema.project(r) for r in rows]
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    arrays = {spec.name: [row[spec.name] for row in projected] for spec in columns}

    try:
        table = self._pa.Table.from_pydict(arrays, schema=self._arrow_schema)
        self._writer.write_table(
            table,
            row_group_size=self._row_group_size,
        )
    except Exception as exc:
        raise WatlowSinkWriteError(
            f"ParquetSink: write failed for {self._path}: {exc}",
        ) from exc
    self._rows_written += len(projected)

Postgres sink (watlowlib[postgres])

watlowlib.sinks.postgres

PostgreSQL sink — :mod:asyncpg, COPY by default, parameterised fallback.

:class:PostgresSink writes one row per :class:Sample into a PostgreSQL table. asyncpg is an optional dependency behind watlowlib[postgres]; the import is deferred to :meth:open so instantiation works on bare-core installs and :class:~watlowlib.errors.WatlowSinkDependencyError is raised only when the user actually tries to open a connection.

Best-practice defaults baked in:

  • Binary COPY via :meth:asyncpg.Connection.copy_records_to_table. COPY is ~5-10x faster than parameterised INSERT for batches and is the recommended asyncpg bulk-ingest path. Callers that run on managed Postgres without COPY privileges can set :attr:PostgresConfig.use_copy to False to fall back to a prepared executemany.
  • Connection pool via :func:asyncpg.create_pool. The pool lifetime equals the sink lifetime; each batch acquires, writes, and releases, so the pool stays available for concurrent work.
  • Identifier validation on schema and table (strict regex). Every value passes through $N placeholders — never string-formatted into SQL.
  • Credential scrubbing — log lines that reference the connection use :meth:PostgresConfig.target, which composes a host:port/db.schema.table string from the parsed DSN / discrete fields and never includes the password. Tests assert the plain password never appears in :meth:target output.
  • statement_timeout applied as a server setting so a wedged query cannot block the acquisition loop forever.
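The identifier check can be sketched against the pattern quoted in the :class:`PostgresConfig` attribute docs below ([A-Za-z_][A-Za-z0-9_]{0,62}); the helper name and error type here are hypothetical, not the library's:

```python
import re

# Strict Postgres identifier pattern from the PostgresConfig docs:
# a letter or underscore, then up to 62 more word characters
# (63 total, matching Postgres's 63-byte identifier limit).
_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,62}")


def validate_identifier(name: str) -> str:
    """Hypothetical helper: reject anything that could smuggle SQL."""
    if _IDENTIFIER_RE.fullmatch(name) is None:
        raise ValueError(f"invalid SQL identifier: {name!r}")
    return name
```

Because schema and table names cannot be passed as $N placeholders, this allow-list regex is the standard defence when identifiers must be interpolated into SQL text.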

Schema evolution mirrors the other tabular sinks (docs/design.md §6). The default create_table=False reads the target table's columns from information_schema.columns on open and locks the schema to that set. Passing create_table=True switches to first-batch inference and runs CREATE TABLE IF NOT EXISTS — convenient for quick runs, but it gives up type control (string-like values become TEXT rather than, say, timestamptz).

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout applied as a server setting. Defaults to 30 s.

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/watlowlib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"
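A standalone rendition of the same scrubbing logic (a hypothetical free function using the same :func:`urllib.parse.urlparse` parsing as the method above) shows that the password never reaches the output:

```python
from urllib.parse import urlparse


def log_safe_target(dsn: str, schema: str = "public", table: str = "samples") -> str:
    # Mirrors PostgresConfig.target(): host:port/db.schema.table, no credentials.
    parsed = urlparse(dsn)
    host = parsed.hostname or "?"
    port = parsed.port or 5432
    db = (parsed.path or "/?").lstrip("/") or "?"
    return f"{host}:{port}/{db}.{schema}.{table}"


target = log_safe_target("postgres://writer:s3cret@db.internal:6432/telemetry")
# "s3cret" does not appear anywhere in the result.
```

urlparse exposes the userinfo only via .username / .password, which the function never reads — so the scrub is by construction, not by string replacement.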

PostgresSink

PostgresSink(config)

Append-only Postgres writer using pooled asyncpg connections.

Attributes:

Name Type Description
config PostgresConfig

Frozen :class:PostgresConfig instance.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first :meth:write_many.

Source code in src/watlowlib/sinks/postgres.py
def __init__(self, config: PostgresConfig) -> None:
    self._config = config
    self._schema = SchemaLock(sink_name="postgres", logger=_logger)
    self._asyncpg: Any = None
    self._pool: Any = None
    self._insert_sql: str | None = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

config property

config

The frozen :class:PostgresConfig passed in at construction.

close async

close()

Close the pool. Idempotent.

Source code in src/watlowlib/sinks/postgres.py
async def close(self) -> None:
    """Close the pool. Idempotent."""
    if self._pool is None:
        return
    pool = self._pool
    self._pool = None
    try:
        await pool.close()
    finally:
        _logger.info(
            "sinks.postgres.close target=%s rows_written=%s",
            self._config.target(),
            self._rows_written,
        )
    self._asyncpg = None

open async

open()

Load asyncpg, open the pool, and (optionally) introspect the table.

Idempotent. When create_table=False (the default), the target's columns are read on open and the schema is locked immediately. When create_table=True the lock happens lazily on the first :meth:write_many.

Source code in src/watlowlib/sinks/postgres.py
async def open(self) -> None:
    """Load asyncpg, open the pool, and (optionally) introspect the table.

    Idempotent. When ``create_table=False`` (the default), the
    target's columns are read on open and the schema is locked
    immediately. When ``create_table=True`` the lock happens
    lazily on the first :meth:`write_many`.
    """
    if self._pool is not None:
        return
    self._asyncpg = _load_asyncpg()
    cfg = self._config
    server_settings = {
        "application_name": "watlowlib",
        "statement_timeout": str(int(cfg.statement_timeout_ms)),
    }
    try:
        if cfg.dsn is not None:
            self._pool = await self._asyncpg.create_pool(
                dsn=cfg.dsn,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
        else:
            self._pool = await self._asyncpg.create_pool(
                host=cfg.host,
                port=cfg.port,
                user=cfg.user,
                password=cfg.password,
                database=cfg.database,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
    except Exception as exc:
        raise WatlowSinkWriteError(
            f"PostgresSink: failed to open pool for {cfg.target()}: {exc}",
        ) from exc

    _logger.info(
        "sinks.postgres.open target=%s pool_min=%s pool_max=%s use_copy=%s create_table=%s",
        cfg.target(),
        cfg.pool_min_size,
        cfg.pool_max_size,
        cfg.use_copy,
        cfg.create_table,
    )

    if not cfg.create_table:
        await self._introspect_existing_table()

write_many async

write_many(samples)

Append samples — one COPY (or executemany) per call.

Source code in src/watlowlib/sinks/postgres.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` — one COPY (or executemany) per call."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        assert self._config.create_table  # noqa: S101
        self._schema.lock(rows)
        await self._create_table()
        self._insert_sql = self._build_insert_sql()

    columns = self._schema.columns
    assert columns is not None  # noqa: S101
    assert self._insert_sql is not None  # noqa: S101

    projected_tuples: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected_tuples.append(tuple(fields[spec.name] for spec in columns))

    try:
        if self._config.use_copy:
            await self._write_copy(projected_tuples, columns)
        else:
            await self._write_executemany(projected_tuples)
    except WatlowSinkWriteError:
        raise
    except Exception as exc:
        raise WatlowSinkWriteError(
            f"PostgresSink: write failed for {self._config.target()}: {exc}",
        ) from exc
    self._rows_written += len(projected_tuples)
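The projection step above — dict rows flattened into column-ordered tuples, the record shape copy_records_to_table expects — can be exercised standalone (hypothetical helper; plain column names stand in for the locked ColumnSpec sequence):

```python
def rows_to_records(
    rows: list[dict[str, object]],
    columns: tuple[str, ...],
) -> list[tuple[object, ...]]:
    """Flatten dict rows into column-ordered tuples, as COPY expects.

    Missing keys become None so every record has the same width.
    """
    return [tuple(row.get(name) for name in columns) for row in rows]


records = rows_to_records(
    [{"t": 0.0, "temp_c": 21.5}, {"t": 1.0}],
    ("t", "temp_c"),
)
```

Fixing the tuple width and order up front is what lets both the COPY path and the $N-placeholder executemany fallback consume the same records unchanged.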