Skip to content

watlowlib.commands

The Command[Req, Resp] descriptor, per-protocol variants, the parameter read/write workhorse pair, and per-loop helpers. See Commands for the catalogue and Design §5.

Public surface

watlowlib.commands

Command descriptors — pure (ctx, request) → response variants.

The workhorse pair is :data:READ_PARAMETER / :data:WRITE_PARAMETER. Every public-API method on :class:watlowlib.devices.controller.Controller lowers to these two commands; specialised commands (PID, alarms, profile upload) live alongside.

Command dataclass

Command(
    name,
    stdbus=None,
    modbus=None,
    family_hints=_empty_family_hints(),
    capability_hints=Capability.NONE,
    safety=SafetyTier.READ_ONLY,
    min_firmware=None,
)

A pure descriptor — request type + per-protocol variants.

Attributes:

Name Type Description
name str

Human-readable name; threaded into log events and error contexts.

stdbus StdBusVariant[Req, Resp] | None

Std Bus variant, or None if the command has no Std Bus binding.

modbus ModbusVariant[Req, Resp] | None

Modbus variant, or None if the command has no Modbus binding.

family_hints frozenset[ControllerFamily]

Advisory family priors. frozenset() means "no prior — attempt anywhere".

capability_hints Capability

Capability bits required to attempt the command. :attr:Capability.NONE means "always attempt".

safety SafetyTier

Determines whether confirm=True is required at the facade.

min_firmware FirmwareVersion | None

If set, the session refuses the command on devices reporting a lower firmware version.

CommandContext dataclass

CommandContext(
    registry,
    family=ControllerFamily.UNKNOWN,
    address=0,
    port="",
)

Context available to every variant during encode / decode.

Attributes:

Name Type Description
registry ParameterRegistry

Parameter spec lookup. Callers typically pass :data:watlowlib.registry.PARAMETERS.

family ControllerFamily

Best-known family for the device. UNKNOWN is treated as "no prior" — variants attempt anyway and let availability be observed from the response.

address int

Bus address, threaded through for richer :class:watlowlib.errors.ErrorContext on failure.

port str

Transport label, threaded through for the same reason.

ModbusVariant

Bases: Protocol

Modbus encode + decode for one :class:Command.

Variants emit a typed :class:ModbusOp rather than wire bytes because :mod:anymodbus already owns the PDU codec — handing it bytes would be a layer violation. The :class:watlowlib.protocol.modbus.client.ModbusProtocolClient lowers the op onto the matching :class:anymodbus.Slave method and returns the raw register tuple, which the variant decode converts into the typed response.

Variants are stateless — implementations are typically @dataclass instances stored once on the :class:Command definition.

decode

decode(words, ctx, request)

Convert raw Modbus register words to the typed response.

request is threaded through so the variant can recover per-request context (e.g. which parameter spec to populate the response with) without re-resolving from the registry. Writes return words=() — variants for writes typically echo the request value rather than parsing words.

Source code in src/watlowlib/commands/base.py
def decode(self, words: tuple[int, ...], ctx: CommandContext, request: Req) -> Resp:
    """Convert raw Modbus register words to the typed response.

    ``request`` is threaded through so the variant can recover
    per-request context (e.g. which parameter spec to populate
    the response with) without re-resolving from the registry.
    Writes return ``words=()`` — variants for writes typically
    echo the request value rather than parsing words.
    """
    ...

encode

encode(ctx, request)

Produce a :class:ModbusOp for request.

Source code in src/watlowlib/commands/base.py
def encode(self, ctx: CommandContext, request: Req) -> ModbusOp:
    """Produce a :class:`ModbusOp` for ``request``."""
    ...

PidGains dataclass

PidGains(
    heat_proportional_band=None,
    cool_proportional_band=None,
    time_integral=None,
    time_derivative=None,
    dead_band=None,
)

Decoded PID gain set for one loop.

Fields default to None when the controller doesn't expose the parameter (e.g. cooling fields on a heat-only PM SKU). Callers that need a strict-shape view can check not_none().

not_none

not_none()

True iff every gain was successfully read.

Source code in src/watlowlib/commands/loop.py
def not_none(self) -> bool:
    """``True`` iff every gain was successfully read."""
    return all(
        v is not None
        for v in (
            self.heat_proportional_band,
            self.cool_proportional_band,
            self.time_integral,
            self.time_derivative,
            self.dead_band,
        )
    )

ReadParameterRequest dataclass

ReadParameterRequest(name_or_id, instance=1)

