Skip to content

watlowlib.protocol

ProtocolKind, the ProtocolClient runtime-checkable Protocol, and the Standard Bus / Modbus RTU client implementations. See Design §4–5.

Public surface

watlowlib.protocol

Protocol layer — framing, parsing, and protocol-client adapters.

Standard Bus and Modbus RTU each have a full subpackage under here. The shared :class:ProtocolClient Protocol and :class:ProtocolKind enum live at this level. See docs/design.md §4.

ProtocolClient

Bases: Protocol

Per-device protocol client.

Implementations own the wire codec and the per-port lock. The :class:watlowlib.devices.session.Session is the only caller; it holds lock for the duration of a single command (request + reply).

disposed property

disposed

Whether :meth:dispose has been called.

kind property

kind

The :class:ProtocolKind this client speaks.

lock property

lock

Per-client lock acquired by :meth:Session.execute.

One lock per port — a single :class:Session serializes its own traffic, and :class:watlowlib.manager.WatlowManager enforces one protocol per port across sessions.

dispose

dispose()

Mark the client unusable. Subsequent execute calls raise.

Synchronous because dispose is called from teardown paths that don't always have an event loop. The client is responsible for closing its transport (or signalling the owning :class:Session to do so) — this method just trips the flag.

Source code in src/watlowlib/protocol/base.py
def dispose(self) -> None:
    """Mark the client unusable. Subsequent ``execute`` calls raise.

    Synchronous because dispose is called from teardown paths that
    don't always have an event loop. The client is responsible for
    closing its transport (or signalling the owning :class:`Session`
    to do so) — this method just trips the flag.
    """
    ...

execute async

execute(request, *, address, timeout=None, command_name='')

Send request to address, return the typed reply.

address travels with every call so one client can serve multiple devices on a multi-drop RS-485 segment without re-construction. Std Bus accepts 1..16, Modbus RTU accepts 1..247.

timeout overrides :attr:watlowlib.config.DEFAULTS.io_timeout_s for this call only. command_name is threaded into log events and error contexts; it is informational, not load-bearing for dispatch.

Source code in src/watlowlib/protocol/base.py
async def execute(
    self,
    request: Request_contra,
    *,
    address: int,
    timeout: float | None = None,
    command_name: str = "",
) -> Reply_co:
    """Send ``request`` to ``address``, return the typed reply.

    ``address`` travels with every call so one client can serve
    multiple devices on a multi-drop RS-485 segment without
    re-construction. Std Bus accepts ``1..16``, Modbus RTU accepts
    ``1..247``.

    ``timeout`` overrides :attr:`watlowlib.config.DEFAULTS.io_timeout_s`
    for this call only. ``command_name`` is threaded into log
    events and error contexts; it is informational, not load-bearing
    for dispatch.
    """
    ...

ProtocolKind

Bases: StrEnum

Wire protocol selected for a session.

AUTO triggers the conservative Std Bus → Modbus probe.

make_protocol_client

make_protocol_client(kind, transport)

Build an address-agnostic :class:ProtocolClient for kind over transport.

The returned client takes a destination address per :meth:ProtocolClient.execute call, so one client can serve every device on a multi-drop RS-485 segment.

Parameters:

Name Type Description Default
kind ProtocolKind

The wire protocol. AUTO is rejected here — the detector must resolve it to a concrete kind first.

required
transport Transport

An open or openable :class:Transport. Lifecycle is the caller's responsibility — the client does not call open() on construction. For MODBUS_RTU this must be a :class:ModbusBusTransport.

required

Raises:

Type Description
WatlowConfigurationError

kind is AUTO (use the detector), or the transport shape doesn't match kind.

Source code in src/watlowlib/protocol/client.py
def make_protocol_client(
    kind: ProtocolKind,
    transport: Transport,
) -> ProtocolClient[Any, Any]:
    """Build an address-agnostic :class:`ProtocolClient` for ``kind`` over ``transport``.

    The returned client takes a destination address per
    :meth:`ProtocolClient.execute` call, so one client can serve every
    device on a multi-drop RS-485 segment.

    Args:
        kind: The wire protocol. ``AUTO`` is rejected here — the
            detector must resolve it to a concrete kind first.
        transport: An open or openable :class:`Transport`. Lifecycle is
            the caller's responsibility — the client does not call
            ``open()`` on construction. For ``MODBUS_RTU`` this must be
            a :class:`ModbusBusTransport`.

    Raises:
        WatlowConfigurationError: ``kind`` is ``AUTO`` (use the
            detector), or the ``transport`` shape doesn't match
            ``kind``.
    """
    if kind is ProtocolKind.STDBUS:
        # Imported lazily so the modbus / detect branches don't pull
        # the stdbus subpackage when they aren't needed.
        from watlowlib.protocol.stdbus.client import StdBusProtocolClient  # noqa: PLC0415

        return StdBusProtocolClient(transport)
    if kind is ProtocolKind.MODBUS_RTU:
        from watlowlib.protocol.modbus.client import ModbusProtocolClient  # noqa: PLC0415
        from watlowlib.protocol.modbus.transport import ModbusBusTransport  # noqa: PLC0415

        if not isinstance(transport, ModbusBusTransport):
            raise WatlowConfigurationError(
                "ProtocolKind.MODBUS_RTU requires a ModbusBusTransport "
                f"(got {type(transport).__name__}).",
            )
        bus_transport = transport
        return ModbusProtocolClient(
            slave_provider=bus_transport.bus.slave,
            port=bus_transport.label,
        )
    if kind is ProtocolKind.AUTO:
        raise WatlowConfigurationError(
            "ProtocolKind.AUTO must be resolved by the detector before reaching "
            "make_protocol_client.",
        )
    # StrEnum is closed; an unknown value would have failed at parse time.
    raise WatlowConfigurationError(f"unsupported protocol kind: {kind!r}")

Base types

watlowlib.protocol.base

Protocol seam: :class:ProtocolKind enum + :class:ProtocolClient Protocol.

The :class:Session holds a :class:ProtocolClient and dispatches every command through execute(...). Variants are pure functions of (ctx, request) — the client owns the wire codec and the per-port serialization (lock).

