servomexlib.sinks
The SampleSink protocol and the first-party sinks (memory / CSV / JSONL / SQLite / Parquet / Postgres).
Sinks — durable destinations for streamed :class:Sample rows.
It provides the :class:SampleSink Protocol plus :func:pipe (the recorder→sink driver) and
:func:sample_to_row (the long-format flattener). Core sinks are stdlib-only
(memory/csv/jsonl/sqlite); :class:ParquetSink ([parquet]) and
:class:PostgresSink ([postgres]) lazy-import their optional deps in
:meth:open, raising :class:~servomexlib.errors.ServomexSinkDependencyError if
the extra is missing — so importing this package never requires the extras.
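The lazy-import behaviour described above can be sketched as a small helper that a sink's open() would call. This is a minimal illustration, not the library's code: load_optional is a hypothetical name, the ServomexSinkDependencyError defined here is a local stand-in for the real class in servomexlib.errors (its base class is assumed), and the message text is invented.

```python
from __future__ import annotations

import importlib
from types import ModuleType


class ServomexSinkDependencyError(Exception):
    """Local stand-in for servomexlib.errors.ServomexSinkDependencyError."""


def load_optional(module_name: str, extra: str) -> ModuleType:
    """Import an optional dependency on demand (call this from a sink's open())."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        # Translate the ImportError into a sink-specific error naming the extra.
        raise ServomexSinkDependencyError(
            f"{module_name!r} is not installed; install the [{extra}] extra"
        ) from exc
```

Because the import happens inside open(), a call such as load_optional("pyarrow", "parquet") only fails when a Parquet sink is actually opened, never at package import time.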
CsvSink
Single-run CSV writer with first-batch schema lock.
:meth:open truncates the destination; the first :meth:write_many writes a
fresh header. Cross-run appending is intentionally not supported (a re-open
with a different column shape would silently mismatch) — use
:class:~servomexlib.sinks.jsonl.JsonlSink or
:class:~servomexlib.sinks.sqlite.SqliteSink for append semantics.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path | Destination file, created/overwritten on :meth:open. |
| columns | tuple[str, ...] \| None | Locked column order after the first flush; None before it. |
Source code in src/servomexlib/sinks/csv.py
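A minimal sketch of the truncate-on-open, lock-on-first-write behaviour, using the stdlib csv.DictWriter. It assumes rows arrive already flattened to dicts (as :func:sample_to_row would produce) rather than as Sample objects; MiniCsvSink is a hypothetical name, and rejecting rows whose keys differ from the locked header is an assumption of this sketch.

```python
from __future__ import annotations

import csv
from pathlib import Path
from typing import IO, Any, Mapping, Sequence


class MiniCsvSink:
    """Sketch of a single-run CSV writer that locks its header on first write."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self.columns: tuple[str, ...] | None = None
        self._fh: IO[str] | None = None
        self._writer: csv.DictWriter | None = None

    async def open(self) -> None:
        if self._fh is None:  # idempotent
            # Mode "w" truncates, so every run starts from an empty file.
            self._fh = self.path.open("w", newline="", encoding="utf-8")
            self._writer = None

    async def write_many(self, rows: Sequence[Mapping[str, Any]]) -> None:
        if self._fh is None:
            raise RuntimeError("open() must be awaited before write_many()")
        if not rows:
            return
        if self._writer is None:
            # The first non-empty batch fixes the column order and the header.
            self.columns = tuple(rows[0])
            self._writer = csv.DictWriter(self._fh, fieldnames=self.columns)
            self._writer.writeheader()
        for row in rows:
            # Assumption: rows whose keys differ from the locked header are rejected.
            if set(row) != set(self.columns):
                raise ValueError(f"row keys {sorted(row)} do not match {self.columns}")
            self._writer.writerow(row)

    async def close(self) -> None:
        if self._fh is not None:  # idempotent
            self._fh.close()
            self._fh = None
```

Re-opening the same path truncates it again and the next first batch writes a fresh header, which is why cross-run appending is left to the JSONL and SQLite sinks.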
close (async)
Flush and close the CSV file. Idempotent.
open (async)
Open the CSV file for writing (truncating). Idempotent.
write_many (async)
Append samples as CSV rows, locking the header on first call.
InMemorySink
Collect every written :class:Sample in a single list.
:attr:samples is appended to (never re-assigned), and :meth:close does not
clear it — the point is post-run inspection.
Source code in src/servomexlib/sinks/memory.py
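The append-never-reassign contract means a reference to the list taken before the run still sees every sample afterwards. A hypothetical MiniMemorySink sketching that contract (not the library's InMemorySink, and typed loosely as Any instead of Sample) could look like this:

```python
from __future__ import annotations

from typing import Any, Sequence


class MiniMemorySink:
    """Sketch: collect every written item in one list for post-run inspection."""

    def __init__(self) -> None:
        self.samples: list[Any] = []

    async def open(self) -> None:
        pass  # nothing to allocate

    async def write_many(self, samples: Sequence[Any]) -> None:
        # extend() mutates in place, so outside references to .samples stay valid.
        self.samples.extend(samples)

    async def close(self) -> None:
        pass  # deliberately does not clear self.samples
```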
JsonlSink
Append-only JSONL writer — one flattened sample per line.
On-disk format is <sample-row-as-json>\n; read back with
[json.loads(line) for line in f]. Re-opening the same path extends it.
Source code in src/servomexlib/sinks/jsonl.py
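A sketch of the on-disk round trip, assuming rows are already flattened dicts. append_jsonl and read_jsonl are hypothetical, synchronous helpers written for brevity, and the row keys used with them are invented; the read side is the same comprehension given above.

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Mapping, Sequence


def append_jsonl(path: Path, rows: Sequence[Mapping[str, Any]]) -> None:
    # Mode "a" extends an existing file instead of truncating it.
    with path.open("a", encoding="utf-8") as f:
        for row in rows:
            # One compact JSON object per line, newline-terminated.
            f.write(json.dumps(row) + "\n")


def read_jsonl(path: Path) -> list[dict[str, Any]]:
    with path.open(encoding="utf-8") as f:
        return [json.loads(line) for line in f]
```

Because each call appends, two separate runs against the same path simply produce one longer file.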
close (async)
open (async)
Open the JSONL file for writing in append mode. Idempotent.
write_many (async)
Serialise each sample as one JSON object per line.
ParquetSink
Append-only Parquet writer with first-batch schema lock.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path | Destination Parquet file. |
| compression | _Compression | Codec applied to every row group. |
| columns | tuple[ColumnSpec, ...] \| None | Locked columns in order, or None until the first :meth:write_many locks them. |
Source code in src/servomexlib/sinks/parquet.py
close (async)
Flush the footer and close the writer. Idempotent.
open (async)
Load pyarrow and create the parent directory. Idempotent.
The ParquetWriter itself opens lazily on the first
:meth:write_many, when the concrete schema is known.
write_many (async)
Append samples as one Parquet row group.
PostgresConfig (dataclass)
PostgresConfig(
dsn=None,
host=None,
port=5432,
user=None,
password=None,
database=None,
schema="public",
table="samples",
pool_min_size=1,
pool_max_size=4,
statement_timeout_ms=30000,
command_timeout_s=10.0,
create_table=False,
use_copy=True,
)
Connection + target settings for :class:PostgresSink.
Either dsn or the discrete host/user/database set must be
provided (mutually exclusive). Credentials are never logged.
target
Return a log-safe host:port/db.schema.table (no password).
Source code in src/servomexlib/sinks/postgres.py
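A partial sketch of the validation rule and the log-safe target. MiniPostgresConfig is hypothetical and keeps only the connection and target fields from the signature above (pool and timeout settings are omitted). The exact checks, the use of repr=False to keep credentials out of repr output, the DSN parsing, and treating target as a plain method rather than a property are all assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from urllib.parse import urlsplit


@dataclass(frozen=True)
class MiniPostgresConfig:
    """Sketch of a subset of PostgresConfig: validation and a log-safe target."""

    # repr=False keeps credentials (including any embedded in the DSN) out of repr.
    dsn: str | None = field(default=None, repr=False)
    host: str | None = None
    port: int = 5432
    user: str | None = None
    password: str | None = field(default=None, repr=False)
    database: str | None = None
    schema: str = "public"
    table: str = "samples"

    def __post_init__(self) -> None:
        discrete = (self.host, self.user, self.database)
        if self.dsn is not None and any(v is not None for v in discrete):
            raise ValueError("dsn and host/user/database are mutually exclusive")
        if self.dsn is None and any(v is None for v in discrete):
            raise ValueError("provide dsn, or all of host, user and database")

    def target(self) -> str:
        host, port, db = self.host, self.port, self.database
        if self.dsn is not None:
            # Keep only host, port and database; userinfo (user:password) is dropped.
            parts = urlsplit(self.dsn)
            host, port, db = parts.hostname, parts.port or 5432, parts.path.lstrip("/")
        return f"{host}:{port}/{db}.{self.schema}.{self.table}"
```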
PostgresSink
Append-only Postgres writer using pooled asyncpg connections.
Attributes:
| Name | Type | Description |
|---|---|---|
| config | PostgresConfig | Frozen :class:PostgresConfig with the connection and target settings. |
| columns | tuple[ColumnSpec, ...] \| None | Locked columns in order, or None if not yet locked. |
Source code in src/servomexlib/sinks/postgres.py
close (async)
Close the pool. Idempotent.
open (async)
Load asyncpg, open the pool, and (optionally) introspect the table.
write_many (async)
Append samples — one COPY (or executemany) per call.
SampleSink
Bases: Protocol
Minimal shape of an acquisition sink.
Concrete sinks own their storage handle and typically follow:
1. await sink.open(): allocate file descriptors / DB connections.
2. await sink.write_many(samples): one or more times.
3. await sink.close(): flush and release (idempotent).
The async context-manager methods also support the async with sink: form.
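One way to express this shape with typing.Protocol, shown as a sketch: SinkShape and ListSink are hypothetical names, and placing default __aenter__/__aexit__ bodies on the Protocol (so that explicit subclasses inherit them) is an assumption about how the real SampleSink provides the async with form.

```python
from __future__ import annotations

from types import TracebackType
from typing import Any, Protocol, Sequence, TypeVar

S = TypeVar("S", bound="SinkShape")


class SinkShape(Protocol):
    """Sketch of the open / write_many / close shape plus async-with support."""

    async def open(self) -> None: ...
    async def write_many(self, samples: Sequence[Any]) -> None: ...
    async def close(self) -> None: ...

    async def __aenter__(self: S) -> S:
        await self.open()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        # Close even when the body raised; returning None lets the error propagate.
        await self.close()


class ListSink(SinkShape):
    """Toy sink that subclasses the Protocol explicitly to inherit async-with."""

    def __init__(self) -> None:
        self.rows: list[Any] = []
        self.closed = False

    async def open(self) -> None:
        self.closed = False

    async def write_many(self, samples: Sequence[Any]) -> None:
        self.rows.extend(samples)

    async def close(self) -> None:
        self.closed = True
```

A class that merely has matching methods still type-checks as a SinkShape structurally; only explicit subclasses pick up the default async-with methods.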
SqliteSink
SqliteSink(
path,
*,
table="samples",
create_table=True,
journal_mode="WAL",
synchronous="NORMAL",
busy_timeout_ms=5000,
)
Append-only SQLite writer with WAL journaling and first-batch schema lock.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | Path | Destination SQLite file, created on :meth:open. |
| table | str | Target table name (validated). |
| columns | tuple[ColumnSpec, ...] \| None | The locked :class:ColumnSpec tuple, or None if not yet locked. |
Source code in src/servomexlib/sinks/sqlite.py
close (async)
Close the connection. Idempotent.
open (async)
Open the connection, apply PRAGMAs, and introspect the target. Idempotent.
write_many (async)
Append samples as rows in a single transaction (parameterised).
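A synchronous stdlib sketch of the PRAGMA setup and the single-transaction, parameterised insert. The function names, the identifier regex standing in for the "validated" table name, and explicit BEGIN/COMMIT under isolation_level=None are assumptions; how the real async sink drives sqlite3 is not shown.

```python
from __future__ import annotations

import re
import sqlite3
from pathlib import Path
from typing import Any, Mapping, Sequence

# Assumed identifier rule; identifiers cannot be bound as "?" parameters.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def open_sqlite(path: Path, busy_timeout_ms: int = 5000) -> sqlite3.Connection:
    # isolation_level=None: no implicit transactions; BEGIN/COMMIT are explicit.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.execute(f"PRAGMA busy_timeout={int(busy_timeout_ms)}")
    return conn


def insert_rows(
    conn: sqlite3.Connection, table: str, rows: Sequence[Mapping[str, Any]]
) -> None:
    if not _IDENT.fullmatch(table):
        raise ValueError(f"invalid table name: {table!r}")
    if not rows:
        return
    cols = list(rows[0])
    for name in cols:
        if not _IDENT.fullmatch(name):
            raise ValueError(f"invalid column name: {name!r}")
    col_sql = ", ".join(f'"{c}"' for c in cols)
    marks = ", ".join("?" for _ in cols)
    sql = f'INSERT INTO "{table}" ({col_sql}) VALUES ({marks})'
    conn.execute("BEGIN")
    try:
        # Values are always bound as parameters, never formatted into the SQL.
        conn.executemany(sql, [tuple(row[c] for c in cols) for row in rows])
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")
```

Wrapping the whole batch in one transaction means a failure leaves none of its rows behind, and WAL lets readers query the file while a run is still writing.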
pipe (async)
Drain stream into sink with buffered flushes.
Reads per-tick batches from the recorder and accumulates the individual
:class:Sample objects. A flush happens when the buffer reaches batch_size samples or
when flush_interval seconds have elapsed since the last flush, whichever comes first.
On stream exhaustion any leftover buffer is flushed before returning.
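The buffering rule can be sketched as follows. pipe_sketch is a hypothetical stand-in, not the library's pipe: it returns a plain sample count instead of an AcquisitionSummary, and its ValueError conditions are assumed, since the source does not say when pipe raises.

```python
from __future__ import annotations

import time
from typing import Any, AsyncIterator, Protocol, Sequence


class _Writable(Protocol):
    async def write_many(self, samples: Sequence[Any]) -> None: ...


async def pipe_sketch(
    stream: AsyncIterator[Sequence[Any]],
    sink: _Writable,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> int:
    """Drain per-tick batches into sink; return the number of samples written."""
    # Assumed argument checks; the source only says ValueError can be raised.
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    if flush_interval <= 0:
        raise ValueError("flush_interval must be > 0")

    buffer: list[Any] = []
    written = 0
    last_flush = time.monotonic()

    async def flush() -> None:
        nonlocal written, last_flush
        if buffer:
            await sink.write_many(list(buffer))  # hand over a copy
            written += len(buffer)
            buffer.clear()
        last_flush = time.monotonic()

    async for batch in stream:
        buffer.extend(batch)  # one tick's batch -> individual samples
        size_hit = len(buffer) >= batch_size
        time_hit = time.monotonic() - last_flush >= flush_interval
        if size_hit or time_hit:
            await flush()
    await flush()  # flush any leftover buffer once the stream is exhausted
    return written
```

Two simplifications are worth flagging: the timer is only checked when a batch arrives, so a stalled stream does not trigger a time-based flush, and a tick that overshoots batch_size is flushed whole rather than split. Either may differ from the real pipe.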
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream | AsyncIterator[Sequence[Sample]] | The async iterator of per-tick batches yielded by the recorder. | required |
| sink | SampleSink | Any :class:SampleSink implementation. | required |
| batch_size | int | Buffer threshold in samples (not batches). | 64 |
| flush_interval | float | Time threshold in seconds between flushes. | 1.0 |
Returns:
| Type | Description |
|---|---|
| AcquisitionSummary | An :class:AcquisitionSummary of the samples handed to the sink (the sink-side view). |
Raises:
| Type | Description |
|---|---|
| ValueError | |
Source code in src/servomexlib/sinks/base.py
sample_to_row
Flatten a :class:Sample into a single row dict for tabular sinks.
Long-format schema (one row per channel read), stable across all in-tree
sinks. Error rows (a dropped/corrupt frame — :attr:Sample.error set,
:attr:Sample.reading None) carry None channel fields and a string
error so a resync is still recorded rather than lost.
The sample's raw payload is intentionally not in the row: bytes do not
fit CSV / JSONL / SQLite affinities. Callers that need the raw bytes should consume the
:class:Sample directly via :class:~servomexlib.sinks.memory.InMemorySink.
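A sketch of the long-format flattening under invented types: Reading, FakeSample and their fields (timestamp, channel, value, unit, raw) are hypothetical stand-ins, not the library's Sample. It demonstrates the two documented properties: error rows keep the same keys, with None channel fields and a string error, and the raw payload never reaches the row.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Reading:  # hypothetical stand-in
    channel: str
    value: float
    unit: str


@dataclass(frozen=True)
class FakeSample:  # hypothetical stand-in for Sample
    timestamp: float
    reading: Reading | None
    error: str | None = None
    raw: bytes = b""


def sample_to_row_sketch(sample: FakeSample) -> dict[str, Any]:
    r = sample.reading
    return {
        "timestamp": sample.timestamp,
        # Error rows have no reading, so the channel fields become None.
        "channel": r.channel if r is not None else None,
        "value": r.value if r is not None else None,
        "unit": r.unit if r is not None else None,
        # A string error keeps the resync visible in the stored data.
        "error": None if sample.error is None else str(sample.error),
        # sample.raw is deliberately omitted: bytes do not fit tabular sinks.
    }
```

Keeping identical keys on good and error rows is what lets a sink with a first-batch schema lock accept both.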