Read one parameter at one instance.

StdBusVariant

Bases: Protocol

Std Bus encode + decode for one :class:Command.

Variants are stateless — implementations are typically @dataclass instances stored once on the :class:Command definition.

decode

decode(reply, ctx)

Convert reply to the typed response.

Source code in src/watlowlib/commands/base.py
def decode(self, reply: StdBusReply, ctx: CommandContext) -> Resp:
    """Convert ``reply`` to the typed response."""
    ...

encode

encode(ctx, request)

Produce inner Watlow payload bytes for request.

Source code in src/watlowlib/commands/base.py
def encode(self, ctx: CommandContext, request: Req) -> bytes:
    """Produce inner Watlow payload bytes for ``request``."""
    ...

WriteParameterRequest dataclass

WriteParameterRequest(name_or_id, value, instance=1)

Write one parameter at one instance.

read_output async

read_output(session, *, instance=1)

Read the loop's working output (output_power).

Returns a :class:Reading matching the rest of the facade so callers don't have to remember a different return shape per operation. Unsupported on devices without the output_power parameter — surfaces as :class:watlowlib.errors.WatlowProtocolUnsupportedError.

Source code in src/watlowlib/commands/loop.py
async def read_output(session: Session, *, instance: int = 1) -> Reading:
    """Read the loop's working output (``output_power``).

    Returns a :class:`Reading` matching the rest of the facade so
    callers don't have to remember a different return shape per
    operation. Unsupported on devices without the
    ``output_power`` parameter — surfaces as
    :class:`watlowlib.errors.WatlowProtocolUnsupportedError`.
    """
    entry = await session.execute(
        READ_PARAMETER,
        ReadParameterRequest(_OUTPUT_PARAMETER, instance=instance),
    )
    value = float(entry.value) if isinstance(entry.value, int | float) else None
    return Reading(
        value=value,
        unit=None,
        received_at=datetime.now(UTC),
        monotonic_ns=time.monotonic_ns(),
        raw=entry.raw,
        protocol=session.protocol_kind,
    )

read_pid async

read_pid(session, *, instance=1, capabilities=None)

Read every PID gain for instance (loop number, 1-indexed).

Issues one parameter read per gain through the session — the output_power and PID parameters live in different rows of the EZ-ZONE registry so a single contiguous Modbus read isn't an option here. Reads run sequentially under the session lock so the snapshot is consistent against concurrent writers on the same port.

Cool-side parameters (cool_proportional_band, dead_band) are gated on :attr:Capability.HAS_COOLING when capabilities is supplied: SKUs with no second control output (PM output_2 == 'A') expose the cool registers but they hold uninitialised bits that decode as garbage floats (e.g. 0xCDCDCDCD ≈ 3.4e12). Skip the read entirely on those devices and report None.

When capabilities is None the gate is permissive — every field is read, matching the pre-capability behaviour. Per design §5b, parameters the controller rejects with a typed unsupported error still surface as None rather than raising.

Source code in src/watlowlib/commands/loop.py
async def read_pid(
    session: Session,
    *,
    instance: int = 1,
    capabilities: Capability | None = None,
) -> PidGains:
    """Read every PID gain for ``instance`` (loop number, 1-indexed).

    Issues one parameter read per gain through the session — the
    ``output_power`` and PID parameters live in different rows of
    the EZ-ZONE registry so a single contiguous Modbus read isn't an
    option here. Reads run sequentially under the session lock so the
    snapshot is consistent against concurrent writers on the same
    port.

    Cool-side parameters (``cool_proportional_band``, ``dead_band``)
    are gated on :attr:`Capability.HAS_COOLING` when ``capabilities``
    is supplied: SKUs with no second control output (PM ``output_2 ==
    'A'``) expose the cool registers but they hold uninitialised bits
    that decode as garbage floats (e.g. ``0xCDCDCDCD ≈ 3.4e12``).
    Skip the read entirely on those devices and report ``None``.

    When ``capabilities`` is ``None`` the gate is permissive — every
    field is read, matching the pre-capability behaviour. Per design
    §5b, parameters the controller rejects with a typed unsupported
    error still surface as ``None`` rather than raising.
    """
    values: dict[str, float | None] = {}
    has_cooling = capabilities is None or bool(capabilities & Capability.HAS_COOLING)
    for field_name, parameter_name, cool_only in _PID_FIELDS:
        if cool_only and not has_cooling:
            values[field_name] = None
            continue
        value = await _safe_read_float(session, parameter_name, instance=instance)
        values[field_name] = value
    return PidGains(**values)

