Skip to content

sartoriuslib.protocol

ProtocolKind, ProtocolClient, detect_protocol(), and the xBPI / SBI sub-packages. See Design §4 and the wire-protocol reference.

Public surface

sartoriuslib.protocol

Protocol layer — framing, parsing, and protocol-client adapters.

xBPI and SBI each have a full subpackage under here; the shared :class:ProtocolClient protocol and :class:ProtocolKind live at this level. See design doc §2 layer map and §4.

DetectionResult dataclass

DetectionResult(
    protocol, autoprint_active=False, pending_lines=tuple()
)

Outcome of :func:detect_protocol.

Attributes:

Name Type Description
protocol ProtocolKind

The resolved :class:ProtocolKind. Always XBPI or SBI — never AUTO (a successful detect has resolved it).

autoprint_active bool

True only when the SBI passive sniff observed an unsolicited autoprint/status line.

pending_lines tuple[bytes, ...]

Complete CRLF-terminated SBI lines consumed during the sniff that the caller may want to re-queue on the live client. Empty unless autoprint_active is True.

ProtocolClient

Bases: Protocol

One request/response round-trip over a transport.

Every implementation owns an :class:anyio.Lock (exposed as :attr:lock) that serialises calls on the shared transport. Session code does not hold its own lock — it piggybacks on the client's, so two sessions on the same port collapse to one serialized queue when they (exceptionally) share a client.

Errors the device tells us about (xBPI subtype 0x01, SBI refusal lines) are raised as typed exceptions inside :meth:execute — callers decode only success replies. Transport-level failures (timeout, connection) surface as the corresponding :class:sartoriuslib.errors.SartoriusTransportError subclass with __cause__ preserving the original.

disposed property

disposed

Whether the client has been retired by a protocol reconfiguration.

lock property

lock

Shared serialisation lock. Held across one full request/response.

dispose

dispose()

Retire the client so queued waiters fail before writing bytes.

Source code in src/sartoriuslib/protocol/base.py
def dispose(self) -> None:
    """Retire the client so queued waiters fail before writing bytes."""
    ...

execute async

execute(
    request, *, timeout=None, command_name="", opcode=None
)

Send request and return the decoded reply.

Raises:

Type Description
SartoriusTimeoutError

No response within timeout.

SartoriusConnectionError

Port closed or lost mid-exchange.

SartoriusFrameError

Framing / checksum invalid.

SartoriusCapabilityError(subclass)

Device responded with a recognised refusal code (xBPI 0x04, 0x06, 0x03, 0x07, 0x10). Callers should catch specific subclasses and let others propagate.

SartoriusCommandRejectedError

Refusal code outside the classified set.

Source code in src/sartoriuslib/protocol/base.py
async def execute(
    self,
    request: bytes,
    *,
    timeout: float | None = None,
    command_name: str = "",
    opcode: int | None = None,
) -> Reply_co:
    """Send ``request`` and return the decoded reply.

    Raises:
        SartoriusTimeoutError: No response within ``timeout``.
        SartoriusConnectionError: Port closed or lost mid-exchange.
        SartoriusFrameError: Framing / checksum invalid.
        SartoriusCapabilityError (subclass): Device responded with a
            recognised refusal code (xBPI ``0x04``, ``0x06``,
            ``0x03``, ``0x07``, ``0x10``). Callers should catch
            specific subclasses and let others propagate.
        SartoriusCommandRejectedError: Refusal code outside the
            classified set.
    """
    ...

ProtocolKind

Bases: StrEnum

Which wire protocol is active on a session.

AUTO is only valid at open_device call time; by the time a session exists, AUTO has resolved to XBPI or SBI.

detect_protocol async

detect_protocol(
    transport,
    *,
    timeout=_DEFAULT_PROBE_TIMEOUT,
    sniff_window=_DEFAULT_SNIFF_WINDOW,
    src_sbn=HOST_SBN_DEFAULT,
    dst_sbn=BALANCE_SBN_DEFAULT,
)

Detect xBPI vs SBI on an already-open transport.

Runs the conservative sequence from design §4.3 in order — drain → passive sniff → xBPI probe → SBI probe → fail. Each probe writes at most one frame. The transport's serial settings are never changed.

Parameters:

Name Type Description Default
transport Transport

An open :class:Transport. Caller owns lifecycle.

required
timeout float

Per-probe timeout for the xBPI and SBI identity probes.

_DEFAULT_PROBE_TIMEOUT
sniff_window float

Passive listen window for SBI autoprint, in seconds.

_DEFAULT_SNIFF_WINDOW
src_sbn int

Source SBN for the xBPI probe frame (0x01 host convention by default).

HOST_SBN_DEFAULT
dst_sbn int

Destination SBN for the xBPI probe frame (0x09 balance factory default by default).

BALANCE_SBN_DEFAULT

Returns:

Name Type Description
A DetectionResult

class:DetectionResult whose protocol is XBPI or

DetectionResult

SBI. When autoprint_active is True, pending_lines

DetectionResult

carries the sniffed bytes for re-queue.

Raises:

Type Description
SartoriusError

No xBPI or SBI device responded — neither the passive sniff nor either probe produced a recognisable reply. Hard transport faults (e.g. the port closed mid-detect) propagate as :class:SartoriusConnectionError unchanged.

Source code in src/sartoriuslib/protocol/detect.py
async def detect_protocol(
    transport: Transport,
    *,
    timeout: float = _DEFAULT_PROBE_TIMEOUT,
    sniff_window: float = _DEFAULT_SNIFF_WINDOW,
    src_sbn: int = HOST_SBN_DEFAULT,
    dst_sbn: int = BALANCE_SBN_DEFAULT,
) -> DetectionResult:
    """Detect xBPI vs SBI on an already-open ``transport``.

    Runs the conservative sequence from design §4.3 in order — drain →
    passive sniff → xBPI probe → SBI probe → fail. Each probe writes at
    most one frame. The transport's serial settings are never changed.

    Arguments:
        transport: An open :class:`Transport`. Caller owns lifecycle.
        timeout: Per-probe timeout for the xBPI and SBI identity probes.
        sniff_window: Passive listen window for SBI autoprint, in seconds.
        src_sbn: Source SBN for the xBPI probe frame (``0x01`` host
            convention by default).
        dst_sbn: Destination SBN for the xBPI probe frame (``0x09``
            balance factory default by default).

    Returns:
        A :class:`DetectionResult` whose ``protocol`` is ``XBPI`` or
        ``SBI``. When ``autoprint_active`` is ``True``, ``pending_lines``
        carries the sniffed bytes for re-queue.

    Raises:
        SartoriusError: No xBPI or SBI device responded — neither the
            passive sniff nor either probe produced a recognisable reply.
            Hard transport faults (e.g. the port closed mid-detect)
            propagate as :class:`SartoriusConnectionError` unchanged.
    """
    label = transport.label

    # 1. Passive autoprint sniff. We deliberately do NOT drain first: a
    #    balance left in autoprint mode may have a complete line already
    #    sitting in the OS buffer when we connect, and the design promises
    #    we will not drop the first sample. Stale partial bytes (no CRLF)
    #    can't fool the sniff — read_until times out without consuming them
    #    and the pre-probe drain below clears them before xBPI runs.
    autoprint, sniffed = await _sniff_for_autoprint(transport, sniff_window)
    if autoprint:
        return DetectionResult(
            protocol=ProtocolKind.SBI,
            autoprint_active=True,
            pending_lines=sniffed,
        )

    # 2. Drain anything left over from the sniff (partial lines, stray
    #    bytes that didn't form CRLF) so the xBPI length-prefix read starts
    #    from a clean buffer.
    with contextlib.suppress(SartoriusError):
        await transport.drain_input()

    # 3. xBPI probe — READ_MODEL. A valid frame (even one carrying an error
    #    subtype) confirms xBPI: only an xBPI device builds a length-prefixed
    #    marker-tagged frame.
    if await _probe_xbpi(transport, timeout=timeout, src_sbn=src_sbn, dst_sbn=dst_sbn):
        return DetectionResult(protocol=ProtocolKind.XBPI)

    with contextlib.suppress(SartoriusError):
        await transport.drain_input()

    # 4. SBI identity probe. Any CRLF-terminated reply counts as evidence —
    #    content interpretation is the SBI parser's job, not detection's.
    if await _probe_sbi(transport, timeout=timeout):
        return DetectionResult(protocol=ProtocolKind.SBI)

    # 5. Clear failure. No opcode sweeps, no fallback baud rates.
    raise SartoriusError(
        f"auto-detect: no responsive xBPI or SBI device on {label!r} "
        f"(sniff {sniff_window}s, probe timeout {timeout}s)",
        context=ErrorContext(
            port=label,
            command_name="auto_detect",
            extra={"sniff_window_s": sniff_window, "probe_timeout_s": timeout},
        ),
    )

make_protocol_client

make_protocol_client(
    protocol, transport, *, default_timeout=1.0
)

Build the concrete protocol client for protocol on transport.

:attr:AUTO is never valid at factory time — the detection step resolves it first.

