
capa.runtime

Per-resource-worker runtime. Each hardware resource gets its own thread hosting its own asyncio event loop; a per-run Conductor coordinates them through ThreadBridge queues.



Per-resource worker runtime — small production facade.

This package implements the per-resource-worker concurrency model. Each hardware resource (one serial port, one DAQmx chassis, one camera handle) gets its own thread hosting its own asyncio event loop; cross-thread emission and command traffic flows over ThreadBridge instances; a per-run Conductor coordinates workers via a config-lifetime WorkerPool.
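The thread-per-resource idea can be illustrated with the standard library alone. The sketch below is not capa's WorkerPool or ThreadBridge: LoopThread and poll_device are hypothetical names, and it hands work across threads with asyncio.run_coroutine_threadsafe instead of a bridge queue. It only shows the core shape: one OS thread per resource, each running its own event loop, with callers receiving concurrent.futures.Future objects.

```python
import asyncio
import concurrent.futures
import threading
from collections.abc import Coroutine
from typing import Any


class LoopThread:
    """Hypothetical per-resource worker: one OS thread hosting one event loop."""

    def __init__(self, name: str) -> None:
        self._loop = asyncio.new_event_loop()
        self._ready = threading.Event()
        self._thread = threading.Thread(target=self._main, name=name, daemon=True)

    def _main(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.call_soon(self._ready.set)  # signal once the loop is spinning
        self._loop.run_forever()
        self._loop.close()

    def start(self) -> None:
        self._thread.start()
        self._ready.wait()

    def submit(self, coro: Coroutine[Any, Any, Any]) -> concurrent.futures.Future:
        # Thread-safe hand-off: the coroutine runs on this worker's loop, and the
        # caller gets a concurrent.futures.Future it can block on or wrap.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def stop(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()


async def poll_device(resource_id: str) -> str:
    await asyncio.sleep(0.01)  # stands in for device I/O owned by this thread
    return f"{resource_id} polled on {threading.current_thread().name}"


workers = {rid: LoopThread(f"worker-{rid}") for rid in ("serial0", "daq0")}
for w in workers.values():
    w.start()
replies = {rid: w.submit(poll_device(rid)).result(timeout=2) for rid, w in workers.items()}
for w in workers.values():
    w.stop()
```

In this arrangement a device's state is only ever touched from its owner's thread, which is the same worker-owns-the-handle invariant the rest of this page relies on; the price is that every call from another thread pays a cross-thread hand-off.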

The names re-exported here are the intentional public surface. Test seams, state-machine internals (edge tables, lifecycle enums), bridge plumbing, heartbeat/saturation helpers, and dispatcher implementations live in their concrete submodules (capa.runtime.runner, capa.runtime.bridge, capa.runtime.lifecycle, capa.runtime.dispatch, and others); import them from there. The package facade is deliberately small.

RUNTIME_VERSION module-attribute

RUNTIME_VERSION: Final[str] = '0.2.0-p4a'

Runtime code revision marker.

Bumped when conductor / worker / pool semantics change in a way that affects bundle interpretation. Newly sealed bundles carry this in capa_block.engine_version for cross-version diagnostics.

Conductor

Conductor(
    *,
    pool: WorkerPool,
    session: RunSession,
    runner: ConductorRunner | None = None,
    runner_factory: Callable[
        [RunSession, RunContext], ConductorRunner
    ]
    | None = None,
    config: ConductorConfig | None = None,
    thread_name: str = "capa-conductor",
    pre_completion_callback: Any = None,
)

Per-run coordinator. See module docstring for the big picture.

Construction is cheap: no thread is spawned and no I/O happens until start() is called. The pool and session are captured by reference; both must outlive the run.

Source code in src/capa/runtime/conductor.py
def __init__(
    self,
    *,
    pool: WorkerPool,
    session: RunSession,
    runner: ConductorRunner | None = None,
    runner_factory: Callable[[RunSession, RunContext], ConductorRunner] | None = None,
    config: ConductorConfig | None = None,
    thread_name: str = "capa-conductor",
    # Test seam — called on the conductor loop just before the
    # completion-event handshake. Tests use it to assert intermediate
    # state (e.g. "drain has seen N emissions"). Don't use in prod.
    pre_completion_callback: Any = None,
) -> None:
    if runner is not None and runner_factory is not None:
        raise ValueError("Conductor: provide `runner` OR `runner_factory`, not both")
    self._pool = pool
    self._session = session
    # `_runner` is the eventual ConductorRunner instance. When `runner` is
    # supplied at construction time we use it directly; when only
    # `runner_factory` is supplied we defer construction until
    # `session.open()` returns the RunContext, so the factory can wire
    # the runner against per-run resources (e.g. an open bundle writer).
    # Tests usually pass `runner` directly; production headless wiring
    # uses `runner_factory`.
    self._runner = runner
    self._runner_factory = runner_factory
    self._config = config or ConductorConfig()
    self._thread_name = thread_name
    self._pre_completion_callback = pre_completion_callback

    self._state: ConductorState = ConductorState.PREPARING
    self._thread: threading.Thread | None = None
    self._loop: asyncio.AbstractEventLoop | None = None
    self._handle_future: Future[RunHandle] = concurrent.futures.Future()
    self._result_future: Future[RunResult] = concurrent.futures.Future()
    self._completion_event: asyncio.Event | None = None
    self._stop_requested = False
    self._outcome: RunOutcome = RunOutcome.COMPLETED
    self._exit_reason: str | None = None
    self._saturation_event: SaturationEvent | None = None
    self._started_mono_ns = 0
    self._ended_mono_ns = 0
    self._databus: DataBus | None = None
    self._run_context: RunContext | None = None
    self._recording_plan: ResolvedRecordingPlan | None = None
    self._bridges: dict[str, ThreadBridge[WorkerEmission]] = {}
    self._drain_count_observed = 0
    # UI bridge — optional Conductor → UI thread channel. Attached by
    # the UI before :meth:`start`; the drain task ``put_nowait``s every
    # emission after the writer/databus hop. Headless paths leave this
    # ``None`` and pay nothing for it.
    self._ui_bridge: ThreadBridge[WorkerEmission] | None = None
    # Loop-lag observability. A heartbeat task measures wake-up lag
    # against a 50 ms cadence; the percentile ring lands in the runtime
    # diagnostics emitted at finalize.
    self._loop_lag = LoopLagMetric(name="conductor")
    self._heartbeat_stop: anyio.Event | None = None
    # Idempotency flags for the unconditional cleanup callbacks. The
    # success path runs disarm + close explicitly; the AsyncExitStack
    # callbacks only fire on the failure-unwind path and skip if the
    # success path already ran them.
    self._pool_armed = False
    self._session_closed = False

state property

state: ConductorState

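Atomic but advisory read. Transitions happen on the conductor loop, and each state write is a single attribute store, so other threads never observe a partial value; the value may, however, already be stale by the time the caller acts on it.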

run_id property

run_id: str

Stable identifier of the run managed by this conductor.

bundle_path property

bundle_path: Path | None

Filesystem path for the run bundle once the session has opened.

databus property

databus: DataBus | None

The authoritative DataBus, or None until start() has reached the bus-construction step. Procedure subscribers and external analyzers attach here.

result_future property

result_future: Future[RunResult]

Resolves with the final RunResult when the conductor thread exits. Independent of the start-up handle future: callers that only care about the end can ignore the return value of start().

loop property

loop: AbstractEventLoop | None

The conductor's loop, or None before start().

completion_event property

completion_event: Event | None

The conductor's loop-local shutdown signal.

Set when the run completes (naturally, via stop(), or on saturation). Used by ProcedureRunner to wire a loop-local external_stop event for procedures that await on it.

None before _run() enters its task group.

runtime_diagnostics

runtime_diagnostics() -> dict[str, Any]

Snapshot the per-loop / per-bridge / per-worker metrics.

The dict shape is intended for manifest.queue_health consumption (one entry per queue/bridge/worker keyed by tag), so the existing manifest schema works without extension:

  • runtime: the configured saturation_deadline_s and loop_lag_warn_ms, so observers can color health values against the thresholds without importing ConductorConfig defaults.
  • loop.conductor: conductor-loop lag percentiles.
  • bridge.outbound:<resource_id>: per-worker outbound bridge depth, counters, latency, and blocked time.
  • worker:<resource_id>: tick / poll durations, loop lag, command counts, last-sample age.

There is no separate loop.worker:<resource_id> entry; each worker's loop lag is reported as loop_lag_p99_ms inside its worker:<resource_id> entry. The runtime and loop.conductor entries are always present; bridge.outbound:* entries appear only after _run() has entered its task group and created the bridges.

Source code in src/capa/runtime/conductor.py
def runtime_diagnostics(self) -> dict[str, Any]:
    """Snapshot the per-loop / per-bridge / per-worker metrics.

    The dict shape is intended for ``manifest.queue_health`` consumption
    (one entry per queue/bridge/worker keyed by tag), so the existing
    manifest schema works without extension:

    * ``loop.conductor`` — conductor-loop lag percentiles.
    * ``loop.worker:<resource_id>`` — per-worker loop lag (zeros for
      now; workers expose their own LoopLagMetric and a future step
      plumbs it through here).
    * ``bridge.outbound:<resource_id>`` — per-worker outbound bridge
      latency + blocked time.
    * ``worker:<resource_id>`` — tick / poll durations, loop lag,
      command counts, last-sample age.

    Returns an empty dict before :meth:`_run` enters its task group
    (i.e. before bridges and workers exist).
    """
    out: dict[str, dict[str, float]] = {}
    # Runtime tunables — included so status-bar / observers can color
    # health values against the configured thresholds without having
    # to import ConductorConfig defaults.
    out["runtime"] = {
        "saturation_deadline_s": float(self._config.saturation_deadline_s),
        "loop_lag_warn_ms": float(self._config.loop_lag_warn_ms),
    }
    # Conductor loop lag.
    out["loop.conductor"] = {
        "samples": float(self._loop_lag.samples_total),
        "lag_p50_ms": self._loop_lag.p50_ms,
        "lag_p99_ms": self._loop_lag.p99_ms,
        "lag_max_ms": self._loop_lag.max_lag_ms,
    }
    # Per-worker bridges (outbound).
    for rid, bridge in self._bridges.items():
        m = bridge.metrics
        blocked_now = m.blocked_since_ms
        out[f"bridge.outbound:{rid}"] = {
            "depth": float(m.depth),
            "depth_max": float(m.depth_max),
            "capacity": float(bridge.capacity),
            "enqueued_total": float(m.enqueued_total),
            "dequeued_total": float(m.dequeued_total),
            "dropped_total": float(m.dropped_total),
            "blocked_total_ms": float(m.blocked_total_ms),
            "blocked_since_ms": float(blocked_now) if blocked_now is not None else -1.0,
            "latency_p50_ms": float(m.latency_p50_ms),
            "latency_p99_ms": float(m.latency_p99_ms),
        }
    # Per-worker tick/loop metrics. Per-adapter failure policy
    # (``WorkerMetrics.on_failure``) is intentionally not serialized
    # into this float-valued diagnostics block. Per-device
    # stream-silence/fatal-error enforcement can attach here using the
    # resolved policy metadata on each worker.
    for rid, worker in self._pool.workers.items():
        wm = worker.metrics
        out[f"worker:{rid}"] = {
            "samples_emitted": float(wm.samples_emitted),
            "polls_emitted": float(wm.polls_emitted),
            "commands_total": float(wm.commands_total),
            "commands_failed": float(wm.commands_failed),
            "tick_duration_p50_ms": float(wm.tick_duration_p50_ms),
            "tick_duration_p99_ms": float(wm.tick_duration_p99_ms),
            "poll_period_p50_ms": float(wm.poll_period_p50_ms),
            "poll_period_p99_ms": float(wm.poll_period_p99_ms),
            "poll_rate_hz": float(wm.poll_rate_hz),
            "loop_lag_p99_ms": float(wm.loop_lag_ms_p99),
            "last_sample_age_s": float(wm.last_sample_age_s),
        }
    return out
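As an illustration of how an observer might consume this dict, the hypothetical helper below flags entries against the runtime.loop_lag_warn_ms threshold. The sample dict is trimmed to the keys the helper reads and its values are invented, except that 50.0 matches the documented loop_lag_warn_ms default. Treating blocked_since_ms >= 0 as "currently blocked" follows the -1.0 sentinel in the code above.

```python
from typing import Any


def health_flags(diag: dict[str, Any]) -> list[str]:
    """Hypothetical observer: list human-readable warnings from a diagnostics dict."""
    warn_ms = diag["runtime"]["loop_lag_warn_ms"]
    flags: list[str] = []
    for tag, entry in sorted(diag.items()):
        if tag == "loop.conductor":
            if entry["lag_p99_ms"] > warn_ms:
                flags.append(f"{tag}: p99 lag {entry['lag_p99_ms']:.1f} ms")
        elif tag.startswith("worker:"):
            if entry["loop_lag_p99_ms"] > warn_ms:
                flags.append(f"{tag}: p99 loop lag {entry['loop_lag_p99_ms']:.1f} ms")
        elif tag.startswith("bridge.outbound:"):
            # blocked_since_ms is -1.0 when the bridge is not currently blocked.
            if entry["blocked_since_ms"] >= 0:
                flags.append(f"{tag}: currently blocked")
            if entry["dropped_total"] > 0:
                flags.append(f"{tag}: {int(entry['dropped_total'])} dropped")
    return flags


sample = {
    "runtime": {"loop_lag_warn_ms": 50.0},
    "loop.conductor": {"lag_p99_ms": 8.0},
    "bridge.outbound:serial0": {"blocked_since_ms": -1.0, "dropped_total": 0.0},
    "bridge.outbound:cam0": {"blocked_since_ms": 340.0, "dropped_total": 3.0},
    "worker:cam0": {"loop_lag_p99_ms": 72.5},
}
warnings = health_flags(sample)
```

Because every value is a float, a consumer like this needs no knowledge of the metric classes behind the dict, which is what lets the manifest schema stay unchanged.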

start

start() -> Future[RunHandle]

Spawn the conductor thread and begin the run.

Returns a future that resolves with RunHandle once the run reaches RUNNING state (procedure has started). On failure before RUNNING, the handle future rejects and the result future resolves with a CRASHED outcome.

Not idempotent: a second call raises ConductorStateError, whether or not the first call succeeded.

Source code in src/capa/runtime/conductor.py
def start(self) -> Future[RunHandle]:
    """Spawn the conductor thread and begin the run.

    Returns a future that resolves with :class:`RunHandle` once the
    run reaches RUNNING state (procedure has started). On failure
    before RUNNING, the handle future rejects and the result future
    resolves with a CRASHED outcome.

    Idempotent only in the failure direction — a second call after
    :meth:`start` was invoked raises :class:`ConductorStateError`.
    """
    if self._thread is not None:
        raise ConductorStateError("Conductor.start() called twice", current=self._state)
    self._thread = threading.Thread(
        target=self._thread_main, name=self._thread_name, daemon=False
    )
    self._thread.start()
    return self._handle_future
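The two-future contract of start() can be modeled in a few lines. MiniConductor, AlreadyStarted, and the string outcomes are hypothetical stand-ins (not capa's Conductor, ConductorStateError, or RunOutcome); the sketch only shows a handle future that resolves at RUNNING or rejects on early failure, a result future that always resolves, and a guard that refuses a second start().

```python
import concurrent.futures
import threading
import time


class AlreadyStarted(RuntimeError):
    """Hypothetical stand-in for ConductorStateError."""


class MiniConductor:
    """Hypothetical sketch of the handle-future / result-future contract."""

    def __init__(self, *, fail_early: bool = False) -> None:
        self._fail_early = fail_early
        self._thread: threading.Thread | None = None
        self.handle_future: concurrent.futures.Future[str] = concurrent.futures.Future()
        self.result_future: concurrent.futures.Future[str] = concurrent.futures.Future()

    def start(self) -> concurrent.futures.Future[str]:
        if self._thread is not None:
            raise AlreadyStarted("start() called twice")
        self._thread = threading.Thread(target=self._main, name="mini-conductor")
        self._thread.start()
        return self.handle_future

    def _main(self) -> None:
        try:
            if self._fail_early:
                raise OSError("bundle directory not writable")  # fails before RUNNING
            self.handle_future.set_result("run-001")  # reached RUNNING
            time.sleep(0.01)  # the procedure would run here
            self.result_future.set_result("completed")
        except Exception as exc:
            if not self.handle_future.done():
                self.handle_future.set_exception(exc)  # start()'s future rejects
            self.result_future.set_result("crashed")  # result future still resolves


ok = MiniConductor()
run_id = ok.start().result(timeout=2)
outcome = ok.result_future.result(timeout=2)
try:
    ok.start()
    restarted = True
except AlreadyStarted:
    restarted = False

bad = MiniConductor(fail_early=True)
startup_error = bad.start().exception(timeout=2)
bad_outcome = bad.result_future.result(timeout=2)
```

Keeping the result future independent means a supervisor that only waits for the end never has to inspect the handle future, even when start-up fails.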

stop

stop(*, reason: str = 'operator_stop') -> Future[RunResult]

Request a cooperative shutdown.

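Sets the completion event on the conductor loop and returns the same future as result_future for convenience. Idempotent: later calls return that same future, and the recorded exit_reason stays the first caller's. The first stop request turns a still-COMPLETED outcome into ABORTED; an outcome the conductor has already set to a crash value is left alone.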

Source code in src/capa/runtime/conductor.py
def stop(self, *, reason: str = "operator_stop") -> Future[RunResult]:
    """Request a cooperative shutdown.

    Sets the completion event on the conductor loop and returns the
    same future as :attr:`result_future` for convenience. Idempotent:
    subsequent calls observe the existing shutdown but the recorded
    ``exit_reason`` is the first caller's.
    """
    if self._stop_requested:
        return self._result_future
    self._stop_requested = True
    self._exit_reason = reason
    # Default outcome assumes operator-initiated; the conductor may
    # override to CRASHED / CRASHED_BUT_SEALED if it discovers the
    # real cause first.
    if self._outcome is RunOutcome.COMPLETED:
        self._outcome = RunOutcome.ABORTED
    loop = self._loop
    ev = self._completion_event
    if loop is not None and ev is not None and not ev.is_set():
        loop.call_soon_threadsafe(ev.set)
    return self._result_future
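A minimal sketch of the same cross-thread stop pattern, assuming nothing from capa: StoppableRun is hypothetical and uses plain strings for outcomes. It illustrates two points from stop() above: the asyncio.Event lives on the run's own loop, so another thread sets it through loop.call_soon_threadsafe rather than calling set() directly, and the first caller's reason wins.

```python
import asyncio
import concurrent.futures
import threading


class StoppableRun:
    """Hypothetical sketch: cross-thread cooperative stop where the first reason wins."""

    def __init__(self) -> None:
        self.outcome = "completed"
        self.exit_reason: str | None = None
        self._stop_requested = False
        self._loop: asyncio.AbstractEventLoop | None = None
        self._done: asyncio.Event | None = None
        self._ready = threading.Event()
        self.result_future: concurrent.futures.Future = concurrent.futures.Future()
        self._thread = threading.Thread(target=lambda: asyncio.run(self._run()))

    async def _run(self) -> None:
        self._loop = asyncio.get_running_loop()
        self._done = asyncio.Event()  # loop-local: only this loop may touch it
        self._ready.set()
        await self._done.wait()  # a real run would also finish on its own
        self.result_future.set_result((self.outcome, self.exit_reason))

    def start(self) -> None:
        self._thread.start()
        self._ready.wait()

    def stop(self, *, reason: str = "operator_stop") -> concurrent.futures.Future:
        if self._stop_requested:
            return self.result_future  # later callers keep the first reason
        self._stop_requested = True
        self.exit_reason = reason
        if self.outcome == "completed":
            self.outcome = "aborted"  # a crash recorded earlier would be kept
        loop, ev = self._loop, self._done
        if loop is not None and ev is not None:
            # asyncio.Event is not thread-safe, so set it from its own loop.
            loop.call_soon_threadsafe(ev.set)
        return self.result_future

    def join(self) -> None:
        self._thread.join(timeout=2)


run = StoppableRun()
run.start()
first = run.stop(reason="operator_stop")
second = run.stop(reason="watchdog")
outcome, reason = first.result(timeout=2)
run.join()
```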

dispatch

dispatch(
    device: str, cmd: DeviceCommand
) -> Future[CommandResult]

Run-time command dispatch.

Raises ConductorStateError outside PREPARING / RUNNING; otherwise routes through the pool to the worker hosting device.

Source code in src/capa/runtime/conductor.py
def dispatch(self, device: str, cmd: DeviceCommand) -> Future[CommandResult]:
    """Run-time command dispatch.

    Refused outside PREPARING / RUNNING. Routes through the pool to
    the worker hosting ``device``.
    """
    if not self._state.permits_dispatch():
        raise ConductorStateError(
            f"dispatch refused in state {self._state}", current=self._state
        )
    return self._pool.dispatch(device, cmd)
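The state gate shared by dispatch() and snapshot() amounts to a membership check on the current state. RunState, StateGateError, and gated_dispatch below are hypothetical; the member names mirror the states mentioned on this page, but this is not capa's ConductorState.

```python
from enum import Enum


class RunState(Enum):
    """Hypothetical mirror of the lifecycle names mentioned on this page."""

    PREPARING = "preparing"
    RUNNING = "running"
    DRAINING = "draining"
    FINALIZING = "finalizing"
    SEALED = "sealed"
    FAILED = "failed"

    def permits_dispatch(self) -> bool:
        return self in (RunState.PREPARING, RunState.RUNNING)


class StateGateError(RuntimeError):
    def __init__(self, message: str, *, current: RunState) -> None:
        super().__init__(message)
        self.current = current  # lets callers report which state refused them


def gated_dispatch(state: RunState, device: str, cmd: str) -> str:
    if not state.permits_dispatch():
        raise StateGateError(f"dispatch refused in state {state}", current=state)
    return f"{device} <- {cmd}"  # a real conductor would hand off to the pool here


accepted = gated_dispatch(RunState.RUNNING, "watlow", "set 40")
try:
    gated_dispatch(RunState.DRAINING, "watlow", "set 40")
    refused_in = None
except StateGateError as err:
    refused_in = err.current
```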

snapshot

snapshot(device: str) -> Future[DeviceEmission]

One-shot snapshot via the pool's worker for device.

Same state gate as dispatch().

Source code in src/capa/runtime/conductor.py
def snapshot(self, device: str) -> Future[DeviceEmission]:
    """One-shot snapshot via the pool's worker for ``device``.

    Same state gate as :meth:`dispatch`.
    """
    if not self._state.permits_dispatch():
        raise ConductorStateError(
            f"snapshot refused in state {self._state}", current=self._state
        )
    return self._pool.snapshot(device)

attach_ui_bridge

attach_ui_bridge(
    bridge: ThreadBridge[WorkerEmission],
) -> None

Wire a Conductor → UI ThreadBridge.

Must be called before start() so the drain task picks the reference up on its first dispatch; calling it after start() raises ConductorStateError. Headless callers omit this, and the drain stays a writer-and-bus-only fan-out.

Each end of the bridge is registered from the loop that uses it. The producer is the conductor's loop, so that registration is deferred: this method only stores the reference, and the actual attach_producer call happens inside _run() once the conductor's loop is running. The UI side calls attach_consumer from its own loop after attaching the bridge.

Source code in src/capa/runtime/conductor.py
def attach_ui_bridge(self, bridge: ThreadBridge[WorkerEmission]) -> None:
    """Wire a Conductor → UI :class:`ThreadBridge`.

    Must be called BEFORE :meth:`start` so the drain task picks the
    reference up on first dispatch. Headless callers omit this; the
    drain stays a writer-and-bus-only fan-out.

    The UI side owns ``attach_consumer``/``attach_producer``: the
    producer is the conductor's loop, so we register that here in a
    deferred manner — the actual ``attach_producer`` call lands inside
    :meth:`_run` once the conductor's loop is running. The UI side
    calls ``attach_consumer`` from its own loop after attaching.
    """
    if self._thread is not None:
        raise ConductorStateError(
            "attach_ui_bridge must be called before start()",
            current=self._state,
        )
    self._ui_bridge = bridge

procedure_ui_sink

procedure_ui_sink() -> ProcedureUiSink

Return a UI-only sink through which procedures can publish ProcedureTick payloads (capa.runtime.emissions.ProcedureTick).

Wraps _publish_ui() so the procedure layer stays bridge-agnostic: a tick goes to the UI mirror with no writer write and no data-bus publish. Safe to call before start(): the sink is a no-op while no UI bridge is attached (headless run) and becomes live the moment attach_ui_bridge() runs.

The returned sink captures self and is intended to be invoked from the conductor loop (where the procedure runs). Calling it from any other loop is unsupported: the underlying bridge's put_nowait is thread-safe, but cross-thread timestamp ordering would be unreliable.

Source code in src/capa/runtime/conductor.py
def procedure_ui_sink(self) -> ProcedureUiSink:
    """Return a UI-only sink procedures can publish
    :class:`~capa.runtime.emissions.ProcedureTick` payloads through.

    Wraps :meth:`_publish_ui` so the procedure layer stays bridge-
    agnostic: a tick goes to the UI mirror with no writer write,
    no data-bus publish. Safe to call before :meth:`start` —
    no-op when no UI bridge is attached (headless run); the sink
    becomes live the moment :meth:`attach_ui_bridge` runs.

    The returned sink captures ``self`` and is intended to be
    invoked from the conductor loop (where the procedure runs).
    Calling from a different loop is undefined; the underlying
    bridge's ``put_nowait`` is thread-safe but the cross-thread
    timestamp ordering would be unreliable.
    """
    return _ConductorProcedureUiSink(conductor=self)
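The "safe before start(), live once a bridge is attached" behavior follows naturally if the sink looks the bridge up at publish time rather than at construction time. The sketch below assumes that design; Owner, LateBoundSink, and the use of queue.SimpleQueue as the bridge are hypothetical stand-ins, not capa's _ConductorProcedureUiSink or ThreadBridge.

```python
import queue


class Owner:
    """Hypothetical stand-in for the conductor: holds an optional UI bridge."""

    def __init__(self) -> None:
        self.ui_bridge: queue.SimpleQueue | None = None

    def attach_ui_bridge(self, bridge: queue.SimpleQueue) -> None:
        self.ui_bridge = bridge


class LateBoundSink:
    """Looks the bridge up on every publish, so it goes live once one is attached."""

    def __init__(self, owner: Owner) -> None:
        self._owner = owner  # captures the owner, not the (possibly absent) bridge

    def publish(self, tick: dict) -> None:
        bridge = self._owner.ui_bridge
        if bridge is None:
            return  # headless: nothing to mirror to
        bridge.put_nowait(tick)  # UI-only: no writer write, no data-bus publish


owner = Owner()
sink = LateBoundSink(owner)  # created before any bridge exists
sink.publish({"step": 1})  # dropped
ui_queue: queue.SimpleQueue = queue.SimpleQueue()
owner.attach_ui_bridge(ui_queue)
sink.publish({"step": 2})  # delivered
```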

join

join(timeout: float | None = None) -> bool

Block until the conductor thread exits.

Returns True if the thread joined within timeout, False otherwise; also returns True if start() was never called. Intended for tests and for the CLI driver, which blocks on the result future synchronously.

Source code in src/capa/runtime/conductor.py
def join(self, timeout: float | None = None) -> bool:
    """Block until the conductor thread exits.

    Returns ``True`` if the thread joined within ``timeout``, ``False``
    otherwise. Intended for tests and for the CLI driver which awaits
    the result future synchronously."""
    if self._thread is None:
        return True
    self._thread.join(timeout=timeout)
    return not self._thread.is_alive()

ConductorConfig dataclass

ConductorConfig(
    saturation_deadline_s: float = DEFAULT_SATURATION_DEADLINE_S,
    saturation_poll_period_s: float = DEFAULT_POLL_PERIOD_S,
    shutdown_grace_s: float = DEFAULT_SHUTDOWN_GRACE_S,
    loop_lag_warn_ms: float = 50.0,
)

Per-run knobs, all with sensible defaults so tests can omit them.

Splits responsibilities with RuntimeConfig (capa.experiment.config.RuntimeConfig):

  • RuntimeConfig carries the user-tunable knobs an operator may reasonably want to set per experiment (shutdown_grace_s, ui_bridge_capacity, loop_lag_warn_ms).
  • ConductorConfig carries the internal saturation-monitor timing in addition. The saturation knobs are not user-facing today — promote them to RuntimeConfig when a real experiment asks for it.

from_runtime() builds a ConductorConfig by copying the user-tunable knobs off a RuntimeConfig while keeping the saturation defaults.

from_runtime classmethod

from_runtime(
    runtime: RuntimeConfig,
    *,
    saturation_deadline_s: float = DEFAULT_SATURATION_DEADLINE_S,
    saturation_poll_period_s: float = DEFAULT_POLL_PERIOD_S,
) -> ConductorConfig

Build a ConductorConfig from the user-facing RuntimeConfig.

Saturation parameters remain code defaults. Tests and embedding code can override them here for diagnostics; the shipped CLI does not expose them as user-facing flags today.

Source code in src/capa/runtime/conductor.py
@classmethod
def from_runtime(
    cls,
    runtime: RuntimeConfig,
    *,
    saturation_deadline_s: float = DEFAULT_SATURATION_DEADLINE_S,
    saturation_poll_period_s: float = DEFAULT_POLL_PERIOD_S,
) -> ConductorConfig:
    """Build a :class:`ConductorConfig` from the user-facing
    :class:`RuntimeConfig`.

    Saturation parameters remain code defaults. Tests and embedding
    code can override them here for diagnostics; the shipped CLI does
    not expose them as user-facing flags today.
    """
    return cls(
        saturation_deadline_s=saturation_deadline_s,
        saturation_poll_period_s=saturation_poll_period_s,
        shutdown_grace_s=runtime.shutdown_grace_s,
        loop_lag_warn_ms=runtime.loop_lag_warn_ms,
    )

RunOutcome

Bases: StrEnum

How the run actually ended.

Distinct from ConductorState (which answers "where am I in the lifecycle"). The bundle's run_status field is populated from this at finalize.

COMPLETED class-attribute instance-attribute

COMPLETED = 'completed'

Procedure ran to natural completion.

ABORTED class-attribute instance-attribute

ABORTED = 'aborted'

Operator (or supervising code) called stop() before completion.

CRASHED class-attribute instance-attribute

CRASHED = 'crashed'

Unhandled exception in procedure, drain task, or pool. Bundle is still sealed; the failure cause is recorded in events.

CRASHED_BUT_SEALED class-attribute instance-attribute

CRASHED_BUT_SEALED = 'crashed_but_sealed'

Saturation deadline tripped. The conductor sealed the bundle anyway after disarming workers.

RunResult dataclass

RunResult(
    run_id: str,
    bundle_path: Path | None,
    outcome: RunOutcome,
    exit_reason: str | None,
    final_state: ConductorState,
    saturation_event: SaturationEvent | None,
    started_mono_ns: int,
    ended_mono_ns: int,
)

Final outcome, published on Conductor.result_future.

RunSession

Bases: Protocol

Per-run resources, built by the caller, used by the conductor.

The conductor uses this as an async context manager: open() materializes the bundle / writer / clock and returns a RunContext; close() finalizes the bundle. The caller builds the session (and owns its lifecycle decisions, such as where the bundle lives) so the conductor stays focused on orchestration. The production session is RealRunSession, which wraps RunBundleWriter + WriterThread; tests use a fake.

run_id property

run_id: str

Stable identifier; matches the bundle directory name.

bundle_path property

bundle_path: Path | None

Filesystem path of the bundle, or None if the session hasn't materialized a bundle yet (e.g. in-memory test session).

config property

config: ExperimentConfig

The frozen run recipe. Read by the conductor at arm time for recording-plan resolution (config.run_options.recording_policy and config.hardware). Valid both before and after open().

saturation_source property

saturation_source: WriterSaturationSource | None

Writer-side saturation signal source. None when the session doesn't expose one (some tests).

open async

open() -> RunContext

Materialize per-run resources. Returns the RunContext that the conductor installs into every worker via pool.arm_all.

Failure here is fatal: the conductor cannot recover and the run ends in FAILED state. The session is responsible for cleaning up any partial resources before re-raising.

Source code in src/capa/runtime/conductor.py
async def open(self) -> RunContext:
    """Materialize per-run resources. Returns the :class:`RunContext`
    that the conductor installs into every worker via ``pool.arm_all``.

    Failure here is fatal: the conductor cannot recover and the run
    ends in FAILED state. The session is responsible for cleaning up
    any partial resources before re-raising.
    """
    ...

set_outcome

set_outcome(
    outcome: RunOutcome, exit_reason: str | None
) -> None

Inform the session what to record at finalize. Called by the conductor before close(). If it is never called, the session records COMPLETED.

Source code in src/capa/runtime/conductor.py
def set_outcome(self, outcome: RunOutcome, exit_reason: str | None) -> None:
    """Inform the session what to record at finalize. Called by the
    conductor before :meth:`close`. Set to ``COMPLETED`` by default
    until :meth:`close` is called.
    """
    ...

ManualClient

ManualClient(
    pool: WorkerPool,
    conductor_provider: Callable[[], Conductor | None],
)

Single async facade for UI manual cards.

Routes dispatch() and snapshot() to a live Conductor when a run is armed (records into the bundle, gates by conductor state), else to the WorkerPool directly. Cards take a ManualClient reference at construction and remain unaware of which side is in use.

The conductor reference is fetched through a callable rather than stored, because the conductor's lifetime is per-run: there is no conductor before the first Conductor.start(), no conductor between runs once the previous one has sealed, and a freshly constructed one on every new run. The provider closes over the _conductor field of capa.ui.state.RunController and resolves at call time.

A returned conductor is only used if its state's permits_dispatch() is true (i.e. PREPARING or RUNNING). DRAINING / FINALIZING / SEALED / FAILED conductors are treated as absent, and the manual command falls through to the pool. That is correct: the run is on the way out, the pool is still open, and a between-runs manual command should not be refused just because the previous run's conductor is still finalizing.

Source code in src/capa/runtime/dispatch.py
def __init__(
    self,
    pool: WorkerPool,
    conductor_provider: Callable[[], Conductor | None],
) -> None:
    self._pool = pool
    self._get_conductor = conductor_provider

dispatch async

dispatch(device: str, cmd: DeviceCommand) -> CommandResult

Issue cmd against device.

Routes through the conductor when a run is armed and the conductor is dispatchable, else through the pool. The future returned by either side is bridged into the caller's loop via asyncio.wrap_future. An unknown device name raises UnknownDeviceError.

Source code in src/capa/runtime/dispatch.py
async def dispatch(self, device: str, cmd: DeviceCommand) -> CommandResult:
    """Issue ``cmd`` against ``device``.

    Routes through the conductor when a run is armed and the
    conductor is dispatchable, else through the pool. The future
    returned by either side is bridged into the caller's loop via
    :func:`asyncio.wrap_future`.
    """
    conductor = self._active_conductor()
    try:
        if conductor is not None:
            fut = conductor.dispatch(device, cmd)
        else:
            fut = self._pool.dispatch(device, cmd)
    except KeyError as exc:
        raise UnknownDeviceError(device) from exc
    return await asyncio.wrap_future(fut)

snapshot async

snapshot(device: str) -> DeviceEmission

One-shot snapshot of device.

Same routing rules as dispatch().

Source code in src/capa/runtime/dispatch.py
async def snapshot(self, device: str) -> DeviceEmission:
    """One-shot snapshot of ``device``.

    Same routing rules as :meth:`dispatch`.
    """
    conductor = self._active_conductor()
    try:
        if conductor is not None:
            fut = conductor.snapshot(device)
        else:
            fut = self._pool.snapshot(device)
    except KeyError as exc:
        raise UnknownDeviceError(device) from exc
    return await asyncio.wrap_future(fut)

camera

camera(device_name: str) -> Camera | None

Return the underlying Camera handle for device_name.

Preview JPEGs are NOT consumed through this surface; they ride a per-camera ThreadBridge (capa.runtime.bridge.ThreadBridge) owned by the pool. Device-probe metadata (UVC ranges, supported resolutions, per-resolution fps caps) is NOT consumed through this surface either; use camera_metadata() for a worker-loop-safe snapshot.

Retained for tests that want to assert the wrapper hosts the expected handle; UI code should not introduce new callers. The returned handle lives on the worker loop, so touching its methods from the qasync loop violates the worker-owns-the-handle invariant.

Returns None if the device name is unknown or refers to a non-camera adapter.

Source code in src/capa/runtime/dispatch.py
def camera(self, device_name: str) -> Camera | None:
    """Return the underlying :class:`Camera` handle for ``device_name``.

    Preview JPEGs are NOT consumed through this surface — they ride
    a per-camera :class:`~capa.runtime.bridge.ThreadBridge` owned by
    the pool. Device-probe metadata (UVC ranges, supported resolutions,
    per-resolution fps caps) is NOT consumed through this surface
    either — use :meth:`camera_metadata` for a worker-loop-safe
    snapshot.

    Retained for tests that want to assert the wrapper hosts the
    expected handle; UI code should not introduce new callers. The
    returned handle lives on the worker loop, so touching its methods
    from the qasync loop violates the worker-owns-the-handle invariant.

    Returns ``None`` if the device name is unknown or refers to a
    non-camera adapter.
    """
    try:
        worker = self._pool.worker_for(device_name)
    except Exception:
        return None
    adapter: object | None = worker.adapters.get(device_name)
    if not isinstance(adapter, CameraDeviceAdapter):
        return None
    return adapter.camera

camera_metadata async

camera_metadata(device_name: str) -> WebcamMetadata | None

Probe a camera's metadata across loops without touching the handle.

Submits the read to the worker that owns device_name; the worker runs camera.snapshot_metadata() on its own loop and signals the resulting future. asyncio.wrap_future bridges back to the caller's loop, the same pattern as dispatch() and snapshot().

Returns None for adapters whose underlying camera does not expose a metadata surface (IR cameras today, plus the safety net for any future non-webcam adapter routed here by mistake). Card code treats None as "fall back to static widget defaults".

Raises UnknownDeviceError for names that aren't configured, the same surface as dispatch(). Does not route through the conductor: metadata is a pool-resident probe with no run-state semantics, so the same call path applies whether or not a run is armed.

Source code in src/capa/runtime/dispatch.py
async def camera_metadata(self, device_name: str) -> WebcamMetadata | None:
    """Probe a camera's metadata across loops without touching the handle.

    Submits the read to the worker that owns ``device_name``; the
    worker runs ``camera.snapshot_metadata()`` on its own loop and
    signals the resulting future. :func:`asyncio.wrap_future` bridges
    back to the caller's loop — same pattern as :meth:`dispatch` and
    :meth:`snapshot`.

    Returns ``None`` for adapters whose underlying camera does not
    expose a metadata surface (IR cameras today, plus the safety
    net for any future non-webcam adapter routed here by mistake).
    Card code treats ``None`` as "fall back to static widget defaults".

    Raises :class:`UnknownDeviceError` for names that aren't configured
    — same surface as :meth:`dispatch`. Does **not** route through the
    conductor: metadata is a pool-resident probe with no run-state
    semantics, so the same call path applies whether or not a run is
    armed.
    """
    try:
        fut = self._pool.camera_metadata(device_name)
    except KeyError as exc:
        raise UnknownDeviceError(device_name) from exc
    return await asyncio.wrap_future(fut)

device_readback async

device_readback(device_name: str) -> object

Probe a device's read_state_snapshot() across loops.

Returns the adapter-specific snapshot (e.g. capa.devices.watlow.WatlowStateSnapshot) or None for adapters that don't implement it. Card code treats None as "fall back to widget defaults".

Does not route through the conductor: this is a pool-resident read used by manual-control cards to prefill their widgets, with no run-state semantics. Submits to the worker that owns device_name and bridges the future back to the caller's loop.

Source code in src/capa/runtime/dispatch.py
async def device_readback(self, device_name: str) -> object:
    """Probe a device's ``read_state_snapshot()`` across loops.

    Returns the adapter-specific snapshot (e.g.
    :class:`~capa.devices.watlow.WatlowStateSnapshot`) or ``None`` for
    adapters that don't implement it. Card code treats ``None`` as
    "fall back to widget defaults".

    Does **not** route through the conductor: this is a pool-resident
    read used by manual-control cards to prefill their widgets, with no
    run-state semantics. Submits to the worker that owns ``device_name``
    and bridges the future back to the caller's loop.
    """
    try:
        fut = self._pool.device_readback(device_name)
    except KeyError as exc:
        raise UnknownDeviceError(device_name) from exc
    return await asyncio.wrap_future(fut)

ConductorStateError

ConductorStateError(
    message: str, *, current: ConductorState
)

Bases: CapaError

Operation attempted in an incompatible :class:~capa.runtime.state.ConductorState.

Raised when a dispatch or other run-gated call arrives outside the PREPARING / RUNNING window — typically a procedure-issued command landing during DRAINING which would race with adapter.stop().

Source code in src/capa/runtime/errors.py
def __init__(self, message: str, *, current: ConductorState) -> None:
    super().__init__(message)
    self.current = current
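A run-gated call can check the conductor state before doing any work and attach the observed state to the error. The sketch below assumes a hypothetical enum. PREPARING, RUNNING and DRAINING are the states named above, while IDLE and DONE and the `require_run_window` helper are illustrative assumptions. Only the `current` attribute mirrors the real constructor.

```python
from __future__ import annotations

import enum


class CapaError(Exception):
    """Stand-in for capa's base error (assumed)."""


class ConductorState(enum.Enum):
    # Hypothetical member set: IDLE and DONE are assumptions.
    IDLE = "idle"
    PREPARING = "preparing"
    RUNNING = "running"
    DRAINING = "draining"
    DONE = "done"


class ConductorStateError(CapaError):
    def __init__(self, message: str, *, current: ConductorState) -> None:
        super().__init__(message)
        self.current = current


# Dispatch is only legal inside the PREPARING / RUNNING window.
_RUN_WINDOW = frozenset({ConductorState.PREPARING, ConductorState.RUNNING})


def require_run_window(current: ConductorState, op: str) -> None:
    """Hypothetical guard a run-gated call could run before doing work."""
    if current not in _RUN_WINDOW:
        raise ConductorStateError(f"{op} refused in state {current.name}", current=current)
```

A command that arrives during DRAINING is refused here, before it can race with adapter.stop().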

PoolStateError

PoolStateError(
    message: str,
    *,
    from_state: object | None = None,
    to_state: object | None = None,
)

Bases: CapaError

Pool is in a state that does not permit the attempted operation.

open() may not be called twice; close() may not be called while any worker is non-IDLE; arm_all() requires :attr:~capa.runtime.lifecycle.PoolState.OPEN.

Source code in src/capa/runtime/errors.py
def __init__(
    self,
    message: str,
    *,
    from_state: object | None = None,
    to_state: object | None = None,
) -> None:
    super().__init__(message)
    self.from_state = from_state
    self.to_state = to_state

ResourceConflict

ResourceConflict(
    message: str,
    *,
    conflicting_names: tuple[str, ...] = (),
    resource_key: str | None = None,
)

Bases: CapaError

Two adapters claim the same hardware contention domain.

Raised synchronously from :func:~capa.runtime.build.build_workers before any worker thread is spawned, so a misconfigured config fails fast with no hardware side effects.

The conflicting_names attribute is the pair (or set) of adapter names that triggered the conflict, surfaced into the operator-facing error toast so the fix is obvious.

Source code in src/capa/runtime/errors.py
def __init__(
    self,
    message: str,
    *,
    conflicting_names: tuple[str, ...] = (),
    resource_key: str | None = None,
) -> None:
    super().__init__(message)
    self.conflicting_names = conflicting_names
    self.resource_key = resource_key
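The fail-fast property comes from checking pure data before any thread exists. A minimal sketch, assuming each adapter declares one exclusive contention key (for example a serial port path). The `check_conflicts` helper, the key model and the port paths are hypothetical, and capa's build_workers may group and compare adapters differently.

```python
from __future__ import annotations

from collections import defaultdict


class CapaError(Exception):
    """Stand-in for capa's base error (assumed)."""


class ResourceConflict(CapaError):
    def __init__(
        self,
        message: str,
        *,
        conflicting_names: tuple[str, ...] = (),
        resource_key: str | None = None,
    ) -> None:
        super().__init__(message)
        self.conflicting_names = conflicting_names
        self.resource_key = resource_key


def check_conflicts(claims: dict[str, str]) -> None:
    """Raise if two adapters claim the same exclusive key.

    ``claims`` maps adapter name -> exclusive contention key. No I/O
    happens here, so it can run before any worker thread is spawned.
    """
    by_key: dict[str, list[str]] = defaultdict(list)
    for name, key in claims.items():
        by_key[key].append(name)
    for key, names in by_key.items():
        if len(names) > 1:
            raise ResourceConflict(
                f"adapters {sorted(names)} all claim {key!r}",
                conflicting_names=tuple(sorted(names)),
                resource_key=key,
            )
```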

RunnerStateError

Bases: CapaError

:class:~capa.runtime.runner.WorkerRunner used in a state it doesn't permit.

The runner abstraction supports both real-thread and inline test backends. Both have lifecycles (start → submit ... → stop) and both raise this when called out of order. Kept separate from :class:WorkerStateError so test failures point at the harness rather than the production state machine.

UnknownDeviceError

UnknownDeviceError(
    name: str, *, configured_names: tuple[str, ...] = ()
)

Bases: CapaError

Caller asked the pool to dispatch to a device name not in this config.

Distinct from :class:ResourceConflict (which fires at build time) — this fires at dispatch time when a UI card or procedure step references a device whose configuration was removed or renamed. The configured_names attribute lists what is available, so the operator sees the typo immediately.

Source code in src/capa/runtime/errors.py
def __init__(
    self,
    name: str,
    *,
    configured_names: tuple[str, ...] = (),
) -> None:
    super().__init__(f"unknown device {name!r}; configured: {sorted(configured_names)}")
    self.name = name
    self.configured_names = configured_names

WorkerStateError

WorkerStateError(
    message: str,
    *,
    from_state: object | None = None,
    to_state: object | None = None,
    resource_id: str | None = None,
)

Bases: CapaError

Worker is in a state that does not permit the attempted operation.

Raised at two kinds of site:

  • Illegal transition. A caller asked the worker to move from_state → to_state where that edge is not in :data:~capa.runtime.lifecycle.LEGAL_WORKER_EDGES. The from_state/to_state attributes are populated.
  • Operation refused in current state. dispatch() called while the worker is DRAINING/CLOSED, arm() while not IDLE, etc. The from_state is populated; to_state is None.

The exception is the canonical thing the worker-loop coroutine raises. The sync facade (Worker.dispatch → concurrent.futures.Future) carries it across the thread seam, and awaiting that future through asyncio.wrap_future re-raises it, so the caller observes the original exception.

Source code in src/capa/runtime/errors.py
def __init__(
    self,
    message: str,
    *,
    from_state: object | None = None,
    to_state: object | None = None,
    resource_id: str | None = None,
) -> None:
    super().__init__(message)
    self.from_state = from_state
    self.to_state = to_state
    self.resource_id = resource_id
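With a legal-edge table, the illegal-transition case reduces to a single set lookup. The edge set below is a hypothetical reconstruction from the transitions named on this page: IDLE → ARMED on arm, ARMED → SAMPLING on begin-sampling, and ARMED/SAMPLING → DRAINING → IDLE on disarm. capa's real LEGAL_WORKER_EDGES may contain more edges, for example ones involving CLOSED.

```python
from __future__ import annotations

import enum


class CapaError(Exception):
    """Stand-in for capa's base error (assumed)."""


class WorkerState(enum.Enum):
    IDLE = "idle"
    ARMED = "armed"
    SAMPLING = "sampling"
    DRAINING = "draining"


class WorkerStateError(CapaError):
    def __init__(
        self,
        message: str,
        *,
        from_state: object | None = None,
        to_state: object | None = None,
        resource_id: str | None = None,
    ) -> None:
        super().__init__(message)
        self.from_state = from_state
        self.to_state = to_state
        self.resource_id = resource_id


# Hypothetical edge table built from the transitions named in these docs.
LEGAL_WORKER_EDGES = frozenset(
    {
        (WorkerState.IDLE, WorkerState.ARMED),
        (WorkerState.ARMED, WorkerState.SAMPLING),
        (WorkerState.ARMED, WorkerState.DRAINING),
        (WorkerState.SAMPLING, WorkerState.DRAINING),
        (WorkerState.DRAINING, WorkerState.IDLE),
    }
)


def transition(current: WorkerState, target: WorkerState, *, resource_id: str) -> WorkerState:
    """Return ``target`` if the edge is legal; otherwise raise with both states set."""
    if (current, target) not in LEGAL_WORKER_EDGES:
        raise WorkerStateError(
            f"illegal edge {current.name} -> {target.name}",
            from_state=current,
            to_state=target,
            resource_id=resource_id,
        )
    return target
```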

HeadlessResult dataclass

HeadlessResult(
    run_id: str,
    bundle_path: Path | None,
    run_status: str,
    bundle_status: str,
    integrity_status: str,
    exit_reason: str | None = None,
)

Outcome of one :func:run_headless invocation.

exit_code

exit_code() -> int

Map the outcome to the documented process exit code.

Returns:

  • int: 0 on a clean sealed run, 1 on operator abort, 2 on procedure / runtime crash, 3 on post-seal verification failure, 5 otherwise (unknown / partial). See Exit codes.

Source code in src/capa/runtime/headless.py
def exit_code(self) -> int:
    """Map the outcome to the documented process exit code.

    Returns:
        ``0`` on a clean sealed run, ``1`` on operator abort, ``2``
        on procedure / runtime crash, ``3`` on post-seal verification
        failure, ``5`` otherwise (unknown / partial). See
        [Exit codes](../../docs/reference/exit-codes.md).
    """
    if self.bundle_status == "verification_failed":
        return 3
    if self.run_status == "completed" and self.bundle_status == "sealed":
        return 0
    if self.run_status == "aborted":
        return 1
    if self.run_status == "crashed":
        return 2
    return 5
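Branch order matters here. Because verification_failed is checked first, a completed run whose post-seal verification fails still exits 3. A completed run whose bundle never reached "sealed" falls through to 5. The table-driven check below copies the method body onto a hypothetical `StatusPair` stand-in that carries only the two fields exit_code() reads. The "partial" bundle status is an assumed value.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class StatusPair:
    """Hypothetical stand-in carrying the two fields exit_code() reads."""

    run_status: str
    bundle_status: str

    def exit_code(self) -> int:
        # Same branch order as HeadlessResult.exit_code above.
        if self.bundle_status == "verification_failed":
            return 3
        if self.run_status == "completed" and self.bundle_status == "sealed":
            return 0
        if self.run_status == "aborted":
            return 1
        if self.run_status == "crashed":
            return 2
        return 5


CASES = {
    ("completed", "sealed"): 0,
    ("completed", "verification_failed"): 3,  # verification failure wins
    ("aborted", "sealed"): 1,
    ("crashed", "partial"): 2,
    ("completed", "partial"): 5,  # completed but never sealed
}
```

A headless entry point would then typically end with sys.exit(result.exit_code()).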

WorkerPool

WorkerPool(
    *,
    workers: Mapping[str, Worker],
    device_to_resource: Mapping[str, str],
    preview_bridges: Mapping[
        str, ThreadBridge[PreviewFrame]
    ]
    | None = None,
)

All :class:Worker instances for one loaded config.

The pool is constructed against a prebuilt workers map (so tests can inject fakes without going through TOML); the :classmethod:from_config factory builds the workers via :func:build_workers.

Lifetime: the pool lives as long as the loaded config. Reloading the config tears down the old pool (every worker closes) and constructs a new pool for the new config. Manual control between runs is the load-bearing property: a single :meth:open pays the adapter cold-open cost once, and every subsequent arm/disarm reuses the same opened hardware.

Source code in src/capa/runtime/pool.py
def __init__(
    self,
    *,
    workers: Mapping[str, Worker],
    device_to_resource: Mapping[str, str],
    preview_bridges: Mapping[str, ThreadBridge[PreviewFrame]] | None = None,
) -> None:
    if not workers:
        raise ValueError("WorkerPool: workers map must not be empty")
    self._workers: dict[str, Worker] = dict(workers)
    self._device_to_resource: dict[str, str] = dict(device_to_resource)
    self._state: PoolState = PoolState.CLOSED
    # The state mutex serializes open() / close() against each other.
    # Run-lifetime methods (arm_all, etc.) don't touch the state — the
    # pool's state only changes at config boundaries.
    self._state_lock = asyncio.Lock()
    # Per-camera preview bridges. Constructed by :meth:`from_config`
    # when a consumer loop is provided; empty for headless runs.
    # Workers receive a partitioned view of this map at construction
    # via :func:`build_workers`.
    self._preview_bridges: dict[str, ThreadBridge[PreviewFrame]] = (
        dict(preview_bridges) if preview_bridges else {}
    )
    # Latches so attach_preview_consumers / close_preview_bridges
    # stay idempotent under reentrant teardown paths.
    self._preview_consumers_attached = False

state property

state: PoolState

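Current :class:PoolState: CLOSED (the initial state), OPENING, OPEN, or CLOSING, following the transitions made by :meth:open and :meth:shutdown_close.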

workers property

workers: Mapping[str, Worker]

Immutable view of the worker map keyed by resource_id.

device_names property

device_names: tuple[str, ...]

All device names owned by any worker in the pool, in registration order.

from_config classmethod

from_config(
    config: ExperimentConfig,
    *,
    runner_factory: Callable[..., WorkerRunner]
    | None = None,
    preview_consumer_loop: AbstractEventLoop | None = None,
    preview_capacity: int = 4,
) -> WorkerPool

Build a pool from a real :class:ExperimentConfig.

Materialization and validation run synchronously here. :class:~capa.devices.materialize.ConfigMaterializationError surfaces adapter construction failures; :class:ResourceConflict surfaces grouping conflicts. Both fire before any worker is constructed, so a misconfigured config has no hardware side effects.

preview_consumer_loop: the loop that will drain preview bridges (the qasync UI loop in production; the test loop in integration tests). When None (headless), no preview bridges are constructed and the camera adapter's preview lifecycle no-ops — zero overhead for headless runs.

Outbound bridge capacity is derived per worker from each adapter's :attr:DeviceAdapter.expected_emission_rate_hz inside :func:build_workers; the previous outbound_capacity kwarg has been retired.

Source code in src/capa/runtime/pool.py
@classmethod
def from_config(
    cls,
    config: ExperimentConfig,
    *,
    runner_factory: Callable[..., WorkerRunner] | None = None,
    preview_consumer_loop: asyncio.AbstractEventLoop | None = None,
    preview_capacity: int = 4,
) -> WorkerPool:
    """Build a pool from a real :class:`ExperimentConfig`.

    Materialization and validation run synchronously here.
    :class:`~capa.devices.materialize.ConfigMaterializationError`
    surfaces adapter construction failures; :class:`ResourceConflict`
    surfaces grouping conflicts. Both fire before any worker is
    constructed, so a misconfigured config has no hardware side
    effects.

    ``preview_consumer_loop``: the loop that will drain preview
    bridges (the qasync UI loop in production; the test loop in
    integration tests). When ``None`` (headless), no preview bridges
    are constructed and the camera adapter's preview lifecycle
    no-ops — zero overhead for headless runs.

    Outbound bridge capacity is derived per worker from each
    adapter's :attr:`DeviceAdapter.expected_emission_rate_hz` inside
    :func:`build_workers`; the previous ``outbound_capacity`` kwarg
    has been retired.
    """
    from capa.devices.materialize import materialize_adapters  # noqa: PLC0415
    from capa.runtime.build import build_workers  # noqa: PLC0415

    materialized = materialize_adapters(config)
    preview_bridges: dict[str, ThreadBridge[PreviewFrame]] = {}
    if preview_consumer_loop is not None:
        for cam in config.hardware.cameras:
            preview_bridges[cam.name] = ThreadBridge(
                name=f"preview-{cam.name}",
                capacity=preview_capacity,
                consumer_loop=preview_consumer_loop,
                policy=BridgePolicy.DROP_OLDEST,
            )
    workers, device_to_resource = build_workers(
        materialized,
        runner_factory=runner_factory,
        preview_bridges=preview_bridges,
    )
    return cls(
        workers=workers,
        device_to_resource=device_to_resource,
        preview_bridges=preview_bridges,
    )
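Preview bridges are built with BridgePolicy.DROP_OLDEST and a small capacity (4 by default). This fits live previews: when the UI drainer falls behind, it sees the newest frames and the camera worker is not stalled. The sketch below shows that overflow policy single-threaded with collections.deque(maxlen=...). `DropOldestBuffer` is hypothetical and leaves out the cross-thread handoff a real ThreadBridge performs.

```python
from __future__ import annotations

from collections import deque
from typing import Any


class DropOldestBuffer:
    """Hypothetical bounded buffer with a drop-oldest overflow policy."""

    def __init__(self, capacity: int = 4) -> None:
        if capacity < 1:
            raise ValueError("capacity must be >= 1")
        # A deque with maxlen evicts from the left when a push overflows it.
        self._items: deque[Any] = deque(maxlen=capacity)
        self.dropped = 0

    def push(self, item: Any) -> None:
        if len(self._items) == self._items.maxlen:
            self.dropped += 1  # the oldest item is about to be evicted
        self._items.append(item)

    def drain(self) -> list[Any]:
        items = list(self._items)
        self._items.clear()
        return items
```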

preview_bridges

preview_bridges() -> Mapping[
    str, ThreadBridge[PreviewFrame]
]

Per-camera preview bridges, keyed by camera spec name.

Empty when the pool was constructed without a UI consumer loop (headless runs). The mapping itself is read-only — callers that need to spawn drainers should iterate the entries.

Source code in src/capa/runtime/pool.py
def preview_bridges(self) -> Mapping[str, ThreadBridge[PreviewFrame]]:
    """Per-camera preview bridges, keyed by camera spec name.

    Empty when the pool was constructed without a UI consumer loop
    (headless runs). The mapping itself is read-only — callers that
    need to spawn drainers should iterate the entries.
    """
    return MappingProxyType(self._preview_bridges)

attach_preview_consumers

attach_preview_consumers() -> None

Bind every preview bridge's consumer side to the running loop.

Must be called from the consumer loop (the qasync UI loop) before :meth:open is awaited. Workers attach their producers inside :meth:Worker._open_all_impl, and a producer could otherwise start enqueueing before the consumer side exists. Calling this first builds the :class:asyncio.Queue so the eventual producer always has a target.

Idempotent: a second call is a no-op.

Source code in src/capa/runtime/pool.py
def attach_preview_consumers(self) -> None:
    """Bind every preview bridge's consumer side to the running loop.

    Must be called from the consumer loop (the qasync UI loop) before
    :meth:`open` is awaited — workers attach their producers inside
    :meth:`Worker._open_all_impl` and a producer attach can race a
    consumer-side enqueue. Calling this first builds the
    :class:`asyncio.Queue` so the eventual producer always has a
    target.

    Idempotent: a second call is a no-op.
    """
    if self._preview_consumers_attached:
        return
    for bridge in self._preview_bridges.values():
        bridge.attach_consumer()
    self._preview_consumers_attached = True
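The ordering requirement can be shown with a toy thread-to-loop bridge. The consumer binds its loop and creates the asyncio.Queue first. The producer thread then enqueues through loop.call_soon_threadsafe, because asyncio.Queue is not thread-safe. `ToyBridge` and `demo` are hypothetical, and in this sketch a producer that arrives before the consumer attaches simply has nowhere to put its item.

```python
from __future__ import annotations

import asyncio
import threading
from typing import Any


class ToyBridge:
    """Hypothetical one-way bridge: producer thread -> consumer event loop."""

    def __init__(self) -> None:
        self._loop: asyncio.AbstractEventLoop | None = None
        self._queue: asyncio.Queue[Any] | None = None

    def attach_consumer(self) -> None:
        # Must run on the consumer loop: binds that loop and creates the
        # queue on it. A second call is a no-op.
        if self._queue is not None:
            return
        self._loop = asyncio.get_running_loop()
        self._queue = asyncio.Queue()

    def put_threadsafe(self, item: Any) -> None:
        # Producer-thread side: schedule the enqueue onto the consumer loop.
        if self._loop is None or self._queue is None:
            raise RuntimeError("consumer not attached yet; nowhere to enqueue")
        self._loop.call_soon_threadsafe(self._queue.put_nowait, item)

    async def get(self) -> Any:
        if self._queue is None:
            raise RuntimeError("consumer not attached")
        return await self._queue.get()


async def demo(n: int = 3) -> list[int]:
    bridge = ToyBridge()
    bridge.attach_consumer()  # consumer side first...

    def produce() -> None:
        for i in range(n):
            bridge.put_threadsafe(i)

    producer = threading.Thread(target=produce)  # ...then the producer
    producer.start()
    received = [await bridge.get() for _ in range(n)]
    producer.join()
    return received
```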

close_preview_bridges

close_preview_bridges() -> None

Close every preview bridge. Called from :meth:close after all workers have closed, so no producer can write afterwards.

Idempotent: closing an already-closed bridge is a no-op (see :meth:ThreadBridge.close).

Source code in src/capa/runtime/pool.py
def close_preview_bridges(self) -> None:
    """Close every preview bridge. Called from :meth:`close` after
    all workers have closed, so no producer can write afterwards.

    Idempotent: closing an already-closed bridge is a no-op (see
    :meth:`ThreadBridge.close`).
    """
    for bridge in self._preview_bridges.values():
        bridge.close()

worker_for

worker_for(device_name: str) -> Worker

Return the worker hosting the named adapter.

Raises :class:UnknownDeviceError if the name is not configured.

Source code in src/capa/runtime/pool.py
def worker_for(self, device_name: str) -> Worker:
    """Return the worker hosting the named adapter.

    Raises :class:`UnknownDeviceError` if the name is not configured.
    """
    try:
        rid = self._device_to_resource[device_name]
    except KeyError as exc:
        raise UnknownDeviceError(device_name, configured_names=self.device_names) from exc
    return self._workers[rid]
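Routing is a two-level lookup: device name → resource_id → worker. Several devices can share one worker. A minimal sketch with a hypothetical `Router` class and made-up resource ids. The error class copies the message format of UnknownDeviceError shown earlier on this page, so the sorted list of configured names shows up directly in the message.

```python
from __future__ import annotations


class UnknownDeviceError(Exception):
    """Stand-in mirroring the message format of capa's UnknownDeviceError."""

    def __init__(self, name: str, *, configured_names: tuple[str, ...] = ()) -> None:
        super().__init__(f"unknown device {name!r}; configured: {sorted(configured_names)}")
        self.name = name
        self.configured_names = configured_names


class Router:
    """Hypothetical two-level map: device name -> resource_id -> worker."""

    def __init__(self, workers: dict[str, object], device_to_resource: dict[str, str]) -> None:
        self._workers = dict(workers)
        self._device_to_resource = dict(device_to_resource)

    @property
    def device_names(self) -> tuple[str, ...]:
        return tuple(self._device_to_resource)

    def worker_for(self, device_name: str) -> object:
        try:
            rid = self._device_to_resource[device_name]
        except KeyError as exc:
            raise UnknownDeviceError(device_name, configured_names=self.device_names) from exc
        return self._workers[rid]
```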

open async

open(
    progress_callback: OpenProgressCallback | None = None,
) -> None

Start every worker; on first failure, roll back in reverse order.

Workers start in parallel. Bus-collision avoidance is the job of the resource_id grouping, not of a sequential warm-up. At 6 workers this cuts pool-open from a ~2 s sequential warm-up to ~500 ms.

If any worker fails to start, every worker that did open is closed in reverse registration order, and then the first failure (in registration order) propagates unchanged. The pool returns to :attr:PoolState.CLOSED.

Source code in src/capa/runtime/pool.py
async def open(self, progress_callback: OpenProgressCallback | None = None) -> None:
    """Start every worker; on first failure, roll back in reverse order.

    Workers start in parallel — bus-collision avoidance is the
    ``resource_id`` grouping job, not a sequential warm-up
    responsibility. At 6 workers this cuts pool-open from a ~2 s
    sequential warm-up to ~500 ms.

    On any worker's start failure: every already-opened worker is
    closed in reverse order (LIFO of completion), then the original
    exception propagates. The pool returns to :attr:`PoolState.CLOSED`.
    """
    async with self._state_lock:
        self._transition(PoolState.OPENING)

        # Kick off every worker's start in parallel. Use async_start() so
        # we don't need to wrap futures — we get native coroutines that
        # can be gathered directly.
        start_tasks: list[tuple[Worker, asyncio.Task[None]]] = []
        for worker in self._workers.values():
            task = asyncio.create_task(worker.async_start(progress_callback=progress_callback))
            start_tasks.append((worker, task))

        # Wait for ALL to complete (success or failure). We don't want
        # to abandon in-progress starts on first failure — they may
        # leave half-opened threads behind. Collect outcomes, then
        # decide rollback policy.
        await asyncio.gather(*(t for _, t in start_tasks), return_exceptions=True)

        # Categorize.
        opened: list[Worker] = []
        failed: list[tuple[Worker, BaseException]] = []
        for worker, task in start_tasks:
            exc = task.exception()
            if exc is None:
                opened.append(worker)
            else:
                failed.append((worker, exc))

        if not failed:
            self._transition(PoolState.OPEN)
            _logger.info(
                "pool.open",
                worker_count=len(self._workers),
                resources=tuple(self._workers),
            )
            return

        # Rollback path. Close every successfully-opened worker in
        # REVERSE order so paired resources (think watlowlib + serial)
        # tear down LIFO. We swallow rollback errors — they cannot
        # mask the original cause, which is the first failure.
        _logger.warning(
            "pool.open_partial_failure",
            failed_count=len(failed),
            opened_count=len(opened),
            first_error=str(failed[0][1]),
        )
        self._transition(PoolState.CLOSING)
        for worker in reversed(opened):
            # async_close() now returns a structured result rather than
            # raising on adapter-level errors. Any unexpected
            # exception (impl crash) gets swallowed here so the
            # original open failure can propagate; degraded close
            # outcomes are logged but never mask the open cause.
            try:
                rollback_result = await worker.async_close(grace_s=5.0)
            except Exception as rollback_exc:
                _logger.warning(
                    "pool.open_rollback_close_crashed",
                    resource_id=worker.resource_id,
                    error=str(rollback_exc),
                )
                for adapter in worker.adapters.values():
                    self._emit_open_progress(
                        progress_callback,
                        worker=worker,
                        adapter=adapter,
                        status=DeviceInitStatus.ROLLED_BACK,
                        detail=f"rollback close failed: {rollback_exc}",
                    )
                continue
            if not rollback_result.runner_stop.joined or (rollback_result.adapter_close_errors):
                _logger.warning(
                    "pool.open_rollback_close_degraded",
                    resource_id=worker.resource_id,
                    adapter_close_errors=rollback_result.adapter_close_errors,
                    runner_joined=rollback_result.runner_stop.joined,
                )
            detail = "rolled back after another device failed"
            if rollback_result.adapter_close_errors or not rollback_result.runner_stop.joined:
                detail = "rollback degraded; see logs"
            for adapter in worker.adapters.values():
                self._emit_open_progress(
                    progress_callback,
                    worker=worker,
                    adapter=adapter,
                    status=DeviceInitStatus.ROLLED_BACK,
                    detail=detail,
                )
        # Close preview bridges too — producers are gone now, so any
        # pending UI-side drainer wakes with ThreadBridgeClosedError
        # and exits cleanly.
        self.close_preview_bridges()
        self._transition(PoolState.CLOSED)
        # Propagate the first failure — the caller wants the original
        # exception type and message, not a synthetic aggregate.
        raise failed[0][1]
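The core of the start/rollback policy can be reduced to a few lines. Every start runs concurrently, all outcomes are awaited before any decision is made, the workers that opened are closed in reverse registration order, and the first failure is re-raised unchanged. `FakeWorker` and `open_all` are hypothetical. The sketch omits the state transitions, logging and progress callbacks of the real method.

```python
from __future__ import annotations

import asyncio


class FakeWorker:
    """Hypothetical worker whose start either succeeds or raises."""

    def __init__(self, name: str, *, fail: bool = False, log: list[str]) -> None:
        self.name = name
        self._fail = fail
        self._log = log

    async def async_start(self) -> None:
        await asyncio.sleep(0)  # yield so the starts genuinely interleave
        if self._fail:
            raise OSError(f"{self.name}: port busy")
        self._log.append(f"open {self.name}")

    async def async_close(self) -> None:
        self._log.append(f"close {self.name}")


async def open_all(workers: list[FakeWorker]) -> None:
    # Start every worker concurrently and wait for *all* outcomes, so no
    # start is abandoned half-way through.
    tasks = [asyncio.create_task(w.async_start()) for w in workers]
    await asyncio.gather(*tasks, return_exceptions=True)
    opened = [w for w, t in zip(workers, tasks) if t.exception() is None]
    failed = [t.exception() for t in tasks if t.exception() is not None]
    if not failed:
        return
    # Roll back in reverse registration order. Rollback errors are
    # swallowed so they can never mask the original failure.
    for w in reversed(opened):
        try:
            await w.async_close()
        except Exception:
            pass
    raise failed[0]
```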

shutdown_close async

shutdown_close(*, grace_s: float = 5.0) -> PoolCloseResult

Best-effort close for the :class:ShutdownCoordinator.

Unlike :meth:close, this does NOT refuse on non-IDLE workers. The coordinator has already attempted to abort the active run by the time it calls here, but a worker may still be in ARMED/SAMPLING if the conductor's drain itself wedged. This method:

  1. Disarms any worker still in ARMED/SAMPLING (parallel, bounded by grace_s). Disarm failures are captured as pool errors, never raised — the goal is to release as much hardware as we can before the coordinator's hard fuse fires.
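  2. Closes every worker that is IDLE by this point, in parallel, as :meth:close does. Any worker still non-IDLE is skipped and recorded as a pool error.
  3. Returns the aggregate :class:PoolCloseResult so the coordinator can record workers that entered close while non-IDLE, plus any disarm failures, in its :class:ShutdownResult.errors.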

The strict :meth:close stays for config-reload teardown where IDLE is a real invariant.

Source code in src/capa/runtime/pool.py
async def shutdown_close(self, *, grace_s: float = 5.0) -> PoolCloseResult:
    """Best-effort close for the :class:`ShutdownCoordinator`.

    Unlike :meth:`close`, this does NOT refuse on non-IDLE workers.
    The coordinator has already attempted to abort the active run by
    the time it calls here, but a worker may still be in
    ``ARMED``/``SAMPLING`` if the conductor's drain itself wedged.
    This method:

    1. Disarms any worker still in ARMED/SAMPLING (parallel, bounded
       by ``grace_s``). Disarm failures are captured as pool errors,
       never raised — the goal is to release as much hardware as we
       can before the coordinator's hard fuse fires.
    2. Closes every worker in parallel, exactly like :meth:`close`.
    3. Returns the aggregate :class:`PoolCloseResult` so the
       coordinator can record non-IDLE entry and any disarm
       timeouts in its :class:`ShutdownResult.errors`.

    The strict :meth:`close` stays for config-reload teardown where
    IDLE is a real invariant.
    """
    async with self._state_lock:
        if self._state is PoolState.CLOSED:
            return PoolCloseResult(clean=True, worker_results=(), errors=())

        pool_errors: list[str] = []

        # Step 1: disarm any non-IDLE workers, in parallel. Workers
        # that are already IDLE skip this step (worker.async_disarm() would
        # raise). Failures are recorded as pool errors but never
        # short-circuit — we still want to close every adapter we can.
        disarm_tasks: dict[str, asyncio.Task[DisarmResult]] = {}
        for rid, worker in self._workers.items():
            if worker.state in (WorkerState.ARMED, WorkerState.SAMPLING):
                disarm_tasks[rid] = asyncio.create_task(worker.async_disarm(grace_s=grace_s))
        if disarm_tasks:
            await asyncio.gather(*disarm_tasks.values(), return_exceptions=True)
            for rid, task in disarm_tasks.items():
                exc = task.exception()
                if exc is not None:
                    msg = f"worker {rid!r} shutdown disarm failed: {exc!r}"
                    pool_errors.append(msg)
                    _logger.warning(
                        "pool.shutdown_close_disarm_failed",
                        resource_id=rid,
                        error=str(exc),
                    )

        # Capture any worker that remained non-IDLE after the disarm
        # attempt — these may still have running stream tasks that
        # close() will see; we record but proceed.
        still_non_idle = [
            (w.resource_id, w.state.value)
            for w in self._workers.values()
            if w.state is not WorkerState.IDLE
        ]
        if still_non_idle:
            pool_errors.append(f"workers non-IDLE entering close: {still_non_idle}")
            _logger.warning(
                "pool.shutdown_close_workers_non_idle",
                non_idle=tuple(still_non_idle),
            )

        self._transition(PoolState.CLOSING)

        # Step 2: close every worker. Only close IDLE workers; non-IDLE
        # workers are recorded as errors.
        idle_workers = [w for w in self._workers.values() if w.state is WorkerState.IDLE]
        for worker in self._workers.values():
            if worker.state is not WorkerState.IDLE:
                pool_errors.append(f"worker {worker.resource_id!r} non-IDLE; close skipped")

        close_results, close_errors = await self._close_workers_parallel(
            idle_workers, grace_s=grace_s
        )
        pool_errors.extend(close_errors)

        self.close_preview_bridges()
        self._transition(PoolState.CLOSED)
        clean = not pool_errors and all(
            not w.adapter_close_errors and not w.adapter_stop_errors and w.runner_stop.joined
            for w in close_results
        )
        _logger.info(
            "shutdown.pool_close_result",
            worker_count=len(self._workers),
            clean=clean,
            pool_errors=tuple(pool_errors),
        )
        return PoolCloseResult(
            clean=clean,
            worker_results=tuple(close_results),
            errors=tuple(pool_errors),
        )

arm_all async

arm_all(run_context: RunContext) -> None

Transition every worker IDLE → ARMED with the same run context.

Parallel. On any failure, in-flight arms are awaited but the pool does not undo successful arms — that's the conductor's job via :meth:disarm_all.

Source code in src/capa/runtime/pool.py
async def arm_all(self, run_context: RunContext) -> None:
    """Transition every worker IDLE → ARMED with the same run context.

    Parallel. On any failure, in-flight arms are awaited but the
    pool does not undo successful arms — that's the conductor's job
    via :meth:`disarm_all`.
    """
    self._require_open()
    tasks = [
        asyncio.create_task(worker.async_arm(run_context)) for worker in self._workers.values()
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    first_exc: BaseException | None = None
    for worker, result in zip(self._workers.values(), results, strict=True):
        if isinstance(result, BaseException) and first_exc is None:
            first_exc = result
            _logger.warning(
                "pool.arm_failed",
                resource_id=worker.resource_id,
                error=str(result),
            )
    if first_exc is not None:
        raise first_exc

begin_sampling_all async

begin_sampling_all(
    *, consumer_loop: AbstractEventLoop
) -> dict[str, ThreadBridge[WorkerEmission]]

Transition every worker ARMED → SAMPLING. Return outbound bridges.

Returns a dict keyed by resource_id; the conductor's drain tasks iterate bridges.items() and spawn one drain coroutine per bridge.

consumer_loop is the loop that will drain the bridges — the conductor's loop in production; the test's loop in integration tests.

Source code in src/capa/runtime/pool.py
async def begin_sampling_all(
    self, *, consumer_loop: asyncio.AbstractEventLoop
) -> dict[str, ThreadBridge[WorkerEmission]]:
    """Transition every worker ARMED → SAMPLING. Return outbound bridges.

    Returns a dict keyed by ``resource_id``; the conductor's drain
    tasks iterate ``bridges.items()`` and spawn one drain coroutine
    per bridge.

    ``consumer_loop`` is the loop that will drain the bridges — the
    conductor's loop in production; the test's loop in integration
    tests.
    """
    self._require_open()
    tasks = [
        (
            rid,
            asyncio.create_task(worker.async_begin_sampling(consumer_loop=consumer_loop)),
        )
        for rid, worker in self._workers.items()
    ]
    bridges: dict[str, ThreadBridge[WorkerEmission]] = {}
    first_exc: BaseException | None = None
    for rid, task in tasks:
        try:
            bridges[rid] = await task
        except BaseException as exc:
            if first_exc is None:
                first_exc = exc
            _logger.warning(
                "pool.begin_sampling_failed",
                resource_id=rid,
                error=str(exc),
            )
    if first_exc is not None:
        # Some workers entered SAMPLING; the caller (conductor) will
        # call disarm_all to roll back. We don't unilaterally disarm
        # here because the conductor owns shutdown ordering.
        raise first_exc
    return bridges

disarm_all async

disarm_all(
    *, grace_s: float = 5.0
) -> dict[str, DisarmResult]

Transition every worker SAMPLING/ARMED → DRAINING → IDLE.

Per-worker grace; the slowest worker bounds the overall time but every worker's disarm runs in parallel. The result maps resource_id → DisarmResult so the conductor can identify which workers force-cancelled.

Workers that are already in IDLE (e.g. never armed) are skipped rather than erroring — disarm_all is idempotent over a not-fully-armed pool.

Source code in src/capa/runtime/pool.py
async def disarm_all(self, *, grace_s: float = 5.0) -> dict[str, DisarmResult]:
    """Transition every worker SAMPLING/ARMED → DRAINING → IDLE.

    Per-worker grace; the slowest worker bounds the overall time
    but every worker's disarm runs in parallel. The result maps
    resource_id → DisarmResult so the conductor can identify which
    workers force-cancelled.

    Workers that are already in IDLE (e.g. never armed) are skipped
    rather than erroring — disarm_all is idempotent over a
    not-fully-armed pool.
    """
    self._require_open()
    tasks: dict[str, asyncio.Task[DisarmResult]] = {}
    for rid, worker in self._workers.items():
        if worker.state in (WorkerState.ARMED, WorkerState.SAMPLING):
            tasks[rid] = asyncio.create_task(worker.async_disarm(grace_s=grace_s))
    if not tasks:
        return {}
    await asyncio.gather(*tasks.values(), return_exceptions=True)
    results: dict[str, DisarmResult] = {}
    for rid, task in tasks.items():
        exc = task.exception()
        if exc is not None:
            # A disarm exception is unusual — the worker.disarm body is
            # bounded and catches adapter.stop failures internally.
            # Record as FORCED for conservatism and surface in logs.
            _logger.warning(
                "pool.disarm_failed",
                resource_id=rid,
                error=str(exc),
            )
            results[rid] = DisarmResult.FORCED
        else:
            results[rid] = task.result()
    return results
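The skip-and-map behaviour of disarm_all can be sketched on its own. Only workers in an active state are disarmed, IDLE workers are skipped rather than reported as errors, and an unexpected exception is recorded conservatively as FORCED. The input shape (resource_id → state string and disarm coroutine factory) is hypothetical, and so is `DisarmResult.CLEAN`. Only FORCED is named in the source.

```python
from __future__ import annotations

import asyncio
import enum
from collections.abc import Callable, Coroutine
from typing import Any


class DisarmResult(enum.Enum):
    CLEAN = "clean"  # hypothetical name for the non-forced outcome
    FORCED = "forced"


ACTIVE_STATES = frozenset({"ARMED", "SAMPLING"})

DisarmFn = Callable[[], Coroutine[Any, Any, DisarmResult]]


async def disarm_all(workers: dict[str, tuple[str, DisarmFn]]) -> dict[str, DisarmResult]:
    """Disarm active workers in parallel; IDLE ones are skipped, not errors."""
    tasks = {
        rid: asyncio.create_task(disarm())
        for rid, (state, disarm) in workers.items()
        if state in ACTIVE_STATES
    }
    if not tasks:
        return {}
    await asyncio.gather(*tasks.values(), return_exceptions=True)
    # An unexpected exception counts as FORCED: assume the worker may not
    # have stopped cleanly.
    return {
        rid: DisarmResult.FORCED if t.exception() is not None else t.result()
        for rid, t in tasks.items()
    }
```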

dispatch

dispatch(
    device: str, cmd: DeviceCommand
) -> Future[CommandResult]

Route a command to the worker hosting device.

Synchronous facade (returns a :class:concurrent.futures.Future) — the PoolClient async-wraps this. Used directly by tests.

State-gating happens inside the worker (per worker's own state). Pool-level state is not checked here on purpose: a CLOSED pool has empty _device_to_resource so :meth:worker_for would raise :class:UnknownDeviceError first.

Source code in src/capa/runtime/pool.py
def dispatch(self, device: str, cmd: DeviceCommand) -> Future[CommandResult]:
    """Route a command to the worker hosting ``device``.

    Synchronous facade (returns a :class:`concurrent.futures.Future`)
    — the PoolClient async-wraps this. Used directly by tests.

    State-gating happens inside the worker (per worker's own state).
    Pool-level state is not checked here on purpose: a CLOSED pool
    has empty ``_device_to_resource`` so :meth:`worker_for` would
    raise :class:`UnknownDeviceError` first.
    """
    worker = self.worker_for(device)
    return worker.dispatch(device, cmd)
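The routing contract (device name to resource id to worker, a concurrent.futures.Future back to the caller, UnknownDeviceError once the routing map is empty) can be sketched with stand-ins. All class names here are hypothetical. A single-thread ThreadPoolExecutor plays the role of a worker's dedicated thread. An async caller in the PoolClient role uses asyncio.wrap_future to await the result.

```python
from __future__ import annotations

import asyncio
from concurrent.futures import Future, ThreadPoolExecutor


class UnknownDeviceError(KeyError):
    """Hypothetical stand-in for capa's UnknownDeviceError."""


class SketchWorker:
    """One single-thread executor per resource (stand-in for a real worker thread)."""

    def __init__(self, resource_id: str) -> None:
        self.resource_id = resource_id
        self._thread = ThreadPoolExecutor(max_workers=1, thread_name_prefix=resource_id)

    def dispatch(self, device: str, cmd: str) -> Future[str]:
        # The command body runs on this resource's thread, never the caller's.
        return self._thread.submit(lambda: f"{self.resource_id}:{device}:{cmd}")

    def close(self) -> None:
        self._thread.shutdown(wait=True)


class SketchPool:
    def __init__(self, routing: dict[str, str]) -> None:
        self._device_to_resource = dict(routing)  # device name -> resource id
        self._workers = {rid: SketchWorker(rid) for rid in set(routing.values())}

    def worker_for(self, device: str) -> SketchWorker:
        try:
            return self._workers[self._device_to_resource[device]]
        except KeyError:
            raise UnknownDeviceError(device) from None

    def dispatch(self, device: str, cmd: str) -> Future[str]:
        # No pool-level state check: a closed pool has no routes, so worker_for raises.
        return self.worker_for(device).dispatch(device, cmd)

    def close(self) -> None:
        self._device_to_resource.clear()
        for worker in self._workers.values():
            worker.close()


async def main() -> None:
    pool = SketchPool({"heater": "com3", "tc1": "daq0"})
    # An async caller bridges the concurrent future onto its own loop.
    print(await asyncio.wrap_future(pool.dispatch("heater", "set_temp")))
    pool.close()
    try:
        pool.dispatch("heater", "set_temp")
    except UnknownDeviceError as exc:
        print("refused after close:", exc)


if __name__ == "__main__":
    asyncio.run(main())
```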

snapshot

snapshot(device: str) -> Future[DeviceEmission]

One-shot adapter.snapshot() on the worker hosting device.

Source code in src/capa/runtime/pool.py
def snapshot(self, device: str) -> Future[DeviceEmission]:
    """One-shot ``adapter.snapshot()`` on the worker hosting ``device``."""
    worker = self.worker_for(device)
    return worker.snapshot(device)

camera_metadata

camera_metadata(
    device: str,
) -> Future[WebcamMetadata | None]

Probe one camera's metadata on the worker that owns it.

Future resolves to :class:WebcamMetadata when device is a webcam, None otherwise (IR cameras, non-camera adapters). Pool-level state isn't checked here — :meth:worker_for raises :class:UnknownDeviceError on a CLOSED pool's empty routing map before we get this far.

Source code in src/capa/runtime/pool.py
def camera_metadata(self, device: str) -> Future[WebcamMetadata | None]:
    """Probe one camera's metadata on the worker that owns it.

    Future resolves to :class:`WebcamMetadata` when ``device`` is a
    webcam, ``None`` otherwise (IR cameras, non-camera adapters).
    Pool-level state isn't checked here — :meth:`worker_for` raises
    :class:`UnknownDeviceError` on a CLOSED pool's empty routing map
    before we get this far.
    """
    worker = self.worker_for(device)
    return worker.camera_metadata(device)
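The "metadata for webcams, None for everything else" contract can be sketched as an isinstance check that runs on the owning thread. Webcam, IRCamera, probe_metadata and the WebcamMetadata fields are all hypothetical. This page does not document how the real worker tells camera kinds apart.

```python
from __future__ import annotations

from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass


@dataclass(frozen=True)
class WebcamMetadata:
    # Hypothetical fields; the real class's contents aren't documented on this page.
    width: int
    height: int


class Webcam:
    def probe_metadata(self) -> WebcamMetadata:
        return WebcamMetadata(width=1280, height=720)


class IRCamera:
    """No webcam-style metadata surface."""


def camera_metadata(worker_thread: ThreadPoolExecutor, adapter: object) -> Future[WebcamMetadata | None]:
    def _probe() -> WebcamMetadata | None:
        # Runs on the owning worker's thread; anything that isn't a webcam resolves to None.
        if isinstance(adapter, Webcam):
            return adapter.probe_metadata()
        return None

    return worker_thread.submit(_probe)


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=1) as worker_thread:
        print(camera_metadata(worker_thread, Webcam()).result())
        print(camera_metadata(worker_thread, IRCamera()).result())
```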

device_readback

device_readback(device: str) -> Future[Any]

Probe one device's read_state_snapshot() on the owning worker.

Future resolves to whatever the adapter returns from its :meth:read_state_snapshot (e.g. :class:WatlowStateSnapshot), or None when the adapter doesn't implement that surface. Used by manual-control cards to prefill their widgets with the device's current operator-facing values.

Source code in src/capa/runtime/pool.py
def device_readback(self, device: str) -> Future[Any]:
    """Probe one device's ``read_state_snapshot()`` on the owning worker.

    Future resolves to whatever the adapter returns from its
    :meth:`read_state_snapshot` (e.g. :class:`WatlowStateSnapshot`),
    or ``None`` when the adapter doesn't implement that surface.
    Used by manual-control cards to prefill their widgets with the
    device's current operator-facing values.
    """
    worker = self.worker_for(device)
    return worker.device_readback(device)
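Here is how a manual-control card might consume device_readback. The sketch uses a getattr probe for read_state_snapshot, which is an assumed detection mechanism. HeaterState stands in for a type such as WatlowStateSnapshot, and prefill is a hypothetical helper. A None result leaves the widget defaults as they are.

```python
from __future__ import annotations

from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class HeaterState:
    # Hypothetical snapshot type in the role of e.g. WatlowStateSnapshot.
    setpoint_c: float
    output_enabled: bool


class Heater:
    def read_state_snapshot(self) -> HeaterState:
        return HeaterState(setpoint_c=25.0, output_enabled=False)


class Thermocouple:
    """Read-only sensor with no read_state_snapshot surface."""


def device_readback(worker_thread: ThreadPoolExecutor, adapter: object) -> Future[Any]:
    def _read() -> Any:
        # Assumed duck-typing check; None means "nothing to read back".
        reader = getattr(adapter, "read_state_snapshot", None)
        return reader() if callable(reader) else None

    return worker_thread.submit(_read)


def prefill(widgets: dict[str, Any], snapshot: Any) -> None:
    """Overwrite widget defaults with the device's current values, if any."""
    if snapshot is not None:
        widgets.update(vars(snapshot))


if __name__ == "__main__":
    widgets = {"setpoint_c": 0.0, "output_enabled": True}
    with ThreadPoolExecutor(max_workers=1) as worker_thread:
        prefill(widgets, device_readback(worker_thread, Heater()).result())
    print(widgets)
```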

RealRunSession

RealRunSession(
    *,
    config: ExperimentConfig,
    runs_root: Path,
    run_id: str | None = None,
    plugins_lock: PluginsLock | None = None,
    repo_root: Path | None = None,
    lockfile_source: Path | None = None,
    adapter_by_device: dict[str, DeviceAdapter]
    | None = None,
    adapter_by_camera: dict[str, Any] | None = None,
    catalog: RunCatalog | None = None,
    metrics: MetricsRegistry | None = None,
    engine_version: str = "conductor",
    configure_logging_for_bundle: bool = True,
    config_path: Path | None = None,
)

Production :class:RunSession for the Conductor.

Built by the headless entry point (and by the GUI). The session is per-run; reuse across runs is not supported.

The :attr:writer_thread / :attr:bundle_writer / :attr:clock / :attr:authorization properties become valid only after :meth:open has run. Callers needing them for downstream wiring (e.g. :class:ProcedureRunner construction) get them from the conductor's runner_factory callback, which fires after :meth:open returns.

Source code in src/capa/runtime/session.py
def __init__(
    self,
    *,
    config: ExperimentConfig,
    runs_root: Path,
    run_id: str | None = None,
    plugins_lock: PluginsLock | None = None,
    repo_root: Path | None = None,
    lockfile_source: Path | None = None,
    adapter_by_device: dict[str, DeviceAdapter] | None = None,
    adapter_by_camera: dict[str, Any] | None = None,
    catalog: RunCatalog | None = None,
    metrics: MetricsRegistry | None = None,
    engine_version: str = "conductor",
    configure_logging_for_bundle: bool = True,
    config_path: Path | None = None,
) -> None:
    self._config = config
    self._runs_root = runs_root
    self._run_id = run_id or make_run_id(sample_id=config.sample.id)
    self._plugins_lock = plugins_lock
    self._repo_root = repo_root
    self._lockfile_source = lockfile_source
    # The adapter maps are populated by the caller AFTER pool.open() (the
    # caller walks pool.workers to build them) and BEFORE conductor.start.
    # Empty dicts here = headless tests / runs that don't need equipment
    # blocks.
    self._adapter_by_device = adapter_by_device or {}
    self._adapter_by_camera = adapter_by_camera or {}
    self._catalog = catalog
    self._metrics = metrics
    self._engine_version = engine_version
    self._configure_logging_for_bundle = configure_logging_for_bundle
    self._config_path = config_path

    self._bundle_writer: RunBundleWriter | None = None
    self._writer_thread: WriterThread | None = None
    self._clock: RunClock | None = None
    self._authorization: Authorization | None = None
    self._logger: Any = structlog.get_logger("capa")
    self._bundle_path: Path | None = None
    self._opened = False
    # Active-bundle checkpoint state. Written immediately after the
    # bundle directory is created so a hard exit between
    # open and finalize leaves a recoverable breadcrumb at
    # ``<runs_root>/.runtime-active.json``. Cleared at clean close.
    self._checkpoint_written = False
    self._outcome: RunOutcome = RunOutcome.COMPLETED
    self._exit_reason: str | None = None
    # Per-loop / per-bridge / per-worker diagnostics handed in by the
    # Conductor before close. Merged into the manifest's queue_health
    # dict at finalize so the bundle's on-disk schema stays put.
    self._extra_queue_health: dict[str, dict[str, float]] = {}

run_id property

run_id: str

Stable identifier assigned to this run at session construction.

bundle_path property

bundle_path: Path | None

Bundle directory on disk. None before :meth:open succeeds.

saturation_source property

saturation_source: WriterThread | None

The writer thread (which satisfies :class:WriterSaturationSource).

None before :meth:open.

clock property

clock: RunClock

Run-authoritative monotonic clock. Valid only after :meth:open.

bundle_writer property

bundle_writer: RunBundleWriter

The open bundle writer. Valid only after :meth:open.

writer_thread property

writer_thread: WriterThread

The running writer thread. Valid only after :meth:open.

authorization property

authorization: Authorization

Run-arm authorization handle. Valid only after :meth:open.

logger property

logger: Any

Bundle-aware structlog logger. Pre-:meth:open, returns a plain structlog logger (no bundle log sink yet); post-open, the same logger also tees into the bundle's run.log.

config property

config: ExperimentConfig

The frozen run recipe. Valid pre- and post-:meth:open. The conductor reads config.run_options.recording_policy and config.hardware during plan resolution.
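These properties have two lifetimes. run_id and config are usable from construction. clock, bundle_writer, writer_thread and authorization are usable only after open. The sketch below shows one way to guard the second group. SessionNotOpenError and SketchClock are hypothetical: this page does not say what the real properties do when read too early.

```python
from __future__ import annotations

from dataclasses import dataclass


class SessionNotOpenError(RuntimeError):
    """Hypothetical error type; the page only says these are 'valid only after open'."""


@dataclass(frozen=True)
class SketchClock:
    anchor_ns: int  # hypothetical field


class SketchSession:
    def __init__(self, run_id: str) -> None:
        self._run_id = run_id  # valid from construction onward
        self._clock: SketchClock | None = None  # populated by open()

    @property
    def run_id(self) -> str:
        return self._run_id

    @property
    def clock(self) -> SketchClock:
        # Fail loudly instead of handing callers a None.
        if self._clock is None:
            raise SessionNotOpenError("clock is valid only after open()")
        return self._clock

    def open(self) -> None:
        if self._clock is None:
            self._clock = SketchClock(anchor_ns=0)
```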

update_recording_plan

update_recording_plan(plan: Any) -> None

Write the resolved recording plan into the bundle manifest.

Called by the conductor after :meth:Procedure.plan_capture runs at arm time. Delegates to :meth:RunBundleWriter.update_recording_plan so the manifest snapshot reflects what the run will actually persist before any adapter starts emitting.

Reads policy_mode from this session's :attr:ExperimentConfig.run_options.recording_policy so the bundle records both the operator-facing policy and the materialised plan in one place.

Source code in src/capa/runtime/session.py
def update_recording_plan(self, plan: Any) -> None:
    """Write the resolved recording plan into the bundle manifest.

    Called by the conductor after :meth:`Procedure.plan_capture`
    runs at arm time. Delegates to
    :meth:`RunBundleWriter.update_recording_plan` so the manifest
    snapshot reflects what the run will actually persist before any
    adapter starts emitting.

    Reads ``policy_mode`` from this session's
    :attr:`ExperimentConfig.run_options.recording_policy` so the
    bundle records both the operator-facing policy and the
    materialised plan in one place.
    """
    if self._bundle_writer is None:
        return
    self._bundle_writer.update_recording_plan(
        policy_mode=self._config.run_options.recording_policy.mode,
        plan=plan,
    )

attach_adapters

attach_adapters(
    *,
    adapter_by_device: dict[str, DeviceAdapter]
    | None = None,
    adapter_by_camera: dict[str, Any] | None = None,
) -> None

Late-bind the adapter maps used for equipment/camera identity blocks at finalize.

Callers that build the pool first (and therefore the workers, which own the adapter instances) walk pool.workers to assemble these maps and call :meth:attach_adapters before constructing the conductor. Idempotent — last write wins.

Source code in src/capa/runtime/session.py
def attach_adapters(
    self,
    *,
    adapter_by_device: dict[str, DeviceAdapter] | None = None,
    adapter_by_camera: dict[str, Any] | None = None,
) -> None:
    """Late-bind the adapter maps used for equipment/camera identity
    blocks at finalize.

    Callers that build the pool first (and therefore the workers, which
    own the adapter instances) walk ``pool.workers`` to assemble these
    maps and call :meth:`attach_adapters` before constructing the
    conductor. Idempotent — last write wins.
    """
    if adapter_by_device is not None:
        self._adapter_by_device = adapter_by_device
    if adapter_by_camera is not None:
        self._adapter_by_camera = adapter_by_camera
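The build-pool-first flow can be sketched end to end: walk the workers, unwrap camera adapters, then late-bind the two maps. Worker, Session, Camera and collect_adapter_maps are hypothetical stand-ins. The walk mirrors step 3 of run_headless, and the None-means-keep rule mirrors attach_adapters.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class Camera:
    serial: str


@dataclass(frozen=True)
class CameraDeviceAdapter:
    camera: Camera  # the wrapped camera the identity collector reads


@dataclass
class Worker:
    adapters: dict[str, Any] = field(default_factory=dict)


@dataclass
class Session:
    adapter_by_device: dict[str, Any] = field(default_factory=dict)
    adapter_by_camera: dict[str, Any] = field(default_factory=dict)

    def attach_adapters(
        self,
        *,
        adapter_by_device: dict[str, Any] | None = None,
        adapter_by_camera: dict[str, Any] | None = None,
    ) -> None:
        # None leaves a map untouched; a dict replaces it (last write wins).
        if adapter_by_device is not None:
            self.adapter_by_device = adapter_by_device
        if adapter_by_camera is not None:
            self.adapter_by_camera = adapter_by_camera


def collect_adapter_maps(workers: dict[str, Worker]) -> tuple[dict[str, Any], dict[str, Any]]:
    by_device: dict[str, Any] = {}
    by_camera: dict[str, Any] = {}
    for worker in workers.values():
        for name, adapter in worker.adapters.items():
            if isinstance(adapter, CameraDeviceAdapter):
                by_camera[name] = adapter.camera  # unwrap: expose the camera itself
            else:
                by_device[name] = adapter
    return by_device, by_camera


if __name__ == "__main__":
    workers = {
        "com3": Worker({"heater": "heater-adapter"}),
        "usb0": Worker({"cam0": CameraDeviceAdapter(Camera("A1"))}),
    }
    session = Session()
    by_device, by_camera = collect_adapter_maps(workers)
    session.attach_adapters(adapter_by_device=by_device, adapter_by_camera=by_camera)
    print(session)
```

Passing only one keyword replaces just that map, so a caller can refresh cameras without touching the device map.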

open async

open() -> RunContext

Open the bundle, start the writer thread, mint authorization, build the :class:RunContext.

Idempotent — calling :meth:open twice returns the same :class:RunContext. The conductor calls this inside its own task group; if the caller has already opened the session externally (e.g. to construct a :class:ProcedureRunner against the bundle writer first), the second call is a no-op.

Source code in src/capa/runtime/session.py
async def open(self) -> RunContext:
    """Open the bundle, start the writer thread, mint authorization,
    build the :class:`RunContext`.

    Idempotent — calling :meth:`open` twice returns the same
    :class:`RunContext`. The conductor calls this inside its own task
    group; if the caller has already opened the session externally
    (e.g. to construct a :class:`ProcedureRunner` against the bundle
    writer first), the second call is a no-op.
    """
    if self._opened:
        assert self._clock is not None
        assert self._bundle_writer is not None
        assert self._writer_thread is not None
        return self._make_run_context()

    bind_run_context(
        run_id=self._run_id,
        operator_id=self._config.operator.id,
        procedure_id=self._config.procedure.id,
    )

    # Mint authorization BEFORE the bundle so any failure here leaves
    # no half-open bundle on disk.
    self._authorization = Authorization(
        operator_id=self._config.operator.id,
        run_id=self._run_id,
    )

    try:
        self._bundle_writer = RunBundleWriter(
            self._config,
            runs_root=self._runs_root,
            run_id=self._run_id,
            started_utc=datetime.now(UTC),
            started_mono_ns_anchor=0,
        )
        self._bundle_writer.open(
            repo_root=self._repo_root,
            lockfile_source=self._lockfile_source,
            plugins_lock=self._plugins_lock,
            engine_version=self._engine_version,
        )
        self._bundle_path = self._bundle_writer.bundle_path

        # Active-bundle checkpoint: atomic JSON at
        # ``<runs_root>/.runtime-active.json`` so a hard exit before
        # finalize leaves a durable breadcrumb the next launch can
        # reconcile via :func:`recover_active_bundle_checkpoint`. We
        # write this BEFORE the writer thread starts because the
        # writer is one of the more likely places a future shutdown
        # path could wedge — the breadcrumb must exist before any
        # sampling can begin.
        now = datetime.now(UTC)
        try:
            write_active_checkpoint(
                self._runs_root,
                ActiveCheckpoint(
                    pid=os.getpid(),
                    run_id=self._run_id,
                    bundle_path=self._bundle_path,
                    config_path=self._config_path,
                    started_utc=now,
                    last_update_utc=now,
                ),
            )
            self._checkpoint_written = True
            self._logger.info(
                "shutdown.bundle_checkpoint_written",
                run_id=self._run_id,
                bundle_path=str(self._bundle_path),
            )
        except OSError as ckpt_exc:
            # A failed checkpoint write is not fatal — the run can
            # still proceed; we just lose the recovery breadcrumb.
            # Log loudly so an operator notices a misconfigured
            # runs_root (read-only volume, missing parent, etc.).
            self._logger.warning(
                "shutdown.bundle_checkpoint_write_failed",
                run_id=self._run_id,
                bundle_path=str(self._bundle_path),
                error=str(ckpt_exc),
            )

        # Reconfigure logging now that the bundle's run.log exists, so
        # every log line from this point on is teed into the bundle and
        # survives even if the process dies before finalize.
        if self._configure_logging_for_bundle:
            self._logger = configure_logging(bundle_log_sink=self._bundle_writer.log_sink)
        else:
            self._logger = structlog.get_logger("capa")

        # Spawn the writer thread BEFORE anything records into the
        # bundle, so no on-loop write path can race the worker.
        writer_metrics = self._metrics.writer("bundle") if self._metrics is not None else None
        self._writer_thread = WriterThread(
            self._bundle_writer,
            metrics=writer_metrics,
            logger=self._logger.bind(component="writer_thread"),
        )
        self._writer_thread.start()

        self._clock = RunClock.now()
        _stamp_clock_anchor(self._bundle_writer, self._clock)

        self._logger.info(
            "session.open",
            run_id=self._run_id,
            bundle_path=str(self._bundle_path),
            engine_version=self._engine_version,
            operator_id=self._config.operator.id,
        )
        # Compatibility breadcrumb for existing bundle/log consumers:
        # run.log carries the historical audit event names.
        self._logger.info(
            "engine.run.start",
            run_id=self._run_id,
            bundle_path=str(self._bundle_path),
            engine_version=self._engine_version,
            operator_id=self._config.operator.id,
        )

        # Catalog row at open time so a crashed run still leaves a
        # row marked as "running" the catalog can find.
        if self._catalog is not None:
            try:
                manifest = BundleManifest.read(self._bundle_path / "manifest.json")
                self._catalog.insert_run_at_open(manifest, bundle_path=self._bundle_path)
            except Exception as exc:
                self._logger.warning(
                    "session.catalog.insert_failed",
                    error=str(exc),
                )
    except BaseException:
        # Roll back partial state — but keep the bundle dir on disk so
        # an operator can inspect a half-written run for debugging.
        if self._writer_thread is not None:
            with suppress(WriterThreadError):
                self._writer_thread.close()
            self._writer_thread = None
        self._authorization = None
        clear_run_context()
        raise

    self._opened = True
    return self._make_run_context()
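open() follows a simple shape: return early once opened, and on any failure undo partial state and then re-raise. The sketch below reduces that shape to counters and flags. SketchSession and its fields are hypothetical. Catching BaseException, as the source does, also rolls back when open is cancelled mid-way.

```python
import asyncio


class SketchSession:
    """Hypothetical session mirroring open()'s idempotency and rollback shape."""

    def __init__(self, *, fail_during_open: bool = False) -> None:
        self._opened = False
        self._fail = fail_during_open
        self.writer_running = False  # stands in for the WriterThread
        self.real_opens = 0  # counts opens that did actual work

    async def open(self) -> str:
        if self._opened:
            return "run-context"  # second call: no new bundle, writer, or clock
        try:
            self.real_opens += 1
            self.writer_running = True  # e.g. WriterThread.start()
            if self._fail:
                raise OSError("bundle directory not writable")
        except BaseException:
            # Undo partial state (BaseException also covers cancellation), then re-raise.
            self.writer_running = False
            raise
        self._opened = True  # set only after every step succeeded
        return "run-context"
```

A caller that opens the session itself (for example to build a ProcedureRunner early) and a conductor that opens it again both end up with the same single bundle and writer thread.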

set_outcome

set_outcome(
    outcome: RunOutcome, exit_reason: str | None
) -> None

Inform the session of the run's outcome so :meth:close can record the right run_status in the bundle manifest.

Source code in src/capa/runtime/session.py
def set_outcome(self, outcome: RunOutcome, exit_reason: str | None) -> None:
    """Inform the session of the run's outcome so :meth:`close` can
    record the right ``run_status`` in the bundle manifest."""
    self._outcome = outcome
    self._exit_reason = exit_reason

set_runtime_diagnostics

set_runtime_diagnostics(
    diagnostics: dict[str, dict[str, float]],
) -> None

Stash per-loop / per-bridge / per-worker metrics produced by the Conductor for inclusion in the finalize manifest.

Idempotent — last write wins. The conductor calls this once just before :meth:close; tests / subclassed sessions may also call it directly.

Source code in src/capa/runtime/session.py
def set_runtime_diagnostics(self, diagnostics: dict[str, dict[str, float]]) -> None:
    """Stash per-loop / per-bridge / per-worker metrics produced by
    the Conductor for inclusion in the finalize manifest.

    Idempotent — last write wins. The conductor calls this once just
    before :meth:`close`; tests / subclassed sessions may also call
    it directly.
    """
    self._extra_queue_health = dict(diagnostics)
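The following sketch covers two properties: dict(diagnostics) copies only the outer mapping, and each call replaces the previous stash. The merge rule at finalize is an assumption: runtime entries sit next to the writer's own queue_health entries and win on key collisions. DiagnosticsStash and the key names are hypothetical.

```python
from __future__ import annotations

Diagnostics = dict[str, dict[str, float]]


class DiagnosticsStash:
    def __init__(self) -> None:
        self._extra_queue_health: Diagnostics = {}

    def set_runtime_diagnostics(self, diagnostics: Diagnostics) -> None:
        # Shallow copy of the outer dict: keys the caller adds later don't leak in.
        # Last write wins: each call replaces the previous stash wholesale.
        self._extra_queue_health = dict(diagnostics)

    def queue_health_for_manifest(self, base: Diagnostics) -> Diagnostics:
        # Assumed merge rule: runtime entries sit alongside the base entries,
        # winning on key collisions.
        return {**base, **self._extra_queue_health}
```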

run_headless async

run_headless(
    config: ExperimentConfig,
    *,
    runs_root: Path,
    plugins_lock: PluginsLock | None = None,
    repo_root: Path | None = None,
    lockfile_source: Path | None = None,
    external_stop: Event | None = None,
    catalog: RunCatalog | None = None,
    run_id: str | None = None,
    conductor_config: ConductorConfig | None = None,
    saturation_deadline_s: float | None = None,
) -> HeadlessResult

Run one experiment via the conductor / pool stack.

The caller owns runs_root (this function does not create it) and the optional catalog (lifecycle stays with the caller's with block).

Source code in src/capa/runtime/headless.py
async def run_headless(
    config: ExperimentConfig,
    *,
    runs_root: Path,
    plugins_lock: PluginsLock | None = None,
    repo_root: Path | None = None,
    lockfile_source: Path | None = None,
    external_stop: anyio.Event | None = None,
    catalog: RunCatalog | None = None,
    run_id: str | None = None,
    conductor_config: ConductorConfig | None = None,
    saturation_deadline_s: float | None = None,
) -> HeadlessResult:
    """Run one experiment via the conductor / pool stack.

    The caller owns ``runs_root`` (this function does not create it) and
    the optional ``catalog`` (lifecycle stays with the caller's ``with``
    block).
    """
    if conductor_config is None:
        if saturation_deadline_s is not None:
            conductor_config = ConductorConfig.from_runtime(
                config.runtime,
                saturation_deadline_s=saturation_deadline_s,
            )
        else:
            conductor_config = ConductorConfig.from_runtime(config.runtime)

    # 1. Resolve the procedure. Refusal here = no bundle on disk.
    try:
        plugin_mode = resolve_mode()
        registry = ProcedureRegistry.discover(plugins_lock=plugins_lock, mode=plugin_mode)
        plugin_id = config.procedure.id
        if plugin_id not in registry:
            available = ", ".join(registry.ids()) or "<none>"
            raise ProcedureError(
                f"procedure {plugin_id!r} is not in the trusted registry "
                f"(mode={plugin_mode}); available: {available}"
            )
        procedure = registry.instantiate(plugin_id, config.procedure.config)
    except ProcedureError as exc:
        _logger.error("headless.procedure_resolution.failed", error=str(exc))
        return HeadlessResult(
            run_id=run_id or "preflight-refused",
            bundle_path=None,
            run_status="aborted",
            bundle_status="open",
            integrity_status="unknown",
            exit_reason=f"procedure_resolution: {exc}",
        )

    # Batch procedure needs to know runs_root for child bundles. Small
    # enough to inline rather than invent a generic "procedure
    # post-construct hook" API.
    if isinstance(procedure, _Batch):
        procedure.configure_runs_root(runs_root)

    # 2. Build + open the worker pool.
    try:
        pool = WorkerPool.from_config(config)
    except ConfigMaterializationError as exc:
        _logger.error("headless.adapter_materialization_failed", error=str(exc))
        return HeadlessResult(
            run_id=run_id or "preflight-refused",
            bundle_path=None,
            run_status="aborted",
            bundle_status="open",
            integrity_status="unknown",
            exit_reason=f"adapter_materialization: {exc}",
        )
    except ResourceConflict as exc:
        _logger.error("headless.resource_conflict", error=str(exc))
        return HeadlessResult(
            run_id=run_id or "preflight-refused",
            bundle_path=None,
            run_status="aborted",
            bundle_status="open",
            integrity_status="unknown",
            exit_reason=f"resource_conflict: {exc}",
        )

    try:
        await pool.open()
    except BaseException as exc:
        _logger.error("headless.pool_open.failed", error=str(exc), error_type=type(exc).__name__)
        # Pool failed to open; nothing to close.
        return HeadlessResult(
            run_id=run_id or "preflight-refused",
            bundle_path=None,
            run_status="crashed",
            bundle_status="open",
            integrity_status="unknown",
            exit_reason=f"pool_open: {exc}",
        )

    try:
        # 3. Collect adapter maps for the bundle's equipment + camera
        #    identity blocks at finalize. Cameras are wrapped in
        #    :class:`CameraDeviceAdapter`; the
        #    session's collector reads ``device_info`` off the
        #    underlying :class:`Camera`, so we expose ``.camera`` here
        #    rather than the wrapper itself.
        adapter_by_device: dict[str, Any] = {}
        adapter_by_camera: dict[str, Any] = {}
        for worker in pool.workers.values():
            for name, adapter in cast(Mapping[str, object], worker.adapters).items():
                if isinstance(adapter, CameraDeviceAdapter):
                    adapter_by_camera[name] = adapter.camera
                else:
                    adapter_by_device[name] = adapter

        # 4. Build the session. NOT opened yet — the conductor opens it
        #    inside its task group.
        session = RealRunSession(
            config=config,
            runs_root=runs_root,
            run_id=run_id,
            plugins_lock=plugins_lock,
            repo_root=repo_root,
            lockfile_source=lockfile_source,
            adapter_by_device=adapter_by_device,
            adapter_by_camera=adapter_by_camera,
            catalog=catalog,
        )

        # 5. Build the runner factory. The factory is invoked by the
        #    conductor on its loop AFTER session.open() returns; that's
        #    when the bundle writer + clock are valid.
        # `_conductor_holder` is a single-element list mutated after
        # Conductor construction (right below) so the factory can read
        # the live conductor reference at invocation time — a cheap
        # forward-reference cell that avoids reshaping the public API.
        _conductor_holder: list[Conductor] = []

        def _runner_factory(s: RunSession, ctx: RunContext) -> ConductorRunner:
            # The factory captures everything that's known pre-conductor
            # (procedure, config, channel registry) and resolves
            # post-open() resources lazily off the session.
            assert isinstance(s, RealRunSession), "headless runner factory requires RealRunSession"
            # Build the frozen channel registry the executor + procedure
            # resolve names against.
            channel_registry = ChannelRegistry.from_specs(list(config.hardware.channels))
            channel_registry.freeze()

            # Procedure-side dispatcher. PoolDispatcher (not
            # ConductorDispatcher) avoids the conductor-self-reference
            # circular dep — and the worker-level state gate is
            # sufficient: the procedure task is cancelled by the
            # conductor's task group before disarm starts, so a
            # procedure-issued command can never land during DRAINING.
            dispatcher = PoolDispatcher(pool)

            # MethodExecutor is built only when a method exists. The
            # executor's ctx must share the conductor's authoritative
            # DataBus — the executor's ``_wait_for`` subscribes to
            # channels here, and only the conductor's drain tasks publish
            # into it. Wiring a fresh DataBus would silently hang every
            # wait. The conductor's databus is created before the runner
            # factory is invoked (see Conductor._run), so reading it
            # off `_conductor_holder[0]` is safe.
            method_executor: MethodExecutor | None = None
            if config.method is not None:
                assert _conductor_holder, "runner_factory invoked before the conductor was stored in _conductor_holder"
                conductor_databus = _conductor_holder[0].databus
                assert conductor_databus is not None
                method_executor = _build_method_executor_for_runner(
                    config=config,
                    clock=s.clock,
                    bundle_writer=s.bundle_writer,
                    databus=conductor_databus,
                    channel_registry=channel_registry,
                    adapter_by_device=adapter_by_device,
                    dispatcher=dispatcher,
                    authorization=s.authorization,
                )

            # Wire the procedure's loop-local external_stop to the
            # conductor's completion event so procedures awaiting
            # ``ctx.external_stop.wait()`` (e.g. FreeRun) exit cleanly
            # on operator stop / saturation rather than via raised
            # CancelledError.
            stop_signal: asyncio.Event | None = None
            if _conductor_holder:
                stop_signal = _conductor_holder[0].completion_event

            runner = ProcedureRunner(
                procedure=procedure,
                config=config,
                channel_registry=channel_registry,
                dispatcher=dispatcher,
                authorization=s.authorization,
                adapters=adapter_by_device,
                bundle_writer=s.bundle_writer,
                method_executor=method_executor,
                stop_signal=stop_signal,
            )
            return runner

        # 6. Construct + start the conductor.
        conductor = Conductor(
            pool=pool,
            session=session,
            runner_factory=_runner_factory,
            config=conductor_config,
        )
        # Late-bind the conductor reference into the factory's closure cell
        # so it can wire the procedure's stop_signal when the factory is
        # invoked on the conductor's loop.
        _conductor_holder.append(conductor)

        # 7. Wire external_stop → conductor.stop(). The external_stop
        #    is an anyio.Event living on the caller's loop; we poll it
        #    in a background task and call conductor.stop() when it
        #    fires.
        stop_watcher_done = asyncio.Event()
        stop_watcher_task: asyncio.Task[None] | None = None
        if external_stop is not None:
            stop_watcher_task = asyncio.create_task(
                _watch_external_stop(external_stop, conductor, stop_watcher_done)
            )

        conductor.start()
        # Wait for the result on the calling loop. The conductor's result
        # future is a concurrent.futures.Future resolved from the conductor
        # thread; asyncio.wrap_future bridges back here.
        result: RunResult = await asyncio.wrap_future(conductor.result_future)
        conductor.join(timeout=5.0)
        stop_watcher_done.set()
        if stop_watcher_task is not None:
            with contextlib.suppress(asyncio.CancelledError):
                await stop_watcher_task

        return _result_to_headless(result, session)
    finally:
        # Close the pool — adapters close here. Best-effort: a misbehaving
        # adapter must not prevent us returning a result.
        try:
            await pool.close()
        except BaseException as exc:
            _logger.error(
                "headless.pool_close.failed",
                error=str(exc),
                error_type=type(exc).__name__,
            )
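Steps 6 and 7 of run_headless form a stop-and-result bridge. The conductor runs on its own thread and resolves a concurrent.futures.Future. A watcher task turns an external stop event into conductor.stop(), and the caller awaits the result through asyncio.wrap_future. _watch_external_stop is not shown on this page, so the version below is an assumption: it awaits either event rather than polling. SketchConductor is hypothetical, and an asyncio.Event replaces the anyio.Event.

```python
from __future__ import annotations

import asyncio
import threading
from concurrent.futures import Future


class SketchConductor:
    """Hypothetical conductor: runs on its own thread, reports via a concurrent Future."""

    def __init__(self, run_for_s: float) -> None:
        self.result_future: Future[str] = Future()
        self._run_for_s = run_for_s
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, name="sketch-conductor")

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()  # threading.Event is safe to set from any thread

    def join(self, timeout: float) -> None:
        self._thread.join(timeout)

    def _run(self) -> None:
        # Stand-in for a run: finishes on its own unless stopped first.
        stopped = self._stop.wait(timeout=self._run_for_s)
        self.result_future.set_result("stopped" if stopped else "completed")


async def watch_external_stop(
    external_stop: asyncio.Event, conductor: SketchConductor, done: asyncio.Event
) -> None:
    # Wait for whichever comes first: an operator stop, or the run finishing.
    stop_wait = asyncio.create_task(external_stop.wait())
    done_wait = asyncio.create_task(done.wait())
    finished, pending = await asyncio.wait(
        {stop_wait, done_wait}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    if stop_wait in finished:
        conductor.stop()


async def run(*, run_for_s: float, stop_after_s: float | None = None) -> str:
    external_stop = asyncio.Event()
    done = asyncio.Event()
    conductor = SketchConductor(run_for_s)
    watcher = asyncio.create_task(watch_external_stop(external_stop, conductor, done))
    if stop_after_s is not None:
        asyncio.get_running_loop().call_later(stop_after_s, external_stop.set)
    conductor.start()
    # The result future is resolved on the conductor thread; wrap_future bridges it here.
    result = await asyncio.wrap_future(conductor.result_future)
    conductor.join(timeout=5.0)
    done.set()  # lets the watcher exit if no stop ever arrived
    await watcher
    return result
```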

install_sigint_handler

install_sigint_handler(stop_event: Event) -> None

Install a SIGINT handler that sets stop_event.

Two-stage: the first Ctrl-C sets stop_event so the conductor can unwind gracefully (drain the writer, finalize the bundle); a second Ctrl-C restores the OS default handler and terminates the process.

Source code in src/capa/runtime/signals.py
def install_sigint_handler(stop_event: anyio.Event) -> None:
    """Install a ``SIGINT`` handler that sets ``stop_event``.

    Two-stage: the first Ctrl-C sets ``stop_event`` so the conductor can
    unwind gracefully (drain the writer, finalize the bundle); a second
    Ctrl-C restores the OS default handler and terminates the process.
    """
    triggered = False

    def _handler(signum: int, frame: object) -> None:
        nonlocal triggered
        if triggered:
            sys.stderr.write("\nsecond SIGINT — exiting hard\n")
            sys.stderr.flush()
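            # Restore the OS default and re-deliver this SIGINT so the
            # second Ctrl-C actually terminates the process (returning
            # alone would only arm the default for a third Ctrl-C).
            signal.signal(signal.SIGINT, signal.SIG_DFL)
            signal.raise_signal(signal.SIGINT)
            return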
        triggered = True
        sys.stderr.write("\nSIGINT received — initiating graceful stop (Ctrl-C again to force)\n")
        sys.stderr.flush()
        stop_event.set()

    signal.signal(signal.SIGINT, _handler)
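The two-stage pattern can be tried in isolation. This sketch substitutes a threading.Event for the anyio.Event, and install_two_stage_sigint is a hypothetical name. In the second stage it re-delivers the signal with signal.raise_signal (Python 3.8+), so that second Ctrl-C itself ends the process. The demo only sends one simulated Ctrl-C. It must run on the main thread, because that is the only thread allowed to install handlers.

```python
import signal
import sys
import threading


def install_two_stage_sigint(stop_event: threading.Event) -> None:
    """First SIGINT requests a graceful stop; a second one terminates the process."""
    triggered = False

    def _handler(signum: int, frame: object) -> None:
        nonlocal triggered
        if triggered:
            # Restore the OS default and re-deliver, so this SIGINT kills the process.
            signal.signal(signal.SIGINT, signal.SIG_DFL)
            signal.raise_signal(signal.SIGINT)
            return
        triggered = True
        sys.stderr.write("SIGINT received; stopping gracefully (Ctrl-C again to force)\n")
        stop_event.set()

    signal.signal(signal.SIGINT, _handler)


if __name__ == "__main__":
    previous = signal.getsignal(signal.SIGINT)
    stop = threading.Event()
    install_two_stage_sigint(stop)
    try:
        signal.raise_signal(signal.SIGINT)  # simulate the operator's first Ctrl-C
        print("graceful stop requested:", stop.is_set())
    finally:
        if previous is not None:
            signal.signal(signal.SIGINT, previous)
```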