Skip to content

alicatlib.protocol

Single-in-flight protocol client, wire parsers, and framing helpers. See Design §5.2.

Public surface

alicatlib.protocol

Protocol layer — frames commands and parses responses.

See docs/design.md §5.2, §5.11.

AlicatProtocolClient

AlicatProtocolClient(
    transport,
    *,
    eol=EOL,
    default_timeout=0.5,
    multiline_timeout=1.0,
    multiline_idle_timeout=0.1,
    write_timeout=0.5,
    drain_before_write=False,
)

Request/response client that serialises commands over a transport.

Each method acquires an internal :class:anyio.Lock for its duration, so callers from different tasks may invoke methods concurrently — the lock queues them.

Source code in src/alicatlib/protocol/client.py
def __init__(
    self,
    transport: Transport,
    *,
    eol: bytes = EOL,
    default_timeout: float = 0.5,
    multiline_timeout: float = 1.0,
    multiline_idle_timeout: float = 0.1,
    write_timeout: float = 0.5,
    drain_before_write: bool = False,
) -> None:
    self._transport = transport
    self._eol = eol
    self._default_timeout = default_timeout
    self._multiline_timeout = multiline_timeout
    self._multiline_idle_timeout = multiline_idle_timeout
    self._write_timeout = write_timeout
    self._drain_before_write = drain_before_write
    self._lock = anyio.Lock()
    self._idle_timeout_exits = 0
    # Streaming-mode latch. Set to True by
    # :class:`~alicatlib.devices.streaming.StreamingSession` on
    # entry; every :class:`~alicatlib.devices.session.Session`
    # sharing this client refuses to dispatch commands while True
    # (design §5.8). One streamer per port is the hard invariant;
    # the latch is the mechanism that enforces it across sessions
    # that all share this client.
    self._streaming = False

eol property

eol

The EOL terminator this client expects on read boundaries.

idle_timeout_exits property

idle_timeout_exits

Number of :meth:query_lines calls that exited via idle-timeout.

Rises when commands don't declare is_complete / max_lines — treat a growing counter for a specific command as a bug report against that command's spec (see design §5.2, §5.4).

is_streaming property

is_streaming

True while a :class:StreamingSession owns this client.

Set by :class:~alicatlib.devices.streaming.StreamingSession on entry and cleared on exit. The :class:~alicatlib.devices.session.Session dispatch path consults this and fails fast with :class:~alicatlib.errors.AlicatStreamingModeError rather than writing a command onto a bus the device is already flooding with unsolicited frames (design §5.8).

lock property

lock

Port-level command lock, shared across every :class:Session.

Normal command dispatch goes through :meth:query_line / :meth:query_lines / :meth:write_only, which acquire this lock internally. Lifecycle-changing operations on :class:Session (change_unit_id, change_baud_rate) need to hold the lock for a multi-step sequence — they borrow it directly so the device and client stay in sync across the write → verify → reconfigure boundary. See design §5.7.

transport property

transport

Underlying :class:Transport.

Exposed for lifecycle operations that need direct byte-level access under the shared lock (change_baud_rate needs :meth:Transport.reopen). Normal command dispatch should stay on the public query_* / write_only API.

guard_response

guard_response(response, *, command)

Public alias for :meth:_guard_response.

Session lifecycle paths that bypass :meth:query_line still need the ?-rejection / empty-response guards; exposing the check lets them get the same error shape without duplicating the regex.

Source code in src/alicatlib/protocol/client.py
def guard_response(self, response: bytes, *, command: bytes) -> None:
    """Public alias for :meth:`_guard_response`.

    Session lifecycle paths that bypass :meth:`query_line` still
    need the ``?``-rejection / empty-response guards; exposing the
    check lets them get the same error shape without duplicating
    the regex.
    """
    self._guard_response(response, command=command)

query_line async

query_line(command, *, timeout=None, write_timeout=None)

Send a single-line command and return the single-line response.

The returned bytes have the EOL already stripped. A bare ? / unit-id-prefixed ? surfaces as :class:AlicatCommandRejectedError; an empty response surfaces as :class:AlicatProtocolError.

Source code in src/alicatlib/protocol/client.py
async def query_line(
    self,
    command: bytes,
    *,
    timeout: float | None = None,
    write_timeout: float | None = None,
) -> bytes:
    """Send a single-line command and return the single-line response.

    The returned bytes have the EOL already stripped. A bare ``?`` /
    unit-id-prefixed ``?`` surfaces as :class:`AlicatCommandRejectedError`;
    an empty response surfaces as :class:`AlicatProtocolError`.
    """
    read_to = timeout if timeout is not None else self._default_timeout
    write_to = write_timeout if write_timeout is not None else self._write_timeout
    async with self._lock:
        await self._prepare_for_write()
        self._trace_tx(command)
        await self._transport.write(command, timeout=write_to)
        raw = await self._transport.read_until(self._eol, timeout=read_to)
        self._trace_rx(raw)
        stripped = strip_eol(raw, eol=self._eol)
        try:
            self._guard_response(stripped, command=command)
        except (AlicatCommandRejectedError, AlicatProtocolError):
            # Some firmware emits a two-part reply on rejection (a bare
            # `\r` then `?\r`, observed on 6v21 FPF-on-absent-statistic).
            # Sleep briefly so trailing bytes land, then drain so the
            # next command starts clean.
            await anyio.sleep(0.02)
            await self._transport.drain_input()
            raise
        return stripped

query_lines async

query_lines(
    command,
    *,
    first_timeout=None,
    idle_timeout=None,
    max_lines=None,
    is_complete=None,
    write_timeout=None,
)

Send a multiline command and collect lines until termination.

Termination priority (design §5.2):

  1. is_complete(lines) returns True — caller-supplied predicate for tables with a computable end condition.
  2. len(lines) >= max_lines — hard cap, useful for fixed-shape tables like ??M* (10 lines).
  3. idle_timeout expires — fallback for unknown-length responses. Increments :attr:idle_timeout_exits; the slow path.