Source code in src/sartoriuslib/protocol/client.py
def make_protocol_client(
    protocol: ProtocolKind,
    transport: Transport,
    *,
    default_timeout: float = 1.0,
) -> XbpiProtocolClient | SbiProtocolClient:
    """Build the concrete protocol client for ``protocol`` on ``transport``.

    :attr:`AUTO` is never valid at factory time — the detection step
    resolves it first.
    """
    if protocol is ProtocolKind.XBPI:
        return XbpiProtocolClient(transport, default_timeout=default_timeout)
    if protocol is ProtocolKind.SBI:
        return SbiProtocolClient(transport, default_timeout=default_timeout)
    raise ValueError(
        f"cannot build a protocol client for {protocol!r}; "
        "AUTO must resolve to XBPI or SBI before this point",
    )

Base Protocol

sartoriuslib.protocol.base

Shared ProtocolClient Protocol and :class:ProtocolKind enum.

ProtocolKind is named with the Kind suffix to avoid colliding with :class:typing.Protocol at import sites. See design doc §7.

:class:ProtocolClient is the structural interface both :class:XbpiProtocolClient and :class:SbiProtocolClient satisfy. A session holds at most one client per protocol; both may be present when AUTO detection resolves to one.

ProtocolClient

Bases: Protocol

One request/response round-trip over a transport.

Every implementation owns an :class:anyio.Lock (exposed as :attr:lock) that serialises calls on the shared transport. Session code does not hold its own lock — it piggybacks on the client's, so two sessions on the same port collapse to one serialized queue when they (exceptionally) share a client.

Errors the device tells us about (xBPI subtype 0x01, SBI refusal lines) are raised as typed exceptions inside :meth:execute — callers decode only success replies. Transport-level failures (timeout, connection) surface as the corresponding :class:sartoriuslib.errors.SartoriusTransportError subclass with __cause__ preserving the original.

disposed property

disposed

Whether the client has been retired by a protocol reconfiguration.

lock property

lock

Shared serialisation lock. Held across one full request/response.

dispose

dispose()

Retire the client so queued waiters fail before writing bytes.

Source code in src/sartoriuslib/protocol/base.py
def dispose(self) -> None:
    """Retire the client so queued waiters fail before writing bytes."""
    ...

execute async

execute(
    request, *, timeout=None, command_name="", opcode=None
)

Send request and return the decoded reply.

Raises:

Type Description
SartoriusTimeoutError

No response within timeout.

SartoriusConnectionError

Port closed or lost mid-exchange.

SartoriusFrameError

Framing / checksum invalid.

SartoriusCapabilityError(subclass)

Device responded with a recognised refusal code (xBPI 0x04, 0x06, 0x03, 0x07, 0x10). Callers should catch specific subclasses and let others propagate.

SartoriusCommandRejectedError

Refusal code outside the classified set.

Source code in src/sartoriuslib/protocol/base.py
async def execute(
    self,
    request: bytes,
    *,
    timeout: float | None = None,
    command_name: str = "",
    opcode: int | None = None,
) -> Reply_co:
    """Send ``request`` and return the decoded reply.

    Raises:
        SartoriusTimeoutError: No response within ``timeout``.
        SartoriusConnectionError: Port closed or lost mid-exchange.
        SartoriusFrameError: Framing / checksum invalid.
        SartoriusCapabilityError (subclass): Device responded with a
            recognised refusal code (xBPI ``0x04``, ``0x06``,
            ``0x03``, ``0x07``, ``0x10``). Callers should catch
            specific subclasses and let others propagate.
        SartoriusCommandRejectedError: Refusal code outside the
            classified set.
    """
    ...

ProtocolKind

Bases: StrEnum

Which wire protocol is active on a session.

AUTO is only valid at open_device call time; by the time a session exists, AUTO has resolved to XBPI or SBI.

Client factory

sartoriuslib.protocol.client

Protocol-client factory — xBPI vs SBI selection.

make_protocol_client

make_protocol_client(
    protocol, transport, *, default_timeout=1.0
)

Build the concrete protocol client for protocol on transport.

:attr:AUTO is never valid at factory time — the detection step resolves it first.

Source code in src/sartoriuslib/protocol/client.py
def make_protocol_client(
    protocol: ProtocolKind,
    transport: Transport,
    *,
    default_timeout: float = 1.0,
) -> XbpiProtocolClient | SbiProtocolClient:
    """Build the concrete protocol client for ``protocol`` on ``transport``.

    :attr:`AUTO` is never valid at factory time — the detection step
    resolves it first.
    """
    if protocol is ProtocolKind.XBPI:
        return XbpiProtocolClient(transport, default_timeout=default_timeout)
    if protocol is ProtocolKind.SBI:
        return SbiProtocolClient(transport, default_timeout=default_timeout)
    raise ValueError(
        f"cannot build a protocol client for {protocol!r}; "
        "AUTO must resolve to XBPI or SBI before this point",
    )

Auto-detection

sartoriuslib.protocol.detect

Conservative protocol auto-detection.

Drain input → passively sniff for SBI autoprint → probe xBPI (0x02) → probe SBI (ESC x1_ then ESC P fallback) → fail clearly. Never sweeps opcodes, never changes baud, never changes the balance's protocol mode. See design doc §4.3.

The SBI probe tries ESC x1_ first (which gives an identity string we can validate later) and falls back to ESC P (a print/weight read) if the identity token is silent. The ESC P fallback is kept as defense-in-depth for unknown firmware revisions: an earlier hardware-day note claimed Cubis MSE1203S silently ignored Format-2 identity tokens, but re-testing on 2026-04-25 (MSE1203S-100-DR, BAC 00-39-21) showed all three identity tokens reply cleanly. Both probes are READ_ONLY, so the fallback costs nothing on devices that do reply to ESC x1_.

The result is a :class:DetectionResult carrying the resolved :class:ProtocolKind plus, when autoprint was observed, the sniffed line so the caller can re-queue it on the eventual SBI client and not lose the first sample. Anything observed during the sniff that does not match an autoprint pattern is discarded — those bytes are ambiguous and we drain before each subsequent probe.

DetectionResult dataclass

DetectionResult(
    protocol, autoprint_active=False, pending_lines=tuple()
)

Outcome of :func:detect_protocol.

Attributes:

Name Type Description
protocol ProtocolKind

The resolved :class:ProtocolKind. Always XBPI or SBI — never AUTO (a successful detect has resolved it).

autoprint_active bool

True only when the SBI passive sniff observed an unsolicited autoprint/status line.

pending_lines tuple[bytes, ...]

Complete CRLF-terminated SBI lines consumed during the sniff that the caller may want to re-queue on the live client. Empty unless autoprint_active is True.

detect_protocol async

detect_protocol(
    transport,
    *,
    timeout=_DEFAULT_PROBE_TIMEOUT,
    sniff_window=_DEFAULT_SNIFF_WINDOW,
    src_sbn=HOST_SBN_DEFAULT,
    dst_sbn=BALANCE_SBN_DEFAULT,
)

Detect xBPI vs SBI on an already-open transport.

Runs the conservative sequence from design §4.3 in order — drain → passive sniff → xBPI probe → SBI probe → fail. Each probe writes at most one frame. The transport's serial settings are never changed.

Parameters:

Name Type Description Default
transport Transport

An open :class:Transport. Caller owns lifecycle.

required
timeout float

Per-probe timeout for the xBPI and SBI identity probes.

_DEFAULT_PROBE_TIMEOUT
sniff_window float

Passive listen window for SBI autoprint, in seconds.

_DEFAULT_SNIFF_WINDOW
src_sbn int

Source SBN for the xBPI probe frame (0x01 host convention by default).

HOST_SBN_DEFAULT
dst_sbn int

Destination SBN for the xBPI probe frame (0x09 balance factory default by default).

BALANCE_SBN_DEFAULT

Returns:

Name Type Description
A DetectionResult

class:DetectionResult whose protocol is XBPI or

DetectionResult

SBI. When autoprint_active is True, pending_lines

DetectionResult

carries the sniffed bytes for re-queue.

Raises:

Type Description
SartoriusError

No xBPI or SBI device responded — neither the passive sniff nor either probe produced a recognisable reply. Hard transport faults (e.g. the port closed mid-detect) propagate as :class:SartoriusConnectionError unchanged.