write_pid async

write_pid(
    session,
    gains,
    *,
    instance=1,
    confirm=False,
    capabilities=None,
)

Write the supplied gains for instance and return what was applied.

Only fields with a non-None value are written; fields left None skip the wire entirely (callers can read-modify-write just one gain without disturbing the rest). Persistent writes require confirm=True — the session raises :class:watlowlib.errors.WatlowConfirmationRequiredError pre-I/O on the first underlying call if the gate is missing.

Cool-side fields (cool_proportional_band, dead_band) are refused with :class:WatlowConfigurationError when capabilities is supplied without :attr:Capability.HAS_COOLING — writing to the cool registers on a single-output PM is at best a no-op and at worst silently corrupts adjacent cool-side state.

Source code in src/watlowlib/commands/loop.py
async def write_pid(
    session: Session,
    gains: PidGains,
    *,
    instance: int = 1,
    confirm: bool = False,
    capabilities: Capability | None = None,
) -> PidGains:
    """Write the supplied gains for ``instance`` and return what was applied.

    Only fields with a non-``None`` value are written; fields left
    ``None`` skip the wire entirely (callers can read-modify-write
    just one gain without disturbing the rest). Persistent writes
    require ``confirm=True`` — the session raises
    :class:`watlowlib.errors.WatlowConfirmationRequiredError`
    pre-I/O on the first underlying call if the gate is missing.

    Cool-side fields (``cool_proportional_band``, ``dead_band``) are
    refused with :class:`WatlowConfigurationError` when
    ``capabilities`` is supplied without
    :attr:`Capability.HAS_COOLING` — writing to the cool registers on
    a single-output PM is at best a no-op and at worst silently
    corrupts adjacent cool-side state.
    """
    has_cooling = capabilities is None or bool(capabilities & Capability.HAS_COOLING)
    applied: dict[str, float | None] = {}
    for field_name, parameter_name, cool_only in _PID_FIELDS:
        value = getattr(gains, field_name)
        if value is None:
            applied[field_name] = None
            continue
        if cool_only and not has_cooling:
            raise WatlowConfigurationError(
                f"PID field {field_name!r} requires a cooling output, but the "
                "controller's capabilities do not include Capability.HAS_COOLING "
                "(PM output_2 == 'A' or equivalent). Pass a PidGains with the "
                "cool-side fields left as None.",
                context=ErrorContext(
                    command_name="write_pid",
                    instance=instance,
                ),
            )
        entry = await session.execute(
            WRITE_PARAMETER,
            WriteParameterRequest(parameter_name, value, instance=instance),
            confirm=confirm,
        )
        # Echo the device-reported value (Modbus echoes the request,
        # Std Bus echoes the parsed write response).
        if isinstance(entry.value, int | float):
            applied[field_name] = float(entry.value)
        else:
            applied[field_name] = value
    return PidGains(**applied)

Command base + variants

watlowlib.commands.base

Command + variant primitives.

A :class:Command is a pure descriptor: it pairs a request type with one variant per protocol. Variants are pure functions of (ctx, request) and never touch transport. The :class:watlowlib.devices.session.Session owns dispatch (gates, logging, availability) and is the only place that calls the variant.

Std Bus variants emit raw inner-payload bytes — watlowlib owns that codec. Modbus variants emit a typed :class:ModbusOp instruction because anymodbus already owns the wire codec; handing it bytes would be a layer violation.

See docs/design.md §5.

Command dataclass

Command(
    name,
    stdbus=None,
    modbus=None,
    family_hints=_empty_family_hints(),
    capability_hints=Capability.NONE,
    safety=SafetyTier.READ_ONLY,
    min_firmware=None,
)

A pure descriptor — request type + per-protocol variants.

Attributes:

Name Type Description
name str

Human-readable name; threaded into log events and error contexts.

stdbus StdBusVariant[Req, Resp] | None

Std Bus variant, or None if the command has no Std Bus binding.

modbus ModbusVariant[Req, Resp] | None

Modbus variant, or None if the command has no Modbus binding.

family_hints frozenset[ControllerFamily]

Advisory family priors. frozenset() means "no prior — attempt anywhere".

capability_hints Capability

Capability bits required to attempt the command. :attr:Capability.NONE means "always attempt".

safety SafetyTier

Determines whether confirm=True is required at the facade.

min_firmware FirmwareVersion | None

If set, the session refuses the command on devices reporting a lower firmware version.

