
alicatlib.transport

The Transport Protocol, SerialTransport for hardware, and FakeTransport for tests. See Design §5.1.

Public surface

alicatlib.transport

Transport layer — moves bytes, knows nothing about Alicat.

See docs/design.md §5.1.

FakeTransport

FakeTransport(
    script=None, *, label="fake://test", latency_s=0.0
)

Scripted :class:Transport for tests.

Parameters:

script (Mapping[bytes, ScriptedReply] | None, default: None)
    Mapping of write_bytes → reply. Every known write queues the corresponding reply into the read buffer. Unknown writes are recorded but produce no reply; subsequent reads will then hit idle_timeout / timeout, which is the intended failure mode (tests see a real timeout if they forgot to script a command).

label (str, default: 'fake://test')
    Identifier used in errors.

latency_s (float, default: 0.0)
    Per-operation artificial delay, useful for simulating a slow device.

Source code in src/alicatlib/transport/fake.py
def __init__(
    self,
    script: Mapping[bytes, ScriptedReply] | None = None,
    *,
    label: str = "fake://test",
    latency_s: float = 0.0,
) -> None:
    self._script: dict[bytes, ScriptedReply] = dict(script or {})
    self._writes: list[bytes] = []
    self._read_buffer = bytearray()
    self._is_open = False
    self._label = label
    self._latency_s = latency_s
    self._force_read_timeout = False
    self._force_write_timeout = False
    self._force_disconnected = False
    # Track calls to ``reopen`` so baud-change tests can assert the
    # transport observed the reconfiguration. ``None`` until
    # :meth:`reopen` is called; then holds the last requested
    # baudrate. ``reopen_count`` lets tests distinguish "never
    # called" from "called with the default baud".
    self._last_reopen_baud: int | None = None
    self._reopen_count: int = 0
    self._force_reopen_error: bool = False

last_reopen_baud property

last_reopen_baud

Baud rate requested by the most recent :meth:reopen, or None.

reopen_count property

reopen_count

Number of :meth:reopen calls since construction.

writes property

writes

Every write payload recorded since construction, in order.

add_script

add_script(command, reply)

Register or overwrite a scripted reply for command.

Source code in src/alicatlib/transport/fake.py
def add_script(self, command: bytes, reply: ScriptedReply) -> None:
    """Register or overwrite a scripted reply for ``command``."""
    self._script[bytes(command)] = reply

feed

feed(data)

Push unsolicited bytes into the read buffer.

Useful for simulating a device that was left streaming, or garbage on the line that the session must drain on recovery.

Source code in src/alicatlib/transport/fake.py
def feed(self, data: bytes) -> None:
    """Push unsolicited bytes into the read buffer.

    Useful for simulating a device that was left streaming, or garbage on
    the line that the session must drain on recovery.
    """
    self._read_buffer.extend(data)

force_read_timeout

force_read_timeout(enabled=True)

Force the next :meth:read_until to raise AlicatTimeoutError.

Source code in src/alicatlib/transport/fake.py
def force_read_timeout(self, enabled: bool = True) -> None:
    """Force the next :meth:`read_until` to raise ``AlicatTimeoutError``."""
    self._force_read_timeout = enabled

force_reopen_error

force_reopen_error(enabled=True)

Force the next :meth:reopen to raise :class:AlicatConnectionError.

Used by :meth:Session.change_baud_rate tests to exercise the BROKEN-state transition without a real serial adapter.

Source code in src/alicatlib/transport/fake.py
def force_reopen_error(self, enabled: bool = True) -> None:
    """Force the next :meth:`reopen` to raise :class:`AlicatConnectionError`.

    Used by :meth:`Session.change_baud_rate` tests to exercise
    the BROKEN-state transition without a real serial adapter.
    """
    self._force_reopen_error = enabled

force_write_timeout

force_write_timeout(enabled=True)

Force the next :meth:write to raise AlicatTimeoutError.

Source code in src/alicatlib/transport/fake.py
def force_write_timeout(self, enabled: bool = True) -> None:
    """Force the next :meth:`write` to raise ``AlicatTimeoutError``."""
    self._force_write_timeout = enabled

reopen async

reopen(*, baudrate)

Simulate a baud-rate change — close, record, reopen.

If force_reopen_error() has been called the reopen raises :class:AlicatConnectionError after the close, leaving the transport closed. That's the "reopen wedged" path tested at the session layer for :attr:SessionState.BROKEN transitions.

Source code in src/alicatlib/transport/fake.py
async def reopen(self, *, baudrate: int) -> None:
    """Simulate a baud-rate change — close, record, reopen.

    If ``force_reopen_error()`` has been called the reopen raises
    :class:`AlicatConnectionError` after the close, leaving the
    transport closed. That's the "reopen wedged" path tested at
    the session layer for :attr:`SessionState.BROKEN` transitions.
    """
    await self.close()
    self._reopen_count += 1
    self._last_reopen_baud = baudrate
    if self._force_reopen_error:
        raise AlicatConnectionError(
            f"forced reopen error on {self._label}",
            context=ErrorContext(port=self._label),
        )
    await self.open()
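
The ordering in the source above (close, record the request, then either raise or reopen) is what makes the failure path observable. The sketch below reproduces only that ordering in a hypothetical `ReopenProbe` class; it is not the library's code, and it raises the built-in `ConnectionError` where the real fake raises `AlicatConnectionError`.

```python
import asyncio


class ReopenProbe:
    """Hypothetical stand-in that mirrors the close, record, reopen order."""

    def __init__(self) -> None:
        self.is_open = False
        self.reopen_count = 0
        self.last_reopen_baud: int | None = None
        self.fail_reopen = False

    async def open(self) -> None:
        self.is_open = True

    async def close(self) -> None:
        self.is_open = False

    async def reopen(self, *, baudrate: int) -> None:
        await self.close()
        # The request is recorded before the failure check, so a failed
        # reopen is still counted.
        self.reopen_count += 1
        self.last_reopen_baud = baudrate
        if self.fail_reopen:
            raise ConnectionError("forced reopen error")
        await self.open()


async def demo():
    probe = ReopenProbe()
    await probe.open()
    await probe.reopen(baudrate=115200)
    ok_state = (probe.is_open, probe.reopen_count, probe.last_reopen_baud)

    probe.fail_reopen = True
    try:
        await probe.reopen(baudrate=9600)
    except ConnectionError:
        pass
    failed_state = (probe.is_open, probe.reopen_count, probe.last_reopen_baud)
    return ok_state, failed_state
```

After the forced failure the probe is closed, yet the counter and last requested baud still reflect the attempt, which is the state a session-level test would assert on.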

SerialSettings dataclass

SerialSettings(
    port,
    baudrate=19200,
    bytesize=ByteSize.EIGHT,
    parity=Parity.NONE,
    stopbits=StopBits.ONE,
    rtscts=False,
    xonxoff=False,
    exclusive=True,
)

Serial-port configuration.

Mirrors :class:anyserial.SerialConfig plus a port path. exclusive defaults True so that two processes can't scribble over the same device — the Alicat wire protocol isn't multi-master tolerant.

SerialTransport

SerialTransport(settings)

:class:Transport backed by a real serial port via anyserial.

Tests that don't need hardware can use :class:alicatlib.transport.fake.FakeTransport instead; the two conform to the same structural :class:Transport protocol.

Source code in src/alicatlib/transport/serial.py
def __init__(self, settings: SerialSettings) -> None:
    self._settings = settings
    self._port: SerialPort | None = None
    # Bytes read past a separator in :meth:`read_until` are held here so
    # the next call sees them first. Serial I/O is chunk-oriented — we
    # can't hand the kernel "give me up to separator" without this.
    self._pushback = bytearray()

reopen async

reopen(*, baudrate)

Close and reopen the port at baudrate.

Called by :meth:Session.change_baud_rate after the device has already switched — the transport has to retune to stay in sync. The cached :class:SerialSettings is updated so the new baud survives subsequent lifecycle calls (close + future open round-trip at the same rate).

If :meth:open fails on the new baud the transport is left closed; the caller (the session's baud-change shield) is responsible for surfacing that as a BROKEN session state with recovery guidance.

Source code in src/alicatlib/transport/serial.py
async def reopen(self, *, baudrate: int) -> None:
    """Close and reopen the port at ``baudrate``.

    Called by :meth:`Session.change_baud_rate` after the device
    has already switched — the transport has to retune to stay in
    sync. The cached :class:`SerialSettings` is updated so the
    new baud survives subsequent lifecycle calls (close +
    future open round-trip at the same rate).

    If :meth:`open` fails on the new baud the transport is left
    closed; the caller (the session's baud-change shield) is
    responsible for surfacing that as a ``BROKEN`` session state
    with recovery guidance.
    """
    await self.close()
    # dataclasses.replace on a frozen dataclass — cheap and type-safe.
    self._settings = replace(self._settings, baudrate=baudrate)
    await self.open()
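
The settings update above relies on `dataclasses.replace`, which builds a new instance instead of mutating a frozen one. A minimal illustration follows, using a hypothetical two-field `PortSettings` class in place of `SerialSettings` and an example port path:

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class PortSettings:
    """Hypothetical stand-in for SerialSettings with just two fields."""

    port: str
    baudrate: int = 19200


old = PortSettings(port="/dev/ttyUSB0")
new = replace(old, baudrate=115200)  # new instance; old is left untouched

try:
    old.baudrate = 115200  # direct assignment is rejected on a frozen dataclass
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Because the old instance is never modified, any code still holding a reference to it keeps seeing the previous baud rate.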

Transport

Bases: Protocol

Byte-level transport.

Every I/O boundary takes an explicit timeout. On expiry, implementations raise :class:alicatlib.errors.AlicatTimeoutError; they never silently return empty or partial bytes. Backend exceptions normalize to :class:alicatlib.errors.AlicatTransportError (or a subclass) with __cause__ preserving the original exception.

Lifecycle is single-shot: :meth:open once, :meth:close once.
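
The normalization rule above (backend exceptions become a transport error, with the original kept on `__cause__`) corresponds to Python's `raise ... from ...` form. The sketch below assumes a hypothetical `TransportError` class standing in for `AlicatTransportError` and a made-up failing backend call:

```python
class TransportError(Exception):
    """Hypothetical stand-in for AlicatTransportError."""


def read_backend() -> bytes:
    # Pretend backend call that fails the way an OS-level read might.
    raise OSError("device disconnected")


def read_normalized() -> bytes:
    try:
        return read_backend()
    except OSError as exc:
        # "raise ... from exc" stores the original exception on __cause__.
        raise TransportError("read failed on fake://demo") from exc


try:
    read_normalized()
except TransportError as err:
    caught = err
```

Callers then need to catch only the library's error hierarchy, while `caught.__cause__` still exposes the backend exception for debugging.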

is_open property

is_open

Whether :meth:open has run without a matching :meth:close.

label property

label

Short identifier (port path, URL, "fake://...") used in errors.

close async

close()

Close the underlying port. Safe to call when already closed.

Source code in src/alicatlib/transport/base.py
async def close(self) -> None:
    """Close the underlying port. Safe to call when already closed."""
    ...

drain_input async

drain_input()

Discard any buffered input bytes. Best-effort; never raises.

Source code in src/alicatlib/transport/base.py
async def drain_input(self) -> None:
    """Discard any buffered input bytes. Best-effort; never raises."""
    ...

open async

open()

Open the underlying port. Calling open on an already-open transport is an error.

Source code in src/alicatlib/transport/base.py
async def open(self) -> None:
    """Open the underlying port. Calling open on an already-open transport is an error."""
    ...

read_available async

read_available(idle_timeout, max_bytes=None)

Read until the line goes idle for idle_timeout seconds.

Never raises on idle expiry — an idle timeout is the expected exit. Returns whatever was accumulated (possibly empty). Used for best-effort drain / stream-stop recovery, not for request/response.

Source code in src/alicatlib/transport/base.py
async def read_available(
    self,
    idle_timeout: float,
    max_bytes: int | None = None,
) -> bytes:
    """Read until the line goes idle for ``idle_timeout`` seconds.

    Never raises on idle expiry — an idle timeout is the *expected* exit.
    Returns whatever was accumulated (possibly empty). Used for
    best-effort drain / stream-stop recovery, not for request/response.
    """
    ...
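
The idle-timeout contract above, where expiry is the normal exit instead of an error, can be sketched with the standard library. This is an illustration only: it uses `asyncio.wait_for` over an `asyncio.Queue` of chunks instead of a serial port, the name `read_until_idle` is hypothetical, and `max_bytes` is omitted for brevity.

```python
import asyncio


async def read_until_idle(queue: asyncio.Queue, idle_timeout: float) -> bytes:
    """Collect chunks until none arrives for idle_timeout seconds."""
    out = bytearray()
    while True:
        try:
            chunk = await asyncio.wait_for(queue.get(), idle_timeout)
        except asyncio.TimeoutError:
            break  # idle expiry is the expected exit, so nothing is raised
        out.extend(chunk)
    return bytes(out)


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    for chunk in (b"stale ", b"stream ", b"data"):
        queue.put_nowait(chunk)
    drained = await read_until_idle(queue, idle_timeout=0.05)
    empty = await read_until_idle(queue, idle_timeout=0.05)
    return drained, empty
```

The second call returns an empty result without raising, which matches the "possibly empty" wording in the docstring.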

read_until async

read_until(separator, timeout)

Read bytes up to and including the next occurrence of separator.

Raises :class:alicatlib.errors.AlicatTimeoutError if the separator does not arrive before timeout. Bytes received after the separator remain buffered for the next call — implementations must not discard them.

Source code in src/alicatlib/transport/base.py
async def read_until(self, separator: bytes, timeout: float) -> bytes:
    """Read bytes up to and including the next occurrence of ``separator``.

    Raises :class:`alicatlib.errors.AlicatTimeoutError` if the separator
    does not arrive before ``timeout``. Bytes received after the separator
    remain buffered for the next call — implementations must not discard
    them.
    """
    ...
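
The requirement that bytes after the separator stay buffered is commonly met with a pushback buffer. The sketch below is a synchronous, hypothetical `PushbackReader` fed from a list of chunks; it is not the library's implementation, and it raises the built-in `TimeoutError` when the input runs out where a real transport would raise its own timeout error.

```python
class PushbackReader:
    """Hypothetical helper: splits chunked input on a separator."""

    def __init__(self, chunks) -> None:
        self._chunks = iter(chunks)   # stands in for successive port reads
        self._pushback = bytearray()  # bytes received but not yet returned

    def read_until(self, separator: bytes) -> bytes:
        while True:
            idx = self._pushback.find(separator)
            if idx >= 0:
                end = idx + len(separator)
                line = bytes(self._pushback[:end])
                del self._pushback[:end]  # the remainder stays for the next call
                return line
            try:
                self._pushback.extend(next(self._chunks))
            except StopIteration:
                # No more input: a real transport would time out here.
                raise TimeoutError("separator never arrived") from None


reader = PushbackReader([b"first\rsec", b"ond\rtail"])
first = reader.read_until(b"\r")
second = reader.read_until(b"\r")
```

The second message spans two chunks, so it can only be reassembled because the `sec` fragment read past the first separator was kept.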

reopen async

reopen(*, baudrate)

Close and re-open the underlying port at a new baud rate.

Used by :meth:Session.change_baud_rate to retune the port after the device has already switched baud rates mid-sequence (primer NCB command — see design §5.7). Serial transports close the port, update the cached settings, and open at the new baud; non-serial transports (TCP, future) may raise :class:NotImplementedError — baud rates don't apply there.

Implementations must leave the transport in a consistent state: either fully reopened at the new baud, or clearly closed so callers can recognise a failure. Silent partial states are the worst failure mode for this method.

Source code in src/alicatlib/transport/base.py
async def reopen(self, *, baudrate: int) -> None:
    """Close and re-open the underlying port at a new baud rate.

    Used by :meth:`Session.change_baud_rate` to retune the port
    after the device has already switched baud rates mid-sequence
    (primer ``NCB`` command — see design §5.7). Serial transports
    close the port, update the cached settings, and open at the
    new baud; non-serial transports (TCP, future) may raise
    :class:`NotImplementedError` — baud rates don't apply there.

    Implementations must leave the transport in a consistent
    state: either fully reopened at the new baud, or clearly
    closed so callers can recognise a failure. Silent partial
    states are the worst failure mode for this method.
    """
    ...

write async

write(data, *, timeout)

Write every byte of data. Raise AlicatTimeoutError on expiry.

A bounded write timeout is mandatory because sends can block on RS-485 hardware flow control, a stuck device, or (on TCP) a full send buffer. Callers that block indefinitely hide real hangs.

Source code in src/alicatlib/transport/base.py
async def write(self, data: bytes, *, timeout: float) -> None:
    """Write every byte of ``data``. Raise ``AlicatTimeoutError`` on expiry.

    A bounded write timeout is mandatory because sends can block on
    RS-485 hardware flow control, a stuck device, or (on TCP) a full send
    buffer. Callers that block indefinitely hide real hangs.
    """
    ...
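
To show why a bounded write matters, the sketch below wraps a send that never completes in a timeout and converts the expiry into a dedicated error. It is an assumption-laden illustration: `WriteTimeout` stands in for `AlicatTimeoutError`, `stuck_send` is a made-up hanging backend, and `asyncio.wait_for` is used as a stdlib substitute for whatever timeout primitive a real transport uses.

```python
import asyncio


class WriteTimeout(Exception):
    """Hypothetical stand-in for AlicatTimeoutError."""


async def stuck_send(data: bytes) -> None:
    # Simulates a send that never completes, e.g. a wedged device.
    await asyncio.Event().wait()


async def bounded_write(data: bytes, *, timeout: float) -> None:
    try:
        await asyncio.wait_for(stuck_send(data), timeout)
    except asyncio.TimeoutError as exc:
        raise WriteTimeout(f"write of {len(data)} bytes timed out") from exc


async def demo() -> str:
    try:
        await bounded_write(b"hello\r", timeout=0.05)
    except WriteTimeout as err:
        return str(err)
    return "completed"
```

Without the bound, `demo()` would hang forever on `stuck_send`; with it, the caller gets a prompt, typed failure.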

Base Protocol + serial settings

alicatlib.transport.base

Transport layer abstraction — moves bytes, knows nothing about Alicat.

The :class:Transport :pep:544 Protocol is the interface every backend implements. :class:SerialSettings is the port-configuration dataclass consumed by :class:alicatlib.transport.serial.SerialTransport.
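
For readers unfamiliar with PEP 544, the sketch below shows structural conformance with a deliberately tiny, hypothetical `ByteSink` protocol. It is far smaller than `Transport` and is not part of alicatlib; the point is only that an implementation satisfies a Protocol by having matching members, not by inheriting from it.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ByteSink(Protocol):
    """Hypothetical protocol, far smaller than Transport."""

    def push(self, data: bytes) -> int: ...


class MemorySink:  # note: no inheritance from ByteSink
    def __init__(self) -> None:
        self.buffer = bytearray()

    def push(self, data: bytes) -> int:
        self.buffer.extend(data)
        return len(data)


def send(sink: ByteSink, data: bytes) -> int:
    # Accepts any object with a compatible push method.
    return sink.push(data)


sink = MemorySink()
sent = send(sink, b"abc")
# runtime_checkable isinstance only checks that the members exist,
# not that their signatures match.
conforms = isinstance(sink, ByteSink)
```

This is the same mechanism that lets a serial-backed class and a test fake both be passed wherever a `Transport` is expected.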

Design reference: docs/design.md §5.1.

SerialSettings dataclass

SerialSettings(
    port,
    baudrate=19200,
    bytesize=ByteSize.EIGHT,
    parity=Parity.NONE,
    stopbits=StopBits.ONE,
    rtscts=False,
    xonxoff=False,
    exclusive=True,
)

Serial-port configuration.

Mirrors :class:anyserial.SerialConfig plus a port path. exclusive defaults True so that two processes can't scribble over the same device — the Alicat wire protocol isn't multi-master tolerant.

Transport

Bases: Protocol

Byte-level transport.

Every I/O boundary takes an explicit timeout. On expiry, implementations raise :class:alicatlib.errors.AlicatTimeoutError; they never silently return empty or partial bytes. Backend exceptions normalize to :class:alicatlib.errors.AlicatTransportError (or a subclass) with __cause__ preserving the original exception.

Lifecycle is single-shot: :meth:open once, :meth:close once.

is_open property

is_open

Whether :meth:open has run without a matching :meth:close.

label property

label

Short identifier (port path, URL, "fake://...") used in errors.

close async

close()

Close the underlying port. Safe to call when already closed.

Source code in src/alicatlib/transport/base.py
async def close(self) -> None:
    """Close the underlying port. Safe to call when already closed."""
    ...

drain_input async

drain_input()

Discard any buffered input bytes. Best-effort; never raises.

Source code in src/alicatlib/transport/base.py
async def drain_input(self) -> None:
    """Discard any buffered input bytes. Best-effort; never raises."""
    ...

open async

open()

Open the underlying port. Calling open on an already-open transport is an error.

Source code in src/alicatlib/transport/base.py
async def open(self) -> None:
    """Open the underlying port. Calling open on an already-open transport is an error."""
    ...

read_available async

read_available(idle_timeout, max_bytes=None)

Read until the line goes idle for idle_timeout seconds.

Never raises on idle expiry — an idle timeout is the expected exit. Returns whatever was accumulated (possibly empty). Used for best-effort drain / stream-stop recovery, not for request/response.

Source code in src/alicatlib/transport/base.py
async def read_available(
    self,
    idle_timeout: float,
    max_bytes: int | None = None,
) -> bytes:
    """Read until the line goes idle for ``idle_timeout`` seconds.

    Never raises on idle expiry — an idle timeout is the *expected* exit.
    Returns whatever was accumulated (possibly empty). Used for
    best-effort drain / stream-stop recovery, not for request/response.
    """
    ...

read_until async

read_until(separator, timeout)

Read bytes up to and including the next occurrence of separator.

Raises :class:alicatlib.errors.AlicatTimeoutError if the separator does not arrive before timeout. Bytes received after the separator remain buffered for the next call — implementations must not discard them.

Source code in src/alicatlib/transport/base.py
async def read_until(self, separator: bytes, timeout: float) -> bytes:
    """Read bytes up to and including the next occurrence of ``separator``.

    Raises :class:`alicatlib.errors.AlicatTimeoutError` if the separator
    does not arrive before ``timeout``. Bytes received after the separator
    remain buffered for the next call — implementations must not discard
    them.
    """
    ...

reopen async

reopen(*, baudrate)

Close and re-open the underlying port at a new baud rate.

Used by :meth:Session.change_baud_rate to retune the port after the device has already switched baud rates mid-sequence (primer NCB command — see design §5.7). Serial transports close the port, update the cached settings, and open at the new baud; non-serial transports (TCP, future) may raise :class:NotImplementedError — baud rates don't apply there.

Implementations must leave the transport in a consistent state: either fully reopened at the new baud, or clearly closed so callers can recognise a failure. Silent partial states are the worst failure mode for this method.

Source code in src/alicatlib/transport/base.py
async def reopen(self, *, baudrate: int) -> None:
    """Close and re-open the underlying port at a new baud rate.

    Used by :meth:`Session.change_baud_rate` to retune the port
    after the device has already switched baud rates mid-sequence
    (primer ``NCB`` command — see design §5.7). Serial transports
    close the port, update the cached settings, and open at the
    new baud; non-serial transports (TCP, future) may raise
    :class:`NotImplementedError` — baud rates don't apply there.

    Implementations must leave the transport in a consistent
    state: either fully reopened at the new baud, or clearly
    closed so callers can recognise a failure. Silent partial
    states are the worst failure mode for this method.
    """
    ...

write async

write(data, *, timeout)

Write every byte of data. Raise AlicatTimeoutError on expiry.

A bounded write timeout is mandatory because sends can block on RS-485 hardware flow control, a stuck device, or (on TCP) a full send buffer. Callers that block indefinitely hide real hangs.

Source code in src/alicatlib/transport/base.py
async def write(self, data: bytes, *, timeout: float) -> None:
    """Write every byte of ``data``. Raise ``AlicatTimeoutError`` on expiry.

    A bounded write timeout is mandatory because sends can block on
    RS-485 hardware flow control, a stuck device, or (on TCP) a full send
    buffer. Callers that block indefinitely hide real hangs.
    """
    ...

Serial transport

alicatlib.transport.serial

Serial-port transport backed by :mod:anyserial.

:class:SerialTransport wraps :class:anyserial.SerialPort. Every I/O call is bounded by :func:anyio.fail_after (reads, writes) or :func:anyio.move_on_after (idle-timeout reads). Backend exceptions normalize to :mod:alicatlib.errors types with __cause__ preserved.

Design reference: docs/design.md §5.1.

SerialTransport

SerialTransport(settings)

:class:Transport backed by a real serial port via anyserial.

Tests that don't need hardware can use :class:alicatlib.transport.fake.FakeTransport instead; the two conform to the same structural :class:Transport protocol.

Source code in src/alicatlib/transport/serial.py
def __init__(self, settings: SerialSettings) -> None:
    self._settings = settings
    self._port: SerialPort | None = None
    # Bytes read past a separator in :meth:`read_until` are held here so
    # the next call sees them first. Serial I/O is chunk-oriented — we
    # can't hand the kernel "give me up to separator" without this.
    self._pushback = bytearray()

reopen async

reopen(*, baudrate)

Close and reopen the port at baudrate.

Called by :meth:Session.change_baud_rate after the device has already switched — the transport has to retune to stay in sync. The cached :class:SerialSettings is updated so the new baud survives subsequent lifecycle calls (close + future open round-trip at the same rate).

If :meth:open fails on the new baud the transport is left closed; the caller (the session's baud-change shield) is responsible for surfacing that as a BROKEN session state with recovery guidance.

Source code in src/alicatlib/transport/serial.py
async def reopen(self, *, baudrate: int) -> None:
    """Close and reopen the port at ``baudrate``.

    Called by :meth:`Session.change_baud_rate` after the device
    has already switched — the transport has to retune to stay in
    sync. The cached :class:`SerialSettings` is updated so the
    new baud survives subsequent lifecycle calls (close +
    future open round-trip at the same rate).

    If :meth:`open` fails on the new baud the transport is left
    closed; the caller (the session's baud-change shield) is
    responsible for surfacing that as a ``BROKEN`` session state
    with recovery guidance.
    """
    await self.close()
    # dataclasses.replace on a frozen dataclass — cheap and type-safe.
    self._settings = replace(self._settings, baudrate=baudrate)
    await self.open()

Fake transport

alicatlib.transport.fake

In-process fake transport for tests.

:class:FakeTransport implements the :class:Transport Protocol without touching a serial port. Tests script the expected write→response mapping and assert the recorded command bytes.

Design reference: docs/design.md §5.1.

FakeTransport

FakeTransport(
    script=None, *, label="fake://test", latency_s=0.0
)

Scripted :class:Transport for tests.

Parameters:

script (Mapping[bytes, ScriptedReply] | None, default: None)
    Mapping of write_bytes → reply. Every known write queues the corresponding reply into the read buffer. Unknown writes are recorded but produce no reply; subsequent reads will then hit idle_timeout / timeout, which is the intended failure mode (tests see a real timeout if they forgot to script a command).

label (str, default: 'fake://test')
    Identifier used in errors.

latency_s (float, default: 0.0)
    Per-operation artificial delay, useful for simulating a slow device.

Source code in src/alicatlib/transport/fake.py
def __init__(
    self,
    script: Mapping[bytes, ScriptedReply] | None = None,
    *,
    label: str = "fake://test",
    latency_s: float = 0.0,
) -> None:
    self._script: dict[bytes, ScriptedReply] = dict(script or {})
    self._writes: list[bytes] = []
    self._read_buffer = bytearray()
    self._is_open = False
    self._label = label
    self._latency_s = latency_s
    self._force_read_timeout = False
    self._force_write_timeout = False
    self._force_disconnected = False
    # Track calls to ``reopen`` so baud-change tests can assert the
    # transport observed the reconfiguration. ``None`` until
    # :meth:`reopen` is called; then holds the last requested
    # baudrate. ``reopen_count`` lets tests distinguish "never
    # called" from "called with the default baud".
    self._last_reopen_baud: int | None = None
    self._reopen_count: int = 0
    self._force_reopen_error: bool = False

last_reopen_baud property

last_reopen_baud

Baud rate requested by the most recent :meth:reopen, or None.

reopen_count property

reopen_count

Number of :meth:reopen calls since construction.

writes property

writes

Every write payload recorded since construction, in order.

add_script

add_script(command, reply)

Register or overwrite a scripted reply for command.

Source code in src/alicatlib/transport/fake.py
def add_script(self, command: bytes, reply: ScriptedReply) -> None:
    """Register or overwrite a scripted reply for ``command``."""
    self._script[bytes(command)] = reply

feed

feed(data)

Push unsolicited bytes into the read buffer.

Useful for simulating a device that was left streaming, or garbage on the line that the session must drain on recovery.

Source code in src/alicatlib/transport/fake.py
def feed(self, data: bytes) -> None:
    """Push unsolicited bytes into the read buffer.

    Useful for simulating a device that was left streaming, or garbage on
    the line that the session must drain on recovery.
    """
    self._read_buffer.extend(data)

force_read_timeout

force_read_timeout(enabled=True)

Force the next :meth:read_until to raise AlicatTimeoutError.

Source code in src/alicatlib/transport/fake.py
def force_read_timeout(self, enabled: bool = True) -> None:
    """Force the next :meth:`read_until` to raise ``AlicatTimeoutError``."""
    self._force_read_timeout = enabled

force_reopen_error

force_reopen_error(enabled=True)

Force the next :meth:reopen to raise :class:AlicatConnectionError.

Used by :meth:Session.change_baud_rate tests to exercise the BROKEN-state transition without a real serial adapter.

Source code in src/alicatlib/transport/fake.py
def force_reopen_error(self, enabled: bool = True) -> None:
    """Force the next :meth:`reopen` to raise :class:`AlicatConnectionError`.

    Used by :meth:`Session.change_baud_rate` tests to exercise
    the BROKEN-state transition without a real serial adapter.
    """
    self._force_reopen_error = enabled

force_write_timeout

force_write_timeout(enabled=True)

Force the next :meth:write to raise AlicatTimeoutError.

Source code in src/alicatlib/transport/fake.py
def force_write_timeout(self, enabled: bool = True) -> None:
    """Force the next :meth:`write` to raise ``AlicatTimeoutError``."""
    self._force_write_timeout = enabled

reopen async

reopen(*, baudrate)

Simulate a baud-rate change — close, record, reopen.

If force_reopen_error() has been called the reopen raises :class:AlicatConnectionError after the close, leaving the transport closed. That's the "reopen wedged" path tested at the session layer for :attr:SessionState.BROKEN transitions.

Source code in src/alicatlib/transport/fake.py
async def reopen(self, *, baudrate: int) -> None:
    """Simulate a baud-rate change — close, record, reopen.

    If ``force_reopen_error()`` has been called the reopen raises
    :class:`AlicatConnectionError` after the close, leaving the
    transport closed. That's the "reopen wedged" path tested at
    the session layer for :attr:`SessionState.BROKEN` transitions.
    """
    await self.close()
    self._reopen_count += 1
    self._last_reopen_baud = baudrate
    if self._force_reopen_error:
        raise AlicatConnectionError(
            f"forced reopen error on {self._label}",
            context=ErrorContext(port=self._label),
        )
    await self.open()