Skip to content

watlowlib.devices

The Controller facade, Session, typed dataclasses (Reading, DeviceInfo, PartNumber, AlarmState, LoopState, DiscoveryResult, …), ControllerFamily, Capability, SafetyTier, Availability, open_device, open_controller, and discovery helpers. See Controllers.

Public surface

watlowlib.devices

Device facade — :class:Controller, :class:Session, and dataclasses.

The facade is the public surface; everything else (:mod:watlowlib.protocol, :mod:watlowlib.commands, :mod:watlowlib.registry) is implementation detail callers don't have to import.

AlarmState dataclass

AlarmState(loop, high, low, silenced, raw_bits)

Decoded alarm bits for one loop.

Availability

Bases: StrEnum

Per-command session state.

Sticky for the session: once a command transitions to :attr:UNSUPPORTED, the session short-circuits subsequent invocations with a typed error pre-I/O. The transition table lives in docs/design.md §5b.

Capability

Bases: Flag

Coarse hardware capability bits.

Bits are derived from a decoded part number when one is available (see :func:watlowlib.registry.families.capabilities_for_part_number) and fall back to a per-family prior otherwise. The session widens the set at runtime when a command succeeds against a parameter that proves the capability.

The vocabulary is small on purpose — most Watlow gating is by :class:watlowlib.registry.families.ControllerFamily and by :attr:watlowlib.registry.parameters.ParameterSpec.parameter_id, not by per-feature bits. New bits are added when captured family behaviour requires them.

Controller

Controller(session, transport, *, serial_settings)

Async facade for a single Watlow controller.

Source code in src/watlowlib/devices/controller.py
def __init__(
    self,
    session: Session,
    transport: Transport,
    *,
    serial_settings: SerialSettings,
) -> None:
    self._session = session
    self._transport = transport
    self._serial_settings = serial_settings
    # Cached loop count populated by :meth:`identify`. ``None`` means
    # "we haven't asked yet" — :meth:`loop` then defers validation
    # to the registry's per-spec ``max_instance``. Concrete count is
    # what the part-number decoder produced from the captured part
    # string (PM3 → 1, PM6/8/9 + ``U`` control → 2, etc.).
    self._loops: int | None = None
    # Cached SKU-derived capabilities populated by :meth:`identify`.
    # ``None`` until the part number has been decoded; downstream
    # operations that gate on bits (cool-side PID, etc.) treat
    # ``None`` as "no information, no gate" so calls work pre-
    # identify without surprising the user.
    self._capabilities: Capability | None = None

capabilities property

capabilities

Cached SKU capabilities (set after :meth:identify).

None pre-identify so capability-gated operations behave permissively until the part number is captured. After :meth:identify, callers can branch on :attr:Capability.HAS_COOLING etc. without re-issuing identify.

loops property

loops

Cached loop count (set after :meth:identify).

None until the device's part number has been decoded; :meth:loop accepts any 1-indexed value while loops is None and falls back to per-spec validation at the first wire call. After :meth:identify, loops reflects the decoded value.

session property

session

Underlying session used for command dispatch.

aclose async

aclose()

Close the underlying transport and dispose the protocol client.

Source code in src/watlowlib/devices/controller.py
async def aclose(self) -> None:
    """Close the underlying transport and dispose the protocol client."""
    # The session holds a reference to the protocol client; dispose
    # it so any pending caller learns the controller is gone before
    # the transport close races them.
    try:
        self._session.dispose()
    finally:
        await self._transport.close()

identify async

identify(
    *,
    timeout=None,
    strict=False,
    query_configured_protocol=False,
)

Read the identity parameters and return a :class:DeviceInfo.

Reads (in order): part number (1009), hardware id (1001), firmware id (1002), serial number. Missing secondary fields stay None and the result's :attr:DeviceInfo.health is promoted from :attr:DeviceHealth.OK to :attr:DeviceHealth.PARTIAL. If the part-number read itself fails, the result's health is :attr:DeviceHealth.FAILED and capability decoding is skipped (the family prior still applies).

Parameters:

Name Type Description Default
timeout float | None

Per-read timeout override.

None
strict bool

If True, raise the underlying error when the part-number read fails instead of returning a health=FAILED info. Use this in maintenance code paths that need to know the device actually answered before declaring success.

False
query_configured_protocol bool

If True, also read parameter 17009 (Protocol) and populate :attr:DeviceInfo.configured_protocol. Off by default because the read costs an extra round-trip; the maintenance verify pass and the discover CLI opt in.

False

Raises:

Type Description
WatlowError

When strict=True and the part-number read fails. The original transport / protocol error class is preserved.

Source code in src/watlowlib/devices/controller.py
async def identify(
    self,
    *,
    timeout: float | None = None,
    strict: bool = False,
    query_configured_protocol: bool = False,
) -> DeviceInfo:
    """Read the identity parameters and return a :class:`DeviceInfo`.

    Reads (in order): part number (1009), hardware id (1001),
    firmware id (1002), serial number. Missing secondary fields
    stay ``None`` and the result's :attr:`DeviceInfo.health` is
    promoted from :attr:`DeviceHealth.OK` to
    :attr:`DeviceHealth.PARTIAL`. If the part-number read itself
    fails, the result's health is :attr:`DeviceHealth.FAILED` and
    capability decoding is skipped (the family prior still
    applies).

    Args:
        timeout: Per-read timeout override.
        strict: If ``True``, raise the underlying error when the
            part-number read fails instead of returning a
            ``health=FAILED`` info. Use this in maintenance code
            paths that need to know the device actually answered
            before declaring success.
        query_configured_protocol: If ``True``, also read parameter
            17009 (Protocol) and populate
            :attr:`DeviceInfo.configured_protocol`. Off by default
            because the read costs an extra round-trip; the
            maintenance verify pass and the discover CLI opt in.

    Raises:
        WatlowError: When ``strict=True`` and the part-number read
            fails. The original transport / protocol error class
            is preserved.
    """
    if strict:
        entry = await self.read_parameter("part_number", timeout=timeout)
        part_raw = entry.value if isinstance(entry.value, str) else None
    else:
        part_raw = await self._safe_read_str("part_number", timeout=timeout)
    hw_id = await self._safe_read_int("hardware_id", timeout=timeout)
    fw_id = await self._safe_read_int("firmware_id", timeout=timeout)
    serial_str = await self._safe_read_str("serial_number", timeout=timeout)

    if part_raw:
        part = decode_part_number(part_raw)
        capabilities = capabilities_for_part_number(part)
        # PARTIAL when part_number is fine but a secondary read missed.
        secondary_missing = hw_id is None or fw_id is None
        health = DeviceHealth.PARTIAL if secondary_missing else DeviceHealth.OK
    else:
        part = PartNumber(raw="", family=ControllerFamily.UNKNOWN)
        # No part number → no SKU decode; capability table degrades
        # to the UNKNOWN family prior (NONE).
        capabilities = capabilities_for_part_number(part)
        health = DeviceHealth.FAILED

    configured_protocol: ProtocolKind | None = None
    if query_configured_protocol:
        code = await self._safe_read_int(17009, timeout=timeout)
        if code is not None:
            configured_protocol = _PROTOCOL_CODE_TO_KIND.get(code)

    loops = default_loops(part)
    # Cache for ``self.loop(n)``'s eager validator. Identify is the
    # canonical place that gets to set this — open() doesn't have
    # the part number yet, and a SKU's loop count / capabilities
    # never change mid-session.
    self._loops = loops
    self._capabilities = capabilities
    return DeviceInfo(
        part_number=part,
        hardware_id=hw_id,
        firmware_id=fw_id,
        serial_number=serial_str,
        family=part.family,
        protocol=self._session.protocol_kind,
        address=self._session.address,
        capabilities=capabilities,
        serial_settings=self._serial_settings,
        loops=loops,
        health=health,
        configured_protocol=configured_protocol,
    )

loop

loop(n)

Return a sub-facade bound to loop n (1-indexed).

n is validated eagerly when :attr:loops is known, otherwise per-spec max_instance validation kicks in at the first wire call. Multi-loop access is the public way to reach loop 2 on dual-loop devices — :meth:Controller.read_pv defaults to instance=1.

Source code in src/watlowlib/devices/controller.py
def loop(self, n: int) -> ControllerLoop:
    """Return a sub-facade bound to loop ``n`` (1-indexed).

    ``n`` is validated eagerly when :attr:`loops` is known,
    otherwise per-spec ``max_instance`` validation kicks in at the
    first wire call. Multi-loop access is the public way to reach
    loop 2 on dual-loop devices —
    :meth:`Controller.read_pv` defaults to ``instance=1``.
    """
    return ControllerLoop(self, n)

poll async

poll(parameters, *, names=None, instances=(1,))

Read every (parameter × instance) and return them as :class:Sample\ s.

Satisfies the :class:watlowlib.streaming.PollSource Protocol so a solo :class:Controller can drive :func:watlowlib.streaming.record directly without a manager. names is accepted for Protocol compatibility but ignored — a Controller has only one device.

Failed reads are dropped from the returned list and logged at WARN. The recorder treats absence as "drop this row from the batch" and continues with the next tick.

Source code in src/watlowlib/devices/controller.py
async def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    r"""Read every (parameter × instance) and return them as :class:`Sample`\ s.

    Satisfies the :class:`watlowlib.streaming.PollSource` Protocol so
    a solo :class:`Controller` can drive :func:`watlowlib.streaming.record`
    directly without a manager. ``names`` is accepted for Protocol
    compatibility but ignored — a Controller has only one device.

    Failed reads are dropped from the returned list and logged at
    WARN. The recorder treats absence as "drop this row from the
    batch" and continues with the next tick.
    """
    del names  # solo controller has no name-keyed device map
    from watlowlib.streaming._poll import poll_controller  # noqa: PLC0415 — avoid cycle

    return await poll_controller(
        self,
        name=self._transport.label,
        parameters=parameters,
        instances=instances,
    )

read_parameter async

read_parameter(name_or_id, *, instance=1, timeout=None)

Read any registry parameter.

instance=1 is the default for single-loop devices and the first loop / channel on multi-loop devices.