CommandContext dataclass

CommandContext(
    registry,
    family=ControllerFamily.UNKNOWN,
    address=0,
    port="",
)

Context available to every variant during encode / decode.

Attributes:

Name Type Description
registry ParameterRegistry

Parameter spec lookup. Callers typically pass :data:watlowlib.registry.PARAMETERS.

family ControllerFamily

Best-known family for the device. UNKNOWN is treated as "no prior" — variants attempt anyway and let availability be observed from the response.

address int

Bus address, threaded through for richer :class:watlowlib.errors.ErrorContext on failure.

port str

Transport label, threaded through for the same reason.

ModbusVariant

Bases: Protocol

Modbus encode + decode for one :class:Command.

Variants emit a typed :class:ModbusOp rather than wire bytes because :mod:anymodbus already owns the PDU codec — handing it bytes would be a layer violation. The :class:watlowlib.protocol.modbus.client.ModbusProtocolClient lowers the op onto the matching :class:anymodbus.Slave method and returns the raw register tuple, which the variant decode converts into the typed response.

Variants are stateless — implementations are typically @dataclass instances stored once on the :class:Command definition.

decode

decode(words, ctx, request)

Convert raw Modbus register words to the typed response.

request is threaded through so the variant can recover per-request context (e.g. which parameter spec to populate the response with) without re-resolving from the registry. Writes return words=() — variants for writes typically echo the request value rather than parsing words.

Source code in src/watlowlib/commands/base.py
def decode(self, words: tuple[int, ...], ctx: CommandContext, request: Req) -> Resp:
    """Convert raw Modbus register words to the typed response.

    ``request`` is threaded through so the variant can recover
    per-request context (e.g. which parameter spec to populate
    the response with) without re-resolving from the registry.
    Writes return ``words=()`` — variants for writes typically
    echo the request value rather than parsing words.
    """
    ...

encode

encode(ctx, request)

Produce a :class:ModbusOp for request.

Source code in src/watlowlib/commands/base.py
def encode(self, ctx: CommandContext, request: Req) -> ModbusOp:
    """Produce a :class:`ModbusOp` for ``request``."""
    ...

StdBusVariant

Bases: Protocol

Std Bus encode + decode for one :class:Command.

Variants are stateless — implementations are typically @dataclass instances stored once on the :class:Command definition.

decode

decode(reply, ctx)

Convert reply to the typed response.

Source code in src/watlowlib/commands/base.py
def decode(self, reply: StdBusReply, ctx: CommandContext) -> Resp:
    """Convert ``reply`` to the typed response."""
    ...

encode

encode(ctx, request)

Produce inner Watlow payload bytes for request.

Source code in src/watlowlib/commands/base.py
def encode(self, ctx: CommandContext, request: Req) -> bytes:
    """Produce inner Watlow payload bytes for ``request``."""
    ...

Parameter commands

watlowlib.commands.parameters

Workhorse READ_PARAMETER / WRITE_PARAMETER commands.

These two commands cover the 80% case: read or write any registry parameter through the same code path on either protocol. The variant pulls selector + encoding from the :class:ParameterSpec resolved at encode time, so adding a new parameter to pm_parameters.json extends the surface with no command-layer changes.

See docs/design.md §5 / §5a.

ReadParameterRequest dataclass

ReadParameterRequest(name_or_id, instance=1)

Read one parameter at one instance.

WriteParameterRequest dataclass

WriteParameterRequest(name_or_id, value, instance=1)

Write one parameter at one instance.

Loop helpers

watlowlib.commands.loop

Loop-level facade helpers — PID and output reads.

Loop operations compose registry-driven reads/writes from :mod:watlowlib.commands.parameters rather than introducing new wire-level commands. Each underlying call already has variants on both Std Bus and Modbus, so PID / output operations work uniformly across protocols without protocol-specific code in this module (cross-cutting invariant 2: variants own the wire; this module owns aggregation).

Persistent writes (write_pid) propagate confirm=True into each underlying parameter write — the session enforces the :class:SafetyTier.PERSISTENT gate per write, so a missing confirm fails pre-I/O on the first gain rather than half-applying the gain set.

PidGains dataclass

PidGains(
    heat_proportional_band=None,
    cool_proportional_band=None,
    time_integral=None,
    time_derivative=None,
    dead_band=None,
)

Decoded PID gain set for one loop.

Fields default to None when the controller doesn't expose the parameter (e.g. cooling fields on a heat-only PM SKU). Callers that need a strict-shape view can check not_none().