Standard Bus and Modbus RTU specialize the request type:

  • :class:watlowlib.protocol.stdbus.client.StdBusProtocolClient is ProtocolClient[bytes, StdBusReply] because watlowlib owns the inner-payload codec; the variant produces raw bytes ready to be framed.
  • :class:watlowlib.protocol.modbus.client.ModbusProtocolClient is ProtocolClient[ModbusOp, tuple[int, ...]] because anymodbus owns the wire codec; handing it bytes would be a layer violation.

See docs/design.md §4.

ProtocolClient

Bases: Protocol

Per-device protocol client.

Implementations own the wire codec and the per-port lock. The :class:watlowlib.devices.session.Session is the only caller; it holds lock for the duration of a single command (request + reply).

disposed property

disposed

Whether :meth:dispose has been called.

kind property

kind

The :class:ProtocolKind this client speaks.

lock property

lock

Per-client lock acquired by :meth:Session.execute.

One lock per port — a single :class:Session serializes its own traffic, and :class:watlowlib.manager.WatlowManager enforces one protocol per port across sessions.

dispose

dispose()

Mark the client unusable. Subsequent execute calls raise.

Synchronous because dispose is called from teardown paths that don't always have an event loop. The client is responsible for closing its transport (or signalling the owning :class:Session to do so) — this method just trips the flag.

Source code in src/watlowlib/protocol/base.py
def dispose(self) -> None:
    """Mark the client unusable. Subsequent ``execute`` calls raise.

    Synchronous because dispose is called from teardown paths that
    don't always have an event loop. The client is responsible for
    closing its transport (or signalling the owning :class:`Session`
    to do so) — this method just trips the flag.
    """
    ...

execute async

execute(request, *, address, timeout=None, command_name='')

Send request to address, return the typed reply.

address travels with every call so one client can serve multiple devices on a multi-drop RS-485 segment without re-construction. Std Bus accepts 1..16, Modbus RTU accepts 1..247.

timeout overrides :attr:watlowlib.config.DEFAULTS.io_timeout_s for this call only. command_name is threaded into log events and error contexts; it is informational, not load-bearing for dispatch.

Source code in src/watlowlib/protocol/base.py
async def execute(
    self,
    request: Request_contra,
    *,
    address: int,
    timeout: float | None = None,
    command_name: str = "",
) -> Reply_co:
    """Send ``request`` to ``address``, return the typed reply.

    ``address`` travels with every call so one client can serve
    multiple devices on a multi-drop RS-485 segment without
    re-construction. Std Bus accepts ``1..16``, Modbus RTU accepts
    ``1..247``.

    ``timeout`` overrides :attr:`watlowlib.config.DEFAULTS.io_timeout_s`
    for this call only. ``command_name`` is threaded into log
    events and error contexts; it is informational, not load-bearing
    for dispatch.
    """
    ...

ProtocolKind

Bases: StrEnum

Wire protocol selected for a session.

AUTO triggers the conservative Std Bus → Modbus probe.

Standard Bus

watlowlib.protocol.stdbus

Watlow Standard Bus protocol — BACnet MS/TP framing + Watlow payload.

This subpackage exposes the codec primitives so reverse-engineering tools and offline decode utilities can use them directly. The :class:StdBusProtocolClient sits on top, and the high-level :class:watlowlib.devices.controller.Controller facade dispatches through it via :class:watlowlib.devices.session.Session.

DataType

Bases: IntEnum

Wire data-type tag bytes.

Tags fall into two families:

  • fixed-width: U8, U16, U32, S32, FLOAT — the value follows the tag with no length byte.
  • length-prefixed: STRING, PACKED — a length byte follows the tag.

The "Wide Enumeration" data type from the EZ-ZONE register list shares tag 0x0F with Enumeration. In every live capture so far the count byte is 1 (single 16-bit word). Multi-word behaviour is implemented but unverified.

ErrorCode

Bases: IntEnum

Error response codes observed when the request selector is invalid.

The error response payload is two bytes: 0x02 (response direction) followed by the code. There is no echo of the failing class/member/instance.

Mapping to :class:watlowlib.devices.capability.Availability (per docs/design.md §5b):

  • NO_SUCH_OBJECT / NO_SUCH_ATTRIBUTEUNSUPPORTED
  • NO_SUCH_INSTANCE → unchanged (the parameter exists; this loop / channel does not)

ErrorResponse dataclass

ErrorResponse(code)

A decoded inner error response.

Frame dataclass

Frame(frame_type, dst, src, payload)

Decoded BACnet MS/TP frame as seen on Standard Bus.

FrameError

Bases: ValueError

A wire frame failed structural or CRC validation.

FrameType

Bases: IntEnum

BACnet MS/TP frame types observed on Standard Bus traffic.

BACnet MS/TP defines 0..7 plus 0x80..0xFF as proprietary. Only 0x05 (request) and 0x06 (response) are honoured on the wire by EZ-ZONE PM controllers — see docs/protocol-stdbus-findings.md ("Frame-type space"). The rest are documented for probing by reverse-engineering tooling.

ReadRequest dataclass

ReadRequest(cls, member, instance)

A decoded inner read request.

ReadResponse dataclass

ReadResponse(cls, member, instance, value, type_tag)

A decoded inner read response.

StdBusError

StdBusError(code, label=None)

Bases: RuntimeError

The controller returned an explicit error response.

Library callers should prefer :func:raise_for_error_code, which surfaces the typed WatlowNoSuch* subclasses defined in :mod:watlowlib.errors (those are what the session catches and maps to :class:Availability). StdBusError is retained as a convenience for ad-hoc decoding of captured payloads outside the session.

Source code in src/watlowlib/protocol/stdbus/payload.py
def __init__(self, code: int, label: str | None = None) -> None:
    try:
        label = label or ErrorCode(code).name
    except ValueError:
        label = label or f"unknown(0x{code:02X})"
    super().__init__(f"Standard Bus error 0x{code:02X} ({label})")
    self.code = code

StdBusProtocolClient

StdBusProtocolClient(transport)

:class:watlowlib.protocol.base.ProtocolClient for Standard Bus.

The client is address-agnostic: execute takes the destination bus address per-call so one client can serve every device on a multi-drop RS-485 segment. The :class:watlowlib.devices.session.Session passes its bound address; :class:watlowlib.manager.WatlowManager shares one client across controllers on the same physical port.