Returned bytes have EOL stripped.

Source code in src/alicatlib/protocol/client.py
async def query_lines(
    self,
    command: bytes,
    *,
    first_timeout: float | None = None,
    idle_timeout: float | None = None,
    max_lines: int | None = None,
    is_complete: Callable[[Sequence[bytes]], bool] | None = None,
    write_timeout: float | None = None,
) -> tuple[bytes, ...]:
    """Send a multiline command and collect lines until termination.

    Termination priority (design §5.2):

    1. ``is_complete(lines)`` returns ``True`` — caller-supplied predicate
       for tables with a computable end condition.
    2. ``len(lines) >= max_lines`` — hard cap, useful for fixed-shape
       tables like ``??M*`` (10 lines).
    3. ``idle_timeout`` expires — fallback for unknown-length responses.
       Increments :attr:`idle_timeout_exits`; the slow path.

    Returned bytes have EOL stripped.
    """
    first_to = first_timeout if first_timeout is not None else self._multiline_timeout
    idle_to = idle_timeout if idle_timeout is not None else self._multiline_idle_timeout
    write_to = write_timeout if write_timeout is not None else self._write_timeout
    async with self._lock:
        await self._prepare_for_write()
        self._trace_tx(command)
        await self._transport.write(command, timeout=write_to)
        raw_first = await self._transport.read_until(self._eol, timeout=first_to)
        self._trace_rx(raw_first)
        first = strip_eol(raw_first, eol=self._eol)
        try:
            self._guard_response(first, command=command)
        except (AlicatCommandRejectedError, AlicatProtocolError):
            # Drain residual bytes so the next command starts clean —
            # cf. the same handling in ``query_line``.
            await anyio.sleep(0.02)
            await self._transport.drain_input()
            raise
        lines: list[bytes] = [first]
        while True:
            if is_complete is not None and is_complete(lines):
                break
            if max_lines is not None and len(lines) >= max_lines:
                break
            try:
                raw_next = await self._transport.read_until(self._eol, timeout=idle_to)
            except AlicatTimeoutError:
                # Fall-through: no more lines arrived within idle_to.
                # Don't treat as error — it's a legitimate termination.
                self._idle_timeout_exits += 1
                break
            self._trace_rx(raw_next)
            line = strip_eol(raw_next, eol=self._eol)
            lines.append(line)
        return tuple(lines)

reset_idle_timeout_metric

reset_idle_timeout_metric()

Reset :attr:idle_timeout_exits to zero. Primarily for tests.

Source code in src/alicatlib/protocol/client.py
def reset_idle_timeout_metric(self) -> None:
    """Reset :attr:`idle_timeout_exits` to zero. Primarily for tests."""
    self._idle_timeout_exits = 0

write_only async

write_only(command, *, timeout=None)

Send a command with no expected reply (e.g. @@ stop-stream).

Source code in src/alicatlib/protocol/client.py
async def write_only(
    self,
    command: bytes,
    *,
    timeout: float | None = None,
) -> None:
    """Send a command with no expected reply (e.g. ``@@ stop-stream``)."""
    write_to = timeout if timeout is not None else self._write_timeout
    async with self._lock:
        await self._prepare_for_write()
        self._trace_tx(command)
        await self._transport.write(command, timeout=write_to)

parse_fields

parse_fields(raw, *, command, expected_count=None)

Split a whitespace-delimited response into fields.

Parameters:

Name Type Description Default
raw str

Response text (already ASCII-decoded).

required
command str

Command name, for the error message.

required
expected_count int | None

If given, enforce exactly this many fields.

None

Returns:

Type Description
list[str]

The list of non-empty fields.

Raises:

Type Description
AlicatParseError

If expected_count is set and the actual count differs.

Source code in src/alicatlib/protocol/parser.py
def parse_fields(
    raw: str,
    *,
    command: str,
    expected_count: int | None = None,
) -> list[str]:
    """Split a whitespace-delimited response into fields.

    Args:
        raw: Response text (already ASCII-decoded).
        command: Command name, for the error message.
        expected_count: If given, enforce exactly this many fields.

    Returns:
        The list of non-empty fields.

    Raises:
        AlicatParseError: If ``expected_count`` is set and the actual count
            differs.
    """
    fields = raw.split()
    if expected_count is not None and len(fields) != expected_count:
        raise AlicatParseError(
            f"{command}: expected {expected_count} fields, got {len(fields)}{raw!r}",
            field_name="fields",
            expected=expected_count,
            actual=len(fields),
            context=ErrorContext(command_name=command, raw_response=raw.encode("ascii", "replace")),
        )
    return fields

parse_float

parse_float(value, *, field)

Parse value as a float, raising on failure.

Source code in src/alicatlib/protocol/parser.py
def parse_float(value: str, *, field: str) -> float:
    """Parse ``value`` as a float, raising on failure."""
    try:
        return float(value)
    except ValueError as exc:
        raise AlicatParseError(
            f"could not parse float from {value!r} (field={field})",
            field_name=field,
            expected="float",
            actual=value,
        ) from exc

parse_int

parse_int(value, *, field)

Parse value as a base-10 integer, raising on failure.

Source code in src/alicatlib/protocol/parser.py
def parse_int(value: str, *, field: str) -> int:
    """Parse ``value`` as a base-10 integer, raising on failure."""
    try:
        return int(value)
    except ValueError as exc:
        raise AlicatParseError(
            f"could not parse integer from {value!r} (field={field})",
            field_name=field,
            expected="integer",
            actual=value,
        ) from exc

strip_eol

strip_eol(data, *, eol=EOL)

Return data without a trailing eol.

Idempotent: if data already lacks the EOL, it is returned unchanged.

Source code in src/alicatlib/protocol/framing.py
def strip_eol(data: bytes, *, eol: bytes = EOL) -> bytes:
    """Return ``data`` without a trailing ``eol``.

    Idempotent: if ``data`` already lacks the EOL, it is returned unchanged.
    """
    if data.endswith(eol):
        return data[: -len(eol)]
    return data

