Channel samples (parquet)
Audience: analysts reading channel samples in polars, pyarrow, or pandas; anyone writing a binding-aware downstream tool.
Scope: the scalars.parquet schema — every column, its dtype, units, the time-base contract, and the round-trip rules for non-float values.
scalars.parquet is the long-format channel-sample table: one row per (channel, t_mono_ns). Every adapter's emissions are projected through its declared ChannelBinding and land in this single file, so this is the file post-binding analysis should hit. The per-adapter files under device_records/ are the unprocessed safety net. They are useful when a binding misfires or when you need a library-native field that didn't get promoted. For normal queries such as "what was the specimen temperature 12 seconds after ignition", you want scalars.parquet.
Schema at a glance
The schema is locked at module load in channel_samples_sink.py:_arrow_schema():
```python
import pyarrow as pa

pa.schema([
    pa.field("t_mono_ns", pa.int64(), nullable=False),
    pa.field("t_mono_s", pa.float64(), nullable=False),
    pa.field("channel", pa.dictionary(pa.int32(), pa.string()), nullable=False),
    pa.field("value", pa.float64(), nullable=False),
    pa.field("value_kind", pa.dictionary(pa.int32(), pa.string()), nullable=False),
    pa.field("raw_value", pa.float64(), nullable=True),
    pa.field("raw_text", pa.string(), nullable=True),
    pa.field("raw_kind", pa.dictionary(pa.int32(), pa.string()), nullable=True),
    pa.field("unit", pa.dictionary(pa.int32(), pa.string()), nullable=False),
    pa.field("uncertainty", pa.float64(), nullable=True),
    pa.field("status", pa.dictionary(pa.int32(), pa.string()), nullable=False),
    pa.field("source_record_id", pa.string(), nullable=True),
    pa.field("source_field", pa.string(), nullable=True),
])
```
Thirteen columns, dictionary-encoded where cardinality is low, no nested types. The file is sorted by t_mono_ns ascending and written with zstd compression at level 6 in 262,144-row row groups (see finalize.py).
Column-by-column reference
| Column | Type | Nullable | Notes |
|---|---|---|---|
| t_mono_ns | int64 | no | Canonical persisted time base. Nanoseconds since RunClock.started_mono_ns, i.e. the run-relative monotonic frame. Primary sort key on disk. |
| t_mono_s | float64 | no | Convenience derived value (t_mono_ns / 1e9). Redundant with t_mono_ns but cheap and human-readable for plot axes. |
| channel | dictionary<int32, string> | no | Channel name as declared in ChannelSpec.name (e.g. tc_specimen, mfc_n2). Dictionary-encoded; see Dictionary encoding. |
| value | float64 | no | Calibrated, dimensioned value in unit. Always populated: bool channels store 0.0/1.0, int channels store the integer cast to float. See round-tripping. |
| value_kind | dictionary<int32, string> | no | One of "float", "int", "bool". The type tag needed to round-trip non-float channels out of the float64 value column. |
| raw_value | float64 | yes | Pre-calibration numeric value when ChannelSpec.keep_raw is set and the raw value is numeric. Null when raw is text or not retained. |
| raw_text | string | yes | Pre-calibration text value when the source field is a string (status words, mode tags). Null for numeric or absent raws. |
| raw_kind | dictionary<int32, string> | yes | One of "float", "int", "bool", "str", or null when no raw was retained. Mirrors value_kind for the raw columns. |
| unit | dictionary<int32, string> | no | Canonicalized output unit (e.g. "degC", "sccm", "Pa"). Matches ChannelSpec.derived_unit when set, otherwise ChannelSpec.unit. |
| uncertainty | float64 | yes | Absolute uncertainty in unit. Populated only when the channel's calibration declares an UncertaintySpec. |
| status | dictionary<int32, string> | no | Health flag. "ok" is the success path; adapters also emit "underrange", "overload", "sensor_fail", "comm_error". The set is open-ended: sinks store it verbatim, and downstream filters should treat unknown values as non-ok. |
| source_record_id | string | yes | Back-pointer to the SourceRecord.record_id this sample was derived from. Join key into device_records/<adapter>.parquet. |
| source_field | string | yes | Which field of the source record was projected, by its library-native name: Alicat Mass_Flow, Watlow process_value, NI-DAQ ai0. |
Notably not present:
- No t_utc column. Wall-clock recovery is a post-hoc operation: t_utc = manifest.started_utc + Timedelta(t_mono_ns, "ns"). See The two time bases.
- No metadata column. The Pydantic ChannelSample model carries a metadata: dict[str, Any] field for in-memory routing; it is not persisted to the parquet. If a binding needs to carry context to the bundle, route it through source_field or emit a device event.
The two time bases
t_mono_ns is the canonical time base. It comes from time.monotonic_ns() on the conductor host, offset so that t_mono_ns == 0 corresponds to RunClock.started_mono_ns (the monotonic instant the run armed). It is immune to wall-clock corrections: NTP step events, manual clock changes, and DST transitions cannot perturb it. If an NTP step lands between two samples taken 100 ms apart, their wall-clock timestamps no longer differ by 100 ms, but their t_mono_ns values still do.
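The offset arithmetic is small enough to sketch. RunClockSketch below is a hypothetical stand-in, not the project's RunClock; it only shows the idea of capturing time.monotonic_ns() once when the run arms and subtracting it from every later reading.

```python
import time


class RunClockSketch:
    """Hypothetical stand-in for RunClock: run-relative monotonic nanoseconds."""

    def __init__(self) -> None:
        # The monotonic instant the run armed; t_mono_ns == 0 at this point.
        self.started_mono_ns = time.monotonic_ns()

    def t_mono_ns(self) -> int:
        # Unlike time.time_ns(), the monotonic clock is not stepped by NTP
        # or by manual changes to the wall clock.
        return time.monotonic_ns() - self.started_mono_ns


clock = RunClockSketch()
first = clock.t_mono_ns()
second = clock.t_mono_ns()
```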
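t_mono_s is the same value in seconds as a float64. It's a convenience for plotting and human inspection. Never use t_mono_s as a join key; use t_mono_ns. Resolution is not the problem (near 100 s, adjacent float64 values are about 1.4e-14 s apart, far finer than a nanosecond). The problem is that t_mono_s is a rounded copy of the integer and floating-point equality is fragile: the same instant computed along a different path (after resampling, arithmetic, or a round-trip through another tool) rarely compares bit-for-bit equal.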
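The join-key hazard fits in a few lines of plain Python. The numbers are made up; the point is that integer nanoseconds add and compare exactly, while float seconds are rounded copies whose values depend on how they were computed.

```python
# Two routes to the same instant, 0.3 s after the run armed.
direct_ns = 300_000_000
summed_ns = 100_000_000 + 200_000_000
print(direct_ns == summed_ns)  # True: integer nanoseconds add and compare exactly

# The same instants as float seconds, i.e. t_mono_ns / 1e9.
direct_s = direct_ns / 1e9
summed_s = 100_000_000 / 1e9 + 200_000_000 / 1e9
print(direct_s, summed_s)      # 0.3 0.30000000000000004
print(direct_s == summed_s)    # False: each division and addition rounds
```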
Wall-clock recovery, when you need it for log correlation:
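```python
import json
from datetime import datetime
import polars as pl
df = pl.read_parquet("runs/<run_id>/scalars.parquet")
with open("runs/<run_id>/manifest.json") as f:
    manifest = json.load(f)
# Assumes an ISO 8601 string with an explicit offset; a trailing "Z" needs Python 3.11+.
manifest_started_utc = datetime.fromisoformat(manifest["started_utc"])
df = df.with_columns(
    t_utc=pl.lit(manifest_started_utc) + pl.duration(nanoseconds=pl.col("t_mono_ns"))
)
```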
manifest.started_utc is the wall-clock anchor and t_mono_ns is the run-relative offset: the anchor lives in manifest.json, and the offset is stored in every row. Do not use t_utc for alignment math across files in the bundle. Every other parquet/sqlite file in the bundle also uses t_mono_ns, for the same immunity-to-NTP reason described above, so events, video frames, and device records all align on t_mono_ns. t_utc is only for answering "what time of day did this happen".
Round-tripping non-float values
value is always float64, even for bool and int channels. The value_kind column carries the original type so analysts can recover it:
| Original | value | value_kind | raw_text |
|---|---|---|---|
| 42.7 (float) | 42.7 | "float" | null |
| 7 (int) | 7.0 | "int" | null |
| True (bool) | 1.0 | "bool" | null |
| "running" (text raw) | NaN | "float" | "running" when keep_raw is set |
Recovery in polars:
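```python
import polars as pl
df = pl.read_parquet("runs/<run_id>/scalars.parquet")
bool_view = df.filter(pl.col("value_kind") == "bool").with_columns(
    pl.col("value").cast(pl.Boolean).alias("bool_value")  # 0.0 -> false, 1.0 -> true
)
int_view = df.filter(pl.col("value_kind") == "int").with_columns(
    pl.col("value").cast(pl.Int64).alias("int_value")  # 7.0 -> 7
)
```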
Why a single-typed value column instead of a union or struct? The dominant query path on this file is "all rows for one channel, plotted against time" — that path benefits massively from a rectangular, scan-friendly value column. Bool and int channels are a small minority (limit-switch states, mode codes); paying one extra dictionary-encoded column for them keeps the hot path simple. Analysts who never touch a bool channel can ignore value_kind entirely.
For text-valued raw fields (Alicat gas-name strings, Watlow alarm-state words), raw_text carries the original string and raw_kind = "str". The numeric value is whatever the binding chose to project — typically NaN, sometimes a coded integer.
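For row-at-a-time consumers (a binding-aware tool iterating records rather than a dataframe), the same rules fit in two small functions. A sketch that follows the tables above; decode_value and decode_raw are hypothetical names, and a coded-integer projection of a text raw would still need the binding's own code table to interpret.

```python
from typing import Optional, Union

Decoded = Union[float, int, bool, str, None]


def decode_value(value: float, value_kind: str) -> Decoded:
    """Recover the original Python type from the float64 value column."""
    if value_kind == "bool":
        return value != 0.0  # stored as 0.0 / 1.0
    if value_kind == "int":
        return int(value)  # stored as the integer cast to float
    return value  # "float"; NaN passes through unchanged


def decode_raw(raw_value: Optional[float], raw_text: Optional[str],
               raw_kind: Optional[str]) -> Decoded:
    """Recover the pre-calibration raw (text or numeric), if one was kept."""
    if raw_kind is None:
        return None  # no raw retained
    if raw_kind == "str":
        return raw_text  # e.g. an Alicat gas name or Watlow alarm word
    if raw_value is None:
        return None
    return decode_value(raw_value, raw_kind)
```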
Dictionary encoding
Five columns are dictionary-encoded: channel, value_kind, raw_kind, unit, status. The pattern is consistent: each is a low-cardinality string column riding alongside a high row count. A 1-hour run at 60 Hz across 30 channels produces 3,600 × 60 × 30 ≈ 6.5 M rows. Spelled out, a name like "tc_specimen" costs its 11 bytes of UTF-8 (plus a 4-byte offset in Arrow's string layout) on every row. Dictionary-encoded, each distinct name is stored once and every row carries only a 4-byte integer index, which Parquet compresses further on disk with its run-length/bit-packed index encoding.
Practical implications:
- polars reads these as pl.Categorical. A filter like pl.col("channel") == "tc_specimen" resolves the string against the dictionary once; rows are then matched by integer index, so no per-row string comparison happens.
- pyarrow exposes them as pa.dictionary(pa.int32(), pa.string()). Use .dictionary_decode() if you need the materialized string column (rarely necessary).
- pandas via pyarrow reads them as CategoricalDtype. Group-by and value-counts are fast; arbitrary string ops require .astype(str) first.
The dictionary indices are scoped per row group. Cross-row-group queries reconstruct the dictionary transparently.
How rows get here
The path from device to row:
- An adapter emits a SourceRecord in one of four shapes: scalar, vector, block, or event. The record carries library-native field names (e.g. Alicat returns a row with Mass_Flow, Pressure, Setpoint).
- The configured ChannelBinding projects record fields into ChannelSample instances. A single source record can produce zero, one, or many channel samples, one per bound field. The binding handles calibration, unit conversion, and uncertainty.
- The conductor routes the samples to the ChannelSamplesSink. The sink stages rows in per-column lists and flushes them to the in-flight Arrow IPC stream (scalars.in-flight.arrows) every INFLIGHT_FLUSH_ROWS samples. Each flush is fsynced, so a power loss leaves the last completed batch on disk.
- At finalize, _rewrite_inflight_to_parquet() rewrites the stream to scalars.parquet. The rewrite sorts the whole table by t_mono_ns ascending, then writes Parquet with compression=zstd, compression_level=6, row_group_size=262_144, and data_page_version="2.0". The in-flight file is removed once the rewrite verifies.
If the in-flight stream is torn before its first flush boundary (i.e. crash before any complete batch was written), the rewrite returns False and the bundle finalizes without a scalars.parquet. See integrity and sealing for what that looks like in the manifest.
Reading recipe: minimum viable
```python
import polars as pl

df = pl.read_parquet("runs/<run_id>/scalars.parquet")
tc_t = (
    df.filter(pl.col("channel") == "tc_specimen")
    .select(["t_mono_s", "value"])
    .sort("t_mono_s")
)
```
That's the load-and-extract-one-channel recipe. For wider/pivoted queries (multiple channels side by side, resampling to a common time grid, joining with events or video frames), see Reading a bundle.
Implementation notes for contributors
- Schema is locked at channel_samples_sink.py:_arrow_schema(). Adding, removing, renaming, or retyping a column requires a bundle_schema_version bump and a migration entry; see Bundle versioning.
- The schema is constructed once at module import and assigned to CHANNEL_SAMPLES_SCHEMA. Sink instances do not own their own schema; they reference the module-level one. Tests that need to monkey-patch should patch the module attribute, not pass a custom schema.
- value is float64 even for bool channels. Do not "optimize" this to a union or sparse-column representation without coordination: every downstream reader assumes the rectangular value column. See round-tripping for the design rationale.
- The sink writes Arrow IPC during the run, not Parquet. The streaming format is scalars.in-flight.arrows, a record-batch-per-flush Arrow IPC stream that survives power loss at batch boundaries. The Parquet form only materializes at finalize. This is the v1 → v2 schema-bump landmark (see bundle versioning); pre-v2 bundles wrote Parquet incrementally and had worse crash semantics.
- Sorting by t_mono_ns happens in the rewrite, not at write time. In-flight order is insertion order, which is roughly sorted but not guaranteed monotonic across concurrent producers (two adapters flushing samples with overlapping timestamps can interleave). Any code reading the in-flight file directly (e.g. live-viewing tools) must not assume monotonic time.
- metadata on the in-memory ChannelSample is not persisted. It exists for in-process routing only. If a binding needs to attach context that survives to the bundle, it must do so through source_field, a custom status value, or a separate device event.
See also: Device records (parquet) · Reading a bundle · Manifest and schema · Bundle versioning · Channel bindings