watlowlib.sync¶
Sync facade over the async core. Every async method has a sync counterpart
routed through a SyncPortal (a wrapper around anyio.from_thread.BlockingPortal).
See Sync quickstart.
Public surface¶
watlowlib.sync ¶
Sync facade — blocking wrappers over the async core.
The sync surface targets scripts, notebooks, and REPL use. The async
core remains canonical; every sync facade routes coroutines through a
:class:SyncPortal (an :class:anyio.from_thread.BlockingPortal
wrapper) so the event loop runs on a background thread.
What ships here:
- :class:SyncPortal, the single dispatch primitive used by the rest of the sync facade.
- :class:Watlow / :class:SyncController, the sync mirror of :class:~watlowlib.devices.controller.Controller.
- :class:SyncWatlowManager, the sync mirror of :class:~watlowlib.manager.WatlowManager.
- :func:record / :func:pipe, the sync mirrors of the streaming primitives.
- :class:SyncSinkAdapter + per-sink wrappers (SyncCsvSink, SyncJsonlSink, SyncSqliteSink, SyncInMemorySink, SyncParquetSink, SyncPostgresSink).
Design reference: docs/design.md §6 (sync portal).
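The idea of routing coroutines to an event loop that runs on a background thread can be sketched with the standard library alone. This is a minimal illustration, not watlowlib's implementation: MiniPortal and read_pv_async are hypothetical names, and plain asyncio stands in for the AnyIO blocking portal.

```python
import asyncio
import threading


class MiniPortal:
    """Hypothetical stand-in for a blocking portal: one event loop on one background thread."""

    def __enter__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        return self

    def call(self, func, *args):
        # Schedule the coroutine on the loop thread, then block the caller until it finishes.
        future = asyncio.run_coroutine_threadsafe(func(*args), self._loop)
        return future.result()

    def __exit__(self, *exc_info):
        # Stop the loop from its own thread, wait for the thread, then release the loop.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
        return False


async def read_pv_async(offset):
    """Placeholder coroutine standing in for an async controller call."""
    await asyncio.sleep(0)
    return 21.5 + offset


with MiniPortal() as portal:
    reading = portal.call(read_pv_async, 0.5)
```

The calling thread never runs an event loop itself; it only waits on a thread-safe future, which is why this style suits scripts, notebooks, and REPL sessions.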
SyncAsyncIterator ¶
Blocking view over an async iterator, bound to a :class:SyncPortal.
Source code in src/watlowlib/sync/portal.py
close ¶
Cancel the underlying async iterator if it exposes aclose.
Source code in src/watlowlib/sync/portal.py
SyncController ¶
Blocking facade over :class:watlowlib.devices.controller.Controller.
Instances are produced by :meth:Watlow.open or yielded by the
sync manager; users do not call this constructor directly.
Source code in src/watlowlib/sync/controller.py
close ¶
identify ¶
loop ¶
poll ¶
Blocking :meth:Controller.poll.
Source code in src/watlowlib/sync/controller.py
read_parameter ¶
Blocking :meth:Controller.read_parameter.
Source code in src/watlowlib/sync/controller.py
read_pv ¶
read_setpoint ¶
Blocking :meth:Controller.read_setpoint.
set_setpoint ¶
Blocking :meth:Controller.set_setpoint.
Source code in src/watlowlib/sync/controller.py
write_parameter ¶
Blocking :meth:Controller.write_parameter.
Source code in src/watlowlib/sync/controller.py
SyncControllerLoop ¶
Blocking view over a single control loop (mirror of :class:ControllerLoop).
Returned by :meth:SyncController.loop; never instantiated directly.
Lifetime is bound to the parent :class:SyncController and its
portal — closing the controller is the only cleanup needed.
Source code in src/watlowlib/sync/controller.py
SyncCsvSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.csv.CsvSink.
Source code in src/watlowlib/sync/sinks.py
SyncInMemorySink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.memory.InMemorySink.
Source code in src/watlowlib/sync/sinks.py
SyncJsonlSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.jsonl.JsonlSink.
Source code in src/watlowlib/sync/sinks.py
SyncParquetSink ¶
SyncParquetSink(
path,
*,
compression="zstd",
use_dictionary=True,
row_group_size=None,
portal=None,
)
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.parquet.ParquetSink.
Requires the watlowlib[parquet] extra — the dependency check
runs on :meth:open, same as the async sink.
Source code in src/watlowlib/sync/sinks.py
SyncPortal ¶
Per-context wrapper around :class:anyio.from_thread.BlockingPortal.
Example
with SyncPortal() as portal:
    result = portal.call(some_async_func, arg1, arg2)
Source code in src/watlowlib/sync/portal.py
call ¶
Run func(*args, **kwargs) on the portal's event loop.
Single-member :class:ExceptionGroup wrappers are stripped.
Source code in src/watlowlib/sync/portal.py
wrap_async_context_manager ¶
Present an async context manager as a sync context manager.
wrap_async_iter ¶
Present an async iterator as a blocking, closeable iterator.
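A blocking, closeable view over an async iterator might look roughly like the sketch below. It is an illustration under assumptions: BlockingIter and ticks are hypothetical names, the class owns its own asyncio loop for brevity (the real SyncAsyncIterator is bound to a SyncPortal instead), and only standard-library calls are used.

```python
import asyncio
import threading

_DONE = object()  # sentinel: the async iterator is exhausted


async def _next_or_done(ait):
    # Wrap __anext__ in a real coroutine and turn exhaustion into a sentinel.
    try:
        return await ait.__anext__()
    except StopAsyncIteration:
        return _DONE


async def _aclose(ait):
    await ait.aclose()


class BlockingIter:
    """Hypothetical blocking view over an async iterator."""

    def __init__(self, async_iterable):
        self._ait = async_iterable.__aiter__()
        self._closed = False
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def _run(self, coro):
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def __iter__(self):
        return self

    def __next__(self):
        if self._closed:
            raise StopIteration
        item = self._run(_next_or_done(self._ait))
        if item is _DONE:
            self.close()
            raise StopIteration
        return item

    def close(self):
        # Idempotent: close the async iterator if it supports aclose, then stop the loop.
        if self._closed:
            return
        self._closed = True
        if hasattr(self._ait, "aclose"):
            self._run(_aclose(self._ait))
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def ticks(n):
    for i in range(n):
        yield i


values = list(BlockingIter(ticks(3)))

early = BlockingIter(ticks(10))
first = next(early)
early.close()  # stop before the source is exhausted
after_close = next(early, None)
```

Closing early matters for long-running sources such as a recorder stream, where the consumer decides when to stop.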
SyncPostgresSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.postgres.PostgresSink.
Requires the watlowlib[postgres] extra — dependency check runs
on :meth:open.
Source code in src/watlowlib/sync/sinks.py
SyncSinkAdapter ¶
SyncSqliteSink ¶
SyncSqliteSink(
path,
*,
table="samples",
create_table=True,
journal_mode="WAL",
synchronous="NORMAL",
busy_timeout_ms=5000,
portal=None,
)
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.sqlite.SqliteSink.
Source code in src/watlowlib/sync/sinks.py
SyncWatlowManager ¶
Blocking facade over :class:watlowlib.manager.WatlowManager.
Source code in src/watlowlib/sync/manager.py
add ¶
add(
name,
source,
*,
protocol=ProtocolKind.STDBUS,
address=1,
serial_settings=None,
family=ControllerFamily.UNKNOWN,
)
Blocking :meth:WatlowManager.add.
Accepts a :class:SyncController as source in addition to
the async shapes — the wrapper is unwrapped to the underlying
:class:Controller before delegation.
Source code in src/watlowlib/sync/manager.py
close ¶
Blocking :meth:WatlowManager.close — idempotent.
execute_each ¶
Blocking :meth:WatlowManager.execute_each.
op receives the async :class:Controller so existing
coroutines compose. If you have a sync helper, wrap it in an
async stub or run it on the portal yourself.
Source code in src/watlowlib/sync/manager.py
get ¶
Return the sync wrapper for the controller registered under name.
Source code in src/watlowlib/sync/manager.py
poll ¶
Blocking :meth:WatlowManager.poll.
Source code in src/watlowlib/sync/manager.py
record_to_sink ¶
record_to_sink(
*,
parameters,
rate_hz,
duration=None,
sink,
names=None,
instances=(1,),
overflow=None,
buffer_size=64,
batch_size=64,
flush_interval=1.0,
)
Record polled samples directly into a sink — one-call convenience.
Combines :func:watlowlib.sync.record and
:func:watlowlib.sync.pipe into a single blocking call. The
manager's portal is reused for both legs so the recorder and
the sink share an event loop. sink may be either a
:class:SyncSinkAdapter (preferred, opened externally) or a
bare async :class:SampleSink — in the latter case this
method opens the sink against the manager's portal and closes
it after the recording finishes.
Returns the :class:AcquisitionSummary from
:func:watlowlib.sync.pipe.
Source code in src/watlowlib/sync/manager.py
Watlow ¶
Namespace for the sync controller entry point.
Use :meth:Watlow.open as a context manager::
    from watlowlib.sync import Watlow

    with Watlow.open("/dev/ttyUSB0") as ctl:
        print(ctl.read_pv())
open (staticmethod) ¶
Open a sync :class:SyncController scoped to a with block.
Mirrors :func:watlowlib.open_device parameter-for-parameter
(modulo the portal plumbing). The sync CM drives the async
factory through a :class:SyncPortal; the portal is created
per-call unless one is passed in via portal=.
Source code in src/watlowlib/sync/controller.py
pipe ¶
Sync :func:watlowlib.sinks.pipe.
Source code in src/watlowlib/sync/recording.py
record ¶
record(
source,
*,
parameters,
rate_hz,
duration=None,
names=None,
instances=(1,),
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
portal=None,
)
Sync :func:watlowlib.streaming.record.
If source is a :class:SyncWatlowManager, its portal is
reused — the recorder and manager must share an event loop. Pass
portal= to override.
Source code in src/watlowlib/sync/recording.py
run_sync ¶
Run one coroutine in a throwaway :class:SyncPortal.
Source code in src/watlowlib/sync/portal.py
Portal¶
watlowlib.sync.portal ¶
Blocking portal primitive — sync access to the async core.
:class:SyncPortal wraps :func:anyio.from_thread.start_blocking_portal
so the rest of the sync facade (controller, manager, recording, sinks)
can share one dispatch primitive.
Shape:
- Lifecycle is a plain with block. Each portal owns one background event-loop thread; the portal closes when the block exits. Portals are one-shot: re-entering after exit raises.
- call(func, *args, **kwargs) runs a coroutine. kwargs are bound through :func:functools.partial because :meth:anyio.from_thread.BlockingPortal.call only accepts positional arguments.
- Single-member :class:ExceptionGroup wrappers are unwrapped. The async core runs inside task groups (manager, recorder), so AnyIO occasionally rewraps a single exception into a group. Unwrap so callers see the concrete :class:~watlowlib.errors.WatlowError subclass they branch on. Aggregates with two or more exceptions stay as :class:ExceptionGroup, so sync callers under manager ErrorPolicy.RAISE handle one-failure and multi-failure cases with different exception shapes.
- wrap_async_context_manager delegates to the portal's helper.
- wrap_async_iter bridges async iteration. The returned :class:SyncAsyncIterator is both iterable and closeable.
Design reference: docs/design.md §6.
SyncAsyncIterator ¶
Blocking view over an async iterator, bound to a :class:SyncPortal.
Source code in src/watlowlib/sync/portal.py
close ¶
Cancel the underlying async iterator if it exposes aclose.
Source code in src/watlowlib/sync/portal.py
SyncPortal ¶
Per-context wrapper around :class:anyio.from_thread.BlockingPortal.
Example
with SyncPortal() as portal:
    result = portal.call(some_async_func, arg1, arg2)
Source code in src/watlowlib/sync/portal.py
call ¶
Run func(*args, **kwargs) on the portal's event loop.
Single-member :class:ExceptionGroup wrappers are stripped.
Source code in src/watlowlib/sync/portal.py
wrap_async_context_manager ¶
Present an async context manager as a sync context manager.
wrap_async_iter ¶
Present an async iterator as a blocking, closeable iterator.
run_sync ¶
Run one coroutine in a throwaway :class:SyncPortal.
Source code in src/watlowlib/sync/portal.py
Controller¶
watlowlib.sync.controller ¶
Sync controller facade — portal-driven wrapper over :class:Controller.
Each :class:SyncController holds a reference to an async
:class:~watlowlib.devices.controller.Controller and a
:class:~watlowlib.sync.portal.SyncPortal; every public method is a
one-liner that hands the underlying coroutine to the portal.
The :class:Watlow namespace exposes a Watlow.open(...) context
manager that drives the async
:func:~watlowlib.devices.factory.open_device through the portal.
Design reference: docs/design.md §6.
SyncController ¶
Blocking facade over :class:watlowlib.devices.controller.Controller.
Instances are produced by :meth:Watlow.open or yielded by the
sync manager; users do not call this constructor directly.
Source code in src/watlowlib/sync/controller.py
close ¶
identify ¶
loop ¶
poll ¶
Blocking :meth:Controller.poll.
Source code in src/watlowlib/sync/controller.py
read_parameter ¶
Blocking :meth:Controller.read_parameter.
Source code in src/watlowlib/sync/controller.py
read_pv ¶
read_setpoint ¶
Blocking :meth:Controller.read_setpoint.
set_setpoint ¶
Blocking :meth:Controller.set_setpoint.
Source code in src/watlowlib/sync/controller.py
write_parameter ¶
Blocking :meth:Controller.write_parameter.
Source code in src/watlowlib/sync/controller.py
SyncControllerLoop ¶
Blocking view over a single control loop (mirror of :class:ControllerLoop).
Returned by :meth:SyncController.loop; never instantiated directly.
Lifetime is bound to the parent :class:SyncController and its
portal — closing the controller is the only cleanup needed.
Source code in src/watlowlib/sync/controller.py
Watlow ¶
Namespace for the sync controller entry point.
Use :meth:Watlow.open as a context manager::
    from watlowlib.sync import Watlow

    with Watlow.open("/dev/ttyUSB0") as ctl:
        print(ctl.read_pv())
open (staticmethod) ¶
Open a sync :class:SyncController scoped to a with block.
Mirrors :func:watlowlib.open_device parameter-for-parameter
(modulo the portal plumbing). The sync CM drives the async
factory through a :class:SyncPortal; the portal is created
per-call unless one is passed in via portal=.
Source code in src/watlowlib/sync/controller.py
unwrap_sync_controller ¶
Return the async :class:Controller inside source if wrapped.
Package-private helper used by :class:SyncWatlowManager.
Source code in src/watlowlib/sync/controller.py
wrap_controller ¶
Return a :class:SyncController wrapping controller on portal.
Package-private helper used by :class:SyncWatlowManager.
Source code in src/watlowlib/sync/controller.py
Manager¶
watlowlib.sync.manager ¶
Sync manager facade — portal-driven wrapper over :class:WatlowManager.
:class:SyncWatlowManager wraps the async
:class:~watlowlib.manager.WatlowManager through a
:class:~watlowlib.sync.portal.SyncPortal. Every coroutine method
becomes a blocking method here; the synchronous :meth:get stays
synchronous and delegates directly.
Lifecycle mirrors the async side: the class is a with context
manager. By default each instance owns its own portal; callers that
need several facades to share one event loop can pass portal= to
reuse a long-lived :class:SyncPortal.
Design reference: docs/design.md §6.
DeviceResult (dataclass) ¶
Per-device result container — value or error, never both.
:attr:protocol is populated from the controller's session so error
rows from the streaming layer can still record which protocol
produced the failure.
ErrorPolicy ¶
Bases: Enum
How the manager surfaces per-device failures.
Under :attr:RAISE, the manager collects every controller's result
and — if any call failed — raises an :class:ExceptionGroup
containing the per-device exceptions after the task group joins.
Under :attr:RETURN, each controller produces a
:class:DeviceResult and the caller inspects .error per entry.
SyncWatlowManager ¶
Blocking facade over :class:watlowlib.manager.WatlowManager.
Source code in src/watlowlib/sync/manager.py
add ¶
add(
name,
source,
*,
protocol=ProtocolKind.STDBUS,
address=1,
serial_settings=None,
family=ControllerFamily.UNKNOWN,
)
Blocking :meth:WatlowManager.add.
Accepts a :class:SyncController as source in addition to
the async shapes — the wrapper is unwrapped to the underlying
:class:Controller before delegation.
Source code in src/watlowlib/sync/manager.py
close ¶
Blocking :meth:WatlowManager.close — idempotent.
execute_each ¶
Blocking :meth:WatlowManager.execute_each.
op receives the async :class:Controller so existing
coroutines compose. If you have a sync helper, wrap it in an
async stub or run it on the portal yourself.
Source code in src/watlowlib/sync/manager.py
get ¶
Return the sync wrapper for the controller registered under name.
Source code in src/watlowlib/sync/manager.py
poll ¶
Blocking :meth:WatlowManager.poll.
Source code in src/watlowlib/sync/manager.py
record_to_sink ¶
record_to_sink(
*,
parameters,
rate_hz,
duration=None,
sink,
names=None,
instances=(1,),
overflow=None,
buffer_size=64,
batch_size=64,
flush_interval=1.0,
)
Record polled samples directly into a sink — one-call convenience.
Combines :func:watlowlib.sync.record and
:func:watlowlib.sync.pipe into a single blocking call. The
manager's portal is reused for both legs so the recorder and
the sink share an event loop. sink may be either a
:class:SyncSinkAdapter (preferred, opened externally) or a
bare async :class:SampleSink — in the latter case this
method opens the sink against the manager's portal and closes
it after the recording finishes.
Returns the :class:AcquisitionSummary from
:func:watlowlib.sync.pipe.
Source code in src/watlowlib/sync/manager.py
Recording¶
watlowlib.sync.recording ¶
Sync wrappers for :func:watlowlib.streaming.record and :func:watlowlib.sinks.pipe.
:func:record — sync context manager wrapping the async recorder. The
produced iterator is blocking; on CM exit the underlying async task
group is cancelled and joined by the portal.
:func:pipe — sync drain loop matching
:func:watlowlib.sinks.pipe's batch / time flush semantics.
Rebuilt in sync-land rather than wrapping the async driver so
buffering stays under sync control and the time threshold uses
:func:time.monotonic, not :func:anyio.current_time.
Design reference: docs/design.md §6.
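A sync drain loop with batch and time flush thresholds might look like the sketch below. It is a simplified illustration, not the library's pipe: drain and its write callback are hypothetical, and the parameter names batch_size and flush_interval are borrowed from the record_to_sink signature.

```python
import time


def drain(batches, write, *, batch_size=64, flush_interval=1.0, clock=time.monotonic):
    """Buffer incoming samples; flush on size or elapsed time, then once more at the end."""
    buffer = []
    last_flush = clock()
    for batch in batches:  # blocking iteration over the recorder's output
        buffer.extend(batch)
        if len(buffer) >= batch_size or clock() - last_flush >= flush_interval:
            write(buffer)
            buffer = []
            last_flush = clock()
    if buffer:
        write(buffer)  # final partial batch


written = []
drain([[1], [2], [3], [4], [5]], written.append, batch_size=2, flush_interval=3600.0)
```

Using a monotonic clock keeps the time threshold immune to wall-clock adjustments during a long acquisition.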
AcquisitionSummary (dataclass) ¶
AcquisitionSummary(
started_at,
finished_at,
samples_emitted,
samples_late,
max_drift_ms,
disconnects=0,
)
Per-run summary emitted after record()'s CM exits.
Attributes:
| Name | Type | Description |
|---|---|---|
| started_at | datetime | Wall-clock at the first scheduled tick. |
| finished_at | datetime | Wall-clock at producer shutdown. |
| samples_emitted | int | Count of per-tick batches actually pushed onto the receive stream. A tick that produced zero samples (every device errored) still counts as one emitted batch. |
| samples_late | int | Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). Auto-reconnect ticks also count as late. |
| max_drift_ms | float | Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching |
| disconnects | int | Count of WatlowConnectionError events the producer absorbed under |
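To make the max_drift_ms definition concrete, the sketch below computes drift against an absolute-target schedule. It assumes tick k is due at start + k / rate_hz, which the source does not state explicitly; the function name and the timestamps are made up for illustration.

```python
def max_drift_ms(start, rate_hz, emitted_at):
    """Largest positive lateness of any tick relative to its absolute target, in ms."""
    period = 1.0 / rate_hz
    worst = 0.0
    for k, actual in enumerate(emitted_at):
        target = start + k * period  # absolute target, not "previous tick + period"
        worst = max(worst, (actual - target) * 1000.0)
    return worst


# 2 Hz schedule starting at t=100.0 s; the third tick is a quarter second late.
drift = max_drift_ms(100.0, 2.0, [100.0, 100.5, 101.25, 101.5])
```

Because targets are absolute, one late tick does not push every later tick back, so the fourth tick here is on time again.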
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK (class-attribute, instance-attribute) ¶
Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples.
DROP_NEWEST (class-attribute, instance-attribute) ¶
Drop the batch that was about to be enqueued. Counted as late.
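The two policies can be contrasted with a bounded buffer. The sketch is an analogy under assumptions: a thread-safe queue.Queue stands in for the recorder's receive stream, and Overflow, enqueue, and the stats dictionary are hypothetical names.

```python
import queue
from enum import Enum


class Overflow(Enum):  # hypothetical mirror of OverflowPolicy
    BLOCK = "block"
    DROP_NEWEST = "drop_newest"


def enqueue(buffer, batch, policy, stats):
    """Push one batch; return False if the policy dropped it."""
    if policy is Overflow.BLOCK:
        buffer.put(batch)  # waits until the consumer frees a slot
        return True
    try:
        buffer.put_nowait(batch)
        return True
    except queue.Full:
        stats["late"] += 1  # a dropped batch is counted as late
        return False


buffer = queue.Queue(maxsize=2)
stats = {"late": 0}
accepted = [enqueue(buffer, i, Overflow.DROP_NEWEST, stats) for i in range(4)]
oldest = buffer.get_nowait()
```

With the dropping policy the buffer keeps the oldest batches and discards the ones that arrive while it is full; with the blocking policy nothing is lost but the producer stalls.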
pipe ¶
Sync :func:watlowlib.sinks.pipe.
Source code in src/watlowlib/sync/recording.py
record ¶
record(
source,
*,
parameters,
rate_hz,
duration=None,
names=None,
instances=(1,),
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
portal=None,
)
Sync :func:watlowlib.streaming.record.
If source is a :class:SyncWatlowManager, its portal is
reused — the recorder and manager must share an event loop. Pass
portal= to override.
Source code in src/watlowlib/sync/recording.py
Sinks¶
watlowlib.sync.sinks ¶
Sync wrappers for :mod:watlowlib.sinks.
Every in-tree sink has a one-to-one sync counterpart. All of them
share :class:SyncSinkAdapter: the per-sink subclass only constructs
the matching async sink with its own parameters and hands it to the
adapter, which owns the portal + open/write/close plumbing.
Sinks follow the same portal-ownership pattern as the rest of the sync
facade — each wrapper creates a throwaway :class:SyncPortal on
__enter__ unless the caller passes one in. Pass a shared portal
when the sink must share an event loop with a
:class:SyncWatlowManager or :func:record, otherwise the sink's
writes run on a different loop than the data producer.
Design reference: docs/design.md §6.
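The adapter split described above (a per-sink subclass that only constructs the async sink, and a shared adapter that owns the plumbing) can be sketched like this. Every name here is hypothetical, and for brevity the adapter drives a private asyncio loop on the calling thread instead of a SyncPortal with a background thread.

```python
import asyncio


class ListSink:
    """Hypothetical async sink exposing open/write/close coroutines."""

    def __init__(self, label):
        self.label = label
        self.rows = []
        self.is_open = False

    async def open(self):
        self.is_open = True

    async def write(self, batch):
        self.rows.extend(batch)

    async def close(self):
        self.is_open = False


class AdapterSketch:
    """Owns the loop and the open/write/close plumbing for any async sink."""

    def __init__(self, sink, loop=None):
        self._sink = sink
        self._owns_loop = loop is None  # create a throwaway loop unless one is passed in
        self._loop = loop

    def __enter__(self):
        if self._owns_loop:
            self._loop = asyncio.new_event_loop()
        self._loop.run_until_complete(self._sink.open())
        return self

    def write(self, batch):
        self._loop.run_until_complete(self._sink.write(batch))

    def __exit__(self, *exc_info):
        self._loop.run_until_complete(self._sink.close())
        if self._owns_loop:
            self._loop.close()
        return False


class SyncListSink(AdapterSketch):
    """Per-sink wrapper: builds the async sink and hands it to the adapter."""

    def __init__(self, label, loop=None):
        super().__init__(ListSink(label), loop=loop)


with SyncListSink("demo") as sink:
    sink.write([1, 2])
    sink.write([3])
    was_open = sink._sink.is_open

rows = sink._sink.rows
still_open = sink._sink.is_open
```

Keeping the subclass this thin means a new sink type needs only a constructor, while lifecycle fixes land in one place.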
PostgresConfig (dataclass) ¶
PostgresConfig(
dsn=None,
host=None,
port=5432,
user=None,
password=None,
database=None,
schema="public",
table="samples",
pool_min_size=1,
pool_max_size=4,
statement_timeout_ms=30000,
command_timeout_s=10.0,
create_table=False,
use_copy=True,
)
Connection + target settings for :class:PostgresSink.
Either dsn or the discrete host/user/database set
must be provided. Credentials are not logged.
Attributes:
| Name | Type | Description |
|---|---|---|
| dsn | str \| None | Full libpq-style connection string (e.g. |
| host | str \| None | Database host. Required if |
| port | int | Database port. Defaults to 5432. |
| user | str \| None | Database role. |
| password | str \| None | Role password. Never logged. |
| database | str \| None | Database name. |
| schema | str | Target schema. Validated against |
| table | str | Target table. Validated against the same pattern. |
| pool_min_size | int | Minimum pool size. Defaults to 1. |
| pool_max_size | int | Maximum pool size. Defaults to 4. |
| statement_timeout_ms | int | |
| command_timeout_s | float | asyncpg's per-call command timeout. Defaults to 10 s. |
| create_table | bool | If |
| use_copy | bool | If |
target ¶
Return a log-safe description of the target: host:port/db.schema.table.
Source code in src/watlowlib/sinks/postgres.py
SyncCsvSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.csv.CsvSink.
Source code in src/watlowlib/sync/sinks.py
SyncInMemorySink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.memory.InMemorySink.
Source code in src/watlowlib/sync/sinks.py
SyncJsonlSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.jsonl.JsonlSink.
Source code in src/watlowlib/sync/sinks.py
SyncParquetSink ¶
SyncParquetSink(
path,
*,
compression="zstd",
use_dictionary=True,
row_group_size=None,
portal=None,
)
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.parquet.ParquetSink.
Requires the watlowlib[parquet] extra — the dependency check
runs on :meth:open, same as the async sink.
Source code in src/watlowlib/sync/sinks.py
SyncPostgresSink ¶
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.postgres.PostgresSink.
Requires the watlowlib[postgres] extra — dependency check runs
on :meth:open.
Source code in src/watlowlib/sync/sinks.py
SyncSampleSink ¶
SyncSinkAdapter ¶
SyncSqliteSink ¶
SyncSqliteSink(
path,
*,
table="samples",
create_table=True,
journal_mode="WAL",
synchronous="NORMAL",
busy_timeout_ms=5000,
portal=None,
)
Bases: SyncSinkAdapter
Sync wrapper over :class:~watlowlib.sinks.sqlite.SqliteSink.