Source code in src/watlowlib/protocol/stdbus/client.py
def __init__(self, transport: Transport) -> None:
    self._transport = transport
    self._lock = anyio.Lock()
    self._disposed = False

disposed property

disposed

Whether this client has been disposed.

kind property

kind

Wire protocol kind served by this client.

lock property

lock

Per-client lock used to serialize requests on one port.

dispose

dispose()

Mark this client unusable for future execute calls.

Source code in src/watlowlib/protocol/stdbus/client.py
def dispose(self) -> None:
    """Mark this client unusable for future ``execute`` calls."""
    self._disposed = True

execute async

execute(request, *, address, timeout=None, command_name='')

Send the inner request payload to address and return the framed reply.

timeout is a wall-clock bound on the entire request → reply round-trip. The whole I/O section runs inside a single :func:anyio.fail_after(timeout), so a hung device cannot stall a caller for more than timeout seconds even when the preamble scan and the body read each consume a substantial slice. The transport's per-call timeouts are kept as defence in depth.

Parameters:

Name Type Description Default
request bytes

Inner Watlow payload bytes (e.g. produced by :func:watlowlib.protocol.stdbus.payload.encode_read_request).

required
address int

Standard Bus address (1..16). Mapped to its BACnet MS/TP MAC via :func:addr_to_mac. Validated eagerly before any I/O so a bad address surfaces as a pre-I/O :class:ValueError rather than a wire-level framing failure.

required
timeout float | None

Wall-clock bound on the round-trip. Optional override of :attr:watlowlib.config.DEFAULTS.io_timeout_s.

None
command_name str

Threaded into log events for traceability.

''

Raises:

Type Description
WatlowConnectionError

client is disposed or transport not open.

WatlowFrameError

framing failure (bad preamble, CRC mismatch, truncated body).

WatlowTimeoutError

round-trip exceeded timeout.

ValueError

address is outside 1..16.

Source code in src/watlowlib/protocol/stdbus/client.py
async def execute(
    self,
    request: bytes,
    *,
    address: int,
    timeout: float | None = None,
    command_name: str = "",
) -> StdBusReply:
    """Send the inner ``request`` payload to ``address`` and return the framed reply.

    ``timeout`` is a **wall-clock** bound on the entire request →
    reply round-trip. The whole I/O section runs inside a single
    :func:`anyio.fail_after(timeout)`, so a hung device cannot
    stall a caller for more than ``timeout`` seconds even when the
    preamble scan and the body read each consume a substantial
    slice. The transport's per-call timeouts are kept as defence
    in depth.

    Args:
        request: Inner Watlow payload bytes (e.g. produced by
            :func:`watlowlib.protocol.stdbus.payload.encode_read_request`).
        address: Standard Bus address (``1..16``). Mapped to its
            BACnet MS/TP MAC via :func:`addr_to_mac`. Validated
            eagerly before any I/O so a bad address surfaces as a
            pre-I/O :class:`ValueError` rather than a wire-level
            framing failure.
        timeout: Wall-clock bound on the round-trip. Optional
            override of :attr:`watlowlib.config.DEFAULTS.io_timeout_s`.
        command_name: Threaded into log events for traceability.

    Raises:
        WatlowConnectionError: client is disposed or transport not open.
        WatlowFrameError: framing failure (bad preamble, CRC mismatch,
            truncated body).
        WatlowTimeoutError: round-trip exceeded ``timeout``.
        ValueError: ``address`` is outside ``1..16``.
    """
    if self._disposed:
        raise WatlowConnectionError(
            "StdBusProtocolClient is disposed",
            context=ErrorContext(
                command_name=command_name or None,
                protocol=ProtocolKind.STDBUS,
                port=self._transport.label,
                address=address,
            ),
        )
    # ``addr_to_mac`` validates the address range; let it raise.
    dst_mac = addr_to_mac(address)
    bound = timeout if timeout is not None else DEFAULTS.io_timeout_s

    frame = Frame(
        frame_type=FrameType.DATA_EXPECTING_REPLY,
        dst=dst_mac,
        src=HOST_MAC,
        payload=request,
    )
    wire = encode_frame(frame)

    try:
        with anyio.fail_after(bound):
            await self._transport.write(wire, timeout=bound)
            raw = await self._read_frame(
                bound,
                command_name=command_name,
                address=address,
            )
    except TimeoutError as exc:
        # The outer wall-clock cap fired before the round-trip
        # completed. Re-raise as the typed transport timeout so
        # callers see one shape regardless of which step expired.
        raise WatlowTimeoutError(
            f"Std Bus exec on addr={address} exceeded {bound}s",
            context=ErrorContext(
                command_name=command_name or None,
                protocol=ProtocolKind.STDBUS,
                port=self._transport.label,
                address=address,
                request=wire,
            ),
        ) from exc

    try:
        decoded = decode_frame(raw)
    except FrameError as exc:
        raise WatlowFrameError(
            f"Std Bus frame decode failed: {exc}",
            context=ErrorContext(
                command_name=command_name or None,
                protocol=ProtocolKind.STDBUS,
                port=self._transport.label,
                address=address,
                request=wire,
                response=raw,
            ),
        ) from exc

    try:
        payload = decode_payload(decoded.payload)
    except ValueError as exc:
        raise WatlowFrameError(
            f"Std Bus payload decode failed: {exc}",
            context=ErrorContext(
                command_name=command_name or None,
                protocol=ProtocolKind.STDBUS,
                port=self._transport.label,
                address=address,
                request=wire,
                response=raw,
            ),
        ) from exc

    _log.debug(
        "stdbus exec ok cmd=%s addr=%d req_len=%d rsp_len=%d",
        command_name or "<anon>",
        address,
        len(wire),
        len(raw),
    )
    return StdBusReply(frame=decoded, payload=payload, raw_frame=raw)

StdBusReply dataclass

StdBusReply(frame, payload, raw_frame)

Result of a single :meth:StdBusProtocolClient.execute call.

Attributes:

Name Type Description
frame StdBusFrame

Decoded outer BACnet MS/TP frame (frame_type, dst, src, payload).

payload StdBusReplyPayload

Decoded inner Watlow payload — read response, write response, or error.

raw_frame bytes

The complete on-wire bytes (preamble through data CRC). Stored for diagnostics; used by the session's WARNING log path on errors.

