sartoriuslib.sinks¶
SampleSink Protocol, pipe() driver, and first-party sinks. See
Logging and acquisition for usage patterns.
Public surface¶
sartoriuslib.sinks ¶
Sample sinks — stdlib-backed (core) plus optional Parquet & Postgres.
Public surface:

- :class:SampleSink — the Protocol every sink satisfies.
- :func:pipe — drains a recorder stream into a sink with buffered flushes.
- :class:InMemorySink — test-only; collects samples in a list.
- :class:CsvSink — stdlib-backed CSV; schema locked at first batch.
- :class:JsonlSink — stdlib-backed JSONL; one object per line.
- :class:SqliteSink — stdlib-backed SQLite (WAL, parameterised inserts).
- :class:ParquetSink — pyarrow-backed; requires sartoriuslib[parquet].
- :class:PostgresSink + :class:PostgresConfig — asyncpg-backed; requires sartoriuslib[postgres].
The optional sinks (:class:ParquetSink, :class:PostgresSink) import
their backing drivers lazily inside :meth:open. That means
instantiation succeeds without the extra installed — calling
:meth:open on an un-provisioned install raises
:class:~sartoriuslib.errors.SartoriusSinkDependencyError with a
copy-paste install hint.
See docs/design.md §10.
CsvSink ¶
Append-only CSV writer with first-batch schema lock.
Attributes:

| Name | Type | Description |
|---|---|---|
| path | Path | Destination file. Created or overwritten on :meth:open. |
| columns | tuple[str, ...] \| None | Locked column order after the first :meth:write_many call, or None before it. |
Source code in src/sartoriuslib/sinks/csv.py
close async ¶

Flush and close the CSV file. Idempotent.

open async ¶

Open the CSV file for writing. Overwrites any existing file.

write_many async ¶

Append samples as CSV rows.
Source code in src/sartoriuslib/sinks/csv.py
InMemorySink ¶
Collect every written :class:Sample in a single list.
:attr:samples is appended to (never re-assigned). :meth:close
does not clear the buffer — the point of this sink is post-run
inspection.
Source code in src/sartoriuslib/sinks/memory.py
JsonlSink ¶
Append-only JSONL writer — one flattened sample per line.
The on-disk format is <sample-row-as-json>\n per sample;
reading back is just [json.loads(line) for line in f].
Source code in src/sartoriuslib/sinks/jsonl.py
close async ¶

Flush and close the JSONL file. Idempotent.

open async ¶

Open the JSONL file for writing. Overwrites any existing file.

write_many async ¶

Serialise each sample as one JSON object per line.
Source code in src/sartoriuslib/sinks/jsonl.py
ParquetSink ¶
Append-only Parquet writer with first-batch schema lock.
Attributes:

| Name | Type | Description |
|---|---|---|
| path | Path | Destination Parquet file. |
| compression | _Compression | Codec applied to every row group. |
| columns | tuple[ColumnSpec, ...] \| None | Locked columns in order, or None before the first batch. |
Source code in src/sartoriuslib/sinks/parquet.py
close async ¶
Flush the footer and close the writer. Idempotent.
Source code in src/sartoriuslib/sinks/parquet.py
open async ¶
Load pyarrow and create the parent directory. Idempotent.
The actual :class:pyarrow.parquet.ParquetWriter is opened
lazily on the first :meth:write_many call, because the
writer requires a concrete schema — which we don't have until
the first batch is inspected.
Source code in src/sartoriuslib/sinks/parquet.py
write_many async ¶
Append samples as one Parquet row group.
On first call: infers the schema from the batch, locks it,
constructs the matching :mod:pyarrow schema, and opens the
underlying :class:~pyarrow.parquet.ParquetWriter.
Subsequent calls project each row onto the locked schema and
append the rows as a new row group. Unknown columns are
dropped with one-shot WARN (handled by :class:SchemaLock).
Source code in src/sartoriuslib/sinks/parquet.py
PostgresConfig dataclass ¶
PostgresConfig(
dsn=None,
host=None,
port=5432,
user=None,
password=None,
database=None,
schema="public",
table="samples",
pool_min_size=1,
pool_max_size=4,
statement_timeout_ms=30000,
command_timeout_s=10.0,
create_table=False,
use_copy=True,
)
Connection + target settings for :class:PostgresSink.
Either dsn or the discrete host/user/database set
must be provided. Credentials are not logged.
Attributes:

| Name | Type | Description |
|---|---|---|
| dsn | str \| None | Full libpq-style connection string (e.g. postgresql://…). |
| host | str \| None | Database host. Required if dsn is not given. |
| port | int | Database port. Defaults to 5432. |
| user | str \| None | Database role. |
| password | str \| None | Role password. Never logged. |
| database | str \| None | Database name. |
| schema | str | Target schema. Validated against a strict identifier pattern. |
| table | str | Target table. Validated against the same pattern. |
| pool_min_size | int | Minimum pool size. Defaults to 1. |
| pool_max_size | int | Maximum pool size. Defaults to 4. |
| statement_timeout_ms | int | Server-side statement_timeout, in milliseconds. Defaults to 30000. |
| command_timeout_s | float | asyncpg's per-call command timeout. Defaults to 10 s. |
| create_table | bool | If True, run CREATE TABLE IF NOT EXISTS and lock the schema from the first batch; defaults to False (introspect the existing table on open). |
| use_copy | bool | If True (the default), write batches via binary COPY; if False, fall back to a prepared executemany. |
target ¶
Return a log-safe description of the target: host:port/db.schema.table.
Source code in src/sartoriuslib/sinks/postgres.py
PostgresSink ¶
Append-only Postgres writer using pooled asyncpg connections.
Attributes:

| Name | Type | Description |
|---|---|---|
| config | PostgresConfig | Frozen :class:PostgresConfig holding connection and target settings. |
| columns | tuple[ColumnSpec, ...] \| None | Locked columns in order, or None before the lock. |
Source code in src/sartoriuslib/sinks/postgres.py
close async ¶
Close the pool. Idempotent.
Source code in src/sartoriuslib/sinks/postgres.py
open async ¶
Load asyncpg, open the pool, and (optionally) introspect the table.
Idempotent. When create_table=False (the default), the
target's columns are read on open and the schema is locked
immediately. When create_table=True the lock happens
lazily on the first :meth:write_many.
Source code in src/sartoriuslib/sinks/postgres.py
write_many async ¶
Append samples — one COPY (or executemany) per call.
Source code in src/sartoriuslib/sinks/postgres.py
SampleSink ¶
Bases: Protocol
Minimal shape of an acquisition sink.
Sinks own their storage handle lifecycle. Concrete implementations typically follow this call sequence:
1. await sink.open() — allocate file descriptors, DB connections, etc. Safe to call again on an already-open sink.
2. await sink.write_many(samples) — one or more times. samples is a :class:~collections.abc.Sequence so the sink knows the full batch up front (useful for CSV column inference, SQLite batched inserts).
3. await sink.close() — flush and release the handle. Idempotent.
The async context-manager methods provide an async with sink:
shape for the common case of "open → write → close" in one block.
SqliteSink ¶
SqliteSink(
path,
*,
table="samples",
create_table=True,
journal_mode="WAL",
synchronous="NORMAL",
busy_timeout_ms=5000,
)
Append-only SQLite writer with WAL journaling and first-batch schema lock.
Source code in src/sartoriuslib/sinks/sqlite.py
close async ¶
Close the connection. Idempotent.
Source code in src/sartoriuslib/sinks/sqlite.py
open async ¶
Open the SQLite connection, apply PRAGMAs, and introspect the target.
Source code in src/sartoriuslib/sinks/sqlite.py
write_many async ¶
Append samples as rows in a single transaction.
Source code in src/sartoriuslib/sinks/sqlite.py
pipe async ¶
Drain stream into sink with buffered flushes.
Reads per-tick batches from the recorder and accumulates the
individual :class:Sample objects into a list. A flush happens when
either threshold is first crossed:

- the buffer reaches batch_size samples, or
- flush_interval seconds have elapsed since the last flush.
On stream exhaustion any leftover buffer is flushed before the summary is returned.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| stream | AsyncIterator[Mapping[str, Sample]] | The async iterator of per-tick batches yielded by the recorder. | required |
| sink | SampleSink | Any :class:SampleSink implementation. | required |
| batch_size | int | Buffer threshold in samples (not batches). | 64 |
| flush_interval | float | Seconds between flushes (wall-clock). | 1.0 |

Returns:

| Type | Description |
|---|---|
| AcquisitionSummary | An :class:AcquisitionSummary whose samples_emitted reflects the count actually handed to the sink. |
Source code in src/sartoriuslib/sinks/base.py
sample_to_row ¶
Flatten a :class:Sample into a single row dict for tabular sinks.
Schema layout (stable across samples; design §10):
- device — manager-assigned name.
- requested_at / received_at / midpoint_at — ISO 8601.
- elapsed_s — poll round-trip time, seconds.
- reading fields — from :meth:Reading.as_dict: value, unit, sign, stable, overload, underload, decimals, sequence, protocol, raw. On error samples (reading is None) these all appear as None.
- error_type — fully qualified exception class on a failed sample, otherwise None.
- error_message — str(error) on a failed sample, otherwise None.
Reading.protocol is the authoritative protocol column on
success rows; on error rows the row's protocol column falls
back to :attr:Sample.protocol (populated by the manager from
the session's active protocol) so sinks never see a missing
column.
Source code in src/sartoriuslib/sinks/base.py
Base Protocol + pipe() + sample_to_row¶
sartoriuslib.sinks.base ¶
Sink Protocol, sample → row helper, and the pipe() driver.
A :class:SampleSink is the minimal shape the recorder's downstream
consumer needs: :meth:open, :meth:write_many, :meth:close, and
the matching async context-manager methods. The in-tree sinks
(:class:~sartoriuslib.sinks.memory.InMemorySink,
:class:~sartoriuslib.sinks.csv.CsvSink,
:class:~sartoriuslib.sinks.jsonl.JsonlSink,
:class:~sartoriuslib.sinks.sqlite.SqliteSink) all satisfy this
Protocol; third-party sinks can slot in without touching library
code.
:func:pipe is the v1 acquisition glue. It reads per-tick batches out
of the recorder's receive stream, buffers them up to batch_size
(or flush_interval seconds, whichever comes first), and calls
sink.write_many to flush. On stream exhaustion it drains any
remaining buffer and returns an :class:AcquisitionSummary with
samples_emitted reflecting the count actually handed to the sink.
Design reference: docs/design.md §10.
SampleSink ¶
Bases: Protocol
Minimal shape of an acquisition sink.
Sinks own their storage handle lifecycle. Concrete implementations typically follow this call sequence:
1. await sink.open() — allocate file descriptors, DB connections, etc. Safe to call again on an already-open sink.
2. await sink.write_many(samples) — one or more times. samples is a :class:~collections.abc.Sequence so the sink knows the full batch up front (useful for CSV column inference, SQLite batched inserts).
3. await sink.close() — flush and release the handle. Idempotent.
The async context-manager methods provide an async with sink:
shape for the common case of "open → write → close" in one block.
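As a rough illustration of the shape this Protocol demands, here is a toy sink that satisfies the open / write_many / close sequence plus the async context-manager methods. The `CountingSink` class is hypothetical and not part of sartoriuslib:

```python
from collections.abc import Sequence
from typing import Any


class CountingSink:
    """Toy sink: counts samples instead of persisting them."""

    def __init__(self) -> None:
        self.opened = False
        self.count = 0

    async def open(self) -> None:
        # Safe to call again on an already-open sink.
        self.opened = True

    async def write_many(self, samples: Sequence[Any]) -> None:
        # The full batch arrives at once (a Sequence, not an iterator).
        self.count += len(samples)

    async def close(self) -> None:
        # Idempotent: closing twice is fine.
        self.opened = False

    async def __aenter__(self) -> "CountingSink":
        await self.open()
        return self

    async def __aexit__(self, *exc: object) -> None:
        await self.close()
```

With the context-manager methods in place, `async with CountingSink() as sink:` gives the one-block open → write → close shape described above.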
pipe async ¶
Drain stream into sink with buffered flushes.
Reads per-tick batches from the recorder and accumulates the
individual :class:Sample objects into a list. A flush happens when
either threshold is first crossed:

- the buffer reaches batch_size samples, or
- flush_interval seconds have elapsed since the last flush.
On stream exhaustion any leftover buffer is flushed before the summary is returned.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| stream | AsyncIterator[Mapping[str, Sample]] | The async iterator of per-tick batches yielded by the recorder. | required |
| sink | SampleSink | Any :class:SampleSink implementation. | required |
| batch_size | int | Buffer threshold in samples (not batches). | 64 |
| flush_interval | float | Seconds between flushes (wall-clock). | 1.0 |

Returns:

| Type | Description |
|---|---|
| AcquisitionSummary | An :class:AcquisitionSummary whose samples_emitted reflects the count actually handed to the sink. |
Source code in src/sartoriuslib/sinks/base.py
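The dual-threshold buffering described above can be sketched as a standalone coroutine. This is a simplification, not the library's pipe: per-tick batches are plain sequences here rather than Mapping[str, Sample], and the sink is reduced to a write_many callable:

```python
import time
from collections.abc import AsyncIterator, Sequence


async def pipe_sketch(
    stream: AsyncIterator[Sequence[object]],
    write_many,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> int:
    """Buffer per-tick batches; flush on size OR elapsed time; drain at end."""
    buffer: list[object] = []
    emitted = 0
    last_flush = time.monotonic()
    async for batch in stream:
        buffer.extend(batch)
        now = time.monotonic()
        # Flush when either threshold is first crossed.
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            await write_many(buffer)
            emitted += len(buffer)
            buffer.clear()
            last_flush = now
    if buffer:  # stream exhausted — flush any leftovers
        await write_many(buffer)
        emitted += len(buffer)
    return emitted
```

The returned count plays the role of samples_emitted: it reflects only what was actually handed to the sink.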
sample_to_row ¶
Flatten a :class:Sample into a single row dict for tabular sinks.
Schema layout (stable across samples; design §10):
- device — manager-assigned name.
- requested_at / received_at / midpoint_at — ISO 8601.
- elapsed_s — poll round-trip time, seconds.
- reading fields — from :meth:Reading.as_dict: value, unit, sign, stable, overload, underload, decimals, sequence, protocol, raw. On error samples (reading is None) these all appear as None.
- error_type — fully qualified exception class on a failed sample, otherwise None.
- error_message — str(error) on a failed sample, otherwise None.
Reading.protocol is the authoritative protocol column on
success rows; on error rows the row's protocol column falls
back to :attr:Sample.protocol (populated by the manager from
the session's active protocol) so sinks never see a missing
column.
Source code in src/sartoriuslib/sinks/base.py
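The flattening might look roughly like this for a heavily simplified sample. `SampleStub`, `ReadingStub`, and `sample_to_row_sketch` are hypothetical stand-ins; only the column names and the error-row protocol fallback follow the description above:

```python
from dataclasses import dataclass


@dataclass
class ReadingStub:
    value: float
    unit: str
    protocol: str

    def as_dict(self) -> dict:
        return {"value": self.value, "unit": self.unit, "protocol": self.protocol}


@dataclass
class SampleStub:
    device: str
    protocol: str
    reading: "ReadingStub | None" = None
    error: "Exception | None" = None


READING_FIELDS = ("value", "unit", "protocol")


def sample_to_row_sketch(sample: SampleStub) -> dict:
    row: dict = {"device": sample.device}
    if sample.reading is not None:
        # Success row: Reading.protocol is the authoritative protocol column.
        row.update(sample.reading.as_dict())
        row["error_type"] = None
        row["error_message"] = None
    else:
        # Error row: reading columns all None; protocol falls back to
        # Sample.protocol so sinks never see a missing column.
        row.update({f: None for f in READING_FIELDS})
        row["protocol"] = sample.protocol
        err = sample.error
        row["error_type"] = f"{type(err).__module__}.{type(err).__qualname__}"
        row["error_message"] = str(err)
    return row
```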
Schema¶
sartoriuslib.sinks._schema ¶
Shared first-batch schema-lock for tabular sinks.
Every tabular sink in the tree (SQLite, and eventually Parquet / Postgres behind extras) shares the same schema-evolution policy:

- First batch wins. The column set and order are locked from the first :meth:write_many call. For schema-less sinks this is just bookkeeping; for schema-ful sinks (SQLite CREATE TABLE) the locked spec drives the backing schema.
- Unknown columns are dropped with a one-shot WARN. Later batches carrying a new key don't reshape the file/table silently — each new key logs once, then gets dropped on subsequent batches without re-logging.
- Missing columns are filled with None. Row projection guarantees every locked column appears in the output dict.
This module is sink-facing only. It has no public re-export.
Design reference: docs/design.md §10.
ColumnSpec dataclass ¶
One column in a locked tabular schema.
Attributes:

| Name | Type | Description |
|---|---|---|
| name | str | Column name, verbatim from the source row dict. |
| python_type | _SCALAR_TYPE | Concrete Python scalar type backing the column — one of the supported scalar types (e.g. int, float, str). |
| nullable | bool | Whether the column may hold None. |
SchemaLock ¶
Lock a row-dict schema on first batch; drop unknowns on later batches.
Not thread-safe. Each sink instance owns one :class:SchemaLock
and guards it with whatever lock protects its write path.
Typical sink flow::
    self._schema = SchemaLock(sink_name="sqlite", logger=_logger)
    # on first write_many:
    specs = self._schema.lock([sample_to_row(s) for s in samples])
    # for every batch (including the first):
    rows = [self._schema.project(sample_to_row(s)) for s in samples]
Source code in src/sartoriuslib/sinks/_schema.py
lock ¶
Infer column specs from rows and lock the schema.
Column order is determined by first-encounter across the batch.
Per-column type is inferred from the first non-None value;
when the batch mixes int and float for one column the
column widens to float; any other mix widens to str.
Columns entirely None in the first batch default to
str / nullable=True.
Source code in src/sartoriuslib/sinks/_schema.py
lock_to ¶
Lock the schema from an externally-supplied spec list.
Used by sinks that validate against an already-existing backing schema rather than inferring from the first batch.
Source code in src/sartoriuslib/sinks/_schema.py
project ¶
Return a new dict containing only keys from the locked schema.
Every locked column appears in the output dict — missing keys
are filled with None. Any key in row that is not part
of the locked schema is dropped, with the first occurrence of
each such key logged at WARN.
Source code in src/sartoriuslib/sinks/_schema.py
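A minimal sketch of the lock/project policy described above: first-encounter column order, int + float widening to float, any other mix widening to str, all-None columns defaulting to str, and unknown keys dropped on projection. The function names are illustrative, not SchemaLock's API:

```python
def infer_specs(rows: list[dict]) -> list[tuple[str, type]]:
    """Infer (name, type) specs from a first batch, in first-encounter order."""
    order: list[str] = []
    types: dict[str, "type | None"] = {}
    for row in rows:
        for key, value in row.items():
            if key not in types:
                order.append(key)
                types[key] = None
            if value is None:
                continue
            seen = types[key]
            current = type(value)
            if seen is None:
                types[key] = current
            elif seen is not current:
                # int + float widens to float; any other mix widens to str.
                types[key] = float if {seen, current} == {int, float} else str
    # Columns entirely None in the first batch default to str.
    return [(name, types[name] or str) for name in order]


def project(row: dict, specs: list[tuple[str, type]]) -> dict:
    """Keep only locked columns; missing ones are filled with None."""
    return {name: row.get(name) for name, _ in specs}
```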
In-memory (test-only)¶
sartoriuslib.sinks.memory ¶
In-memory sink — collects :class:Sample objects in a list for tests.
:class:InMemorySink satisfies the
:class:~sartoriuslib.sinks.base.SampleSink Protocol so acquisition
tests can run the same pipe() call path a production sink uses
and then inspect the captured samples after the fact.
Design reference: docs/design.md §10.
InMemorySink ¶
Collect every written :class:Sample in a single list.
:attr:samples is appended to (never re-assigned). :meth:close
does not clear the buffer — the point of this sink is post-run
inspection.
Source code in src/sartoriuslib/sinks/memory.py
CSV¶
sartoriuslib.sinks.csv ¶
CSV sink — stdlib :mod:csv, schema locked at first batch.
:class:CsvSink writes one row per :class:Sample. The column order
is fixed the first time :meth:write_many is called — inferred from
the first sample's :func:sample_to_row output — and stays stable
for the rest of the run. Unknown columns that appear in later
samples are dropped with a WARN log rather than silently reshaping
the file.
Stdlib-only — the core install pulls in no CSV dependencies.
Design reference: docs/design.md §10.
CsvSink ¶
Append-only CSV writer with first-batch schema lock.
Attributes:
| Name | Type | Description |
|---|---|---|
path |
Path
|
Destination file. Created or overwritten on :meth: |
columns |
tuple[str, ...] | None
|
Locked column order after the first :meth: |
Source code in src/sartoriuslib/sinks/csv.py
close async ¶

Flush and close the CSV file. Idempotent.

open async ¶

Open the CSV file for writing. Overwrites any existing file.

write_many async ¶

Append samples as CSV rows.
Source code in src/sartoriuslib/sinks/csv.py
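The lock-and-drop policy maps neatly onto stdlib csv.DictWriter knobs. A sketch under simplifying assumptions — columns are locked from the first row here, whereas the real sink locks from the whole first batch via SchemaLock, and `write_locked_csv` is an illustrative helper:

```python
import csv
import io


def write_locked_csv(batches: "list[list[dict]]") -> str:
    """Lock columns to the first row seen; drop unknown keys; blank missing cells."""
    out = io.StringIO()
    writer = None
    for batch in batches:
        if writer is None and batch:
            columns = list(batch[0].keys())  # first batch wins
            writer = csv.DictWriter(
                out,
                fieldnames=columns,
                restval="",             # missing columns -> empty cell
                extrasaction="ignore",  # unknown columns dropped (sink also WARNs once)
            )
            writer.writeheader()
        if writer is not None:
            writer.writerows(batch)
    return out.getvalue()
```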
JSONL¶
sartoriuslib.sinks.jsonl ¶
JSONL sink — stdlib :mod:json, one object per line, no schema lock.
:class:JsonlSink writes one JSON object per :class:Sample. Unlike
:class:~sartoriuslib.sinks.csv.CsvSink, it doesn't lock a schema —
each row stands alone, so a device whose reading format carries an
extra field simply emits a wider object without affecting earlier or
later rows.
Stdlib-only. Design reference: docs/design.md §10.
JsonlSink ¶
Append-only JSONL writer — one flattened sample per line.
The on-disk format is <sample-row-as-json>\n per sample;
reading back is just [json.loads(line) for line in f].
Source code in src/sartoriuslib/sinks/jsonl.py
close async ¶

Flush and close the JSONL file. Idempotent.

open async ¶

Open the JSONL file for writing. Overwrites any existing file.

write_many async ¶

Serialise each sample as one JSON object per line.
Source code in src/sartoriuslib/sinks/jsonl.py
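The round trip the docstring describes, as a runnable stdlib-only sketch (`write_jsonl` and `read_jsonl` are illustrative helpers, not library API):

```python
import io
import json


def write_jsonl(rows: "list[dict]") -> str:
    # One flattened sample per line: <row-as-json>\n
    buf = io.StringIO()
    for row in rows:
        buf.write(json.dumps(row) + "\n")
    return buf.getvalue()


def read_jsonl(text: str) -> "list[dict]":
    # Reading back is just json.loads per line.
    return [json.loads(line) for line in text.splitlines()]
```

Because there is no schema lock, a row carrying an extra field simply emits a wider object.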
SQLite (stdlib)¶
sartoriuslib.sinks.sqlite ¶
SQLite sink — stdlib :mod:sqlite3 + WAL, parameterised executemany.
:class:SqliteSink writes one row per :class:Sample into a local
SQLite file. Core-sink (no extra required) because sqlite3 ships
with the Python standard library.
The sqlite3 driver is synchronous; the sink calls it through
:func:anyio.to_thread.run_sync so the event loop stays responsive.
Best-practice defaults baked in:

- journal_mode=WAL + synchronous=NORMAL.
- busy_timeout=5000 ms for brief lock-contention retries.
- One BEGIN IMMEDIATE … COMMIT per write_many.
- SQL identifiers validated against ^[A-Za-z_][A-Za-z0-9_]{0,62}$; values always parameterised.
Schema evolution mirrors the other tabular sinks: column set locked on the first batch, unknown columns dropped with a one-shot WARN.
Design reference: docs/design.md §10.
SqliteSink ¶
SqliteSink(
path,
*,
table="samples",
create_table=True,
journal_mode="WAL",
synchronous="NORMAL",
busy_timeout_ms=5000,
)
Append-only SQLite writer with WAL journaling and first-batch schema lock.
Source code in src/sartoriuslib/sinks/sqlite.py
close async ¶
Close the connection. Idempotent.
Source code in src/sartoriuslib/sinks/sqlite.py
open async ¶
Open the SQLite connection, apply PRAGMAs, and introspect the target.
Source code in src/sartoriuslib/sinks/sqlite.py
write_many async ¶
Append samples as rows in a single transaction.
Source code in src/sartoriuslib/sinks/sqlite.py
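The PRAGMA defaults, identifier validation, and one-transaction-per-batch shape can be sketched synchronously with stdlib sqlite3 (the real sink drives equivalent calls through anyio.to_thread and uses BEGIN IMMEDIATE; `insert_batch` and `_IDENT` are illustrative):

```python
import re
import sqlite3

# Identifier pattern from the docs above; values are never interpolated.
_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]{0,62}$")


def insert_batch(conn: sqlite3.Connection, table: str, rows: "list[dict]") -> None:
    """Validated-identifier, fully parameterised batch insert, one transaction."""
    if not _IDENT.match(table):
        raise ValueError(f"bad SQL identifier: {table!r}")
    columns = list(rows[0].keys())
    if not all(_IDENT.match(c) for c in columns):
        raise ValueError("bad column identifier")
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    with conn:  # one transaction per batch (the sink uses BEGIN IMMEDIATE … COMMIT)
        conn.executemany(sql, [tuple(r[c] for c in columns) for r in rows])


conn = sqlite3.connect(":memory:")
# PRAGMA defaults the docs describe (WAL is a no-op for :memory:).
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("PRAGMA busy_timeout=5000")
conn.execute("CREATE TABLE samples (device TEXT, value REAL)")
insert_batch(conn, "samples", [{"device": "s1", "value": 1.5},
                               {"device": "s1", "value": 2.0}])
```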
Parquet (sartoriuslib[parquet])¶
sartoriuslib.sinks.parquet ¶
Parquet sink — :mod:pyarrow, schema locked, zstd by default.
:class:ParquetSink writes one row per :class:Sample into a single
Parquet file. pyarrow is an optional dependency behind
sartoriuslib[parquet]; the import is deferred to :meth:open so
instantiating the sink succeeds on bare-core installs and
:class:~sartoriuslib.errors.SartoriusSinkDependencyError is raised only
when the user actually tries to open the file.
Best-practice defaults baked in:

- zstd compression. It matches or beats snappy on write/read speed with ~30% better ratios and is fully supported across pyarrow ≥ 2, Spark, DuckDB, Polars, and pandas ≥ 1.3. Snappy and gzip remain available for compatibility with readers that don't support zstd.
- Dictionary encoding on for string columns (pyarrow default; surfaced as a knob so callers that know their cardinality is high can disable it).
- One row group per :meth:write_many. Aligns durability with batch cadence — a crash mid-run loses at most the current batch. Callers that want fewer, bigger row groups can pass row_group_size.
Schema evolution mirrors the other tabular sinks: the column set is
locked on the first batch (via
:class:~sartoriuslib.sinks._schema.SchemaLock). Unknown columns that
appear in later batches are dropped with a one-shot WARN. Adding a
new column mid-file would require rewriting the whole file, so it is
deliberately not supported.
Durability caveat: Parquet files are not readable until the footer
is flushed on :meth:close. If the process is killed mid-run you
will get a file with no usable footer. The recommended shutdown path
is the recorder's structured exit, which always reaches the sink's
async-context-manager __aexit__ and runs :meth:close.
Design reference: docs/design.md §10.
ParquetSink ¶
Append-only Parquet writer with first-batch schema lock.
Attributes:

| Name | Type | Description |
|---|---|---|
| path | Path | Destination Parquet file. |
| compression | _Compression | Codec applied to every row group. |
| columns | tuple[ColumnSpec, ...] \| None | Locked columns in order, or None before the first batch. |
Source code in src/sartoriuslib/sinks/parquet.py
close async ¶
Flush the footer and close the writer. Idempotent.
Source code in src/sartoriuslib/sinks/parquet.py
open async ¶
Load pyarrow and create the parent directory. Idempotent.
The actual :class:pyarrow.parquet.ParquetWriter is opened
lazily on the first :meth:write_many call, because the
writer requires a concrete schema — which we don't have until
the first batch is inspected.
Source code in src/sartoriuslib/sinks/parquet.py
write_many async ¶
Append samples as one Parquet row group.
On first call: infers the schema from the batch, locks it,
constructs the matching :mod:pyarrow schema, and opens the
underlying :class:~pyarrow.parquet.ParquetWriter.
Subsequent calls project each row onto the locked schema and
append the rows as a new row group. Unknown columns are
dropped with one-shot WARN (handled by :class:SchemaLock).
Source code in src/sartoriuslib/sinks/parquet.py
PostgreSQL (sartoriuslib[postgres])¶
sartoriuslib.sinks.postgres ¶
PostgreSQL sink — :mod:asyncpg, COPY by default, parameterised fallback.
:class:PostgresSink writes one row per :class:Sample into a
PostgreSQL table. asyncpg is an optional dependency behind
sartoriuslib[postgres]; the import is deferred to :meth:open so
instantiation works on bare-core installs and
:class:~sartoriuslib.errors.SartoriusSinkDependencyError is raised only
when the user actually tries to open a connection.
Best-practice defaults baked in:

- Binary COPY via :meth:asyncpg.Connection.copy_records_to_table. COPY is ~5–10× faster than parameterised INSERT for batches and is the recommended asyncpg bulk-ingest path. Callers that run on managed Postgres without COPY privileges can set :attr:PostgresConfig.use_copy to False to fall back to a prepared executemany.
- Connection pool via :func:asyncpg.create_pool. The pool lifetime equals the sink lifetime; each batch acquires, writes, and releases, so the pool stays available for concurrent work.
- Identifier validation on schema and table (strict regex). Every value passes through $N placeholders — never string-formatted into SQL.
- Credential scrubbing — log lines that reference the connection use :meth:PostgresConfig.target, which composes a host:port/db.schema.table string from the parsed DSN / discrete fields and never includes the password. Tests assert the plain password never appears in :meth:target output.
- statement_timeout applied as a server setting so a wedged query cannot block the acquisition loop forever.
Schema evolution mirrors the other tabular sinks (design §10). The
default create_table=False reads the target table's columns from
information_schema.columns on open and locks the schema to that
set. Passing create_table=True switches to first-batch inference
and runs CREATE TABLE IF NOT EXISTS — convenient for quick runs,
but the user gives up type control (everything text-like becomes
TEXT rather than timestamptz etc.).
Design reference: docs/design.md §10.
PostgresConfig dataclass ¶
PostgresConfig(
dsn=None,
host=None,
port=5432,
user=None,
password=None,
database=None,
schema="public",
table="samples",
pool_min_size=1,
pool_max_size=4,
statement_timeout_ms=30000,
command_timeout_s=10.0,
create_table=False,
use_copy=True,
)
Connection + target settings for :class:PostgresSink.
Either dsn or the discrete host/user/database set
must be provided. Credentials are not logged.
Attributes:

| Name | Type | Description |
|---|---|---|
| dsn | str \| None | Full libpq-style connection string (e.g. postgresql://…). |
| host | str \| None | Database host. Required if dsn is not given. |
| port | int | Database port. Defaults to 5432. |
| user | str \| None | Database role. |
| password | str \| None | Role password. Never logged. |
| database | str \| None | Database name. |
| schema | str | Target schema. Validated against a strict identifier pattern. |
| table | str | Target table. Validated against the same pattern. |
| pool_min_size | int | Minimum pool size. Defaults to 1. |
| pool_max_size | int | Maximum pool size. Defaults to 4. |
| statement_timeout_ms | int | Server-side statement_timeout, in milliseconds. Defaults to 30000. |
| command_timeout_s | float | asyncpg's per-call command timeout. Defaults to 10 s. |
| create_table | bool | If True, run CREATE TABLE IF NOT EXISTS and lock the schema from the first batch; defaults to False (introspect the existing table on open). |
| use_copy | bool | If True (the default), write batches via binary COPY; if False, fall back to a prepared executemany. |
target ¶
Return a log-safe description of the target: host:port/db.schema.table.
Source code in src/sartoriuslib/sinks/postgres.py
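The log-safe composition can be sketched with stdlib URL parsing; the password component of the DSN is never read. `log_safe_target` is a hypothetical standalone helper, not the PostgresConfig method itself:

```python
from urllib.parse import urlparse


def log_safe_target(dsn: str, schema: str = "public", table: str = "samples") -> str:
    """Compose host:port/db.schema.table from a DSN — credentials never included."""
    parts = urlparse(dsn)
    database = parts.path.lstrip("/")
    # parts.password exists but is deliberately never used.
    return f"{parts.hostname}:{parts.port or 5432}/{database}.{schema}.{table}"
```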
PostgresSink ¶
Append-only Postgres writer using pooled asyncpg connections.
Attributes:

| Name | Type | Description |
|---|---|---|
| config | PostgresConfig | Frozen :class:PostgresConfig holding connection and target settings. |
| columns | tuple[ColumnSpec, ...] \| None | Locked columns in order, or None before the lock. |
Source code in src/sartoriuslib/sinks/postgres.py
close async ¶
Close the pool. Idempotent.
Source code in src/sartoriuslib/sinks/postgres.py
open async ¶
Load asyncpg, open the pool, and (optionally) introspect the table.
Idempotent. When create_table=False (the default), the
target's columns are read on open and the schema is locked
immediately. When create_table=True the lock happens
lazily on the first :meth:write_many.
Source code in src/sartoriuslib/sinks/postgres.py
write_many async ¶
Append samples — one COPY (or executemany) per call.