Protocol client

alicatlib.protocol.client

One-in-flight request/response client over a :class:Transport.

:class:AlicatProtocolClient is the narrow waist between the command layer (which knows Alicat semantics) and the transport layer (which knows bytes). It enforces:

  • Exactly one command in flight per client, via :class:anyio.Lock.
  • Every write bounded by an explicit write_timeout — read vs write timeouts are tagged distinctly in :class:ErrorContext so observability can tell a jammed bus from a non-responsive device.
  • Multiline termination priority: is_complete(lines)max_lines → idle-timeout fallback. The fallback is the slow path; a metric counts how often each command falls through to it so we can find commands missing their termination contract.

Design reference: docs/design.md §5.2.

AlicatProtocolClient

AlicatProtocolClient(
    transport,
    *,
    eol=EOL,
    default_timeout=0.5,
    multiline_timeout=1.0,
    multiline_idle_timeout=0.1,
    write_timeout=0.5,
    drain_before_write=False,
)

Request/response client that serialises commands over a transport.

Each method acquires an internal :class:anyio.Lock for its duration, so callers from different tasks may invoke methods concurrently — the lock queues them.

Source code in src/alicatlib/protocol/client.py
def __init__(
    self,
    transport: Transport,
    *,
    eol: bytes = EOL,
    default_timeout: float = 0.5,
    multiline_timeout: float = 1.0,
    multiline_idle_timeout: float = 0.1,
    write_timeout: float = 0.5,
    drain_before_write: bool = False,
) -> None:
    self._transport = transport
    self._eol = eol
    self._default_timeout = default_timeout
    self._multiline_timeout = multiline_timeout
    self._multiline_idle_timeout = multiline_idle_timeout
    self._write_timeout = write_timeout
    self._drain_before_write = drain_before_write
    self._lock = anyio.Lock()
    self._idle_timeout_exits = 0
    # Streaming-mode latch. Set to True by
    # :class:`~alicatlib.devices.streaming.StreamingSession` on
    # entry; every :class:`~alicatlib.devices.session.Session`
    # sharing this client refuses to dispatch commands while True
    # (design §5.8). One streamer per port is the hard invariant;
    # the latch is the mechanism that enforces it across sessions
    # that all share this client.
    self._streaming = False

eol property

eol

The EOL terminator this client expects on read boundaries.

idle_timeout_exits property

idle_timeout_exits

Number of :meth:query_lines calls that exited via idle-timeout.

Rises when commands don't declare is_complete / max_lines — treat a growing counter for a specific command as a bug report against that command's spec (see design §5.2, §5.4).

is_streaming property

is_streaming

True while a :class:StreamingSession owns this client.

Set by :class:~alicatlib.devices.streaming.StreamingSession on entry and cleared on exit. The :class:~alicatlib.devices.session.Session dispatch path consults this and fails fast with :class:~alicatlib.errors.AlicatStreamingModeError rather than writing a command onto a bus the device is already flooding with unsolicited frames (design §5.8).

lock property

lock

Port-level command lock, shared across every :class:Session.

Normal command dispatch goes through :meth:query_line / :meth:query_lines / :meth:write_only, which acquire this lock internally. Lifecycle-changing operations on :class:Session (change_unit_id, change_baud_rate) need to hold the lock for a multi-step sequence — they borrow it directly so the device and client stay in sync across the write → verify → reconfigure boundary. See design §5.7.

transport property

transport

Underlying :class:Transport.

Exposed for lifecycle operations that need direct byte-level access under the shared lock (change_baud_rate needs :meth:Transport.reopen). Normal command dispatch should stay on the public query_* / write_only API.

guard_response

guard_response(response, *, command)

Public alias for :meth:_guard_response.

Session lifecycle paths that bypass :meth:query_line still need the ?-rejection / empty-response guards; exposing the check lets them get the same error shape without duplicating the regex.

Source code in src/alicatlib/protocol/client.py
def guard_response(self, response: bytes, *, command: bytes) -> None:
    """Public alias for :meth:`_guard_response`.

    Session lifecycle paths that bypass :meth:`query_line` still
    need the ``?``-rejection / empty-response guards; exposing the
    check lets them get the same error shape without duplicating
    the regex.
    """
    self._guard_response(response, command=command)

query_line async

query_line(command, *, timeout=None, write_timeout=None)

Send a single-line command and return the single-line response.

The returned bytes have the EOL already stripped. A bare ? / unit-id-prefixed ? surfaces as :class:AlicatCommandRejectedError; an empty response surfaces as :class:AlicatProtocolError.

Source code in src/alicatlib/protocol/client.py
async def query_line(
    self,
    command: bytes,
    *,
    timeout: float | None = None,
    write_timeout: float | None = None,
) -> bytes:
    """Send a single-line command and return the single-line response.

    The returned bytes have the EOL already stripped. A bare ``?`` /
    unit-id-prefixed ``?`` surfaces as :class:`AlicatCommandRejectedError`;
    an empty response surfaces as :class:`AlicatProtocolError`.
    """
    read_to = timeout if timeout is not None else self._default_timeout
    write_to = write_timeout if write_timeout is not None else self._write_timeout
    async with self._lock:
        await self._prepare_for_write()
        self._trace_tx(command)
        await self._transport.write(command, timeout=write_to)
        raw = await self._transport.read_until(self._eol, timeout=read_to)
        self._trace_rx(raw)
        stripped = strip_eol(raw, eol=self._eol)
        try:
            self._guard_response(stripped, command=command)
        except (AlicatCommandRejectedError, AlicatProtocolError):
            # Some firmware emits a two-part reply on rejection (a bare
            # `\r` then `?\r`, observed on 6v21 FPF-on-absent-statistic).
            # Sleep briefly so trailing bytes land, then drain so the
            # next command starts clean.
            await anyio.sleep(0.02)
            await self._transport.drain_input()
            raise
        return stripped