WriteRequest dataclass

WriteRequest(cls, member, instance, value, type_tag)

A decoded inner write request.

WriteResponse dataclass

WriteResponse(cls, member, instance, value, type_tag)

A decoded inner write response.

addr_to_mac

addr_to_mac(addr)

Map a Standard Bus address (1..16) to its MS/TP MAC.

Raises:

Type Description
ValueError

addr is outside 1..16.

Source code in src/watlowlib/protocol/stdbus/tables.py
def addr_to_mac(addr: int) -> int:
    """Map a Standard Bus address (``1..16``) to its MS/TP MAC.

    Raises:
        ValueError: ``addr`` is outside ``1..16``.
    """
    if not 1 <= addr <= 16:
        raise ValueError(f"Standard Bus address out of range 1..16: {addr}")
    return ADDR_OFFSET + addr

data_crc16

data_crc16(data)

Compute the BACnet MS/TP data CRC-16.

Returns the host-order int; on the wire, send little-endian via :func:data_crc16_le_bytes.

Source code in src/watlowlib/protocol/stdbus/_crc.py
def data_crc16(data: bytes) -> int:
    """Compute the BACnet MS/TP data CRC-16.

    Returns the host-order int; on the wire, send little-endian via
    :func:`data_crc16_le_bytes`.
    """
    crc = 0xFFFF
    for b in data:
        low = (crc & 0xFF) ^ b
        crc = (
            (crc >> 8)
            ^ (low << 8)
            ^ (low << 3)
            ^ (low << 12)
            ^ (low >> 4)
            ^ (low & 0x0F)
            ^ ((low & 0x0F) << 7)
        ) & 0xFFFF
    return (~crc) & 0xFFFF

data_crc16_le_bytes

data_crc16_le_bytes(data)

Encode the data CRC as the two on-wire bytes (little-endian).

Source code in src/watlowlib/protocol/stdbus/_crc.py
def data_crc16_le_bytes(data: bytes) -> bytes:
    """Encode the data CRC as the two on-wire bytes (little-endian)."""
    return data_crc16(data).to_bytes(2, "little")

decode_frame

decode_frame(buf)

Parse wire bytes into a :class:Frame, verifying both CRCs.

The caller is expected to have already framed on the 55 FF preamble — :class:watlowlib.protocol.stdbus.client.StdBusProtocolClient handles that during read.

Raises:

Type Description
FrameError

short buffer, wrong preamble, header CRC mismatch, truncated body, or data CRC mismatch.

Source code in src/watlowlib/protocol/stdbus/framing.py
def decode_frame(buf: bytes) -> Frame:
    """Parse wire bytes into a :class:`Frame`, verifying both CRCs.

    The caller is expected to have already framed on the ``55 FF``
    preamble — :class:`watlowlib.protocol.stdbus.client.StdBusProtocolClient`
    handles that during read.

    Raises:
        FrameError: short buffer, wrong preamble, header CRC mismatch,
            truncated body, or data CRC mismatch.
    """
    if len(buf) < 8:
        raise FrameError(f"frame too short: {len(buf)} bytes")
    if buf[:2] != PREAMBLE:
        raise FrameError(f"bad preamble: {buf[:2].hex()}")
    frame_type = buf[2]
    dst = buf[3]
    src = buf[4]
    plen = (buf[5] << 8) | buf[6]
    hcrc = buf[7]
    expected_hcrc = header_crc8(buf[2:7])
    if hcrc != expected_hcrc:
        raise FrameError(f"header CRC mismatch: got 0x{hcrc:02X}, want 0x{expected_hcrc:02X}")
    if plen == 0:
        return Frame(frame_type, dst, src, b"")
    expected_total = 8 + plen + 2
    if len(buf) < expected_total:
        raise FrameError(f"frame truncated: have {len(buf)}, need {expected_total}")
    payload = buf[8 : 8 + plen]
    dcrc = buf[8 + plen : 8 + plen + 2]
    expected_dcrc = data_crc16_le_bytes(payload)
    if dcrc != expected_dcrc:
        raise FrameError(f"data CRC mismatch: got {dcrc.hex()}, want {expected_dcrc.hex()}")
    return Frame(frame_type, dst, src, payload)

decode_payload

decode_payload(payload)

Parse an inner Watlow payload into a structured response/request/error.

Source code in src/watlowlib/protocol/stdbus/payload.py
def decode_payload(
    payload: bytes,
) -> ReadResponse | WriteResponse | ReadRequest | WriteRequest | ErrorResponse:
    """Parse an inner Watlow payload into a structured response/request/error."""
    if len(payload) < 2:
        raise ValueError(f"payload too short: {payload.hex()}")
    direction, function = payload[0], payload[1]
    if direction not in (DIR_REQUEST, DIR_RESPONSE):
        raise ValueError(f"unknown direction byte 0x{direction:02X}")

    # Error response: 0x02 followed by an error code with high bit set.
    if direction == DIR_RESPONSE and function & 0x80:
        if len(payload) != 2:
            raise ValueError(f"error response with unexpected trailing bytes: {payload.hex()}")
        return ErrorResponse(function)

    if function == FN_READ:
        if direction == DIR_REQUEST:
            if len(payload) < 6 or payload[2] != 0x01:
                raise ValueError(f"unexpected read-request shape: {payload.hex()}")
            return ReadRequest(payload[3], payload[4], payload[5])
        # read response: 02 03 01 CC MM II <typed value>
        if len(payload) < 7 or payload[2] != 0x01:
            raise ValueError(f"unexpected read-response shape: {payload.hex()}")
        cls, mem, inst = payload[3], payload[4], payload[5]
        value, tag, _ = decode_value(payload[6:])
        return ReadResponse(cls, mem, inst, value, tag)

    if function == FN_WRITE:
        # write request/response: dd 04 CC MM II <typed value>
        if len(payload) < 6:
            raise ValueError(f"unexpected write shape: {payload.hex()}")
        cls, mem, inst = payload[2], payload[3], payload[4]
        value, tag, _ = decode_value(payload[5:])
        return (
            WriteRequest(cls, mem, inst, value, tag)
            if direction == DIR_REQUEST
            else WriteResponse(cls, mem, inst, value, tag)
        )

    raise ValueError(f"unknown function byte 0x{function:02X}")

