Manager — many tasks at once¶
DtolManager owns a named collection of tasks across one or more DT boards. It
gives you a registry, concurrent fan-out polling, bulk lifecycle control, and
coordinated simultaneous starts. Use it when an experiment
spans several subsystems (a couple of AI tasks plus a counter, say) or when you
want the manager-level join key for the ecosystem.
Continuous block streaming is not aggregated by the manager — that is owned
by record(), which takes a single DtolSession. The manager
does fan-out polling (and record_polled() accepts a manager directly).
Design reference: design.md §16.
Lifecycle and registry¶
import anyio
from dtollib import AnalogInputVoltage, DtolManager, TaskSpec
async def main() -> None:
ai = TaskSpec(name="ai", channels=[AnalogInputVoltage(physical_channel=0, name="v0")])
async with DtolManager() as mgr:
await mgr.add("ai", ai) # registers, reserves the subsystem, commits
results = await mgr.poll(["ai"]) # fan-out poll
print(results["ai"].value.values)
# mgr closes every managed session on exit
anyio.run(main)
add(name, spec, *, backend=None)— register a task, reserve its(board, subsystem_type, element)triple, and return its committedDtolSession.Nonebackend creates a freshDataAcqBackendowned by the manager. RaisesDtolValidationErroron a duplicate name andDtolResourceErrorif the subsystem is already reserved.remove(name)— close and unregister.get(name)— the registeredDtolSession.names— registered task names in insertion order.
Fan-out polling¶
poll(names=None), start(names=None), and stop(names=None) each return a
Mapping[str, DeviceResult[...]]. Pass None to address every task.
poll() parallelises across boards: same-board polls serialise on a
per-board lock (one HDRVR can't be re-entered concurrently), while
cross-board polls run together in a task group. So adding a second board
roughly halves wall-clock for a combined poll.
DeviceResult and error policy¶
Each entry in an aggregated result is a DeviceResult[T] with exactly one of
.value / .error set, plus an .ok property. The manager's error_policy
(set at construction) decides how failures are handled:
ErrorPolicy |
Behaviour |
|---|---|
RAISE (default) |
First failure cancels and propagates. Every returned DeviceResult.ok is True. |
RETURN |
Each DeviceResult carries its own outcome — inspect .ok / .error per task. |
LOG_AND_CONTINUE |
Failures are dropped from the result map with a WARNING log line. |
from dtollib.manager import ErrorPolicy
async with DtolManager(error_policy=ErrorPolicy.RETURN) as mgr:
await mgr.add("ai", ai)
for name, res in (await mgr.poll()).items():
if res.ok:
print(name, res.value.values)
else:
print(name, "failed:", res.error)
Polling on a cadence¶
record_polled() accepts a manager and polls every task on each tick, yielding
Mapping[str, DeviceResult[DaqReading]]:
from dtollib import record_polled
async with record_polled(mgr, rate_hz=5.0) as rec:
async for tick in rec.stream:
print({name: r.value.values for name, r in tick.items() if r.ok})
Coordinated starts¶
start_synchronized(names) begins several single-board subsystems together via
the SDK simultaneous-start primitives. Every named task must target one board
and share one backend instance. See Synchronized start for
the full sequence and constraints.