servomexlib.testing

FakeTransport, fixture loaders, the MockSlave-backed Modbus fake, and canned frames.

Public testing seam.

Re-exports the in-process :class:FakeTransport plus fixture helpers so tests — in this package and in downstream code — can drive every protocol without hardware. Fixtures use the human-readable > hex / < hex arrow format shared with the sibling .testing modules.

For the Modbus path we do not hand-roll an ADU simulator: the :func:mock_modbus_pair helper preloads the byte-accurate anymodbus.testing.MockSlave (RTU and ASCII framing, FC01/02/04/05/08) with the 4100's register/coil banks and binds our ModbusClient to it over an in-process serial pair. The Modbus imports are lazy so servomexlib.testing stays importable without the optional [modbus] extra.

CoilOp dataclass

CoilOp(function_code, coil, on)

One observed coil operation against a recording :class:MockSlave.

Used by the autocalibration coil-pulse tests to assert the ordered 0→1→0 transition: a start_calibration pulse should record a WRITE_SINGLE_COIL on=True (FC05), then a READ_COILS readback (FC01), then a WRITE_SINGLE_COIL on=False. coil is the PDU address.
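
The ordered 0→1→0 assertion described above can be sketched without the library. This is only an illustration: the `CoilOp` below is a local stand-in with the same three fields, `is_pulse` is a hypothetical helper, and coil address 7 is made up. The function codes 0x01 and 0x05 are the standard Modbus values for Read Coils and Write Single Coil.

```python
from dataclasses import dataclass

# Standard Modbus function codes.
READ_COILS = 0x01
WRITE_SINGLE_COIL = 0x05


@dataclass(frozen=True)
class CoilOp:
    """Local stand-in for servomexlib.testing.CoilOp (same three fields)."""

    function_code: int
    coil: int
    on: bool


def is_pulse(ops: list, coil: int) -> bool:
    """True when ops is exactly write-on, readback, write-off on one coil.

    Hypothetical helper. The ``on`` value of the readback op is ignored,
    because what a recording slave stores there is not documented.
    """
    expected = [
        (WRITE_SINGLE_COIL, True),
        (READ_COILS, None),
        (WRITE_SINGLE_COIL, False),
    ]
    if len(ops) != len(expected) or any(op.coil != coil for op in ops):
        return False
    return all(
        op.function_code == fc and (on is None or op.on is on)
        for op, (fc, on) in zip(ops, expected)
    )


# Made-up recording, shaped like what coil_ops(slave) might return.
recorded = [
    CoilOp(WRITE_SINGLE_COIL, 7, True),
    CoilOp(READ_COILS, 7, True),
    CoilOp(WRITE_SINGLE_COIL, 7, False),
]
```

In a real test the list would come from `coil_ops(slave)` on a slave created with `record_coil_ops=True`.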

FakeTransport

FakeTransport(script=None, *, label='fake://test')

Bases: ByteStreamTransport

Scripted, in-process :class:ByteStreamTransport.

Source code in src/servomexlib/transport/fake.py
def __init__(
    self,
    script: Mapping[bytes, ScriptedReply] | None = None,
    *,
    label: str = "fake://test",
) -> None:
    super().__init__(label=label)
    self._script: dict[bytes, ScriptedReply] = dict(script or {})
    self._writes: list[bytes] = []
    self._inbound = bytearray()
    self._event = anyio.Event()
    self._closed = False

writes property

writes

Every payload written through :meth:send, in order.

add_script

add_script(request, reply)

Register or overwrite the canned reply for request.

Source code in src/servomexlib/transport/fake.py
def add_script(self, request: bytes, reply: ScriptedReply) -> None:
    """Register or overwrite the canned reply for ``request``."""
    self._script[bytes(request)] = reply

emit async

emit(frames, *, interval=0.0)

Feed frames one at a time, optionally interval seconds apart.

Run as a background task to simulate a continuous broadcaster: task_group.start_soon(lambda: fake.emit(frames, interval=0.05)).

Source code in src/servomexlib/transport/fake.py
async def emit(self, frames: Sequence[bytes], *, interval: float = 0.0) -> None:
    """Feed ``frames`` one at a time, optionally ``interval`` seconds apart.

    Run as a background task to simulate a continuous broadcaster:
    ``task_group.start_soon(lambda: fake.emit(frames, interval=0.05))``.
    """
    for frame in frames:
        if interval:
            await anyio.sleep(interval)
        self.feed(frame)

feed

feed(data)

Push unsolicited bytes into the inbound buffer.

Source code in src/servomexlib/transport/fake.py
def feed(self, data: bytes) -> None:
    """Push unsolicited bytes into the inbound buffer."""
    self._inbound += data
    self._wake()

MockChannel dataclass

MockChannel(
    channel,
    value,
    name=b"\x00\x00\x00\x00\x00\x00",
    unit=b"\x00\x00\x00",
    status_bits=(False,) * 8,
)

One channel's contents to preload into a :class:MockSlave's banks.

name / unit are raw display-ROM bytes (not ASCII), so a test can reproduce e.g. CO₂'s name 43 4F 82 20 20 20 and exercise the charset decode. status_bits are the 8 FC02 discrete bits (Fault, Maintenance, Calibration, WarmingUp, Alarm1..4).
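
A small sketch of what these fields carry, using only the standard library. Two assumptions: `Alarm1..4` is expanded to four names, and the helpers `pack_status` and `active_flags` are hypothetical. The LSB-first packing mirrors how an FC02 response lays out discrete inputs within a data byte; it is not a claim about how the library stores them.

```python
# Bit order as listed for status_bits; "Alarm1..4" expanded (assumption).
STATUS_NAMES = (
    "Fault", "Maintenance", "Calibration", "WarmingUp",
    "Alarm1", "Alarm2", "Alarm3", "Alarm4",
)


def pack_status(bits) -> int:
    """Pack 8 booleans into one byte, first bit in the LSB (FC02 layout)."""
    if len(bits) != 8:
        raise ValueError("expected exactly 8 status bits")
    return sum(1 << i for i, on in enumerate(bits) if on)


def active_flags(bits) -> list:
    """Names of the status bits that are set."""
    return [name for name, on in zip(STATUS_NAMES, bits) if on]


# Fault and Alarm1 set, everything else clear.
bits = (True, False, False, False, True, False, False, False)

# The CO2 name bytes quoted above: 0x82 is outside ASCII, which is why
# the raw bytes have to go through a display-ROM charset decode.
co2_name = bytes.fromhex("43 4F 82 20 20 20")
try:
    co2_name.decode("ascii")
    ascii_ok = True
except UnicodeDecodeError:
    ascii_ok = False
```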

ReadOp dataclass

ReadOp(function_code, address, count)

One observed read request against a recording :class:MockSlave.

coil_ops

coil_ops(slave)

Return the ordered coil ops a recording slave observed, or [] if the slave is not a recording one.

Source code in src/servomexlib/testing.py
def coil_ops(slave: MockSlave) -> list[CoilOp]:
    """Return the ordered coil ops a recording slave observed (``[]`` otherwise)."""
    return list(getattr(slave, "coil_ops", []))

load_4100_banks

load_4100_banks(slave, channels=DEFAULT_4100_BANK)

Preload slave's input-register and discrete-input banks from channels.

Values are written as IEEE-754 float32 high-word-first (the HW-confirmed word order); names/units as raw display-ROM register words; status as FC02 bits.

Source code in src/servomexlib/testing.py
def load_4100_banks(slave: MockSlave, channels: Sequence[MockChannel] = DEFAULT_4100_BANK) -> None:
    """Preload ``slave``'s input-register and discrete-input banks from ``channels``.

    Values are written as IEEE-754 float32 high-word-first (the HW-confirmed word
    order); names/units as raw display-ROM register words; status as FC02 bits.
    """
    from anymodbus.decoders import encode_float32

    from servomexlib.protocol.modbus import registers as reg

    for ch in channels:
        hi, lo = encode_float32(ch.value)  # defaults: HIGH_LOW + BIG (HW-confirmed)
        vpdu = reg.value_pdu(ch.channel)
        slave.input_registers[vpdu] = hi
        slave.input_registers[vpdu + 1] = lo
        for offset, word in enumerate(_bytes_to_words(ch.name)):
            slave.input_registers[reg.name_pdu(ch.channel) + offset] = word
        for offset, word in enumerate(_bytes_to_words(ch.unit)):
            slave.input_registers[reg.unit_pdu(ch.channel) + offset] = word
        base = reg.status_pdu(ch.channel)
        for i, on in enumerate(ch.status_bits):
            _set_bit(slave.discrete_inputs, base + i, on=on)
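
The high-word-first float32 layout can be reproduced with `struct` alone. This sketch does not call `anymodbus`. The `bytes_to_words` helper is a guess at what the private `_bytes_to_words` does (two bytes per register, first byte in the high half), and the zero padding of odd-length input is likewise an assumption.

```python
import struct


def float32_to_words(value: float) -> tuple:
    """Split a big-endian IEEE-754 float32 into (high word, low word)."""
    hi, lo = struct.unpack(">HH", struct.pack(">f", value))
    return hi, lo


def words_to_float32(hi: int, lo: int) -> float:
    """Inverse of float32_to_words: high word first."""
    return struct.unpack(">f", struct.pack(">HH", hi, lo))[0]


def bytes_to_words(raw: bytes) -> list:
    """Assumed packing: two bytes per 16-bit register, big-endian.

    Odd-length input is zero-padded here; the library may do otherwise.
    """
    if len(raw) % 2:
        raw += b"\x00"
    return [int.from_bytes(raw[i:i + 2], "big") for i in range(0, len(raw), 2)]


hi, lo = float32_to_words(20.5)                                  # (0x41A4, 0x0000)
name_words = bytes_to_words(bytes.fromhex("43 4F 82 20 20 20"))  # three registers
```

Under these assumptions a value occupies two consecutive input registers and a six-byte name occupies three.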

load_arrow_script

load_arrow_script(text)

Parse a > hex / < hex arrow fixture into a write→reply script.

Each > line is a request (host→device) and the following < line is the reply (device→host). # starts a comment; blank lines are ignored. Hex tokens may be space- or colon-separated.

Source code in src/servomexlib/testing.py
def load_arrow_script(text: str) -> dict[bytes, bytes]:
    """Parse a ``> hex`` / ``< hex`` arrow fixture into a write→reply script.

    Each ``>`` line is a request (host→device) and the following ``<`` line is
    the reply (device→host). ``#`` starts a comment; blank lines are ignored.
    Hex tokens may be space- or colon-separated.
    """
    script: dict[bytes, bytes] = {}
    pending: bytes | None = None
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line:
            continue
        marker, _, payload = line.partition(" ")
        data = _hex(payload)
        if marker == ">":
            pending = data
        elif marker == "<" and pending is not None:
            script[pending] = data
            pending = None
    return script
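
For illustration, here is a fixture in the arrow format together with a self-contained stand-in parser that follows the rules above. The byte values are made up, not a real analyser capture, and `parse_arrow` is a hypothetical name. The private `_hex` helper is replaced by `bytes.fromhex`, on the assumption that it only needs to accept space- or colon-separated tokens.

```python
FIXTURE = """\
# made-up exchange, not a real analyser capture
> 01 04 00 00     # request, space-separated
< 01:04:02:AB:CD  # reply, colon-separated

> 02 03
< 06
"""


def parse_arrow(text: str) -> dict:
    """Stand-in for load_arrow_script: map each request to its reply."""
    script = {}
    pending = None
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()   # drop comments and padding
        if not line:
            continue
        marker, _, payload = line.partition(" ")
        # bytes.fromhex skips whitespace; colons are normalised first.
        data = bytes.fromhex(payload.replace(":", " "))
        if marker == ">":
            pending = data
        elif marker == "<" and pending is not None:
            script[pending] = data
            pending = None
    return script


script = parse_arrow(FIXTURE)
```

A mapping of this shape is presumably what `FakeTransport(script=...)` or `add_script` expects, provided plain `bytes` is an accepted `ScriptedReply`.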

load_cal_state

load_cal_state(
    slave, group, *, calibrating=True, gas2=False
)

Set the pair of cal-group discretes for group (within the 11009-11016 block) on slave.

Mirrors :meth:ModbusClient._read_cal_progress: the group's pair of discretes is [calibrating, gas2] at 2*(group-1) within the cal-group block. With calibrating=True and gas2 selecting cal-gas-1 vs -2, a subsequent calibration_status(group) reports active=True and the matching :class:CalPhase.

Source code in src/servomexlib/testing.py
def load_cal_state(
    slave: MockSlave, group: int, *, calibrating: bool = True, gas2: bool = False
) -> None:
    """Set ``group``'s cal-group discretes (``11009``-``11016``) on ``slave``.

    Mirrors :meth:`ModbusClient._read_cal_progress`: the group's pair of discretes
    is ``[calibrating, gas2]`` at ``2*(group-1)`` within the cal-group block. With
    ``calibrating=True`` and ``gas2`` selecting cal-gas-1 vs -2, a subsequent
    ``calibration_status(group)`` reports ``active=True`` and the matching
    :class:`CalPhase`.
    """
    from servomexlib.protocol.modbus import registers as reg

    base = reg.analyser_status_pdu(reg.CAL_GROUP_DISCRETE_BASE)
    idx = base + 2 * (group - 1)
    _set_bit(slave.discrete_inputs, idx, on=calibrating)
    _set_bit(slave.discrete_inputs, idx + 1, on=gas2)
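
The index arithmetic can be checked in isolation. This sketch assumes four calibration groups (inferred from eight discretes at two per group) and the conventional Modbus mapping in which discrete-input reference 10001 is PDU address 0. Whether `reg.analyser_status_pdu` applies exactly that mapping is not shown here, so treat the PDU values as illustrative. The helper names are hypothetical.

```python
CAL_GROUP_FIRST_REF = 11009   # start of the documented 11009-11016 block
DISCRETE_REF_BASE = 10001     # conventional: reference 10001 -> PDU address 0
GROUP_COUNT = 4               # assumption: 8 discretes / 2 per group


def cal_group_refs(group: int) -> tuple:
    """Return the (calibrating, gas2) discrete references for a group."""
    if not 1 <= group <= GROUP_COUNT:
        raise ValueError(f"group must be 1..{GROUP_COUNT}, got {group}")
    calibrating = CAL_GROUP_FIRST_REF + 2 * (group - 1)
    return calibrating, calibrating + 1


def ref_to_pdu(ref: int) -> int:
    """Convert a 1xxxx discrete-input reference to a zero-based PDU address."""
    return ref - DISCRETE_REF_BASE


all_refs = [cal_group_refs(g) for g in range(1, GROUP_COUNT + 1)]
```

Under these assumptions group 1 maps to PDU addresses 1008 and 1009, which sits inside the 1100 discrete inputs the mock slave allocates.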

mock_modbus_pair async

mock_modbus_pair(
    *,
    framing=None,
    address=1,
    channels=DEFAULT_4100_BANK,
    client_channels=None,
    device="mock",
    disabled_function_codes=None,
    record_coil_ops=False,
    record_read_ops=False,
    valid_input_ranges=None,
    valid_discrete_ranges=None,
)

Yield (ModbusClient, MockSlave) connected over an in-process serial pair.

The client is the real :class:~servomexlib.protocol.modbus.client.ModbusClient bound through a real :class:~servomexlib.protocol.modbus.session.ModbusSession and :class:~servomexlib.transport.serial.SerialTransport, so the full encode/frame/decode stack is exercised against byte-accurate ADUs. The mock's banks may be mutated mid-test; reads observe the change on the next request.

Source code in src/servomexlib/testing.py
@asynccontextmanager
async def mock_modbus_pair(
    *,
    framing: Framing | None = None,
    address: int = 1,
    channels: Sequence[MockChannel] = DEFAULT_4100_BANK,
    client_channels: Sequence[ChannelId] | None = None,
    device: str = "mock",
    disabled_function_codes: frozenset[int] | None = None,
    record_coil_ops: bool = False,
    record_read_ops: bool = False,
    valid_input_ranges: Sequence[tuple[int, int]] | None = None,
    valid_discrete_ranges: Sequence[tuple[int, int]] | None = None,
) -> AsyncGenerator[tuple[ModbusClient, MockSlave]]:
    """Yield ``(ModbusClient, MockSlave)`` connected over an in-process serial pair.

    The client is the real :class:`~servomexlib.protocol.modbus.client.ModbusClient`
    bound through a real :class:`~servomexlib.protocol.modbus.session.ModbusSession`
    and :class:`~servomexlib.transport.serial.SerialTransport`, so the full encode/
    frame/decode stack is exercised against byte-accurate ADUs. The mock's banks
    may be mutated mid-test; reads observe the change on the next request.
    """
    from servomexlib.protocol.modbus.client import ModbusClient as _ModbusClient
    from servomexlib.protocol.modbus.session import ModbusSession as _ModbusSession

    async with mock_modbus_transport(
        framing=framing,
        address=address,
        channels=channels,
        disabled_function_codes=disabled_function_codes,
        record_coil_ops=record_coil_ops,
        record_read_ops=record_read_ops,
        valid_input_ranges=valid_input_ranges,
        valid_discrete_ranges=valid_discrete_ranges,
    ) as (transport, slave):
        session = _ModbusSession(transport, address=address, framing=slave.framing)
        client = _ModbusClient(session, channels=client_channels, device=device)
        yield client, slave

mock_modbus_transport async

mock_modbus_transport(
    *,
    framing=None,
    address=1,
    channels=DEFAULT_4100_BANK,
    disabled_function_codes=None,
    record_coil_ops=False,
    record_read_ops=False,
    valid_input_ranges=None,
    valid_discrete_ranges=None,
)

Yield (Transport, MockSlave) — the client-end transport over a serving slave.

Lower-level than :func:mock_modbus_pair: it hands back the raw :class:~servomexlib.transport.serial.SerialTransport (the no-hardware path for the AUTO ladder and ad-hoc binding) connected to a preloaded byte-accurate :class:MockSlave. The transport and the slave end are closed on exit.

Source code in src/servomexlib/testing.py
@asynccontextmanager
async def mock_modbus_transport(
    *,
    framing: Framing | None = None,
    address: int = 1,
    channels: Sequence[MockChannel] = DEFAULT_4100_BANK,
    disabled_function_codes: frozenset[int] | None = None,
    record_coil_ops: bool = False,
    record_read_ops: bool = False,
    valid_input_ranges: Sequence[tuple[int, int]] | None = None,
    valid_discrete_ranges: Sequence[tuple[int, int]] | None = None,
) -> AsyncGenerator[tuple[Transport, MockSlave]]:
    """Yield ``(Transport, MockSlave)`` — the client-end transport over a serving slave.

    Lower-level than :func:`mock_modbus_pair`: it hands back the raw
    :class:`~servomexlib.transport.serial.SerialTransport` (the no-hardware path
    for the ``AUTO`` ladder and ad-hoc binding) connected to a preloaded byte-
    accurate :class:`MockSlave`. The transport and the slave end are closed on exit.
    """
    import anyio
    from anymodbus import ExceptionCode
    from anymodbus import Framing as _Framing
    from anymodbus.testing import MockSlave as _MockSlave
    from anyserial import SerialConfig
    from anyserial.testing import serial_port_pair

    from servomexlib.transport.base import SerialSettings
    from servomexlib.transport.serial import SerialTransport

    resolved_framing = framing if framing is not None else _Framing.RTU
    config = SerialConfig(baudrate=19200)
    client_end, slave_end = serial_port_pair(config_a=config, config_b=config)
    transport = SerialTransport(client_end, SerialSettings(port="mock://servomex"))
    base_cls = _recording_slave_cls() if record_coil_ops or record_read_ops else _MockSlave

    class RangedMockSlave(base_cls):  # type: ignore[misc, valid-type]
        """A ``MockSlave`` variant that can reject reads crossing invalid gaps."""

        def _handle_request(self, pdu: bytes) -> bytes:
            op = _decode_read_op(pdu)
            if op is not None:
                if (
                    op.function_code == _FC_READ_INPUT_REGISTERS
                    and valid_input_ranges is not None
                    and not _inside_any_range(op.address, op.count, valid_input_ranges)
                ):
                    return bytes((op.function_code | 0x80, int(ExceptionCode.ILLEGAL_DATA_ADDRESS)))
                if (
                    op.function_code == _FC_READ_DISCRETE_INPUTS
                    and valid_discrete_ranges is not None
                    and not _inside_any_range(op.address, op.count, valid_discrete_ranges)
                ):
                    return bytes((op.function_code | 0x80, int(ExceptionCode.ILLEGAL_DATA_ADDRESS)))
            response: bytes = super()._handle_request(pdu)
            return response

    has_range_limits = valid_input_ranges is not None or valid_discrete_ranges is not None
    slave_cls = RangedMockSlave if has_range_limits else base_cls
    slave = slave_cls(
        address=address,
        discrete_input_count=1100,  # cover the analyser-status block at PDU 1000+
        framing=resolved_framing,
        disabled_function_codes=disabled_function_codes,
    )
    load_4100_banks(slave, channels)

    async with anyio.create_task_group() as tg:
        tg.start_soon(slave.serve, slave_end)
        try:
            yield transport, slave
        finally:
            tg.cancel_scope.cancel()
            with anyio.CancelScope(shield=True):
                await transport.aclose()
                await slave_end.aclose()
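
The range gating in `RangedMockSlave` can be sketched on its own. The exception response is standard Modbus: the function code with bit 7 set, followed by exception code 0x02 (Illegal Data Address). The semantics of each range tuple are an assumption here, taken as inclusive `(first, last)` addresses; the private `_inside_any_range` may define them differently. `gate_read` is a hypothetical helper.

```python
ILLEGAL_DATA_ADDRESS = 0x02   # standard Modbus exception code
READ_INPUT_REGISTERS = 0x04   # FC04


def inside_any_range(address: int, count: int, ranges) -> bool:
    """True when [address, address + count - 1] fits inside one range.

    Assumes each range is an inclusive (first, last) address pair.
    """
    last = address + count - 1
    return any(first <= address and last <= stop for first, stop in ranges)


def gate_read(function_code: int, address: int, count: int, ranges):
    """Return an exception PDU for a read that leaves every valid range.

    Returns None when the read is acceptable and should be served normally.
    """
    if inside_any_range(address, count, ranges):
        return None
    return bytes((function_code | 0x80, ILLEGAL_DATA_ADDRESS))


valid = [(0, 9), (20, 29)]                               # made-up ranges with a gap
crossing = gate_read(READ_INPUT_REGISTERS, 8, 4, valid)  # 8..11 crosses the gap
contained = gate_read(READ_INPUT_REGISTERS, 20, 10, valid)
```

A test can use this kind of gap to check that a client splits block reads instead of spanning unmapped addresses.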

read_ops

read_ops(slave)

Return the ordered read ops a recording slave observed, or [] if the slave is not a recording one.

Source code in src/servomexlib/testing.py
def read_ops(slave: MockSlave) -> list[ReadOp]:
    """Return the ordered read ops a recording slave observed (``[]`` otherwise)."""
    return list(getattr(slave, "read_ops", []))

split_continuous_frames

split_continuous_frames(capture)

Split a raw continuous capture into individual frame payloads (no CRLF).

The on-wire terminator is CR LF (design §3.1), but a capture that has been through a text-mode transform keeps only the LF. We split on LF and drop a trailing CR so both framings yield the same payloads — matching the parser, which strips CR LF either way.

Source code in src/servomexlib/testing.py
def split_continuous_frames(capture: bytes) -> list[bytes]:
    """Split a raw continuous capture into individual frame payloads (no CRLF).

    The on-wire terminator is ``CR LF`` (design §3.1), but a capture that has been
    through a text-mode transform keeps only the ``LF``. We split on ``LF`` and
    drop a trailing ``CR`` so both framings yield the same payloads — matching the
    parser, which strips ``CR LF`` either way.
    """
    return [frame.rstrip(b"\r") for frame in capture.split(b"\n") if frame.strip(b" \r")]
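
A quick check of the behaviour described above. The function body is copied from the source so the snippet runs without the package; the frame payloads are made up.

```python
def split_continuous_frames(capture: bytes) -> list:
    """Copy of the one-line implementation shown above."""
    return [frame.rstrip(b"\r") for frame in capture.split(b"\n") if frame.strip(b" \r")]


# Same two made-up frames, once with the on-wire CR LF terminator and
# once after a text-mode transform that kept only the LF.
wire_capture = b"A 1.0\r\nB 2.0\r\n"
text_capture = b"A 1.0\nB 2.0\n"

wire_frames = split_continuous_frames(wire_capture)
text_frames = split_continuous_frames(text_capture)
```

Both captures yield the same payload list, and lines that contain only spaces or a lone CR are dropped by the filter.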