decode_value

decode_value(buf)

Decode a single value starting at buf[0].

Returns (value, type_tag, consumed_bytes).

Source code in src/watlowlib/protocol/stdbus/tlv.py
def decode_value(buf: bytes) -> tuple[float | int | str, int, int]:
    """Decode a single value starting at ``buf[0]``.

    Returns ``(value, type_tag, consumed_bytes)``.
    """
    if not buf:
        raise ValueError("empty value buffer")
    tag = buf[0]
    if tag == DataType.U8:
        if len(buf) < 2:
            raise ValueError("truncated u8 value")
        return buf[1], tag, 2
    if tag == DataType.U16:
        if len(buf) < 3:
            raise ValueError("truncated u16 value")
        (v_u16,) = struct.unpack(">H", buf[1:3])
        return v_u16, tag, 3
    if tag == DataType.U32:
        if len(buf) < 5:
            raise ValueError("truncated u32 value")
        (v_u32,) = struct.unpack(">I", buf[1:5])
        return v_u32, tag, 5
    if tag == DataType.S32:
        if len(buf) < 5:
            raise ValueError("truncated s32 value")
        (v_s32,) = struct.unpack(">i", buf[1:5])
        return v_s32, tag, 5
    if tag == DataType.FLOAT:
        if len(buf) < 5:
            raise ValueError("truncated float value")
        (v_f,) = struct.unpack(">f", buf[1:5])
        return v_f, tag, 5
    if tag == DataType.STRING:
        if len(buf) < 2:
            raise ValueError("truncated string header")
        n = buf[1]
        if len(buf) < 2 + n:
            raise ValueError("truncated string body")
        raw = bytes(buf[2 : 2 + n])
        # Trim trailing NULs for convenience but keep the raw length
        # intact in caller paths.
        s = raw.rstrip(b"\x00").decode("ascii", errors="replace")
        return s, tag, 2 + n
    if tag == DataType.PACKED:
        if len(buf) < 2:
            raise ValueError("truncated packed-int header")
        n = buf[1]  # count of 16-bit words
        if len(buf) < 2 + 2 * n:
            raise ValueError("truncated packed-int body")
        # PACKED with count=1 is the canonical enum / 16-bit case
        # (the most common shape on the wire).
        if n == 1:
            (v_packed,) = struct.unpack(">H", buf[2:4])
            return v_packed, tag, 4
        # count>=2: pack words MSW-first into a single integer
        # (provisional; only confirmed once we observe a real Wide
        # Enum that exceeds 16 bits).
        v_wide = 0
        for i in range(n):
            (w,) = struct.unpack(">H", buf[2 + 2 * i : 4 + 2 * i])
            v_wide = (v_wide << 16) | w
        return v_wide, tag, 2 + 2 * n
    raise ValueError(f"unknown type tag: 0x{tag:02X} (rest: {buf[:8].hex()})")

encode_frame

encode_frame(frame)

Serialise frame to wire bytes (preamble through data CRC).

Raises:

Type Description
FrameError

frame.payload exceeds the 16-bit length field.

Source code in src/watlowlib/protocol/stdbus/framing.py
def encode_frame(frame: Frame) -> bytes:
    """Serialise ``frame`` to wire bytes (preamble through data CRC).

    Raises:
        FrameError: ``frame.payload`` exceeds the 16-bit length field.
    """
    if len(frame.payload) > 0xFFFF:
        raise FrameError(f"payload too long: {len(frame.payload)}")
    header = bytes(
        [
            frame.frame_type & 0xFF,
            frame.dst & 0xFF,
            frame.src & 0xFF,
            (len(frame.payload) >> 8) & 0xFF,
            len(frame.payload) & 0xFF,
        ]
    )
    hcrc = bytes([header_crc8(header)])
    if frame.payload:
        return PREAMBLE + header + hcrc + frame.payload + data_crc16_le_bytes(frame.payload)
    return PREAMBLE + header + hcrc

encode_read_request

encode_read_request(param_id, instance=1)

Build the inner payload for reading a single parameter.

Source code in src/watlowlib/protocol/stdbus/payload.py
def encode_read_request(param_id: int, instance: int = 1) -> bytes:
    """Build the inner payload for reading a single parameter."""
    cls, member = split_param(param_id)
    return bytes([DIR_REQUEST, FN_READ, 0x01, cls, member, instance])

encode_value

encode_value(type_tag, value)

Encode value under type_tag to wire bytes (tag included).

Source code in src/watlowlib/protocol/stdbus/tlv.py
def encode_value(type_tag: int, value: float | int | str | bytes) -> bytes:
    """Encode ``value`` under ``type_tag`` to wire bytes (tag included)."""
    if type_tag == DataType.STRING:
        # STRING is the only branch that accepts a str/bytes payload;
        # the numeric branches all coerce via int()/float() and would
        # silently produce zero-filled buffers if a str slipped through.
        if not isinstance(value, str | bytes):
            raise TypeError(f"STRING type tag requires str or bytes, got {type(value).__name__}")
        b = value.encode("ascii") if isinstance(value, str) else bytes(value)
        if len(b) > 0xFF:
            raise ValueError(f"string too long: {len(b)}")
        return bytes([DataType.STRING, len(b)]) + b
    # Numeric branches: reject str/bytes early with a clear message.
    if isinstance(value, str | bytes):
        raise TypeError(
            f"type tag 0x{type_tag:02X} requires a numeric value, got {type(value).__name__}"
        )
    if type_tag == DataType.FLOAT:
        return bytes([DataType.FLOAT]) + struct.pack(">f", float(value))
    if type_tag == DataType.U8:
        if not 0 <= int(value) <= 0xFF:
            raise ValueError(f"u8 value out of range: {value}")
        return bytes([DataType.U8, int(value)])
    if type_tag == DataType.U16:
        if not 0 <= int(value) <= 0xFFFF:
            raise ValueError(f"u16 value out of range: {value}")
        return bytes([DataType.U16]) + struct.pack(">H", int(value))
    if type_tag == DataType.U32:
        if not 0 <= int(value) <= 0xFFFFFFFF:
            raise ValueError(f"u32 value out of range: {value}")
        return bytes([DataType.U32]) + struct.pack(">I", int(value))
    if type_tag == DataType.S32:
        if not -(2**31) <= int(value) <= 2**31 - 1:
            raise ValueError(f"s32 value out of range: {value}")
        return bytes([DataType.S32]) + struct.pack(">i", int(value))
    if type_tag == DataType.PACKED:
        if not 0 <= int(value) <= 0xFFFF:
            raise ValueError(f"packed-int(1) value out of range: {value}")
        return bytes([DataType.PACKED, 0x01]) + struct.pack(">H", int(value))
    raise ValueError(f"unsupported type tag for encode: 0x{type_tag:02X}")