query_lines async

query_lines(
    command,
    *,
    first_timeout=None,
    idle_timeout=None,
    max_lines=None,
    is_complete=None,
    write_timeout=None,
)

Send a multiline command and collect lines until termination.

Termination priority (design §5.2):

  1. is_complete(lines) returns True — caller-supplied predicate for tables with a computable end condition.
  2. len(lines) >= max_lines — hard cap, useful for fixed-shape tables like ??M* (10 lines).
  3. idle_timeout expires — fallback for unknown-length responses. Increments :attr:idle_timeout_exits; the slow path.

Returned bytes have EOL stripped.

Source code in src/alicatlib/protocol/client.py
async def query_lines(
    self,
    command: bytes,
    *,
    first_timeout: float | None = None,
    idle_timeout: float | None = None,
    max_lines: int | None = None,
    is_complete: Callable[[Sequence[bytes]], bool] | None = None,
    write_timeout: float | None = None,
) -> tuple[bytes, ...]:
    """Send a multiline command and collect lines until termination.

    Termination priority (design §5.2):

    1. ``is_complete(lines)`` returns ``True`` — caller-supplied predicate
       for tables with a computable end condition.
    2. ``len(lines) >= max_lines`` — hard cap, useful for fixed-shape
       tables like ``??M*`` (10 lines).
    3. ``idle_timeout`` expires — fallback for unknown-length responses.
       Increments :attr:`idle_timeout_exits`; the slow path.

    Returned bytes have EOL stripped.
    """
    first_to = first_timeout if first_timeout is not None else self._multiline_timeout
    idle_to = idle_timeout if idle_timeout is not None else self._multiline_idle_timeout
    write_to = write_timeout if write_timeout is not None else self._write_timeout
    async with self._lock:
        await self._prepare_for_write()
        self._trace_tx(command)
        await self._transport.write(command, timeout=write_to)
        raw_first = await self._transport.read_until(self._eol, timeout=first_to)
        self._trace_rx(raw_first)
        first = strip_eol(raw_first, eol=self._eol)
        try:
            self._guard_response(first, command=command)
        except (AlicatCommandRejectedError, AlicatProtocolError):
            # Drain residual bytes so the next command starts clean —
            # cf. the same handling in ``query_line``.
            await anyio.sleep(0.02)
            await self._transport.drain_input()
            raise
        lines: list[bytes] = [first]
        while True:
            if is_complete is not None and is_complete(lines):
                break
            if max_lines is not None and len(lines) >= max_lines:
                break
            try:
                raw_next = await self._transport.read_until(self._eol, timeout=idle_to)
            except AlicatTimeoutError:
                # Fall-through: no more lines arrived within idle_to.
                # Don't treat as error — it's a legitimate termination.
                self._idle_timeout_exits += 1
                break
            self._trace_rx(raw_next)
            line = strip_eol(raw_next, eol=self._eol)
            lines.append(line)
        return tuple(lines)

reset_idle_timeout_metric

reset_idle_timeout_metric()

Reset :attr:idle_timeout_exits to zero. Primarily for tests.

Source code in src/alicatlib/protocol/client.py
def reset_idle_timeout_metric(self) -> None:
    """Reset :attr:`idle_timeout_exits` to zero. Primarily for tests."""
    self._idle_timeout_exits = 0

write_only async

write_only(command, *, timeout=None)

Send a command with no expected reply (e.g. @@ stop-stream).

Source code in src/alicatlib/protocol/client.py
async def write_only(
    self,
    command: bytes,
    *,
    timeout: float | None = None,
) -> None:
    """Send a command with no expected reply (e.g. ``@@ stop-stream``)."""
    write_to = timeout if timeout is not None else self._write_timeout
    async with self._lock:
        await self._prepare_for_write()
        self._trace_tx(command)
        await self._transport.write(command, timeout=write_to)

Parsers

alicatlib.protocol.parser

Shared response parsing helpers.

Covers the full parser surface per design §5.11: primitive decoders (parse_ascii, parse_fields, parse_int, parse_float, parse_optional_float, parse_bool_code, parse_enum_code), the all-firmware VE decoder (parse_ve_response), the status-code helper (parse_status_codes), and the table / frame parsers used by identification and polling (parse_manufacturing_info, parse_data_frame_table, parse_data_frame).

Every helper raises :class:alicatlib.errors.AlicatParseError with the raw response preserved in :class:alicatlib.errors.ErrorContext so debugging a bad reply never requires adding print statements.

parse_bool_code

parse_bool_code(
    value, *, field, mapping=_DEFAULT_BOOL_MAPPING
)

Parse a boolean-coded field (default: "1" → True, "0" → False).

Parameters:

Name Type Description Default
value str

The wire-level field.

required
field str

Field name, for the error message.

required
mapping Mapping[str, bool]

Accepted string-to-bool pairs. Override for commands that use non-standard codes (e.g. some commands use "Y" / "N").

_DEFAULT_BOOL_MAPPING

Raises:

Type Description
AlicatParseError

If value is not a key in mapping.

Source code in src/alicatlib/protocol/parser.py
def parse_bool_code(
    value: str,
    *,
    field: str,
    mapping: Mapping[str, bool] = _DEFAULT_BOOL_MAPPING,
) -> bool:
    """Parse a boolean-coded field (default: ``"1"`` → True, ``"0"`` → False).

    Args:
        value: The wire-level field.
        field: Field name, for the error message.
        mapping: Accepted string-to-bool pairs. Override for commands that use
            non-standard codes (e.g. some commands use ``"Y"`` / ``"N"``).

    Raises:
        AlicatParseError: If ``value`` is not a key in ``mapping``.
    """
    try:
        return mapping[value]
    except KeyError as exc:
        accepted = ", ".join(repr(k) for k in mapping)
        raise AlicatParseError(
            f"could not parse bool from {value!r} (field={field}, accepted={accepted})",
            field_name=field,
            expected=tuple(mapping),
            actual=value,
        ) from exc