Source code in src/sartoriuslib/protocol/detect.py
async def detect_protocol(
    transport: Transport,
    *,
    timeout: float = _DEFAULT_PROBE_TIMEOUT,
    sniff_window: float = _DEFAULT_SNIFF_WINDOW,
    src_sbn: int = HOST_SBN_DEFAULT,
    dst_sbn: int = BALANCE_SBN_DEFAULT,
) -> DetectionResult:
    """Detect xBPI vs SBI on an already-open ``transport``.

    Runs the conservative sequence from design §4.3 in order — drain →
    passive sniff → xBPI probe → SBI probe → fail. Each probe writes at
    most one frame. The transport's serial settings are never changed.

    Arguments:
        transport: An open :class:`Transport`. Caller owns lifecycle.
        timeout: Per-probe timeout for the xBPI and SBI identity probes.
        sniff_window: Passive listen window for SBI autoprint, in seconds.
        src_sbn: Source SBN for the xBPI probe frame (``0x01`` host
            convention by default).
        dst_sbn: Destination SBN for the xBPI probe frame (``0x09``
            balance factory default by default).

    Returns:
        A :class:`DetectionResult` whose ``protocol`` is ``XBPI`` or
        ``SBI``. When ``autoprint_active`` is ``True``, ``pending_lines``
        carries the sniffed bytes for re-queue.

    Raises:
        SartoriusError: No xBPI or SBI device responded — neither the
            passive sniff nor either probe produced a recognisable reply.
            Hard transport faults (e.g. the port closed mid-detect)
            propagate as :class:`SartoriusConnectionError` unchanged.
    """
    label = transport.label

    # 1. Passive autoprint sniff. We deliberately do NOT drain first: a
    #    balance left in autoprint mode may have a complete line already
    #    sitting in the OS buffer when we connect, and the design promises
    #    we will not drop the first sample. Stale partial bytes (no CRLF)
    #    can't fool the sniff — read_until times out without consuming them
    #    and the pre-probe drain below clears them before xBPI runs.
    autoprint, sniffed = await _sniff_for_autoprint(transport, sniff_window)
    if autoprint:
        return DetectionResult(
            protocol=ProtocolKind.SBI,
            autoprint_active=True,
            pending_lines=sniffed,
        )

    # 2. Drain anything left over from the sniff (partial lines, stray
    #    bytes that didn't form CRLF) so the xBPI length-prefix read starts
    #    from a clean buffer.
    with contextlib.suppress(SartoriusError):
        await transport.drain_input()

    # 3. xBPI probe — READ_MODEL. A valid frame (even one carrying an error
    #    subtype) confirms xBPI: only an xBPI device builds a length-prefixed
    #    marker-tagged frame.
    if await _probe_xbpi(transport, timeout=timeout, src_sbn=src_sbn, dst_sbn=dst_sbn):
        return DetectionResult(protocol=ProtocolKind.XBPI)

    with contextlib.suppress(SartoriusError):
        await transport.drain_input()

    # 4. SBI identity probe. Any CRLF-terminated reply counts as evidence —
    #    content interpretation is the SBI parser's job, not detection's.
    if await _probe_sbi(transport, timeout=timeout):
        return DetectionResult(protocol=ProtocolKind.SBI)

    # 5. Clear failure. No opcode sweeps, no fallback baud rates.
    raise SartoriusError(
        f"auto-detect: no responsive xBPI or SBI device on {label!r} "
        f"(sniff {sniff_window}s, probe timeout {timeout}s)",
        context=ErrorContext(
            port=label,
            command_name="auto_detect",
            extra={"sniff_window_s": sniff_window, "probe_timeout_s": timeout},
        ),
    )

xBPI — framing

sartoriuslib.protocol.xbpi.framing

xBPI frame codec.

Two concrete operations:

  • :func:build_command — assemble a host→balance TX frame.
  • :func:parse_frame — validate and decompose a balance→host RX frame into an :class:XbpiFrame.

TX frame::

[len] [src_sbn] [dst_sbn] [opcode] [args...] [chk]

RX frame::

[len] [marker=0x41] [subtype] [body...] [chk]

len counts every byte that follows the length byte (including the checksum). chk is sum(every preceding byte) & 0xFF. Defaults: src_sbn=0x01 (host convention) and dst_sbn=0x09 (balance factory default). See docs/protocol.md §3.

build_command

build_command(
    opcode,
    args=b"",
    *,
    src_sbn=HOST_SBN_DEFAULT,
    dst_sbn=BALANCE_SBN_DEFAULT,
)

Assemble a host→balance frame.

Parameters:

Name Type Description Default
opcode int

Command byte (0x000xFF).

required
args bytes

Pre-encoded argument bytes (usually one or more TLVs — see :mod:sartoriuslib.protocol.xbpi.tlv). Empty for no-arg commands.

b''
src_sbn int

Source SBN; defaults to the host convention 0x01.

HOST_SBN_DEFAULT
dst_sbn int

Destination SBN; defaults to the balance factory default 0x09.

BALANCE_SBN_DEFAULT

Returns:

Type Description
bytes

The complete frame bytes, length-prefixed and checksummed, ready

bytes

to hand to :meth:Transport.write.

Source code in src/sartoriuslib/protocol/xbpi/framing.py
def build_command(
    opcode: int,
    args: bytes = b"",
    *,
    src_sbn: int = HOST_SBN_DEFAULT,
    dst_sbn: int = BALANCE_SBN_DEFAULT,
) -> bytes:
    """Assemble a host→balance frame.

    Arguments:
        opcode: Command byte (``0x00`` – ``0xFF``).
        args: Pre-encoded argument bytes (usually one or more TLVs — see
            :mod:`sartoriuslib.protocol.xbpi.tlv`). Empty for no-arg
            commands.
        src_sbn: Source SBN; defaults to the host convention ``0x01``.
        dst_sbn: Destination SBN; defaults to the balance factory
            default ``0x09``.

    Returns:
        The complete frame bytes, length-prefixed and checksummed, ready
        to hand to :meth:`Transport.write`.
    """
    _require_byte(opcode, "opcode")
    _require_byte(src_sbn, "src_sbn")
    _require_byte(dst_sbn, "dst_sbn")
    payload = bytes([src_sbn, dst_sbn, opcode]) + bytes(args)
    # ``length`` counts every byte that will follow the length byte —
    # including the not-yet-appended checksum.
    length = len(payload) + 1
    if length > _MAX_LENGTH:
        raise SartoriusFrameError(
            f"frame too long: {length + 1} bytes (max 256)",
            context=ErrorContext(opcode=opcode, extra={"length": length}),
        )
    pre_chk = bytes([length]) + payload
    return pre_chk + bytes([checksum(pre_chk)])

checksum

checksum(data)

Return sum(data) & 0xFF — the xBPI frame checksum.

Trivial by design. No CRC, no seed; see docs/protocol.md §12.

Source code in src/sartoriuslib/protocol/xbpi/framing.py
def checksum(data: bytes) -> int:
    """Return ``sum(data) & 0xFF`` — the xBPI frame checksum.

    Trivial by design. No CRC, no seed; see ``docs/protocol.md`` §12.
    """
    return sum(data) & 0xFF

parse_frame

parse_frame(data)

Validate and decompose a balance→host frame.

Raises:

Type Description
SartoriusFrameError

Frame too short, length byte inconsistent with buffer size, marker is not 0x41, or checksum mismatch.

Source code in src/sartoriuslib/protocol/xbpi/framing.py
def parse_frame(data: bytes) -> XbpiFrame:
    """Validate and decompose a balance→host frame.

    Raises:
        SartoriusFrameError: Frame too short, length byte inconsistent
            with buffer size, marker is not ``0x41``, or checksum
            mismatch.
    """
    raw = bytes(data)
    if len(raw) < MIN_FRAME_SIZE:
        raise SartoriusFrameError(
            f"frame too short: got {len(raw)} bytes (min {MIN_FRAME_SIZE})",
            context=ErrorContext(raw_response=raw),
        )
    length = raw[0]
    expected_total = length + 1
    if len(raw) != expected_total:
        raise SartoriusFrameError(
            f"frame length mismatch: length byte says {expected_total} bytes, got {len(raw)}",
            context=ErrorContext(
                raw_response=raw,
                extra={"declared_length": length, "buffer_size": len(raw)},
            ),
        )
    marker = raw[1]
    if marker != RX_MARKER:
        raise SartoriusFrameError(
            f"bad marker byte 0x{marker:02x} (expected 0x{RX_MARKER:02x})",
            context=ErrorContext(raw_response=raw, extra={"marker": marker}),
        )
    subtype = raw[2]
    body = raw[3:-1]
    chk = raw[-1]
    expected_chk = checksum(raw[:-1])
    if chk != expected_chk:
        raise SartoriusFrameError(
            f"bad checksum 0x{chk:02x} (expected 0x{expected_chk:02x})",
            context=ErrorContext(
                raw_response=raw,
                extra={"checksum": chk, "expected_checksum": expected_chk},
            ),
        )
    return XbpiFrame(
        length=length,
        marker=marker,
        subtype=subtype,
        body=bytes(body),
        checksum=chk,
        raw=raw,
    )

xBPI — types

sartoriuslib.protocol.xbpi.types

Immutable xBPI wire-level types.

:class:XbpiFrame is what :func:sartoriuslib.protocol.xbpi.framing.parse_frame produces from a balance→host reply: parsed but not yet interpreted. :class:SubtypeFamily groups the reply subtype byte into the families from docs/protocol.md §4. Decoded bodies (measurement, status, typed-float, error) live here as frozen dataclasses too.

Per-protocol types live here; the protocol-neutral public :class:Reading that the :class:Balance facade returns is composed from these.

ErrorBody dataclass

ErrorBody(code)

1-byte error body (subtype 0x01).

code is the raw device error code; mapping to typed exceptions happens at the protocol-client / session layer so the codec can stay agnostic.

LongMeasurementBody dataclass

LongMeasurementBody(measurement, delimiter, status)

17-byte long streaming-measurement body (subtype 0x48).

Emitted when the caller requests 0x1E 09 30 (short + status-block concatenation per docs/protocol.md §8.3). The delimiter byte is always 0x48 — the same value as the subtype — and separates the two 8-byte halves.

MeasurementBody dataclass

MeasurementBody(
    raw,
    value,
    aux,
    decimals,
    unit,
    sign,
    stable,
    off_scale,
    unit_raw,
    flags,
)

8-byte short measurement body (subtype 0x48, non-status-block form).

