Skip to content

watlowlib.transport

The Transport Protocol, SerialTransport for hardware, and FakeTransport for tests. See Design §4.

Public surface

watlowlib.transport

Transport layer — moves bytes; knows nothing about Watlow.

The :class:Transport Protocol is the structural interface every backend implements. :class:SerialSettings is the port-configuration dataclass consumed by :class:SerialTransport. Tests use :class:FakeTransport instead.

See docs/design.md §3 / §4.

FakeSlave

FakeSlave(script=None)

Scripted :class:anymodbus.Slave stand-in for tests.

Mirrors the surface :class:watlowlib.protocol.modbus.client.ModbusProtocolClient actually calls (read_holding_registers, read_input_registers, write_register, write_registers) and records every call. Tests assert on :attr:reads and :attr:writes to verify the :class:ModbusOp lowered correctly.

Parameters:

Name Type Description Default
script Mapping[tuple[str, int], ScriptedSlaveEntry] | None

(method, address) → reply map. method is one of the four call names above. The reply is a tuple of register words, an anymodbus exception class (raised at call time, with the right constructor args), or None (treat the call as a no-op success). Missing entries surface a :class:KeyError so an unscripted call fails the test rather than returning empty results.

None
Source code in src/watlowlib/transport/fake.py
def __init__(self, script: Mapping[tuple[str, int], ScriptedSlaveEntry] | None = None) -> None:
    self._script: dict[tuple[str, int], ScriptedSlaveEntry] = dict(script or {})
    self.reads: list[tuple[str, int, int]] = []
    self.writes: list[tuple[str, int, tuple[int, ...]]] = []

add_script

add_script(method, address, reply)

Register or overwrite a scripted reply for (method, address).

Source code in src/watlowlib/transport/fake.py
def add_script(
    self,
    method: str,
    address: int,
    reply: ScriptedSlaveEntry,
) -> None:
    """Register or overwrite a scripted reply for ``(method, address)``."""
    self._script[(method, address)] = reply

FakeTransport

FakeTransport(
    script=None,
    *,
    queue=None,
    label="fake://test",
    latency_s=0.0,
)

Scripted :class:Transport for tests.

Parameters:

Name Type Description Default
script Mapping[bytes, ScriptedReply] | None

Mapping of write_bytes → reply. Every scripted write queues the corresponding reply into the read buffer. Unknown writes are recorded but produce no reply; the next read times out.

None
label str

Identifier used in errors.

'fake://test'
latency_s float

Per-operation artificial delay, useful for simulating a slow device.

0.0
Source code in src/watlowlib/transport/fake.py
def __init__(
    self,
    script: Mapping[bytes, ScriptedReply] | None = None,
    *,
    queue: Iterable[tuple[bytes, ScriptedReply]] | None = None,
    label: str = "fake://test",
    latency_s: float = 0.0,
) -> None:
    self._script: dict[bytes, ScriptedReply] = dict(script or {})
    self._queue: deque[tuple[bytes, ScriptedReply]] = deque(queue or [])
    self._writes: list[bytes] = []
    self._unmatched: list[bytes] = []
    self._read_buffer = bytearray()
    self._is_open = False
    self._label = label
    self._latency_s = latency_s
    self._force_read_timeout = False
    self._force_write_timeout = False

remaining_queue property

remaining_queue

Queue entries that have not been consumed yet.

unmatched_writes property

unmatched_writes

Writes that didn't match any scripted reply, in order.

A test can assert transport.unmatched_writes == () to catch accidentally-unscripted traffic — the corresponding read would have timed out, but a precise assertion fails faster and points at the right call.

writes property

writes

Every write payload recorded since construction, in order.

add_script

add_script(command, reply)

Register or overwrite a scripted reply for command.

Source code in src/watlowlib/transport/fake.py
def add_script(self, command: bytes, reply: ScriptedReply) -> None:
    """Register or overwrite a scripted reply for ``command``."""
    self._script[bytes(command)] = reply

extend_queue

extend_queue(rounds)

Append more ordered (write, reply) pairs to the FIFO queue.

Source code in src/watlowlib/transport/fake.py
def extend_queue(self, rounds: Iterable[tuple[bytes, ScriptedReply]]) -> None:
    """Append more ordered ``(write, reply)`` pairs to the FIFO queue."""
    self._queue.extend(rounds)

