alicatlib.sync¶
Sync facade over the async core. Every public async name has a sync counterpart. See Sync quickstart for usage and Design §5.16 for the portal-based architecture.
Public surface¶
alicatlib.sync ¶
Sync facade over the async core.
Async is canonical; the sync facade wraps it through
:class:SyncPortal so scripts, notebooks, and REPL sessions can drive
devices without await.
Surfaces:
- Device / manager — :class:Alicat, :class:SyncDevice, :class:SyncFlowMeter, :class:SyncFlowController, :class:SyncPressureMeter, :class:SyncPressureController, :class:SyncAlicatManager (+ :class:ErrorPolicy / :class:DeviceResult re-exports).
- Recording — :func:record, :func:pipe, :class:AcquisitionSummary, :class:OverflowPolicy.
- Sinks — :class:SyncSinkAdapter + :class:SyncInMemorySink / :class:SyncCsvSink / :class:SyncJsonlSink / :class:SyncSqliteSink / :class:SyncParquetSink / :class:SyncPostgresSink (+ :class:PostgresConfig re-export).
- Discovery — :func:list_serial_ports, :func:probe, :func:find_devices, :class:DiscoveryResult, :data:DEFAULT_DISCOVERY_BAUDRATES.
- Portal primitives — :class:SyncPortal, :func:run_sync.
See docs/design.md §5.16 for the design.
AcquisitionSummary
dataclass
¶
Per-run summary emitted after record()'s CM exits.
Attributes:
| Name | Type | Description |
|---|---|---|
| started_at | datetime | Wall-clock at the first scheduled tick. |
| finished_at | datetime | Wall-clock at producer shutdown. |
| samples_emitted | int | Count of per-tick batches actually pushed onto the receive stream. Partial batches (some devices errored under …) |
| samples_late | int | Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). |
| max_drift_ms | float | Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching … |
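The absolute-target scheduling that max_drift_ms measures can be sketched in plain Python. This is an illustrative sketch, not alicatlib code: absolute_schedule and its parameters are invented names, but the rule matches the description above (tick k aims at start + k * period, so one slow tick does not shift every later target).

```python
import time

def absolute_schedule(period_s: float, ticks: int, work) -> float:
    """Run `work` on an absolute-target schedule; return max positive
    drift in milliseconds. Illustrative names, not alicatlib API."""
    start = time.monotonic()
    max_drift = 0.0
    for k in range(ticks):
        target = start + k * period_s          # absolute slot, not "period after last tick"
        delay = target - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        max_drift = max(max_drift, time.monotonic() - target)
        work(k)
    return max_drift * 1000.0
```

Because every target is anchored to the run's start, drift does not accumulate across ticks the way a relative `sleep(period)` loop would let it.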
Alicat ¶
Namespace for the sync device entry point.
Use :meth:Alicat.open as a context manager:
    >>> from alicatlib.sync import Alicat
    >>> with Alicat.open("/dev/ttyUSB0") as dev:  # doctest: +SKIP
    ...     print(dev.poll())
open
staticmethod
¶
open(
port,
*,
unit_id="A",
serial=None,
timeout=0.5,
recover_from_stream=True,
model_hint=None,
assume_capabilities=Capability.NONE,
assume_media=None,
portal=None,
)
Open a sync :class:SyncDevice scoped to a with block.
Mirrors :func:alicatlib.devices.factory.open_device parameter
for parameter. The returned sync CM drives the async factory
through a :class:SyncPortal; the portal is created per-call
unless a pre-existing one is passed in via portal=.
Passing a :class:Transport or
:class:AlicatProtocolClient is advanced: the caller is
responsible for ensuring the object was constructed inside the
portal's event loop (or is loop-agnostic). The common case —
passing a str port path — creates the transport inside the
portal and avoids that concern.
Source code in src/alicatlib/sync/device.py
DeviceResult
dataclass
¶
Per-device result container — value or error, never both.
The union is encoded as two optional fields (rather than an
Either / Result ADT) so mypy's narrowing on ok reads
cleanly at call sites without pattern matching.
Attributes:
| Name | Type | Description |
|---|---|---|
| value | T \| None | The successful result, or None. |
| error | AlicatError \| None | The captured :class:AlicatError, or None. |
DiscoveryResult
dataclass
¶
Outcome of a single :func:probe attempt.
Exactly one of :attr:info / :attr:error is populated — ok results
carry a fully-identified :class:DeviceInfo, failed ones carry the
typed :class:AlicatError from the identification pipeline. The
:attr:ok convenience lets callers filter without hasattr.
ErrorPolicy ¶
Bases: Enum
How the manager surfaces per-device failures.
Under :attr:RAISE, the manager collects every device's result
and — if any call failed — raises an :class:ExceptionGroup
containing the per-device exceptions after the task group joins.
Under :attr:RETURN, each device produces a :class:DeviceResult
and the caller inspects .error per entry.
Design reference: docs/design.md §5.13.
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer. Default. Silent drops are surprising
in a data-acquisition setting, so the recorder blocks the producer
rather than quietly discarding samples. The effective sample rate
drops to the consumer's drain rate; samples_late accrues once
the consumer catches up and the producer can check its schedule.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the sample that was about to be enqueued. Counted as late.
DROP_OLDEST
class-attribute
instance-attribute
¶
Evict the oldest queued batch, then enqueue. Counted as late.
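The two drop modes can be sketched with a bounded deque. Overflow and enqueue are illustrative names, not the real OverflowPolicy machinery; the sketch only shows which sample is sacrificed in each mode.

```python
from collections import deque
from enum import Enum

class Overflow(Enum):
    # Illustrative mirror of the drop modes described above.
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"

def enqueue(buf: deque, item, maxsize: int, policy: Overflow) -> bool:
    """Return True if `item` landed in the buffer, False if dropped."""
    if len(buf) < maxsize:
        buf.append(item)
        return True
    if policy is Overflow.DROP_NEWEST:
        return False              # incoming batch discarded, counted late
    buf.popleft()                 # DROP_OLDEST: evict oldest, then enqueue
    buf.append(item)
    return True
```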
PostgresConfig
dataclass
¶
PostgresConfig(
dsn=None,
host=None,
port=5432,
user=None,
password=None,
database=None,
schema="public",
table="samples",
pool_min_size=1,
pool_max_size=4,
statement_timeout_ms=30000,
command_timeout_s=10.0,
connect_timeout_s=30.0,
close_timeout_s=10.0,
create_table=False,
use_copy=True,
)
Connection + target settings for :class:PostgresSink.
Either dsn or the discrete host/user/database set
must be provided. Credentials are not logged.
Attributes:
| Name | Type | Description |
|---|---|---|
| dsn | str \| None | Full libpq-style connection string (e.g. …). |
| host | str \| None | Database host. Required if dsn is not given. |
| port | int | Database port. Defaults to 5432. |
| user | str \| None | Database role. |
| password | str \| None | Role password. Never logged. |
| database | str \| None | Database name. |
| schema | str | Target schema. Validated against … |
| table | str | Target table. Validated against the same pattern. |
| pool_min_size | int | Minimum pool size. Defaults to 1. |
| pool_max_size | int | Maximum pool size. Defaults to 4. |
| statement_timeout_ms | int | Per-statement timeout, in milliseconds. Defaults to 30000. |
| command_timeout_s | float | asyncpg's per-call command timeout. Defaults to 10 s. |
| connect_timeout_s | float | Cap on initial pool establishment in :meth:open. Defaults to 30 s. |
| close_timeout_s | float | Cap on :meth:close. Defaults to 10 s. |
| create_table | bool | If True, create the target table on :meth:open. |
| use_copy | bool | If True, write batches via COPY rather than per-row INSERT. |
target ¶
Return a log-safe description of the target: host:port/db.schema.table.
Source code in src/alicatlib/sinks/postgres.py
SyncAlicatManager ¶
Blocking facade over :class:alicatlib.manager.AlicatManager.
Example
    >>> with SyncAlicatManager() as mgr:  # doctest: +SKIP
    ...     mgr.add("fuel", "/dev/ttyUSB0")
    ...     mgr.add("air", "/dev/ttyUSB1")
    ...     frames = mgr.poll()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error_policy | ErrorPolicy | Forwarded to :class:AlicatManager. | RAISE |
| portal | SyncPortal \| None | Optional pre-built :class:SyncPortal to reuse. | None |
Source code in src/alicatlib/sync/manager.py
__enter__ ¶
Start the portal, build the async manager, enter its CM.
Source code in src/alicatlib/sync/manager.py
__exit__ ¶
Close the managed devices + portal (if owned).
Source code in src/alicatlib/sync/manager.py
add ¶
Blocking :meth:AlicatManager.add.
Accepts a :class:SyncDevice as source in addition to the
async shapes — the wrapper is unwrapped to the underlying
:class:Device before delegation, matching the manager's
"pre-built device, no lifecycle ownership" contract.
Source code in src/alicatlib/sync/manager.py
close ¶
Blocking :meth:AlicatManager.close — idempotent.
execute ¶
Blocking :meth:AlicatManager.execute.
Source code in src/alicatlib/sync/manager.py
get ¶
Return the sync wrapper for the device registered under name.
The async manager's :meth:AlicatManager.get is already
synchronous — this method caches the sync wrapper so repeated
get calls return the same :class:SyncDevice instance per
device name.
Source code in src/alicatlib/sync/manager.py
poll ¶
Blocking :meth:AlicatManager.poll.
remove ¶
request ¶
Blocking :meth:AlicatManager.request.
Source code in src/alicatlib/sync/manager.py
SyncCsvSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.csv.CsvSink.
Source code in src/alicatlib/sync/sinks.py
SyncDevice ¶
Blocking facade over :class:alicatlib.devices.base.Device.
Instances are produced by :meth:Alicat.open; users do not call
this constructor directly. Every public method delegates to the
underlying :class:Device through a :class:SyncPortal.
Source code in src/alicatlib/sync/device.py
session
property
¶
Underlying async :class:Session for advanced escape-hatch use.
Calling coroutine methods on the returned session requires the
caller to route them through :attr:portal.
__enter__ ¶
__exit__ ¶
Close the device on exit — session close is idempotent.
analog_output_source ¶
Blocking :meth:Device.analog_output_source.
Default channel is :attr:AnalogOutputChannel.PRIMARY — same
default as the async side, bound at module load via the
module-level alias so sync-parity signature comparison sees
the same sentinel object.
Source code in src/alicatlib/sync/device.py
average_timing ¶
Blocking :meth:Device.average_timing.
Source code in src/alicatlib/sync/device.py
blink_display ¶
close ¶
Release the underlying session — idempotent.
Prefer with Alicat.open(...) as dev: over calling this by
hand; the outer context manager owns transport lifecycle.
Source code in src/alicatlib/sync/device.py
engineering_units ¶
Blocking :meth:Device.engineering_units.
Source code in src/alicatlib/sync/device.py
execute ¶
full_scale ¶
gas ¶
gas_list ¶
lock_display ¶
poll ¶
power_up_tare ¶
request ¶
Blocking :meth:Device.request.
Source code in src/alicatlib/sync/device.py
stp_ntp_pressure ¶
Blocking :meth:Device.stp_ntp_pressure.
Source code in src/alicatlib/sync/device.py
stp_ntp_temperature ¶
Blocking :meth:Device.stp_ntp_temperature.
Source code in src/alicatlib/sync/device.py
stream ¶
Blocking :meth:Device.stream — returns a sync context manager.
The returned :class:SyncStreamingSession is both a sync
context manager and a sync iterator; use it as::
with sync_dev.stream(rate_ms=50) as stream:
for frame in stream:
process(frame)
Source code in src/alicatlib/sync/device.py
tare_absolute_pressure ¶
tare_flow ¶
tare_gauge_pressure ¶
totalizer_config ¶
totalizer_config(
totalizer=_TOTALIZER_FIRST,
*,
flow_statistic_code=None,
mode=None,
limit_mode=None,
digits=None,
decimal_place=None,
)
Blocking :meth:Device.totalizer_config.
Source code in src/alicatlib/sync/device.py
totalizer_reset ¶
Blocking :meth:Device.totalizer_reset — destructive; requires confirm=True.
Source code in src/alicatlib/sync/device.py
totalizer_reset_peak ¶
Blocking :meth:Device.totalizer_reset_peak — destructive.
Source code in src/alicatlib/sync/device.py
totalizer_save ¶
Blocking :meth:Device.totalizer_save.
unlock_display ¶
user_data ¶
SyncFlowController ¶
Bases: SyncFlowMeter, _SyncControllerMixin
Flow-controller facade — adds the shared controller surface.
Source code in src/alicatlib/sync/device.py
SyncFlowMeter ¶
Bases: SyncDevice
Flow-meter tag — empty pass-through, mirrors :class:FlowMeter.
Source code in src/alicatlib/sync/device.py
SyncInMemorySink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.memory.InMemorySink.
Source code in src/alicatlib/sync/sinks.py
SyncJsonlSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.jsonl.JsonlSink.
Source code in src/alicatlib/sync/sinks.py
SyncParquetSink ¶
SyncParquetSink(
path,
*,
compression="zstd",
use_dictionary=True,
row_group_size=None,
portal=None,
)
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.parquet.ParquetSink.
Requires the alicatlib[parquet] extra — the dependency check
runs on :meth:open, same as the async sink.
Source code in src/alicatlib/sync/sinks.py
SyncPortal ¶
Per-context wrapper around :class:anyio.from_thread.BlockingPortal.
Example
    >>> with SyncPortal() as portal:
    ...     result = portal.call(some_async_func, arg1, arg2)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| backend | str | AnyIO backend to run on. Defaults to 'asyncio'. | 'asyncio' |
Source code in src/alicatlib/sync/portal.py
__enter__ ¶
Start the portal's background thread and event loop.
Source code in src/alicatlib/sync/portal.py
__exit__ ¶
Stop the portal and join its thread.
Source code in src/alicatlib/sync/portal.py
call ¶
Run func(*args, **kwargs) on the portal's event loop.
Single-member :class:ExceptionGroup wrappers are stripped so
callers can catch concrete exception types.
Source code in src/alicatlib/sync/portal.py
wrap_async_context_manager ¶
Present an async context manager as a sync context manager.
wrap_async_iter ¶
Present an async iterator as a blocking, closeable iterator.
The returned object is iterable (for x in it: ...) and
supports :meth:close / context-manager use so outer wrappers
can cancel the producer on early exit.
Source code in src/alicatlib/sync/portal.py
SyncPostgresSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.postgres.PostgresSink.
Requires the alicatlib[postgres] extra — dependency check runs
on :meth:open.
Source code in src/alicatlib/sync/sinks.py
SyncPressureController ¶
Bases: SyncPressureMeter, _SyncControllerMixin
Pressure-controller facade — inherits the shared controller surface.
Source code in src/alicatlib/sync/device.py
SyncPressureMeter ¶
Bases: SyncDevice
Pressure-meter tag — empty pass-through, mirrors :class:PressureMeter.
Source code in src/alicatlib/sync/device.py
SyncSinkAdapter ¶
Generic sync wrapper around any :class:SampleSink.
Subclasses typically only override :meth:__init__ to build the
matching async sink with sink-specific parameters and hand it to
this base class. The portal / open / write / close / context-manager
plumbing is shared.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| async_sink | SampleSink | Already-constructed async sink. | required |
| portal | SyncPortal \| None | Optional pre-built :class:SyncPortal to reuse. | None |
Source code in src/alicatlib/sync/sinks.py
__enter__ ¶
Start the portal, open the async sink.
Source code in src/alicatlib/sync/sinks.py
__exit__ ¶
Close the sink and, if owned, stop the portal.
Source code in src/alicatlib/sync/sinks.py
close ¶
Blocking :meth:SampleSink.close — idempotent.
A no-op if the sink never reached :meth:open (portal absent).
Source code in src/alicatlib/sync/sinks.py
open ¶
SyncSqliteSink ¶
SyncSqliteSink(
path,
*,
table="samples",
create_table=True,
journal_mode="WAL",
synchronous="NORMAL",
busy_timeout_ms=5000,
portal=None,
)
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.sqlite.SqliteSink.
Source code in src/alicatlib/sync/sinks.py
find_devices ¶
find_devices(
ports=None,
*,
unit_ids=("A",),
baudrates=DEFAULT_DISCOVERY_BAUDRATES,
timeout=0.2,
max_concurrency=8,
stop_on_first_hit=False,
portal=None,
)
Blocking :func:alicatlib.devices.discovery.find_devices.
Source code in src/alicatlib/sync/discovery.py
list_serial_ports ¶
Blocking :func:alicatlib.devices.discovery.list_serial_ports.
Source code in src/alicatlib/sync/discovery.py
pipe ¶
Sync :func:alicatlib.sinks.pipe.
Drains a sync iterator of per-tick batches into sink with the
same buffered-flush semantics as the async driver: flush when the
buffer reaches batch_size samples or flush_interval seconds
have passed since the last flush.
sink may be a :class:SyncSinkAdapter (already open) or a raw
async :class:SampleSink. In the async case a portal must be
reachable — either passed explicitly or derived from a
:class:SyncSinkAdapter — so writes can be dispatched.
Time thresholds use :func:time.monotonic (wall-clock-ish,
independent of the portal's event loop) because the sink cares
about persistence freshness, not scheduling precision.
The returned :class:AcquisitionSummary carries samples_emitted
(count actually handed to the sink); the samples_late and
max_drift_ms fields stay at zero — those are recorder-layer
concepts and the recorder logs its own values via the
alicatlib.streaming logger on CM exit.
Source code in src/alicatlib/sync/recording.py
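The batch-or-time flush rule can be sketched in pure Python. flush_loop and write are illustrative names under assumed semantics, not alicatlib API; the sketch only shows when a flush fires.

```python
import time

def flush_loop(batches, write, batch_size: int = 100, flush_interval: float = 1.0):
    # Flush when the buffer reaches batch_size samples, or when
    # flush_interval seconds (time.monotonic) have passed since the
    # last flush; drain any remainder at the end.
    buf: list = []
    last_flush = time.monotonic()
    for batch in batches:
        buf.extend(batch)
        now = time.monotonic()
        if len(buf) >= batch_size or now - last_flush >= flush_interval:
            write(list(buf))
            buf.clear()
            last_flush = now
    if buf:
        write(list(buf))        # final partial flush when the source ends
```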
probe ¶
Blocking :func:alicatlib.devices.discovery.probe.
Source code in src/alicatlib/sync/discovery.py
record ¶
record(
source,
*,
rate_hz,
duration=None,
names=None,
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
portal=None,
)
Sync :func:alicatlib.streaming.record.
The yielded iterator is a blocking bridge to the recorder's
receive stream. Breaking out of the loop or exiting the with
cancels the producer task.
If source is a :class:SyncAlicatManager, its portal is
reused — the recorder and manager must share an event loop.
Pass portal= to override; pass a raw :class:AlicatManager
and the recorder owns its own portal.
Source code in src/alicatlib/sync/recording.py
run_sync ¶
Run one coroutine in a throwaway :class:SyncPortal.
Suitable for short-lived operations — the discovery helpers, for
example — where the portal thread's start/stop cost is acceptable.
For repeated calls, reuse a long-lived :class:SyncPortal.
Source code in src/alicatlib/sync/portal.py
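The throwaway-portal pattern can be approximated with the standard library. run_sync_sketch below is a minimal stand-in for the described behavior, not the real run_sync: it runs one coroutine on a short-lived event loop in a background thread and blocks until it finishes.

```python
import asyncio
import threading

def run_sync_sketch(coro):
    # Run `coro` on a throwaway loop in a worker thread, join, return.
    # Minimal sketch: errors inside `coro` are not re-raised here.
    box: dict = {}

    def target() -> None:
        box["value"] = asyncio.run(coro)

    worker = threading.Thread(target=target)
    worker.start()
    worker.join()
    return box["value"]

async def identify() -> str:
    await asyncio.sleep(0)
    return "flow controller"
```

The start/stop cost per call is why the docs above recommend a long-lived portal for repeated work.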
Device facades¶
alicatlib.sync.device ¶
Sync device facade — portal-driven wrapper over :class:Device.
Each :class:SyncDevice holds a reference to an async
:class:~alicatlib.devices.base.Device and a
:class:~alicatlib.sync.portal.SyncPortal; every public method is a
one-liner that hands the underlying coroutine to the portal. The
subclass tree mirrors the async side so isinstance checks work the
same way:
- :class:SyncDevice — base, all shared methods.
- :class:SyncFlowMeter — tag only (pass-through, design §5.9).
- :class:SyncFlowController — adds setpoint, setpoint_source, loop_control_variable.
- :class:SyncPressureMeter / :class:SyncPressureController — tag only today (controller-only pressure surface is planned future work).
The :class:Alicat namespace exposes an Alicat.open(...) context
manager that drives the async
:func:~alicatlib.devices.factory.open_device through the portal. By
default each Alicat.open owns its own portal; callers that need
several contexts to share an event loop can pass portal= to reuse
a long-lived :class:SyncPortal.
Design reference: docs/design.md §5.16.
Alicat ¶
Namespace for the sync device entry point.
Use :meth:Alicat.open as a context manager:
    >>> from alicatlib.sync import Alicat
    >>> with Alicat.open("/dev/ttyUSB0") as dev:  # doctest: +SKIP
    ...     print(dev.poll())
open
staticmethod
¶
open(
port,
*,
unit_id="A",
serial=None,
timeout=0.5,
recover_from_stream=True,
model_hint=None,
assume_capabilities=Capability.NONE,
assume_media=None,
portal=None,
)
Open a sync :class:SyncDevice scoped to a with block.
Mirrors :func:alicatlib.devices.factory.open_device parameter
for parameter. The returned sync CM drives the async factory
through a :class:SyncPortal; the portal is created per-call
unless a pre-existing one is passed in via portal=.
Passing a :class:Transport or
:class:AlicatProtocolClient is advanced: the caller is
responsible for ensuring the object was constructed inside the
portal's event loop (or is loop-agnostic). The common case —
passing a str port path — creates the transport inside the
portal and avoids that concern.
Source code in src/alicatlib/sync/device.py
SyncDevice ¶
Blocking facade over :class:alicatlib.devices.base.Device.
Instances are produced by :meth:Alicat.open; users do not call
this constructor directly. Every public method delegates to the
underlying :class:Device through a :class:SyncPortal.
Source code in src/alicatlib/sync/device.py
session
property
¶
Underlying async :class:Session for advanced escape-hatch use.
Calling coroutine methods on the returned session requires the
caller to route them through :attr:portal.
__enter__ ¶
__exit__ ¶
Close the device on exit — session close is idempotent.
analog_output_source ¶
Blocking :meth:Device.analog_output_source.
Default channel is :attr:AnalogOutputChannel.PRIMARY — same
default as the async side, bound at module load via the
module-level alias so sync-parity signature comparison sees
the same sentinel object.
Source code in src/alicatlib/sync/device.py
average_timing ¶
Blocking :meth:Device.average_timing.
Source code in src/alicatlib/sync/device.py
blink_display ¶
close ¶
Release the underlying session — idempotent.
Prefer with Alicat.open(...) as dev: over calling this by
hand; the outer context manager owns transport lifecycle.
Source code in src/alicatlib/sync/device.py
engineering_units ¶
Blocking :meth:Device.engineering_units.
Source code in src/alicatlib/sync/device.py
execute ¶
full_scale ¶
gas ¶
gas_list ¶
lock_display ¶
poll ¶
power_up_tare ¶
request ¶
Blocking :meth:Device.request.
Source code in src/alicatlib/sync/device.py
stp_ntp_pressure ¶
Blocking :meth:Device.stp_ntp_pressure.
Source code in src/alicatlib/sync/device.py
stp_ntp_temperature ¶
Blocking :meth:Device.stp_ntp_temperature.
Source code in src/alicatlib/sync/device.py
stream ¶
Blocking :meth:Device.stream — returns a sync context manager.
The returned :class:SyncStreamingSession is both a sync
context manager and a sync iterator; use it as::
with sync_dev.stream(rate_ms=50) as stream:
for frame in stream:
process(frame)
Source code in src/alicatlib/sync/device.py
tare_absolute_pressure ¶
tare_flow ¶
tare_gauge_pressure ¶
totalizer_config ¶
totalizer_config(
totalizer=_TOTALIZER_FIRST,
*,
flow_statistic_code=None,
mode=None,
limit_mode=None,
digits=None,
decimal_place=None,
)
Blocking :meth:Device.totalizer_config.
Source code in src/alicatlib/sync/device.py
totalizer_reset ¶
Blocking :meth:Device.totalizer_reset — destructive; requires confirm=True.
Source code in src/alicatlib/sync/device.py
totalizer_reset_peak ¶
Blocking :meth:Device.totalizer_reset_peak — destructive.
Source code in src/alicatlib/sync/device.py
totalizer_save ¶
Blocking :meth:Device.totalizer_save.
unlock_display ¶
user_data ¶
SyncFlowController ¶
Bases: SyncFlowMeter, _SyncControllerMixin
Flow-controller facade — adds the shared controller surface.
Source code in src/alicatlib/sync/device.py
SyncFlowMeter ¶
Bases: SyncDevice
Flow-meter tag — empty pass-through, mirrors :class:FlowMeter.
Source code in src/alicatlib/sync/device.py
SyncPressureController ¶
Bases: SyncPressureMeter, _SyncControllerMixin
Pressure-controller facade — inherits the shared controller surface.
Source code in src/alicatlib/sync/device.py
SyncPressureMeter ¶
Bases: SyncDevice
Pressure-meter tag — empty pass-through, mirrors :class:PressureMeter.
Source code in src/alicatlib/sync/device.py
SyncStreamingSession ¶
Blocking view over :class:StreamingSession.
Wraps the async streaming context so sync callers see a plain
with / for loop::
with sync_dev.stream(rate_ms=50) as stream:
for frame in stream:
process(frame)
Enter/exit and next() are routed through the device's
:class:SyncPortal; the portal threads one coroutine at a time, so
the underlying producer task keeps running in the background while
the sync consumer polls for frames.
Source code in src/alicatlib/sync/device.py
__enter__ ¶
Enter streaming mode on the async side.
Uses :meth:SyncPortal.wrap_async_context_manager rather than
routing __aenter__/__aexit__ through portal.call:
portal.call wraps each call in a fresh CancelScope,
which conflicts with :meth:StreamingSession.__aenter__
entering a long-lived task group that outlives the entry call.
Hardware validation (2026-04-17) surfaced the resulting
"cancel scope that isn't the current task's" RuntimeError
on real streaming hardware; wrap_async_context_manager
lets anyio manage the portal-side scope for the whole CM
lifetime instead.
Re-enters are rejected the same way as
:meth:StreamingSession.__aenter__ — one streaming context
per instance.
Source code in src/alicatlib/sync/device.py
__exit__ ¶
Exit streaming mode — always sends stop-stream via the portal.
Source code in src/alicatlib/sync/device.py
__next__ ¶
Block until the next frame, or :class:StopIteration on close.
unwrap_sync_device ¶
Return the async :class:Device inside source if it is wrapped.
Package-private: used by :meth:SyncAlicatManager.add so callers
can hand in a previously-wrapped :class:SyncDevice.
Source code in src/alicatlib/sync/device.py
wrap_device ¶
Pick the correct :class:SyncDevice subclass for async_device.
Order is most-specific first — :class:FlowController is also a
:class:FlowMeter, so the check must precede the meter branch.
Package-private: consumed by :mod:alicatlib.sync.manager and
:class:Alicat. Not part of the public API.
Source code in src/alicatlib/sync/device.py
Manager¶
alicatlib.sync.manager ¶
Sync manager facade — portal-driven wrapper over :class:AlicatManager.
:class:SyncAlicatManager wraps the async
:class:~alicatlib.manager.AlicatManager through a
:class:~alicatlib.sync.portal.SyncPortal. Every coroutine method on
the async manager becomes a blocking method here; the synchronous
:meth:AlicatManager.get stays synchronous and delegates directly.
Lifecycle mirrors the async side: the class is a with context
manager. By default each instance owns its own portal; callers that
need several facades to share one event loop can pass portal= to
reuse a long-lived :class:SyncPortal.
Design reference: docs/design.md §5.13 and §5.16.
DeviceResult
dataclass
¶
Per-device result container — value or error, never both.
The union is encoded as two optional fields (rather than an
Either / Result ADT) so mypy's narrowing on ok reads
cleanly at call sites without pattern matching.
Attributes:
| Name | Type | Description |
|---|---|---|
| value | T \| None | The successful result, or None. |
| error | AlicatError \| None | The captured :class:AlicatError, or None. |
ErrorPolicy ¶
Bases: Enum
How the manager surfaces per-device failures.
Under :attr:RAISE, the manager collects every device's result
and — if any call failed — raises an :class:ExceptionGroup
containing the per-device exceptions after the task group joins.
Under :attr:RETURN, each device produces a :class:DeviceResult
and the caller inspects .error per entry.
Design reference: docs/design.md §5.13.
SyncAlicatManager ¶
Blocking facade over :class:alicatlib.manager.AlicatManager.
Example
    >>> with SyncAlicatManager() as mgr:  # doctest: +SKIP
    ...     mgr.add("fuel", "/dev/ttyUSB0")
    ...     mgr.add("air", "/dev/ttyUSB1")
    ...     frames = mgr.poll()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error_policy | ErrorPolicy | Forwarded to :class:AlicatManager. | RAISE |
| portal | SyncPortal \| None | Optional pre-built :class:SyncPortal to reuse. | None |
Source code in src/alicatlib/sync/manager.py
__enter__ ¶
Start the portal, build the async manager, enter its CM.
Source code in src/alicatlib/sync/manager.py
__exit__ ¶
Close the managed devices + portal (if owned).
Source code in src/alicatlib/sync/manager.py
add ¶
Blocking :meth:AlicatManager.add.
Accepts a :class:SyncDevice as source in addition to the
async shapes — the wrapper is unwrapped to the underlying
:class:Device before delegation, matching the manager's
"pre-built device, no lifecycle ownership" contract.
Source code in src/alicatlib/sync/manager.py
close ¶
Blocking :meth:AlicatManager.close — idempotent.
execute ¶
Blocking :meth:AlicatManager.execute.
Source code in src/alicatlib/sync/manager.py
get ¶
Return the sync wrapper for the device registered under name.
The async manager's :meth:AlicatManager.get is already
synchronous — this method caches the sync wrapper so repeated
get calls return the same :class:SyncDevice instance per
device name.
Source code in src/alicatlib/sync/manager.py
poll ¶
Blocking :meth:AlicatManager.poll.
remove ¶
request ¶
Blocking :meth:AlicatManager.request.
Source code in src/alicatlib/sync/manager.py
Recording¶
alicatlib.sync.recording ¶
Sync wrappers for :func:alicatlib.streaming.record and :func:alicatlib.sinks.pipe.
:func:record — sync context manager wrapping the async recorder. The
produced iterator is blocking; on CM exit the underlying async task
group is cancelled and joined by the portal.
:func:pipe — sync drain loop matching :func:alicatlib.sinks.pipe's
batch / time flush semantics. Rebuilt in sync-land rather than wrapping
the async driver so buffering stays under sync control and the time
threshold uses :func:time.monotonic, not :func:anyio.current_time.
Both entry points accept :class:SyncAlicatManager / :class:SyncSinkAdapter
instances — internally they reach for the wrapped async objects so the
recorder / sink plumbing runs on the shared portal.
Design reference: docs/design.md §5.14, §5.15, §5.16.
AcquisitionSummary
dataclass
¶
Per-run summary emitted after record()'s CM exits.
Attributes:
| Name | Type | Description |
|---|---|---|
| started_at | datetime | Wall-clock at the first scheduled tick. |
| finished_at | datetime | Wall-clock at producer shutdown. |
| samples_emitted | int | Count of per-tick batches actually pushed onto the receive stream. Partial batches (some devices errored under …) |
| samples_late | int | Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). |
| max_drift_ms | float | Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching … |
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer. Default. Silent drops are surprising
in a data-acquisition setting, so the recorder blocks the producer
rather than quietly discarding samples. The effective sample rate
drops to the consumer's drain rate; samples_late accrues once
the consumer catches up and the producer can check its schedule.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the sample that was about to be enqueued. Counted as late.
DROP_OLDEST
class-attribute
instance-attribute
¶
Evict the oldest queued batch, then enqueue. Counted as late.
pipe ¶
Sync :func:alicatlib.sinks.pipe.
Drains a sync iterator of per-tick batches into sink with the
same buffered-flush semantics as the async driver: flush when the
buffer reaches batch_size samples or flush_interval seconds
have passed since the last flush.
sink may be a :class:SyncSinkAdapter (already open) or a raw
async :class:SampleSink. In the async case a portal must be
reachable — either passed explicitly or derived from a
:class:SyncSinkAdapter — so writes can be dispatched.
Time thresholds use :func:time.monotonic (wall-clock-ish,
independent of the portal's event loop) because the sink cares
about persistence freshness, not scheduling precision.
The returned :class:AcquisitionSummary carries samples_emitted
(count actually handed to the sink); the samples_late and
max_drift_ms fields stay at zero — those are recorder-layer
concepts and the recorder logs its own values via the
alicatlib.streaming logger on CM exit.
Source code in src/alicatlib/sync/recording.py
record ¶
record(
source,
*,
rate_hz,
duration=None,
names=None,
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
portal=None,
)
Sync :func:alicatlib.streaming.record.
The yielded iterator is a blocking bridge to the recorder's
receive stream. Breaking out of the loop or exiting the with
cancels the producer task.
If source is a :class:SyncAlicatManager, its portal is
reused — the recorder and manager must share an event loop.
Pass portal= to override; pass a raw :class:AlicatManager
and the recorder owns its own portal.
Source code in src/alicatlib/sync/recording.py
Sinks¶
alicatlib.sync.sinks ¶
Sync wrappers for :mod:alicatlib.sinks.
Every in-tree sink has a one-to-one sync counterpart. All of them share
:class:SyncSinkAdapter: the per-sink subclass only constructs the
matching async sink with its own parameters and hands it to the
adapter, which owns the portal + open/write/close plumbing.
Sinks follow the same portal-ownership pattern as the rest of the sync
facade — each wrapper creates a throwaway :class:SyncPortal on
__enter__ unless the caller passes one in. Pass a shared portal
when the sink must share an event loop with a :class:SyncAlicatManager
or :func:record, otherwise the sink's writes run on a different loop
than the data producer.
Design reference: docs/design.md §5.15 and §5.16.
PostgresConfig
dataclass
¶
PostgresConfig(
dsn=None,
host=None,
port=5432,
user=None,
password=None,
database=None,
schema="public",
table="samples",
pool_min_size=1,
pool_max_size=4,
statement_timeout_ms=30000,
command_timeout_s=10.0,
connect_timeout_s=30.0,
close_timeout_s=10.0,
create_table=False,
use_copy=True,
)
Connection + target settings for :class:PostgresSink.
Either dsn or the discrete host/user/database set
must be provided. Credentials are not logged.
Attributes:

| Name | Type | Description |
|---|---|---|
| `dsn` | `str \| None` | Full libpq-style connection string. |
| `host` | `str \| None` | Database host. Required if `dsn` is not given. |
| `port` | `int` | Database port. Defaults to `5432`. |
| `user` | `str \| None` | Database role. |
| `password` | `str \| None` | Role password. Never logged. |
| `database` | `str \| None` | Database name. |
| `schema` | `str` | Target schema. Validated against a strict identifier pattern. |
| `table` | `str` | Target table. Validated against the same pattern. |
| `pool_min_size` | `int` | Minimum pool size. Defaults to `1`. |
| `pool_max_size` | `int` | Maximum pool size. Defaults to `4`. |
| `statement_timeout_ms` | `int` | Server-side statement timeout, in milliseconds. Defaults to `30000`. |
| `command_timeout_s` | `float` | asyncpg's per-call command timeout. Defaults to 10 s. |
| `connect_timeout_s` | `float` | Cap on initial pool establishment in :meth:open. Defaults to 30 s. |
| `close_timeout_s` | `float` | Cap on :meth:close. Defaults to 10 s. |
| `create_table` | `bool` | If `True`, create the target table on open when it does not already exist. Defaults to `False`. |
| `use_copy` | `bool` | If `True`, write batches with COPY; otherwise use parameterised inserts. Defaults to `True`. |
target ¶
Return a log-safe description of the target: host:port/db.schema.table.
Source code in src/alicatlib/sinks/postgres.py
SyncCsvSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.csv.CsvSink.
Source code in src/alicatlib/sync/sinks.py
SyncInMemorySink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.memory.InMemorySink.
Source code in src/alicatlib/sync/sinks.py
SyncJsonlSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.jsonl.JsonlSink.
Source code in src/alicatlib/sync/sinks.py
SyncParquetSink ¶
SyncParquetSink(
path,
*,
compression="zstd",
use_dictionary=True,
row_group_size=None,
portal=None,
)
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.parquet.ParquetSink.
Requires the alicatlib[parquet] extra — the dependency check
runs on :meth:open, same as the async sink.
Source code in src/alicatlib/sync/sinks.py
SyncPostgresSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.postgres.PostgresSink.
Requires the alicatlib[postgres] extra — dependency check runs
on :meth:open.
Source code in src/alicatlib/sync/sinks.py
SyncSampleSink ¶
SyncSinkAdapter ¶
Generic sync wrapper around any :class:SampleSink.
Subclasses typically only override :meth:__init__ to build the
matching async sink with sink-specific parameters and hand it to
this base class. The portal / open / write / close / context-manager
plumbing is shared.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `async_sink` | `SampleSink` | Already-constructed async sink. | *required* |
| `portal` | `SyncPortal \| None` | Optional pre-built :class:SyncPortal to share; when omitted the adapter creates its own on `__enter__`. | `None` |
Source code in src/alicatlib/sync/sinks.py
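The adapter's plumbing — one background event loop, with `open`/`write`/`close` dispatched into it — can be illustrated with a stdlib-only analogue. `AsyncListSink` and `SyncAdapter` are hypothetical stand-ins, not alicatlib classes, and the real adapter delegates to a shared :class:SyncPortal rather than owning a loop directly:

```python
import asyncio
import threading

class AsyncListSink:
    """Toy async sink with the open/write/close shape the adapter expects."""
    def __init__(self):
        self.rows = []
        self.opened = False

    async def open(self):
        self.opened = True

    async def write(self, sample):
        self.rows.append(sample)

    async def close(self):
        self.opened = False

class SyncAdapter:
    """Minimal analogue of SyncSinkAdapter: one background loop thread."""
    def __init__(self, async_sink):
        self._sink = async_sink
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)

    def _call(self, coro):
        # Block the calling thread until the coroutine finishes on the loop.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def __enter__(self):
        self._thread.start()
        self._call(self._sink.open())
        return self

    def write(self, sample):
        self._call(self._sink.write(sample))

    def __exit__(self, *exc):
        self._call(self._sink.close())
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
```

Per-sink subclasses then reduce to an `__init__` that builds the matching async sink, which is exactly the division of labour the docstring describes.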
__enter__ ¶
Start the portal, open the async sink.
Source code in src/alicatlib/sync/sinks.py
__exit__ ¶
Close the sink and, if owned, stop the portal.
Source code in src/alicatlib/sync/sinks.py
close ¶
Blocking :meth:SampleSink.close — idempotent.
A no-op if the sink never reached :meth:open (portal absent).
Source code in src/alicatlib/sync/sinks.py
open ¶
SyncSqliteSink ¶
SyncSqliteSink(
path,
*,
table="samples",
create_table=True,
journal_mode="WAL",
synchronous="NORMAL",
busy_timeout_ms=5000,
portal=None,
)
Bases: SyncSinkAdapter
Sync wrapper over :class:~alicatlib.sinks.sqlite.SqliteSink.
Source code in src/alicatlib/sync/sinks.py
Discovery¶
alicatlib.sync.discovery ¶
Sync wrappers for :mod:alicatlib.devices.discovery.
One-shot discovery primitives — each call creates, drives, and tears
down its own :class:SyncPortal unless the caller passes one in. That
matches the async helpers' "fire and forget" shape (open a port, probe,
close).
Design reference: docs/design.md §5.12 and §5.16.
DiscoveryResult
dataclass
¶
Outcome of a single :func:probe attempt.
Exactly one of :attr:info / :attr:error is populated — ok results
carry a fully-identified :class:DeviceInfo, failed ones carry the
typed :class:AlicatError from the identification pipeline. The
:attr:ok convenience lets callers filter without hasattr.
find_devices ¶
find_devices(
ports=None,
*,
unit_ids=("A",),
baudrates=DEFAULT_DISCOVERY_BAUDRATES,
timeout=0.2,
max_concurrency=8,
stop_on_first_hit=False,
portal=None,
)
Blocking :func:alicatlib.devices.discovery.find_devices.
Source code in src/alicatlib/sync/discovery.py
list_serial_ports ¶
Blocking :func:alicatlib.devices.discovery.list_serial_ports.
Source code in src/alicatlib/sync/discovery.py
probe ¶
Blocking :func:alicatlib.devices.discovery.probe.
Source code in src/alicatlib/sync/discovery.py
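The fan-out shape behind `find_devices` — probe many ports under a concurrency cap, recording either a result or the typed error per port — can be sketched with a semaphore. This is an illustrative analogue (`probe_all` and `fake_probe` are not alicatlib names), ignoring `unit_ids`, baud-rate sweeps, and `stop_on_first_hit`:

```python
import asyncio

async def probe_all(ports, probe, *, max_concurrency=8):
    """Fan probe() out over ports with bounded concurrency; collect per-port outcomes."""
    sem = asyncio.Semaphore(max_concurrency)

    async def one(port):
        async with sem:
            try:
                return await probe(port)   # ok outcome: identified device info
            except Exception as exc:       # failed outcome: keep the typed error
                return exc

    return await asyncio.gather(*(one(p) for p in ports))

async def fake_probe(port):
    # Stand-in for a serial probe: only one port answers.
    if port == "COM3":
        return "flow-controller"
    raise ValueError(f"no response on {port}")
```

Each slot of the result mirrors :class:DiscoveryResult's either/or contract: exactly one of info or error per port.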
Portal primitives¶
alicatlib.sync.portal ¶
Blocking portal primitive — sync access to the async core.
:class:SyncPortal wraps :func:anyio.from_thread.start_blocking_portal
so the rest of the sync facade (device, manager, recording, sinks,
discovery) can share one dispatch primitive.
Shape:
- Lifecycle is a plain `with` block. Each portal owns one background event-loop thread; the portal closes when the block exits. Portals are one-shot — re-entering after exit raises.
- `call(func, *args, **kwargs)` runs a coroutine. `kwargs` are bound through :func:functools.partial because :meth:anyio.from_thread.BlockingPortal.call only accepts positional arguments.
- Single-member :class:ExceptionGroups are unwrapped. Our async core runs inside task groups (manager, recorder, factory), so AnyIO occasionally rewraps a single exception into a group. Unwrap so callers see the concrete :class:~alicatlib.errors.AlicatError subclass they branch on. Multi-member groups pass through unchanged — those carry real aggregate failures (design §5.13).
- `wrap_async_context_manager` delegates to the portal's own helper — no extra behaviour, but exposed through :class:SyncPortal so callers reach for one surface.
- `wrap_async_iter` bridges async iteration. The returned :class:SyncAsyncIterator is both iterable and closeable; outer sync CMs (e.g. `sync.record()`) call :meth:close on exit to cancel the producer promptly.
Design reference: docs/design.md §5.16.
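The portal shape above can be illustrated without anyio. `MiniPortal` below is a stdlib sketch, not the real :class:SyncPortal: one background event loop on one thread, blocking dispatch of coroutine functions, with kwargs bound through `functools.partial` as described:

```python
import asyncio
import functools
import threading

class MiniPortal:
    """Stdlib sketch of the portal pattern: one loop, one thread, blocking calls."""

    def __enter__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        return self

    def call(self, func, *args, **kwargs):
        # Bind kwargs up front, mirroring the functools.partial trick above.
        coro = functools.partial(func, *args, **kwargs)()
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def __exit__(self, *exc):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

async def double(x, *, offset=0):
    await asyncio.sleep(0)
    return 2 * x + offset
```

The real portal adds exception-group unwrapping and the iterator/context-manager bridges on top of this core dispatch.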
SyncAsyncIterator ¶
Blocking view over an async iterator, bound to a :class:SyncPortal.
Source code in src/alicatlib/sync/portal.py
close ¶
Cancel the underlying async iterator if it exposes aclose.
Safe to call more than once. Cleanup failures are swallowed —
close runs on teardown paths where re-raising masks the
real failure.
Source code in src/alicatlib/sync/portal.py
SyncPortal ¶
Per-context wrapper around :class:anyio.from_thread.BlockingPortal.
Example
```python
with SyncPortal() as portal:
    result = portal.call(some_async_func, arg1, arg2)
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `backend` | `str` | AnyIO backend to run on. Defaults to `'asyncio'`. | `'asyncio'` |
Source code in src/alicatlib/sync/portal.py
__enter__ ¶
Start the portal's background thread and event loop.
Source code in src/alicatlib/sync/portal.py
__exit__ ¶
Stop the portal and join its thread.
Source code in src/alicatlib/sync/portal.py
call ¶
Run func(*args, **kwargs) on the portal's event loop.
Single-member :class:ExceptionGroup wrappers are stripped so
callers can catch concrete exception types.
Source code in src/alicatlib/sync/portal.py
wrap_async_context_manager ¶
Present an async context manager as a sync context manager.
wrap_async_iter ¶
Present an async iterator as a blocking, closeable iterator.
The returned object is iterable (for x in it: ...) and
supports :meth:close / context-manager use so outer wrappers
can cancel the producer on early exit.
Source code in src/alicatlib/sync/portal.py
run_sync ¶
Run one coroutine in a throwaway :class:SyncPortal.
Suitable for short-lived operations — the discovery helpers, for
example — where the portal thread's start/stop cost is acceptable.
For repeated calls, reuse a long-lived :class:SyncPortal.