Source code in src/watlowlib/devices/controller.py
async def read_parameter(
    self,
    name_or_id: str | int,
    *,
    instance: int = 1,
    timeout: float | None = None,
) -> ParameterEntry:
    """Read any registry parameter.

    ``instance=1`` is the default for single-loop devices and the
    first loop / channel on multi-loop devices.
    """
    return await self._session.execute(
        READ_PARAMETER,
        ReadParameterRequest(name_or_id, instance=instance),
        timeout=timeout,
    )

read_pv async

read_pv(*, instance=1, timeout=None)

Read the process value for instance (loop number, 1-indexed).

Source code in src/watlowlib/devices/controller.py
async def read_pv(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the process value for ``instance`` (loop number, 1-indexed)."""
    entry = await self.read_parameter("process_value", instance=instance, timeout=timeout)
    return self._reading_from_entry(entry)

read_setpoint async

read_setpoint(*, instance=1, timeout=None)

Read the active setpoint for instance.

Source code in src/watlowlib/devices/controller.py
async def read_setpoint(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the active setpoint for ``instance``."""
    entry = await self.read_parameter("setpoint", instance=instance, timeout=timeout)
    return self._reading_from_entry(entry)

set_setpoint async

set_setpoint(
    value, *, instance=1, confirm=False, timeout=None
)

Write the setpoint and return the device-echoed value as a :class:Reading.

Setpoint is RWES — pass confirm=True to acknowledge the EEPROM write. The returned reading is the device's echo of the value it accepted.

Source code in src/watlowlib/devices/controller.py
async def set_setpoint(
    self,
    value: float,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Write the setpoint and return the device-echoed value as a :class:`Reading`.

    Setpoint is RWES — pass ``confirm=True`` to acknowledge the
    EEPROM write. The returned reading is the device's echo of
    the value it accepted.
    """
    entry = await self.write_parameter(
        "setpoint",
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )
    return self._reading_from_entry(entry)

write_parameter async

write_parameter(
    name_or_id,
    value,
    *,
    instance=1,
    confirm=False,
    timeout=None,
)

Write any registry parameter.

Persistent (RWE / RWES) writes require confirm=True; the session raises :class:WatlowConfirmationRequiredError before any I/O if the gate is missing.

Source code in src/watlowlib/devices/controller.py
async def write_parameter(
    self,
    name_or_id: str | int,
    value: float | int | str,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> ParameterEntry:
    """Write any registry parameter.

    Persistent (RWE / RWES) writes require ``confirm=True``;
    the session raises :class:`WatlowConfirmationRequiredError`
    before any I/O if the gate is missing.
    """
    return await self._session.execute(
        WRITE_PARAMETER,
        WriteParameterRequest(name_or_id, value, instance=instance),
        confirm=confirm,
        timeout=timeout,
    )

ControllerFamily

Bases: StrEnum

Watlow controller family discriminator.

Membership here is advisory — :class:watlowlib.devices.session.Session treats family hints as priors, not gates, unless the session was opened with strict=True. See docs/design.md §5b.

ControllerLoop

ControllerLoop(controller, loop_number)

A view over one control loop on a :class:Controller.

Construct via :meth:Controller.loop; never instantiated directly by user code. The sub-facade lives only as long as the parent controller's session — closing the controller is the only cleanup needed.

Source code in src/watlowlib/devices/loop.py
def __init__(self, controller: Controller, loop_number: int) -> None:
    if loop_number < 1:
        raise WatlowValidationError(
            f"loop number must be 1-indexed and >= 1; got {loop_number}",
        )
    # If the controller has identified the device, validate
    # eagerly. Otherwise defer to the registry's per-spec
    # ``validate_instance`` at first call: a registered parameter
    # with ``max_instance=1`` will raise a clear
    # ``WatlowValidationError`` when ``loop(2).read_pv()`` is
    # invoked. That keeps ``Controller.loop(2)`` cheap when called
    # before identify, but still fails before I/O.
    loops = controller.loops
    if loops is not None and loop_number > loops:
        raise WatlowValidationError(
            f"loop {loop_number} out of range for this device (1..{loops})",
        )
    self._controller = controller
    self._loop = loop_number

number property

number

The 1-indexed loop number this view binds.

read_alarms async

read_alarms()

Read the alarm word for this loop.

Currently raises :class:watlowlib.errors.WatlowProtocolUnsupportedError — see :func:watlowlib.commands.alarms.read_alarms for why the decoder is not yet wired up.

Source code in src/watlowlib/devices/loop.py
async def read_alarms(self) -> AlarmState:
    """Read the alarm word for this loop.

    Currently raises :class:`watlowlib.errors.WatlowProtocolUnsupportedError` —
    see :func:`watlowlib.commands.alarms.read_alarms` for why the
    decoder is not yet wired up.
    """
    return await _read_alarms(self._controller.session, instance=self._loop)

read_output async

read_output()

Read this loop's working output (output_power).

Source code in src/watlowlib/devices/loop.py
async def read_output(self) -> Reading:
    """Read this loop's working output (``output_power``)."""
    return await _read_output(self._controller.session, instance=self._loop)

read_pid async

read_pid()

Read every PID gain for this loop. Missing gains return None.

Cool-side gains (cool_proportional_band, dead_band) are skipped when the controller's identified capabilities lack :attr:Capability.HAS_COOLING (e.g. PM output_2 == 'A'). Pre-identify, the gate is permissive.

Source code in src/watlowlib/devices/loop.py
async def read_pid(self) -> PidGains:
    """Read every PID gain for this loop. Missing gains return ``None``.

    Cool-side gains (``cool_proportional_band``, ``dead_band``)
    are skipped when the controller's identified capabilities
    lack :attr:`Capability.HAS_COOLING` (e.g. PM ``output_2 ==
    'A'``). Pre-identify, the gate is permissive.
    """
    return await _read_pid(
        self._controller.session,
        instance=self._loop,
        capabilities=self._controller.capabilities,
    )

read_pv async

read_pv(*, timeout=None)

Read this loop's process value.

Source code in src/watlowlib/devices/loop.py
async def read_pv(self, *, timeout: float | None = None) -> Reading:
    """Read this loop's process value."""
    return await self._controller.read_pv(instance=self._loop, timeout=timeout)

read_setpoint async

read_setpoint(*, timeout=None)

Read this loop's active setpoint.

Source code in src/watlowlib/devices/loop.py
async def read_setpoint(self, *, timeout: float | None = None) -> Reading:
    """Read this loop's active setpoint."""
    return await self._controller.read_setpoint(instance=self._loop, timeout=timeout)

set_setpoint async

set_setpoint(value, *, confirm=False, timeout=None)

Write this loop's setpoint (RWES → confirm=True required).

Source code in src/watlowlib/devices/loop.py
async def set_setpoint(
    self,
    value: float,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Write this loop's setpoint (RWES → ``confirm=True`` required)."""
    return await self._controller.set_setpoint(
        value,
        instance=self._loop,
        confirm=confirm,
        timeout=timeout,
    )

write_pid async

write_pid(gains, *, confirm=False)

Write the supplied gains for this loop.

Persistent — passing confirm=True is required. Fields left None on gains skip the wire entirely. Setting a cool-side field on a controller without :attr:Capability.HAS_COOLING raises :class:watlowlib.errors.WatlowConfigurationError.

Source code in src/watlowlib/devices/loop.py
async def write_pid(self, gains: PidGains, *, confirm: bool = False) -> PidGains:
    """Write the supplied gains for this loop.

    Persistent — passing ``confirm=True`` is required. Fields
    left ``None`` on ``gains`` skip the wire entirely. Setting a
    cool-side field on a controller without
    :attr:`Capability.HAS_COOLING` raises
    :class:`watlowlib.errors.WatlowConfigurationError`.
    """
    return await _write_pid(
        self._controller.session,
        gains,
        instance=self._loop,
        confirm=confirm,
        capabilities=self._controller.capabilities,
    )

DeviceInfo dataclass

DeviceInfo(
    part_number,
    hardware_id,
    firmware_id,
    serial_number,
    family,
    protocol,
    address,
    capabilities,
    serial_settings,
    loops,
    health=DeviceHealth.OK,
    configured_protocol=None,
)

Identity + connection metadata for an open controller.

Returned by :meth:Controller.identify. Capabilities are decoded from the part number when one is captured (see :func:watlowlib.registry.families.capabilities_for_part_number) and OR-ed with the family prior; unobserved bits stay zero rather than being guessed.

protocol is the wire protocol the host is currently talking; configured_protocol is what the device's persistent EEPROM parameter (PM 17009) reports. They normally match, but when they diverge the helper :attr:protocol_mismatch flags it — useful for catching SKU/firmware combinations where the user wrote a new protocol but the runtime stack didn't pick it up (e.g. comms position-8 = 'A', no Modbus stack present even though 17009 reads 1057).

protocol_mismatch property

protocol_mismatch

True when EEPROM says one protocol and we're talking another.

Always False when :attr:configured_protocol is None (i.e. identify did not query parameter 17009).

DiscoveryResult dataclass

DiscoveryResult(
    port, serial_settings, address, protocol, info, error
)

One row from the discovery sweep.

LoopState dataclass

LoopState(
    loop, pv, setpoint, output_pct, raw=(lambda: {})()
)

Snapshot of one loop. Composed from several reads.

ParameterEntry dataclass

ParameterEntry(spec, instance, value, raw)

Generic registry-driven read/write result.

Returned by :data:watlowlib.commands.READ_PARAMETER and :data:watlowlib.commands.WRITE_PARAMETER. The :class:Controller translates an entry into a :class:Reading / :class:PartNumber / etc. when the public API guarantees a richer shape.

PartNumber dataclass

PartNumber(raw, family, details=(lambda: {})())

Parsed part-number string returned by read_part_number.

Per-family digit decoding is contributed by :mod:watlowlib.registry.families. Decoded fragments live in :attr:details as a free-form mapping so each family can populate only what its ordering format defines, and so adding fragments to the PM decoder later is non-breaking.

The EZ-ZONE PM decoder populates case size, control type, power input, three output codes, and options string. Other families fall through to a stub: only :attr:family is set, and :attr:details is empty.

Reading dataclass

Reading(
    value, unit, received_at, monotonic_ns, raw, protocol
)

A single timestamped value from the controller.

protocol is set by the variant decoder, not by the facade — it reflects which wire protocol produced the value (per docs/design.md invariant 7).

SafetyTier

Bases: IntEnum

How dangerous a command is to invoke.

  • READ_ONLY (R) — no state change.
  • STATEFUL — runtime state change but not EEPROM-backed. Reserved for commands like "start autotune"; no PM parameter maps here today, but the tier exists so future commands have a place to live.
  • PERSISTENT (RW / RWE / RWES) — EEPROM-backed; requires confirm=True at the facade.

Session

Session(client, *, registry, family, address, port)

Owns availability cache, gates, and the dispatch loop.

A :class:Session is bound to exactly one :class:ProtocolClient for its lifetime — one protocol per port (invariant 1).

Source code in src/watlowlib/devices/session.py
def __init__(
    self,
    client: ProtocolClient[Any, Any],
    *,
    registry: ParameterRegistry,
    family: ControllerFamily,
    address: int,
    port: str,
) -> None:
    self._client = client
    self._registry = registry
    self._family = family
    self._address = address
    self._port = port
    self._availability: dict[str, Availability] = {}

address property

address

Session bus address.

client property

client

The bound protocol client.

Exposed for the watlow-raw escape hatch and for diagnostics that need to issue an unframed wire op outside the registry. Callers must acquire :attr:ProtocolClient.lock before :meth:ProtocolClient.execute to honour the per-port serialization invariant, and must pass this session's :attr:address (or another concrete address for multi-drop diagnostics) to execute.

family property

family

Best-known controller family for this session.

port property

port

Transport label (for logs / error context).

protocol_kind property

protocol_kind

The wire protocol this session speaks.

registry property

registry

Parameter registry bound to this session.

Exposed for the streaming layer so polling code can resolve a name / id to a :class:ParameterSpec without an extra import of the module-level :data:PARAMETERS.

availability

availability(command_name)

Cached availability for command_name.

Source code in src/watlowlib/devices/session.py
def availability(self, command_name: str) -> Availability:
    """Cached availability for ``command_name``."""
    return self._availability.get(command_name, Availability.UNKNOWN)

dispose

dispose()

Dispose the bound protocol client.

Source code in src/watlowlib/devices/session.py
def dispose(self) -> None:
    """Dispose the bound protocol client."""
    self._client.dispose()

execute async

execute(command, request, *, confirm=False, timeout=None)

Dispatch command with request and return the typed response.

Source code in src/watlowlib/devices/session.py
async def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    request: Req,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Resp:
    """Dispatch ``command`` with ``request`` and return the typed response."""
    kind = self._client.kind
    # Variant resolution. The session picks the variant matching
    # the bound protocol; one protocol per port (invariant 1).
    # Resolve to a single ``variant`` local so the rest of the
    # method is protocol-agnostic; we still branch on ``kind``
    # for the encode/decode call shapes (stdbus takes ``reply``;
    # modbus takes ``words, ctx, request``).
    if kind is ProtocolKind.STDBUS:
        stdbus_variant = command.stdbus
        if stdbus_variant is None:
            raise WatlowProtocolUnsupportedError(
                f"command {command.name!r} has no Std Bus variant",
                context=self._error_context(command, request),
            )
        modbus_variant = None
    elif kind is ProtocolKind.MODBUS_RTU:
        modbus_variant = command.modbus
        if modbus_variant is None:
            raise WatlowProtocolUnsupportedError(
                f"command {command.name!r} has no Modbus variant",
                context=self._error_context(command, request),
            )
        stdbus_variant = None
    else:
        raise WatlowProtocolUnsupportedError(
            f"session has unsupported protocol kind {kind!r}",
            context=self._error_context(command, request),
        )

    # Cache key. We key on ``command_name:parameter_id`` for
    # registry-driven commands so that one ``read_parameter("foo")``
    # rejection doesn't sticky-block every other parameter; bare
    # commands fall back to ``command.name``.
    cache_key = self._cache_key(command, request)

    cached = self._availability.get(cache_key, Availability.UNKNOWN)
    if cached is Availability.UNSUPPORTED:
        raise WatlowProtocolUnsupportedError(
            f"command {command.name!r} is unsupported on this device",
            context=self._error_context(command, request),
        )

    # Safety gate: PERSISTENT writes need explicit confirm.
    if command.safety is SafetyTier.PERSISTENT and not confirm:
        raise WatlowConfirmationRequiredError(
            f"command {command.name!r} is PERSISTENT and requires confirm=True",
            context=self._error_context(command, request),
        )

    ctx = CommandContext(
        registry=self._registry,
        family=self._family,
        address=self._address,
        port=self._port,
    )

    bound_timeout = timeout if timeout is not None else DEFAULTS.io_timeout_s

    # Encode under the variant. Errors here are pre-I/O — typically
    # validation failures — and should propagate untouched.
    # Exactly one of ``stdbus_variant`` / ``modbus_variant`` is
    # non-None per the resolution above; the type narrowing is
    # explicit so neither mypy nor pyright needs an ``assert`` it
    # can't enforce at runtime under ``-O``.
    wire_request: Any
    if stdbus_variant is not None:
        wire_request = stdbus_variant.encode(ctx, request)
    elif modbus_variant is not None:
        wire_request = modbus_variant.encode(ctx, request)
    else:  # pragma: no cover — variant resolution above guarantees one is set
        raise WatlowProtocolUnsupportedError(
            f"command {command.name!r} variant resolution lost",
            context=self._error_context(command, request),
        )

    started = time.monotonic()
    # Hold the per-port client lock only for the I/O turn-around.
    # Decode is CPU-only and does not need to block the next
    # request waiting on the same RS-485 segment; ``reply`` is
    # snapshotted before the lock releases.
    async with self._client.lock:
        try:
            reply = await self._client.execute(
                wire_request,
                address=self._address,
                timeout=bound_timeout,
                command_name=command.name,
            )
        except (
            WatlowNoSuchObjectError,
            WatlowNoSuchAttributeError,
            WatlowProtocolUnsupportedError,
        ) as exc:
            self._availability[cache_key] = Availability.UNSUPPORTED
            _log.warning(
                "command unsupported: protocol=%s cmd=%s key=%s exc=%s",
                kind.value,
                command.name,
                cache_key,
                exc,
            )
            raise
        except WatlowProtocolError:
            raise
        except WatlowError as exc:
            _log.warning(
                "command error: protocol=%s cmd=%s key=%s exc=%s",
                kind.value,
                command.name,
                cache_key,
                exc,
            )
            raise

    # Decode outside the lock — pure compute on the captured reply.
    try:
        if stdbus_variant is not None:
            response = stdbus_variant.decode(reply, ctx)
        else:
            # ``modbus_variant is not None`` per the resolution above;
            # mypy/pyright follow the narrowing without an ``assert``.
            response = modbus_variant.decode(reply, ctx, request)  # type: ignore[union-attr]
    except (
        WatlowNoSuchObjectError,
        WatlowNoSuchAttributeError,
        WatlowProtocolUnsupportedError,
    ) as exc:
        # Decode-side "we don't have this": same availability
        # transition as the wire-side rejection above.
        self._availability[cache_key] = Availability.UNSUPPORTED
        _log.warning(
            "command unsupported: protocol=%s cmd=%s key=%s exc=%s",
            kind.value,
            command.name,
            cache_key,
            exc,
        )
        raise
    except WatlowProtocolError:
        # Decode-failure parity with the inside-lock branch above:
        # NoSuchInstance / IllegalDataValue / generic decode errors
        # don't transition availability per design §5b.
        raise

    elapsed = time.monotonic() - started
    self._availability[cache_key] = Availability.SUPPORTED
    _log.debug(
        "session exec ok protocol=%s cmd=%s key=%s elapsed=%.4fs",
        kind.value,
        command.name,
        cache_key,
        elapsed,
    )
    return response

classify_family

classify_family(part_number)

Return the :class:ControllerFamily for a part-number string.

Only the leading family discriminator is parsed; per-family digit decoding is in :func:decode_part_number.

Source code in src/watlowlib/registry/families.py
def classify_family(part_number: str) -> ControllerFamily:
    """Return the :class:`ControllerFamily` for a part-number string.

    Only the leading family discriminator is parsed; per-family digit
    decoding is in :func:`decode_part_number`.
    """
    head = part_number.strip().upper()
    if head.startswith("PM"):
        return ControllerFamily.PM
    if head.startswith("RM"):
        return ControllerFamily.RM
    if head.startswith("ST"):
        return ControllerFamily.ST
    if head.startswith("F4T"):
        return ControllerFamily.F4T
    return ControllerFamily.UNKNOWN

open_controller async

open_controller(
    transport,
    *,
    protocol,
    address,
    serial_settings,
    family=ControllerFamily.UNKNOWN,
)

Build a :class:Controller over an existing :class:Transport.

Tests use this to drive the facade through a :class:watlowlib.transport.fake.FakeTransport. Production code uses :func:open_device.

Source code in src/watlowlib/devices/factory.py
async def open_controller(
    transport: Transport,
    *,
    protocol: ProtocolKind,
    address: int,
    serial_settings: SerialSettings,
    family: ControllerFamily = ControllerFamily.UNKNOWN,
) -> Controller:
    """Build a :class:`Controller` over an existing :class:`Transport`.

    Tests use this to drive the facade through a
    :class:`watlowlib.transport.fake.FakeTransport`. Production code
    uses :func:`open_device`.
    """
    if protocol is ProtocolKind.AUTO:
        raise WatlowConfigurationError(
            "open_controller requires a concrete protocol; AUTO must be resolved by "
            "open_device (which runs the detector and returns a built Controller).",
            context=ErrorContext(port=transport.label),
        )
    client = make_protocol_client(protocol, transport)
    session = Session(
        client,
        registry=PARAMETERS,
        family=family,
        address=address,
        port=transport.label,
    )
    return Controller(session, transport, serial_settings=serial_settings)

open_device async

open_device(
    port,
    *,
    protocol=ProtocolKind.STDBUS,
    address=1,
    serial_settings=None,
)

Open a controller on a serial port.

Parameters:

Name Type Description Default
port str

Serial-port path (/dev/ttyUSB0, COM3, ...).

required
protocol ProtocolKind

Wire protocol. STDBUS and MODBUS_RTU open directly; AUTO runs the conservative detector (Std Bus → Modbus → fail) per docs/design.md §7.

STDBUS
address int

Bus address. Std Bus accepts 1..16; Modbus RTU accepts 1..247. Under AUTO the same address is tried against both probes.

1
serial_settings SerialSettings | None

Optional override. Default is 38400 8-N-1, the EZ-ZONE PM Standard Bus factory setting; port from the positional arg is applied if serial_settings is None. For Modbus RTU, the typical PM factory framing is 9600 8-E-1 — pass an explicit :class:SerialSettings to override the default. Auto- detect uses the same framing for both probes — there is no baud sweeping in the open path (cross-cutting invariant 5).

None

Returns:

Type Description
Controller

An opened :class:Controller when protocol=AUTO (the

Controller

detector held the transport open after a successful probe),

Controller

otherwise an unopened :class:Controller to be used as an

Controller

async context manager.

Raises:

Type Description
WatlowConfigurationError

address is out of range or protocol is unsupported.

WatlowProtocolUnsupportedError

protocol=AUTO and both probes failed.

Source code in src/watlowlib/devices/factory.py
async def open_device(
    port: str,
    *,
    protocol: ProtocolKind = ProtocolKind.STDBUS,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
) -> Controller:
    """Open a controller on a serial port.

    Args:
        port: Serial-port path (``/dev/ttyUSB0``, ``COM3``, ...).
        protocol: Wire protocol. ``STDBUS`` and ``MODBUS_RTU`` open
            directly; ``AUTO`` runs the conservative detector
            (Std Bus → Modbus → fail) per ``docs/design.md`` §7.
        address: Bus address. Std Bus accepts ``1..16``; Modbus RTU
            accepts ``1..247``. Under ``AUTO`` the same address is
            tried against both probes.
        serial_settings: Optional override. Default is **38400 8-N-1**,
            the EZ-ZONE PM Standard Bus factory setting; ``port`` from
            the positional arg is applied if ``serial_settings`` is
            ``None``. For Modbus RTU, the typical PM factory framing
            is **9600 8-E-1** — pass an explicit
            :class:`SerialSettings` to override the default. Auto-
            detect uses the same framing for both probes — there is
            no baud sweeping in the open path (cross-cutting
            invariant 5).

    Returns:
        An *opened* :class:`Controller` when ``protocol=AUTO`` (the
        detector held the transport open after a successful probe),
        otherwise an *unopened* :class:`Controller` to be used as an
        async context manager.

    Raises:
        WatlowConfigurationError: ``address`` is out of range or
            ``protocol`` is unsupported.
        WatlowProtocolUnsupportedError: ``protocol=AUTO`` and both
            probes failed.
    """
    if protocol not in (ProtocolKind.STDBUS, ProtocolKind.MODBUS_RTU, ProtocolKind.AUTO):
        raise WatlowConfigurationError(
            f"unsupported protocol kind: {protocol!r}",
            context=ErrorContext(port=port),
        )

    settings = serial_settings or SerialSettings(port=port)
    if settings.port != port:
        # User passed both — we honour the explicit ``port`` arg over
        # the settings dataclass to avoid silent surprise.
        from dataclasses import replace  # noqa: PLC0415 — cold path

        settings = replace(settings, port=port)

    if protocol is ProtocolKind.AUTO:
        # Lazy import — keep the Std-Bus-only callers off the anymodbus
        # dep graph until they actually opt in to AUTO.
        from watlowlib.protocol.detect import detect_protocol  # noqa: PLC0415

        resolved = await detect_protocol(
            port,
            address=address,
            serial_settings=settings,
        )
        # Detector returned an *open* transport already paired with the
        # right client; build the controller around them and skip
        # ``Controller.__aenter__``'s open() (it short-circuits when
        # ``transport.is_open`` is already True).
        session = Session(
            resolved.client,
            registry=PARAMETERS,
            family=ControllerFamily.UNKNOWN,
            address=address,
            port=resolved.transport.label,
        )
        return Controller(session, resolved.transport, serial_settings=settings)

    transport: Transport
    if protocol is ProtocolKind.MODBUS_RTU:
        # Lazy import — keep the Std-Bus path off the anymodbus dep
        # graph for users who never reach for Modbus.
        from watlowlib.protocol.modbus.transport import (  # noqa: PLC0415
            ModbusBusTransport,
        )

        transport = ModbusBusTransport(settings)
    else:
        transport = SerialTransport(settings)
    return await open_controller(
        transport,
        protocol=protocol,
        address=address,
        serial_settings=settings,
    )

sweep_modbus async

sweep_modbus(
    port,
    *,
    addresses=DEFAULT_MODBUS_RANGE,
    serial_settings=None,
    timeout_s=_DEFAULT_PROBE_TIMEOUT_S,
)

Yield one :class:DiscoveryResult per Modbus address probed.

Same shape as :func:sweep_stdbus — sequential, with the bus transport opened once and reused across every slave address. The Modbus driver multiplexes slaves over a single open handle so no per-address transport churn is incurred.

Source code in src/watlowlib/devices/discovery.py
async def sweep_modbus(
    port: str,
    *,
    addresses: Iterable[int] = DEFAULT_MODBUS_RANGE,
    serial_settings: SerialSettings | None = None,
    timeout_s: float = _DEFAULT_PROBE_TIMEOUT_S,
) -> AsyncIterator[DiscoveryResult]:
    """Yield one :class:`DiscoveryResult` per Modbus address probed.

    Same shape as :func:`sweep_stdbus` — sequential, with the bus
    transport opened once and reused across every slave address. The
    Modbus driver multiplexes slaves over a single open handle so no
    per-address transport churn is incurred.
    """
    settings = _resolve_settings(port, serial_settings)
    async for result in _sweep(
        port=port,
        protocol=ProtocolKind.MODBUS_RTU,
        addresses=addresses,
        serial_settings=settings,
        timeout_s=timeout_s,
    ):
        yield result

sweep_stdbus async

sweep_stdbus(
    port,
    *,
    addresses=DEFAULT_STDBUS_RANGE,
    serial_settings=None,
    timeout_s=_DEFAULT_PROBE_TIMEOUT_S,
)

Yield one :class:DiscoveryResult per Std Bus address probed.

Opens the serial transport once, walks addresses sequentially against the same open handle, then closes. Silent rows carry a populated :attr:DiscoveryResult.error of type :class:watlowlib.errors.WatlowTimeoutError (or :class:watlowlib.errors.WatlowTransportError for framing issues) so callers can distinguish "device absent" from "address never tried".

timeout_s bounds every underlying parameter read; the default keeps a 16-address silent sweep under five seconds.

Source code in src/watlowlib/devices/discovery.py
async def sweep_stdbus(
    port: str,
    *,
    addresses: Iterable[int] = DEFAULT_STDBUS_RANGE,
    serial_settings: SerialSettings | None = None,
    timeout_s: float = _DEFAULT_PROBE_TIMEOUT_S,
) -> AsyncIterator[DiscoveryResult]:
    """Yield one :class:`DiscoveryResult` per Std Bus address probed.

    Opens the serial transport once, walks addresses sequentially
    against the same open handle, then closes. Silent rows carry a
    populated :attr:`DiscoveryResult.error` of type
    :class:`watlowlib.errors.WatlowTimeoutError` (or
    :class:`watlowlib.errors.WatlowTransportError` for framing issues)
    so callers can distinguish "device absent" from "address never
    tried".

    ``timeout_s`` bounds every underlying parameter read; the default
    keeps a 16-address silent sweep under five seconds.
    """
    settings = _resolve_settings(port, serial_settings)
    async for result in _sweep(
        port=port,
        protocol=ProtocolKind.STDBUS,
        addresses=addresses,
        serial_settings=settings,
        timeout_s=timeout_s,
    ):
        yield result

Controller

watlowlib.devices.controller

The :class:Controller facade — public API for one device.

Single-device surface:

  • :meth:identify
  • :meth:read_pv / :meth:read_setpoint / :meth:set_setpoint
  • :meth:read_parameter / :meth:write_parameter
  • :meth:loop (multi-loop access), PID, alarms

Lifecycle is async-context-manager: async with await open_device(...) opens the transport on __aenter__ and disposes the protocol client + closes the transport on __aexit__.

Controller

Controller(session, transport, *, serial_settings)

Async facade for a single Watlow controller.

Source code in src/watlowlib/devices/controller.py
def __init__(
    self,
    session: Session,
    transport: Transport,
    *,
    serial_settings: SerialSettings,
) -> None:
    self._session = session
    self._transport = transport
    self._serial_settings = serial_settings
    # Cached loop count populated by :meth:`identify`. ``None`` means
    # "we haven't asked yet" — :meth:`loop` then defers validation
    # to the registry's per-spec ``max_instance``. Concrete count is
    # what the part-number decoder produced from the captured part
    # string (PM3 → 1, PM6/8/9 + ``U`` control → 2, etc.).
    self._loops: int | None = None
    # Cached SKU-derived capabilities populated by :meth:`identify`.
    # ``None`` until the part number has been decoded; downstream
    # operations that gate on bits (cool-side PID, etc.) treat
    # ``None`` as "no information, no gate" so calls work pre-
    # identify without surprising the user.
    self._capabilities: Capability | None = None

capabilities property

capabilities

Cached SKU capabilities (set after :meth:identify).

None pre-identify so capability-gated operations behave permissively until the part number is captured. After :meth:identify, callers can branch on :attr:Capability.HAS_COOLING etc. without re-issuing identify.

loops property

loops

Cached loop count (set after :meth:identify).

None until the device's part number has been decoded; :meth:loop accepts any 1-indexed value while loops is None and falls back to per-spec validation at the first wire call. After :meth:identify, loops reflects the decoded value.

session property

session

Underlying session used for command dispatch.

aclose async

aclose()

Close the underlying transport and dispose the protocol client.

Source code in src/watlowlib/devices/controller.py
async def aclose(self) -> None:
    """Close the underlying transport and dispose the protocol client."""
    # The session holds a reference to the protocol client; dispose
    # it so any pending caller learns the controller is gone before
    # the transport close races them.
    try:
        self._session.dispose()
    finally:
        await self._transport.close()

identify async

identify(
    *,
    timeout=None,
    strict=False,
    query_configured_protocol=False,
)

Read the identity parameters and return a :class:DeviceInfo.

Reads (in order): part number (1009), hardware id (1001), firmware id (1002), serial number. Missing secondary fields stay None and the result's :attr:DeviceInfo.health is promoted from :attr:DeviceHealth.OK to :attr:DeviceHealth.PARTIAL. If the part-number read itself fails, the result's health is :attr:DeviceHealth.FAILED and capability decoding is skipped (the family prior still applies).

Parameters:

Name Type Description Default
timeout float | None

Per-read timeout override.

None
strict bool

If True, raise the underlying error when the part-number read fails instead of returning a health=FAILED info. Use this in maintenance code paths that need to know the device actually answered before declaring success.

False
query_configured_protocol bool

If True, also read parameter 17009 (Protocol) and populate :attr:DeviceInfo.configured_protocol. Off by default because the read costs an extra round-trip; the maintenance verify pass and the discover CLI opt in.

False

Raises:

Type Description
WatlowError

When strict=True and the part-number read fails. The original transport / protocol error class is preserved.

Source code in src/watlowlib/devices/controller.py
async def identify(
    self,
    *,
    timeout: float | None = None,
    strict: bool = False,
    query_configured_protocol: bool = False,
) -> DeviceInfo:
    """Read the identity parameters and return a :class:`DeviceInfo`.

    Reads (in order): part number (1009), hardware id (1001),
    firmware id (1002), serial number. Missing secondary fields
    stay ``None`` and the result's :attr:`DeviceInfo.health` is
    promoted from :attr:`DeviceHealth.OK` to
    :attr:`DeviceHealth.PARTIAL`. If the part-number read itself
    fails, the result's health is :attr:`DeviceHealth.FAILED` and
    capability decoding is skipped (the family prior still
    applies).

    Args:
        timeout: Per-read timeout override.
        strict: If ``True``, raise the underlying error when the
            part-number read fails instead of returning a
            ``health=FAILED`` info. Use this in maintenance code
            paths that need to know the device actually answered
            before declaring success.
        query_configured_protocol: If ``True``, also read parameter
            17009 (Protocol) and populate
            :attr:`DeviceInfo.configured_protocol`. Off by default
            because the read costs an extra round-trip; the
            maintenance verify pass and the discover CLI opt in.

    Raises:
        WatlowError: When ``strict=True`` and the part-number read
            fails. The original transport / protocol error class
            is preserved.
    """
    if strict:
        entry = await self.read_parameter("part_number", timeout=timeout)
        part_raw = entry.value if isinstance(entry.value, str) else None
    else:
        part_raw = await self._safe_read_str("part_number", timeout=timeout)
    hw_id = await self._safe_read_int("hardware_id", timeout=timeout)
    fw_id = await self._safe_read_int("firmware_id", timeout=timeout)
    serial_str = await self._safe_read_str("serial_number", timeout=timeout)

    if part_raw:
        part = decode_part_number(part_raw)
        capabilities = capabilities_for_part_number(part)
        # PARTIAL when part_number is fine but a secondary read missed.
        secondary_missing = hw_id is None or fw_id is None
        health = DeviceHealth.PARTIAL if secondary_missing else DeviceHealth.OK
    else:
        part = PartNumber(raw="", family=ControllerFamily.UNKNOWN)
        # No part number → no SKU decode; capability table degrades
        # to the UNKNOWN family prior (NONE).
        capabilities = capabilities_for_part_number(part)
        health = DeviceHealth.FAILED

    configured_protocol: ProtocolKind | None = None
    if query_configured_protocol:
        code = await self._safe_read_int(17009, timeout=timeout)
        if code is not None:
            configured_protocol = _PROTOCOL_CODE_TO_KIND.get(code)

    loops = default_loops(part)
    # Cache for ``self.loop(n)``'s eager validator. Identify is the
    # canonical place that gets to set this — open() doesn't have
    # the part number yet, and a SKU's loop count / capabilities
    # never change mid-session.
    self._loops = loops
    self._capabilities = capabilities
    return DeviceInfo(
        part_number=part,
        hardware_id=hw_id,
        firmware_id=fw_id,
        serial_number=serial_str,
        family=part.family,
        protocol=self._session.protocol_kind,
        address=self._session.address,
        capabilities=capabilities,
        serial_settings=self._serial_settings,
        loops=loops,
        health=health,
        configured_protocol=configured_protocol,
    )

loop

loop(n)

Return a sub-facade bound to loop n (1-indexed).

n is validated eagerly when :attr:loops is known, otherwise per-spec max_instance validation kicks in at the first wire call. Multi-loop access is the public way to reach loop 2 on dual-loop devices — :meth:Controller.read_pv defaults to instance=1.

Source code in src/watlowlib/devices/controller.py
def loop(self, n: int) -> ControllerLoop:
    """Return a sub-facade bound to loop ``n`` (1-indexed).

    ``n`` is validated eagerly when :attr:`loops` is known,
    otherwise per-spec ``max_instance`` validation kicks in at the
    first wire call. Multi-loop access is the public way to reach
    loop 2 on dual-loop devices —
    :meth:`Controller.read_pv` defaults to ``instance=1``.
    """
    return ControllerLoop(self, n)

poll async

poll(parameters, *, names=None, instances=(1,))

Read every (parameter × instance) and return them as :class:Sample\ s.

Satisfies the :class:watlowlib.streaming.PollSource Protocol so a solo :class:Controller can drive :func:watlowlib.streaming.record directly without a manager. names is accepted for Protocol compatibility but ignored — a Controller has only one device.

Failed reads are dropped from the returned list and logged at WARN. The recorder treats absence as "drop this row from the batch" and continues with the next tick.

Source code in src/watlowlib/devices/controller.py
async def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    r"""Read every (parameter × instance) and return them as :class:`Sample`\ s.

    Satisfies the :class:`watlowlib.streaming.PollSource` Protocol so
    a solo :class:`Controller` can drive :func:`watlowlib.streaming.record`
    directly without a manager. ``names`` is accepted for Protocol
    compatibility but ignored — a Controller has only one device.

    Failed reads are dropped from the returned list and logged at
    WARN. The recorder treats absence as "drop this row from the
    batch" and continues with the next tick.
    """
    del names  # solo controller has no name-keyed device map
    from watlowlib.streaming._poll import poll_controller  # noqa: PLC0415 — avoid cycle

    return await poll_controller(
        self,
        name=self._transport.label,
        parameters=parameters,
        instances=instances,
    )

read_parameter async

read_parameter(name_or_id, *, instance=1, timeout=None)

Read any registry parameter.

instance=1 is the default for single-loop devices and the first loop / channel on multi-loop devices.

Source code in src/watlowlib/devices/controller.py
async def read_parameter(
    self,
    name_or_id: str | int,
    *,
    instance: int = 1,
    timeout: float | None = None,
) -> ParameterEntry:
    """Read any registry parameter.

    ``instance=1`` is the default for single-loop devices and the
    first loop / channel on multi-loop devices.
    """
    return await self._session.execute(
        READ_PARAMETER,
        ReadParameterRequest(name_or_id, instance=instance),
        timeout=timeout,
    )

read_pv async

read_pv(*, instance=1, timeout=None)

Read the process value for instance (loop number, 1-indexed).

Source code in src/watlowlib/devices/controller.py
async def read_pv(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the process value for ``instance`` (loop number, 1-indexed)."""
    entry = await self.read_parameter("process_value", instance=instance, timeout=timeout)
    return self._reading_from_entry(entry)

read_setpoint async

read_setpoint(*, instance=1, timeout=None)

Read the active setpoint for instance.

Source code in src/watlowlib/devices/controller.py
async def read_setpoint(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the active setpoint for ``instance``."""
    entry = await self.read_parameter("setpoint", instance=instance, timeout=timeout)
    return self._reading_from_entry(entry)

set_setpoint async

set_setpoint(
    value, *, instance=1, confirm=False, timeout=None
)

Write the setpoint and return the device-echoed value as a :class:Reading.

Setpoint is RWES — pass confirm=True to acknowledge the EEPROM write. The returned reading is the device's echo of the value it accepted.

Source code in src/watlowlib/devices/controller.py
async def set_setpoint(
    self,
    value: float,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Write the setpoint and return the device-echoed value as a :class:`Reading`.

    Setpoint is RWES — pass ``confirm=True`` to acknowledge the
    EEPROM write. The returned reading is the device's echo of
    the value it accepted.
    """
    entry = await self.write_parameter(
        "setpoint",
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )
    return self._reading_from_entry(entry)

write_parameter async

write_parameter(
    name_or_id,
    value,
    *,
    instance=1,
    confirm=False,
    timeout=None,
)

Write any registry parameter.

Persistent (RWE / RWES) writes require confirm=True; the session raises :class:WatlowConfirmationRequiredError before any I/O if the gate is missing.

Source code in src/watlowlib/devices/controller.py
async def write_parameter(
    self,
    name_or_id: str | int,
    value: float | int | str,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> ParameterEntry:
    """Write any registry parameter.

    Persistent (RWE / RWES) writes require ``confirm=True``;
    the session raises :class:`WatlowConfirmationRequiredError`
    before any I/O if the gate is missing.
    """
    return await self._session.execute(
        WRITE_PARAMETER,
        WriteParameterRequest(name_or_id, value, instance=instance),
        confirm=confirm,
        timeout=timeout,
    )

Session

watlowlib.devices.session

The :class:Session — single dispatch point for every command.

The session is the only place that gates, logs, and updates :class:Availability. Variants are pure (ctx, request) → response functions; protocol clients only own the wire. Per docs/design.md invariant 2, no other layer touches these concerns.

Responsibilities (in order, per execute):

  1. Resolve the protocol variant. UNSUPPORTED is sticky — short- circuit pre-I/O on a typed error.
  2. Enforce confirm=True for :attr:SafetyTier.PERSISTENT writes.
  3. Acquire the per-port lock on the protocol client.
  4. Variant encodeclient.execute → variant decode.
  5. Map success / typed errors to availability transitions and log a structured event.

Variant signatures differ across protocols (see docs/design.md §5):

  • Std Bus variants take decode(reply, ctx) — the reply already carries the parameter selector echoed by the device.
  • Modbus variants take decode(words, ctx, request) — the wire carries no echo, so the variant re-resolves the spec from the request to interpret the words.

Session

Session(client, *, registry, family, address, port)

Owns availability cache, gates, and the dispatch loop.

A :class:Session is bound to exactly one :class:ProtocolClient for its lifetime — one protocol per port (invariant 1).

Source code in src/watlowlib/devices/session.py
def __init__(
    self,
    client: ProtocolClient[Any, Any],
    *,
    registry: ParameterRegistry,
    family: ControllerFamily,
    address: int,
    port: str,
) -> None:
    self._client = client
    self._registry = registry
    self._family = family
    self._address = address
    self._port = port
    self._availability: dict[str, Availability] = {}

address property

address

Session bus address.

client property

client

The bound protocol client.

Exposed for the watlow-raw escape hatch and for diagnostics that need to issue an unframed wire op outside the registry. Callers must acquire :attr:ProtocolClient.lock before :meth:ProtocolClient.execute to honour the per-port serialization invariant, and must pass this session's :attr:address (or another concrete address for multi-drop diagnostics) to execute.

family property

family

Best-known controller family for this session.

port property

port

Transport label (for logs / error context).

protocol_kind property

protocol_kind

The wire protocol this session speaks.

registry property

registry

Parameter registry bound to this session.

Exposed for the streaming layer so polling code can resolve a name / id to a :class:ParameterSpec without an extra import of the module-level :data:PARAMETERS.

availability

availability(command_name)

Cached availability for command_name.

Source code in src/watlowlib/devices/session.py
def availability(self, command_name: str) -> Availability:
    """Cached availability for ``command_name``."""
    return self._availability.get(command_name, Availability.UNKNOWN)

dispose

dispose()

Dispose the bound protocol client.

Source code in src/watlowlib/devices/session.py
def dispose(self) -> None:
    """Dispose the bound protocol client."""
    self._client.dispose()

execute async

execute(command, request, *, confirm=False, timeout=None)

Dispatch command with request and return the typed response.

Source code in src/watlowlib/devices/session.py
async def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    request: Req,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Resp:
    """Dispatch ``command`` with ``request`` and return the typed response."""
    kind = self._client.kind
    # Variant resolution. The session picks the variant matching
    # the bound protocol; one protocol per port (invariant 1).
    # Resolve to a single ``variant`` local so the rest of the
    # method is protocol-agnostic; we still branch on ``kind``
    # for the encode/decode call shapes (stdbus takes ``reply``;
    # modbus takes ``words, ctx, request``).
    if kind is ProtocolKind.STDBUS:
        stdbus_variant = command.stdbus
        if stdbus_variant is None:
            raise WatlowProtocolUnsupportedError(
                f"command {command.name!r} has no Std Bus variant",
                context=self._error_context(command, request),
            )
        modbus_variant = None
    elif kind is ProtocolKind.MODBUS_RTU:
        modbus_variant = command.modbus
        if modbus_variant is None:
            raise WatlowProtocolUnsupportedError(
                f"command {command.name!r} has no Modbus variant",
                context=self._error_context(command, request),
            )
        stdbus_variant = None
    else:
        raise WatlowProtocolUnsupportedError(
            f"session has unsupported protocol kind {kind!r}",
            context=self._error_context(command, request),
        )

    # Cache key. We key on ``command_name:parameter_id`` for
    # registry-driven commands so that one ``read_parameter("foo")``
    # rejection doesn't sticky-block every other parameter; bare
    # commands fall back to ``command.name``.
    cache_key = self._cache_key(command, request)

    cached = self._availability.get(cache_key, Availability.UNKNOWN)
    if cached is Availability.UNSUPPORTED:
        raise WatlowProtocolUnsupportedError(
            f"command {command.name!r} is unsupported on this device",
            context=self._error_context(command, request),
        )

    # Safety gate: PERSISTENT writes need explicit confirm.
    if command.safety is SafetyTier.PERSISTENT and not confirm:
        raise WatlowConfirmationRequiredError(
            f"command {command.name!r} is PERSISTENT and requires confirm=True",
            context=self._error_context(command, request),
        )

    ctx = CommandContext(
        registry=self._registry,
        family=self._family,
        address=self._address,
        port=self._port,
    )

    bound_timeout = timeout if timeout is not None else DEFAULTS.io_timeout_s

    # Encode under the variant. Errors here are pre-I/O — typically
    # validation failures — and should propagate untouched.
    # Exactly one of ``stdbus_variant`` / ``modbus_variant`` is
    # non-None per the resolution above; the type narrowing is
    # explicit so neither mypy nor pyright needs an ``assert`` it
    # can't enforce at runtime under ``-O``.
    wire_request: Any
    if stdbus_variant is not None:
        wire_request = stdbus_variant.encode(ctx, request)
    elif modbus_variant is not None:
        wire_request = modbus_variant.encode(ctx, request)
    else:  # pragma: no cover — variant resolution above guarantees one is set
        raise WatlowProtocolUnsupportedError(
            f"command {command.name!r} variant resolution lost",
            context=self._error_context(command, request),
        )

    started = time.monotonic()
    # Hold the per-port client lock only for the I/O turn-around.
    # Decode is CPU-only and does not need to block the next
    # request waiting on the same RS-485 segment; ``reply`` is
    # snapshotted before the lock releases.
    async with self._client.lock:
        try:
            reply = await self._client.execute(
                wire_request,
                address=self._address,
                timeout=bound_timeout,
                command_name=command.name,
            )
        except (
            WatlowNoSuchObjectError,
            WatlowNoSuchAttributeError,
            WatlowProtocolUnsupportedError,
        ) as exc:
            self._availability[cache_key] = Availability.UNSUPPORTED
            _log.warning(
                "command unsupported: protocol=%s cmd=%s key=%s exc=%s",
                kind.value,
                command.name,
                cache_key,
                exc,
            )
            raise
        except WatlowProtocolError:
            raise
        except WatlowError as exc:
            _log.warning(
                "command error: protocol=%s cmd=%s key=%s exc=%s",
                kind.value,
                command.name,
                cache_key,
                exc,
            )
            raise

    # Decode outside the lock — pure compute on the captured reply.
    try:
        if stdbus_variant is not None:
            response = stdbus_variant.decode(reply, ctx)
        else:
            # ``modbus_variant is not None`` per the resolution above;
            # mypy/pyright follow the narrowing without an ``assert``.
            response = modbus_variant.decode(reply, ctx, request)  # type: ignore[union-attr]
    except (
        WatlowNoSuchObjectError,
        WatlowNoSuchAttributeError,
        WatlowProtocolUnsupportedError,
    ) as exc:
        # Decode-side "we don't have this": same availability
        # transition as the wire-side rejection above.
        self._availability[cache_key] = Availability.UNSUPPORTED
        _log.warning(
            "command unsupported: protocol=%s cmd=%s key=%s exc=%s",
            kind.value,
            command.name,
            cache_key,
            exc,
        )
        raise
    except WatlowProtocolError:
        # Decode-failure parity with the inside-lock branch above:
        # NoSuchInstance / IllegalDataValue / generic decode errors
        # don't transition availability per design §5b.
        raise

    elapsed = time.monotonic() - started
    self._availability[cache_key] = Availability.SUPPORTED
    _log.debug(
        "session exec ok protocol=%s cmd=%s key=%s elapsed=%.4fs",
        kind.value,
        command.name,
        cache_key,
        elapsed,
    )
    return response

Loops

watlowlib.devices.loop

Per-loop sub-facade returned by :meth:Controller.loop.

A :class:ControllerLoop is a thin view over a :class:Controller that pre-binds an instance argument. It validates the loop number once at construction (cross-cutting invariant 6: 1-indexed everywhere) and forwards every operation to the parent controller's session, threading the loop index as the registry instance.

The sub-facade is stateless beyond the loop number — it does not duplicate the controller's transport, lock, or availability cache. Multiple :class:ControllerLoop instances over the same controller share the underlying session safely; concurrent calls serialize on the protocol client's lock.

This module intentionally has no protocol-specific code. PID, output, and alarm helpers live in :mod:watlowlib.commands.loop and :mod:watlowlib.commands.alarms so the facade-only logic and the parameter aggregation logic stay separate.

ControllerLoop

ControllerLoop(controller, loop_number)

A view over one control loop on a :class:Controller.

Construct via :meth:Controller.loop; never instantiated directly by user code. The sub-facade lives only as long as the parent controller's session — closing the controller is the only cleanup needed.

Source code in src/watlowlib/devices/loop.py
def __init__(self, controller: Controller, loop_number: int) -> None:
    if loop_number < 1:
        raise WatlowValidationError(
            f"loop number must be 1-indexed and >= 1; got {loop_number}",
        )
    # If the controller has identified the device, validate
    # eagerly. Otherwise defer to the registry's per-spec
    # ``validate_instance`` at first call: a registered parameter
    # with ``max_instance=1`` will raise a clear
    # ``WatlowValidationError`` when ``loop(2).read_pv()`` is
    # invoked. That keeps ``Controller.loop(2)`` cheap when called
    # before identify, but still fails before I/O.
    loops = controller.loops
    if loops is not None and loop_number > loops:
        raise WatlowValidationError(
            f"loop {loop_number} out of range for this device (1..{loops})",
        )
    self._controller = controller
    self._loop = loop_number

number property

number

The 1-indexed loop number this view binds.

read_alarms async

read_alarms()

Read the alarm word for this loop.

Currently raises :class:watlowlib.errors.WatlowProtocolUnsupportedError — see :func:watlowlib.commands.alarms.read_alarms for why the decoder is not yet wired up.

Source code in src/watlowlib/devices/loop.py
async def read_alarms(self) -> AlarmState:
    """Read the alarm word for this loop.

    Currently raises :class:`watlowlib.errors.WatlowProtocolUnsupportedError` —
    see :func:`watlowlib.commands.alarms.read_alarms` for why the
    decoder is not yet wired up.
    """
    return await _read_alarms(self._controller.session, instance=self._loop)

read_output async

read_output()

Read this loop's working output (output_power).

Source code in src/watlowlib/devices/loop.py
async def read_output(self) -> Reading:
    """Read this loop's working output (``output_power``)."""
    return await _read_output(self._controller.session, instance=self._loop)

read_pid async

read_pid()

Read every PID gain for this loop. Missing gains return None.

Cool-side gains (cool_proportional_band, dead_band) are skipped when the controller's identified capabilities lack :attr:Capability.HAS_COOLING (e.g. PM output_2 == 'A'). Pre-identify, the gate is permissive.

Source code in src/watlowlib/devices/loop.py
async def read_pid(self) -> PidGains:
    """Read every PID gain for this loop. Missing gains return ``None``.

    Cool-side gains (``cool_proportional_band``, ``dead_band``)
    are skipped when the controller's identified capabilities
    lack :attr:`Capability.HAS_COOLING` (e.g. PM ``output_2 ==
    'A'``). Pre-identify, the gate is permissive.
    """
    return await _read_pid(
        self._controller.session,
        instance=self._loop,
        capabilities=self._controller.capabilities,
    )

read_pv async

read_pv(*, timeout=None)

Read this loop's process value.

Source code in src/watlowlib/devices/loop.py
async def read_pv(self, *, timeout: float | None = None) -> Reading:
    """Read this loop's process value."""
    return await self._controller.read_pv(instance=self._loop, timeout=timeout)

read_setpoint async

read_setpoint(*, timeout=None)

Read this loop's active setpoint.

Source code in src/watlowlib/devices/loop.py
async def read_setpoint(self, *, timeout: float | None = None) -> Reading:
    """Read this loop's active setpoint."""
    return await self._controller.read_setpoint(instance=self._loop, timeout=timeout)

set_setpoint async

set_setpoint(value, *, confirm=False, timeout=None)

Write this loop's setpoint (RWES → confirm=True required).

Source code in src/watlowlib/devices/loop.py
async def set_setpoint(
    self,
    value: float,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Write this loop's setpoint (RWES → ``confirm=True`` required)."""
    return await self._controller.set_setpoint(
        value,
        instance=self._loop,
        confirm=confirm,
        timeout=timeout,
    )

write_pid async

write_pid(gains, *, confirm=False)

Write the supplied gains for this loop.

Persistent — passing confirm=True is required. Fields left None on gains skip the wire entirely. Setting a cool-side field on a controller without :attr:Capability.HAS_COOLING raises :class:watlowlib.errors.WatlowConfigurationError.

Source code in src/watlowlib/devices/loop.py
async def write_pid(self, gains: PidGains, *, confirm: bool = False) -> PidGains:
    """Write the supplied gains for this loop.

    Persistent — passing ``confirm=True`` is required. Fields
    left ``None`` on ``gains`` skip the wire entirely. Setting a
    cool-side field on a controller without
    :attr:`Capability.HAS_COOLING` raises
    :class:`watlowlib.errors.WatlowConfigurationError`.
    """
    return await _write_pid(
        self._controller.session,
        gains,
        instance=self._loop,
        confirm=confirm,
        capabilities=self._controller.capabilities,
    )

Capability + safety + availability

watlowlib.devices.capability

Three small enums that set the contract between layers.

  • :class:SafetyTier — derived from RWES; gates confirm=True writes.
  • :class:Capability — coarse hardware-feature bitmap. Bits are added when a captured family needs them and existing values stay stable.
  • :class:Availability — per-command session cache state.

This module is leaf — it imports nothing from :mod:watlowlib.devices siblings, so the registry and command layers can pull these enums without an import cycle. See docs/design.md §5b.

Availability

Bases: StrEnum

Per-command session state.

Sticky for the session: once a command transitions to :attr:UNSUPPORTED, the session short-circuits subsequent invocations with a typed error pre-I/O. The transition table lives in docs/design.md §5b.

Capability

Bases: Flag

Coarse hardware capability bits.

Bits are derived from a decoded part number when one is available (see :func:watlowlib.registry.families.capabilities_for_part_number) and fall back to a per-family prior otherwise. The session widens the set at runtime when a command succeeds against a parameter that proves the capability.

The vocabulary is small on purpose — most Watlow gating is by :class:watlowlib.registry.families.ControllerFamily and by :attr:watlowlib.registry.parameters.ParameterSpec.parameter_id, not by per-feature bits. New bits are added when captured family behaviour requires them.

SafetyTier

Bases: IntEnum

How dangerous a command is to invoke.

  • READ_ONLY (R) — no state change.
  • STATEFUL — runtime state change but not EEPROM-backed. Reserved for commands like "start autotune"; no PM parameter maps here today, but the tier exists so future commands have a place to live.
  • PERSISTENT (RW / RWE / RWES) — EEPROM-backed; requires confirm=True at the facade.

capabilities_for_family

capabilities_for_family(family)

Return the capability prior for family.

The session promotes observed capabilities at runtime and the part- number decoder fills in per-SKU bits via :func:watlowlib.registry.families.capabilities_for_part_number. PM is intentionally :attr:Capability.NONE because PM SKUs vary across every dimension (cooling / modbus / profile / comms).

Source code in src/watlowlib/devices/capability.py
def capabilities_for_family(family: ControllerFamily) -> Capability:
    """Return the capability prior for ``family``.

    The session promotes observed capabilities at runtime and the part-
    number decoder fills in per-SKU bits via
    :func:`watlowlib.registry.families.capabilities_for_part_number`.
    PM is intentionally :attr:`Capability.NONE` because PM SKUs vary
    across every dimension (cooling / modbus / profile / comms).
    """
    return _FAMILY_PRIORS.get(family.value, Capability.NONE)

Family classification

watlowlib.devices.kind

Re-export :class:ControllerFamily under :mod:watlowlib.devices.

Callers can import it from either location. The canonical home is :mod:watlowlib.registry.families so the registry layer can construct family enums without depending on :mod:watlowlib.devices.

ControllerFamily

Bases: StrEnum

Watlow controller family discriminator.

Membership here is advisory — :class:watlowlib.devices.session.Session treats family hints as priors, not gates, unless the session was opened with strict=True. See docs/design.md §5b.

classify_family

classify_family(part_number)

Return the :class:ControllerFamily for a part-number string.

Only the leading family discriminator is parsed; per-family digit decoding is in :func:decode_part_number.

Source code in src/watlowlib/registry/families.py
def classify_family(part_number: str) -> ControllerFamily:
    """Return the :class:`ControllerFamily` for a part-number string.

    Only the leading family discriminator is parsed; per-family digit
    decoding is in :func:`decode_part_number`.
    """
    head = part_number.strip().upper()
    if head.startswith("PM"):
        return ControllerFamily.PM
    if head.startswith("RM"):
        return ControllerFamily.RM
    if head.startswith("ST"):
        return ControllerFamily.ST
    if head.startswith("F4T"):
        return ControllerFamily.F4T
    return ControllerFamily.UNKNOWN

Public dataclasses

watlowlib.devices.models

Public dataclasses returned by the :class:Controller facade.

All frozen, slots=True. py.typed ships.

See docs/design.md §6a.

AlarmState dataclass

AlarmState(loop, high, low, silenced, raw_bits)

Decoded alarm bits for one loop.

DeviceHealth

Bases: StrEnum

Outcome of an :meth:Controller.identify call.

Used by callers (the maintenance verify pass, the configure CLI, discovery rows) to distinguish "the device answered every probe" from "the device answered some probes but not the load-bearing part-number read." Sentinel values stay the same enum across the public API so downstream code can branch on it.

DeviceInfo dataclass

DeviceInfo(
    part_number,
    hardware_id,
    firmware_id,
    serial_number,
    family,
    protocol,
    address,
    capabilities,
    serial_settings,
    loops,
    health=DeviceHealth.OK,
    configured_protocol=None,
)

Identity + connection metadata for an open controller.

Returned by :meth:Controller.identify. Capabilities are decoded from the part number when one is captured (see :func:watlowlib.registry.families.capabilities_for_part_number) and OR-ed with the family prior; unobserved bits stay zero rather than being guessed.

protocol is the wire protocol the host is currently talking; configured_protocol is what the device's persistent EEPROM parameter (PM 17009) reports. They normally match, but when they diverge the helper :attr:protocol_mismatch flags it — useful for catching SKU/firmware combinations where the user wrote a new protocol but the runtime stack didn't pick it up (e.g. comms position-8 = 'A', no Modbus stack present even though 17009 reads 1057).

protocol_mismatch property

protocol_mismatch

True when EEPROM says one protocol and we're talking another.

Always False when :attr:configured_protocol is None (i.e. identify did not query parameter 17009).

DiscoveryResult dataclass

DiscoveryResult(
    port, serial_settings, address, protocol, info, error
)

One row from the discovery sweep.

LoopState dataclass

LoopState(
    loop, pv, setpoint, output_pct, raw=(lambda: {})()
)

Snapshot of one loop. Composed from several reads.

ParameterEntry dataclass

ParameterEntry(spec, instance, value, raw)

Generic registry-driven read/write result.

Returned by :data:watlowlib.commands.READ_PARAMETER and :data:watlowlib.commands.WRITE_PARAMETER. The :class:Controller translates an entry into a :class:Reading / :class:PartNumber / etc. when the public API guarantees a richer shape.

PartNumber dataclass

PartNumber(raw, family, details=(lambda: {})())

Parsed part-number string returned by read_part_number.

Per-family digit decoding is contributed by :mod:watlowlib.registry.families. Decoded fragments live in :attr:details as a free-form mapping so each family can populate only what its ordering format defines, and so adding fragments to the PM decoder later is non-breaking.

The EZ-ZONE PM decoder populates case size, control type, power input, three output codes, and options string. Other families fall through to a stub: only :attr:family is set, and :attr:details is empty.

Reading dataclass

Reading(
    value, unit, received_at, monotonic_ns, raw, protocol
)

A single timestamped value from the controller.

protocol is set by the variant decoder, not by the facade — it reflects which wire protocol produced the value (per docs/design.md invariant 7).

Factory

watlowlib.devices.factory

open_device — single entry point for opening a controller.

Honours :attr:ProtocolKind.STDBUS, :attr:ProtocolKind.MODBUS_RTU, and :attr:ProtocolKind.AUTO (Std Bus probe → Modbus probe → fail). The detector itself lives in :mod:watlowlib.protocol.detect; the factory only orchestrates.

The factory does not sweep bauds — the user sets one. See docs/design.md §7 for why baud sweeping is opt-in via the watlow-discover CLI rather than the open path.

open_controller async

open_controller(
    transport,
    *,
    protocol,
    address,
    serial_settings,
    family=ControllerFamily.UNKNOWN,
)

Build a :class:Controller over an existing :class:Transport.

Tests use this to drive the facade through a :class:watlowlib.transport.fake.FakeTransport. Production code uses :func:open_device.

Source code in src/watlowlib/devices/factory.py
async def open_controller(
    transport: Transport,
    *,
    protocol: ProtocolKind,
    address: int,
    serial_settings: SerialSettings,
    family: ControllerFamily = ControllerFamily.UNKNOWN,
) -> Controller:
    """Build a :class:`Controller` over an existing :class:`Transport`.

    Tests use this to drive the facade through a
    :class:`watlowlib.transport.fake.FakeTransport`. Production code
    uses :func:`open_device`.
    """
    if protocol is ProtocolKind.AUTO:
        raise WatlowConfigurationError(
            "open_controller requires a concrete protocol; AUTO must be resolved by "
            "open_device (which runs the detector and returns a built Controller).",
            context=ErrorContext(port=transport.label),
        )
    client = make_protocol_client(protocol, transport)
    session = Session(
        client,
        registry=PARAMETERS,
        family=family,
        address=address,
        port=transport.label,
    )
    return Controller(session, transport, serial_settings=serial_settings)

open_device async

open_device(
    port,
    *,
    protocol=ProtocolKind.STDBUS,
    address=1,
    serial_settings=None,
)

Open a controller on a serial port.

Parameters:

Name Type Description Default
port str

Serial-port path (/dev/ttyUSB0, COM3, ...).

required
protocol ProtocolKind

Wire protocol. STDBUS and MODBUS_RTU open directly; AUTO runs the conservative detector (Std Bus → Modbus → fail) per docs/design.md §7.

STDBUS
address int

Bus address. Std Bus accepts 1..16; Modbus RTU accepts 1..247. Under AUTO the same address is tried against both probes.

1
serial_settings SerialSettings | None

Optional override. Default is 38400 8-N-1, the EZ-ZONE PM Standard Bus factory setting; port from the positional arg is applied if serial_settings is None. For Modbus RTU, the typical PM factory framing is 9600 8-E-1 — pass an explicit :class:SerialSettings to override the default. Auto- detect uses the same framing for both probes — there is no baud sweeping in the open path (cross-cutting invariant 5).

None

Returns:

Type Description
Controller

An opened :class:Controller when protocol=AUTO (the

Controller

detector held the transport open after a successful probe),

Controller

otherwise an unopened :class:Controller to be used as an

Controller

async context manager.

Raises:

Type Description
WatlowConfigurationError

address is out of range or protocol is unsupported.

WatlowProtocolUnsupportedError

protocol=AUTO and both probes failed.

Source code in src/watlowlib/devices/factory.py
async def open_device(
    port: str,
    *,
    protocol: ProtocolKind = ProtocolKind.STDBUS,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
) -> Controller:
    """Open a controller on a serial port.

    Args:
        port: Serial-port path (``/dev/ttyUSB0``, ``COM3``, ...).
        protocol: Wire protocol. ``STDBUS`` and ``MODBUS_RTU`` open
            directly; ``AUTO`` runs the conservative detector
            (Std Bus → Modbus → fail) per ``docs/design.md`` §7.
        address: Bus address. Std Bus accepts ``1..16``; Modbus RTU
            accepts ``1..247``. Under ``AUTO`` the same address is
            tried against both probes.
        serial_settings: Optional override. Default is **38400 8-N-1**,
            the EZ-ZONE PM Standard Bus factory setting; ``port`` from
            the positional arg is applied if ``serial_settings`` is
            ``None``. For Modbus RTU, the typical PM factory framing
            is **9600 8-E-1** — pass an explicit
            :class:`SerialSettings` to override the default. Auto-
            detect uses the same framing for both probes — there is
            no baud sweeping in the open path (cross-cutting
            invariant 5).

    Returns:
        An *opened* :class:`Controller` when ``protocol=AUTO`` (the
        detector held the transport open after a successful probe),
        otherwise an *unopened* :class:`Controller` to be used as an
        async context manager.

    Raises:
        WatlowConfigurationError: ``address`` is out of range or
            ``protocol`` is unsupported.
        WatlowProtocolUnsupportedError: ``protocol=AUTO`` and both
            probes failed.
    """
    if protocol not in (ProtocolKind.STDBUS, ProtocolKind.MODBUS_RTU, ProtocolKind.AUTO):
        raise WatlowConfigurationError(
            f"unsupported protocol kind: {protocol!r}",
            context=ErrorContext(port=port),
        )

    settings = serial_settings or SerialSettings(port=port)
    if settings.port != port:
        # User passed both — we honour the explicit ``port`` arg over
        # the settings dataclass to avoid silent surprise.
        from dataclasses import replace  # noqa: PLC0415 — cold path

        settings = replace(settings, port=port)

    if protocol is ProtocolKind.AUTO:
        # Lazy import — keep the Std-Bus-only callers off the anymodbus
        # dep graph until they actually opt in to AUTO.
        from watlowlib.protocol.detect import detect_protocol  # noqa: PLC0415

        resolved = await detect_protocol(
            port,
            address=address,
            serial_settings=settings,
        )
        # Detector returned an *open* transport already paired with the
        # right client; build the controller around them and skip
        # ``Controller.__aenter__``'s open() (it short-circuits when
        # ``transport.is_open`` is already True).
        session = Session(
            resolved.client,
            registry=PARAMETERS,
            family=ControllerFamily.UNKNOWN,
            address=address,
            port=resolved.transport.label,
        )
        return Controller(session, resolved.transport, serial_settings=settings)

    transport: Transport
    if protocol is ProtocolKind.MODBUS_RTU:
        # Lazy import — keep the Std-Bus path off the anymodbus dep
        # graph for users who never reach for Modbus.
        from watlowlib.protocol.modbus.transport import (  # noqa: PLC0415
            ModbusBusTransport,
        )

        transport = ModbusBusTransport(settings)
    else:
        transport = SerialTransport(settings)
    return await open_controller(
        transport,
        protocol=protocol,
        address=address,
        serial_settings=settings,
    )

Discovery

watlowlib.devices.discovery

Address sweeps for watlow-discover.

Two sweep entry points, mirrored on the two protocols:

  • :func:sweep_stdbus walks Standard Bus addresses 1..16 (BACnet MS/TP MAC 0x10..0x1F).
  • :func:sweep_modbus walks a configurable Modbus slave-address range (defaults to 1..16 to match the common bench setup; callers can extend to 1..247).

Each address probe issues one bounded read of parameter 1001 (Hardware ID — the auto-detect probe target) and returns a :class:DiscoveryResult row. Successful probes promote into a full :meth:Controller.identify call so the row carries a populated :class:DeviceInfo.

The sweep opens the underlying transport once and reuses it across every address — Standard Bus addresses differ only in the dst-MAC byte of the BACnet MS/TP outer frame, and the Modbus bus driver multiplexes slaves over a single open serial handle. Reopening per address would add ~0.5s of cdc_acm re-init per probe on Linux; the open-once design lands a 16-address sweep in well under a second of wall-clock above the actual wire turnaround.

Discovery is opt-in — it is never run from open_device. The watlow-discover CLI surfaces this module to the command line.

sweep_modbus async

sweep_modbus(
    port,
    *,
    addresses=DEFAULT_MODBUS_RANGE,
    serial_settings=None,
    timeout_s=_DEFAULT_PROBE_TIMEOUT_S,
)

Yield one :class:DiscoveryResult per Modbus address probed.

Same shape as :func:sweep_stdbus — sequential, with the bus transport opened once and reused across every slave address. The Modbus driver multiplexes slaves over a single open handle so no per-address transport churn is incurred.

Source code in src/watlowlib/devices/discovery.py
async def sweep_modbus(
    port: str,
    *,
    addresses: Iterable[int] = DEFAULT_MODBUS_RANGE,
    serial_settings: SerialSettings | None = None,
    timeout_s: float = _DEFAULT_PROBE_TIMEOUT_S,
) -> AsyncIterator[DiscoveryResult]:
    """Yield one :class:`DiscoveryResult` per Modbus address probed.

    Same shape as :func:`sweep_stdbus` — sequential, with the bus
    transport opened once and reused across every slave address. The
    Modbus driver multiplexes slaves over a single open handle so no
    per-address transport churn is incurred.
    """
    settings = _resolve_settings(port, serial_settings)
    async for result in _sweep(
        port=port,
        protocol=ProtocolKind.MODBUS_RTU,
        addresses=addresses,
        serial_settings=settings,
        timeout_s=timeout_s,
    ):
        yield result

sweep_stdbus async

sweep_stdbus(
    port,
    *,
    addresses=DEFAULT_STDBUS_RANGE,
    serial_settings=None,
    timeout_s=_DEFAULT_PROBE_TIMEOUT_S,
)

Yield one :class:DiscoveryResult per Std Bus address probed.

Opens the serial transport once, walks addresses sequentially against the same open handle, then closes. Silent rows carry a populated :attr:DiscoveryResult.error of type :class:watlowlib.errors.WatlowTimeoutError (or :class:watlowlib.errors.WatlowTransportError for framing issues) so callers can distinguish "device absent" from "address never tried".

timeout_s bounds every underlying parameter read; the default keeps a 16-address silent sweep under five seconds.

Source code in src/watlowlib/devices/discovery.py
async def sweep_stdbus(
    port: str,
    *,
    addresses: Iterable[int] = DEFAULT_STDBUS_RANGE,
    serial_settings: SerialSettings | None = None,
    timeout_s: float = _DEFAULT_PROBE_TIMEOUT_S,
) -> AsyncIterator[DiscoveryResult]:
    """Yield one :class:`DiscoveryResult` per Std Bus address probed.

    Opens the serial transport once, walks addresses sequentially
    against the same open handle, then closes. Silent rows carry a
    populated :attr:`DiscoveryResult.error` of type
    :class:`watlowlib.errors.WatlowTimeoutError` (or
    :class:`watlowlib.errors.WatlowTransportError` for framing issues)
    so callers can distinguish "device absent" from "address never
    tried".

    ``timeout_s`` bounds every underlying parameter read; the default
    keeps a 16-address silent sweep under five seconds.
    """
    settings = _resolve_settings(port, serial_settings)
    async for result in _sweep(
        port=port,
        protocol=ProtocolKind.STDBUS,
        addresses=addresses,
        serial_settings=settings,
        timeout_s=timeout_s,
    ):
        yield result