not_none

not_none()

True iff every gain was successfully read.

Source code in src/watlowlib/commands/loop.py
def not_none(self) -> bool:
    """``True`` iff every gain was successfully read."""
    return all(
        v is not None
        for v in (
            self.heat_proportional_band,
            self.cool_proportional_band,
            self.time_integral,
            self.time_derivative,
            self.dead_band,
        )
    )

read_output async

read_output(session, *, instance=1)

Read the loop's working output (output_power).

Returns a :class:Reading matching the rest of the facade so callers don't have to remember a different return shape per operation. Unsupported on devices without the output_power parameter — surfaces as :class:watlowlib.errors.WatlowProtocolUnsupportedError.

Source code in src/watlowlib/commands/loop.py
async def read_output(session: Session, *, instance: int = 1) -> Reading:
    """Read the loop's working output (``output_power``).

    Returns a :class:`Reading` matching the rest of the facade so
    callers don't have to remember a different return shape per
    operation. Unsupported on devices without the
    ``output_power`` parameter — surfaces as
    :class:`watlowlib.errors.WatlowProtocolUnsupportedError`.
    """
    entry = await session.execute(
        READ_PARAMETER,
        ReadParameterRequest(_OUTPUT_PARAMETER, instance=instance),
    )
    value = float(entry.value) if isinstance(entry.value, int | float) else None
    return Reading(
        value=value,
        unit=None,
        received_at=datetime.now(UTC),
        monotonic_ns=time.monotonic_ns(),
        raw=entry.raw,
        protocol=session.protocol_kind,
    )

read_pid async

read_pid(session, *, instance=1, capabilities=None)

Read every PID gain for instance (loop number, 1-indexed).

Issues one parameter read per gain through the session — the output_power and PID parameters live in different rows of the EZ-ZONE registry so a single contiguous Modbus read isn't an option here. Reads run sequentially under the session lock so the snapshot is consistent against concurrent writers on the same port.

Cool-side parameters (cool_proportional_band, dead_band) are gated on :attr:Capability.HAS_COOLING when capabilities is supplied: SKUs with no second control output (PM output_2 == 'A') expose the cool registers but they hold uninitialised bits that decode as garbage floats (e.g. 0xCDCDCDCD ≈ 3.4e12). Skip the read entirely on those devices and report None.

When capabilities is None the gate is permissive — every field is read, matching the pre-capability behaviour. Per design §5b, parameters the controller rejects with a typed unsupported error still surface as None rather than raising.

Source code in src/watlowlib/commands/loop.py
async def read_pid(
    session: Session,
    *,
    instance: int = 1,
    capabilities: Capability | None = None,
) -> PidGains:
    """Read every PID gain for ``instance`` (loop number, 1-indexed).

    Issues one parameter read per gain through the session — the
    ``output_power`` and PID parameters live in different rows of
    the EZ-ZONE registry so a single contiguous Modbus read isn't an
    option here. Reads run sequentially under the session lock so the
    snapshot is consistent against concurrent writers on the same
    port.

    Cool-side parameters (``cool_proportional_band``, ``dead_band``)
    are gated on :attr:`Capability.HAS_COOLING` when ``capabilities``
    is supplied: SKUs with no second control output (PM ``output_2 ==
    'A'``) expose the cool registers but they hold uninitialised bits
    that decode as garbage floats (e.g. ``0xCDCDCDCD ≈ 3.4e12``).
    Skip the read entirely on those devices and report ``None``.

    When ``capabilities`` is ``None`` the gate is permissive — every
    field is read, matching the pre-capability behaviour. Per design
    §5b, parameters the controller rejects with a typed unsupported
    error still surface as ``None`` rather than raising.
    """
    values: dict[str, float | None] = {}
    has_cooling = capabilities is None or bool(capabilities & Capability.HAS_COOLING)
    for field_name, parameter_name, cool_only in _PID_FIELDS:
        if cool_only and not has_cooling:
            values[field_name] = None
            continue
        value = await _safe_read_float(session, parameter_name, instance=instance)
        values[field_name] = value
    return PidGains(**values)

write_pid async

write_pid(
    session,
    gains,
    *,
    instance=1,
    confirm=False,
    capabilities=None,
)

Write the supplied gains for instance and return what was applied.

Only fields with a non-None value are written; fields left None skip the wire entirely (callers can read-modify-write just one gain without disturbing the rest). Persistent writes require confirm=True — the session raises :class:watlowlib.errors.WatlowConfirmationRequiredError pre-I/O on the first underlying call if the gate is missing.