encode_write_request

encode_write_request(
    param_id, value, *, instance=1, type_tag=DataType.FLOAT
)

Build the inner payload for writing a single parameter.

Source code in src/watlowlib/protocol/stdbus/payload.py
def encode_write_request(
    param_id: int,
    value: float | int | str | bytes,
    *,
    instance: int = 1,
    type_tag: int = DataType.FLOAT,
) -> bytes:
    """Build the inner payload for writing a single parameter."""
    cls, member = split_param(param_id)
    head = bytes([DIR_REQUEST, FN_WRITE, cls, member, instance])
    return head + encode_value(type_tag, value)

header_crc8

header_crc8(data)

Compute the BACnet MS/TP header CRC-8.

Returns the on-wire byte (one's complement of the running CRC).

Source code in src/watlowlib/protocol/stdbus/_crc.py
def header_crc8(data: bytes) -> int:
    """Compute the BACnet MS/TP header CRC-8.

    Returns the on-wire byte (one's complement of the running CRC).
    """
    crc = 0xFF
    for b in data:
        x = crc ^ b
        x = x ^ (x << 1) ^ (x << 2) ^ (x << 3) ^ (x << 4) ^ (x << 5) ^ (x << 6) ^ (x << 7)
        crc = ((x & 0xFE) ^ ((x >> 8) & 0x01)) & 0xFF
    return (~crc) & 0xFF

join_param

join_param(cls, member)

Recompose a Parameter ID from its class and member parts.

Source code in src/watlowlib/protocol/stdbus/payload.py
def join_param(cls: int, member: int) -> int:
    """Recompose a Parameter ID from its ``class`` and ``member`` parts."""
    return cls * 1000 + member

mac_to_addr

mac_to_addr(mac)

Map an MS/TP MAC (0x10..0x1F) back to its Standard Bus address.

Raises:

Type Description
ValueError

mac is outside the controller range.

Source code in src/watlowlib/protocol/stdbus/tables.py
def mac_to_addr(mac: int) -> int:
    """Map an MS/TP MAC (``0x10..0x1F``) back to its Standard Bus address.

    Raises:
        ValueError: ``mac`` is outside the controller range.
    """
    if not 0x10 <= mac <= 0x1F:
        raise ValueError(f"MAC out of expected controller range: 0x{mac:02X}")
    return mac - ADDR_OFFSET

raise_for_error_code

raise_for_error_code(code, *, context=None)

Raise the typed :class:watlowlib.errors.WatlowError for code.

Maps Std Bus error bytes to the typed subclasses the session uses for :class:Availability updates. Unknown error codes raise :class:watlowlib.errors.WatlowProtocolError.

Source code in src/watlowlib/protocol/stdbus/payload.py
def raise_for_error_code(
    code: int,
    *,
    context: ErrorContext | None = None,
) -> None:
    """Raise the typed :class:`watlowlib.errors.WatlowError` for ``code``.

    Maps Std Bus error bytes to the typed subclasses the session uses
    for :class:`Availability` updates. Unknown error codes raise
    :class:`watlowlib.errors.WatlowProtocolError`.
    """
    if code == ErrorCode.NO_SUCH_OBJECT:
        raise WatlowNoSuchObjectError(
            f"Standard Bus error 0x{code:02X} (NO_SUCH_OBJECT)",
            context=context,
        )
    if code == ErrorCode.NO_SUCH_ATTRIBUTE:
        raise WatlowNoSuchAttributeError(
            f"Standard Bus error 0x{code:02X} (NO_SUCH_ATTRIBUTE)",
            context=context,
        )
    if code == ErrorCode.NO_SUCH_INSTANCE:
        raise WatlowNoSuchInstanceError(
            f"Standard Bus error 0x{code:02X} (NO_SUCH_INSTANCE)",
            context=context,
        )
    raise WatlowProtocolError(
        f"Standard Bus error 0x{code:02X} (unknown)",
        context=context,
    )

split_param

split_param(param_id)

Split a Watlow Parameter ID into (class, member).

The published "Parameter ID" in user manuals is Class * 1000 + Member4001 decodes to (4, 1), 8003 decodes to (8, 3).

Source code in src/watlowlib/protocol/stdbus/payload.py
def split_param(param_id: int) -> tuple[int, int]:
    """Split a Watlow Parameter ID into ``(class, member)``.

    The published "Parameter ID" in user manuals is
    ``Class * 1000 + Member`` — ``4001`` decodes to ``(4, 1)``,
    ``8003`` decodes to ``(8, 3)``.
    """
    return divmod(param_id, 1000)

Modbus RTU

watlowlib.protocol.modbus

Modbus RTU adapter.

Thin wrapper over the in-house :mod:anymodbus package. The module owns:

  • :class:ModbusOp — typed instruction emitted by Modbus variants.
  • :class:ModbusProtocolClient — :class:ProtocolClient for the Modbus wire.
  • :class:ModbusBusTransport — :class:Transport-shaped adapter that hands the client a live :class:anymodbus.Bus on demand.

The codec for the Modbus PDU itself lives in :mod:anymodbus; this package never touches wire bytes. Modbus variants emit a typed :class:ModbusOp, not bytes — see docs/design.md §5.

ModbusBusTransport

ModbusBusTransport(settings)

Holds an :class:anymodbus.Bus behind the :class:Transport API.

Lifecycle:

  • open() calls :func:anymodbus.open_modbus_rtu and stores the resulting :class:Bus. Re-open raises :class:WatlowConnectionError.
  • close() awaits :meth:Bus.aclose. Safe on an unopened or already-closed instance.
  • write / read_exact / read_available raise :class:NotImplementedError. The :class:ModbusProtocolClient never calls them — it uses :attr:bus instead.
Source code in src/watlowlib/protocol/modbus/transport.py
def __init__(self, settings: SerialSettings) -> None:
    # Validate the parity up front. anymodbus only accepts a small
    # literal set; better to fail at construction than deep inside
    # ``open_modbus_rtu``.
    parity = str(settings.parity.value).lower()
    if parity not in _ALLOWED_PARITY:
        msg = (
            f"unsupported parity for Modbus RTU: {parity!r}; expected one of {_ALLOWED_PARITY}"
        )
        raise WatlowConfigurationError(msg, context=ErrorContext(port=settings.port))
    self._settings = settings
    self._parity = parity
    self._bus: Bus | None = None

bus property

bus

Return the live :class:anymodbus.Bus.

Raises :class:WatlowConnectionError if :meth:open has not completed (or the transport has been closed).

ModbusEncoding dataclass

ModbusEncoding(
    register_count,
    word_order,
    byte_order,
    read_fn=ModbusFn.READ_HOLDING,
)

How a :class:DataType lays out across Modbus registers.

Attributes:

Name Type Description
register_count int

Number of 16-bit registers the value occupies. FLOAT / S32 / U32 → 2 regs; U16 / U8 / PACKED → 1. STRING is length-driven and reads its register count from the spec.

word_order WordOrder

Inter-register order for multi-register values. HIGH_LOW matches PM "Data Map 1" defaults.

byte_order ByteOrder

Within-register byte order. Big-endian on every Watlow family observed to date.

read_fn ModbusFn

Default Modbus function for a read of this type. Always :attr:ModbusFn.READ_HOLDING for the PM — input-register-only parameters land here when a registry row needs the override.

ModbusFn

Bases: StrEnum

Modbus function selector.

The four operations exercised by the registry-driven workhorse (:data:READ_PARAMETER / :data:WRITE_PARAMETER). Coil and discrete-input ops are intentionally absent; they would be added if a registry parameter ever needed them.

ModbusOp dataclass

ModbusOp(fn, address, count=1, values=None)

One Modbus transaction in protocol-neutral form.

Attributes:

Name Type Description
fn ModbusFn

The Modbus function to invoke.

address int

Zero-based register address. The Watlow registry stores this as relative_addr; the legacy 4xxxxx / 3xxxxx notation lives only in :attr:ParameterSpec.absolute_addr.

count int

Number of 16-bit registers to read. Ignored on single-register writes; required on :attr:ModbusFn.WRITE_REGISTERS so the client can pre-validate len(values) == count.

values tuple[int, ...] | None

Register words to write (one int per 16-bit register), or None for read ops. Single-register writes use values=(word,).

ModbusProtocolClient

ModbusProtocolClient(slave_provider, *, port='')

:class:ProtocolClient for Modbus RTU.

The client is address-agnostic: execute takes the slave address per-call so one client can serve every device on a multi-drop bus. The slave_provider receives that address and returns the live :class:Slave.

Parameters:

Name Type Description Default
slave_provider SlaveProvider

Callable mapping address → live :class:Slave. In production the provider closes over the :class:ModbusBusTransport and returns transport.bus.slave(address); in tests it returns a stub.

required
port str

Transport label, threaded into log events / error contexts.

''
Source code in src/watlowlib/protocol/modbus/client.py
def __init__(
    self,
    slave_provider: SlaveProvider,
    *,
    port: str = "",
) -> None:
    self._slave_provider = slave_provider
    self._port = port
    self._lock = anyio.Lock()
    self._disposed = False

disposed property

disposed

Whether :meth:dispose has been called.

kind property

kind

Wire protocol kind served by this client.

lock property

lock

Per-client lock acquired by :meth:Session.execute.

dispose

dispose()

Mark this client unusable for future execute calls.

Source code in src/watlowlib/protocol/modbus/client.py
def dispose(self) -> None:
    """Mark this client unusable for future ``execute`` calls."""
    self._disposed = True

execute async

execute(request, *, address, timeout=None, command_name='')

Run request against address and return the raw register tuple.

Reads return the read words; writes return () so callers downstream can treat reads and writes uniformly.

Parameters:

Name Type Description Default
request ModbusOp

The typed Modbus operation produced by a :class:ModbusVariant.

required
address int

Modbus slave address (1..247). Validated eagerly before any I/O.

required
timeout float | None

Per-call override of :attr:watlowlib.config.DEFAULTS.io_timeout_s. Bound via :func:anyio.fail_after around the dispatch so hung devices cannot stall the session lock indefinitely.

None
command_name str

Threaded into log events for traceability.

''

Raises:

Type Description
WatlowConfigurationError

address is outside 1..247.

WatlowConnectionError

client is disposed or the underlying :class:anymodbus.Bus has been closed.

WatlowModbusError

a Modbus-layer exception (mapped via :func:remap_modbus_exception).

Source code in src/watlowlib/protocol/modbus/client.py
async def execute(
    self,
    request: ModbusOp,
    *,
    address: int,
    timeout: float | None = None,
    command_name: str = "",
) -> tuple[int, ...]:
    """Run ``request`` against ``address`` and return the raw register tuple.

    Reads return the read words; writes return ``()`` so callers
    downstream can treat reads and writes uniformly.

    Args:
        request: The typed Modbus operation produced by a
            :class:`ModbusVariant`.
        address: Modbus slave address (``1..247``). Validated
            eagerly before any I/O.
        timeout: Per-call override of
            :attr:`watlowlib.config.DEFAULTS.io_timeout_s`. Bound
            via :func:`anyio.fail_after` around the dispatch so
            hung devices cannot stall the session lock
            indefinitely.
        command_name: Threaded into log events for traceability.

    Raises:
        WatlowConfigurationError: ``address`` is outside ``1..247``.
        WatlowConnectionError: client is disposed or the underlying
            :class:`anymodbus.Bus` has been closed.
        WatlowModbusError: a Modbus-layer exception (mapped via
            :func:`remap_modbus_exception`).
    """
    if address < 1 or address > 247:
        from watlowlib.errors import WatlowConfigurationError  # noqa: PLC0415

        msg = f"Modbus address {address} out of range (1..247)"
        raise WatlowConfigurationError(
            msg,
            context=self._error_context(command_name, request, address=address),
        )
    if self._disposed:
        raise WatlowConnectionError(
            "ModbusProtocolClient is disposed",
            context=self._error_context(command_name, request, address=address),
        )

    bound = timeout if timeout is not None else DEFAULTS.io_timeout_s

    try:
        slave = self._slave_provider(address)
    except WatlowError:
        # The provider itself surfaced a typed error (e.g. transport
        # not open). Let it propagate untouched.
        raise
    except Exception as exc:
        raise remap_modbus_exception(
            exc,
            context=self._error_context(command_name, request, address=address),
        ) from exc

    try:
        with anyio.fail_after(bound):
            result = await self._dispatch(slave, request)
    except TimeoutError as exc:
        from watlowlib.errors import WatlowTimeoutError  # noqa: PLC0415

        raise WatlowTimeoutError(
            f"Modbus {request.fn.value} on addr={address} timed out after {bound}s",
            context=self._error_context(command_name, request, address=address),
        ) from exc
    except WatlowError:
        raise
    except Exception as exc:
        raise remap_modbus_exception(
            exc,
            context=self._error_context(command_name, request, address=address),
        ) from exc

    _log.debug(
        "modbus exec ok cmd=%s addr=%d fn=%s reg=%d count=%d",
        command_name or "<anon>",
        address,
        request.fn.value,
        request.address,
        request.count,
    )
    return result

encoding_for

encoding_for(
    data_type,
    *,
    word_order_override=None,
    register_count_override=None,
)

Return the Modbus encoding for data_type.

Parameters:

Name Type Description Default
data_type DataType

The wire data-type tag from the registry.

required
word_order_override str | None

Per-row override (from :attr:ParameterSpec.word_order). None → use the table default.

None
register_count_override int | None

Per-row override (mainly for :attr:DataType.STRING, which is length-driven).

None

Raises:

Type Description
WatlowProtocolUnsupportedError

data_type has no Modbus mapping yet (e.g. a future tag added to the codec but unmapped here).

Source code in src/watlowlib/protocol/modbus/tables.py
def encoding_for(
    data_type: DataType,
    *,
    word_order_override: str | None = None,
    register_count_override: int | None = None,
) -> ModbusEncoding:
    """Return the Modbus encoding for ``data_type``.

    Args:
        data_type: The wire data-type tag from the registry.
        word_order_override: Per-row override (from
            :attr:`ParameterSpec.word_order`). ``None`` → use the
            table default.
        register_count_override: Per-row override (mainly for
            :attr:`DataType.STRING`, which is length-driven).

    Raises:
        WatlowProtocolUnsupportedError: ``data_type`` has no Modbus
            mapping yet (e.g. a future tag added to the codec but
            unmapped here).
    """
    base = _DEFAULTS.get(data_type)
    if base is None:
        msg = f"data type {data_type.name} has no Modbus encoding"
        raise WatlowProtocolUnsupportedError(msg)
    word_order = _coerce_word_order(word_order_override) or base.word_order
    register_count = register_count_override or base.register_count
    if word_order is base.word_order and register_count == base.register_count:
        return base
    return ModbusEncoding(
        register_count=register_count,
        word_order=word_order,
        byte_order=base.byte_order,
        read_fn=base.read_fn,
    )

remap_modbus_exception

remap_modbus_exception(exc, *, context=None)

Wrap exc in the typed :class:WatlowError for its kind.

The caller is expected to raise the returned exception with from exc so __cause__ preserves the original.

Returns exc re-typed as a :class:WatlowError subclass; never returns None. Unmapped :mod:anymodbus errors fall back to :class:WatlowModbusError. Non-:mod:anymodbus errors fall through to a bare :class:WatlowProtocolError rather than being swallowed — calling sites should normally only feed this function instances of :class:ModbusError.

Source code in src/watlowlib/protocol/modbus/errors.py
def remap_modbus_exception(
    exc: Exception,
    *,
    context: ErrorContext | None = None,
) -> WatlowError:
    """Wrap ``exc`` in the typed :class:`WatlowError` for its kind.

    The caller is expected to ``raise`` the returned exception with
    ``from exc`` so ``__cause__`` preserves the original.

    Returns ``exc`` re-typed as a :class:`WatlowError` subclass; never
    returns ``None``. Unmapped :mod:`anymodbus` errors fall back to
    :class:`WatlowModbusError`. Non-:mod:`anymodbus` errors fall
    through to a bare :class:`WatlowProtocolError` rather than being
    swallowed — calling sites should normally only feed this function
    instances of :class:`ModbusError`.
    """
    msg = str(exc)
    # Order matters: subclasses before parents. Most exception-response
    # types share a single base (``ModbusExceptionResponse``), so the
    # narrowest class always lands first.
    if isinstance(exc, IllegalFunctionError):
        return WatlowModbusIllegalFunctionError(msg, context=context)
    if isinstance(exc, ModbusUnsupportedFunctionError):
        # Library-side "we don't implement this function code" — same
        # availability outcome as the wire-side IllegalFunction.
        return WatlowModbusIllegalFunctionError(msg, context=context)
    if isinstance(exc, IllegalDataAddressError):
        return WatlowModbusIllegalDataAddressError(msg, context=context)
    if isinstance(exc, IllegalDataValueError):
        return WatlowModbusIllegalDataValueError(msg, context=context)
    if isinstance(
        exc,
        SlaveDeviceFailureError
        | SlaveDeviceBusyError
        | AcknowledgeError
        | GatewayPathUnavailableError
        | GatewayTargetFailedToRespondError
        | MemoryParityError,
    ):
        return WatlowModbusSlaveFailureError(msg, context=context)
    if isinstance(exc, FrameTimeoutError):
        return WatlowModbusTimeoutError(msg, context=context)
    if isinstance(exc, BusClosedError | ConnectionLostError):
        return WatlowConnectionError(msg, context=context)
    if isinstance(exc, CRCError | FrameError):
        return WatlowFrameError(msg, context=context)
    if isinstance(exc, ConfigurationError):
        return WatlowConfigurationError(msg, context=context)
    if isinstance(exc, UnexpectedResponseError):
        return WatlowProtocolError(msg, context=context)
    if isinstance(exc, ModbusUnknownExceptionError | ModbusError):
        return WatlowModbusError(msg, context=context)
    return WatlowProtocolError(msg, context=context)