feed

feed(data)

Push unsolicited bytes into the read buffer.

Useful for simulating a device that left chatter on the line which the protocol client has to drain on recovery.

Source code in src/watlowlib/transport/fake.py
def feed(self, data: bytes) -> None:
    """Push unsolicited bytes into the read buffer.

    Useful for simulating a device that left chatter on the line
    which the protocol client has to drain on recovery.
    """
    self._read_buffer.extend(data)

force_read_timeout

force_read_timeout(enabled=True)

Force the next read to raise :class:WatlowTimeoutError.

Source code in src/watlowlib/transport/fake.py
def force_read_timeout(self, enabled: bool = True) -> None:
    """Force the next read to raise :class:`WatlowTimeoutError`."""
    self._force_read_timeout = enabled

force_write_timeout

force_write_timeout(enabled=True)

Force the next :meth:write to raise :class:WatlowTimeoutError.

Source code in src/watlowlib/transport/fake.py
def force_write_timeout(self, enabled: bool = True) -> None:
    """Force the next :meth:`write` to raise :class:`WatlowTimeoutError`."""
    self._force_write_timeout = enabled

SerialSettings dataclass

SerialSettings(
    port,
    baudrate=38400,
    bytesize=ByteSize.EIGHT,
    parity=Parity.NONE,
    stopbits=StopBits.ONE,
    rtscts=False,
    xonxoff=False,
    exclusive=True,
)

Serial-port configuration for :class:SerialTransport.

Mirrors :class:anyserial.SerialConfig plus a port path. Default framing is 38400 8-N-1, the EZ-ZONE PM Standard Bus factory setting. exclusive defaults True because Standard Bus is poll/response and won't tolerate a second writer.

The __post_init__ accepts int / float / str shorthand at runtime for the framing fields (bytesize=8, parity="none", stopbits=1) and normalises to the enum. The static field types are the enums themselves so mypy --strict users must pass :class:anyserial.ByteSize / :class:anyserial.Parity / :class:anyserial.StopBits directly; the runtime shorthand is primarily for CLI argument parsing and interactive scripts.

factory_for classmethod

factory_for(protocol, *, port)

Return the EZ-ZONE PM factory framing for protocol.

  • STDBUS → 38400 8-N-1 (the Standard Bus factory default).
  • MODBUS_RTU → 9600 8-E-1 (the Modbus RTU factory default per the EZ-ZONE PM manual).

AUTO raises :class:WatlowConfigurationError — there is no single factory framing for AUTO, the detector probes both. Callers crossing protocol boundaries (the maintenance helpers that switch protocol, watlow-discover --protocol both) should rebuild settings per protocol via this method instead of inheriting whatever framing the previous call used.

Source code in src/watlowlib/transport/base.py
@classmethod
def factory_for(cls, protocol: ProtocolKind, *, port: str) -> SerialSettings:
    """Return the EZ-ZONE PM factory framing for ``protocol``.

    - ``STDBUS`` → 38400 8-N-1 (the Standard Bus factory default).
    - ``MODBUS_RTU`` → 9600 8-E-1 (the Modbus RTU factory default
      per the EZ-ZONE PM manual).

    ``AUTO`` raises :class:`WatlowConfigurationError` — there is no
    single factory framing for AUTO, the detector probes both.
    Callers crossing protocol boundaries (the maintenance helpers
    that switch protocol, ``watlow-discover --protocol both``)
    should rebuild settings per protocol via this method instead
    of inheriting whatever framing the previous call used.
    """
    # Lazy import to keep ``transport.base`` a leaf module — the
    # ProtocolKind enum lives under protocol/, which depends on
    # transport indirectly.
    from watlowlib.protocol.base import ProtocolKind  # noqa: PLC0415

    if protocol is ProtocolKind.STDBUS:
        return cls(port=port, baudrate=38400, parity=Parity.NONE)
    if protocol is ProtocolKind.MODBUS_RTU:
        return cls(port=port, baudrate=9600, parity=Parity.EVEN)
    raise WatlowConfigurationError(
        f"SerialSettings.factory_for: no single factory framing for {protocol!r}; "
        "AUTO probes both Std Bus and Modbus, build a concrete protocol's "
        "settings instead.",
    )

SerialTransport