Raw fields mirror docs/protocol.md §8.1 byte-for-byte. Derived fields (value, unit, sign, stable, decimals, off_scale) are the decoded view callers build a :class:Reading from.

value is None on the off-scale sentinel (bytes[0..4] == 7f ff ff ff ff). Distinguishing overload from underload requires the status block; the measurement alone cannot tell them apart.

StatusBlockBody dataclass

StatusBlockBody(
    raw,
    aux_flag,
    state,
    status,
    sequence,
    stable,
    overload,
    underload,
    adc_trusted,
    isocal_due,
)

8-byte status block (subtype 0x48, from opcode 0x30).

See docs/protocol.md §8.2. State/status encoding is family-specific; the derived stable / overload / underload flags extract the portable bits. adc_trusted and isocal_due are MSE-only signals and are None when the source is not a Cubis.

SubtypeFamily

Bases: IntEnum

Top-level classifier for the reply subtype byte.

The xBPI subtype byte packs a family (high nibble) and a body-length hint (low nibble, per docs/protocol.md §4). Parsers dispatch on family, not on the raw subtype, so a new same-family subtype does not need a parser change.

TypedFloatBody dataclass

TypedFloatBody(value, aux)

5-byte typed-float body (subtype 0x35).

Used by temperature, capacity, and increment reads. The aux byte is an extra payload byte that carries unit-family or decimal-place information depending on the opcode.

XbpiFrame dataclass

XbpiFrame(length, marker, subtype, body, checksum, raw)

One fully-validated balance→host xBPI frame.

length is the value of byte[0] — the count of bytes that follow the length byte (so len(raw) == length + 1). marker is byte[1], always 0x41 for device-to-host replies; :func:parse_frame raises :class:SartoriusFrameError otherwise. body is everything between the subtype and the checksum; raw is the full on-wire bytes.

xBPI — tables

sartoriuslib.protocol.xbpi.tables

xBPI opcode, subtype, and error-code tables.

Tables in this module are lookup data — pure-Python dicts with no behaviour. The codec treats every table as an open set: unknown values decode to :attr:SubtypeFamily.UNKNOWN or an otherwise-empty mapping lookup rather than raising. Forward-compatibility beats strictness here because Sartorius firmware revisions routinely introduce new subtypes.

References: docs/protocol.md §4 (subtype families), §6 (error codes), §7 (opcode inventory).

body_length_for_subtype

body_length_for_subtype(subtype)

Expected body length for subtype, or None if variable.

Implements the §4 formula: "high nibble = type class, low nibble = body length for types 0..4; for type 5, length = 16 + low". Returns None for subtypes whose body length is genuinely variable (notably 0x48 which carries either 8 or 17 bytes, and 0x00 which usually carries 0 but occasionally a variable body — e.g. opcode 0xBC module list).

Source code in src/sartoriuslib/protocol/xbpi/tables.py
def body_length_for_subtype(subtype: int) -> int | None:
    """Expected body length for ``subtype``, or ``None`` if variable.

    Implements the §4 formula: "high nibble = type class, low nibble =
    body length for types 0..4; for type 5, length = 16 + low". Returns
    ``None`` for subtypes whose body length is genuinely variable
    (notably ``0x48`` which carries either 8 or 17 bytes, and ``0x00``
    which usually carries 0 but occasionally a variable body — e.g.
    opcode ``0xBC`` module list).
    """
    # 0x48 measurement: 8 *or* 17 — caller disambiguates by the length
    # byte on the outer frame. 0x00 ACK: usually empty but 0xBC's module
    # list rides this subtype with a variable body. Both are variable.
    if subtype in _VARIABLE_BODY_SUBTYPES:
        return None
    high = subtype & 0xF0
    low = subtype & 0x0F
    if high in _FIXED_LOW_NIBBLE_LENGTH_FAMILIES:
        return low
    if high == _LONG_STRING_FAMILY_HIGH:
        return _LONG_STRING_FAMILY_OFFSET + low
    return None

subtype_family

subtype_family(subtype)

Return the family classifier for a reply subtype byte.

Falls back to high-nibble dispatch for subtypes not explicitly tabulated, and to :attr:SubtypeFamily.UNKNOWN when even that fails.

Source code in src/sartoriuslib/protocol/xbpi/tables.py
def subtype_family(subtype: int) -> SubtypeFamily:
    """Return the family classifier for a reply subtype byte.

    Falls back to high-nibble dispatch for subtypes not explicitly
    tabulated, and to :attr:`SubtypeFamily.UNKNOWN` when even that fails.
    """
    if subtype in _KNOWN_SUBTYPES:
        return _KNOWN_SUBTYPES[subtype]
    return _HIGH_NIBBLE_FAMILIES.get(subtype & 0xF0, SubtypeFamily.UNKNOWN)

xBPI — TLV

sartoriuslib.protocol.xbpi.tlv

xBPI TLV (tag-length-value) helpers.

Per docs/protocol.md §5, Cubis MSE requires request args wrapped as TLV records rather than plain byte arguments. The tag's low nibble encodes the value size:

+------+-------+---------------------------------------------+ | Tag | Size | Meaning | +======+=======+=============================================+ | 0x11 | 1 B | u8 (rare in requests) | | 0x12 | 2 B | u16 BE | | 0x14 | 4 B | u32 BE | | 0x21 | 1 B | u8 — the most common request-arg wrapper | | 0x22 | 2 B | u16 BE (seen in response bodies) | | 0x24 | 4 B | u32 BE (seen in response bodies) | +------+-------+---------------------------------------------+

Response bodies may contain multiple concatenated TLVs (see §5.2), so this module also exposes :func:parse_tlv_sequence for walking them.

decode_tlv

decode_tlv(data, offset=0)

Decode one TLV record starting at offset.

Returns (tag, value_bytes, next_offset). Raises :class:SartoriusFrameError on unknown tags or truncated values.

Source code in src/sartoriuslib/protocol/xbpi/tlv.py
def decode_tlv(data: bytes, offset: int = 0) -> tuple[int, bytes, int]:
    """Decode one TLV record starting at ``offset``.

    Returns ``(tag, value_bytes, next_offset)``. Raises
    :class:`SartoriusFrameError` on unknown tags or truncated values.
    """
    if offset >= len(data):
        raise SartoriusFrameError(
            "TLV sequence truncated at tag position",
            context=ErrorContext(extra={"offset": offset, "total": len(data)}),
        )
    tag = data[offset]
    if tag not in TLV_TAG_SIZES:
        raise SartoriusFrameError(
            f"unknown TLV tag 0x{tag:02x} at offset {offset}",
            context=ErrorContext(extra={"tag": tag, "offset": offset}),
        )
    size = TLV_TAG_SIZES[tag]
    value_start = offset + 1
    value_end = value_start + size
    if value_end > len(data):
        raise SartoriusFrameError(
            f"TLV value for tag 0x{tag:02x} truncated "
            f"(needed {size} bytes, have {len(data) - value_start})",
            context=ErrorContext(extra={"tag": tag, "offset": offset}),
        )
    return tag, bytes(data[value_start:value_end]), value_end

encode_tlv

encode_tlv(tag, value)

Encode a single TLV record.

