nidaqlib.sinks¶
Data sinks for DaqReading / DaqSample / DaqBlock outputs.
Three Protocols (one per record shape) and two pipe drivers — design
doc §14.1. The row-oriented sinks (CSV, JSONL) refuse :class:DaqBlock
by default; pass accept_blocks=True to opt into per-sample
scalarisation via :func:block_to_long_rows.
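The refuse-by-default contract can be pictured with a minimal sketch. Everything below is a stand-in inferred from this page (FakeBlock, RowSinkSketch, and the TypeError are illustrative names, not nidaqlib API):

```python
# Sketch of the row-oriented sinks' block policy: refuse DaqBlock unless
# accept_blocks=True, then fan out one row per (channel, sample).
# FakeBlock is a hypothetical stand-in, not the real DaqBlock.
from dataclasses import dataclass

@dataclass
class FakeBlock:
    """Stand-in for DaqBlock: data is (n_channels, n_samples)."""
    data: list

class RowSinkSketch:
    def __init__(self, accept_blocks: bool = False) -> None:
        self.accept_blocks = accept_blocks
        self.rows: list = []

    def write(self, block: FakeBlock) -> None:
        if not self.accept_blocks:
            # Mirrors the documented refusal: row sinks reject blocks by default.
            raise TypeError("block refused; pass accept_blocks=True to scalarise")
        # Opt-in scalarisation: one row per (channel, sample).
        for ch, samples in enumerate(block.data):
            for k, v in enumerate(samples):
                self.rows.append({"channel": ch, "sample_index": k, "value": v})
```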
BlockSink ¶
Bases: Protocol
Sink that consumes one :class:DaqBlock per call.
A block is already (n_channels, n_samples) — wrapping it in a
sequence per call would burn allocations in the hot path. Sinks that
need scalar rows opt in via :func:block_to_long_rows.
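A conforming sink only needs an async write that takes one block per call. The method set below is an assumption in the BlockSink mould, not the real Protocol definition:

```python
# Sketch of a BlockSink-style Protocol: one block per write() call, no
# per-call sequence wrapper. Names here are illustrative stand-ins.
from typing import Protocol, runtime_checkable

@runtime_checkable
class BlockSinkLike(Protocol):
    async def write(self, block: object) -> None: ...

class CountingBlockSink:
    """Trivial conforming sink that just counts blocks."""
    def __init__(self) -> None:
        self.count = 0

    async def write(self, block: object) -> None:
        self.count += 1
```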
CsvSink ¶
Append-only CSV writer with first-batch schema lock.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Destination file. Created or overwritten on :meth:open. | required |
| accept_blocks | bool | When False (the default), :meth:write refuses :class:DaqBlock; when True, blocks are scalarised via :func:block_to_long_rows. | False |
Source code in src/nidaqlib/sinks/csv.py
close async ¶
Flush and close the CSV file. Idempotent.
open async ¶
Open the CSV file for writing. Overwrites any existing file.
Source code in src/nidaqlib/sinks/csv.py
write async ¶
Refuse blocks unless accept_blocks=True was set on construction.
With accept_blocks=True, per-(channel, sample) rows are emitted
via :func:block_to_long_rows. The cost of this opt-in is up to
n_channels * samples_per_channel rows per block.
Source code in src/nidaqlib/sinks/csv.py
write_many async ¶
Append :class:DaqReading or :class:DaqSample rows.
Sniffs the first item's type to choose the row helper.
Source code in src/nidaqlib/sinks/csv.py
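The first-batch schema lock can be sketched with the stdlib csv module alone. This mimics only the locking behaviour, not CsvSink's async I/O, and every name below is illustrative:

```python
# Sketch of a first-batch schema lock over csv.DictWriter: the first batch
# fixes the column set; later rows are projected onto it and unknown
# columns are dropped (extrasaction="ignore").
import csv
import io

class LockedCsvSketch:
    def __init__(self, buf: io.StringIO) -> None:
        self.buf = buf
        self.writer = None  # created lazily on the first batch

    def write_many(self, rows: list) -> None:
        if not rows:
            return
        if self.writer is None:
            # First batch locks the header for the file's lifetime.
            self.writer = csv.DictWriter(
                self.buf, fieldnames=list(rows[0]), extrasaction="ignore"
            )
            self.writer.writeheader()
        self.writer.writerows(rows)
```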
InMemorySink ¶
Collect every written record in a per-shape list.
:attr:readings / :attr:samples / :attr:blocks are appended to
(never re-assigned). :meth:close does not clear the buffers — the
point of this sink is post-run inspection.
Source code in src/nidaqlib/sinks/memory.py
close async ¶
open async ¶
write async ¶
Append one :class:DaqBlock to the block buffer.
write_many async ¶
Append every item to the matching per-shape buffer.
Sniffs the first item's type to dispatch (the sequence is required
to be homogeneous by the :class:SampleSink / :class:ReadingSink
Protocols). An empty sequence is a no-op.
Source code in src/nidaqlib/sinks/memory.py
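The append-only, survives-close contract can be sketched in a few lines. Names and the shape-dispatch argument are stand-ins (the real sink sniffs the first item's type instead):

```python
# Minimal in-memory sink sketch: per-shape lists that close() leaves
# intact, matching the documented post-run-inspection contract.
class MemorySinkSketch:
    def __init__(self) -> None:
        self.readings: list = []
        self.samples: list = []
        self.blocks: list = []
        self.closed = False

    def write_many(self, items: list, shape: str) -> None:
        # Dispatch on a declared shape name ("readings" / "samples" /
        # "blocks"); the real sink dispatches on the first item's type.
        getattr(self, shape).extend(items)

    def close(self) -> None:
        # Buffers survive close() so the run can be inspected afterwards.
        self.closed = True
```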
JsonlSink ¶
Append-only JSONL writer — one flattened record per line.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Destination file. Created or overwritten on :meth:open. | required |
| accept_blocks | bool | When False (the default), :meth:write refuses :class:DaqBlock; when True, blocks are scalarised via :func:block_to_long_rows. | False |
Source code in src/nidaqlib/sinks/jsonl.py
close async ¶
Flush and close the JSONL file. Idempotent.
open async ¶
Open the JSONL file for writing. Overwrites any existing file.
Source code in src/nidaqlib/sinks/jsonl.py
write async ¶
Refuse blocks unless accept_blocks=True.
Source code in src/nidaqlib/sinks/jsonl.py
write_many async ¶
Serialise each record as one JSON object per line.
Source code in src/nidaqlib/sinks/jsonl.py
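"One flattened record per line" is plain JSON Lines, which the stdlib covers directly. The row fields below are illustrative, not the real record layout:

```python
# Sketch of JSONL output: one json.dumps per row, newline terminated,
# so the file parses line-by-line with json.loads.
import io
import json

def write_jsonl(buf: io.StringIO, rows: list) -> None:
    for row in rows:
        # Compact separators keep lines short; each line is one record.
        buf.write(json.dumps(row, separators=(",", ":")) + "\n")
```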
ParquetSink ¶
Parquet writer with first-write shape lock.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Destination Parquet file. | required |
| compression | _Compression | Codec for every row group. | 'zstd' |
| use_dictionary | bool | Dictionary encoding for string columns. | True |
| row_group_size | int \| None | Optional max rows per row group. | None |
Source code in src/nidaqlib/sinks/parquet.py
close async ¶
Flush the footer and close the writer. Idempotent.
Source code in src/nidaqlib/sinks/parquet.py
open async ¶
Load pyarrow and create the parent directory.
The :class:pyarrow.parquet.ParquetWriter itself is opened lazily
on the first write — we don't have the schema until then.
Source code in src/nidaqlib/sinks/parquet.py
write async ¶
Append one :class:DaqBlock as a row group of long-format rows.
Long-format layout: one row per (channel, sample). The
block_index / sample_index columns let consumers
re-aggregate efficiently.
Source code in src/nidaqlib/sinks/parquet.py
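The long-format layout can be sketched in pure Python, independent of pyarrow. Channel names and the 2-D list stand in for the real DaqBlock fields:

```python
# Pure-Python sketch of the documented long-format layout: one row per
# (channel, sample), with block_index / sample_index columns that let
# consumers re-aggregate the rectangular block later.
def block_to_long(block_index: int, channels: list, data: list) -> list:
    rows = []
    for ch_name, samples in zip(channels, data):
        for k, value in enumerate(samples):
            rows.append({
                "block_index": block_index,
                "sample_index": k,
                "channel": ch_name,
                "value": value,
            })
    return rows
```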
write_many async ¶
Append :class:DaqReading or :class:DaqSample rows.
First call locks the schema and the record shape. Mixing shapes
afterwards raises :class:NIDaqSinkSchemaError.
Source code in src/nidaqlib/sinks/parquet.py
PostgresConfig dataclass ¶
```python
PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table_readings="readings",
    table_samples="samples",
    table_blocks="blocks",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_tables=False,
    use_copy=True,
)
```
Connection + target settings for :class:PostgresSink.
Either dsn or the discrete host/user/database set
must be provided. Credentials are not logged.
Attributes:
| Name | Type | Description |
|---|---|---|
| dsn | str \| None | Full libpq-style connection string. Mutually exclusive with the discrete fields. |
| host | str \| None | Database host. Required if dsn is not given. |
| port | int | Database port. Defaults to 5432. |
| user | str \| None | Database role. |
| password | str \| None | Role password. Never logged. |
| database | str \| None | Database name. |
| schema | str | Target schema. Validated before use. |
| table_readings | str | Table name for :class:DaqReading rows. |
| table_samples | str | Table name for :class:DaqSample rows. |
| table_blocks | str | Table name for :class:DaqBlock summary rows. |
| pool_min_size | int | Minimum pool size. Defaults to 1. |
| pool_max_size | int | Maximum pool size. Defaults to 4. |
| statement_timeout_ms | int | Server-side statement timeout, in milliseconds. Defaults to 30000. |
| command_timeout_s | float | asyncpg's per-call command timeout, in seconds. |
| create_tables | bool | If True, missing tables are created and schemas locked lazily on first write; if False (the default), columns are introspected on :meth:open. |
| use_copy | bool | If True (the default), bulk writes use COPY rather than multi-row INSERT. |
target ¶
Return a log-safe description: host:port/db.schema.
Source code in src/nidaqlib/sinks/postgres.py
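A log-safe target string in the host:port/db.schema shape described above can be sketched like this. The field names mirror the config but the class is a stand-in:

```python
# Sketch of a credential-free target description for logging, in the
# host:port/db.schema shape the docs describe. Deliberately omits
# user and password.
from dataclasses import dataclass

@dataclass
class PgTargetSketch:
    host: str
    port: int = 5432
    database: str = ""
    schema: str = "public"

    def target(self) -> str:
        # Never include credentials here: this string is meant for logs.
        return f"{self.host}:{self.port}/{self.database}.{self.schema}"
```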
PostgresSink ¶
Append-only Postgres writer for DAQ readings, samples, and block summaries.
One sink instance routes records to up to three tables, one per
shape (readings / samples / blocks). Each table's column set is
locked on first write (create_tables=True) or read on
:meth:open from information_schema.columns
(create_tables=False, the default).
Source code in src/nidaqlib/sinks/postgres.py
close async ¶
Close the pool. Idempotent.
Source code in src/nidaqlib/sinks/postgres.py
open async ¶
Load asyncpg, open the pool, and (optionally) introspect tables.
Idempotent. When create_tables=False (the default), each
target's columns are read on open and the per-shape schemas
locked immediately. When create_tables=True the locks
happen lazily on the first :meth:write_many / :meth:write
of each shape.
Source code in src/nidaqlib/sinks/postgres.py
write async ¶
Append one :class:DaqBlock as a summary row.
Source code in src/nidaqlib/sinks/postgres.py
write_many async ¶
Append :class:DaqReading or :class:DaqSample rows.
Source code in src/nidaqlib/sinks/postgres.py
ReadingSink ¶
Bases: Protocol
Sink that consumes sequences of :class:DaqReading per call.
SampleSink ¶
Bases: Protocol
Sink that consumes sequences of :class:DaqSample per call.
SqliteSink ¶
```python
SqliteSink(
    path,
    *,
    table_readings="readings",
    table_samples="samples",
    table_blocks="blocks",
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
)
```
Append-only SQLite writer with WAL journaling and per-table schema lock.
One sink instance routes records to up to three tables, one per shape (readings / samples / blocks). Each table's column set is locked on its first write; later writes project onto the locked schema and drop unknown columns with a one-shot WARN.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Destination SQLite file. | required |
| table_readings | str | Table name for :class:DaqReading rows. | 'readings' |
| table_samples | str | Table name for :class:DaqSample rows. | 'samples' |
| table_blocks | str | Table name for :class:DaqBlock summary rows. | 'blocks' |
| journal_mode | _JournalMode | SQLite journal mode pragma. | 'WAL' |
| synchronous | _Synchronous | SQLite synchronous pragma. | 'NORMAL' |
| busy_timeout_ms | int | SQLite busy-wait, in milliseconds. | 5000 |
Source code in src/nidaqlib/sinks/sqlite.py
close async ¶
Close the connection. Idempotent.
Source code in src/nidaqlib/sinks/sqlite.py
open async ¶
Open the SQLite connection and apply pragmas.
Source code in src/nidaqlib/sinks/sqlite.py
write async ¶
Append one :class:DaqBlock as a summary row.
Source code in src/nidaqlib/sinks/sqlite.py
write_many async ¶
Append :class:DaqReading or :class:DaqSample rows.
Source code in src/nidaqlib/sinks/sqlite.py
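The pragma setup the defaults describe (WAL journaling, NORMAL synchronous, a busy timeout) can be sketched with stdlib sqlite3. Note WAL needs an on-disk file, not :memory:; the helper name is illustrative:

```python
# Sketch of the documented pragma defaults applied on open:
# journal_mode=WAL, synchronous=NORMAL, busy_timeout=5000 ms.
import sqlite3

def open_tuned(path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")   # WAL requires a real file
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.execute("PRAGMA busy_timeout=5000")  # wait up to 5 s on locks
    return conn
```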
block_to_long_rows ¶
Yield one :class:DaqSample per (channel, sample) in block.
Sample timestamps reconstruct from
task_started_at + (first_sample_index + k) / sample_rate_hz
(design doc §8.7). Use this only when a row-oriented sink is the right
target — the natural shape of a hardware-clocked block is rectangular,
and fanning out 8 000 dataclass instances per second has a real cost.
Source code in src/nidaqlib/sinks/base.py
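The timestamp reconstruction quoted above is plain arithmetic, shown here with epoch-seconds floats standing in for the real timestamp types:

```python
# t_k = task_started_at + (first_sample_index + k) / sample_rate_hz
# (design doc §8.7, per the docstring above). Floats stand in for the
# real timestamp representation.
def sample_time(task_started_at: float, first_sample_index: int,
                k: int, sample_rate_hz: float) -> float:
    return task_started_at + (first_sample_index + k) / sample_rate_hz
```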
pipe async ¶
Drain a row stream into a row-oriented sink with buffered flushes.
Reads records from stream and accumulates them into a list. A
flush is triggered when either the buffer reaches batch_size or
flush_interval_s elapses since the last flush. On stream
exhaustion, the leftover buffer is flushed before returning.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream | AsyncIterator[DaqReading \| DaqSample] | Async iterator of records, typically the receive end of a channel. | required |
| sink | ReadingSink \| SampleSink | An open :class:ReadingSink or :class:SampleSink. | required |
| batch_size | int | Records per flush. | 64 |
| flush_interval_s | float | Wall-clock seconds between flushes. | 1.0 |
Returns:
| Type | Description |
|---|---|
| int | Total records actually handed to the sink. |
Raises:
| Type | Description |
|---|---|
| ValueError | |
Source code in src/nidaqlib/sinks/base.py
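The drain loop described above can be sketched as a single task that checks both triggers per record. This is one possible reading of the spec (no concurrent timer task), and the sink is reduced to a flush callback:

```python
# Sketch of the pipe drain loop: flush when the buffer reaches batch_size
# or when flush_interval_s has elapsed; flush the leftover buffer when
# the stream is exhausted. Returns the total records handed to flush().
import time

async def pipe_sketch(stream, flush, batch_size=64, flush_interval_s=1.0):
    buf, total = [], 0
    last_flush = time.monotonic()
    async for record in stream:
        buf.append(record)
        due = time.monotonic() - last_flush >= flush_interval_s
        if len(buf) >= batch_size or due:
            total += len(buf)
            await flush(list(buf))
            buf.clear()
            last_flush = time.monotonic()
    if buf:  # stream exhausted: flush whatever is left
        total += len(buf)
        await flush(buf)
    return total
```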
pipe_blocks async ¶
Drain a block stream into a :class:BlockSink.
No batching axis — blocks are already batched. flush_interval_s is
accepted for API symmetry with :func:pipe but currently unused; sinks
that need periodic flush can implement their own.
Returns:
| Type | Description |
|---|---|
| int | Total blocks written. |
Source code in src/nidaqlib/sinks/base.py
reading_to_row ¶
Flatten a :class:DaqReading into a single row dict.
Layout:
- `device`, `task` — join keys.
- `requested_at` / `received_at` / `midpoint_at` — ISO 8601.
- `monotonic_ns` — int.
- `elapsed_s` — float seconds.
- one column per channel (`values` keys), values flattened.
- one `<channel>_unit` column per channel.
- `error_type` / `error_message` — populated only on error rows.
The same row layout is used by every row-oriented sink.
Source code in src/nidaqlib/sinks/base.py
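The layout above can be sketched for an illustrative reading. The input shape here is a stand-in, not the real DaqReading, and only a subset of the documented columns is shown:

```python
# Sketch of the documented row layout: join keys, an ISO-8601 timestamp,
# one value column per channel, and one <channel>_unit column per channel.
from datetime import datetime, timezone

def reading_to_row_sketch(device: str, task: str, requested_at: datetime,
                          values: dict, units: dict) -> dict:
    row = {
        "device": device,                          # join key
        "task": task,                              # join key
        "requested_at": requested_at.isoformat(),  # ISO 8601
    }
    for ch, v in values.items():
        row[ch] = v                       # one column per channel
        row[f"{ch}_unit"] = units[ch]     # one <channel>_unit column
    return row
```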
sample_to_row ¶
Flatten a :class:DaqSample into a single row dict.