SerialTransport(settings)

:class:Transport backed by a real serial port via anyserial.

Tests that don't need hardware should use :class:watlowlib.transport.fake.FakeTransport; the two conform to the same structural :class:Transport Protocol.

Source code in src/watlowlib/transport/serial.py
def __init__(self, settings: SerialSettings) -> None:
    self._settings = settings
    self._port: SerialPort | None = None
    # Bytes read past ``n`` in :meth:`read_exact` (e.g. when a
    # framing error makes the caller scan for a new preamble) are
    # held here so the next call sees them first. Serial I/O is
    # chunk-oriented; we can't ask the kernel "give me exactly n"
    # without buffering.
    self._pushback = bytearray()

Transport

Bases: Protocol

Byte-level transport.

Every I/O boundary takes an explicit timeout. On expiry, implementations raise :class:watlowlib.errors.WatlowTimeoutError — never return an empty or partial bytes silently. Backend exceptions normalise to :class:watlowlib.errors.WatlowTransportError (or a subclass) with __cause__ preserving the original exception.

Lifecycle is single-shot: :meth:open once, :meth:close once.

is_open property

is_open

Whether :meth:open has run without a matching :meth:close.

label property

label

Short identifier (port path, "fake://...") used in errors.

close async

close()

Close the underlying port. Safe to call when already closed.

Source code in src/watlowlib/transport/base.py
async def close(self) -> None:
    """Close the underlying port. Safe to call when already closed."""
    ...

drain_input async

drain_input()

Discard any buffered input bytes. Best-effort; never raises.

Source code in src/watlowlib/transport/base.py
async def drain_input(self) -> None:
    """Discard any buffered input bytes. Best-effort; never raises."""
    ...

open async

open()

Open the underlying port. Re-open on an already-open transport is an error.

Source code in src/watlowlib/transport/base.py
async def open(self) -> None:
    """Open the underlying port. Re-open on an already-open transport is an error."""
    ...

read_available async

read_available(*, idle_timeout, max_bytes=None)

Read until the line goes idle for idle_timeout seconds.

Never raises on idle expiry — an idle timeout is the expected exit. Returns whatever was accumulated (possibly empty). Used for best-effort drain and ProtocolKind.AUTO probe gaps.

Source code in src/watlowlib/transport/base.py
async def read_available(
    self,
    *,
    idle_timeout: float,
    max_bytes: int | None = None,
) -> bytes:
    """Read until the line goes idle for ``idle_timeout`` seconds.

    Never raises on idle expiry — an idle timeout is the *expected*
    exit. Returns whatever was accumulated (possibly empty). Used
    for best-effort drain and ``ProtocolKind.AUTO`` probe gaps.
    """
    ...

read_exact async

read_exact(n, *, timeout)

Read exactly n bytes.

Raises :class:watlowlib.errors.WatlowTimeoutError if fewer than n bytes arrive before timeout. Partial buffers are retained for the next call — implementations must not discard them.

Source code in src/watlowlib/transport/base.py
async def read_exact(self, n: int, *, timeout: float) -> bytes:
    """Read exactly ``n`` bytes.

    Raises :class:`watlowlib.errors.WatlowTimeoutError` if fewer
    than ``n`` bytes arrive before ``timeout``. Partial buffers are
    retained for the next call — implementations must not discard
    them.
    """
    ...

write async

write(data, *, timeout)

Write every byte of data. Bounded by timeout.

Source code in src/watlowlib/transport/base.py
async def write(self, data: bytes, *, timeout: float) -> None:
    """Write every byte of ``data``. Bounded by ``timeout``."""
    ...

Base Protocol + serial settings

watlowlib.transport.base

Transport :pep:544 Protocol + :class:SerialSettings.

The transport surface is intentionally small — Standard Bus is fully length-prefixed (BACnet MS/TP outer frame), so the protocol client only needs write and read_exact. read_available exists for draining the line between auto-detect probes; drain_input is the synchronous flush used after a framing error before the next attempt.

Default serial framing for Standard Bus on the EZ-ZONE PM family is 38400 8-N-1 per the PM manuals; Modbus RTU on the same family is configurable across 9600 / 19200 / 38400 / 57600 / 115200. The :class:SerialSettings defaults match the Std Bus factory state.

SerialSettings dataclass