value may be an int (encoded big-endian into the tag's size) or raw bytes (emitted verbatim, length-checked against the tag).

Source code in src/sartoriuslib/protocol/xbpi/tlv.py
def encode_tlv(tag: int, value: int | bytes) -> bytes:
    """Encode a single TLV record.

    ``value`` may be an ``int`` (encoded big-endian into the tag's size)
    or raw ``bytes`` (emitted verbatim, length-checked against the tag).
    """
    if tag not in TLV_TAG_SIZES:
        raise SartoriusFrameError(
            f"unknown TLV tag 0x{tag:02x}",
            context=ErrorContext(extra={"tag": tag}),
        )
    size = TLV_TAG_SIZES[tag]
    if isinstance(value, int):
        if value < 0:
            raise SartoriusFrameError(
                f"TLV value must be non-negative (got {value})",
                context=ErrorContext(extra={"tag": tag, "value": value}),
            )
        try:
            payload = value.to_bytes(size, "big")
        except OverflowError as exc:
            raise SartoriusFrameError(
                f"TLV value {value} does not fit in {size} byte(s) for tag 0x{tag:02x}",
                context=ErrorContext(extra={"tag": tag, "value": value}),
            ) from exc
        return bytes([tag]) + payload
    if len(value) != size:
        raise SartoriusFrameError(
            f"TLV value for tag 0x{tag:02x} must be {size} byte(s), got {len(value)}",
            context=ErrorContext(extra={"tag": tag, "value_len": len(value)}),
        )
    return bytes([tag]) + bytes(value)

parse_tlv_sequence

parse_tlv_sequence(body)

Walk body as a concatenation of TLV records.

Returns a list of (tag, value_bytes) tuples. Raises :class:SartoriusFrameError if any record is truncated or any tag is unknown.

Note: parameter-table replies (opcode 0x55) have the subtype byte double as the first TLV tag per §5.3 — the caller must prepend the subtype byte before passing the body here.

Source code in src/sartoriuslib/protocol/xbpi/tlv.py
def parse_tlv_sequence(body: bytes) -> list[tuple[int, bytes]]:
    """Walk ``body`` as a concatenation of TLV records.

    Returns a list of ``(tag, value_bytes)`` tuples. Raises
    :class:`SartoriusFrameError` if any record is truncated or any tag is
    unknown.

    Note: parameter-table replies (opcode ``0x55``) have the subtype byte
    double as the first TLV tag per §5.3 — the caller must prepend the
    subtype byte before passing the body here.
    """
    out: list[tuple[int, bytes]] = []
    offset = 0
    while offset < len(body):
        tag, value, offset = decode_tlv(body, offset)
        out.append((tag, value))
    return out

tlv_value_as_int

tlv_value_as_int(value)

Decode a TLV value as a big-endian unsigned integer.

Source code in src/sartoriuslib/protocol/xbpi/tlv.py
def tlv_value_as_int(value: bytes) -> int:
    """Decode a TLV value as a big-endian unsigned integer."""
    return int.from_bytes(value, "big")

xBPI — units

sartoriuslib.protocol.xbpi.units

xBPI unit-code decoding.

xBPI measurement frames pack the unit and sign into byte [6] of the 8-byte body and the decimal-place count into byte [5]'s high nibble. See docs/protocol.md §8.4 for the full encoding.

decode_decimals

decode_decimals(byte5)

Return the displayed decimal-place count encoded in byte [5].

High nibble = decimals (docs/protocol.md §8.4). The low nibble has a WZA-mg quirk that is advisory and ignored here.

Source code in src/sartoriuslib/protocol/xbpi/units.py
def decode_decimals(byte5: int) -> int:
    """Return the displayed decimal-place count encoded in byte [5].

    High nibble = decimals (``docs/protocol.md`` §8.4). The low nibble
    has a WZA-mg quirk that is *advisory* and ignored here.
    """
    return (byte5 >> 4) & 0x0F

decode_sign

decode_sign(byte6)

Decode the sign bits (top 2) of byte [6].

0x00 = exactly zero, 0x40 = positive, 0x80 = negative. Any other combination (shouldn't occur — only one bit of the 2-bit field is set at a time on the wire) decodes to :attr:Sign.UNKNOWN.

Source code in src/sartoriuslib/protocol/xbpi/units.py
def decode_sign(byte6: int) -> Sign:
    """Decode the sign bits (top 2) of byte [6].

    ``0x00`` = exactly zero, ``0x40`` = positive, ``0x80`` = negative.
    Any other combination (shouldn't occur — only one bit of the 2-bit
    field is set at a time on the wire) decodes to :attr:`Sign.UNKNOWN`.
    """
    bits = byte6 & SIGN_MASK
    if bits == SIGN_BITS_ZERO:
        return Sign.ZERO
    if bits == SIGN_BITS_POSITIVE:
        return Sign.POSITIVE
    if bits == SIGN_BITS_NEGATIVE:
        return Sign.NEGATIVE
    return Sign.UNKNOWN

decode_unit

decode_unit(byte6)

Full decode of byte [6] to a :class:Unit (strips the sign bits).

Source code in src/sartoriuslib/protocol/xbpi/units.py
def decode_unit(byte6: int) -> Unit:
    """Full decode of byte [6] to a :class:`Unit` (strips the sign bits)."""
    return unit_byte_to_unit(byte6 & UNIT_ID_MASK)

unit_byte_to_unit

unit_byte_to_unit(unit_id)

Map a 6-bit base-unit ID to a :class:Unit.

Unknown IDs decode to :attr:Unit.UNKNOWN — never raises — because new unit codes are expected as more of the parameter-table display enum gets mapped (see docs/protocol.md §10.1 index 7).

Source code in src/sartoriuslib/protocol/xbpi/units.py
def unit_byte_to_unit(unit_id: int) -> Unit:
    """Map a 6-bit base-unit ID to a :class:`Unit`.

    Unknown IDs decode to :attr:`Unit.UNKNOWN` — never raises — because
    new unit codes are expected as more of the parameter-table display
    enum gets mapped (see ``docs/protocol.md`` §10.1 index 7).
    """
    return _UNIT_ID_TO_UNIT.get(unit_id & UNIT_ID_MASK, Unit.UNKNOWN)

xBPI — parser

sartoriuslib.protocol.xbpi.parser

xBPI subtype decoders — measurement, status block, typed float, errors.

This module turns an :class:XbpiFrame's body into one of the decoded-body dataclasses from :mod:sartoriuslib.protocol.xbpi.types. It knows nothing about opcodes or sessions; callers at the command layer pick the right decoder based on what they sent.

References: docs/protocol.md §8 (measurement/status decoding), §9 (temperature), §4 (subtype encoding).

decode_error_body

decode_error_body(body)

Decode a 1-byte error body into :class:ErrorBody.

The single byte is the device error code (0x03 / 0x04 / 0x06 / 0x07 / 0x10 / 0x11 — see docs/protocol.md §6). Mapping to typed exceptions happens at the session layer; this decoder stays neutral.

Source code in src/sartoriuslib/protocol/xbpi/parser.py
def decode_error_body(body: bytes) -> ErrorBody:
    """Decode a 1-byte error body into :class:`ErrorBody`.

    The single byte is the device error code (``0x03`` / ``0x04`` /
    ``0x06`` / ``0x07`` / ``0x10`` / ``0x11`` — see
    ``docs/protocol.md`` §6). Mapping to typed exceptions happens at
    the session layer; this decoder stays neutral.
    """
    if len(body) != _ERROR_BODY_LEN:
        raise SartoriusParseError(
            f"error body must be {_ERROR_BODY_LEN} byte, got {len(body)}",
            context=ErrorContext(raw_response=bytes(body)),
        )
    return ErrorBody(code=body[0])

decode_long_measurement_body

decode_long_measurement_body(body)

Decode a 17-byte long streaming measurement (§8.3).

Layout is short measurement (8 B) + delimiter (1 B, always 0x48) + status block (8 B). The delimiter matches the subtype byte by coincidence.

Source code in src/sartoriuslib/protocol/xbpi/parser.py
def decode_long_measurement_body(body: bytes) -> LongMeasurementBody:
    """Decode a 17-byte long streaming measurement (§8.3).

    Layout is short measurement (8 B) + delimiter (1 B, always ``0x48``)
    + status block (8 B). The delimiter matches the subtype byte by
    coincidence.
    """
    if len(body) != _LONG_MEASUREMENT_LEN:
        raise SartoriusParseError(
            f"long measurement body must be {_LONG_MEASUREMENT_LEN} bytes, got {len(body)}",
            context=ErrorContext(raw_response=bytes(body)),
        )
    raw = bytes(body)
    measurement = decode_measurement_body(raw[0:_SHORT_MEASUREMENT_LEN])
    delimiter = raw[_SHORT_MEASUREMENT_LEN]
    if delimiter != _LONG_MEASUREMENT_DELIMITER:
        raise SartoriusParseError(
            f"long measurement delimiter must be 0x{_LONG_MEASUREMENT_DELIMITER:02x}, "
            f"got 0x{delimiter:02x}",
            context=ErrorContext(raw_response=raw),
        )
    status = decode_status_block_body(raw[_SHORT_MEASUREMENT_LEN + 1 :])
    return LongMeasurementBody(
        measurement=measurement,
        delimiter=delimiter,
        status=status,
    )

decode_measurement_body

decode_measurement_body(body)

Decode an 8-byte short measurement body.

The value is float32 big-endian in bytes [0..3]. Bytes [5..6] pack decimals (high nibble of [5]), sign (top 2 bits of [6]), and the base-unit ID (low 6 bits of [6]). Byte [7]'s 0x40 bit marks a stable reading universally across families. An off-scale reading presents the 7f ff ff ff ff sentinel in bytes [0..4]; the status block disambiguates overload vs underload, so this decoder just reports off_scale and leaves value as None.

.. note::

The MSE Cubis emits the same ``7f ff ff ff ff`` sentinel for
~6 frames (~2 s) immediately after :meth:`Balance.zero` while
the cell recomputes its zero point. The wire is ambiguous —
from the body alone we can't tell "cell busy" apart from
"overload" or "underload". Callers that need that distinction
invoke :meth:`Balance.status` (xBPI ``0x32``); the status
block's ``state`` byte carries the disambiguation
(``0x82``/``0x84`` for overload/underload, plain stable/
unstable for a settling cell). Observed on hardware day —
:class:`Reading` faithfully reflects ``value=None`` /
``off_scale=True`` either way.
Source code in src/sartoriuslib/protocol/xbpi/parser.py
def decode_measurement_body(body: bytes) -> MeasurementBody:
    """Decode an 8-byte short measurement body.

    The value is ``float32`` big-endian in bytes [0..3]. Bytes [5..6]
    pack decimals (high nibble of [5]), sign (top 2 bits of [6]), and
    the base-unit ID (low 6 bits of [6]). Byte [7]'s ``0x40`` bit marks
    a stable reading universally across families. An off-scale reading
    presents the ``7f ff ff ff ff`` sentinel in bytes [0..4]; the
    status block disambiguates overload vs underload, so this decoder
    just reports ``off_scale`` and leaves ``value`` as ``None``.

    .. note::

        The MSE Cubis emits the same ``7f ff ff ff ff`` sentinel for
        ~6 frames (~2 s) immediately after :meth:`Balance.zero` while
        the cell recomputes its zero point. The wire is ambiguous —
        from the body alone we can't tell "cell busy" apart from
        "overload" or "underload". Callers that need that distinction
        invoke :meth:`Balance.status` (xBPI ``0x32``); the status
        block's ``state`` byte carries the disambiguation
        (``0x82``/``0x84`` for overload/underload, plain stable/
        unstable for a settling cell). Observed on hardware day —
        :class:`Reading` faithfully reflects ``value=None`` /
        ``off_scale=True`` either way.
    """
    if len(body) != _SHORT_MEASUREMENT_LEN:
        raise SartoriusParseError(
            f"short measurement body must be {_SHORT_MEASUREMENT_LEN} bytes, got {len(body)}",
            context=ErrorContext(raw_response=bytes(body)),
        )
    raw = bytes(body)
    off_scale = raw[0:5] == OFF_SCALE_SENTINEL
    if off_scale:
        value: float | None = None
    else:
        value = struct.unpack(">f", raw[0:4])[0]
    aux = raw[4]
    byte5 = raw[5]
    byte6 = raw[6]
    flags = raw[7]
    decimals = decode_decimals(byte5)
    sign = decode_sign(byte6)
    unit = decode_unit(byte6)
    stable = bool(flags & STABLE_FLAG)
    return MeasurementBody(
        raw=raw,
        value=value,
        aux=aux,
        decimals=decimals,
        unit=unit,
        sign=sign,
        stable=stable,
        off_scale=off_scale,
        unit_raw=byte6,
        flags=flags,
    )

decode_status_block_body

decode_status_block_body(body)

Decode an 8-byte status block.

See docs/protocol.md §8.2. The portable stability bit is state & 0x08 — Cubis encodes state=0x88 stable vs 0x80 unstable (the 0x80 base marks "measurement valid"), while WZA/BCE use state=0x08 vs 0x00. Overload (state 0x82) and underload (0x84) have only been captured on Cubis; on non-Cubis this decoder still reports them when the exact pattern matches, and False otherwise.

adc_trusted and isocal_due are MSE-only; decoded as None when the frame does not look like a Cubis-style status block (base bit 0x80 clear in state).

Source code in src/sartoriuslib/protocol/xbpi/parser.py
def decode_status_block_body(body: bytes) -> StatusBlockBody:
    """Decode an 8-byte status block.

    See ``docs/protocol.md`` §8.2. The portable stability bit is
    ``state & 0x08`` — Cubis encodes ``state=0x88`` stable vs ``0x80``
    unstable (the ``0x80`` base marks "measurement valid"), while
    WZA/BCE use ``state=0x08`` vs ``0x00``. Overload (state ``0x82``)
    and underload (``0x84``) have only been captured on Cubis; on
    non-Cubis this decoder still reports them when the exact pattern
    matches, and ``False`` otherwise.

    ``adc_trusted`` and ``isocal_due`` are MSE-only; decoded as ``None``
    when the frame does not look like a Cubis-style status block (base
    bit ``0x80`` clear in ``state``).
    """
    if len(body) != _STATUS_BLOCK_LEN:
        raise SartoriusParseError(
            f"status block must be {_STATUS_BLOCK_LEN} bytes, got {len(body)}",
            context=ErrorContext(raw_response=bytes(body)),
        )
    raw = bytes(body)
    aux_flag = raw[0]
    state = raw[3]
    status = raw[4]
    sequence = raw[7]
    stable = bool(state & _STATE_STABLE_BIT)
    overload = state == _STATE_OVERLOAD
    underload = state == _STATE_UNDERLOAD
    # Cubis signature: the "measurement valid" base bit 0x80 is set on
    # stable/unstable/overload/underload states (0x88/0x80/0x82/0x84).
    # Non-Cubis never sets 0x80, so treat ``adc_trusted``/``isocal_due``
    # as Cubis-only signals.
    cubis_shape = bool(state & _CUBIS_STATE_BASE)
    adc_trusted: bool | None = bool(status & _STATUS_ADC_TRUSTED) if cubis_shape else None
    isocal_due: bool | None = bool(status & _STATUS_ISOCAL_DUE) if cubis_shape else None
    return StatusBlockBody(
        raw=raw,
        aux_flag=aux_flag,
        state=state,
        status=status,
        sequence=sequence,
        stable=stable,
        overload=overload,
        underload=underload,
        adc_trusted=adc_trusted,
        isocal_due=isocal_due,
    )

decode_typed_float_body

decode_typed_float_body(body)

Decode a 5-byte typed-float body: float32 BE + 1-byte aux.

Source code in src/sartoriuslib/protocol/xbpi/parser.py
def decode_typed_float_body(body: bytes) -> TypedFloatBody:
    """Decode a 5-byte typed-float body: ``float32 BE`` + 1-byte aux."""
    if len(body) != _TYPED_FLOAT_LEN:
        raise SartoriusParseError(
            f"typed_float body must be {_TYPED_FLOAT_LEN} bytes, got {len(body)}",
            context=ErrorContext(raw_response=bytes(body)),
        )
    raw = bytes(body)
    value = struct.unpack(">f", raw[0:4])[0]
    aux = raw[4]
    return TypedFloatBody(value=value, aux=aux)

is_status_block_body

is_status_block_body(body)

Heuristic: does this 8-byte body look like a status block?

Short measurements and status blocks share subtype 0x48 and both carry 8-byte bodies. Disambiguation uses the §8.2 marker pattern: bytes [1..2] == 00 81 and bytes [5..6] == 10 00 reliably identify a status block; short measurements only match by accident.

Source code in src/sartoriuslib/protocol/xbpi/parser.py
def is_status_block_body(body: bytes) -> bool:
    """Heuristic: does this 8-byte body look like a status block?

    Short measurements and status blocks share subtype ``0x48`` and both
    carry 8-byte bodies. Disambiguation uses the §8.2 marker pattern:
    bytes [1..2] == ``00 81`` and bytes [5..6] == ``10 00`` reliably
    identify a status block; short measurements only match by accident.
    """
    return (
        len(body) == _STATUS_BLOCK_LEN
        and body[1:3] == STATUS_BLOCK_MARKER_PREFIX
        and body[5:7] == STATUS_BLOCK_MARKER_SUFFIX
    )

xBPI — client

sartoriuslib.protocol.xbpi.client

xBPI protocol client — single-in-flight request/response over a transport.

Holds an :class:anyio.Lock shared across one full exchange. Runs the xBPI length-prefix read sequence (read_exact(1) then read_exact(length)), validates the frame via :func:sartoriuslib.protocol.xbpi.framing.parse_frame, and maps subtype-0x01 error replies to typed :class:sartoriuslib.errors.SartoriusError subclasses.

Transport errors propagate unchanged. Framing errors and error-subtype replies both trigger a best-effort :meth:Transport.drain_input before re-raising, so the next call starts from a clean buffer.

Design reference: doc §4.1 (protocol-duality seam), §6.1.1 (response-to-availability mapping).

XbpiProtocolClient

XbpiProtocolClient(transport, *, default_timeout=1.0)

xBPI request/response client over a :class:Transport.

Single-in-flight via an internal :class:anyio.Lock. The lock is also exposed so :class:sartoriuslib.devices.session.Session (and the multi-device manager) can share serialisation with other machinery on the same port.

Source code in src/sartoriuslib/protocol/xbpi/client.py
def __init__(
    self,
    transport: Transport,
    *,
    default_timeout: float = 1.0,
) -> None:
    self._transport = transport
    self._default_timeout = default_timeout
    self._lock = anyio.Lock()
    self._disposed = False

disposed property

disposed

Whether this client has been retired by a protocol reconfiguration.

lock property

lock

Shared serialisation lock; held for one full request/response.

transport property

transport

The underlying :class:Transport this client writes and reads.

dispose

dispose()

Retire this client after its owning session installs a replacement.

Source code in src/sartoriuslib/protocol/xbpi/client.py
def dispose(self) -> None:
    """Retire this client after its owning session installs a replacement."""
    self._disposed = True

execute async

execute(
    request, *, timeout=None, command_name="", opcode=None
)

Send request and return the decoded reply frame.

Holds :attr:lock for the full exchange. Drains the input buffer on framing errors and on error-subtype replies so the next call is not corrupted by a partial response.

Source code in src/sartoriuslib/protocol/xbpi/client.py
async def execute(
    self,
    request: bytes,
    *,
    timeout: float | None = None,
    command_name: str = "",
    opcode: int | None = None,
) -> XbpiFrame:
    """Send ``request`` and return the decoded reply frame.

    Holds :attr:`lock` for the full exchange. Drains the input
    buffer on framing errors and on error-subtype replies so the
    next call is not corrupted by a partial response.
    """
    t = timeout if timeout is not None else self._default_timeout
    async with self._lock:
        return await self._execute_locked(request, t, command_name, opcode)

SBI — framing

sartoriuslib.protocol.sbi.framing

SBI line codec and ESC-token helpers.

build_command

build_command(command, *, terminator=b'')

Build an SBI command token, optionally appending a terminator.

Source code in src/sartoriuslib/protocol/sbi/framing.py
def build_command(command: bytes | str, *, terminator: bytes = b"") -> bytes:
    """Build an SBI command token, optionally appending a terminator."""
    token = normalize_token(command)
    return token + terminator

is_autoprint_line

is_autoprint_line(line)

Return True when line looks like an unsolicited weight line.

Source code in src/sartoriuslib/protocol/sbi/framing.py
def is_autoprint_line(line: bytes | str) -> bool:
    """Return ``True`` when ``line`` looks like an unsolicited weight line."""
    raw = line.encode("ascii", errors="ignore") if isinstance(line, str) else line
    body = strip_line_terminator(raw).decode("ascii", errors="replace")
    text = body.strip()
    if not text:
        return False
    if _BARE_NUMBER_RE.fullmatch(body) and (
        body == text or len(body) < _MIN_FORMATTED_WEIGHT_BODY_CHARS
    ):
        return False
    marker = " ".join(text.upper().split())
    if marker.startswith("STAT "):
        marker = marker.removeprefix("STAT ").strip()
    return _AUTOPRINT_RE.match(body) is not None or marker in _AUTOPRINT_SPECIALS

normalize_token

normalize_token(command)

Normalize a user-facing SBI command into on-wire bytes.

Accepts raw bytes, strings containing the literal escape character, or readable forms like "ESC P" and "ESC x1_". Trailing CR/LF is stripped because Sartorius documents command terminators as optional and the existing fake-transport fixtures use the bare token.

Source code in src/sartoriuslib/protocol/sbi/framing.py
def normalize_token(command: bytes | str) -> bytes:
    """Normalize a user-facing SBI command into on-wire bytes.

    Accepts raw bytes, strings containing the literal escape character, or
    readable forms like ``"ESC P"`` and ``"ESC x1_"``. Trailing CR/LF is
    stripped because Sartorius documents command terminators as optional and
    the existing fake-transport fixtures use the bare token.
    """
    if isinstance(command, bytes):
        token = command
    else:
        text = command.strip()
        if text.upper().startswith("ESC"):
            text = "\x1b" + text[3:].lstrip()
        token = text.encode("ascii", errors="strict")
    token = token.rstrip(b"\r\n")
    if not token:
        raise SartoriusValidationError(
            "SBI command token cannot be empty",
            context=ErrorContext(protocol="sbi"),
        )
    if not token.startswith(ESC):
        raise SartoriusValidationError(
            "SBI command token must start with ESC",
            context=ErrorContext(
                protocol="sbi",
                sbi_token=token,
                extra={"token": token.hex()},
            ),
        )
    return token

split_lines

split_lines(raw)

Split a raw SBI payload into complete lines.

Every non-empty line must include \r\n (or at least \n for lenient fixture parsing). Returns lines including their terminators.

Source code in src/sartoriuslib/protocol/sbi/framing.py
def split_lines(raw: bytes) -> tuple[bytes, ...]:
    r"""Split a raw SBI payload into complete lines.

    Every non-empty line must include ``\r\n`` (or at least ``\n`` for
    lenient fixture parsing). Returns lines including their terminators.
    """
    if raw == b"":
        return ()
    out: list[bytes] = []
    start = 0
    while start < len(raw):
        idx = raw.find(b"\n", start)
        if idx < 0:
            raise SartoriusFrameError(
                "SBI payload ended before a line terminator",
                context=ErrorContext(raw_response=raw, protocol="sbi"),
            )
        out.append(raw[start : idx + 1])
        start = idx + 1
    return tuple(out)

strip_line_terminator

strip_line_terminator(line)

Remove one SBI line terminator if present.

Source code in src/sartoriuslib/protocol/sbi/framing.py
def strip_line_terminator(line: bytes) -> bytes:
    """Remove one SBI line terminator if present."""
    if line.endswith(LINE_TERMINATOR):
        return line[: -len(LINE_TERMINATOR)]
    if line.endswith(b"\n"):
        return line[:-1].rstrip(b"\r")
    return line

SBI — types

sartoriuslib.protocol.sbi.types

Immutable SBI wire-level types.

SBI is line-oriented ASCII. The transport reads complete \r\n-terminated records; the parser turns each record into an :class:SbiLine and collects them into an :class:SbiReply for command variants to decode.

SbiLine dataclass

SbiLine(raw, text, kind, reading=None)

One parsed SBI line.

raw includes the on-wire line terminator if it was present. text is ASCII-decoded with the terminator stripped. reading is populated only for weight/autoprint lines.

SbiLineKind

Bases: StrEnum

Classifier for one decoded SBI line.

SbiReply dataclass

SbiReply(lines, raw)

One SBI reply.

lines holds parsed records; raw is the concatenated on-wire payload that produced them. No-response commands such as front-panel key emulation use an empty lines tuple and raw=b"".

first_line property

first_line

First parsed line, or None for no-response commands.

SBI — tables

sartoriuslib.protocol.sbi.tables

SBI command-token and decoding tables.

describe_token

describe_token(token)

Human-readable token name used in errors and debug output.

Source code in src/sartoriuslib/protocol/sbi/tables.py
def describe_token(token: bytes) -> str:
    """Human-readable token name used in errors and debug output."""
    return SBI_COMMAND_TOKENS.get(token, token.hex())

unit_from_sbi

unit_from_sbi(text)

Decode an SBI unit string to :class:Unit.

Unknown strings collapse to :attr:Unit.UNKNOWN so new firmware formats remain parseable.

Source code in src/sartoriuslib/protocol/sbi/tables.py
def unit_from_sbi(text: str) -> Unit:
    """Decode an SBI unit string to :class:`Unit`.

    Unknown strings collapse to :attr:`Unit.UNKNOWN` so new firmware formats
    remain parseable.
    """
    key = text.strip().replace(" ", "").lower()
    return SBI_UNIT_STRINGS.get(key, Unit.UNKNOWN)

SBI — parser

sartoriuslib.protocol.sbi.parser

SBI line parser — command replies and autoprint output.

parse_line

parse_line(raw)

Parse one SBI line.

Weight/autoprint lines get a decoded :class:Reading; other printable lines are preserved as identity text. Truly undecodable bytes raise :class:SartoriusParseError.

Source code in src/sartoriuslib/protocol/sbi/parser.py
def parse_line(raw: bytes) -> SbiLine:
    """Parse one SBI line.

    Weight/autoprint lines get a decoded :class:`Reading`; other printable
    lines are preserved as identity text. Truly undecodable bytes raise
    :class:`SartoriusParseError`.
    """
    body = strip_line_terminator(raw)
    try:
        text = body.decode("ascii")
    except UnicodeDecodeError as exc:
        raise SartoriusParseError(
            "SBI line is not ASCII",
            context=ErrorContext(raw_response=raw, protocol="sbi"),
        ) from exc
    stripped = text.strip()
    if not stripped:
        return SbiLine(raw=raw, text="", kind=SbiLineKind.EMPTY)
    marker = _status_marker(stripped)
    if _is_refusal_marker(marker):
        return SbiLine(raw=raw, text=stripped, kind=SbiLineKind.REFUSAL)
    if marker in _SPECIAL_NON_WEIGHT_MARKERS:
        return SbiLine(raw=raw, text=stripped, kind=SbiLineKind.UNKNOWN)
    if _looks_like_weight(text):
        return SbiLine(
            raw=raw,
            text=stripped,
            kind=SbiLineKind.WEIGHT,
            reading=parse_weight_line(raw),
        )
    return SbiLine(raw=raw, text=stripped, kind=SbiLineKind.IDENTITY)

parse_reply

parse_reply(raw)

Parse one raw SBI payload into an :class:SbiReply.

Source code in src/sartoriuslib/protocol/sbi/parser.py
def parse_reply(raw: bytes) -> SbiReply:
    """Parse one raw SBI payload into an :class:`SbiReply`."""
    lines = tuple(parse_line(line) for line in split_lines(raw))
    return SbiReply(lines=lines, raw=raw)

parse_weight_line

parse_weight_line(raw)

Decode an SBI weight/autoprint line into a protocol-neutral reading.

Source code in src/sartoriuslib/protocol/sbi/parser.py
def parse_weight_line(raw: bytes | str) -> Reading:
    """Decode an SBI weight/autoprint line into a protocol-neutral reading."""
    line_bytes = raw.encode("ascii") if isinstance(raw, str) else raw
    body = strip_line_terminator(line_bytes)
    text = body.decode("ascii", errors="replace")
    stripped = text.strip()
    marker = _status_marker(stripped)
    if marker in _OVERLOAD_MARKERS or marker in _UNDERLOAD_MARKERS:
        overload = marker in _OVERLOAD_MARKERS
        underload = marker in _UNDERLOAD_MARKERS
        return Reading(
            value=None,
            unit=Unit.UNKNOWN,
            sign=Sign.UNKNOWN,
            stable=False,
            overload=overload,
            underload=underload,
            decimals=None,
            sequence=None,
            status_flags={
                "stable": False,
                "overload": overload,
                "underload": underload,
            },
            protocol=ProtocolKind.SBI,
            received_at=datetime.now(UTC),
            monotonic_ns=time.monotonic_ns(),
            raw=line_bytes,
        )

    match = _WEIGHT_RE.match(text)
    if match is None:
        raise SartoriusParseError(
            f"unparseable SBI weight line {stripped!r}",
            context=ErrorContext(raw_response=line_bytes, protocol="sbi"),
        )

    number_text = match.group("number")
    sign_char = match.group("sign") or "+"
    unit_text = match.group("unit") or ""
    value = float(number_text)
    if sign_char == "-":
        value = -value
    decimals = _decimal_places(number_text)
    sign = _sign_from_wire(sign_char, value)
    stable = match.group("unstable") != "?" and bool(unit_text.strip())

    return Reading(
        value=value,
        unit=unit_from_sbi(unit_text),
        sign=sign,
        stable=stable,
        overload=False,
        underload=False,
        decimals=decimals,
        sequence=None,
        status_flags={"stable": stable},
        protocol=ProtocolKind.SBI,
        received_at=datetime.now(UTC),
        monotonic_ns=time.monotonic_ns(),
        raw=line_bytes,
    )

require_identity_text

require_identity_text(reply, *, allow_weight_like=True)

Return the first text-bearing line suitable for identity commands.

SBI identity fields may be numeric-only (serial numbers, software versions), so a line that also looks like a numeric display value is still valid text in this command context by default. Callers that expect a non-numeric identity field, such as the model string, can set allow_weight_like=False to avoid mistaking an autoprint reading for a command reply. Refusal and special status lines are excluded.

Source code in src/sartoriuslib/protocol/sbi/parser.py
def require_identity_text(
    reply: SbiReply,
    *,
    allow_weight_like: bool = True,
) -> str:
    """Return the first text-bearing line suitable for identity commands.

    SBI identity fields may be numeric-only (serial numbers, software
    versions), so a line that also looks like a numeric display value is still
    valid text in this command context by default. Callers that expect a
    non-numeric identity field, such as the model string, can set
    ``allow_weight_like=False`` to avoid mistaking an autoprint reading for a
    command reply. Refusal and special status lines are excluded.
    """
    for line in reply.lines:
        if line.kind not in (
            SbiLineKind.EMPTY,
            SbiLineKind.REFUSAL,
            SbiLineKind.UNKNOWN,
        ) and (allow_weight_like or line.kind is not SbiLineKind.WEIGHT):
            return line.text
    raise SartoriusParseError(
        "SBI reply did not contain identity text",
        context=ErrorContext(raw_response=reply.raw, protocol="sbi"),
    )

require_reading

require_reading(reply)

Return the first reading in reply or raise a parse error.

Source code in src/sartoriuslib/protocol/sbi/parser.py
def require_reading(reply: SbiReply) -> Reading:
    """Return the first reading in ``reply`` or raise a parse error."""
    for line in reply.lines:
        if line.reading is not None:
            return line.reading
    raise SartoriusParseError(
        "SBI reply did not contain a weight line",
        context=ErrorContext(raw_response=reply.raw, protocol="sbi"),
    )

SBI — client

sartoriuslib.protocol.sbi.client

SBI protocol client — single-in-flight line request/response.

SbiProtocolClient

SbiProtocolClient(transport, *, default_timeout=1.0)

SBI request/response client over a :class:Transport.

SBI replies are ASCII lines. Some control commands emulate front-panel keypresses and do not produce an acknowledgement; callers pass expect_lines=0 for those and receive an empty :class:SbiReply.

Source code in src/sartoriuslib/protocol/sbi/client.py
def __init__(
    self,
    transport: Transport,
    *,
    default_timeout: float = 1.0,
) -> None:
    self._transport = transport
    self._default_timeout = default_timeout
    self._lock = anyio.Lock()
    self._pending_lines: deque[bytes] = deque()
    self._autoprint_active = False
    self._disposed = False

autoprint_active property

autoprint_active

Whether passive sniffing has observed unsolicited SBI output.

disposed property

disposed

Whether this client has been retired by a protocol reconfiguration.

lock property

lock

Shared serialisation lock; held for one full exchange.

transport property

transport

The underlying transport.

detect_autoprint async

detect_autoprint(*, timeout=None, max_lines=4)

Passively sniff for unsolicited SBI autoprint/status lines.

Any complete lines read during detection are queued so the first stream read still sees them. This keeps detection cheap and honest: it never writes to the balance and never loses the first sample.

Source code in src/sartoriuslib/protocol/sbi/client.py
async def detect_autoprint(
    self,
    *,
    timeout: float | None = None,
    max_lines: int = 4,
) -> bool:
    """Passively sniff for unsolicited SBI autoprint/status lines.

    Any complete lines read during detection are queued so the first
    stream read still sees them. This keeps detection cheap and honest:
    it never writes to the balance and never loses the first sample.
    """
    t = timeout if timeout is not None else min(self._default_timeout, 0.25)
    if max_lines <= 0:
        raise ValueError(f"max_lines must be > 0, got {max_lines!r}")
    async with self._lock:
        self._raise_if_disposed(command_name="detect_autoprint", sbi_token=None)
        deadline = anyio.current_time() + t
        while len(self._pending_lines) < max_lines and anyio.current_time() < deadline:
            remaining = max(0.001, deadline - anyio.current_time())
            try:
                raw = await self._transport.read_until(
                    LINE_TERMINATOR,
                    timeout=remaining,
                )
            except SartoriusTimeoutError:
                break
            self._pending_lines.append(raw)
            if is_autoprint_line(raw):
                self._autoprint_active = True
                break
    return self._autoprint_active

dispose

dispose()

Retire this client after its owning session installs a replacement.

Source code in src/sartoriuslib/protocol/sbi/client.py
def dispose(self) -> None:
    """Retire this client after its owning session installs a replacement."""
    self._disposed = True

execute async

execute(
    request,
    *,
    timeout=None,
    command_name="",
    opcode=None,
    sbi_token=None,
    expect_lines=1,
)

Write request and parse expect_lines SBI reply lines.

Source code in src/sartoriuslib/protocol/sbi/client.py
async def execute(
    self,
    request: bytes,
    *,
    timeout: float | None = None,
    command_name: str = "",
    opcode: int | None = None,
    sbi_token: bytes | None = None,
    expect_lines: int = 1,
) -> SbiReply:
    """Write ``request`` and parse ``expect_lines`` SBI reply lines."""
    del opcode
    if expect_lines < 0:
        raise ValueError(f"expect_lines must be >= 0, got {expect_lines!r}")
    t = timeout if timeout is not None else self._default_timeout
    async with self._lock:
        return await self._execute_locked(
            request,
            timeout=t,
            command_name=command_name,
            sbi_token=sbi_token,
            expect_lines=expect_lines,
        )

mark_autoprint_active

mark_autoprint_active(*, pending=None)

Record that callers have successfully consumed autoprint output.

Source code in src/sartoriuslib/protocol/sbi/client.py
def mark_autoprint_active(self, *, pending: bytes | None = None) -> None:
    """Record that callers have successfully consumed autoprint output."""
    self._autoprint_active = True
    if pending:
        self._queue_pending_front(pending)

mark_autoprint_inactive

mark_autoprint_inactive()

Return the client to SBI command/reply mode.

Source code in src/sartoriuslib/protocol/sbi/client.py
def mark_autoprint_inactive(self) -> None:
    """Return the client to SBI command/reply mode."""
    self._autoprint_active = False

read_line async

read_line(*, timeout=None, command_name='sbi_autoprint')

Read and parse one unsolicited SBI line without writing first.

Source code in src/sartoriuslib/protocol/sbi/client.py
async def read_line(
    self,
    *,
    timeout: float | None = None,
    command_name: str = "sbi_autoprint",
) -> SbiReply:
    """Read and parse one unsolicited SBI line without writing first."""
    t = timeout if timeout is not None else self._default_timeout
    async with self._lock:
        self._raise_if_disposed(command_name=command_name, sbi_token=None)
        if self._pending_lines:
            raw = self._pending_lines.popleft()
        else:
            raw = await self._transport.read_until(LINE_TERMINATOR, timeout=t)
        return await self._parse_or_drain(
            raw,
            command_name=command_name,
            sbi_token=None,
        )

refresh_autoprint_state async

refresh_autoprint_state(*, timeout=None, max_lines=4)

Re-sniff autoprint state after a user-side mode change.

Unlike :meth:detect_autoprint, this is an explicit resync point: pending unsolicited lines are discarded before sniffing. A quiet line clears :attr:autoprint_active; observed autoprint/status output sets it and queues the observed line for later consumption.

Source code in src/sartoriuslib/protocol/sbi/client.py
async def refresh_autoprint_state(
    self,
    *,
    timeout: float | None = None,
    max_lines: int = 4,
) -> bool:
    """Re-sniff autoprint state after a user-side mode change.

    Unlike :meth:`detect_autoprint`, this is an explicit resync point:
    pending unsolicited lines are discarded before sniffing. A quiet line
    clears :attr:`autoprint_active`; observed autoprint/status output sets
    it and queues the observed line for later consumption.
    """
    t = timeout if timeout is not None else min(self._default_timeout, 0.25)
    if max_lines <= 0:
        raise ValueError(f"max_lines must be > 0, got {max_lines!r}")
    async with self._lock:
        self._raise_if_disposed(command_name="refresh_autoprint_state", sbi_token=None)
        self._pending_lines.clear()
        self._autoprint_active = False
        deadline = anyio.current_time() + t
        while len(self._pending_lines) < max_lines and anyio.current_time() < deadline:
            remaining = max(0.001, deadline - anyio.current_time())
            try:
                raw = await self._transport.read_until(
                    LINE_TERMINATOR,
                    timeout=remaining,
                )
            except SartoriusTimeoutError:
                break
            self._pending_lines.append(raw)
            if is_autoprint_line(raw):
                self._autoprint_active = True
                break
    return self._autoprint_active