sartoriuslib.sinks

SampleSink Protocol, pipe() driver, and first-party sinks. See Logging and acquisition for usage patterns.

Public surface

sartoriuslib.sinks

Sample sinks — stdlib-backed (core) plus optional Parquet & Postgres.

Public surface:

  • :class:SampleSink — the Protocol every sink satisfies.
  • :func:pipe — drains a recorder stream into a sink with buffered flushes.
  • :class:InMemorySink — test-only; collects samples in a list.
  • :class:CsvSink — stdlib-backed CSV; schema locked at first batch.
  • :class:JsonlSink — stdlib-backed JSONL; one object per line.
  • :class:SqliteSink — stdlib-backed SQLite (WAL, parameterised inserts).
  • :class:ParquetSink — pyarrow-backed; requires sartoriuslib[parquet].
  • :class:PostgresSink + :class:PostgresConfig — asyncpg-backed; requires sartoriuslib[postgres].

The optional sinks (:class:ParquetSink, :class:PostgresSink) import their backing drivers lazily inside :meth:open. That means instantiation succeeds without the extra installed — calling :meth:open on an un-provisioned install raises :class:~sartoriuslib.errors.SartoriusSinkDependencyError with a copy-paste install hint.

See docs/design.md §10.
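
A minimal sketch of that lazy-import behaviour (only the exception type and the open-time failure come from the description above; the printed hint text is whatever the error carries):

from sartoriuslib.errors import SartoriusSinkDependencyError
from sartoriuslib.sinks import ParquetSink

async def open_parquet_or_explain() -> ParquetSink | None:
    sink = ParquetSink("run.parquet")      # instantiation never needs pyarrow
    try:
        await sink.open()                  # the lazy import happens here
    except SartoriusSinkDependencyError as exc:
        print(exc)                         # carries the copy-paste install hint
        return None
    return sink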

CsvSink

CsvSink(path)

Append-only CSV writer with first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination file. Created or overwritten on :meth:open.

columns tuple[str, ...] | None

Locked column order after the first :meth:write_many. None before the first flush.

Source code in src/sartoriuslib/sinks/csv.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None
    self._writer: csv.DictWriter[str] | None = None
    self._columns: tuple[str, ...] | None = None
    self._unknown_columns_warned: set[str] = set()

columns property

columns

Locked column order, or None if no batch has been flushed.

path property

path

Destination file path.

close async

close()

Flush and close the CSV file. Idempotent.

Source code in src/sartoriuslib/sinks/csv.py
async def close(self) -> None:
    """Flush and close the CSV file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None
        self._writer = None

open async

open()

Open the CSV file for writing. Overwrites any existing file.

Source code in src/sartoriuslib/sinks/csv.py
async def open(self) -> None:
    """Open the CSV file for writing. Overwrites any existing file."""
    if self._file is not None:
        return
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Append samples as CSV rows.

Source code in src/sartoriuslib/sinks/csv.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as CSV rows."""
    if self._file is None:
        raise RuntimeError("CsvSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]
    if self._writer is None:
        self._columns = tuple(rows[0].keys())
        self._writer = csv.DictWriter(self._file, fieldnames=list(self._columns))
        self._writer.writeheader()

    columns = self._columns
    assert columns is not None  # noqa: S101

    for row in rows:
        unknown = row.keys() - set(columns)
        for key in unknown:
            if key not in self._unknown_columns_warned:
                self._unknown_columns_warned.add(key)
                _logger.warning(
                    "sinks.csv.unknown_column",
                    extra={
                        "path": str(self._path),
                        "column": key,
                        "action": "drop",
                    },
                )
        filtered = {k: row.get(k) for k in columns}
        self._writer.writerow(filtered)
    self._file.flush()

InMemorySink

InMemorySink()

Collect every written :class:Sample in a single list.

:attr:samples is appended to (never re-assigned). :meth:close does not clear the buffer — the point of this sink is post-run inspection.

Source code in src/sartoriuslib/sinks/memory.py
def __init__(self) -> None:
    self._samples: list[Sample] = []
    self._open = False
    self._closed = False

is_open property

is_open

True once :meth:open has been called and close has not.

samples property

samples

Captured samples, in write order.

close async

close()

Flip the closed flag — no I/O, buffer preserved for inspection.

Source code in src/sartoriuslib/sinks/memory.py
async def close(self) -> None:
    """Flip the closed flag — no I/O, buffer preserved for inspection."""
    self._closed = True

open async

open()

No backing resource — just flips the open flag.

Source code in src/sartoriuslib/sinks/memory.py
async def open(self) -> None:
    """No backing resource — just flips the open flag."""
    self._open = True
    self._closed = False

write_many async

write_many(samples)

Append every sample to the internal buffer.

Source code in src/sartoriuslib/sinks/memory.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append every sample to the internal buffer."""
    if not self.is_open:
        raise RuntimeError("InMemorySink: write_many called before open()")
    self._samples.extend(samples)

JsonlSink

JsonlSink(path)

Append-only JSONL writer — one flattened sample per line.

The on-disk format is <sample-row-as-json>\n per sample; reading back is just [json.loads(line) for line in f].

Source code in src/sartoriuslib/sinks/jsonl.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None

path property

path

Destination file path.

close async

close()

Flush and close the JSONL file. Idempotent.

Source code in src/sartoriuslib/sinks/jsonl.py
async def close(self) -> None:
    """Flush and close the JSONL file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None

open async

open()

Open the JSONL file for writing. Overwrites any existing file.

Source code in src/sartoriuslib/sinks/jsonl.py
async def open(self) -> None:
    """Open the JSONL file for writing. Overwrites any existing file."""
    if self._file is not None:
        return
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Serialise each sample as one JSON object per line.

Source code in src/sartoriuslib/sinks/jsonl.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    r"""Serialise each sample as one JSON object per line."""
    if self._file is None:
        raise RuntimeError("JsonlSink: write_many called before open()")
    if not samples:
        return
    for sample in samples:
        row = sample_to_row(sample)
        self._file.write(json.dumps(row, ensure_ascii=False))
        self._file.write("\n")
    self._file.flush()

ParquetSink

ParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
)

Append-only Parquet writer with first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination Parquet file.

compression _Compression

Codec applied to every row group.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first flush.

Source code in src/sartoriuslib/sinks/parquet.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
) -> None:
    self._path = Path(path)
    self._compression: _Compression = compression
    self._use_dictionary = use_dictionary
    if row_group_size is not None and row_group_size < 1:
        raise ValueError(
            f"row_group_size must be >= 1 if set, got {row_group_size!r}",
        )
    self._row_group_size = row_group_size
    self._schema = SchemaLock(sink_name="parquet", logger=_logger)
    self._pa: Any = None
    self._pq: Any = None
    self._arrow_schema: Any = None
    self._writer: Any = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

compression property

compression

The configured compression codec.

path property

path

Destination Parquet file path.

close async

close()

Flush the footer and close the writer. Idempotent.

Source code in src/sartoriuslib/sinks/parquet.py
async def close(self) -> None:
    """Flush the footer and close the writer. Idempotent."""
    if self._writer is not None:
        try:
            self._writer.close()
        finally:
            self._writer = None
    self._pa = None
    self._pq = None
    _logger.info(
        "sinks.parquet.close",
        extra={
            "path": str(self._path),
            "rows_written": self._rows_written,
        },
    )

open async

open()

Load pyarrow and create the parent directory. Idempotent.

The actual :class:pyarrow.parquet.ParquetWriter is opened lazily on the first :meth:write_many call, because the writer requires a concrete schema — which we don't have until the first batch is inspected.

Source code in src/sartoriuslib/sinks/parquet.py
async def open(self) -> None:
    """Load pyarrow and create the parent directory. Idempotent.

    The actual :class:`pyarrow.parquet.ParquetWriter` is opened
    lazily on the first :meth:`write_many` call, because the
    writer requires a concrete schema — which we don't have until
    the first batch is inspected.
    """
    if self._pa is not None:
        return
    self._pa, self._pq = _load_pyarrow()
    self._path.parent.mkdir(parents=True, exist_ok=True)
    _logger.info(
        "sinks.parquet.open",
        extra={
            "path": str(self._path),
            "compression": self._compression,
        },
    )

write_many async

write_many(samples)

Append samples as one Parquet row group.

On first call: infers the schema from the batch, locks it, constructs the matching :mod:pyarrow schema, and opens the underlying :class:~pyarrow.parquet.ParquetWriter.

Subsequent calls project each row onto the locked schema and append the rows as a new row group. Unknown columns are dropped with one-shot WARN (handled by :class:SchemaLock).

Source code in src/sartoriuslib/sinks/parquet.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as one Parquet row group.

    On first call: infers the schema from the batch, locks it,
    constructs the matching :mod:`pyarrow` schema, and opens the
    underlying :class:`~pyarrow.parquet.ParquetWriter`.

    Subsequent calls project each row onto the locked schema and
    append the rows as a new row group. Unknown columns are
    dropped with one-shot WARN (handled by :class:`SchemaLock`).
    """
    if self._pa is None:
        raise RuntimeError("ParquetSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        self._schema.lock(rows)
        self._arrow_schema = self._build_arrow_schema()
        self._writer = self._open_writer()

    assert self._writer is not None  # noqa: S101 — narrow for type checker
    assert self._arrow_schema is not None  # noqa: S101

    projected = [self._schema.project(r) for r in rows]
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    arrays = {spec.name: [row[spec.name] for row in projected] for spec in columns}

    try:
        table = self._pa.Table.from_pydict(arrays, schema=self._arrow_schema)
        self._writer.write_table(
            table,
            row_group_size=self._row_group_size,
        )
    except Exception as exc:
        raise SartoriusSinkWriteError(
            f"ParquetSink: write failed for {self._path}: {exc}",
        ) from exc
    self._rows_written += len(projected)

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout applied as a server setting. Defaults to 30 s.

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/sartoriuslib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"
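
As a construction sketch, the discrete-field form (all values below are placeholders; a dsn string is the mutually exclusive alternative):

from sartoriuslib.sinks import PostgresConfig, PostgresSink

config = PostgresConfig(
    host="db.example.internal",   # placeholder host
    user="acquisition",
    password="s3cret",            # never logged
    database="lab",
    table="samples",
    create_table=True,            # infer the table schema from the first batch
)
sink = PostgresSink(config)
print(config.target())            # log-safe: db.example.internal:5432/lab.public.samples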

PostgresSink

PostgresSink(config)

Append-only Postgres writer using pooled asyncpg connections.

Attributes:

Name Type Description
config PostgresConfig

Frozen :class:PostgresConfig instance.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first :meth:write_many.

Source code in src/sartoriuslib/sinks/postgres.py
def __init__(self, config: PostgresConfig) -> None:
    self._config = config
    self._schema = SchemaLock(sink_name="postgres", logger=_logger)
    self._asyncpg: Any = None
    self._pool: Any = None
    self._insert_sql: str | None = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

config property

config

The frozen :class:PostgresConfig passed in at construction.

close async

close()

Close the pool. Idempotent.

Source code in src/sartoriuslib/sinks/postgres.py
async def close(self) -> None:
    """Close the pool. Idempotent."""
    if self._pool is None:
        return
    pool = self._pool
    self._pool = None
    try:
        await pool.close()
    finally:
        _logger.info(
            "sinks.postgres.close",
            extra={
                "target": self._config.target(),
                "rows_written": self._rows_written,
            },
        )
    self._asyncpg = None

open async

open()

Load asyncpg, open the pool, and (optionally) introspect the table.

Idempotent. When create_table=False (the default), the target's columns are read on open and the schema is locked immediately. When create_table=True the lock happens lazily on the first :meth:write_many.

Source code in src/sartoriuslib/sinks/postgres.py
async def open(self) -> None:
    """Load asyncpg, open the pool, and (optionally) introspect the table.

    Idempotent. When ``create_table=False`` (the default), the
    target's columns are read on open and the schema is locked
    immediately. When ``create_table=True`` the lock happens
    lazily on the first :meth:`write_many`.
    """
    if self._pool is not None:
        return
    self._asyncpg = _load_asyncpg()
    cfg = self._config
    server_settings = {
        "application_name": "sartoriuslib",
        "statement_timeout": str(int(cfg.statement_timeout_ms)),
    }
    try:
        if cfg.dsn is not None:
            self._pool = await self._asyncpg.create_pool(
                dsn=cfg.dsn,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
        else:
            self._pool = await self._asyncpg.create_pool(
                host=cfg.host,
                port=cfg.port,
                user=cfg.user,
                password=cfg.password,
                database=cfg.database,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
    except Exception as exc:
        raise SartoriusSinkWriteError(
            f"PostgresSink: failed to open pool for {cfg.target()}: {exc}",
        ) from exc

    _logger.info(
        "sinks.postgres.open",
        extra={
            "target": cfg.target(),
            "pool_min": cfg.pool_min_size,
            "pool_max": cfg.pool_max_size,
            "use_copy": cfg.use_copy,
            "create_table": cfg.create_table,
        },
    )

    if not cfg.create_table:
        await self._introspect_existing_table()

write_many async

write_many(samples)

Append samples — one COPY (or executemany) per call.

Source code in src/sartoriuslib/sinks/postgres.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` — one COPY (or executemany) per call."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        assert self._config.create_table  # noqa: S101
        self._schema.lock(rows)
        await self._create_table()
        self._insert_sql = self._build_insert_sql()

    columns = self._schema.columns
    assert columns is not None  # noqa: S101
    assert self._insert_sql is not None  # noqa: S101

    projected_tuples: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected_tuples.append(tuple(fields[spec.name] for spec in columns))

    try:
        if self._config.use_copy:
            await self._write_copy(projected_tuples, columns)
        else:
            await self._write_executemany(projected_tuples)
    except SartoriusSinkWriteError:
        raise
    except Exception as exc:
        raise SartoriusSinkWriteError(
            f"PostgresSink: write failed for {self._config.target()}: {exc}",
        ) from exc
    self._rows_written += len(projected_tuples)

SampleSink

Bases: Protocol

Minimal shape of an acquisition sink.

Sinks own their storage handle lifecycle. Concrete implementations typically follow this call sequence:

  1. await sink.open() — allocate file descriptors, DB connections, etc. Safe to call again on an already-open sink.
  2. await sink.write_many(samples) — one or more times. samples is a :class:~collections.abc.Sequence so the sink knows the full batch up front (useful for CSV column inference, SQLite batched inserts).
  3. await sink.close() — flush and release the handle. Idempotent.

The async context-manager methods provide an async with sink: shape for the common case of "open → write → close" in one block.
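
A minimal sketch of that lifecycle using the async-context-manager shape (producing the Sample batches is assumed to happen elsewhere):

from sartoriuslib.sinks import CsvSink, SampleSink

async def write_batches(batches) -> None:
    """``batches``: any iterable of Sample sequences, produced elsewhere."""
    sink: SampleSink = CsvSink("weights.csv")
    async with sink:                   # __aenter__ -> open()
        for batch in batches:
            await sink.write_many(batch)
    # __aexit__ -> close(); calling close() again later is a no-op.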

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/sartoriuslib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/sartoriuslib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/sartoriuslib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource (file handle, DB conn, …).

Source code in src/sartoriuslib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource (file handle, DB conn, …)."""
    ...

write_many async

write_many(samples)

Append samples to the sink.

Source code in src/sartoriuslib/sinks/base.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink."""
    ...

SqliteSink

SqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
)

Append-only SQLite writer with WAL journaling and first-batch schema lock.

Source code in src/sartoriuslib/sinks/sqlite.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
) -> None:
    self._path = Path(path)
    self._table = _validate_identifier(table, label="table name")
    self._create_table = create_table
    self._journal_mode: _JournalMode = journal_mode
    self._synchronous: _Synchronous = synchronous
    if busy_timeout_ms < 0:
        raise ValueError(f"busy_timeout_ms must be >= 0, got {busy_timeout_ms!r}")
    self._busy_timeout_ms = busy_timeout_ms
    self._conn: sqlite3.Connection | None = None
    self._schema = SchemaLock(sink_name="sqlite", logger=_logger)
    self._insert_sql: str | None = None

columns property

columns

Locked columns in order, or None before first :meth:write_many.

path property

path

Destination SQLite file path.

table property

table

Target table name (validated).

close async

close()

Close the connection. Idempotent.

Source code in src/sartoriuslib/sinks/sqlite.py
async def close(self) -> None:
    """Close the connection. Idempotent."""
    if self._conn is None:
        return
    conn = self._conn
    self._conn = None
    try:
        await run_sync(conn.close)
    finally:
        _logger.info(
            "sinks.sqlite.close",
            extra={"path": str(self._path), "table": self._table},
        )

open async

open()

Open the SQLite connection, apply PRAGMAs, and introspect the target.

Source code in src/sartoriuslib/sinks/sqlite.py
async def open(self) -> None:
    """Open the SQLite connection, apply PRAGMAs, and introspect the target."""
    if self._conn is not None:
        return
    self._conn = await run_sync(self._connect_blocking)
    _logger.info(
        "sinks.sqlite.open",
        extra={
            "path": str(self._path),
            "table": self._table,
            "journal_mode": self._journal_mode,
            "synchronous": self._synchronous,
        },
    )
    if not self._create_table:
        try:
            await run_sync(self._introspect_existing_table_blocking)
        except BaseException:
            conn = self._conn
            self._conn = None
            await run_sync(conn.close)
            raise

write_many async

write_many(samples)

Append samples as rows in a single transaction.

Source code in src/sartoriuslib/sinks/sqlite.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as rows in a single transaction."""
    if self._conn is None:
        raise RuntimeError("SqliteSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked and self._create_table:
        self._schema.lock(rows)
        await run_sync(self._create_table_blocking)
        self._insert_sql = self._build_insert_sql()

    assert self._insert_sql is not None  # noqa: S101
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    projected: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected.append(tuple(fields[spec.name] for spec in columns))

    await run_sync(self._executemany_blocking, projected)

pipe async

pipe(stream, sink, *, batch_size=64, flush_interval=1.0)

Drain stream into sink with buffered flushes.

Reads per-tick batches from the recorder and accumulates the individual :class:Sample objects into a list. A flush happens when either threshold is first crossed:

  • the buffer reaches batch_size samples, or
  • flush_interval seconds have elapsed since the last flush.

On stream exhaustion any leftover buffer is flushed before the summary is returned.

Parameters:

Name Type Description Default
stream AsyncIterator[Mapping[str, Sample]]

The async iterator yielded by :func:~sartoriuslib.streaming.record.

required
sink SampleSink

Any :class:SampleSink. Must already be open.

required
batch_size int

Buffer threshold in samples (not batches).

64
flush_interval float

Seconds between flushes (wall-clock).

1.0

Returns:

Type Description
AcquisitionSummary

An :class:AcquisitionSummary with samples_emitted set to the count actually handed to the sink.

Source code in src/sartoriuslib/sinks/base.py
async def pipe(
    stream: AsyncIterator[Mapping[str, Sample]],
    sink: SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    r"""Drain ``stream`` into ``sink`` with buffered flushes.

    Reads per-tick batches from the recorder and accumulates the
    individual :class:`Sample`\ s into a list. A flush happens when
    either threshold is first crossed:

    - the buffer reaches ``batch_size`` samples, or
    - ``flush_interval`` seconds have elapsed since the last flush.

    On stream exhaustion any leftover buffer is flushed before the
    summary is returned.

    Args:
        stream: The async iterator yielded by
            :func:`~sartoriuslib.streaming.record`.
        sink: Any :class:`SampleSink`. Must already be open.
        batch_size: Buffer threshold in samples (not batches).
        flush_interval: Seconds between flushes (wall-clock).

    Returns:
        An :class:`AcquisitionSummary` with ``samples_emitted`` set to
        the count actually handed to the sink.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = anyio.current_time()

    async def _flush() -> None:
        nonlocal emitted
        if not buffer:
            return
        await sink.write_many(buffer)
        emitted += len(buffer)
        buffer.clear()

    async for batch in stream:
        buffer.extend(batch.values())
        now = anyio.current_time()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            await _flush()
            last_flush = now

    await _flush()
    finished_at = datetime.now(UTC)
    _logger.info(
        "sinks.pipe_done",
        extra={
            "sink": type(sink).__name__,
            "samples_emitted": emitted,
            "duration_s": (finished_at - started_at).total_seconds(),
        },
    )
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )

sample_to_row

sample_to_row(sample)

Flatten a :class:Sample into a single row dict for tabular sinks.

Schema layout (stable across samples; design §10):

  • device — manager-assigned name.
  • requested_at / received_at / midpoint_at — ISO 8601.
  • elapsed_s — poll round-trip time, seconds.
  • reading fields — from :meth:Reading.as_dict: value, unit, sign, stable, overload, underload, decimals, sequence, protocol, raw. On error samples (reading is None) these all appear as None.
  • error_type — fully qualified exception class on a failed sample, otherwise None.
  • error_message — str(error) on a failed sample, otherwise None.

Reading.protocol is the authoritative protocol column on success rows; on error rows the row's protocol column falls back to :attr:Sample.protocol (populated by the manager from the session's active protocol) so sinks never see a missing column.
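
For orientation, a success row produced by this helper looks roughly like the dict below — every value is invented for illustration; only the key set and ordering follow the layout above:

{
    "device": "scale-1",
    "requested_at": "2025-01-01T12:00:00.000000+00:00",
    "received_at": "2025-01-01T12:00:00.050000+00:00",
    "midpoint_at": "2025-01-01T12:00:00.025000+00:00",
    "elapsed_s": 0.05,
    "value": 12.3456, "unit": "g", "sign": "+", "stable": True,
    "overload": False, "underload": False, "decimals": 4,
    "sequence": None, "protocol": "sbi", "raw": "+ 12.3456 g",
    "error_type": None, "error_message": None,
}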

Source code in src/sartoriuslib/sinks/base.py
def sample_to_row(sample: Sample) -> dict[str, float | int | str | None]:
    """Flatten a :class:`Sample` into a single row dict for tabular sinks.

    Schema layout (stable across samples; design §10):

    - ``device`` — manager-assigned name.
    - ``requested_at`` / ``received_at`` / ``midpoint_at`` — ISO 8601.
    - ``elapsed_s`` — poll round-trip time, seconds.
    - *reading fields* — from :meth:`Reading.as_dict`: ``value``,
      ``unit``, ``sign``, ``stable``, ``overload``, ``underload``,
      ``decimals``, ``sequence``, ``protocol``, ``raw``. On error
      samples (``reading is None``) these all appear as ``None``.
    - ``error_type`` — fully qualified exception class on a failed
      sample, otherwise ``None``.
    - ``error_message`` — ``str(error)`` on a failed sample, otherwise
      ``None``.

    ``Reading.protocol`` is the authoritative protocol column on
    success rows; on error rows the row's ``protocol`` column falls
    back to :attr:`Sample.protocol` (populated by the manager from
    the session's active protocol) so sinks never see a missing
    column.
    """
    row: dict[str, float | int | str | None] = {
        "device": sample.device,
        "requested_at": sample.requested_at.isoformat(),
        "received_at": sample.received_at.isoformat(),
        "midpoint_at": sample.midpoint_at.isoformat(),
        "elapsed_s": sample.elapsed_s,
    }
    reading = sample.reading
    if reading is not None:
        row.update(reading.as_dict())
    else:
        # Keep the schema stable on error rows so the first batch
        # of mixed results doesn't accidentally lock a narrower
        # schema when the first sample is a failure.
        row.update(
            {
                "value": None,
                "unit": None,
                "sign": None,
                "stable": None,
                "overload": None,
                "underload": None,
                "decimals": None,
                "sequence": None,
                "protocol": sample.protocol.value if sample.protocol is not None else None,
                "raw": None,
            }
        )
    err = sample.error
    if err is not None:
        cls = type(err)
        row["error_type"] = f"{cls.__module__}.{cls.__qualname__}"
        row["error_message"] = str(err)
    else:
        row["error_type"] = None
        row["error_message"] = None
    return row

Base Protocol + pipe() + sample_to_row

sartoriuslib.sinks.base

Sink Protocol, sample → row helper, and the pipe() driver.

A :class:SampleSink is the minimal shape the recorder's downstream consumer needs: :meth:open, :meth:write_many, :meth:close, and the matching async context-manager methods. The in-tree sinks (:class:~sartoriuslib.sinks.memory.InMemorySink, :class:~sartoriuslib.sinks.csv.CsvSink, :class:~sartoriuslib.sinks.jsonl.JsonlSink, :class:~sartoriuslib.sinks.sqlite.SqliteSink) all satisfy this Protocol; third-party sinks can slot in without touching library code.

:func:pipe is the v1 acquisition glue. It reads per-tick batches out of the recorder's receive stream, buffers them up to batch_size (or flush_interval seconds, whichever comes first), and calls sink.write_many to flush. On stream exhaustion it drains any remaining buffer and returns an :class:AcquisitionSummary with samples_emitted reflecting the count actually handed to the sink.

Design reference: docs/design.md §10.
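
A minimal wiring sketch — how the recorder stream is obtained is outside this snippet; only pipe()'s signature and the async-context-manager shape come from this module:

from sartoriuslib.sinks import JsonlSink, pipe

async def acquire_to_jsonl(stream) -> None:
    """``stream``: the AsyncIterator[Mapping[str, Sample]] yielded by the recorder."""
    sink = JsonlSink("run.jsonl")
    async with sink:                                        # open() / close()
        summary = await pipe(stream, sink, batch_size=128, flush_interval=2.0)
    print(f"{summary.samples_emitted} samples in "
          f"{(summary.finished_at - summary.started_at).total_seconds():.1f}s")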

SampleSink

Bases: Protocol

Minimal shape of an acquisition sink.

Sinks own their storage handle lifecycle. Concrete implementations typically follow this call sequence:

  1. await sink.open() — allocate file descriptors, DB connections, etc. Safe to call again on an already-open sink.
  2. await sink.write_many(samples) — one or more times. samples is a :class:~collections.abc.Sequence so the sink knows the full batch up front (useful for CSV column inference, SQLite batched inserts).
  3. await sink.close() — flush and release the handle. Idempotent.

The async context-manager methods provide an async with sink: shape for the common case of "open → write → close" in one block.

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/sartoriuslib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/sartoriuslib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/sartoriuslib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource (file handle, DB conn, …).

Source code in src/sartoriuslib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource (file handle, DB conn, …)."""
    ...

write_many async

write_many(samples)

Append samples to the sink.

Source code in src/sartoriuslib/sinks/base.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink."""
    ...

pipe async

pipe(stream, sink, *, batch_size=64, flush_interval=1.0)

Drain stream into sink with buffered flushes.

Reads per-tick batches from the recorder and accumulates the individual :class:Sample objects into a list. A flush happens when either threshold is first crossed:

  • the buffer reaches batch_size samples, or
  • flush_interval seconds have elapsed since the last flush.

On stream exhaustion any leftover buffer is flushed before the summary is returned.

Parameters:

Name Type Description Default
stream AsyncIterator[Mapping[str, Sample]]

The async iterator yielded by :func:~sartoriuslib.streaming.record.

required
sink SampleSink

Any :class:SampleSink. Must already be open.

required
batch_size int

Buffer threshold in samples (not batches).

64
flush_interval float

Seconds between flushes (wall-clock).

1.0

Returns:

Type Description
AcquisitionSummary

An :class:AcquisitionSummary with samples_emitted set to the count actually handed to the sink.

Source code in src/sartoriuslib/sinks/base.py
async def pipe(
    stream: AsyncIterator[Mapping[str, Sample]],
    sink: SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    r"""Drain ``stream`` into ``sink`` with buffered flushes.

    Reads per-tick batches from the recorder and accumulates the
    individual :class:`Sample`\ s into a list. A flush happens when
    either threshold is first crossed:

    - the buffer reaches ``batch_size`` samples, or
    - ``flush_interval`` seconds have elapsed since the last flush.

    On stream exhaustion any leftover buffer is flushed before the
    summary is returned.

    Args:
        stream: The async iterator yielded by
            :func:`~sartoriuslib.streaming.record`.
        sink: Any :class:`SampleSink`. Must already be open.
        batch_size: Buffer threshold in samples (not batches).
        flush_interval: Seconds between flushes (wall-clock).

    Returns:
        An :class:`AcquisitionSummary` with ``samples_emitted`` set to
        the count actually handed to the sink.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = anyio.current_time()

    async def _flush() -> None:
        nonlocal emitted
        if not buffer:
            return
        await sink.write_many(buffer)
        emitted += len(buffer)
        buffer.clear()

    async for batch in stream:
        buffer.extend(batch.values())
        now = anyio.current_time()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            await _flush()
            last_flush = now

    await _flush()
    finished_at = datetime.now(UTC)
    _logger.info(
        "sinks.pipe_done",
        extra={
            "sink": type(sink).__name__,
            "samples_emitted": emitted,
            "duration_s": (finished_at - started_at).total_seconds(),
        },
    )
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
        samples_late=0,
        max_drift_ms=0.0,
    )

sample_to_row

sample_to_row(sample)

Flatten a :class:Sample into a single row dict for tabular sinks.

Schema layout (stable across samples; design §10):

  • device — manager-assigned name.
  • requested_at / received_at / midpoint_at — ISO 8601.
  • elapsed_s — poll round-trip time, seconds.
  • reading fields — from :meth:Reading.as_dict: value, unit, sign, stable, overload, underload, decimals, sequence, protocol, raw. On error samples (reading is None) these all appear as None.
  • error_type — fully qualified exception class on a failed sample, otherwise None.
  • error_message — str(error) on a failed sample, otherwise None.

Reading.protocol is the authoritative protocol column on success rows; on error rows the row's protocol column falls back to :attr:Sample.protocol (populated by the manager from the session's active protocol) so sinks never see a missing column.

Source code in src/sartoriuslib/sinks/base.py
def sample_to_row(sample: Sample) -> dict[str, float | int | str | None]:
    """Flatten a :class:`Sample` into a single row dict for tabular sinks.

    Schema layout (stable across samples; design §10):

    - ``device`` — manager-assigned name.
    - ``requested_at`` / ``received_at`` / ``midpoint_at`` — ISO 8601.
    - ``elapsed_s`` — poll round-trip time, seconds.
    - *reading fields* — from :meth:`Reading.as_dict`: ``value``,
      ``unit``, ``sign``, ``stable``, ``overload``, ``underload``,
      ``decimals``, ``sequence``, ``protocol``, ``raw``. On error
      samples (``reading is None``) these all appear as ``None``.
    - ``error_type`` — fully qualified exception class on a failed
      sample, otherwise ``None``.
    - ``error_message`` — ``str(error)`` on a failed sample, otherwise
      ``None``.

    ``Reading.protocol`` is the authoritative protocol column on
    success rows; on error rows the row's ``protocol`` column falls
    back to :attr:`Sample.protocol` (populated by the manager from
    the session's active protocol) so sinks never see a missing
    column.
    """
    row: dict[str, float | int | str | None] = {
        "device": sample.device,
        "requested_at": sample.requested_at.isoformat(),
        "received_at": sample.received_at.isoformat(),
        "midpoint_at": sample.midpoint_at.isoformat(),
        "elapsed_s": sample.elapsed_s,
    }
    reading = sample.reading
    if reading is not None:
        row.update(reading.as_dict())
    else:
        # Keep the schema stable on error rows so the first batch
        # of mixed results doesn't accidentally lock a narrower
        # schema when the first sample is a failure.
        row.update(
            {
                "value": None,
                "unit": None,
                "sign": None,
                "stable": None,
                "overload": None,
                "underload": None,
                "decimals": None,
                "sequence": None,
                "protocol": sample.protocol.value if sample.protocol is not None else None,
                "raw": None,
            }
        )
    err = sample.error
    if err is not None:
        cls = type(err)
        row["error_type"] = f"{cls.__module__}.{cls.__qualname__}"
        row["error_message"] = str(err)
    else:
        row["error_type"] = None
        row["error_message"] = None
    return row

Schema

sartoriuslib.sinks._schema

Shared first-batch schema-lock for tabular sinks.

Every tabular sink in the tree (SQLite, and eventually Parquet / Postgres behind extras) shares the same schema-evolution policy:

  1. First batch wins. The column set and order are locked from the first :meth:write_many call. For schema-less sinks this is just bookkeeping; for schema-ful sinks (SQLite CREATE TABLE) the locked spec drives the backing schema.
  2. Unknown columns are dropped with a one-shot WARN. Later batches carrying a new key don't reshape the file/table silently — each new key logs once, then gets dropped on subsequent batches without re-logging.
  3. Missing columns are filled with None. Row projection guarantees every locked column appears in the output dict.

This module is sink-facing only. It has no public re-export.

Design reference: docs/design.md §10.

ColumnSpec dataclass

ColumnSpec(name, python_type, nullable)

One column in a locked tabular schema.

Attributes:

Name Type Description
name str

Column name, verbatim from the source row dict.

python_type _SCALAR_TYPE

Concrete Python scalar type backing the column — one of :class:float, :class:int, :class:str. Sinks translate this into their native type system.

nullable bool

True if the first batch contained at least one None for this column, or if the column is entirely absent from some rows.

SchemaLock

SchemaLock(*, sink_name, logger)

Lock a row-dict schema on first batch; drop unknowns on later batches.

Not thread-safe. Each sink instance owns one :class:SchemaLock and guards it with whatever lock protects its write path.

Typical sink flow::

self._schema = SchemaLock(sink_name="sqlite", logger=_logger)
# on first write_many:
specs = self._schema.lock([sample_to_row(s) for s in samples])
# for every batch (including the first):
rows = [self._schema.project(sample_to_row(s)) for s in samples]
Source code in src/sartoriuslib/sinks/_schema.py
def __init__(self, *, sink_name: str, logger: logging.Logger) -> None:
    self._sink_name = sink_name
    self._logger = logger
    self._columns: tuple[ColumnSpec, ...] | None = None
    self._names: frozenset[str] = frozenset()
    self._unknown_warned: set[str] = set()

columns property

columns

Locked columns in declaration order, or None before lock.

is_locked property

is_locked

True once :meth:lock or :meth:lock_to has been called.

lock

lock(rows)

Infer column specs from rows and lock the schema.

Column order is determined by first-encounter across the batch. Per-column type is inferred from the first non-None value; when the batch mixes int and float for one column the column widens to float; any other mix widens to str.

Columns entirely None in the first batch default to str / nullable=True.
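
A small sketch of those inference rules (the module is internal — no public re-export — so the import path below is the in-tree one):

import logging

from sartoriuslib.sinks._schema import SchemaLock

lock = SchemaLock(sink_name="example", logger=logging.getLogger(__name__))

# First batch: "value" mixes int and float, "note" is entirely None.
lock.lock([
    {"device": "scale-1", "value": 1, "note": None},
    {"device": "scale-1", "value": 2.5, "note": None},
])
# -> device: str, value: float (int widened), note: str / nullable=True

# Later rows: unknown keys dropped (one-shot WARN), missing keys filled with None.
row = lock.project({"device": "scale-1", "value": 3, "surprise": "x"})
# row == {"device": "scale-1", "value": 3, "note": None}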

Source code in src/sartoriuslib/sinks/_schema.py
def lock(
    self,
    rows: Sequence[Mapping[str, object]],
) -> tuple[ColumnSpec, ...]:
    """Infer column specs from ``rows`` and lock the schema.

    Column order is determined by first-encounter across the batch.
    Per-column type is inferred from the first non-``None`` value;
    when the batch mixes ``int`` and ``float`` for one column the
    column widens to ``float``; any other mix widens to ``str``.

    Columns entirely ``None`` in the first batch default to
    ``str`` / ``nullable=True``.
    """
    if self._columns is not None:
        raise RuntimeError("SchemaLock.lock called twice")
    if not rows:
        raise ValueError("SchemaLock.lock requires a non-empty first batch")

    ordered_keys: list[str] = []
    seen: set[str] = set()
    for row in rows:
        for key in row:
            if key not in seen:
                ordered_keys.append(key)
                seen.add(key)

    specs = [self._infer_column(key, rows) for key in ordered_keys]
    self._columns = tuple(specs)
    self._names = frozenset(ordered_keys)
    return self._columns

lock_to

lock_to(specs)

Lock the schema from an externally-supplied spec list.

Used by sinks that validate against an already-existing backing schema rather than inferring from the first batch.

Source code in src/sartoriuslib/sinks/_schema.py
def lock_to(self, specs: Sequence[ColumnSpec]) -> tuple[ColumnSpec, ...]:
    """Lock the schema from an externally-supplied spec list.

    Used by sinks that validate against an already-existing
    backing schema rather than inferring from the first batch.
    """
    if self._columns is not None:
        raise RuntimeError("SchemaLock.lock_to called twice")
    if not specs:
        raise ValueError("SchemaLock.lock_to requires at least one column")
    self._columns = tuple(specs)
    self._names = frozenset(spec.name for spec in self._columns)
    return self._columns

project

project(row)

Return a new dict containing only keys from the locked schema.

Every locked column appears in the output dict — missing keys are filled with None. Any key in row that is not part of the locked schema is dropped, with the first occurrence of each such key logged at WARN.

Source code in src/sartoriuslib/sinks/_schema.py
def project(self, row: Mapping[str, object]) -> dict[str, object]:
    """Return a new dict containing only keys from the locked schema.

    Every locked column appears in the output dict — missing keys
    are filled with ``None``. Any key in ``row`` that is not part
    of the locked schema is dropped, with the first occurrence of
    each such key logged at WARN.
    """
    if self._columns is None:
        raise RuntimeError("SchemaLock.project called before lock()")

    result: dict[str, object] = {spec.name: None for spec in self._columns}
    for key, value in row.items():
        if key in self._names:
            result[key] = value
            continue
        if key not in self._unknown_warned:
            self._unknown_warned.add(key)
            self._logger.warning(
                "sink.unknown_column_dropped",
                extra={"sink": self._sink_name, "column": key},
            )
    return result

In-memory (test-only)

sartoriuslib.sinks.memory

In-memory sink — collects :class:Sample objects in a list for tests.

:class:InMemorySink satisfies the :class:~sartoriuslib.sinks.base.SampleSink Protocol so acquisition tests can run the same pipe() call path a production sink uses and then inspect the captured samples after the fact.

Design reference: docs/design.md §10.
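
A test-shaped sketch (the recorder stream is assumed to come from the test's fixtures):

from sartoriuslib.sinks import InMemorySink, pipe

async def test_pipeline_writes_every_sample(stream) -> None:
    sink = InMemorySink()
    async with sink:
        summary = await pipe(stream, sink, batch_size=8, flush_interval=0.1)
    # close() keeps the buffer, so assertions can run after the run finishes.
    assert len(sink.samples) == summary.samples_emitted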

InMemorySink

InMemorySink()

Collect every written :class:Sample in a single list.

:attr:samples is appended to (never re-assigned). :meth:close does not clear the buffer — the point of this sink is post-run inspection.

Source code in src/sartoriuslib/sinks/memory.py
def __init__(self) -> None:
    self._samples: list[Sample] = []
    self._open = False
    self._closed = False

is_open property

is_open

True once :meth:open has been called and close has not.

samples property

samples

Captured samples, in write order.

close async

close()

Flip the closed flag — no I/O, buffer preserved for inspection.

Source code in src/sartoriuslib/sinks/memory.py
async def close(self) -> None:
    """Flip the closed flag — no I/O, buffer preserved for inspection."""
    self._closed = True

open async

open()

No backing resource — just flips the open flag.

Source code in src/sartoriuslib/sinks/memory.py
async def open(self) -> None:
    """No backing resource — just flips the open flag."""
    self._open = True
    self._closed = False

write_many async

write_many(samples)

Append every sample to the internal buffer.

Source code in src/sartoriuslib/sinks/memory.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append every sample to the internal buffer."""
    if not self.is_open:
        raise RuntimeError("InMemorySink: write_many called before open()")
    self._samples.extend(samples)

CSV

sartoriuslib.sinks.csv

CSV sink — stdlib :mod:csv, schema locked at first batch.

:class:CsvSink writes one row per :class:Sample. The column order is fixed the first time :meth:write_many is called — inferred from the first sample's :func:sample_to_row output — and stays stable for the rest of the run. Unknown columns that appear in later samples are dropped with a WARN log rather than silently reshaping the file.

Stdlib-only — the core install pulls in no CSV dependencies. Design reference: docs/design.md §10.

CsvSink

CsvSink(path)

Append-only CSV writer with first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination file. Created or overwritten on :meth:open.

columns tuple[str, ...] | None

Locked column order after the first :meth:write_many. None before the first flush.

Source code in src/sartoriuslib/sinks/csv.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None
    self._writer: csv.DictWriter[str] | None = None
    self._columns: tuple[str, ...] | None = None
    self._unknown_columns_warned: set[str] = set()

columns property

columns

Locked column order, or None if no batch has been flushed.

path property

path

Destination file path.

close async

close()

Flush and close the CSV file. Idempotent.

Source code in src/sartoriuslib/sinks/csv.py
async def close(self) -> None:
    """Flush and close the CSV file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None
        self._writer = None

open async

open()

Open the CSV file for writing. Overwrites any existing file.

Source code in src/sartoriuslib/sinks/csv.py
async def open(self) -> None:
    """Open the CSV file for writing. Overwrites any existing file."""
    if self._file is not None:
        return
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Append samples as CSV rows.

Source code in src/sartoriuslib/sinks/csv.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as CSV rows."""
    if self._file is None:
        raise RuntimeError("CsvSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]
    if self._writer is None:
        self._columns = tuple(rows[0].keys())
        self._writer = csv.DictWriter(self._file, fieldnames=list(self._columns))
        self._writer.writeheader()

    columns = self._columns
    assert columns is not None  # noqa: S101

    for row in rows:
        unknown = row.keys() - set(columns)
        for key in unknown:
            if key not in self._unknown_columns_warned:
                self._unknown_columns_warned.add(key)
                _logger.warning(
                    "sinks.csv.unknown_column",
                    extra={
                        "path": str(self._path),
                        "column": key,
                        "action": "drop",
                    },
                )
        filtered = {k: row.get(k) for k in columns}
        self._writer.writerow(filtered)
    self._file.flush()

JSONL

sartoriuslib.sinks.jsonl

JSONL sink — stdlib :mod:json, one object per line, no schema lock.

:class:JsonlSink writes one JSON object per :class:Sample. Unlike :class:~sartoriuslib.sinks.csv.CsvSink, it doesn't lock a schema — each row stands alone, so a device whose reading format carries an extra field simply emits a wider object without affecting earlier or later rows.

Stdlib-only. Design reference: docs/design.md §10.
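
Reading a finished file back, per the format described above:

import json
from pathlib import Path

with Path("run.jsonl").open(encoding="utf-8") as f:
    rows = [json.loads(line) for line in f]   # one flattened sample per dict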

JsonlSink

JsonlSink(path)

Append-only JSONL writer — one flattened sample per line.

The on-disk format is <sample-row-as-json>\n per sample; reading back is just [json.loads(line) for line in f].

Source code in src/sartoriuslib/sinks/jsonl.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None

path property

path

Destination file path.

close async

close()

Flush and close the JSONL file. Idempotent.

Source code in src/sartoriuslib/sinks/jsonl.py
async def close(self) -> None:
    """Flush and close the JSONL file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None

open async

open()

Open the JSONL file for writing. Overwrites any existing file.

Source code in src/sartoriuslib/sinks/jsonl.py
async def open(self) -> None:
    """Open the JSONL file for writing. Overwrites any existing file."""
    if self._file is not None:
        return
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Serialise each sample as one JSON object per line.

Source code in src/sartoriuslib/sinks/jsonl.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    r"""Serialise each sample as one JSON object per line."""
    if self._file is None:
        raise RuntimeError("JsonlSink: write_many called before open()")
    if not samples:
        return
    for sample in samples:
        row = sample_to_row(sample)
        self._file.write(json.dumps(row, ensure_ascii=False))
        self._file.write("\n")
    self._file.flush()

SQLite (stdlib)

sartoriuslib.sinks.sqlite

SQLite sink — stdlib :mod:sqlite3 + WAL, parameterised executemany.

:class:SqliteSink writes one row per :class:Sample into a local SQLite file. Core-sink (no extra required) because sqlite3 ships with the Python standard library.

The sqlite3 driver is synchronous; the sink calls it through :func:anyio.to_thread.run_sync so the event loop stays responsive.

Best-practice defaults baked in:

  • journal_mode=WAL + synchronous=NORMAL.
  • busy_timeout=5000 ms for brief lock contention retries.
  • One BEGIN IMMEDIATE … COMMIT per write_many.
  • SQL identifiers validated against ^[A-Za-z_][A-Za-z0-9_]{0,62}$; values always parameterised.

Schema evolution mirrors the other tabular sinks: column set locked on the first batch, unknown columns dropped with a one-shot WARN.

Design reference: docs/design.md §10.
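
A minimal wiring sketch — the stream argument is the recorder's output, obtained elsewhere; the keyword values shown are just the documented defaults made explicit:

from sartoriuslib.sinks import SqliteSink, pipe

async def acquire_to_sqlite(stream) -> None:
    sink = SqliteSink(
        "run.db",
        table="samples",          # validated identifier
        create_table=True,        # lock the schema from the first batch
        busy_timeout_ms=5000,
    )
    async with sink:
        await pipe(stream, sink)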

SqliteSink

SqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
)

Append-only SQLite writer with WAL journaling and first-batch schema lock.

Source code in src/sartoriuslib/sinks/sqlite.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
) -> None:
    self._path = Path(path)
    self._table = _validate_identifier(table, label="table name")
    self._create_table = create_table
    self._journal_mode: _JournalMode = journal_mode
    self._synchronous: _Synchronous = synchronous
    if busy_timeout_ms < 0:
        raise ValueError(f"busy_timeout_ms must be >= 0, got {busy_timeout_ms!r}")
    self._busy_timeout_ms = busy_timeout_ms
    self._conn: sqlite3.Connection | None = None
    self._schema = SchemaLock(sink_name="sqlite", logger=_logger)
    self._insert_sql: str | None = None

columns property

columns

Locked columns in order, or None before first :meth:write_many.

path property

path

Destination SQLite file path.

table property

table

Target table name (validated).

close async

close()

Close the connection. Idempotent.

Source code in src/sartoriuslib/sinks/sqlite.py
async def close(self) -> None:
    """Close the connection. Idempotent."""
    if self._conn is None:
        return
    conn = self._conn
    self._conn = None
    try:
        await run_sync(conn.close)
    finally:
        _logger.info(
            "sinks.sqlite.close",
            extra={"path": str(self._path), "table": self._table},
        )

open async

open()

Open the SQLite connection, apply PRAGMAs, and introspect the target.

Source code in src/sartoriuslib/sinks/sqlite.py
async def open(self) -> None:
    """Open the SQLite connection, apply PRAGMAs, and introspect the target."""
    if self._conn is not None:
        return
    self._conn = await run_sync(self._connect_blocking)
    _logger.info(
        "sinks.sqlite.open",
        extra={
            "path": str(self._path),
            "table": self._table,
            "journal_mode": self._journal_mode,
            "synchronous": self._synchronous,
        },
    )
    if not self._create_table:
        try:
            await run_sync(self._introspect_existing_table_blocking)
        except BaseException:
            conn = self._conn
            self._conn = None
            await run_sync(conn.close)
            raise

write_many async

write_many(samples)

Append samples as rows in a single transaction.

Source code in src/sartoriuslib/sinks/sqlite.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as rows in a single transaction."""
    if self._conn is None:
        raise RuntimeError("SqliteSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked and self._create_table:
        self._schema.lock(rows)
        await run_sync(self._create_table_blocking)
        self._insert_sql = self._build_insert_sql()

    assert self._insert_sql is not None  # noqa: S101
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    projected: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected.append(tuple(fields[spec.name] for spec in columns))

    await run_sync(self._executemany_blocking, projected)
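
After a run, the resulting file can be inspected with the stdlib driver. A quick sanity-check sketch (the file name is a placeholder and SELECT * is used because the column set depends on what sample_to_row emits):

import sqlite3

conn = sqlite3.connect("run.sqlite3")
conn.row_factory = sqlite3.Row
count = conn.execute("SELECT count(*) FROM samples").fetchone()[0]
print(f"{count} rows written")
for row in conn.execute("SELECT * FROM samples LIMIT 5"):
    print(dict(row))                           # each row is one flattened Sample
conn.close()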

Parquet (sartoriuslib[parquet])

sartoriuslib.sinks.parquet

Parquet sink — :mod:pyarrow, schema locked, zstd by default.

:class:ParquetSink writes one row per :class:Sample into a single Parquet file. pyarrow is an optional dependency behind sartoriuslib[parquet]; the import is deferred to :meth:open so instantiating the sink succeeds on bare-core installs and :class:~sartoriuslib.errors.SartoriusSinkDependencyError is raised only when the user actually tries to open the file.

Best-practice defaults baked in:

  • zstd compression. It matches or beats snappy on write/read speed with ~30% better ratios and is fully supported across pyarrow ≥ 2, Spark, DuckDB, Polars, and pandas ≥ 1.3. Snappy and gzip remain available for compatibility with readers that don't support zstd.
  • Dictionary encoding on for string columns (pyarrow default; surfaced as a knob so callers that know their cardinality is high can disable).
  • One row group per :meth:write_many. Aligns durability with batch cadence — a crash mid-run loses at most the current batch. Callers that want fewer, bigger row groups can pass row_group_size.

Schema evolution mirrors the other tabular sinks: the column set is locked on the first batch (via :class:~sartoriuslib.sinks._schema.SchemaLock). Unknown columns that appear in later batches are dropped with a one-shot WARN. Adding a new column mid-file would require rewriting the whole file, so it is deliberately not supported.

Durability caveat: Parquet files are not readable until the footer is flushed on :meth:close. If the process is killed mid-run you will get a file with no usable footer. The recommended shutdown path is the recorder's structured exit, which always reaches the sink's async-context-manager __aexit__ and runs :meth:close.

Design reference: docs/design.md §10.
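
A minimal usage sketch of the lifecycle described above, using explicit open()/close() so the footer flush is guaranteed on the happy path. The batch source is deliberately left as a stand-in, since building Sample objects is out of scope for this page:

import anyio

from sartoriuslib.sinks import ParquetSink

async def main() -> None:
    sink = ParquetSink("run.parquet")          # zstd compression and dictionary encoding by default
    await sink.open()                          # loads pyarrow; raises the dependency error if the extra is missing
    try:
        batches = []                           # stand-in: each item would be a Sequence[Sample] from your recorder
        for batch in batches:
            await sink.write_many(batch)       # one row group appended per call
    finally:
        await sink.close()                     # flushes the footer; the file is unreadable until this runs

anyio.run(main)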

ParquetSink

ParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
)

Append-only Parquet writer with first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination Parquet file.

compression _Compression

Codec applied to every row group.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first flush.

Source code in src/sartoriuslib/sinks/parquet.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
) -> None:
    self._path = Path(path)
    self._compression: _Compression = compression
    self._use_dictionary = use_dictionary
    if row_group_size is not None and row_group_size < 1:
        raise ValueError(
            f"row_group_size must be >= 1 if set, got {row_group_size!r}",
        )
    self._row_group_size = row_group_size
    self._schema = SchemaLock(sink_name="parquet", logger=_logger)
    self._pa: Any = None
    self._pq: Any = None
    self._arrow_schema: Any = None
    self._writer: Any = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

compression property

compression

The configured compression codec.

path property

path

Destination Parquet file path.

close async

close()

Flush the footer and close the writer. Idempotent.

Source code in src/sartoriuslib/sinks/parquet.py
async def close(self) -> None:
    """Flush the footer and close the writer. Idempotent."""
    if self._writer is not None:
        try:
            self._writer.close()
        finally:
            self._writer = None
    self._pa = None
    self._pq = None
    _logger.info(
        "sinks.parquet.close",
        extra={
            "path": str(self._path),
            "rows_written": self._rows_written,
        },
    )

open async

open()

Load pyarrow and create the parent directory. Idempotent.

The actual :class:pyarrow.parquet.ParquetWriter is opened lazily on the first :meth:write_many call, because the writer requires a concrete schema — which we don't have until the first batch is inspected.

Source code in src/sartoriuslib/sinks/parquet.py
async def open(self) -> None:
    """Load pyarrow and create the parent directory. Idempotent.

    The actual :class:`pyarrow.parquet.ParquetWriter` is opened
    lazily on the first :meth:`write_many` call, because the
    writer requires a concrete schema — which we don't have until
    the first batch is inspected.
    """
    if self._pa is not None:
        return
    self._pa, self._pq = _load_pyarrow()
    self._path.parent.mkdir(parents=True, exist_ok=True)
    _logger.info(
        "sinks.parquet.open",
        extra={
            "path": str(self._path),
            "compression": self._compression,
        },
    )

write_many async

write_many(samples)

Append samples as one Parquet row group.

On first call: infers the schema from the batch, locks it, constructs the matching :mod:pyarrow schema, and opens the underlying :class:~pyarrow.parquet.ParquetWriter.

Subsequent calls project each row onto the locked schema and append the rows as a new row group. Unknown columns are dropped with one-shot WARN (handled by :class:SchemaLock).

Source code in src/sartoriuslib/sinks/parquet.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as one Parquet row group.

    On first call: infers the schema from the batch, locks it,
    constructs the matching :mod:`pyarrow` schema, and opens the
    underlying :class:`~pyarrow.parquet.ParquetWriter`.

    Subsequent calls project each row onto the locked schema and
    append the rows as a new row group. Unknown columns are
    dropped with one-shot WARN (handled by :class:`SchemaLock`).
    """
    if self._pa is None:
        raise RuntimeError("ParquetSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        self._schema.lock(rows)
        self._arrow_schema = self._build_arrow_schema()
        self._writer = self._open_writer()

    assert self._writer is not None  # noqa: S101 — narrow for type checker
    assert self._arrow_schema is not None  # noqa: S101

    projected = [self._schema.project(r) for r in rows]
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    arrays = {spec.name: [row[spec.name] for row in projected] for spec in columns}

    try:
        table = self._pa.Table.from_pydict(arrays, schema=self._arrow_schema)
        self._writer.write_table(
            table,
            row_group_size=self._row_group_size,
        )
    except Exception as exc:
        raise SartoriusSinkWriteError(
            f"ParquetSink: write failed for {self._path}: {exc}",
        ) from exc
    self._rows_written += len(projected)
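
To confirm the one-row-group-per-batch layout after a clean shutdown, the file can be inspected with pyarrow directly (same sartoriuslib[parquet] extra; file name is a placeholder):

import pyarrow.parquet as pq

pf = pq.ParquetFile("run.parquet")
meta = pf.metadata
print(f"{meta.num_rows} rows in {meta.num_row_groups} row groups")  # one group per write_many batch
print(pf.schema_arrow)                                              # the schema locked on the first batch

table = pq.read_table("run.parquet")           # loads everything; fine for modest run sizes
print(list(table.to_pydict().keys()))          # column names as emitted by sample_to_row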

PostgreSQL (sartoriuslib[postgres])

sartoriuslib.sinks.postgres

PostgreSQL sink — :mod:asyncpg, COPY by default, parameterised fallback.

:class:PostgresSink writes one row per :class:Sample into a PostgreSQL table. asyncpg is an optional dependency behind sartoriuslib[postgres]; the import is deferred to :meth:open so instantiation works on bare-core installs and :class:~sartoriuslib.errors.SartoriusSinkDependencyError is raised only when the user actually tries to open a connection.

Best-practice defaults baked in:

  • Binary COPY via :meth:asyncpg.Connection.copy_records_to_table. COPY is ~5–10× faster than parameterised INSERT for batches and is the recommended asyncpg bulk-ingest path. Callers that run on managed Postgres without COPY privileges can set :attr:PostgresConfig.use_copy to False to fall back to a prepared executemany.
  • Connection pool via :func:asyncpg.create_pool. The pool lifetime equals the sink lifetime; each batch acquires, writes, and releases, so the pool stays available for concurrent work.
  • Identifier validation on schema and table (strict regex). Every value passes through $N placeholders — never string-formatted into SQL.
  • Credential scrubbing — log lines that reference the connection use :meth:PostgresConfig.target, which composes a host:port/db.schema.table string from the parsed DSN / discrete fields and never includes the password. Tests assert the plain password never appears in :meth:target output.
  • statement_timeout applied as a server setting so a wedged query cannot block the acquisition loop forever.

Schema evolution mirrors the other tabular sinks (design §10). The default create_table=False reads the target table's columns from information_schema.columns on open and locks the schema to that set. Passing create_table=True switches to first-batch inference and runs CREATE TABLE IF NOT EXISTS — convenient for quick runs, but the user gives up type control (everything text-like becomes TEXT rather than timestamptz etc.).

Design reference: docs/design.md §10.
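
A construction sketch covering both connection styles. The DSN and the discrete-field form are mutually exclusive; hosts, credentials, and the schema name below are placeholders:

from sartoriuslib.sinks import PostgresConfig, PostgresSink

# DSN form: everything in one libpq-style URL
cfg = PostgresConfig(dsn="postgres://writer:s3cret@db.example.com:5432/telemetry")

# ...or the discrete-field form, with a couple of defaults overridden
cfg = PostgresConfig(
    host="db.example.com",
    user="writer",
    password="s3cret",
    database="telemetry",
    schema="lab",            # validated identifier (placeholder)
    table="samples",
    use_copy=False,          # fall back to prepared executemany if COPY is not granted
)

sink = PostgresSink(cfg)     # no connection yet; asyncpg is loaded on open()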

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout applied as a server setting. Defaults to 30 s.

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/sartoriuslib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"

PostgresSink

PostgresSink(config)

Append-only Postgres writer using pooled asyncpg connections.

Attributes:

Name Type Description
config PostgresConfig

Frozen :class:PostgresConfig instance.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first :meth:write_many.

Source code in src/sartoriuslib/sinks/postgres.py
def __init__(self, config: PostgresConfig) -> None:
    self._config = config
    self._schema = SchemaLock(sink_name="postgres", logger=_logger)
    self._asyncpg: Any = None
    self._pool: Any = None
    self._insert_sql: str | None = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

config property

config

The frozen :class:PostgresConfig passed in at construction.

close async

close()

Close the pool. Idempotent.

Source code in src/sartoriuslib/sinks/postgres.py
async def close(self) -> None:
    """Close the pool. Idempotent."""
    if self._pool is None:
        return
    pool = self._pool
    self._pool = None
    try:
        await pool.close()
    finally:
        _logger.info(
            "sinks.postgres.close",
            extra={
                "target": self._config.target(),
                "rows_written": self._rows_written,
            },
        )
    self._asyncpg = None

open async

open()

Load asyncpg, open the pool, and (optionally) introspect the table.

Idempotent. When create_table=False (the default), the target's columns are read on open and the schema is locked immediately. When create_table=True the lock happens lazily on the first :meth:write_many.

Source code in src/sartoriuslib/sinks/postgres.py
async def open(self) -> None:
    """Load asyncpg, open the pool, and (optionally) introspect the table.

    Idempotent. When ``create_table=False`` (the default), the
    target's columns are read on open and the schema is locked
    immediately. When ``create_table=True`` the lock happens
    lazily on the first :meth:`write_many`.
    """
    if self._pool is not None:
        return
    self._asyncpg = _load_asyncpg()
    cfg = self._config
    server_settings = {
        "application_name": "sartoriuslib",
        "statement_timeout": str(int(cfg.statement_timeout_ms)),
    }
    try:
        if cfg.dsn is not None:
            self._pool = await self._asyncpg.create_pool(
                dsn=cfg.dsn,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
        else:
            self._pool = await self._asyncpg.create_pool(
                host=cfg.host,
                port=cfg.port,
                user=cfg.user,
                password=cfg.password,
                database=cfg.database,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
    except Exception as exc:
        raise SartoriusSinkWriteError(
            f"PostgresSink: failed to open pool for {cfg.target()}: {exc}",
        ) from exc

    _logger.info(
        "sinks.postgres.open",
        extra={
            "target": cfg.target(),
            "pool_min": cfg.pool_min_size,
            "pool_max": cfg.pool_max_size,
            "use_copy": cfg.use_copy,
            "create_table": cfg.create_table,
        },
    )

    if not cfg.create_table:
        await self._introspect_existing_table()

write_many async

write_many(samples)

Append samples — one COPY (or executemany) per call.

Source code in src/sartoriuslib/sinks/postgres.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` — one COPY (or executemany) per call."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        assert self._config.create_table  # noqa: S101
        self._schema.lock(rows)
        await self._create_table()
        self._insert_sql = self._build_insert_sql()

    columns = self._schema.columns
    assert columns is not None  # noqa: S101
    assert self._insert_sql is not None  # noqa: S101

    projected_tuples: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected_tuples.append(tuple(fields[spec.name] for spec in columns))

    try:
        if self._config.use_copy:
            await self._write_copy(projected_tuples, columns)
        else:
            await self._write_executemany(projected_tuples)
    except SartoriusSinkWriteError:
        raise
    except Exception as exc:
        raise SartoriusSinkWriteError(
            f"PostgresSink: write failed for {self._config.target()}: {exc}",
        ) from exc
    self._rows_written += len(projected_tuples)
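
A quick post-run verification sketch using asyncpg directly (DSN and table are placeholders; requires the sartoriuslib[postgres] extra or a standalone asyncpg install):

import anyio
import asyncpg

async def check() -> None:
    conn = await asyncpg.connect(dsn="postgres://writer:s3cret@db.example.com:5432/telemetry")
    try:
        count = await conn.fetchval("SELECT count(*) FROM public.samples")
        print(f"{count} rows in public.samples")
    finally:
        await conn.close()

anyio.run(check)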