alicatlib.sinks¶
The SampleSink protocol, pipe() driver, and first-party sinks.
See Logging and acquisition for usage patterns and
the benchmarks for throughput numbers.
Public surface¶
alicatlib.sinks ¶
Sample sinks — stdlib-backed (core) plus optional Parquet & Postgres.
Public surface:
- :class:`SampleSink` — the Protocol every sink satisfies.
- :func:`pipe` — drains a recorder stream into a sink with buffered flushes.
- :class:`InMemorySink` — test-only; collects samples in a list.
- :class:`CsvSink` — stdlib-backed CSV; schema locked at first batch.
- :class:`JsonlSink` — stdlib-backed JSONL; one object per line.
- :class:`SqliteSink` — stdlib-backed SQLite (WAL, parameterised inserts).
- :class:`ParquetSink` — pyarrow-backed; requires `alicatlib[parquet]`.
- :class:`PostgresSink` + :class:`PostgresConfig` — asyncpg-backed; requires `alicatlib[postgres]`.
The optional sinks (:class:ParquetSink, :class:PostgresSink) import
their backing drivers lazily inside :meth:open. That means
instantiation succeeds without the extra installed — calling
:meth:open on an un-provisioned install raises
:class:~alicatlib.errors.AlicatSinkDependencyError with a copy-paste
install hint.
See docs/design.md §5.15.
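The lazy-import behaviour is easy to see in isolation. Below is a minimal sketch of the pattern; the class names here are illustrative stand-ins, not alicatlib's actual API:

```python
import importlib


class SinkDependencyError(RuntimeError):
    """Illustrative stand-in for AlicatSinkDependencyError."""


class LazySink:
    """Sketch of the defer-the-driver-import-to-open() pattern."""

    def __init__(self, module_name: str, install_hint: str) -> None:
        # Instantiation never imports the driver, so it succeeds
        # even when the optional extra is not installed.
        self._module_name = module_name
        self._install_hint = install_hint
        self._driver = None

    async def open(self) -> None:
        try:
            self._driver = importlib.import_module(self._module_name)
        except ModuleNotFoundError as exc:
            raise SinkDependencyError(
                f"{self._module_name!r} is not installed; try: {self._install_hint}"
            ) from exc
```

The payoff is that configuration code can construct every sink unconditionally and only pay for (or fail on) the dependency when a sink is actually opened.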
CsvSink ¶
Append-only CSV writer with first-batch schema lock.
Attributes:
| Name | Type | Description |
|---|---|---|
| `path` | `Path` | Destination file. Created or overwritten on :meth:`open`. |
| `columns` | `tuple[str, ...] \| None` | Locked column order after the first :meth:`write_many` batch. |
Source code in src/alicatlib/sinks/csv.py
close
async
¶
Flush and close the CSV file. Idempotent.
open
async
¶
Open the CSV file for writing. Overwrites any existing file.
write_many
async
¶
Append samples as CSV rows.
On first call, infers the column set from the first sample and writes the header. Subsequent calls validate each row's keys against that locked set — unknown keys are dropped with a one-shot WARN log per unseen key.
Source code in src/alicatlib/sinks/csv.py
InMemorySink ¶
Collect every written :class:Sample in a single list.
:attr:samples is appended to (never re-assigned) so callers can
hold a reference across the sink's lifecycle. :meth:close does
not clear the buffer — the point of this sink is post-run
inspection.
Source code in src/alicatlib/sinks/memory.py
JsonlSink ¶
Append-only JSONL writer — one flattened sample per line.
The on-disk format is <sample-row-as-json>\n per sample;
reading back is just [json.loads(line) for line in f]. No
header, no schema declaration, no framing overhead beyond the
newline.
Source code in src/alicatlib/sinks/jsonl.py
close
async
¶
open
async
¶
Open the JSONL file for writing. Overwrites any existing file.
write_many
async
¶
Serialise each sample as one JSON object per line.
Source code in src/alicatlib/sinks/jsonl.py
ParquetSink ¶
Append-only Parquet writer with first-batch schema lock.
Attributes:
| Name | Type | Description |
|---|---|---|
| `path` | `Path` | Destination Parquet file. |
| `compression` | `_Compression` | Codec applied to every row group. |
| `columns` | `tuple[ColumnSpec, ...] \| None` | Locked columns in order, or `None` before the first batch locks the schema. |
Source code in src/alicatlib/sinks/parquet.py
close
async
¶
Flush the footer and close the writer. Idempotent.
Source code in src/alicatlib/sinks/parquet.py
open
async
¶
Load pyarrow and create the parent directory. Idempotent.
The actual :class:pyarrow.parquet.ParquetWriter is opened
lazily on the first :meth:write_many call, because the
writer requires a concrete schema — which we don't have until
the first batch is inspected.
Source code in src/alicatlib/sinks/parquet.py
write_many
async
¶
Append samples as one Parquet row group.
On first call: infers the schema from the batch, locks it,
constructs the matching :mod:pyarrow schema, and opens the
underlying :class:~pyarrow.parquet.ParquetWriter.
Subsequent calls project each row onto the locked schema and
append the rows as a new row group. Unknown columns are
dropped with one-shot WARN (handled by :class:SchemaLock).
Source code in src/alicatlib/sinks/parquet.py
PostgresConfig
dataclass
¶
PostgresConfig(
dsn=None,
host=None,
port=5432,
user=None,
password=None,
database=None,
schema="public",
table="samples",
pool_min_size=1,
pool_max_size=4,
statement_timeout_ms=30000,
command_timeout_s=10.0,
connect_timeout_s=30.0,
close_timeout_s=10.0,
create_table=False,
use_copy=True,
)
Connection + target settings for :class:PostgresSink.
Either dsn or the discrete host/user/database set
must be provided. Credentials are not logged.
Attributes:
| Name | Type | Description |
|---|---|---|
| `dsn` | `str \| None` | Full libpq-style connection string. |
| `host` | `str \| None` | Database host. Required if `dsn` is not given. |
| `port` | `int` | Database port. Defaults to `5432`. |
| `user` | `str \| None` | Database role. |
| `password` | `str \| None` | Role password. Never logged. |
| `database` | `str \| None` | Database name. |
| `schema` | `str` | Target schema. Validated against a strict identifier pattern. |
| `table` | `str` | Target table. Validated against the same pattern. |
| `pool_min_size` | `int` | Minimum pool size. Defaults to `1`. |
| `pool_max_size` | `int` | Maximum pool size. Defaults to `4`. |
| `statement_timeout_ms` | `int` | Server-side `statement_timeout`, in milliseconds. Defaults to `30000`. |
| `command_timeout_s` | `float` | asyncpg's per-call command timeout. Defaults to 10 s. |
| `connect_timeout_s` | `float` | Cap on initial pool establishment in :meth:`open`. Defaults to 30 s. |
| `close_timeout_s` | `float` | Cap on :meth:`close`. Defaults to 10 s. |
| `create_table` | `bool` | If `True`, infer the schema from the first batch and run `CREATE TABLE IF NOT EXISTS`. Defaults to `False`. |
| `use_copy` | `bool` | If `True` (the default), write batches via binary COPY; otherwise fall back to a prepared `executemany`. |
target ¶
Return a log-safe description of the target: host:port/db.schema.table.
Source code in src/alicatlib/sinks/postgres.py
PostgresSink ¶
Append-only Postgres writer using pooled asyncpg connections.
Attributes:
| Name | Type | Description |
|---|---|---|
| `config` | `PostgresConfig` | Frozen :class:`PostgresConfig` with connection and target settings. |
| `columns` | `tuple[ColumnSpec, ...] \| None` | Locked columns in order, or `None` before the schema is locked. |
Source code in src/alicatlib/sinks/postgres.py
close
async
¶
Close the pool. Idempotent.
pool.close() waits for in-flight queries to drain. Capped
at :attr:PostgresConfig.close_timeout_s so a wedged query
cannot wedge shutdown — on timeout the pool is forcibly
terminated via :meth:Pool.terminate.
Source code in src/alicatlib/sinks/postgres.py
open
async
¶
Load asyncpg, open the pool, and (optionally) introspect the table.
Idempotent. When create_table=False (the default), the
target's columns are read on open and the schema is locked
immediately. When create_table=True the lock happens
lazily on the first :meth:write_many.
Source code in src/alicatlib/sinks/postgres.py
write_many
async
¶
Append samples — one COPY (or executemany) per call.
Source code in src/alicatlib/sinks/postgres.py
SampleSink ¶
Bases: Protocol
Minimal shape of an acquisition sink.
Sinks own their storage handle lifecycle. Concrete implementations typically follow this call sequence:
1. `await sink.open()` — allocate file descriptors, DB connections, etc. Safe to call again on an already-open sink.
2. `await sink.write_many(samples)` — one or more times. `samples` is a :class:`~collections.abc.Sequence` so the sink knows the full batch up front (useful for CSV column inference, Parquet row groups, Postgres parameterised inserts).
3. `await sink.close()` — flush and release the handle. Idempotent.

The async context-manager methods provide an `async with sink:` shape for the common case of "open → write → close" in one block.
__aenter__
async
¶
__aexit__
async
¶
close
async
¶
open
async
¶
write_many
async
¶
Append samples to the sink.
Sequence (not Iterable) because every in-tree sink wants
len() — CSV schema inference, batched parameterised inserts,
Parquet row-group bookkeeping.
Source code in src/alicatlib/sinks/base.py
SqliteSink ¶
SqliteSink(
path,
*,
table="samples",
create_table=True,
journal_mode="WAL",
synchronous="NORMAL",
busy_timeout_ms=5000,
)
Append-only SQLite writer with WAL journaling and first-batch schema lock.
Attributes:
| Name | Type | Description |
|---|---|---|
| `path` | `Path` | Destination SQLite file. Created on :meth:`open`. |
| `table` | `str` | Target table name. |
| `columns` | `tuple[ColumnSpec, ...] \| None` | The locked :class:`ColumnSpec` columns, or `None` before the first batch. |
Source code in src/alicatlib/sinks/sqlite.py
close
async
¶
Close the connection. Idempotent.
Source code in src/alicatlib/sinks/sqlite.py
open
async
¶
Open the SQLite connection, apply PRAGMAs, and introspect the target.
Idempotent: calling :meth:open on an already-open sink is a
no-op. Runs in a worker thread because sqlite3.connect and
PRAGMA execution are blocking I/O.
Source code in src/alicatlib/sinks/sqlite.py
write_many
async
¶
Append samples as rows in a single transaction.
On the first call (when create_table=True), infers the
schema from the batch and runs CREATE TABLE IF NOT EXISTS.
Subsequent calls insert directly. All values pass through
? placeholders — never string-formatted into SQL.
Source code in src/alicatlib/sinks/sqlite.py
pipe
async
¶
Drain stream into sink with buffered flushes.
Reads per-tick batches from the recorder and accumulates the
individual :class:`Sample` objects into a list. A flush happens when
either threshold is first crossed:

- the buffer reaches `batch_size` samples, or
- `flush_interval` seconds have elapsed since the last flush.
On stream exhaustion any leftover buffer is flushed before the summary is returned.
The samples_late / max_drift_ms fields on the returned
summary stay at zero here — those are recorder-layer concepts.
The recorder emits its own summary via the alicatlib.streaming
logger on CM exit; this summary is the sink-side view.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `stream` | `AsyncIterator[Mapping[str, Sample]]` | The async iterator of per-tick batches read from the recorder. | *required* |
| `sink` | `SampleSink` | Any :class:`SampleSink` implementation. | *required* |
| `batch_size` | `int` | Buffer threshold in samples (not batches). | `64` |
| `flush_interval` | `float` | Time threshold in seconds between flushes. Wall-clock only, not anyio-clock — sinks care about persistence freshness, not scheduling precision. | `1.0` |
Returns:
| Type | Description |
|---|---|
| `AcquisitionSummary` | An :class:`AcquisitionSummary` whose `samples_emitted` reflects the count actually handed to the sink. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | On non-positive `batch_size` or `flush_interval`. |
Source code in src/alicatlib/sinks/base.py
sample_to_row ¶
Flatten a :class:Sample into a single row dict for tabular sinks.
Schema layout (stable across samples):
- `device` — manager-assigned name.
- `unit_id` — bus-level single-letter id.
- `requested_at` / `received_at` / `midpoint_at` — ISO 8601.
- `latency_s` — poll round-trip, seconds.
- frame fields — everything from :meth:`DataFrame.as_dict` except the frame's own `received_at` (superseded by the sample-level value so all rows have the same `received_at` semantics).
- `status` — comma-joined sorted status codes (empty string when no flags active), from :meth:`DataFrame.as_dict`.
The frame's own received_at is dropped so the row's received_at
consistently means "recorder-observed reply time" across rows —
otherwise multi-device rows would mix frame-level and sample-level
timings.
Source code in src/alicatlib/sinks/base.py
Base protocol and driver¶
alicatlib.sinks.base ¶
Sink Protocol, sample → row helper, and the pipe() driver.
A :class:SampleSink is the minimal shape the recorder's downstream
consumer needs: :meth:open, :meth:write_many, :meth:close, and
the matching async context-manager methods. The in-tree sinks
(:class:~alicatlib.sinks.memory.InMemorySink,
:class:~alicatlib.sinks.csv.CsvSink,
:class:~alicatlib.sinks.jsonl.JsonlSink) all satisfy this Protocol;
third-party sinks (Parquet, Postgres, Kafka, …) can slot in without
touching library code.
:func:pipe is the v1 acquisition glue. It reads per-tick batches out
of the recorder's receive stream, buffers them up to batch_size
(or flush_interval seconds, whichever comes first), and calls
sink.write_many to flush. On stream exhaustion it drains any
remaining buffer and returns an :class:AcquisitionSummary with
samples_emitted reflecting the count actually handed to the sink.
Design reference: docs/design.md §5.15.
SampleSink ¶
Bases: Protocol
Minimal shape of an acquisition sink.
Sinks own their storage handle lifecycle. Concrete implementations typically follow this call sequence:
1. `await sink.open()` — allocate file descriptors, DB connections, etc. Safe to call again on an already-open sink.
2. `await sink.write_many(samples)` — one or more times. `samples` is a :class:`~collections.abc.Sequence` so the sink knows the full batch up front (useful for CSV column inference, Parquet row groups, Postgres parameterised inserts).
3. `await sink.close()` — flush and release the handle. Idempotent.

The async context-manager methods provide an `async with sink:` shape for the common case of "open → write → close" in one block.
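A toy implementation makes the sequence concrete. This sketch stands in `Sample` with a plain dict and mirrors the shape of `InMemorySink` without importing alicatlib:

```python
import asyncio
from collections.abc import Sequence
from typing import Any

Sample = dict[str, Any]  # stand-in for alicatlib's Sample type


class ListSink:
    """Toy sink following the open -> write_many -> close sequence."""

    def __init__(self) -> None:
        self.samples: list[Sample] = []
        self._open = False

    async def open(self) -> None:
        self._open = True  # idempotent: safe to call again

    async def write_many(self, samples: Sequence[Sample]) -> None:
        # Sequence, not Iterable: len(samples) is known up front.
        self.samples.extend(samples)

    async def close(self) -> None:
        self._open = False  # buffer is deliberately kept for inspection

    async def __aenter__(self) -> "ListSink":
        await self.open()
        return self

    async def __aexit__(self, *exc: object) -> None:
        await self.close()


async def demo() -> int:
    async with ListSink() as sink:  # open -> write -> close in one block
        await sink.write_many([{"flow": 1.0}, {"flow": 2.0}])
    return len(sink.samples)


assert asyncio.run(demo()) == 2
```

Because the Protocol is structural, a class like this satisfies `SampleSink` without inheriting from anything in the library.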
__aenter__
async
¶
__aexit__
async
¶
close
async
¶
open
async
¶
write_many
async
¶
Append samples to the sink.
Sequence (not Iterable) because every in-tree sink wants
len() — CSV schema inference, batched parameterised inserts,
Parquet row-group bookkeeping.
Source code in src/alicatlib/sinks/base.py
pipe
async
¶
Drain stream into sink with buffered flushes.
Reads per-tick batches from the recorder and accumulates the
individual :class:`Sample` objects into a list. A flush happens when
either threshold is first crossed:

- the buffer reaches `batch_size` samples, or
- `flush_interval` seconds have elapsed since the last flush.
On stream exhaustion any leftover buffer is flushed before the summary is returned.
The samples_late / max_drift_ms fields on the returned
summary stay at zero here — those are recorder-layer concepts.
The recorder emits its own summary via the alicatlib.streaming
logger on CM exit; this summary is the sink-side view.
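The dual-threshold flush logic can be sketched as follows. This is a simplified stand-in, not `pipe`'s real signature: batches are plain sequences here rather than per-tick `Mapping[str, Sample]` objects.

```python
import time
from collections.abc import AsyncIterator, Sequence
from typing import Any

Sample = dict[str, Any]  # stand-in for alicatlib's Sample


async def pipe_sketch(
    stream: AsyncIterator[Sequence[Sample]],
    write_many,                      # e.g. sink.write_many
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> int:
    """Dual-threshold buffering, as described for pipe() above."""
    if batch_size <= 0 or flush_interval <= 0:
        raise ValueError("batch_size and flush_interval must be positive")
    buffer: list[Sample] = []
    emitted = 0
    last_flush = time.monotonic()    # wall-clock, not a test clock
    async for batch in stream:       # one per-tick batch per iteration
        buffer.extend(batch)
        # Flush when either threshold is crossed first.
        if len(buffer) >= batch_size or time.monotonic() - last_flush >= flush_interval:
            await write_many(buffer)
            emitted += len(buffer)
            buffer = []              # fresh list: the sink may keep a reference
            last_flush = time.monotonic()
    if buffer:                       # drain leftovers on stream exhaustion
        await write_many(buffer)
        emitted += len(buffer)
    return emitted
```

With `batch_size=4` and a long `flush_interval`, five two-sample ticks produce flushes of 4, 4, and 2 samples: the last flush is the drain on exhaustion.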
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `stream` | `AsyncIterator[Mapping[str, Sample]]` | The async iterator of per-tick batches read from the recorder. | *required* |
| `sink` | `SampleSink` | Any :class:`SampleSink` implementation. | *required* |
| `batch_size` | `int` | Buffer threshold in samples (not batches). | `64` |
| `flush_interval` | `float` | Time threshold in seconds between flushes. Wall-clock only, not anyio-clock — sinks care about persistence freshness, not scheduling precision. | `1.0` |
Returns:
| Type | Description |
|---|---|
| `AcquisitionSummary` | An :class:`AcquisitionSummary` whose `samples_emitted` reflects the count actually handed to the sink. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | On non-positive `batch_size` or `flush_interval`. |
Source code in src/alicatlib/sinks/base.py
sample_to_row ¶
Flatten a :class:Sample into a single row dict for tabular sinks.
Schema layout (stable across samples):
- `device` — manager-assigned name.
- `unit_id` — bus-level single-letter id.
- `requested_at` / `received_at` / `midpoint_at` — ISO 8601.
- `latency_s` — poll round-trip, seconds.
- frame fields — everything from :meth:`DataFrame.as_dict` except the frame's own `received_at` (superseded by the sample-level value so all rows have the same `received_at` semantics).
- `status` — comma-joined sorted status codes (empty string when no flags active), from :meth:`DataFrame.as_dict`.
The frame's own received_at is dropped so the row's received_at
consistently means "recorder-observed reply time" across rows —
otherwise multi-device rows would mix frame-level and sample-level
timings.
Source code in src/alicatlib/sinks/base.py
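A sketch of the flattening rules, with the sample and frame stood in by plain dicts (`frame` plays the role of :meth:`DataFrame.as_dict` output; field names follow the list above):

```python
from typing import Any


def sample_to_row_sketch(sample: dict[str, Any]) -> dict[str, Any]:
    """Flatten a sample-like dict into one tabular row."""
    frame = dict(sample["frame"])
    # The frame-level received_at is superseded by the sample-level value,
    # so every row's received_at means "recorder-observed reply time".
    frame.pop("received_at", None)
    return {
        "device": sample["device"],
        "unit_id": sample["unit_id"],
        "requested_at": sample["requested_at"],   # ISO 8601 strings
        "received_at": sample["received_at"],
        "midpoint_at": sample["midpoint_at"],
        "latency_s": sample["latency_s"],
        **frame,                                   # remaining frame fields, incl. status
    }
```

The interesting property is the last comment: after flattening, a multi-device run's rows never mix frame-level and sample-level timing semantics.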
In-memory (test-only)¶
alicatlib.sinks.memory ¶
In-memory sink — collects :class:`Sample` objects in a list for tests.
The one bit of value over "write your own list" is that
:class:InMemorySink satisfies the :class:SampleSink Protocol, so
acquisition tests can run the same pipe() call path a production
sink uses and then inspect the captured samples after the fact.
Design reference: docs/design.md §5.15.
InMemorySink ¶
Collect every written :class:Sample in a single list.
:attr:samples is appended to (never re-assigned) so callers can
hold a reference across the sink's lifecycle. :meth:close does
not clear the buffer — the point of this sink is post-run
inspection.
Source code in src/alicatlib/sinks/memory.py
CSV¶
alicatlib.sinks.csv ¶
CSV sink — stdlib :mod:csv, schema locked at first batch.
:class:CsvSink writes one row per :class:Sample. The column order
is fixed the first time :meth:write_many is called — inferred from
the first sample's :func:sample_to_row output — and stays stable
for the rest of the run. Unknown columns that appear in later
samples (e.g. a newly-hot-plugged device whose frame format carries
an extra field) are dropped with a WARN log rather than silently
reshaping the file; if the shape changes mid-run the caller almost
always wants to know.
Stdlib-only — the core install pulls in no CSV dependencies. Design
reference: docs/design.md §5.15.
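The first-batch lock and one-shot WARN behaviour can be sketched with stdlib `csv` alone. This is an illustrative stand-in, not `CsvSink` itself (it is synchronous and records warnings in a list instead of logging):

```python
import csv
import io


class CsvLockSketch:
    """First-batch column lock plus one-shot warnings for unknown keys."""

    def __init__(self, fh) -> None:
        self._fh = fh
        self.columns: tuple[str, ...] | None = None
        self._warned: set[str] = set()
        self._writer = None
        self.warnings: list[str] = []

    def write_many(self, rows) -> None:
        if self.columns is None:
            # Lock the column order from the first sample; write the header.
            self.columns = tuple(rows[0])
            self._writer = csv.DictWriter(
                self._fh, fieldnames=self.columns, extrasaction="ignore"
            )
            self._writer.writeheader()
        for row in rows:
            for key in row.keys() - set(self.columns):
                if key not in self._warned:          # one-shot per unseen key
                    self._warned.add(key)
                    self.warnings.append(f"dropping unknown column {key!r}")
            self._writer.writerow(row)               # extras silently dropped
```

`extrasaction="ignore"` is what keeps the file's shape stable; the warning set is what keeps the log from repeating itself on every row.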
CsvSink ¶
Append-only CSV writer with first-batch schema lock.
Attributes:
| Name | Type | Description |
|---|---|---|
| `path` | `Path` | Destination file. Created or overwritten on :meth:`open`. |
| `columns` | `tuple[str, ...] \| None` | Locked column order after the first :meth:`write_many` batch. |
Source code in src/alicatlib/sinks/csv.py
close
async
¶
Flush and close the CSV file. Idempotent.
open
async
¶
Open the CSV file for writing. Overwrites any existing file.
write_many
async
¶
Append samples as CSV rows.
On first call, infers the column set from the first sample and writes the header. Subsequent calls validate each row's keys against that locked set — unknown keys are dropped with a one-shot WARN log per unseen key.
Source code in src/alicatlib/sinks/csv.py
JSONL¶
alicatlib.sinks.jsonl ¶
JSONL sink — stdlib :mod:json, one object per line, no schema lock.
:class:JsonlSink writes one JSON object per :class:Sample. Unlike
:class:~alicatlib.sinks.csv.CsvSink, it doesn't lock a schema — each
row stands alone, so a device whose frame format carries an extra
field simply emits a wider object without affecting earlier or
later rows.
Stdlib-only — the core install pulls in no JSON dependencies. Design
reference: docs/design.md §5.15.
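The format is simple enough to demonstrate end to end with the stdlib; note the second row carries an extra key without affecting the first:

```python
import io
import json

rows = [
    {"device": "A", "mass_flow": 0.97},
    {"device": "B", "mass_flow": 1.02, "extra": True},  # wider row, no schema lock
]

# Write: one flattened object per line, newline-delimited, no header.
buf = io.StringIO()
for row in rows:
    buf.write(json.dumps(row) + "\n")

# Read back: each line stands alone.
buf.seek(0)
decoded = [json.loads(line) for line in buf]
assert decoded == rows
```

This per-line independence is exactly the trade against CSV: no schema lock, at the cost of repeating key names on every line.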
JsonlSink ¶
Append-only JSONL writer — one flattened sample per line.
The on-disk format is <sample-row-as-json>\n per sample;
reading back is just [json.loads(line) for line in f]. No
header, no schema declaration, no framing overhead beyond the
newline.
Source code in src/alicatlib/sinks/jsonl.py
close
async
¶
open
async
¶
Open the JSONL file for writing. Overwrites any existing file.
write_many
async
¶
Serialise each sample as one JSON object per line.
Source code in src/alicatlib/sinks/jsonl.py
SQLite (stdlib)¶
alicatlib.sinks.sqlite ¶
SQLite sink — stdlib :mod:sqlite3 + WAL, parameterised executemany.
:class:SqliteSink writes one row per :class:Sample into a local
SQLite file. The sink is core (no extra required) because sqlite3
ships with the Python standard library.
The sqlite3 driver is synchronous; the sink calls it through
:func:anyio.to_thread.run_sync so the event loop stays responsive.
Because every write hops into a worker thread anyway, there's no
advantage to taking on aiosqlite as a dependency — stdlib delivers
the same latency profile with one fewer package.
Best-practice defaults baked in:
- `journal_mode=WAL` + `synchronous=NORMAL` — the recommended pairing for write-heavy workloads; durable against crashes, significantly faster than the default.
- `busy_timeout=5000` ms so brief lock contention retries transparently instead of raising `OperationalError`.
- One `BEGIN IMMEDIATE` … `COMMIT` transaction per `write_many`, so a batch of N rows is one fsync rather than N.
- SQL identifiers (table name) validated against `^[A-Za-z_][A-Za-z0-9_]{0,62}$`. Values are always passed as `?` parameters — never string-formatted.
Schema evolution mirrors the other tabular sinks: column set locked on
the first batch (via :class:~alicatlib.sinks._schema.SchemaLock),
unknown columns dropped with a one-shot WARN. When create_table=True
(default), the table is created on first batch from the inferred
:class:~alicatlib.sinks._schema.ColumnSpec list. When
create_table=False, the target table's columns are read from
PRAGMA table_info and used as the schema.
Design reference: docs/design.md §5.15.
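These defaults can be reproduced directly with stdlib `sqlite3`. An in-memory sketch follows; the real sink writes to a file, uses `BEGIN IMMEDIATE`, and runs these calls in a worker thread:

```python
import re
import sqlite3

table = "samples"
# The identifier is validated against the strict pattern; values never are.
assert re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]{0,62}", table)

conn = sqlite3.connect(":memory:")            # the real sink opens a file path
conn.execute("PRAGMA journal_mode=WAL")       # no-op for :memory:, kept for shape
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("PRAGMA busy_timeout=5000")

conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (device TEXT, mass_flow REAL)")

rows = [("A", 0.97), ("B", 1.02), ("C", 0.99)]
with conn:  # one transaction per batch: three rows, one commit
    conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", rows)

count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

The table name is the only thing ever interpolated into SQL, and only after passing the identifier regex; every value travels through `?` placeholders.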
SqliteSink ¶
SqliteSink(
path,
*,
table="samples",
create_table=True,
journal_mode="WAL",
synchronous="NORMAL",
busy_timeout_ms=5000,
)
Append-only SQLite writer with WAL journaling and first-batch schema lock.
Attributes:
| Name | Type | Description |
|---|---|---|
| `path` | `Path` | Destination SQLite file. Created on :meth:`open`. |
| `table` | `str` | Target table name. |
| `columns` | `tuple[ColumnSpec, ...] \| None` | The locked :class:`ColumnSpec` columns, or `None` before the first batch. |
Source code in src/alicatlib/sinks/sqlite.py
close
async
¶
Close the connection. Idempotent.
Source code in src/alicatlib/sinks/sqlite.py
open
async
¶
Open the SQLite connection, apply PRAGMAs, and introspect the target.
Idempotent: calling :meth:open on an already-open sink is a
no-op. Runs in a worker thread because sqlite3.connect and
PRAGMA execution are blocking I/O.
Source code in src/alicatlib/sinks/sqlite.py
write_many
async
¶
Append samples as rows in a single transaction.
On the first call (when create_table=True), infers the
schema from the batch and runs CREATE TABLE IF NOT EXISTS.
Subsequent calls insert directly. All values pass through
? placeholders — never string-formatted into SQL.
Source code in src/alicatlib/sinks/sqlite.py
Parquet (alicatlib[parquet])¶
alicatlib.sinks.parquet ¶
Parquet sink — :mod:pyarrow, schema locked, zstd by default.
:class:ParquetSink writes one row per :class:Sample into a single
Parquet file. pyarrow is an optional dependency behind
alicatlib[parquet]; the import is deferred to :meth:open so
instantiating the sink succeeds on bare-core installs and
:class:~alicatlib.errors.AlicatSinkDependencyError is raised only
when the user actually tries to open the file.
Best-practice defaults baked in:
- zstd compression. It matches or beats snappy on write/read speed with ~30% better ratios and is fully supported across pyarrow ≥ 2, Spark, DuckDB, Polars, and pandas ≥ 1.3. Snappy and gzip remain available for compatibility with readers that don't support zstd.
- Dictionary encoding on for string columns (pyarrow default; surfaced as a knob so callers that know their cardinality is high can disable).
- One row group per :meth:`write_many`. Aligns durability with batch cadence — a crash mid-run loses at most the current batch. Callers that want fewer, bigger row groups can pass `row_group_size`.
Schema evolution mirrors the other tabular sinks: the column set is
locked on the first batch (via
:class:~alicatlib.sinks._schema.SchemaLock). Unknown columns that
appear in later batches are dropped with a one-shot WARN. Adding a
new column mid-file would require rewriting the whole file, so it is
deliberately not supported.
Durability caveat: Parquet files are not readable until the footer
is flushed on :meth:close. If the process is killed mid-run you
will get a file with no usable footer. The recommended shutdown path
is the recorder's structured exit, which always reaches the sink's
async-context-manager __aexit__ and runs :meth:close.
Design reference: docs/design.md §5.15.
ParquetSink ¶
Append-only Parquet writer with first-batch schema lock.
Attributes:
| Name | Type | Description |
|---|---|---|
| `path` | `Path` | Destination Parquet file. |
| `compression` | `_Compression` | Codec applied to every row group. |
| `columns` | `tuple[ColumnSpec, ...] \| None` | Locked columns in order, or `None` before the first batch locks the schema. |
Source code in src/alicatlib/sinks/parquet.py
close
async
¶
Flush the footer and close the writer. Idempotent.
Source code in src/alicatlib/sinks/parquet.py
open
async
¶
Load pyarrow and create the parent directory. Idempotent.
The actual :class:pyarrow.parquet.ParquetWriter is opened
lazily on the first :meth:write_many call, because the
writer requires a concrete schema — which we don't have until
the first batch is inspected.
Source code in src/alicatlib/sinks/parquet.py
write_many
async
¶
Append samples as one Parquet row group.
On first call: infers the schema from the batch, locks it,
constructs the matching :mod:pyarrow schema, and opens the
underlying :class:~pyarrow.parquet.ParquetWriter.
Subsequent calls project each row onto the locked schema and
append the rows as a new row group. Unknown columns are
dropped with one-shot WARN (handled by :class:SchemaLock).
Source code in src/alicatlib/sinks/parquet.py
PostgreSQL (alicatlib[postgres])¶
alicatlib.sinks.postgres ¶
PostgreSQL sink — :mod:asyncpg, COPY by default, parameterised fallback.
:class:PostgresSink writes one row per :class:Sample into a
PostgreSQL table. asyncpg is an optional dependency behind
alicatlib[postgres]; the import is deferred to :meth:open so
instantiation works on bare-core installs and
:class:~alicatlib.errors.AlicatSinkDependencyError is raised only
when the user actually tries to open a connection.
Best-practice defaults baked in:
- Binary COPY via :meth:`asyncpg.Connection.copy_records_to_table`. COPY is ~5–10× faster than parameterised INSERT for batches and is the recommended asyncpg bulk-ingest path. Callers that run on managed Postgres without COPY privileges can set :attr:`PostgresConfig.use_copy` to `False` to fall back to a prepared `executemany`.
- Connection pool via :func:`asyncpg.create_pool`. The pool lifetime equals the sink lifetime; each batch acquires, writes, and releases, so the pool stays available for concurrent work.
- Identifier validation on `schema` and `table` (strict regex). Every value passes through `$N` placeholders — never string-formatted into SQL.
- Credential scrubbing — log lines describe the target via :meth:`PostgresConfig.target`, which only renders `host:port/db.schema.table`; the DSN (and any embedded password) is never written to a log record.
- `statement_timeout` applied as a server setting so a wedged query cannot block the acquisition loop forever.
Schema evolution mirrors the other tabular sinks (design §5.15). The
default create_table=False reads the target table's columns from
information_schema.columns on open and locks the schema to that
set. Passing create_table=True switches to first-batch inference
and runs CREATE TABLE IF NOT EXISTS — convenient for quick runs,
but the user gives up type control (everything text-like becomes
TEXT rather than timestamptz etc.).
Design reference: docs/design.md §5.15, §5.18.
PostgresConfig
dataclass
¶
PostgresConfig(
dsn=None,
host=None,
port=5432,
user=None,
password=None,
database=None,
schema="public",
table="samples",
pool_min_size=1,
pool_max_size=4,
statement_timeout_ms=30000,
command_timeout_s=10.0,
connect_timeout_s=30.0,
close_timeout_s=10.0,
create_table=False,
use_copy=True,
)
Connection + target settings for :class:PostgresSink.
Either dsn or the discrete host/user/database set
must be provided. Credentials are not logged.
Attributes:
| Name | Type | Description |
|---|---|---|
| `dsn` | `str \| None` | Full libpq-style connection string. |
| `host` | `str \| None` | Database host. Required if `dsn` is not given. |
| `port` | `int` | Database port. Defaults to `5432`. |
| `user` | `str \| None` | Database role. |
| `password` | `str \| None` | Role password. Never logged. |
| `database` | `str \| None` | Database name. |
| `schema` | `str` | Target schema. Validated against a strict identifier pattern. |
| `table` | `str` | Target table. Validated against the same pattern. |
| `pool_min_size` | `int` | Minimum pool size. Defaults to `1`. |
| `pool_max_size` | `int` | Maximum pool size. Defaults to `4`. |
| `statement_timeout_ms` | `int` | Server-side `statement_timeout`, in milliseconds. Defaults to `30000`. |
| `command_timeout_s` | `float` | asyncpg's per-call command timeout. Defaults to 10 s. |
| `connect_timeout_s` | `float` | Cap on initial pool establishment in :meth:`open`. Defaults to 30 s. |
| `close_timeout_s` | `float` | Cap on :meth:`close`. Defaults to 10 s. |
| `create_table` | `bool` | If `True`, infer the schema from the first batch and run `CREATE TABLE IF NOT EXISTS`. Defaults to `False`. |
| `use_copy` | `bool` | If `True` (the default), write batches via binary COPY; otherwise fall back to a prepared `executemany`. |
target ¶
Return a log-safe description of the target: host:port/db.schema.table.
Source code in src/alicatlib/sinks/postgres.py
PostgresSink ¶
Append-only Postgres writer using pooled asyncpg connections.
Attributes:
| Name | Type | Description |
|---|---|---|
| `config` | `PostgresConfig` | Frozen :class:`PostgresConfig` with connection and target settings. |
| `columns` | `tuple[ColumnSpec, ...] \| None` | Locked columns in order, or `None` before the schema is locked. |
Source code in src/alicatlib/sinks/postgres.py
close
async
¶
Close the pool. Idempotent.
pool.close() waits for in-flight queries to drain. Capped
at :attr:PostgresConfig.close_timeout_s so a wedged query
cannot wedge shutdown — on timeout the pool is forcibly
terminated via :meth:Pool.terminate.
Source code in src/alicatlib/sinks/postgres.py
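The cap-then-terminate shape can be sketched with `asyncio.wait_for` and a stand-in pool (the real sink runs on anyio against asyncpg's `Pool`; everything here is illustrative):

```python
import asyncio


class FakePool:
    """Stand-in pool whose close() never finishes (a wedged query)."""

    def __init__(self) -> None:
        self.terminated = False

    async def close(self) -> None:
        await asyncio.sleep(3600)  # simulates waiting on a stuck query

    def terminate(self) -> None:
        self.terminated = True


async def close_with_cap(pool, close_timeout_s: float) -> None:
    # Graceful drain first; forcible terminate if it overruns the cap.
    try:
        await asyncio.wait_for(pool.close(), timeout=close_timeout_s)
    except asyncio.TimeoutError:
        pool.terminate()


pool = FakePool()
asyncio.run(close_with_cap(pool, close_timeout_s=0.01))
assert pool.terminated
```

A healthy pool drains within the cap and is never terminated; only a wedged one hits the forcible path, which is why shutdown stays bounded.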
open
async
¶
Load asyncpg, open the pool, and (optionally) introspect the table.
Idempotent. When create_table=False (the default), the
target's columns are read on open and the schema is locked
immediately. When create_table=True the lock happens
lazily on the first :meth:write_many.
Source code in src/alicatlib/sinks/postgres.py
write_many
async
¶
Append samples — one COPY (or executemany) per call.