parse_data_frame

parse_data_frame(raw, fmt)

Parse raw against fmt into a :class:ParsedFrame.

Thin delegator to :meth:DataFrameFormat.parse. Provided as a free-function alias so all low-level parsers share one import site (:mod:alicatlib.protocol.parser), matching the rest of design §5.11. Pure — no clocks; the session captures received_at / monotonic_ns and wraps via :meth:DataFrame.from_parsed.

Source code in src/alicatlib/protocol/parser.py
def parse_data_frame(raw: bytes, fmt: DataFrameFormat) -> ParsedFrame:
    """Parse ``raw`` against ``fmt`` into a :class:`ParsedFrame`.

    Thin delegator to :meth:`DataFrameFormat.parse`. Provided as a
    free-function alias so all low-level parsers share one import site
    (:mod:`alicatlib.protocol.parser`), matching the rest of design §5.11.
    Pure — no clocks; the session captures ``received_at`` /
    ``monotonic_ns`` and wraps via :meth:`DataFrame.from_parsed`.
    """
    return fmt.parse(raw)

parse_data_frame_table

parse_data_frame_table(lines)

Parse a ??D* response into a :class:DataFrameFormat.

Auto-detects the dialect by sniffing the column-header row, then dispatches to the appropriate per-dialect parser:

  • V8+ dialect (DEFAULT) — V8/V9 + V10 captures (design §16.6). Header: <uid> D00 ID_ NAME... TYPE... WIDTH NOTES.... Field rows carry a stat-code column and conditional rows are marked with a leading * on the name.
  • V1_V7 dialect — 5v12 capture (design §16.6.2). Header: <uid> D00 NAME... TYPE... MinVal MaxVal UNITS.... No stat-code column, no * marker; engineering units sit in the trailing column.

Per-field :attr:DataFrameField.unit is bound inline when the dialect carries a recognisable unit label.

Raises:

Type Description
AlicatParseError

Non-ASCII bytes, or no field lines were recognised in either dialect.

Source code in src/alicatlib/protocol/parser.py
def parse_data_frame_table(lines: Sequence[bytes]) -> DataFrameFormat:
    """Parse a ``??D*`` response into a :class:`DataFrameFormat`.

    Auto-detects the dialect by sniffing the column-header row, then
    dispatches to the appropriate per-dialect parser:

    - **V8+ dialect (DEFAULT)** — V8/V9 + V10 captures (design §16.6).
      Header: ``<uid> D00 ID_ NAME... TYPE... WIDTH NOTES...``.
      Field rows carry a stat-code column and conditional rows are
      marked with a leading ``*`` on the name.
    - **V1_V7 dialect** — 5v12 capture (design §16.6.2). Header:
      ``<uid>  D00 NAME... TYPE... MinVal MaxVal UNITS...``. No
      stat-code column, no ``*`` marker; engineering units sit in the
      trailing column.

    Per-field :attr:`DataFrameField.unit` is bound inline when the
    dialect carries a recognisable unit label.

    Raises:
        AlicatParseError: Non-ASCII bytes, or no field lines were
            recognised in either dialect.
    """
    flavor = _df_detect_flavor(lines)
    if flavor is DataFrameFormatFlavor.LEGACY:
        return _parse_data_frame_table_v1_v7(lines)
    return _parse_data_frame_table_default(lines)

parse_enum_code

parse_enum_code(value, *, field, registry)

Parse a numeric code and resolve it against registry.

Wraps :meth:alicatlib.registry.aliases.AliasRegistry.by_code so that unknown codes coming from the device surface as :class:AlicatParseError (a protocol-layer problem) rather than :class:UnknownGasError / :class:UnknownStatisticError (config-layer problems from user input). The original registry error is preserved as __cause__.

Only usable with :class:AliasRegistry (unique-code enums: Gas, Statistic). Unit lookups require a :class:UnitCategory disambiguator and are handled at the data-frame-field parsing layer where the category is known.

Source code in src/alicatlib/protocol/parser.py
def parse_enum_code[E: "Gas | Statistic | Unit"](
    value: str,
    *,
    field: str,
    registry: AliasRegistry[E],
) -> E:
    """Parse a numeric code and resolve it against ``registry``.

    Wraps :meth:`alicatlib.registry.aliases.AliasRegistry.by_code` so that
    unknown codes coming from the device surface as :class:`AlicatParseError`
    (a protocol-layer problem) rather than :class:`UnknownGasError` /
    :class:`UnknownStatisticError` (config-layer problems from user input).
    The original registry error is preserved as ``__cause__``.

    Only usable with :class:`AliasRegistry` (unique-code enums: Gas, Statistic).
    Unit lookups require a :class:`UnitCategory` disambiguator and are handled
    at the data-frame-field parsing layer where the category is known.
    """
    code = parse_int(value, field=field)
    try:
        return registry.by_code(code)
    except (UnknownGasError, UnknownStatisticError, UnknownUnitError) as exc:
        raise AlicatParseError(
            f"unknown code {code} for {field}",
            field_name=field,
            expected="known registry code",
            actual=code,
        ) from exc

parse_fields

parse_fields(raw, *, command, expected_count=None)

Split a whitespace-delimited response into fields.

Parameters:

Name Type Description Default
raw str

Response text (already ASCII-decoded).

required
command str

Command name, for the error message.

required
expected_count int | None

If given, enforce exactly this many fields.

None

Returns:

Type Description
list[str]

The list of non-empty fields.

Raises:

Type Description
AlicatParseError

If expected_count is set and the actual count differs.