Cool-side fields (cool_proportional_band, dead_band) are refused with :class:WatlowConfigurationError when capabilities is supplied without :attr:Capability.HAS_COOLING — writing to the cool registers on a single-output PM is at best a no-op and at worst silently corrupts adjacent cool-side state.

Source code in src/watlowlib/commands/loop.py
async def write_pid(
    session: Session,
    gains: PidGains,
    *,
    instance: int = 1,
    confirm: bool = False,
    capabilities: Capability | None = None,
) -> PidGains:
    """Write the supplied gains for ``instance`` and return what was applied.

    Only fields with a non-``None`` value are written; fields left
    ``None`` skip the wire entirely (callers can read-modify-write
    just one gain without disturbing the rest). Persistent writes
    require ``confirm=True`` — the session raises
    :class:`watlowlib.errors.WatlowConfirmationRequiredError`
    pre-I/O on the first underlying call if the gate is missing.

    Cool-side fields (``cool_proportional_band``, ``dead_band``) are
    refused with :class:`WatlowConfigurationError` when
    ``capabilities`` is supplied without
    :attr:`Capability.HAS_COOLING` — writing to the cool registers on
    a single-output PM is at best a no-op and at worst silently
    corrupts adjacent cool-side state.
    """
    has_cooling = capabilities is None or bool(capabilities & Capability.HAS_COOLING)
    applied: dict[str, float | None] = {}
    for field_name, parameter_name, cool_only in _PID_FIELDS:
        value = getattr(gains, field_name)
        if value is None:
            applied[field_name] = None
            continue
        if cool_only and not has_cooling:
            raise WatlowConfigurationError(
                f"PID field {field_name!r} requires a cooling output, but the "
                "controller's capabilities do not include Capability.HAS_COOLING "
                "(PM output_2 == 'A' or equivalent). Pass a PidGains with the "
                "cool-side fields left as None.",
                context=ErrorContext(
                    command_name="write_pid",
                    instance=instance,
                ),
            )
        entry = await session.execute(
            WRITE_PARAMETER,
            WriteParameterRequest(parameter_name, value, instance=instance),
            confirm=confirm,
        )
        # Echo the device-reported value (Modbus echoes the request,
        # Std Bus echoes the parsed write response).
        if isinstance(entry.value, int | float):
            applied[field_name] = float(entry.value)
        else:
            applied[field_name] = value
    return PidGains(**applied)

Alarm helpers

watlowlib.commands.alarms

Alarm-state decoder placeholder.

EZ-ZONE PM exposes per-alarm-instance status as a PACKED 16-bit word at parameter 10005 (class 10 / alarms, member 5, max_instance=8). Watlow does not publish a stable public bit map for that PACKED word — different firmware revisions and SKU configurations move bits around — so a decoder would either need RE provenance we don't have or speculative bit guesses that fail silently on real hardware.

Until that bit map is captured, :func:read_alarms raises :class:WatlowProtocolUnsupportedError. The public signature returning :class:AlarmState is stable, so swapping in a real decoder later is non-breaking.

read_alarms async

read_alarms(session, *, instance=1)

Raise :class:WatlowProtocolUnsupportedError.

Parameters:

Name Type Description Default
session Session

The session whose facade triggered the call.

required
instance int

1-indexed alarm instance, threaded into the error context so callers can see which loop/alarm was asked for.

1

Raises:

Type Description
WatlowProtocolUnsupportedError

Always — see module docstring for why the decoder is intentionally absent.

Source code in src/watlowlib/commands/alarms.py
async def read_alarms(session: Session, *, instance: int = 1) -> AlarmState:
    """Raise :class:`WatlowProtocolUnsupportedError`.

    Args:
        session: The session whose facade triggered the call.
        instance: 1-indexed alarm instance, threaded into the error
            context so callers can see which loop/alarm was asked
            for.

    Raises:
        WatlowProtocolUnsupportedError: Always — see module docstring
            for why the decoder is intentionally absent.
    """
    raise WatlowProtocolUnsupportedError(
        "alarm-state decoding is not implemented: PM exposes alarm status as a "
        "PACKED 16-bit word at parameter 10005 but the bit layout has no public, "
        "firmware-stable spec yet.",
        context=ErrorContext(
            command_name="read_alarms",
            protocol=session.protocol_kind,
            port=session.port or None,
            address=session.address or None,
            parameter_id=10005,
            instance=instance,
        ),
    )