SerialSettings(
    port,
    baudrate=38400,
    bytesize=ByteSize.EIGHT,
    parity=Parity.NONE,
    stopbits=StopBits.ONE,
    rtscts=False,
    xonxoff=False,
    exclusive=True,
)

Serial-port configuration for :class:SerialTransport.

Mirrors :class:anyserial.SerialConfig plus a port path. Default framing is 38400 8-N-1, the EZ-ZONE PM Standard Bus factory setting. exclusive defaults True because Standard Bus is poll/response and won't tolerate a second writer.

The __post_init__ accepts int / float / str shorthand at runtime for the framing fields (bytesize=8, parity="none", stopbits=1) and normalises to the enum. The static field types are the enums themselves so mypy --strict users must pass :class:anyserial.ByteSize / :class:anyserial.Parity / :class:anyserial.StopBits directly; the runtime shorthand is primarily for CLI argument parsing and interactive scripts.

factory_for classmethod

factory_for(protocol, *, port)

Return the EZ-ZONE PM factory framing for protocol.

  • STDBUS → 38400 8-N-1 (the Standard Bus factory default).
  • MODBUS_RTU → 9600 8-E-1 (the Modbus RTU factory default per the EZ-ZONE PM manual).

AUTO raises :class:WatlowConfigurationError — there is no single factory framing for AUTO, the detector probes both. Callers crossing protocol boundaries (the maintenance helpers that switch protocol, watlow-discover --protocol both) should rebuild settings per protocol via this method instead of inheriting whatever framing the previous call used.

Source code in src/watlowlib/transport/base.py
@classmethod
def factory_for(cls, protocol: ProtocolKind, *, port: str) -> SerialSettings:
    """Return the EZ-ZONE PM factory framing for ``protocol``.

    - ``STDBUS`` → 38400 8-N-1 (the Standard Bus factory default).
    - ``MODBUS_RTU`` → 9600 8-E-1 (the Modbus RTU factory default
      per the EZ-ZONE PM manual).

    ``AUTO`` raises :class:`WatlowConfigurationError` — there is no
    single factory framing for AUTO, the detector probes both.
    Callers crossing protocol boundaries (the maintenance helpers
    that switch protocol, ``watlow-discover --protocol both``)
    should rebuild settings per protocol via this method instead
    of inheriting whatever framing the previous call used.
    """
    # Lazy import to keep ``transport.base`` a leaf module — the
    # ProtocolKind enum lives under protocol/, which depends on
    # transport indirectly.
    from watlowlib.protocol.base import ProtocolKind  # noqa: PLC0415

    if protocol is ProtocolKind.STDBUS:
        return cls(port=port, baudrate=38400, parity=Parity.NONE)
    if protocol is ProtocolKind.MODBUS_RTU:
        return cls(port=port, baudrate=9600, parity=Parity.EVEN)
    raise WatlowConfigurationError(
        f"SerialSettings.factory_for: no single factory framing for {protocol!r}; "
        "AUTO probes both Std Bus and Modbus, build a concrete protocol's "
        "settings instead.",
    )

Transport

Bases: Protocol

Byte-level transport.

Every I/O boundary takes an explicit timeout. On expiry, implementations raise :class:watlowlib.errors.WatlowTimeoutError — never return an empty or partial bytes silently. Backend exceptions normalise to :class:watlowlib.errors.WatlowTransportError (or a subclass) with __cause__ preserving the original exception.

Lifecycle is single-shot: :meth:open once, :meth:close once.

is_open property

is_open

Whether :meth:open has run without a matching :meth:close.

label property

label

Short identifier (port path, "fake://...") used in errors.

close async

close()

Close the underlying port. Safe to call when already closed.

Source code in src/watlowlib/transport/base.py
async def close(self) -> None:
    """Close the underlying port. Safe to call when already closed."""
    ...

drain_input async

drain_input()

Discard any buffered input bytes. Best-effort; never raises.

Source code in src/watlowlib/transport/base.py
async def drain_input(self) -> None:
    """Discard any buffered input bytes. Best-effort; never raises."""
    ...

open async

open()

Open the underlying port. Re-open on an already-open transport is an error.

Source code in src/watlowlib/transport/base.py
async def open(self) -> None:
    """Open the underlying port. Re-open on an already-open transport is an error."""
    ...