Source code in src/alicatlib/protocol/parser.py
def parse_fields(
    raw: str,
    *,
    command: str,
    expected_count: int | None = None,
) -> list[str]:
    """Split a whitespace-delimited response into fields.

    Args:
        raw: Response text (already ASCII-decoded).
        command: Command name, for the error message.
        expected_count: If given, enforce exactly this many fields.

    Returns:
        The list of non-empty fields.

    Raises:
        AlicatParseError: If ``expected_count`` is set and the actual count
            differs.
    """
    fields = raw.split()
    if expected_count is not None and len(fields) != expected_count:
        raise AlicatParseError(
            f"{command}: expected {expected_count} fields, got {len(fields)}{raw!r}",
            field_name="fields",
            expected=expected_count,
            actual=len(fields),
            context=ErrorContext(command_name=command, raw_response=raw.encode("ascii", "replace")),
        )
    return fields

parse_float

parse_float(value, *, field)

Parse value as a float, raising on failure.

Source code in src/alicatlib/protocol/parser.py
def parse_float(value: str, *, field: str) -> float:
    """Parse ``value`` as a float, raising on failure."""
    try:
        return float(value)
    except ValueError as exc:
        raise AlicatParseError(
            f"could not parse float from {value!r} (field={field})",
            field_name=field,
            expected="float",
            actual=value,
        ) from exc

parse_gas_list

parse_gas_list(lines)

Parse a ??G* response into {gas_code: short_name}.

Real V10 wire shape (verified 2026-04-17 on a MC-500SCCM-D, design §16.6)::

<unit_id> G<NN>      <short_name>

The integer in the G<NN> row label is the device-side gas code (which coincides with the canonical Appendix-C code for the built-in gases G00..G29 and continues with mixture/specialty slots beyond). The short name (e.g. Air, CH4, N2) is right-aligned in a fixed-width column; we collapse the leading whitespace.

Invariants:

  • A consistent unit_id across every parsed line — mismatch raises :class:AlicatUnitIdMismatchError.
  • Duplicate gas codes raise :class:AlicatParseError rather than silently overwriting — a duplicate would mask firmware oddities.
  • Empty responses or responses with no recognisable gas lines raise :class:AlicatParseError; the ??G* command always has at least one built-in gas on any supported device.

Returns:

Type Description
dict[int, str]

Mapping from Alicat gas code (per-device, often matching primer

dict[int, str]

Appendix C for built-ins) to the wire short name.

dict[int, str]

meth:gas_registry.coerce resolves the short name to the typed

dict[int, str]

class:~alicatlib.registry.Gas member.

Source code in src/alicatlib/protocol/parser.py
def parse_gas_list(lines: Sequence[bytes]) -> dict[int, str]:
    """Parse a ``??G*`` response into ``{gas_code: short_name}``.

    Real V10 wire shape (verified 2026-04-17 on a MC-500SCCM-D, design
    §16.6)::

        <unit_id> G<NN>      <short_name>

    The integer in the ``G<NN>`` row label is the device-side gas code
    (which coincides with the canonical Appendix-C code for the built-in
    gases G00..G29 and continues with mixture/specialty slots beyond).
    The short name (e.g. ``Air``, ``CH4``, ``N2``) is right-aligned in a
    fixed-width column; we collapse the leading whitespace.

    Invariants:

    - A consistent ``unit_id`` across every parsed line — mismatch
      raises :class:`AlicatUnitIdMismatchError`.
    - Duplicate gas codes raise :class:`AlicatParseError` rather than
      silently overwriting — a duplicate would mask firmware oddities.
    - Empty responses or responses with no recognisable gas lines raise
      :class:`AlicatParseError`; the ``??G*`` command always has at
      least one built-in gas on any supported device.

    Returns:
        Mapping from Alicat gas code (per-device, often matching primer
        Appendix C for built-ins) to the wire short name.
        :meth:`gas_registry.coerce` resolves the short name to the typed
        :class:`~alicatlib.registry.Gas` member.
    """
    if not lines:
        raise AlicatParseError(
            "??G*: empty response",
            field_name="gas_list",
            expected=">=1 line",
            actual=0,
            context=ErrorContext(command_name="??G*"),
        )

    # GP firmware returns the full list on one line; split it before the
    # per-line loop so every downstream invariant stays the same.
    lines = _maybe_split_gp_inline_gas_list(lines)

    unit_id: str | None = None
    by_code: dict[int, str] = {}

    for raw_line in lines:
        text = _strip_gp_padding(parse_ascii(raw_line).rstrip("\r\n"))
        if not text.strip():
            continue
        m = _GAS_LIST_LINE_RE.match(text)
        if m is None:
            # Lines that don't match the row shape (preamble, blank, etc.)
            # are silently skipped.
            continue

        label = m.group("label").strip()
        if not label:
            continue

        line_uid = m.group("uid")
        code = int(m.group("code"))

        if unit_id is None:
            unit_id = line_uid
        elif unit_id != line_uid:
            raise AlicatUnitIdMismatchError(
                f"??G*: unit_id {line_uid!r} on G{m.group('code')} line does not match {unit_id!r}",
                context=ErrorContext(
                    command_name="??G*",
                    unit_id=unit_id,
                    raw_response=raw_line,
                ),
            )

        if code in by_code:
            raise AlicatParseError(
                f"??G*: duplicate gas code {code}{text!r}",
                field_name="gas_list_code",
                expected="unique gas codes",
                actual=code,
                context=ErrorContext(command_name="??G*", raw_response=raw_line),
            )

        by_code[code] = label

    if not by_code:
        raise AlicatParseError(
            "??G*: no gas entries recognised in response",
            field_name="gas_list",
            expected=">=1 gas entry",
            actual=0,
            context=ErrorContext(command_name="??G*"),
        )

    return by_code

parse_int

parse_int(value, *, field)

Parse value as a base-10 integer, raising on failure.

Source code in src/alicatlib/protocol/parser.py
def parse_int(value: str, *, field: str) -> int:
    """Parse ``value`` as a base-10 integer, raising on failure."""
    try:
        return int(value)
    except ValueError as exc:
        raise AlicatParseError(
            f"could not parse integer from {value!r} (field={field})",
            field_name=field,
            expected="integer",
            actual=value,
        ) from exc

parse_manufacturing_info

