Sinks¶
Durable consumers of DaqBlock / DaqReading data. Every sink is an async
context manager satisfying the sink Protocol. See the Sinks guide
for choosing among them and the Raw logging guide for the
loss-proof .dt-raw path.
Sink Protocol and helpers¶
dtollib.sinks.base ¶
Sink Protocols, row helpers, and pipe drivers.
Three Protocols, one per input shape:
- :class:ReadingSink accepts :class:DaqReading sequences.
- :class:BlockSink accepts one :class:DaqBlock per call. No batching axis; a block is already (n_channels, n_samples).
- :class:RawBlockSink accepts raw HBUF payloads (:class:RawCountsSink).
Two drivers thread streams to sinks:
- :func:pipe is row-oriented and batched.
- :func:pipe_blocks is block-native, with no batching axis.
Row helpers convert acquisition records into row dicts for the tabular sinks (SQLite / Parquet / Postgres):
- :func:reading_to_row flattens a :class:DaqReading into one row.
- :func:block_to_rows performs explicit per-(channel, sample) scalarisation of a :class:DaqBlock into row dicts. It is distinct from :func:~dtollib.tasks.models.block_to_long_rows, which yields typed :class:~dtollib.tasks.models.DaqSample objects; this one yields plain dicts ready for a tabular sink. It is never invoked automatically.
BlockSink ¶
Bases: Protocol
Sink that consumes one :class:DaqBlock per call.
A block is already (n_channels, n_samples) — wrapping it in a
sequence per call would burn allocations in the hot path. Sinks that
need scalar rows opt in via :func:block_to_rows.
RawBlockSink ¶
Bases: Protocol
Sink that consumes raw HBUF payloads — :class:RawCountsSink only.
Distinct from :class:BlockSink because the raw-counts path carries a
fundamentally different payload (bytes from olDmGetBufferPtr) than
the converted :class:DaqBlock. Typing them as separate Protocols
prevents accidental miswiring at sink-attach time.
Design reference: docs/design.md §15.1.
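The separation into one Protocol per input shape can be sketched with typing.Protocol. This is a minimal illustration, not dtollib's actual definitions: the class names ending in Like and the ListBlockSink class are hypothetical, and the method signatures are assumptions (only the method names write_many, write, and write_raw appear in this reference).

```python
from typing import Protocol, Sequence, runtime_checkable


@runtime_checkable
class ReadingSinkLike(Protocol):
    """Hypothetical row-oriented shape: a batch of readings per call."""

    async def write_many(self, readings: Sequence[object]) -> None: ...


@runtime_checkable
class BlockSinkLike(Protocol):
    """Hypothetical block-native shape: exactly one block per call."""

    async def write(self, block: object) -> None: ...


@runtime_checkable
class RawBlockSinkLike(Protocol):
    """Hypothetical raw shape: one raw buffer payload per call."""

    async def write_raw(self, block: object) -> None: ...


class ListBlockSink:
    """Toy sink that only implements the block-native method."""

    def __init__(self) -> None:
        self.blocks: list[object] = []

    async def write(self, block: object) -> None:
        self.blocks.append(block)


sink = ListBlockSink()
# Structural checks: the toy sink has write() but no write_raw().
is_block_sink = isinstance(sink, BlockSinkLike)
is_raw_sink = isinstance(sink, RawBlockSinkLike)
```

Because the three shapes expose differently named methods, a static type checker (or the runtime check above) can reject a block sink passed where a raw sink is expected.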
ReadingSink ¶
block_to_rows ¶
Unroll a :class:DaqBlock into one row per (channel, sample).
Per-sample timestamps reconstruct from block.t_mono_ns,
block.block_period_ns, and block.first_sample_index. When the
block has no clock period, samples are spaced uniformly within the read
window.
Each row carries:
- device, task, channel: join keys.
- block_index, sample_index: block- and task-level indices.
- t_mono_ns: reconstructed monotonic nanoseconds for this sample.
- t_utc: reconstructed wall-clock (ISO 8601) for this sample.
- value: the scalar sample value.
- unit: engineering unit for the channel (or None).
- error_type / error_message: populated only on error blocks.
Distinct from :func:~dtollib.tasks.models.block_to_long_rows (which
yields typed :class:~dtollib.tasks.models.DaqSample objects); this one
yields plain dicts ready for a tabular sink.
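A rough sketch of the per-sample timestamp reconstruction described above. It rests on assumptions the reference does not confirm: that t_mono_ns marks the first sample of the block, that block_period_ns is the spacing between consecutive samples, and that a read-window length is available for the fallback. BlockTiming, n_samples, and read_window_ns are hypothetical stand-ins, not dtollib types or fields.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class BlockTiming:
    """Hypothetical stand-in carrying only the timing fields of a block."""

    t_mono_ns: int                  # assumed: timestamp of the first sample
    block_period_ns: Optional[int]  # assumed: spacing between samples
    first_sample_index: int         # task-level index of the first sample
    n_samples: int                  # hypothetical helper field
    read_window_ns: int = 0         # hypothetical: used only by the fallback


def sample_times(b: BlockTiming) -> List[Tuple[int, int]]:
    """Return (sample_index, t_mono_ns) pairs for every sample in the block."""
    if b.block_period_ns is not None:
        step = b.block_period_ns
    else:
        # No clock period: spread samples uniformly over the read window.
        step = b.read_window_ns // max(b.n_samples, 1)
    return [
        (b.first_sample_index + i, b.t_mono_ns + i * step)
        for i in range(b.n_samples)
    ]


clocked = sample_times(BlockTiming(1000, 100, 40, 3))
unclocked = sample_times(BlockTiming(0, None, 0, 4, read_window_ns=400))
```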
Source code in src/dtollib/sinks/base.py
pipe
async
¶
Drain a :class:DaqReading stream into a row-oriented sink with buffered flushes.
Reads records from stream and accumulates them into a list. A flush
is triggered when either the buffer reaches batch_size or
flush_interval_s elapses since the last flush. On stream exhaustion
the leftover buffer is flushed before returning.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream | AsyncIterator[DaqReading] | Async iterator of :class: | required |
| sink | ReadingSink | An open :class: | required |
| batch_size | int | Records per flush. | 64 |
| flush_interval_s | float | Wall-clock seconds between flushes. | 1.0 |
Returns:
| Type | Description |
|---|---|
| int | Total records actually handed to the sink. |
Raises:
| Type | Description |
|---|---|
| ValueError | |
Source code in src/dtollib/sinks/base.py
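The buffered-flush behaviour of pipe (flush on batch_size or on flush_interval_s, plus a final flush of leftovers) can be approximated as follows. This is a sketch under stated assumptions, not the library's implementation: pipe_sketch, CountingSink, and numbers are hypothetical names, the ValueError conditions are guessed, and this version only checks the interval when a new record arrives.

```python
import asyncio
import time
from typing import AsyncIterator, List


class CountingSink:
    """Toy reading sink that remembers each flushed batch."""

    def __init__(self) -> None:
        self.batches: List[list] = []

    async def write_many(self, records) -> None:
        self.batches.append(list(records))


async def pipe_sketch(stream, sink, *, batch_size=64, flush_interval_s=1.0) -> int:
    # Assumed validation; the reference lists ValueError without conditions.
    if batch_size < 1 or flush_interval_s <= 0:
        raise ValueError("batch_size must be >= 1 and flush_interval_s > 0")
    buffer: list = []
    written = 0
    last_flush = time.monotonic()
    async for record in stream:
        buffer.append(record)
        now = time.monotonic()
        # Flush when the buffer is full or the interval has elapsed.
        if len(buffer) >= batch_size or now - last_flush >= flush_interval_s:
            await sink.write_many(buffer)
            written += len(buffer)
            buffer = []
            last_flush = now
    if buffer:  # stream exhausted: flush the leftovers
        await sink.write_many(buffer)
        written += len(buffer)
    return written


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


demo_sink = CountingSink()
total = asyncio.run(
    pipe_sketch(numbers(10), demo_sink, batch_size=4, flush_interval_s=60.0)
)
batch_sizes = [len(b) for b in demo_sink.batches]
```

With ten records and batch_size=4, the sink receives two full batches and one leftover batch of two.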
pipe_blocks
async
¶
Drain a block stream into a :class:BlockSink.
No batching axis — blocks are already batched. flush_interval_s is
accepted for API symmetry with :func:pipe but currently unused; sinks
that need a periodic flush implement their own.
Returns:
| Type | Description |
|---|---|
| int | Total blocks written. |
Source code in src/dtollib/sinks/base.py
reading_to_row ¶
Flatten a :class:DaqReading into a single row dict.
Layout:
- device, task: join keys.
- t_mono_ns: int, canonical monotonic join key.
- t_utc: ISO 8601, wall-clock acquisition midpoint.
- t_midpoint_mono_ns: int or None (integration-window midpoint).
- requested_at / received_at: ISO 8601, I/O provenance.
- latency_s: float seconds.
- one column per channel (values keys), values flattened.
- one <channel>_unit column per channel.
- error_type / error_message: populated only on error rows.
The same row layout is used by every row-oriented (tabular) sink.
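To make the layout concrete, here is a small sketch of the flattening. ReadingStandIn and to_row are hypothetical and not the library's DaqReading or reading_to_row; the units mapping is an assumed way of carrying per-channel units, and the provenance columns (requested_at, received_at, latency_s, t_midpoint_mono_ns) are left out for brevity.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, Optional


@dataclass
class ReadingStandIn:
    """Hypothetical stand-in with a subset of the documented fields."""

    device: str
    task: str
    t_mono_ns: int
    t_utc: datetime
    values: Dict[str, float]
    units: Dict[str, str] = field(default_factory=dict)  # assumed shape
    error_type: Optional[str] = None
    error_message: Optional[str] = None


def to_row(r: ReadingStandIn) -> dict:
    """Flatten one reading into a plain dict, one column per channel."""
    row = {
        "device": r.device,
        "task": r.task,
        "t_mono_ns": r.t_mono_ns,
        "t_utc": r.t_utc.isoformat(),
    }
    for channel, value in r.values.items():
        row[channel] = value
        row[f"{channel}_unit"] = r.units.get(channel)
    row["error_type"] = r.error_type
    row["error_message"] = r.error_message
    return row


row = to_row(
    ReadingStandIn(
        device="dev0",
        task="scan",
        t_mono_ns=123,
        t_utc=datetime(2024, 1, 1, tzinfo=timezone.utc),
        values={"ai0": 1.25},
        units={"ai0": "V"},
    )
)
```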
Source code in src/dtollib/sinks/base.py
CSV¶
dtollib.sinks.csv ¶
CSV sink — one row per :class:DaqReading, with optional block support.
Refuses :class:DaqBlock by default to prevent accidental 1-GB CSVs at
10 kHz × 8 channels. Pass accept_blocks=True to enable the
block_to_long_rows per-sample explosion.
Design reference: docs/design.md §15.1.
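The size concern behind the default refusal is easy to check with back-of-envelope arithmetic. The 60 bytes per CSV row below is an assumed figure for illustration only; the real row width depends on the columns written.

```python
# Per-sample explosion: one CSV row per (channel, sample).
sample_rate_hz = 10_000
n_channels = 8
bytes_per_row = 60  # assumption: rough width of one long-format CSV row

rows_per_second = sample_rate_hz * n_channels
bytes_per_second = rows_per_second * bytes_per_row
seconds_to_gigabyte = 1e9 / bytes_per_second

print(f"{rows_per_second} rows/s, {bytes_per_second / 1e6:.1f} MB/s")
print(f"1 GB after about {seconds_to_gigabyte:.0f} s")
```

Under that assumption, a 10 kHz × 8 channel acquisition produces a gigabyte of CSV in a few minutes.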
CsvSink ¶
Write :class:DaqReading rows (and optionally per-sample block rows) to CSV.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | Path \| str | Output file path. | required |
| accept_blocks | bool | When | False |
Source code in src/dtollib/sinks/csv.py
close
async
¶
open
async
¶
write
async
¶
Block path — refused by default; explodes per-sample when enabled.
Source code in src/dtollib/sinks/csv.py
write_many
async
¶
Append every reading as one CSV row.
Source code in src/dtollib/sinks/csv.py
JSONL¶
dtollib.sinks.jsonl ¶
JSONL sink — one JSON object per :class:DaqReading (or per sample for blocks).
Refuses :class:DaqBlock by default; accept_blocks=True enables
block_to_long_rows per-sample explosion.
Design reference: docs/design.md §15.1.
JsonlSink ¶
Write one JSON object per line.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | Path \| str | Output file path. | required |
| accept_blocks | bool | When | False |
Source code in src/dtollib/sinks/jsonl.py
close
async
¶
open
async
¶
write
async
¶
Block path — refused by default; explodes per-sample when enabled.
Source code in src/dtollib/sinks/jsonl.py
write_many
async
¶
Append every reading as one JSON object on its own line.
Source code in src/dtollib/sinks/jsonl.py
Parquet¶
dtollib.sinks.parquet ¶
Parquet sink — :mod:pyarrow, row groups per block, zstd by default.
The preferred sink for hardware-clocked acquisition. One row group per
:meth:write call (one block) — a crash mid-run loses at most the
current block.
Shape-locking. The first call to either :meth:write or :meth:write_many
locks the schema. Mixing record shapes after the first write raises
:class:~dtollib.errors.DtolSinkSchemaError.
Block layout (long-format) — one row per (channel, sample), with
t_mono_ns / t_utc per row reconstructed via
:func:dtollib.sinks.base.block_to_rows.
pyarrow is an optional dependency behind dtollib[parquet]; the import
defers to :meth:open so instantiating the sink succeeds on bare-core
installs.
Design reference: docs/design.md §15.1.
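The first-write shape lock can be pictured with a few lines of plain Python. This is only a sketch of the idea: ShapeLock and SchemaMismatchError are hypothetical names (the latter standing in for DtolSinkSchemaError), and the real sink locks a pyarrow schema rather than a tuple of column names.

```python
class SchemaMismatchError(Exception):
    """Hypothetical stand-in for dtollib.errors.DtolSinkSchemaError."""


class ShapeLock:
    """Remember the record shape of the first write and reject any other."""

    def __init__(self) -> None:
        self.shape = None    # "reading" or "block" once locked
        self.columns = None  # column names seen on the first write

    def check(self, shape: str, row: dict) -> None:
        if self.shape is None:
            # First write: lock both the shape and the column set.
            self.shape = shape
            self.columns = tuple(row)
            return
        if shape != self.shape:
            raise SchemaMismatchError(
                f"sink is locked to {self.shape!r} records, got {shape!r}"
            )


lock = ShapeLock()
lock.check("reading", {"device": "dev0", "t_mono_ns": 1, "ai0": 0.5})
lock.check("reading", {"device": "dev0", "t_mono_ns": 2, "ai0": 0.6})
try:
    lock.check("block", {"device": "dev0", "channel": "ai0", "value": 0.7})
    mixed_rejected = False
except SchemaMismatchError:
    mixed_rejected = True
```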
ParquetSink ¶
Parquet writer with first-write shape lock.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Destination Parquet file. | required |
| compression | _Compression | Codec for every row group. | 'zstd' |
| use_dictionary | bool | Dictionary encoding for string columns. | True |
| row_group_size | int \| None | Optional max rows per row group. | None |
Source code in src/dtollib/sinks/parquet.py
close
async
¶
Flush the footer and close the writer. Idempotent.
Source code in src/dtollib/sinks/parquet.py
open
async
¶
Load pyarrow and create the parent directory.
The :class:pyarrow.parquet.ParquetWriter itself is opened lazily
on the first write — we don't have the schema until then.
Source code in src/dtollib/sinks/parquet.py
write
async
¶
Append one :class:DaqBlock as a row group of long-format rows.
Long-format layout: one row per (channel, sample). The
block_index / sample_index columns let consumers
re-aggregate efficiently.
Source code in src/dtollib/sinks/parquet.py
write_many
async
¶
Append :class:DaqReading rows.
First call locks the schema and the record shape. Mixing shapes
afterwards raises :class:~dtollib.errors.DtolSinkSchemaError.
Source code in src/dtollib/sinks/parquet.py
SQLite¶
dtollib.sinks.sqlite ¶
SQLite sink — stdlib :mod:sqlite3 + WAL, parameterised executemany.
Accepts :class:DaqReading via :meth:write_many and :class:DaqBlock
via :meth:write — one row per block (summary, no scalarisation).
Readings and block summaries go to separate tables: readings /
blocks by default. Override via the table_* arguments.
The sqlite3 driver is synchronous; calls go through
:func:anyio.to_thread.run_sync so the event loop stays responsive.
Best-practice defaults:
- journal_mode=WAL + synchronous=NORMAL.
- busy_timeout=5000 ms.
- One BEGIN IMMEDIATE … COMMIT per write_many / write.
- SQL identifiers validated against ^[A-Za-z_][A-Za-z0-9_]{0,62}$; values always parameterised.
Design reference: docs/design.md §15.1.
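The defaults listed above map directly onto the standard-library sqlite3 module. The following is a minimal synchronous sketch, not SqliteSink itself: open_db, checked, and insert_rows are hypothetical helpers, the table and its columns are made up for the demo, and the thread off-loading via anyio is omitted.

```python
import re
import sqlite3
import tempfile
from pathlib import Path

# Same pattern as documented; fullmatch() also rejects a trailing newline.
IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]{0,62}$")


def checked(name: str) -> str:
    """Return the identifier unchanged, or raise if it is not safe."""
    if not IDENT.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name


def open_db(path) -> sqlite3.Connection:
    # isolation_level=None: autocommit mode, so BEGIN/COMMIT are explicit.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.execute("PRAGMA busy_timeout=5000")
    return conn


def insert_rows(conn: sqlite3.Connection, table: str, rows: list) -> None:
    """Insert a batch inside one BEGIN IMMEDIATE ... COMMIT transaction."""
    table = checked(table)
    cols = [checked(c) for c in rows[0]]
    placeholders = ", ".join("?" for _ in cols)
    sql = f"INSERT INTO {table} ({', '.join(cols)}) VALUES ({placeholders})"
    conn.execute("BEGIN IMMEDIATE")
    try:
        conn.executemany(sql, [tuple(r[c] for c in cols) for r in rows])
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise


with tempfile.TemporaryDirectory() as tmp:
    conn = open_db(Path(tmp) / "demo.sqlite")
    conn.execute("CREATE TABLE readings (device TEXT, t_mono_ns INTEGER, ai0 REAL)")
    insert_rows(
        conn,
        "readings",
        [
            {"device": "dev0", "t_mono_ns": 1, "ai0": 0.5},
            {"device": "dev0", "t_mono_ns": 2, "ai0": 0.6},
        ],
    )
    row_count = conn.execute("SELECT COUNT(*) FROM readings").fetchone()[0]
    journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
    conn.close()
```

Identifiers cannot be bound as parameters, which is why table and column names are validated against the regex while values go through ? placeholders.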
SqliteSink ¶
SqliteSink(
path,
*,
table_readings="readings",
table_blocks="blocks",
journal_mode="WAL",
synchronous="NORMAL",
busy_timeout_ms=5000,
)
Append-only SQLite writer with WAL journaling and per-table schema lock.
One sink instance routes records to up to two tables, one per shape (readings / blocks). Each table's column set is locked on its first write; later writes project onto the locked schema and drop unknown columns with a one-shot WARN.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Destination SQLite file. | required |
| table_readings | str | Table name for :class: | 'readings' |
| table_blocks | str | Table name for :class: | 'blocks' |
| journal_mode | _JournalMode | SQLite journal mode pragma. | 'WAL' |
| synchronous | _Synchronous | SQLite | 'NORMAL' |
| busy_timeout_ms | int | SQLite busy-wait, in milliseconds. | 5000 |
Source code in src/dtollib/sinks/sqlite.py
close
async
¶
Close the connection. Idempotent.
Source code in src/dtollib/sinks/sqlite.py
open
async
¶
Open the SQLite connection and apply pragmas.
Source code in src/dtollib/sinks/sqlite.py
write
async
¶
Append one :class:DaqBlock as a summary row.
Source code in src/dtollib/sinks/sqlite.py
write_many
async
¶
Append :class:DaqReading rows.
Source code in src/dtollib/sinks/sqlite.py
Postgres¶
dtollib.sinks.postgres ¶
PostgreSQL sink — :mod:asyncpg, COPY by default, parameterised fallback.
:class:PostgresSink writes :class:DaqReading rows via
:meth:write_many, and :class:DaqBlock summary rows via
:meth:write — one row per shape, routed to a per-shape table
(readings / blocks by default). asyncpg is an
optional dependency behind dtollib[postgres]; the import is
deferred to :meth:open so instantiation works on bare-core installs
and :class:~dtollib.errors.DtolSinkDependencyError is raised only
when the user actually tries to open a connection.
Best-practice defaults baked in:
- Binary COPY via :meth:asyncpg.Connection.copy_records_to_table. COPY is ~5-10x faster than parameterised INSERT for batches and is the recommended asyncpg bulk-ingest path. Callers that run on managed Postgres without COPY privileges can set :attr:PostgresConfig.use_copy to False to fall back to a prepared executemany.
- Connection pool via :func:asyncpg.create_pool. The pool lifetime equals the sink lifetime; each batch acquires, writes, and releases.
- Identifier validation on schema and every table name (strict regex). Every value passes through $N placeholders and is never string-formatted into SQL.
- Credential scrubbing: log lines that reference the connection use :meth:PostgresConfig.target, which never includes the password.
- statement_timeout applied as a server setting so a wedged query cannot block the acquisition loop forever.
Schema evolution mirrors the other tabular sinks. create_tables=False
reads the target tables' columns from information_schema.columns on
open and locks each per-shape schema to that set. create_tables=True
switches to first-batch inference and runs CREATE TABLE IF NOT EXISTS
per shape.
Design reference: docs/design.md §15.1.
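The split between validated identifiers and $N value placeholders might look like the sketch below. build_insert is a hypothetical helper, and the regex is borrowed from the SQLite sink as an assumption, since this page only says "strict regex" for Postgres. No database connection is needed to see the resulting statement.

```python
import re
from typing import Sequence

# Assumption: same identifier rule as the SQLite sink.
IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,62}")


def build_insert(schema: str, table: str, columns: Sequence[str]) -> str:
    """Build a parameterised INSERT; values are bound later as $1..$N."""
    for name in (schema, table, *columns):
        if not IDENT.fullmatch(name):
            raise ValueError(f"unsafe SQL identifier: {name!r}")
    col_list = ", ".join(f'"{c}"' for c in columns)
    params = ", ".join(f"${i}" for i in range(1, len(columns) + 1))
    return f'INSERT INTO "{schema}"."{table}" ({col_list}) VALUES ({params})'


statement = build_insert("public", "readings", ["device", "t_mono_ns"])
print(statement)
```

A statement of this form is what a prepared executemany fallback would bind each row's values against; the COPY path does not need it.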
PostgresConfig
dataclass
¶
PostgresConfig(
dsn=None,
host=None,
port=5432,
user=None,
password=None,
database=None,
schema="public",
table_readings="readings",
table_blocks="blocks",
pool_min_size=1,
pool_max_size=4,
statement_timeout_ms=30000,
command_timeout_s=10.0,
create_tables=False,
use_copy=True,
)
Connection + target settings for :class:PostgresSink.
Either dsn or the discrete host/user/database set
must be provided. Credentials are not logged.
Attributes:
| Name | Type | Description |
|---|---|---|
| dsn | str \| None | Full libpq-style connection string. Mutually exclusive with the discrete fields. |
| host | str \| None | Database host. Required if |
| port | int | Database port. Defaults to |
| user | str \| None | Database role. |
| password | str \| None | Role password. Never logged. |
| database | str \| None | Database name. |
| schema | str | Target schema. Validated against |
| table_readings | str | Table name for :class: |
| table_blocks | str | Table name for :class: |
| pool_min_size | int | Minimum pool size. Defaults to |
| pool_max_size | int | Maximum pool size. Defaults to |
| statement_timeout_ms | int | |
| command_timeout_s | float | asyncpg's per-call command timeout. |
| create_tables | bool | If |
| use_copy | bool | If |
target ¶
Return a log-safe URI describing the connection target.
Source code in src/dtollib/sinks/postgres.py
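One way to produce such a log-safe description is to format the URI from the discrete fields and simply never touch the password. The function name log_safe_target and the exact URI layout are assumptions; the actual output format of PostgresConfig.target is not shown on this page.

```python
from typing import Optional


def log_safe_target(
    host: str,
    port: int,
    user: Optional[str],
    database: str,
    password: Optional[str] = None,  # accepted but deliberately never used
) -> str:
    """Describe the connection target without exposing credentials."""
    auth = f"{user}@" if user else ""
    return f"postgresql://{auth}{host}:{port}/{database}"


target = log_safe_target("db.example", 5432, "daq", "lab", password="s3cret")
print(target)
```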
PostgresSink ¶
Append-only Postgres writer for DAQ readings and block summaries.
One sink instance routes records to up to two tables, one per
shape (readings / blocks). Each table's column set is
locked on first write (create_tables=True) or read on
:meth:open from information_schema.columns
(create_tables=False, the default).
Source code in src/dtollib/sinks/postgres.py
close
async
¶
Close the pool. Idempotent.
Source code in src/dtollib/sinks/postgres.py
open
async
¶
Load asyncpg, open the pool, and (optionally) introspect tables.
Idempotent. When create_tables=False (the default), each
target's columns are read on open and the per-shape schemas
locked immediately. When create_tables=True the locks
happen lazily on the first :meth:write_many / :meth:write
of each shape.
Source code in src/dtollib/sinks/postgres.py
write
async
¶
Append one :class:DaqBlock as a summary row.
Source code in src/dtollib/sinks/postgres.py
write_many
async
¶
Append :class:DaqReading rows.
Source code in src/dtollib/sinks/postgres.py
In-memory¶
dtollib.sinks.memory ¶
In-memory sink — collects records in lists for tests and notebooks.
:class:InMemorySink satisfies the reading and block sink Protocols
(:class:~dtollib.sinks.base.ReadingSink,
:class:~dtollib.sinks.base.BlockSink). Useful for unit tests, REPL
exploration, and short-run captures.
InMemorySink ¶
Collect every written record in a per-shape list.
:attr:readings / :attr:blocks are appended to (never re-assigned).
:meth:close does not clear the buffers — the point of this sink is
post-run inspection.
Source code in src/dtollib/sinks/memory.py
Raw counts (.dt-raw)¶
dtollib.sinks.raw_counts ¶
:class:RawCountsSink — TDMS-equivalent durable raw-data logger.
The single sink unique to dtollib. Writes the raw int16/int32 buffer
data (not the volt-converted floats) plus a JSON file header and a
per-chunk JSON record header to a .dt-raw v2 file.
Why a custom format instead of TDMS: no third-party dependency, the
format is dead-simple to read in NumPy / MATLAB / any tool
(np.fromfile), and it faithfully preserves what the SDK gave us.
Threading: writes from the drainer thread (the §12.3.2 bridge attaches this sink BEFORE the async stream). Consumer back-pressure does not stop the file from growing.
Design reference: docs/design.md §15.2.
RAW_FORMAT_VERSION
module-attribute
¶
.dt-raw format version — bumped from v1 for per-chunk framing.
RawCountsSink ¶
Writes :class:DaqBlock.raw_codes to a .dt-raw v2 file.
File layout (per docs/design.md §15.2)::
file_header_len:uint32
+ file_header_json:bytes
+ (chunk_record)*
chunk_record = chunk_header_len:uint32
+ chunk_header_json:bytes
+ chunk_payload:bytes
The sink writes synchronously from whatever thread calls write_raw.
The §12.3.2 callback bridge attaches this sink as a passive
observer that runs from the drainer thread, so a slow consumer on
the async path does not slow file growth.
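The length-prefixed framing above can be exercised with a short round trip. Several details here are assumptions not stated in the layout: the uint32 prefixes are taken to be little-endian, and the header keys format_version, dtype, block_index, and payload_bytes are hypothetical (a reader needs some header field giving the payload length, whatever the real key is called).

```python
import io
import json
import struct


def write_framed(fh, header: dict, payload: bytes = b"") -> None:
    """Write uint32 length + JSON header, followed by an optional payload."""
    body = json.dumps(header).encode("utf-8")
    fh.write(struct.pack("<I", len(body)))  # assumed little-endian
    fh.write(body)
    fh.write(payload)


def read_file(fh):
    """Return (file_header, [(chunk_header, payload_bytes), ...])."""
    (n,) = struct.unpack("<I", fh.read(4))
    file_header = json.loads(fh.read(n))
    chunks = []
    while True:
        prefix = fh.read(4)
        if len(prefix) < 4:  # clean end of file
            break
        (n,) = struct.unpack("<I", prefix)
        header = json.loads(fh.read(n))
        # Hypothetical key: the chunk header must say how long the payload is.
        payload = fh.read(header["payload_bytes"])
        chunks.append((header, payload))
    return file_header, chunks


buf = io.BytesIO()
write_framed(buf, {"format_version": 2, "dtype": "int16"})
samples = struct.pack("<4h", 1, -2, 3, -4)  # four int16 raw counts
write_framed(buf, {"block_index": 0, "payload_bytes": len(samples)}, samples)

buf.seek(0)
file_header, chunks = read_file(buf)
decoded = struct.unpack("<4h", chunks[0][1])
```

Because every chunk carries its own header, a reader can stop at the last complete chunk if the file was truncated by a crash.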
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | Path \| str | Output file path. Created with | required |
| file_metadata | dict[str, Any] \| None | Optional dict merged into the file header JSON. Sink adds | None |
Source code in src/dtollib/sinks/raw_counts.py
close
async
¶
open
async
¶
Open the file for binary writing. Idempotent.
Source code in src/dtollib/sinks/raw_counts.py
write_raw
async
¶
Append one block's raw-counts payload to the file.
Lazily writes the file header on the first call (when the dtype is known from the block).