
alicatlib.sinks

The SampleSink protocol, pipe() driver, and first-party sinks. See Logging and acquisition for usage patterns and the benchmarks for throughput numbers.

Public surface

alicatlib.sinks

Sample sinks — stdlib-backed (core) plus optional Parquet & Postgres.

Public surface:

  • :class:SampleSink — the Protocol every sink satisfies.
  • :func:pipe — drains a recorder stream into a sink with buffered flushes.
  • :class:InMemorySink — test-only; collects samples in a list.
  • :class:CsvSink — stdlib-backed CSV; schema locked at first batch.
  • :class:JsonlSink — stdlib-backed JSONL; one object per line.
  • :class:SqliteSink — stdlib-backed SQLite (WAL, parameterised inserts).
  • :class:ParquetSink — pyarrow-backed; requires alicatlib[parquet].
  • :class:PostgresSink + :class:PostgresConfig — asyncpg-backed; requires alicatlib[postgres].

The optional sinks (:class:ParquetSink, :class:PostgresSink) import their backing drivers lazily inside :meth:open. That means instantiation succeeds without the extra installed — calling :meth:open on an un-provisioned install raises :class:~alicatlib.errors.AlicatSinkDependencyError with a copy-paste install hint.

See docs/design.md §5.15.
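
A minimal sketch of that lazy-import behaviour, assuming only the import paths named above (alicatlib.sinks.ParquetSink, alicatlib.errors.AlicatSinkDependencyError):

from alicatlib.errors import AlicatSinkDependencyError
from alicatlib.sinks import ParquetSink

async def check_parquet_extra(path: str) -> bool:
    sink = ParquetSink(path)      # constructing never imports pyarrow
    try:
        await sink.open()         # the lazy import happens here
    except AlicatSinkDependencyError as exc:
        print(exc)                # carries the copy-paste install hint
        return False
    await sink.close()
    return True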

CsvSink

CsvSink(path)

Append-only CSV writer with first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination file. Created or overwritten on :meth:open.

columns tuple[str, ...] | None

Locked column order after the first :meth:write_many. None before the first flush.

Source code in src/alicatlib/sinks/csv.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None
    self._writer: csv.DictWriter[str] | None = None
    self._columns: tuple[str, ...] | None = None
    self._unknown_columns_warned: set[str] = set()

columns property

columns

The locked column order, or None if no batch has been flushed.

path property

path

Destination file path.

close async

close()

Flush and close the CSV file. Idempotent.

Source code in src/alicatlib/sinks/csv.py
async def close(self) -> None:
    """Flush and close the CSV file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None
        self._writer = None

open async

open()

Open the CSV file for writing. Overwrites any existing file.

Source code in src/alicatlib/sinks/csv.py
async def open(self) -> None:
    """Open the CSV file for writing. Overwrites any existing file."""
    if self._file is not None:
        return  # already open
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Append samples as CSV rows.

On first call, infers the column set from the first sample and writes the header. Subsequent calls validate each row's keys against that locked set — unknown keys are dropped with a one-shot WARN log per unseen key.

Source code in src/alicatlib/sinks/csv.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as CSV rows.

    On first call, infers the column set from the first sample and
    writes the header. Subsequent calls validate each row's keys
    against that locked set — unknown keys are dropped with a
    one-shot WARN log per unseen key.
    """
    if self._file is None:
        raise RuntimeError("CsvSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]
    if self._writer is None:
        self._columns = tuple(rows[0].keys())
        self._writer = csv.DictWriter(self._file, fieldnames=list(self._columns))
        self._writer.writeheader()

    columns = self._columns
    assert columns is not None  # noqa: S101 — narrow for type checker

    for row in rows:
        unknown = row.keys() - set(columns)
        for key in unknown:
            if key not in self._unknown_columns_warned:
                self._unknown_columns_warned.add(key)
                _logger.warning(
                    "sinks.csv.unknown_column",
                    extra={
                        "path": str(self._path),
                        "column": key,
                        "action": "drop",
                    },
                )
        filtered = {k: row.get(k) for k in columns}
        self._writer.writerow(filtered)
    self._file.flush()
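
A hedged end-to-end sketch for CSV logging. The streaming.record(...) call and its arguments are assumptions about the recorder layer (check the streaming docs for the real signature); the sink and pipe() usage follows this page:

from alicatlib import streaming            # record() signature assumed below
from alicatlib.sinks import CsvSink, pipe

async def log_run_to_csv(devices) -> None:
    async with CsvSink("run.csv") as sink:                            # open → write → close
        async with streaming.record(devices, rate_hz=2.0) as stream:  # assumed call shape
            summary = await pipe(stream, sink, batch_size=64, flush_interval=1.0)
    print(f"{summary.samples_emitted} rows written to run.csv")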

InMemorySink

InMemorySink()

Collect every written :class:Sample in a single list.

:attr:samples is appended to (never re-assigned) so callers can hold a reference across the sink's lifecycle. :meth:close does not clear the buffer — the point of this sink is post-run inspection.

Source code in src/alicatlib/sinks/memory.py
def __init__(self) -> None:
    self._samples: list[Sample] = []
    self._open = False
    self._closed = False

is_open property

is_open

True once :meth:open has been called and close has not.

samples property

samples

Captured samples, in write order.

close async

close()

Flip the closed flag — no I/O, buffer preserved for inspection.

Source code in src/alicatlib/sinks/memory.py
async def close(self) -> None:
    """Flip the closed flag — no I/O, buffer preserved for inspection."""
    self._closed = True

open async

open()

No backing resource — just flips the open flag.

Source code in src/alicatlib/sinks/memory.py
async def open(self) -> None:
    """No backing resource — just flips the open flag."""
    self._open = True
    self._closed = False

write_many async

write_many(samples)

Append every sample to the internal buffer.

Source code in src/alicatlib/sinks/memory.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append every sample to the internal buffer."""
    if not self.is_open:
        raise RuntimeError("InMemorySink: write_many called before open()")
    self._samples.extend(samples)
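
A test sketch that drives the same pipe() call path a production sink uses. The per-tick mapping shape follows pipe()'s stream parameter; make_sample() is a hypothetical fixture, not part of alicatlib:

from alicatlib.sinks import InMemorySink, pipe

async def fake_stream(ticks):
    for tick in ticks:                 # each tick: Mapping[str, Sample]
        yield tick

async def test_pipe_into_memory(make_sample) -> None:
    ticks = [{"mfc1": make_sample()} for _ in range(5)]
    async with InMemorySink() as sink:
        summary = await pipe(fake_stream(ticks), sink, batch_size=2)
    assert summary.samples_emitted == 5
    assert len(sink.samples) == 5      # buffer survives close()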

JsonlSink

JsonlSink(path)

Append-only JSONL writer — one flattened sample per line.

The on-disk format is <sample-row-as-json>\n per sample; reading back is just [json.loads(line) for line in f]. No header, no schema declaration, no framing overhead beyond the newline.

Source code in src/alicatlib/sinks/jsonl.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None

path property

path

Destination file path.

close async

close()

Flush and close the JSONL file. Idempotent.

Source code in src/alicatlib/sinks/jsonl.py
async def close(self) -> None:
    """Flush and close the JSONL file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None

open async

open()

Open the JSONL file for writing. Overwrites any existing file.

Source code in src/alicatlib/sinks/jsonl.py
async def open(self) -> None:
    """Open the JSONL file for writing. Overwrites any existing file."""
    if self._file is not None:
        return
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Serialise each sample as one JSON object per line.

Source code in src/alicatlib/sinks/jsonl.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    r"""Serialise each sample as one JSON object per line."""
    if self._file is None:
        raise RuntimeError("JsonlSink: write_many called before open()")
    if not samples:
        return
    for sample in samples:
        row = sample_to_row(sample)
        self._file.write(json.dumps(row, ensure_ascii=False))
        self._file.write("\n")
    self._file.flush()
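
Reading the file back is plain stdlib, exactly as described above:

import json
from pathlib import Path

def load_jsonl(path: str) -> list[dict]:
    with Path(path).open(encoding="utf-8") as f:
        return [json.loads(line) for line in f]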

ParquetSink

ParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
)

Append-only Parquet writer with first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination Parquet file.

compression _Compression

Codec applied to every row group.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first flush.

Source code in src/alicatlib/sinks/parquet.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
) -> None:
    self._path = Path(path)
    self._compression: _Compression = compression
    self._use_dictionary = use_dictionary
    if row_group_size is not None and row_group_size < 1:
        raise ValueError(
            f"row_group_size must be >= 1 if set, got {row_group_size!r}",
        )
    self._row_group_size = row_group_size
    self._schema = SchemaLock(sink_name="parquet", logger=_logger)
    self._pa: Any = None
    self._pq: Any = None
    self._arrow_schema: Any = None
    self._writer: Any = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

compression property

compression

The configured compression codec.

path property

path

Destination Parquet file path.

close async

close()

Flush the footer and close the writer. Idempotent.

Source code in src/alicatlib/sinks/parquet.py
async def close(self) -> None:
    """Flush the footer and close the writer. Idempotent."""
    if self._writer is not None:
        try:
            self._writer.close()
        finally:
            self._writer = None
    self._pa = None
    self._pq = None
    _logger.info(
        "sinks.parquet.close",
        extra={
            "path": str(self._path),
            "rows_written": self._rows_written,
        },
    )

open async

open()

Load pyarrow and create the parent directory. Idempotent.

The actual :class:pyarrow.parquet.ParquetWriter is opened lazily on the first :meth:write_many call, because the writer requires a concrete schema — which we don't have until the first batch is inspected.

Source code in src/alicatlib/sinks/parquet.py
async def open(self) -> None:
    """Load pyarrow and create the parent directory. Idempotent.

    The actual :class:`pyarrow.parquet.ParquetWriter` is opened
    lazily on the first :meth:`write_many` call, because the
    writer requires a concrete schema — which we don't have until
    the first batch is inspected.
    """
    if self._pa is not None:
        return
    self._pa, self._pq = _load_pyarrow()
    self._path.parent.mkdir(parents=True, exist_ok=True)
    _logger.info(
        "sinks.parquet.open",
        extra={
            "path": str(self._path),
            "compression": self._compression,
        },
    )

write_many async

write_many(samples)

Append samples as one Parquet row group.

On first call: infers the schema from the batch, locks it, constructs the matching :mod:pyarrow schema, and opens the underlying :class:~pyarrow.parquet.ParquetWriter.

Subsequent calls project each row onto the locked schema and append the rows as a new row group. Unknown columns are dropped with one-shot WARN (handled by :class:SchemaLock).

Source code in src/alicatlib/sinks/parquet.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as one Parquet row group.

    On first call: infers the schema from the batch, locks it,
    constructs the matching :mod:`pyarrow` schema, and opens the
    underlying :class:`~pyarrow.parquet.ParquetWriter`.

    Subsequent calls project each row onto the locked schema and
    append the rows as a new row group. Unknown columns are
    dropped with one-shot WARN (handled by :class:`SchemaLock`).
    """
    if self._pa is None:
        raise RuntimeError("ParquetSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        self._schema.lock(rows)
        self._arrow_schema = self._build_arrow_schema()
        self._writer = self._open_writer()

    assert self._writer is not None  # noqa: S101 — narrow for type checker
    assert self._arrow_schema is not None  # noqa: S101

    projected = [self._schema.project(r) for r in rows]
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    arrays = {spec.name: [row[spec.name] for row in projected] for spec in columns}

    try:
        table = self._pa.Table.from_pydict(arrays, schema=self._arrow_schema)
        self._writer.write_table(
            table,
            row_group_size=self._row_group_size,
        )
    except Exception as exc:
        raise AlicatSinkWriteError(
            f"ParquetSink: write failed for {self._path}: {exc}",
        ) from exc
    self._rows_written += len(projected)
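
A sketch using the constructor defaults documented above; reading the file back uses pyarrow directly, so the alicatlib[parquet] extra must be installed:

import pyarrow.parquet as pq

from alicatlib.sinks import ParquetSink, pipe

async def log_run_to_parquet(stream) -> None:
    async with ParquetSink("run.parquet", compression="zstd") as sink:
        await pipe(stream, sink, batch_size=256, flush_interval=5.0)
    table = pq.read_table("run.parquet")   # one row group per flushed batch
    print(table.num_rows, table.schema)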

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    connect_timeout_s=30.0,
    close_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout applied as a server setting. Defaults to 30 s.

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

connect_timeout_s float

Cap on initial pool establishment in :meth:PostgresSink.open. A misconfigured DSN must not be able to wedge open() indefinitely — defaults to 30 s.

close_timeout_s float

Cap on :meth:PostgresSink.close's wait for in-flight queries to drain. Defaults to 10 s; the pool is then forcibly torn down so shutdown can't hang.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/alicatlib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"

PostgresSink

PostgresSink(config)

Append-only Postgres writer using pooled asyncpg connections.

Attributes:

Name Type Description
config PostgresConfig

Frozen :class:PostgresConfig instance.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first :meth:write_many.

Source code in src/alicatlib/sinks/postgres.py
def __init__(self, config: PostgresConfig) -> None:
    self._config = config
    self._schema = SchemaLock(sink_name="postgres", logger=_logger)
    self._asyncpg: Any = None
    self._pool: Any = None
    self._insert_sql: str | None = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

config property

config

The frozen :class:PostgresConfig passed in at construction.

close async

close()

Close the pool. Idempotent.

pool.close() waits for in-flight queries to drain. Capped at :attr:PostgresConfig.close_timeout_s so a wedged query cannot wedge shutdown — on timeout the pool is forcibly terminated via :meth:Pool.terminate.

Source code in src/alicatlib/sinks/postgres.py
async def close(self) -> None:
    """Close the pool. Idempotent.

    ``pool.close()`` waits for in-flight queries to drain. Capped
    at :attr:`PostgresConfig.close_timeout_s` so a wedged query
    cannot wedge shutdown — on timeout the pool is forcibly
    terminated via :meth:`Pool.terminate`.
    """
    if self._pool is None:
        return
    pool = self._pool
    self._pool = None
    forced = False
    try:
        try:
            with anyio.fail_after(self._config.close_timeout_s):
                await pool.close()
        except TimeoutError:
            # Drain timed out — force-close so shutdown completes.
            forced = True
            pool.terminate()
            _logger.warning(
                "sinks.postgres.close_timeout",
                extra={
                    "target": self._config.target(),
                    "close_timeout_s": self._config.close_timeout_s,
                },
            )
    finally:
        _logger.info(
            "sinks.postgres.close",
            extra={
                "target": self._config.target(),
                "rows_written": self._rows_written,
                "forced": forced,
            },
        )
    self._asyncpg = None

open async

open()

Load asyncpg, open the pool, and (optionally) introspect the table.

Idempotent. When create_table=False (the default), the target's columns are read on open and the schema is locked immediately. When create_table=True the lock happens lazily on the first :meth:write_many.

Source code in src/alicatlib/sinks/postgres.py
async def open(self) -> None:
    """Load asyncpg, open the pool, and (optionally) introspect the table.

    Idempotent. When ``create_table=False`` (the default), the
    target's columns are read on open and the schema is locked
    immediately. When ``create_table=True`` the lock happens
    lazily on the first :meth:`write_many`.
    """
    if self._pool is not None:
        return
    self._asyncpg = _load_asyncpg()
    cfg = self._config
    server_settings = {
        "application_name": "alicatlib",
        "statement_timeout": str(int(cfg.statement_timeout_ms)),
    }
    try:
        with anyio.fail_after(cfg.connect_timeout_s):
            if cfg.dsn is not None:
                self._pool = await self._asyncpg.create_pool(
                    dsn=cfg.dsn,
                    min_size=cfg.pool_min_size,
                    max_size=cfg.pool_max_size,
                    command_timeout=cfg.command_timeout_s,
                    server_settings=server_settings,
                )
            else:
                self._pool = await self._asyncpg.create_pool(
                    host=cfg.host,
                    port=cfg.port,
                    user=cfg.user,
                    password=cfg.password,
                    database=cfg.database,
                    min_size=cfg.pool_min_size,
                    max_size=cfg.pool_max_size,
                    command_timeout=cfg.command_timeout_s,
                    server_settings=server_settings,
                )
    except TimeoutError as exc:
        raise AlicatSinkWriteError(
            f"PostgresSink: pool open timed out after {cfg.connect_timeout_s}s "
            f"for {cfg.target()}",
        ) from exc
    except Exception as exc:
        raise AlicatSinkWriteError(
            f"PostgresSink: failed to open pool for {cfg.target()}: {exc}",
        ) from exc

    _logger.info(
        "sinks.postgres.open",
        extra={
            "target": cfg.target(),
            "pool_min": cfg.pool_min_size,
            "pool_max": cfg.pool_max_size,
            "use_copy": cfg.use_copy,
            "create_table": cfg.create_table,
        },
    )

    if not cfg.create_table:
        await self._introspect_existing_table()

write_many async

write_many(samples)

Append samples — one COPY (or executemany) per call.

Source code in src/alicatlib/sinks/postgres.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` — one COPY (or executemany) per call."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        assert self._config.create_table  # noqa: S101
        self._schema.lock(rows)
        await self._create_table()
        self._insert_sql = self._build_insert_sql()

    columns = self._schema.columns
    assert columns is not None  # noqa: S101
    assert self._insert_sql is not None  # noqa: S101

    projected_tuples: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected_tuples.append(tuple(fields[spec.name] for spec in columns))

    try:
        if self._config.use_copy:
            await self._write_copy(projected_tuples, columns)
        else:
            await self._write_executemany(projected_tuples)
    except AlicatSinkWriteError:
        raise
    except Exception as exc:
        raise AlicatSinkWriteError(
            f"PostgresSink: write failed for {self._config.target()}: {exc}",
        ) from exc
    self._rows_written += len(projected_tuples)
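
A sketch of draining a recorder stream into Postgres with the default COPY path; the stream argument comes from the recorder layer (see pipe() below):

from alicatlib.sinks import PostgresConfig, PostgresSink, pipe

async def log_run_to_postgres(stream) -> None:
    cfg = PostgresConfig(
        dsn="postgres://acq@db.lab.local:5432/experiments",
        table="samples",
        create_table=True,   # infer the schema from the first batch
    )
    async with PostgresSink(cfg) as sink:
        summary = await pipe(stream, sink, batch_size=500, flush_interval=2.0)
    print(f"{summary.samples_emitted} rows written via COPY")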

SampleSink

Bases: Protocol

Minimal shape of an acquisition sink.

Sinks own their storage handle lifecycle. Concrete implementations typically follow this call sequence:

  1. await sink.open() — allocate file descriptors, DB connections, etc. Safe to call again on an already-open sink.
  2. await sink.write_many(samples) — one or more times. samples is a :class:~collections.abc.Sequence so the sink knows the full batch up front (useful for CSV column inference, Parquet row groups, Postgres parameterised inserts).
  3. await sink.close() — flush and release the handle. Idempotent.

The async context-manager methods provide an async with sink: shape for the common case of "open → write → close" in one block.
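
A sketch of a third-party sink that satisfies the Protocol — a trivial print-each-row sink. The class and its behaviour are illustrative, not part of alicatlib; the method shapes follow the Protocol below and sample_to_row comes from alicatlib.sinks.base:

from collections.abc import Sequence
from types import TracebackType
from typing import Self

from alicatlib.sinks.base import sample_to_row

class PrintSink:
    """Illustrative only — prints each flattened row to stdout."""

    async def open(self) -> None:
        pass                                   # no backing resource

    async def write_many(self, samples: Sequence) -> None:
        for sample in samples:
            print(sample_to_row(sample))

    async def close(self) -> None:
        pass

    async def __aenter__(self) -> Self:
        await self.open()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        await self.close()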

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/alicatlib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/alicatlib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/alicatlib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource (file handle, DB conn, …).

Source code in src/alicatlib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource (file handle, DB conn, …)."""
    ...

write_many async

write_many(samples)

Append samples to the sink.

Sequence (not Iterable) because every in-tree sink wants len() — CSV schema inference, batched parameterised inserts, Parquet row-group bookkeeping.

Source code in src/alicatlib/sinks/base.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink.

    ``Sequence`` (not ``Iterable``) because every in-tree sink wants
    ``len()`` — CSV schema inference, batched parameterised inserts,
    Parquet row-group bookkeeping.
    """
    ...

SqliteSink

SqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
)

Append-only SQLite writer with WAL journaling and first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination SQLite file. Created on :meth:open.

table str

Target table name.

columns tuple[ColumnSpec, ...] | None

The locked :class:ColumnSpec tuple, or None before the first flush.

Source code in src/alicatlib/sinks/sqlite.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
) -> None:
    self._path = Path(path)
    self._table = _validate_identifier(table, label="table name")
    self._create_table = create_table
    self._journal_mode: _JournalMode = journal_mode
    self._synchronous: _Synchronous = synchronous
    if busy_timeout_ms < 0:
        raise ValueError(f"busy_timeout_ms must be >= 0, got {busy_timeout_ms!r}")
    self._busy_timeout_ms = busy_timeout_ms
    self._conn: sqlite3.Connection | None = None
    self._schema = SchemaLock(sink_name="sqlite", logger=_logger)
    self._insert_sql: str | None = None

columns property

columns

Locked columns in order, or None before first :meth:write_many.

path property

path

Destination SQLite file path.

table property

table

Target table name (validated).

close async

close()

Close the connection. Idempotent.

Source code in src/alicatlib/sinks/sqlite.py
async def close(self) -> None:
    """Close the connection. Idempotent."""
    if self._conn is None:
        return
    conn = self._conn
    self._conn = None
    try:
        await run_sync(conn.close)
    finally:
        _logger.info(
            "sinks.sqlite.close",
            extra={"path": str(self._path), "table": self._table},
        )

open async

open()

Open the SQLite connection, apply PRAGMAs, and introspect the target.

Idempotent: calling :meth:open on an already-open sink is a no-op. Runs in a worker thread because sqlite3.connect and PRAGMA execution are blocking I/O.

Source code in src/alicatlib/sinks/sqlite.py
async def open(self) -> None:
    """Open the SQLite connection, apply PRAGMAs, and introspect the target.

    Idempotent: calling :meth:`open` on an already-open sink is a
    no-op. Runs in a worker thread because ``sqlite3.connect`` and
    ``PRAGMA`` execution are blocking I/O.
    """
    if self._conn is not None:
        return
    self._conn = await run_sync(self._connect_blocking)
    _logger.info(
        "sinks.sqlite.open",
        extra={
            "path": str(self._path),
            "table": self._table,
            "journal_mode": self._journal_mode,
            "synchronous": self._synchronous,
        },
    )
    if not self._create_table:
        try:
            await run_sync(self._introspect_existing_table_blocking)
        except BaseException:
            # Introspection raised (most commonly AlicatSinkSchemaError on
            # a missing table). Release the connection so we don't leak a
            # resource on a failed open.
            conn = self._conn
            self._conn = None
            await run_sync(conn.close)
            raise

write_many async

write_many(samples)

Append samples as rows in a single transaction.

On the first call (when create_table=True), infers the schema from the batch and runs CREATE TABLE IF NOT EXISTS. Subsequent calls insert directly. All values pass through ? placeholders — never string-formatted into SQL.

Source code in src/alicatlib/sinks/sqlite.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as rows in a single transaction.

    On the first call (when ``create_table=True``), infers the
    schema from the batch and runs ``CREATE TABLE IF NOT EXISTS``.
    Subsequent calls insert directly. All values pass through
    ``?`` placeholders — never string-formatted into SQL.
    """
    if self._conn is None:
        raise RuntimeError("SqliteSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked and self._create_table:
        # else (create_table=False): _introspect_existing_table_blocking
        # already ran in open().
        self._schema.lock(rows)
        await run_sync(self._create_table_blocking)
        self._insert_sql = self._build_insert_sql()

    assert self._insert_sql is not None  # noqa: S101 — narrow for type checker
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    projected: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected.append(tuple(fields[spec.name] for spec in columns))

    await run_sync(self._executemany_blocking, projected)
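
A sketch: log into a local SQLite file with the WAL defaults above, then count the rows back with stdlib sqlite3:

import sqlite3

from alicatlib.sinks import SqliteSink, pipe

async def log_run_to_sqlite(stream) -> None:
    async with SqliteSink("run.db", table="samples") as sink:
        await pipe(stream, sink)

def count_rows(path: str = "run.db") -> int:
    conn = sqlite3.connect(path)
    try:
        (n,) = conn.execute("SELECT COUNT(*) FROM samples").fetchone()
    finally:
        conn.close()
    return n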

pipe async

pipe(stream, sink, *, batch_size=64, flush_interval=1.0)

Drain stream into sink with buffered flushes.

Reads per-tick batches from the recorder and accumulates the individual :class:Sample objects into a list. A flush happens when either threshold is first crossed:

  • the buffer reaches batch_size samples, or
  • flush_interval seconds have elapsed since the last flush.

On stream exhaustion any leftover buffer is flushed before the summary is returned.

The samples_late / max_drift_ms fields on the returned summary stay at zero here — those are recorder-layer concepts. The recorder emits its own summary via the alicatlib.streaming logger on CM exit; this summary is the sink-side view.

Parameters:

Name Type Description Default
stream AsyncIterator[Mapping[str, Sample]]

The async iterator yielded by :func:~alicatlib.streaming.record.

required
sink SampleSink

Any :class:SampleSink. Must already be open.

required
batch_size int

Buffer threshold in samples (not batches). Defaults to 64 to match the design default.

64
flush_interval float

Time threshold in seconds between flushes. Wall-clock only, not anyio-clock — sinks care about persistence freshness, not scheduling precision.

1.0

Returns:

Type Description
AcquisitionSummary

An :class:AcquisitionSummary with samples_emitted set to the count actually handed to the sink.

Raises:

Type Description
ValueError

On non-positive batch_size or flush_interval.

Source code in src/alicatlib/sinks/base.py
async def pipe(
    stream: AsyncIterator[Mapping[str, Sample]],
    sink: SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    r"""Drain ``stream`` into ``sink`` with buffered flushes.

    Reads per-tick batches from the recorder and accumulates the
    individual :class:`Sample`\ s into a list. A flush happens when
    either threshold is first crossed:

    - the buffer reaches ``batch_size`` samples, or
    - ``flush_interval`` seconds have elapsed since the last flush.

    On stream exhaustion any leftover buffer is flushed before the
    summary is returned.

    The ``samples_late`` / ``max_drift_ms`` fields on the returned
    summary stay at zero here — those are recorder-layer concepts.
    The recorder emits its own summary via the ``alicatlib.streaming``
    logger on CM exit; this summary is the sink-side view.

    Args:
        stream: The async iterator yielded by
            :func:`~alicatlib.streaming.record`.
        sink: Any :class:`SampleSink`. Must already be open.
        batch_size: Buffer threshold in samples (not batches). Defaults
            to ``64`` to match the design default.
        flush_interval: Time threshold in seconds between flushes.
            Wall-clock only, not anyio-clock — sinks care about
            persistence freshness, not scheduling precision.

    Returns:
        An :class:`AcquisitionSummary` with ``samples_emitted`` set to
        the count actually handed to the sink.

    Raises:
        ValueError: On non-positive ``batch_size`` or ``flush_interval``.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = anyio.current_time()

    async def _flush() -> None:
        nonlocal emitted
        if not buffer:
            return
        await sink.write_many(buffer)
        emitted += len(buffer)
        buffer.clear()

    async for batch in stream:
        buffer.extend(batch.values())
        now = anyio.current_time()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            await _flush()
            last_flush = now

    await _flush()
    finished_at = datetime.now(UTC)
    _logger.info(
        "sinks.pipe_done",
        extra={
            "sink": type(sink).__name__,
            "samples_emitted": emitted,
            "duration_s": (finished_at - started_at).total_seconds(),
        },
    )
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )
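
A worked example of the two thresholds for a single device: at 10 Hz the 1.0 s flush_interval fires first (roughly 10 samples per flush), while at 100 Hz the 64-sample batch_size fires first (a flush every ~0.64 s). The sink choice below is arbitrary:

from alicatlib.sinks import JsonlSink, pipe

async def drain(stream) -> None:
    async with JsonlSink("run.jsonl") as sink:
        summary = await pipe(stream, sink, batch_size=64, flush_interval=1.0)
    duration = (summary.finished_at - summary.started_at).total_seconds()
    print(f"{summary.samples_emitted} samples in {duration:.1f}s")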

sample_to_row

sample_to_row(sample)

Flatten a :class:Sample into a single row dict for tabular sinks.

Schema layout (stable across samples):

  • device — manager-assigned name.
  • unit_id — bus-level single-letter id.
  • requested_at / received_at / midpoint_at — ISO 8601.
  • latency_s — poll round-trip, seconds.
  • frame fields — everything from :meth:DataFrame.as_dict except the frame's own received_at (superseded by the sample-level value so all rows have the same received_at semantics).
  • status — comma-joined sorted status codes (empty string when no flags active), from :meth:DataFrame.as_dict.

The frame's own received_at is dropped so the row's received_at consistently means "recorder-observed reply time" across rows — otherwise multi-device rows would mix frame-level and sample-level timings.

Source code in src/alicatlib/sinks/base.py
def sample_to_row(sample: Sample) -> dict[str, float | str | int | None]:
    """Flatten a :class:`Sample` into a single row dict for tabular sinks.

    Schema layout (stable across samples):

    - ``device`` — manager-assigned name.
    - ``unit_id`` — bus-level single-letter id.
    - ``requested_at`` / ``received_at`` / ``midpoint_at`` — ISO 8601.
    - ``latency_s`` — poll round-trip, seconds.
    - *frame fields* — everything from :meth:`DataFrame.as_dict` *except*
      the frame's own ``received_at`` (superseded by the sample-level
      value so all rows have the same ``received_at`` semantics).
    - ``status`` — comma-joined sorted status codes (empty string when
      no flags active), from :meth:`DataFrame.as_dict`.

    The frame's own ``received_at`` is dropped so the row's ``received_at``
    consistently means "recorder-observed reply time" across rows —
    otherwise multi-device rows would mix frame-level and sample-level
    timings.
    """
    row: dict[str, float | str | int | None] = {
        "device": sample.device,
        "unit_id": sample.unit_id,
        "requested_at": sample.requested_at.isoformat(),
        "received_at": sample.received_at.isoformat(),
        "midpoint_at": sample.midpoint_at.isoformat(),
        "latency_s": sample.latency_s,
    }
    frame_dict = sample.frame.as_dict()
    frame_dict.pop("received_at", None)
    # The first ??D* field is the unit-id echo (design §5.6). It
    # duplicates ``sample.unit_id`` verbatim and collides case-
    # insensitively with the ``unit_id`` column in strict backends like
    # SQLite (hardware-validation finding, 2026-04-17: captured parser names
    # the field ``Unit_ID`` while the sample-level column is
    # ``unit_id`` — SQLite treats them as a duplicate column).
    for key in ("Unit_ID", "unit_id"):
        frame_dict.pop(key, None)
    row.update(frame_dict)
    return row
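
An illustrative row — the first six keys are fixed; the frame-field names after them depend on the device's configured frame format and are made up here:

row = {
    "device": "mfc1",
    "unit_id": "A",
    "requested_at": "2026-04-17T12:00:00.000000+00:00",
    "received_at": "2026-04-17T12:00:00.012000+00:00",
    "midpoint_at": "2026-04-17T12:00:00.006000+00:00",
    "latency_s": 0.012,
    "mass_flow": 0.499,   # frame field — illustrative name
    "pressure": 14.70,    # frame field — illustrative name
    "status": "",         # no status flags active
}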

Base protocol and driver

alicatlib.sinks.base

Sink Protocol, sample → row helper, and the pipe() driver.

A :class:SampleSink is the minimal shape the recorder's downstream consumer needs: :meth:open, :meth:write_many, :meth:close, and the matching async context-manager methods. The in-tree sinks (:class:~alicatlib.sinks.memory.InMemorySink, :class:~alicatlib.sinks.csv.CsvSink, :class:~alicatlib.sinks.jsonl.JsonlSink) all satisfy this Protocol; third-party sinks (Parquet, Postgres, Kafka, …) can slot in without touching library code.

:func:pipe is the v1 acquisition glue. It reads per-tick batches out of the recorder's receive stream, buffers them up to batch_size (or flush_interval seconds, whichever comes first), and calls sink.write_many to flush. On stream exhaustion it drains any remaining buffer and returns an :class:AcquisitionSummary with samples_emitted reflecting the count actually handed to the sink.

Design reference: docs/design.md §5.15.

SampleSink

Bases: Protocol

Minimal shape of an acquisition sink.

Sinks own their storage handle lifecycle. Concrete implementations typically follow this call sequence:

  1. await sink.open() — allocate file descriptors, DB connections, etc. Safe to call again on an already-open sink.
  2. await sink.write_many(samples) — one or more times. samples is a :class:~collections.abc.Sequence so the sink knows the full batch up front (useful for CSV column inference, Parquet row groups, Postgres parameterised inserts).
  3. await sink.close() — flush and release the handle. Idempotent.

The async context-manager methods provide an async with sink: shape for the common case of "open → write → close" in one block.

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/alicatlib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/alicatlib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/alicatlib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource (file handle, DB conn, …).

Source code in src/alicatlib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource (file handle, DB conn, …)."""
    ...

write_many async

write_many(samples)

Append samples to the sink.

Sequence (not Iterable) because every in-tree sink wants len() — CSV schema inference, batched parameterised inserts, Parquet row-group bookkeeping.

Source code in src/alicatlib/sinks/base.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink.

    ``Sequence`` (not ``Iterable``) because every in-tree sink wants
    ``len()`` — CSV schema inference, batched parameterised inserts,
    Parquet row-group bookkeeping.
    """
    ...

pipe async

pipe(stream, sink, *, batch_size=64, flush_interval=1.0)

Drain stream into sink with buffered flushes.

Reads per-tick batches from the recorder and accumulates the individual :class:Sample objects into a list. A flush happens when either threshold is first crossed:

  • the buffer reaches batch_size samples, or
  • flush_interval seconds have elapsed since the last flush.

On stream exhaustion any leftover buffer is flushed before the summary is returned.

The samples_late / max_drift_ms fields on the returned summary stay at zero here — those are recorder-layer concepts. The recorder emits its own summary via the alicatlib.streaming logger on CM exit; this summary is the sink-side view.

Parameters:

Name Type Description Default
stream AsyncIterator[Mapping[str, Sample]]

The async iterator yielded by :func:~alicatlib.streaming.record.

required
sink SampleSink

Any :class:SampleSink. Must already be open.

required
batch_size int

Buffer threshold in samples (not batches). Defaults to 64 to match the design default.

64
flush_interval float

Time threshold in seconds between flushes. Wall-clock only, not anyio-clock — sinks care about persistence freshness, not scheduling precision.

1.0

Returns:

Type Description
AcquisitionSummary

An :class:AcquisitionSummary with samples_emitted set to the count actually handed to the sink.

Raises:

Type Description
ValueError

On non-positive batch_size or flush_interval.

Source code in src/alicatlib/sinks/base.py
async def pipe(
    stream: AsyncIterator[Mapping[str, Sample]],
    sink: SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    r"""Drain ``stream`` into ``sink`` with buffered flushes.

    Reads per-tick batches from the recorder and accumulates the
    individual :class:`Sample`\ s into a list. A flush happens when
    either threshold is first crossed:

    - the buffer reaches ``batch_size`` samples, or
    - ``flush_interval`` seconds have elapsed since the last flush.

    On stream exhaustion any leftover buffer is flushed before the
    summary is returned.

    The ``samples_late`` / ``max_drift_ms`` fields on the returned
    summary stay at zero here — those are recorder-layer concepts.
    The recorder emits its own summary via the ``alicatlib.streaming``
    logger on CM exit; this summary is the sink-side view.

    Args:
        stream: The async iterator yielded by
            :func:`~alicatlib.streaming.record`.
        sink: Any :class:`SampleSink`. Must already be open.
        batch_size: Buffer threshold in samples (not batches). Defaults
            to ``64`` to match the design default.
        flush_interval: Time threshold in seconds between flushes.
            Wall-clock only, not anyio-clock — sinks care about
            persistence freshness, not scheduling precision.

    Returns:
        An :class:`AcquisitionSummary` with ``samples_emitted`` set to
        the count actually handed to the sink.

    Raises:
        ValueError: On non-positive ``batch_size`` or ``flush_interval``.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = anyio.current_time()

    async def _flush() -> None:
        nonlocal emitted
        if not buffer:
            return
        await sink.write_many(buffer)
        emitted += len(buffer)
        buffer.clear()

    async for batch in stream:
        buffer.extend(batch.values())
        now = anyio.current_time()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            await _flush()
            last_flush = now

    await _flush()
    finished_at = datetime.now(UTC)
    _logger.info(
        "sinks.pipe_done",
        extra={
            "sink": type(sink).__name__,
            "samples_emitted": emitted,
            "duration_s": (finished_at - started_at).total_seconds(),
        },
    )
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )

sample_to_row

sample_to_row(sample)

Flatten a :class:Sample into a single row dict for tabular sinks.

Schema layout (stable across samples):

  • device — manager-assigned name.
  • unit_id — bus-level single-letter id.
  • requested_at / received_at / midpoint_at — ISO 8601.
  • latency_s — poll round-trip, seconds.
  • frame fields — everything from :meth:DataFrame.as_dict except the frame's own received_at (superseded by the sample-level value so all rows have the same received_at semantics).
  • status — comma-joined sorted status codes (empty string when no flags active), from :meth:DataFrame.as_dict.

The frame's own received_at is dropped so the row's received_at consistently means "recorder-observed reply time" across rows — otherwise multi-device rows would mix frame-level and sample-level timings.

Source code in src/alicatlib/sinks/base.py
def sample_to_row(sample: Sample) -> dict[str, float | str | int | None]:
    """Flatten a :class:`Sample` into a single row dict for tabular sinks.

    Schema layout (stable across samples):

    - ``device`` — manager-assigned name.
    - ``unit_id`` — bus-level single-letter id.
    - ``requested_at`` / ``received_at`` / ``midpoint_at`` — ISO 8601.
    - ``latency_s`` — poll round-trip, seconds.
    - *frame fields* — everything from :meth:`DataFrame.as_dict` *except*
      the frame's own ``received_at`` (superseded by the sample-level
      value so all rows have the same ``received_at`` semantics).
    - ``status`` — comma-joined sorted status codes (empty string when
      no flags active), from :meth:`DataFrame.as_dict`.

    The frame's own ``received_at`` is dropped so the row's ``received_at``
    consistently means "recorder-observed reply time" across rows —
    otherwise multi-device rows would mix frame-level and sample-level
    timings.
    """
    row: dict[str, float | str | int | None] = {
        "device": sample.device,
        "unit_id": sample.unit_id,
        "requested_at": sample.requested_at.isoformat(),
        "received_at": sample.received_at.isoformat(),
        "midpoint_at": sample.midpoint_at.isoformat(),
        "latency_s": sample.latency_s,
    }
    frame_dict = sample.frame.as_dict()
    frame_dict.pop("received_at", None)
    # The first ??D* field is the unit-id echo (design §5.6). It
    # duplicates ``sample.unit_id`` verbatim and collides case-
    # insensitively with the ``unit_id`` column in strict backends like
    # SQLite (hardware-validation finding, 2026-04-17: captured parser names
    # the field ``Unit_ID`` while the sample-level column is
    # ``unit_id`` — SQLite treats them as a duplicate column).
    for key in ("Unit_ID", "unit_id"):
        frame_dict.pop(key, None)
    row.update(frame_dict)
    return row

In-memory (test-only)

alicatlib.sinks.memory

In-memory sink — collects :class:Sample objects in a list for tests.

The one bit of value over "write your own list" is that :class:InMemorySink satisfies the :class:SampleSink Protocol, so acquisition tests can run the same pipe() call path a production sink uses and then inspect the captured samples after the fact.

Design reference: docs/design.md §5.15.

InMemorySink

InMemorySink()

Collect every written :class:Sample in a single list.

:attr:samples is appended to (never re-assigned) so callers can hold a reference across the sink's lifecycle. :meth:close does not clear the buffer — the point of this sink is post-run inspection.

Source code in src/alicatlib/sinks/memory.py
def __init__(self) -> None:
    self._samples: list[Sample] = []
    self._open = False
    self._closed = False

is_open property

is_open

True once :meth:open has been called and close has not.

samples property

samples

Captured samples, in write order.

close async

close()

Flip the closed flag — no I/O, buffer preserved for inspection.

Source code in src/alicatlib/sinks/memory.py
async def close(self) -> None:
    """Flip the closed flag — no I/O, buffer preserved for inspection."""
    self._closed = True

open async

open()

No backing resource — just flips the open flag.

Source code in src/alicatlib/sinks/memory.py
async def open(self) -> None:
    """No backing resource — just flips the open flag."""
    self._open = True
    self._closed = False

write_many async

write_many(samples)

Append every sample to the internal buffer.

Source code in src/alicatlib/sinks/memory.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append every sample to the internal buffer."""
    if not self.is_open:
        raise RuntimeError("InMemorySink: write_many called before open()")
    self._samples.extend(samples)

CSV

alicatlib.sinks.csv

CSV sink — stdlib :mod:csv, schema locked at first batch.

:class:CsvSink writes one row per :class:Sample. The column order is fixed the first time :meth:write_many is called — inferred from the first sample's :func:sample_to_row output — and stays stable for the rest of the run. Unknown columns that appear in later samples (e.g. a newly-hot-plugged device whose frame format carries an extra field) are dropped with a WARN log rather than silently reshaping the file; if the shape changes mid-run the caller almost always wants to know.

Stdlib-only — the core install pulls in no CSV dependencies. Design reference: docs/design.md §5.15.

CsvSink

CsvSink(path)

Append-only CSV writer with first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination file. Created or overwritten on :meth:open.

columns tuple[str, ...] | None

Locked column order after the first :meth:write_many. None before the first flush.

Source code in src/alicatlib/sinks/csv.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None
    self._writer: csv.DictWriter[str] | None = None
    self._columns: tuple[str, ...] | None = None
    self._unknown_columns_warned: set[str] = set()

columns property

columns

The locked column order, or None if no batch has been flushed.

path property

path

Destination file path.

close async

close()

Flush and close the CSV file. Idempotent.

Source code in src/alicatlib/sinks/csv.py
async def close(self) -> None:
    """Flush and close the CSV file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None
        self._writer = None

open async

open()

Open the CSV file for writing. Overwrites any existing file.

Source code in src/alicatlib/sinks/csv.py
async def open(self) -> None:
    """Open the CSV file for writing. Overwrites any existing file."""
    if self._file is not None:
        return  # already open
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Append samples as CSV rows.

On first call, infers the column set from the first sample and writes the header. Subsequent calls validate each row's keys against that locked set — unknown keys are dropped with a one-shot WARN log per unseen key.

Source code in src/alicatlib/sinks/csv.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as CSV rows.

    On first call, infers the column set from the first sample and
    writes the header. Subsequent calls validate each row's keys
    against that locked set — unknown keys are dropped with a
    one-shot WARN log per unseen key.
    """
    if self._file is None:
        raise RuntimeError("CsvSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]
    if self._writer is None:
        self._columns = tuple(rows[0].keys())
        self._writer = csv.DictWriter(self._file, fieldnames=list(self._columns))
        self._writer.writeheader()

    columns = self._columns
    assert columns is not None  # noqa: S101 — narrow for type checker

    for row in rows:
        unknown = row.keys() - set(columns)
        for key in unknown:
            if key not in self._unknown_columns_warned:
                self._unknown_columns_warned.add(key)
                _logger.warning(
                    "sinks.csv.unknown_column",
                    extra={
                        "path": str(self._path),
                        "column": key,
                        "action": "drop",
                    },
                )
        filtered = {k: row.get(k) for k in columns}
        self._writer.writerow(filtered)
    self._file.flush()

JSONL

alicatlib.sinks.jsonl

JSONL sink — stdlib :mod:json, one object per line, no schema lock.

:class:JsonlSink writes one JSON object per :class:Sample. Unlike :class:~alicatlib.sinks.csv.CsvSink, it doesn't lock a schema — each row stands alone, so a device whose frame format carries an extra field simply emits a wider object without affecting earlier or later rows.

Stdlib-only — the core install pulls in no JSON dependencies. Design reference: docs/design.md §5.15.

JsonlSink

JsonlSink(path)

Append-only JSONL writer — one flattened sample per line.

The on-disk format is <sample-row-as-json>\n per sample; reading back is just [json.loads(line) for line in f]. No header, no schema declaration, no framing overhead beyond the newline.

Source code in src/alicatlib/sinks/jsonl.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None

path property

path

Destination file path.

close async

close()

Flush and close the JSONL file. Idempotent.

Source code in src/alicatlib/sinks/jsonl.py
async def close(self) -> None:
    """Flush and close the JSONL file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None

open async

open()

Open the JSONL file for writing. Overwrites any existing file.

Source code in src/alicatlib/sinks/jsonl.py
async def open(self) -> None:
    """Open the JSONL file for writing. Overwrites any existing file."""
    if self._file is not None:
        return
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Serialise each sample as one JSON object per line.

Source code in src/alicatlib/sinks/jsonl.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    r"""Serialise each sample as one JSON object per line."""
    if self._file is None:
        raise RuntimeError("JsonlSink: write_many called before open()")
    if not samples:
        return
    for sample in samples:
        row = sample_to_row(sample)
        self._file.write(json.dumps(row, ensure_ascii=False))
        self._file.write("\n")
    self._file.flush()

SQLite (stdlib)

alicatlib.sinks.sqlite

SQLite sink — stdlib :mod:sqlite3 + WAL, parameterised executemany.

:class:SqliteSink writes one row per :class:Sample into a local SQLite file. The sink is core (no extra required) because sqlite3 ships with the Python standard library.

The sqlite3 driver is synchronous; the sink calls it through :func:anyio.to_thread.run_sync so the event loop stays responsive. Because every write hops into a worker thread anyway, there's no advantage to taking on aiosqlite as a dependency — stdlib delivers the same latency profile with one fewer package.

Best-practice defaults baked in:

  • journal_mode=WAL + synchronous=NORMAL — the recommended pairing for write-heavy workloads; durable against crashes, significantly faster than the default.
  • busy_timeout=5000 ms so brief lock contention retries transparently instead of raising OperationalError.
  • One BEGIN IMMEDIATE … COMMIT transaction per write_many, so a batch of N rows is one fsync rather than N.
  • SQL identifiers (table name) validated against ^[A-Za-z_][A-Za-z0-9_]{0,62}$. Values are always passed as ? parameters — never string-formatted.

Schema evolution mirrors the other tabular sinks: column set locked on the first batch (via :class:~alicatlib.sinks._schema.SchemaLock), unknown columns dropped with a one-shot WARN. When create_table=True (default), the table is created on first batch from the inferred :class:~alicatlib.sinks._schema.ColumnSpec list. When create_table=False, the target table's columns are read from PRAGMA table_info and used as the schema.

Design reference: docs/design.md §5.15.
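
A sketch of the create_table=False path: provision the table yourself, then let open() lock the schema from PRAGMA table_info. The mass_flow column is an illustrative frame field:

import sqlite3

from alicatlib.sinks import SqliteSink

def provision(path: str = "run.db") -> None:
    conn = sqlite3.connect(path)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS samples ("
            " device TEXT, unit_id TEXT,"
            " requested_at TEXT, received_at TEXT, midpoint_at TEXT,"
            " latency_s REAL, mass_flow REAL, status TEXT)"
        )
        conn.commit()
    finally:
        conn.close()

sink = SqliteSink("run.db", table="samples", create_table=False)
# open() now reads these columns from PRAGMA table_info and locks the
# schema; a missing table raises instead of being created silently.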

SqliteSink

SqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
)

Append-only SQLite writer with WAL journaling and first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination SQLite file. Created on :meth:open.

table str

Target table name.

columns tuple[ColumnSpec, ...] | None

The locked :class:ColumnSpec tuple, or None before the first flush.

Source code in src/alicatlib/sinks/sqlite.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
) -> None:
    self._path = Path(path)
    self._table = _validate_identifier(table, label="table name")
    self._create_table = create_table
    self._journal_mode: _JournalMode = journal_mode
    self._synchronous: _Synchronous = synchronous
    if busy_timeout_ms < 0:
        raise ValueError(f"busy_timeout_ms must be >= 0, got {busy_timeout_ms!r}")
    self._busy_timeout_ms = busy_timeout_ms
    self._conn: sqlite3.Connection | None = None
    self._schema = SchemaLock(sink_name="sqlite", logger=_logger)
    self._insert_sql: str | None = None

columns property

columns

Locked columns in order, or None before first :meth:write_many.

path property

path

Destination SQLite file path.

table property

table

Target table name (validated).

close async

close()

Close the connection. Idempotent.

Source code in src/alicatlib/sinks/sqlite.py
async def close(self) -> None:
    """Close the connection. Idempotent."""
    if self._conn is None:
        return
    conn = self._conn
    self._conn = None
    try:
        await run_sync(conn.close)
    finally:
        _logger.info(
            "sinks.sqlite.close",
            extra={"path": str(self._path), "table": self._table},
        )

open async

open()

Open the SQLite connection, apply PRAGMAs, and introspect the target.

Idempotent: calling :meth:open on an already-open sink is a no-op. Runs in a worker thread because sqlite3.connect and PRAGMA execution are blocking I/O.

Source code in src/alicatlib/sinks/sqlite.py
async def open(self) -> None:
    """Open the SQLite connection, apply PRAGMAs, and introspect the target.

    Idempotent: calling :meth:`open` on an already-open sink is a
    no-op. Runs in a worker thread because ``sqlite3.connect`` and
    ``PRAGMA`` execution are blocking I/O.
    """
    if self._conn is not None:
        return
    self._conn = await run_sync(self._connect_blocking)
    _logger.info(
        "sinks.sqlite.open",
        extra={
            "path": str(self._path),
            "table": self._table,
            "journal_mode": self._journal_mode,
            "synchronous": self._synchronous,
        },
    )
    if not self._create_table:
        try:
            await run_sync(self._introspect_existing_table_blocking)
        except BaseException:
            # Introspection raised (most commonly AlicatSinkSchemaError on
            # a missing table). Release the connection so we don't leak a
            # resource on a failed open.
            conn = self._conn
            self._conn = None
            await run_sync(conn.close)
            raise

write_many async

write_many(samples)

Append samples as rows in a single transaction.

On the first call (when create_table=True), infers the schema from the batch and runs CREATE TABLE IF NOT EXISTS. Subsequent calls insert directly. All values pass through ? placeholders — never string-formatted into SQL.

Source code in src/alicatlib/sinks/sqlite.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as rows in a single transaction.

    On the first call (when ``create_table=True``), infers the
    schema from the batch and runs ``CREATE TABLE IF NOT EXISTS``.
    Subsequent calls insert directly. All values pass through
    ``?`` placeholders — never string-formatted into SQL.
    """
    if self._conn is None:
        raise RuntimeError("SqliteSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked and self._create_table:
        # else (create_table=False): _introspect_existing_table_blocking
        # already ran in open().
        self._schema.lock(rows)
        await run_sync(self._create_table_blocking)
        self._insert_sql = self._build_insert_sql()

    assert self._insert_sql is not None  # noqa: S101 — narrow for type checker
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    projected: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected.append(tuple(fields[spec.name] for spec in columns))

    await run_sync(self._executemany_blocking, projected)
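
Because the output is a plain SQLite file, it can be inspected afterwards with the stdlib driver. A small hedged check, assuming the default table name:

import sqlite3

with sqlite3.connect("run.db") as conn:
    (count,) = conn.execute("SELECT COUNT(*) FROM samples").fetchone()
    print(f"{count} rows recorded")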

Parquet (alicatlib[parquet])

alicatlib.sinks.parquet

Parquet sink — :mod:pyarrow, schema locked, zstd by default.

:class:ParquetSink writes one row per :class:Sample into a single Parquet file. pyarrow is an optional dependency behind alicatlib[parquet]; the import is deferred to :meth:open so instantiating the sink succeeds on bare-core installs and :class:~alicatlib.errors.AlicatSinkDependencyError is raised only when the user actually tries to open the file.

Best-practice defaults baked in:

  • zstd compression. It matches or beats snappy on write/read speed with ~30% better ratios and is fully supported across pyarrow ≥ 2, Spark, DuckDB, Polars, and pandas ≥ 1.3. Snappy and gzip remain available for compatibility with readers that don't support zstd.
  • Dictionary encoding enabled for string columns (the pyarrow default; surfaced as a knob so callers who know their string cardinality is high can disable it).
  • One row group per :meth:write_many. Aligns durability with batch cadence — a crash mid-run loses at most the current batch. Callers that want fewer, bigger row groups can pass row_group_size.

Schema evolution mirrors the other tabular sinks: the column set is locked on the first batch (via :class:~alicatlib.sinks._schema.SchemaLock). Unknown columns that appear in later batches are dropped with a one-shot WARN. Adding a new column mid-file would require rewriting the whole file, so it is deliberately not supported.

Durability caveat: Parquet files are not readable until the footer is flushed on :meth:close. If the process is killed mid-run you will get a file with no usable footer. The recommended shutdown path is the recorder's structured exit, which always reaches the sink's async-context-manager __aexit__ and runs :meth:close.

Design reference: docs/design.md §5.15.
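
A hedged usage sketch under the defaults above (zstd, one row group per batch). It assumes alicatlib[parquet] is installed; on a bare-core install, open() raises AlicatSinkDependencyError:

import anyio
from alicatlib.sinks import ParquetSink  # requires alicatlib[parquet]

async def main() -> None:
    sink = ParquetSink("run.parquet", compression="zstd")
    await sink.open()                 # imports pyarrow lazily, creates parent dirs
    try:
        batch = []                    # placeholder; real Sample objects come from a recorder
        await sink.write_many(batch)  # each non-empty batch becomes one row group
    finally:
        await sink.close()            # flushes the footer; the file is unreadable without it

anyio.run(main)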

ParquetSink

ParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
)

Append-only Parquet writer with first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination Parquet file.

compression _Compression

Codec applied to every row group.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first flush.

Source code in src/alicatlib/sinks/parquet.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
) -> None:
    self._path = Path(path)
    self._compression: _Compression = compression
    self._use_dictionary = use_dictionary
    if row_group_size is not None and row_group_size < 1:
        raise ValueError(
            f"row_group_size must be >= 1 if set, got {row_group_size!r}",
        )
    self._row_group_size = row_group_size
    self._schema = SchemaLock(sink_name="parquet", logger=_logger)
    self._pa: Any = None
    self._pq: Any = None
    self._arrow_schema: Any = None
    self._writer: Any = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

compression property

compression

The configured compression codec.

path property

path

Destination Parquet file path.

close async

close()

Flush the footer and close the writer. Idempotent.

Source code in src/alicatlib/sinks/parquet.py
async def close(self) -> None:
    """Flush the footer and close the writer. Idempotent."""
    if self._writer is not None:
        try:
            self._writer.close()
        finally:
            self._writer = None
    self._pa = None
    self._pq = None
    _logger.info(
        "sinks.parquet.close",
        extra={
            "path": str(self._path),
            "rows_written": self._rows_written,
        },
    )

open async

open()

Load pyarrow and create the parent directory. Idempotent.

The actual :class:pyarrow.parquet.ParquetWriter is opened lazily on the first :meth:write_many call, because the writer requires a concrete schema — which we don't have until the first batch is inspected.

Source code in src/alicatlib/sinks/parquet.py
async def open(self) -> None:
    """Load pyarrow and create the parent directory. Idempotent.

    The actual :class:`pyarrow.parquet.ParquetWriter` is opened
    lazily on the first :meth:`write_many` call, because the
    writer requires a concrete schema — which we don't have until
    the first batch is inspected.
    """
    if self._pa is not None:
        return
    self._pa, self._pq = _load_pyarrow()
    self._path.parent.mkdir(parents=True, exist_ok=True)
    _logger.info(
        "sinks.parquet.open",
        extra={
            "path": str(self._path),
            "compression": self._compression,
        },
    )

write_many async

write_many(samples)

Append samples as one Parquet row group.

On first call: infers the schema from the batch, locks it, constructs the matching :mod:pyarrow schema, and opens the underlying :class:~pyarrow.parquet.ParquetWriter.

Subsequent calls project each row onto the locked schema and append the rows as a new row group. Unknown columns are dropped with one-shot WARN (handled by :class:SchemaLock).

Source code in src/alicatlib/sinks/parquet.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as one Parquet row group.

    On first call: infers the schema from the batch, locks it,
    constructs the matching :mod:`pyarrow` schema, and opens the
    underlying :class:`~pyarrow.parquet.ParquetWriter`.

    Subsequent calls project each row onto the locked schema and
    append the rows as a new row group. Unknown columns are
    dropped with one-shot WARN (handled by :class:`SchemaLock`).
    """
    if self._pa is None:
        raise RuntimeError("ParquetSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        self._schema.lock(rows)
        self._arrow_schema = self._build_arrow_schema()
        self._writer = self._open_writer()

    assert self._writer is not None  # noqa: S101 — narrow for type checker
    assert self._arrow_schema is not None  # noqa: S101

    projected = [self._schema.project(r) for r in rows]
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    arrays = {spec.name: [row[spec.name] for row in projected] for spec in columns}

    try:
        table = self._pa.Table.from_pydict(arrays, schema=self._arrow_schema)
        self._writer.write_table(
            table,
            row_group_size=self._row_group_size,
        )
    except Exception as exc:
        raise AlicatSinkWriteError(
            f"ParquetSink: write failed for {self._path}: {exc}",
        ) from exc
    self._rows_written += len(projected)
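
Once closed, the file can be read back with pyarrow directly (hedged: the run must have reached close() so the footer exists):

import pyarrow.parquet as pq

table = pq.read_table("run.parquet")
print(table.num_rows, table.schema.names)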

PostgreSQL (alicatlib[postgres])

alicatlib.sinks.postgres

PostgreSQL sink — :mod:asyncpg, COPY by default, parameterised fallback.

:class:PostgresSink writes one row per :class:Sample into a PostgreSQL table. asyncpg is an optional dependency behind alicatlib[postgres]; the import is deferred to :meth:open so instantiation works on bare-core installs and :class:~alicatlib.errors.AlicatSinkDependencyError is raised only when the user actually tries to open a connection.

Best-practice defaults baked in:

  • Binary COPY via :meth:asyncpg.Connection.copy_records_to_table. COPY is ~5–10× faster than parameterised INSERT for batches and is the recommended asyncpg bulk-ingest path. Callers that run on managed Postgres without COPY privileges can set :attr:PostgresConfig.use_copy to False to fall back to a prepared executemany.
  • Connection pool via :func:asyncpg.create_pool. The pool lifetime equals the sink lifetime; each batch acquires, writes, and releases, so the pool stays available for concurrent work.
  • Identifier validation on schema and table (strict regex). Every value passes through $N placeholders — never string-formatted into SQL.
  • Credential scrubbing — log lines describe the target via :meth:PostgresConfig.target, which only renders host:port/db.schema.table; the DSN (and any embedded password) is never written to a log record.
  • statement_timeout applied as a server setting so a wedged query cannot block the acquisition loop forever.

Schema evolution mirrors the other tabular sinks (design §5.15). The default create_table=False reads the target table's columns from information_schema.columns on open and locks the schema to that set. Passing create_table=True switches to first-batch inference and runs CREATE TABLE IF NOT EXISTS — convenient for quick runs, but the user gives up type control (everything text-like becomes TEXT rather than timestamptz etc.).

Design reference: docs/design.md §5.15, §5.18.
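
A hedged end-to-end sketch. It assumes alicatlib[postgres] is installed, the DSN is a placeholder, and the samples table already exists (matching the default create_table=False):

import anyio
from alicatlib.sinks import PostgresConfig, PostgresSink  # requires alicatlib[postgres]

async def main() -> None:
    cfg = PostgresConfig(
        dsn="postgres://writer:secret@db.example.com:5432/lab",  # placeholder DSN
        table="samples",              # must already exist when create_table=False
    )
    sink = PostgresSink(cfg)
    await sink.open()                 # opens the pool, locks the schema from information_schema
    try:
        batch = []                    # placeholder; real Sample objects come from a recorder
        await sink.write_many(batch)  # one binary COPY per non-empty batch
    finally:
        await sink.close()            # drains, or terminates after close_timeout_s

anyio.run(main)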

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    connect_timeout_s=30.0,
    close_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against ^[A-Za-z_][A-Za-z0-9_]{0,62}$.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout applied as a server setting. Defaults to 30 s.

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

connect_timeout_s float

Cap on initial pool establishment in :meth:PostgresSink.open. A misconfigured DSN must not be able to wedge open() indefinitely — defaults to 30 s.

close_timeout_s float

Cap on :meth:PostgresSink.close's wait for in-flight queries to drain. Defaults to 10 s; the pool is then forcibly torn down so shutdown can't hang.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.
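
The discrete-field form is equivalent; a hedged sketch of a deployment whose role lacks the COPY privilege (all values are placeholders):

cfg = PostgresConfig(
    host="db.example.com",
    port=5432,
    user="writer",
    password="secret",        # never logged; target() omits credentials
    database="lab",
    schema="public",
    table="samples",
    use_copy=False,           # fall back to prepared executemany
)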

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/alicatlib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"

PostgresSink

PostgresSink(config)

Append-only Postgres writer using pooled asyncpg connections.

Attributes:

Name Type Description
config PostgresConfig

Frozen :class:PostgresConfig instance.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first :meth:write_many.

Source code in src/alicatlib/sinks/postgres.py
def __init__(self, config: PostgresConfig) -> None:
    self._config = config
    self._schema = SchemaLock(sink_name="postgres", logger=_logger)
    self._asyncpg: Any = None
    self._pool: Any = None
    self._insert_sql: str | None = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

config property

config

The frozen :class:PostgresConfig passed in at construction.

close async

close()

Close the pool. Idempotent.

pool.close() waits for in-flight queries to drain. Capped at :attr:PostgresConfig.close_timeout_s so a wedged query cannot wedge shutdown — on timeout the pool is forcibly terminated via :meth:Pool.terminate.

Source code in src/alicatlib/sinks/postgres.py
async def close(self) -> None:
    """Close the pool. Idempotent.

    ``pool.close()`` waits for in-flight queries to drain. Capped
    at :attr:`PostgresConfig.close_timeout_s` so a wedged query
    cannot wedge shutdown — on timeout the pool is forcibly
    terminated via :meth:`Pool.terminate`.
    """
    if self._pool is None:
        return
    pool = self._pool
    self._pool = None
    forced = False
    try:
        try:
            with anyio.fail_after(self._config.close_timeout_s):
                await pool.close()
        except TimeoutError:
            # Drain timed out — force-close so shutdown completes.
            forced = True
            pool.terminate()
            _logger.warning(
                "sinks.postgres.close_timeout",
                extra={
                    "target": self._config.target(),
                    "close_timeout_s": self._config.close_timeout_s,
                },
            )
    finally:
        _logger.info(
            "sinks.postgres.close",
            extra={
                "target": self._config.target(),
                "rows_written": self._rows_written,
                "forced": forced,
            },
        )
    self._asyncpg = None

open async

open()

Load asyncpg, open the pool, and (optionally) introspect the table.

Idempotent. When create_table=False (the default), the target's columns are read on open and the schema is locked immediately. When create_table=True the lock happens lazily on the first :meth:write_many.

Source code in src/alicatlib/sinks/postgres.py
async def open(self) -> None:
    """Load asyncpg, open the pool, and (optionally) introspect the table.

    Idempotent. When ``create_table=False`` (the default), the
    target's columns are read on open and the schema is locked
    immediately. When ``create_table=True`` the lock happens
    lazily on the first :meth:`write_many`.
    """
    if self._pool is not None:
        return
    self._asyncpg = _load_asyncpg()
    cfg = self._config
    server_settings = {
        "application_name": "alicatlib",
        "statement_timeout": str(int(cfg.statement_timeout_ms)),
    }
    try:
        with anyio.fail_after(cfg.connect_timeout_s):
            if cfg.dsn is not None:
                self._pool = await self._asyncpg.create_pool(
                    dsn=cfg.dsn,
                    min_size=cfg.pool_min_size,
                    max_size=cfg.pool_max_size,
                    command_timeout=cfg.command_timeout_s,
                    server_settings=server_settings,
                )
            else:
                self._pool = await self._asyncpg.create_pool(
                    host=cfg.host,
                    port=cfg.port,
                    user=cfg.user,
                    password=cfg.password,
                    database=cfg.database,
                    min_size=cfg.pool_min_size,
                    max_size=cfg.pool_max_size,
                    command_timeout=cfg.command_timeout_s,
                    server_settings=server_settings,
                )
    except TimeoutError as exc:
        raise AlicatSinkWriteError(
            f"PostgresSink: pool open timed out after {cfg.connect_timeout_s}s "
            f"for {cfg.target()}",
        ) from exc
    except Exception as exc:
        raise AlicatSinkWriteError(
            f"PostgresSink: failed to open pool for {cfg.target()}: {exc}",
        ) from exc

    _logger.info(
        "sinks.postgres.open",
        extra={
            "target": cfg.target(),
            "pool_min": cfg.pool_min_size,
            "pool_max": cfg.pool_max_size,
            "use_copy": cfg.use_copy,
            "create_table": cfg.create_table,
        },
    )

    if not cfg.create_table:
        await self._introspect_existing_table()

write_many async

write_many(samples)

Append samples — one COPY (or executemany) per call.

Source code in src/alicatlib/sinks/postgres.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` — one COPY (or executemany) per call."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        assert self._config.create_table  # noqa: S101
        self._schema.lock(rows)
        await self._create_table()
        self._insert_sql = self._build_insert_sql()

    columns = self._schema.columns
    assert columns is not None  # noqa: S101
    assert self._insert_sql is not None  # noqa: S101

    projected_tuples: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected_tuples.append(tuple(fields[spec.name] for spec in columns))

    try:
        if self._config.use_copy:
            await self._write_copy(projected_tuples, columns)
        else:
            await self._write_executemany(projected_tuples)
    except AlicatSinkWriteError:
        raise
    except Exception as exc:
        raise AlicatSinkWriteError(
            f"PostgresSink: write failed for {self._config.target()}: {exc}",
        ) from exc
    self._rows_written += len(projected_tuples)