parse_manufacturing_info(lines)

Parse a ??M* response into :class:ManufacturingInfo.

Expected shape (per primer — verified against hardware fixtures as they're captured): a series of <unit_id> M<NN> <payload> lines, one per code. The parser does not pin a line count (firmware versions have varied) but does enforce:

  • A consistent unit_id across every line — a mismatch is an :class:AlicatUnitIdMismatchError (a sign the bus has bled frames from another device).
  • No duplicate M<NN> codes within the response — a duplicate is an :class:AlicatParseError rather than a silent overwrite.
  • Every non-empty line matches the <uid> M<NN> <payload> shape; a malformed line raises rather than being skipped, so firmware-side format drift surfaces instead of being swallowed.

The semantic mapping (M04 → model, M05 → serial, etc.) is intentionally not applied here — it belongs in the factory (:mod:alicatlib.devices.factory) where it can be firmware-version aware and validated against real captures.

Source code in src/alicatlib/protocol/parser.py
def parse_manufacturing_info(lines: Sequence[bytes]) -> ManufacturingInfo:
    """Parse a ``??M*`` response into :class:`ManufacturingInfo`.

    Expected shape (per primer — verified against hardware fixtures as
    they're captured): a series of ``<unit_id> M<NN> <payload>`` lines,
    one per code. The parser does not pin a line count (firmware versions
    have varied) but does enforce:

    - A consistent ``unit_id`` across every line — a mismatch is an
      :class:`AlicatUnitIdMismatchError` (a sign the bus has bled frames
      from another device).
    - No duplicate ``M<NN>`` codes within the response — a duplicate is
      an :class:`AlicatParseError` rather than a silent overwrite.
    - Every non-empty line matches the ``<uid> M<NN> <payload>`` shape;
      a malformed line raises rather than being skipped, so firmware-side
      format drift surfaces instead of being swallowed.

    The *semantic* mapping (``M04`` → model, ``M05`` → serial, etc.) is
    intentionally not applied here — it belongs in the factory
    (:mod:`alicatlib.devices.factory`) where it can be firmware-version
    aware and validated against real captures.
    """
    if not lines:
        raise AlicatParseError(
            "??M*: empty response",
            field_name="manufacturing_info",
            expected=">=1 line",
            actual=0,
            context=ErrorContext(command_name="??M*"),
        )

    unit_id: str | None = None
    by_code: dict[int, str] = {}

    for raw_line in lines:
        # Strip GP backspace-padding before regex match — see _strip_gp_padding.
        text = _strip_gp_padding(parse_ascii(raw_line).rstrip("\r\n"))
        if not text:
            continue
        m = _MFG_LINE_RE.match(text)
        if m is None:
            raise AlicatParseError(
                f"??M*: malformed line {text!r}",
                field_name="manufacturing_info_line",
                expected="<unit_id> M<NN> <payload>",
                actual=text,
                context=ErrorContext(command_name="??M*", raw_response=raw_line),
            )

        line_uid = m.group("uid")
        code = int(m.group("code"))
        payload = (m.group("payload") or "").strip()

        if unit_id is None:
            unit_id = line_uid
        elif unit_id != line_uid:
            raise AlicatUnitIdMismatchError(
                f"??M*: unit_id {line_uid!r} on M{code:02d} line does not match {unit_id!r}",
                context=ErrorContext(
                    command_name="??M*",
                    unit_id=unit_id,
                    raw_response=raw_line,
                ),
            )

        if code in by_code:
            raise AlicatParseError(
                f"??M*: duplicate M{code:02d} line — {text!r}",
                field_name="manufacturing_info_code",
                expected="unique M-codes",
                actual=code,
                context=ErrorContext(command_name="??M*", raw_response=raw_line),
            )

        by_code[code] = payload

    if unit_id is None:
        # All lines were blank — treat as empty response.
        raise AlicatParseError(
            "??M*: response contained only blank lines",
            field_name="manufacturing_info",
            expected=">=1 non-blank line",
            actual=0,
            context=ErrorContext(command_name="??M*"),
        )

    return ManufacturingInfo(unit_id=unit_id, by_code=MappingProxyType(by_code))

parse_optional_float

parse_optional_float(value, *, field)

Parse value as a float, returning None for the "--" sentinel.

Alicat emits -- in a data frame when a field is unavailable on the current device or in the current mode (e.g. setpoint on a flow meter). Callers that want a strict parse should use :func:parse_float directly.

Source code in src/alicatlib/protocol/parser.py
def parse_optional_float(value: str, *, field: str) -> float | None:
    """Parse ``value`` as a float, returning ``None`` for the ``"--"`` sentinel.

    Alicat emits ``--`` in a data frame when a field is unavailable on the
    current device or in the current mode (e.g. setpoint on a flow meter).
    Callers that want a strict parse should use :func:`parse_float` directly.
    """
    if value == _ABSENT_TOKEN:
        return None
    return parse_float(value, field=field)

parse_status_codes

parse_status_codes(tokens)

Collect :class:StatusCode members from a token sequence.

Any token whose value is not a known status code is silently skipped — callers that want "status-only" semantics should pre-slice the tail of their token stream, since status codes are the trailing run of a data frame per primer convention.

Order on the wire is not preserved (returned as a :class:frozenset); the primer does not specify a canonical ordering for multi-code runs.

Source code in src/alicatlib/protocol/parser.py
def parse_status_codes(tokens: Sequence[str]) -> frozenset[StatusCode]:
    """Collect :class:`StatusCode` members from a token sequence.

    Any token whose value is not a known status code is silently skipped —
    callers that want "status-only" semantics should pre-slice the tail of
    their token stream, since status codes are the trailing run of a data
    frame per primer convention.

    Order on the wire is not preserved (returned as a :class:`frozenset`);
    the primer does not specify a canonical ordering for multi-code runs.
    """
    return frozenset(StatusCode(t) for t in tokens if t in _STATUS_VALUES)

parse_ve_response

parse_ve_response(raw)

Parse a VE (firmware version) response.

VE is the one identification command that works on every firmware family — it is the anchor of the identification pipeline (design §5.9). The response shape varies across families: at minimum it contains a firmware token (GP, GP-10v05, 10v05, 1v00, ...); some devices additionally report a firmware date in ISO YYYY-MM-DD form.

This parser is deliberately tolerant: it scans the decoded response for (a) the first firmware-shaped token and (b) the first ISO-date token, regardless of their relative position or surrounding text.

Returns:

Type Description
FirmwareVersion

A (FirmwareVersion, date | None) pair. The date is None when

date | None

the device's firmware does not include one.

Raises:

Type Description
AlicatParseError

If no firmware token can be found. A malformed date (present but unparseable) also raises, rather than silently dropping the field — a garbled date is a sign of line corruption that the caller should see.

Source code in src/alicatlib/protocol/parser.py
def parse_ve_response(raw: bytes) -> tuple[FirmwareVersion, date | None]:
    """Parse a ``VE`` (firmware version) response.

    ``VE`` is the one identification command that works on every firmware
    family — it is the anchor of the identification pipeline (design §5.9).
    The response shape varies across families: at minimum it contains a
    firmware token (``GP``, ``GP-10v05``, ``10v05``, ``1v00``, ...); some
    devices additionally report a firmware date in ISO ``YYYY-MM-DD`` form.

    This parser is deliberately tolerant: it scans the decoded response for
    (a) the first firmware-shaped token and (b) the first ISO-date token,
    regardless of their relative position or surrounding text.

    Returns:
        A ``(FirmwareVersion, date | None)`` pair. The date is ``None`` when
        the device's firmware does not include one.

    Raises:
        AlicatParseError: If no firmware token can be found. A malformed date
            (present but unparseable) also raises, rather than silently
            dropping the field — a garbled date is a sign of line corruption
            that the caller should see.
    """
    text = parse_ascii(raw)
    fw_match = _FIRMWARE_TOKEN_RE.search(text)
    if fw_match is None:
        raise AlicatParseError(
            f"VE: no firmware token in response {text!r}",
            field_name="firmware",
            expected="GP or <major>v<minor>",
            actual=text,
            context=ErrorContext(command_name="VE", raw_response=raw),
        )

    fw = FirmwareVersion.parse(fw_match.group(0))

    iso_match = _ISO_DATE_RE.search(text)
    if iso_match is not None:
        try:
            fw_date = date(
                int(iso_match.group("y")),
                int(iso_match.group("m")),
                int(iso_match.group("d")),
            )
        except ValueError as exc:
            raise AlicatParseError(
                f"VE: malformed date {iso_match.group(0)!r} in response {text!r}",
                field_name="firmware_date",
                expected="YYYY-MM-DD",
                actual=iso_match.group(0),
                context=ErrorContext(command_name="VE", raw_response=raw),
            ) from exc
        return fw, fw_date

    name_match = _MONTH_NAME_DATE_RE.search(text)
    if name_match is not None:
        try:
            fw_date = date(
                int(name_match.group("y")),
                _MONTH_TO_NUM[name_match.group("m").lower()],
                int(name_match.group("d")),
            )
        except ValueError as exc:
            raise AlicatParseError(
                f"VE: malformed date {name_match.group(0)!r} in response {text!r}",
                field_name="firmware_date",
                expected="<Mon> <DD> <YYYY>",
                actual=name_match.group(0),
                context=ErrorContext(command_name="VE", raw_response=raw),
            ) from exc
        return fw, fw_date

    return fw, None

ASCII framing

alicatlib.protocol.framing

Framing primitives shared by the client, command, and device layers.

Alicat responses are carriage-return delimited and ASCII-encoded. Keeping these two facts — EOL handling and ASCII decoding — in a single small, dependency-free module lets higher layers (protocol client, parsers, data-frame format) import from here without risking an import cycle through :mod:alicatlib.protocol.parser.

Design reference: docs/design.md §5.2.

decode_ascii

decode_ascii(raw)

Decode raw as ASCII, raising :class:AlicatParseError on non-ASCII.

The Alicat wire format is ASCII-only — non-ASCII bytes indicate line noise or a framing error, not a legitimate extended-charset response, so raising with the raw bytes preserved is the right behaviour. Re-exported as :func:alicatlib.protocol.parser.parse_ascii for callers that already import from the parser module; implementation lives here so :mod:alicatlib.devices.data_frame can use it without introducing a parser-layer import cycle.

Source code in src/alicatlib/protocol/framing.py
def decode_ascii(raw: bytes) -> str:
    """Decode ``raw`` as ASCII, raising :class:`AlicatParseError` on non-ASCII.

    The Alicat wire format is ASCII-only — non-ASCII bytes indicate line
    noise or a framing error, not a legitimate extended-charset response,
    so raising with the raw bytes preserved is the right behaviour.
    Re-exported as :func:`alicatlib.protocol.parser.parse_ascii` for
    callers that already import from the parser module; implementation
    lives here so :mod:`alicatlib.devices.data_frame` can use it without
    introducing a parser-layer import cycle.
    """
    try:
        return raw.decode("ascii")
    except UnicodeDecodeError as exc:
        raise AlicatParseError(
            f"non-ASCII bytes in response: {raw!r}",
            field_name="response",
            expected="ASCII bytes",
            actual=raw,
            context=ErrorContext(raw_response=raw),
        ) from exc

strip_eol

strip_eol(data, *, eol=EOL)

Return data without a trailing eol.

Idempotent: if data already lacks the EOL, it is returned unchanged.

Source code in src/alicatlib/protocol/framing.py
def strip_eol(data: bytes, *, eol: bytes = EOL) -> bytes:
    """Return ``data`` without a trailing ``eol``.

    Idempotent: if ``data`` already lacks the EOL, it is returned unchanged.
    """
    if data.endswith(eol):
        return data[: -len(eol)]
    return data