read_available async

read_available(*, idle_timeout, max_bytes=None)

Read until the line goes idle for idle_timeout seconds.

Never raises on idle expiry — an idle timeout is the expected exit. Returns whatever was accumulated (possibly empty). Used for best-effort drain and ProtocolKind.AUTO probe gaps.

Source code in src/watlowlib/transport/base.py
async def read_available(
    self,
    *,
    idle_timeout: float,
    max_bytes: int | None = None,
) -> bytes:
    """Read until the line goes idle for ``idle_timeout`` seconds.

    Never raises on idle expiry — an idle timeout is the *expected*
    exit. Returns whatever was accumulated (possibly empty). Used
    for best-effort drain and ``ProtocolKind.AUTO`` probe gaps.
    """
    ...

read_exact async

read_exact(n, *, timeout)

Read exactly n bytes.

Raises :class:watlowlib.errors.WatlowTimeoutError if fewer than n bytes arrive before timeout. Partial buffers are retained for the next call — implementations must not discard them.

Source code in src/watlowlib/transport/base.py
async def read_exact(self, n: int, *, timeout: float) -> bytes:
    """Read exactly ``n`` bytes.

    Raises :class:`watlowlib.errors.WatlowTimeoutError` if fewer
    than ``n`` bytes arrive before ``timeout``. Partial buffers are
    retained for the next call — implementations must not discard
    them.
    """
    ...

write async

write(data, *, timeout)

Write every byte of data. Bounded by timeout.

Source code in src/watlowlib/transport/base.py
async def write(self, data: bytes, *, timeout: float) -> None:
    """Write every byte of ``data``. Bounded by ``timeout``."""
    ...

Serial transport

watlowlib.transport.serial

Serial-port transport backed by :mod:anyserial.

:class:SerialTransport wraps :class:anyserial.SerialPort. Every I/O call is bounded by :func:anyio.fail_after (reads, writes) or :func:anyio.move_on_after (idle-timeout reads). Backend exceptions normalise to :mod:watlowlib.errors types with __cause__ preserved.

SerialTransport

SerialTransport(settings)

:class:Transport backed by a real serial port via anyserial.

Tests that don't need hardware should use :class:watlowlib.transport.fake.FakeTransport; the two conform to the same structural :class:Transport Protocol.

Source code in src/watlowlib/transport/serial.py
def __init__(self, settings: SerialSettings) -> None:
    self._settings = settings
    self._port: SerialPort | None = None
    # Bytes read past ``n`` in :meth:`read_exact` (e.g. when a
    # framing error makes the caller scan for a new preamble) are
    # held here so the next call sees them first. Serial I/O is
    # chunk-oriented; we can't ask the kernel "give me exactly n"
    # without buffering.
    self._pushback = bytearray()

Fake transport

watlowlib.transport.fake

In-process fake transport for tests and fixture replay.

:class:FakeTransport implements the :class:Transport Protocol without touching a serial port. Tests script the expected write→response mapping; unscripted writes are recorded but produce no reply, which surfaces as a real timeout on the next read (the intended failure mode — tests notice when they forgot to script a command).

The transport is fixture-replay grade:

  • The dict-based script matches by exact bytes.
  • An optional ordered queue of (write, reply) pairs is consumed FIFO and is the right shape for capture-replay scenarios where the same request may legitimately appear more than once with a different reply (a recorder reading PV in a tight loop, say).
  • :attr:unmatched_writes exposes the subset of :attr:writes that hit neither a dict entry nor the next queue entry — useful for tests that want to assert "no surprise traffic hit the wire".

FakeSlave

FakeSlave(script=None)

Scripted :class:anymodbus.Slave stand-in for tests.

Mirrors the surface :class:watlowlib.protocol.modbus.client.ModbusProtocolClient actually calls (read_holding_registers, read_input_registers, write_register, write_registers) and records every call. Tests assert on :attr:reads and :attr:writes to verify the :class:ModbusOp lowered correctly.

Parameters:

Name Type Description Default
script Mapping[tuple[str, int], ScriptedSlaveEntry] | None

(method, address) → reply map. method is one of the four call names above. The reply is a tuple of register words, an anymodbus exception class (raised at call time, with the right constructor args), or None (treat the call as a no-op success). Missing entries surface a :class:KeyError so an unscripted call fails the test rather than returning empty results.

None
Source code in src/watlowlib/transport/fake.py
def __init__(self, script: Mapping[tuple[str, int], ScriptedSlaveEntry] | None = None) -> None:
    self._script: dict[tuple[str, int], ScriptedSlaveEntry] = dict(script or {})
    self.reads: list[tuple[str, int, int]] = []
    self.writes: list[tuple[str, int, tuple[int, ...]]] = []

add_script

add_script(method, address, reply)

Register or overwrite a scripted reply for (method, address).

Source code in src/watlowlib/transport/fake.py
def add_script(
    self,
    method: str,
    address: int,
    reply: ScriptedSlaveEntry,
) -> None:
    """Register or overwrite a scripted reply for ``(method, address)``."""
    self._script[(method, address)] = reply

FakeTransport

FakeTransport(
    script=None,
    *,
    queue=None,
    label="fake://test",
    latency_s=0.0,
)

Scripted :class:Transport for tests.

Parameters:

Name Type Description Default
script Mapping[bytes, ScriptedReply] | None

Mapping of write_bytes → reply. Every scripted write queues the corresponding reply into the read buffer. Unknown writes are recorded but produce no reply; the next read times out.

None
label str

Identifier used in errors.

'fake://test'
latency_s float

Per-operation artificial delay, useful for simulating a slow device.

0.0
Source code in src/watlowlib/transport/fake.py
def __init__(
    self,
    script: Mapping[bytes, ScriptedReply] | None = None,
    *,
    queue: Iterable[tuple[bytes, ScriptedReply]] | None = None,
    label: str = "fake://test",
    latency_s: float = 0.0,
) -> None:
    self._script: dict[bytes, ScriptedReply] = dict(script or {})
    self._queue: deque[tuple[bytes, ScriptedReply]] = deque(queue or [])
    self._writes: list[bytes] = []
    self._unmatched: list[bytes] = []
    self._read_buffer = bytearray()
    self._is_open = False
    self._label = label
    self._latency_s = latency_s
    self._force_read_timeout = False
    self._force_write_timeout = False

remaining_queue property

remaining_queue

Queue entries that have not been consumed yet.

unmatched_writes property

unmatched_writes

Writes that didn't match any scripted reply, in order.

A test can assert transport.unmatched_writes == () to catch accidentally-unscripted traffic — the corresponding read would have timed out, but a precise assertion fails faster and points at the right call.

writes property

writes

Every write payload recorded since construction, in order.

add_script

add_script(command, reply)

Register or overwrite a scripted reply for command.

Source code in src/watlowlib/transport/fake.py
def add_script(self, command: bytes, reply: ScriptedReply) -> None:
    """Register or overwrite a scripted reply for ``command``."""
    self._script[bytes(command)] = reply

extend_queue

extend_queue(rounds)

Append more ordered (write, reply) pairs to the FIFO queue.

Source code in src/watlowlib/transport/fake.py
def extend_queue(self, rounds: Iterable[tuple[bytes, ScriptedReply]]) -> None:
    """Append more ordered ``(write, reply)`` pairs to the FIFO queue."""
    self._queue.extend(rounds)

feed

feed(data)

Push unsolicited bytes into the read buffer.

Useful for simulating a device that left chatter on the line which the protocol client has to drain on recovery.

Source code in src/watlowlib/transport/fake.py
def feed(self, data: bytes) -> None:
    """Push unsolicited bytes into the read buffer.

    Useful for simulating a device that left chatter on the line
    which the protocol client has to drain on recovery.
    """
    self._read_buffer.extend(data)

force_read_timeout

force_read_timeout(enabled=True)

Force the next read to raise :class:WatlowTimeoutError.

Source code in src/watlowlib/transport/fake.py
def force_read_timeout(self, enabled: bool = True) -> None:
    """Force the next read to raise :class:`WatlowTimeoutError`."""
    self._force_read_timeout = enabled

force_write_timeout

force_write_timeout(enabled=True)

Force the next :meth:write to raise :class:WatlowTimeoutError.

Source code in src/watlowlib/transport/fake.py
def force_write_timeout(self, enabled: bool = True) -> None:
    """Force the next :meth:`write` to raise :class:`WatlowTimeoutError`."""
    self._force_write_timeout = enabled