alicatlib.devices

Device facades and identification. See Devices for the prefix matrix, identification pipeline, and escape hatches; Data frames for DataFrame / DataFrameFormat; Design §5.9 / §5.9a for the class tree and Medium model.

Public leaves

alicatlib.devices

Device facades for every Alicat instrument family.

Facades: :class:Device, :class:FlowMeter, :class:FlowController, :class:PressureMeter, :class:PressureController.

See docs/design.md §5.9 for the class tree and §5.9a for the orthogonal :class:Medium model.

This package keeps __init__ minimal — only the zero-import leaf modules (:class:DeviceKind, :class:Medium) re-export here. The :class:Session layer (:mod:alicatlib.devices.session) and data-frame models (:mod:alicatlib.devices.data_frame, :mod:alicatlib.devices.models) are imported by protocol-layer parsers and the command catalog; promoting them into this package's __init__ would trigger a circular import when parsing helpers reach back into the devices package. Users import those names from their own modules (from alicatlib.devices.session import Session).

DeviceKind

Bases: StrEnum

What kind of Alicat device we're talking to.

Coarser than :class:alicatlib.commands.base.Capability — a flow meter might or might not have a barometer; a flow controller might have one, two, or three valves. Per-feature gating is via Capability; this enum just says "mass-flow meter vs mass-flow controller vs pressure meter ..." so commands can declare a short list of compatible kinds.

UNKNOWN class-attribute instance-attribute

UNKNOWN = 'unknown'

Catch-all for models the factory's MODEL_RULES table doesn't match.

A device with this kind still gets a generic :class:Device facade (poll() and execute() work); only commands whose device_kinds explicitly list UNKNOWN will dispatch — the session's kind-gating (§5.7) rejects the rest. This is the "loud silence" path: we'd rather tell users "unknown model, try model_hint" than silently classify a new MFC as a pressure controller.

Medium

Bases: Flag

What kind of fluid a device moves.

Orthogonal to :class:~alicatlib.devices.kind.DeviceKind (function × form). A :class:Flag rather than a plain :class:Enum so the model can represent devices whose medium is ambiguous at the prefix level, either because the hardware truly supports both (some Coriolis lines are reported this way) or because the prefix covers multiple order-time configurations. Gating via bitwise intersection keeps a single code path for every configuration:

.. code:: python

if not (device.info.media & command.media):
    raise AlicatMediumMismatchError(...)

See design §5.9a for the full rationale on modelling medium as a flag (not an enum), why the class tree stays kind-shaped rather than medium-shaped, and why assume_media on the factory replaces rather than unions.
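To make the bitwise gate concrete, here is a self-contained sketch that re-declares a `Medium` flag with the three documented members. `check_media` and `MediumMismatchError` are hypothetical stand-ins for the session gate and for `AlicatMediumMismatchError`.

```python
from enum import Flag, auto


class Medium(Flag):
    NONE = 0
    GAS = auto()
    LIQUID = auto()


class MediumMismatchError(Exception):
    """Hypothetical stand-in for AlicatMediumMismatchError."""


def check_media(device_media: Medium, command_media: Medium) -> None:
    # An empty intersection is Medium.NONE, which is falsy.
    if not (device_media & command_media):
        raise MediumMismatchError(
            f"device media {device_media!r} does not overlap {command_media!r}"
        )


# A device declared as GAS | LIQUID passes both gas and liquid commands.
check_media(Medium.GAS | Medium.LIQUID, Medium.GAS)
check_media(Medium.GAS | Medium.LIQUID, Medium.LIQUID)
```

Because the check is an intersection, a single-medium device and a dual-medium device go through the same branch; no special case is needed for the ambiguous prefixes.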

GAS class-attribute instance-attribute

GAS = auto()

Device is configured for gas. Gas-specific commands (GS, ??G*, gas-mix edits) pass the media gate; liquid-specific commands fail pre-I/O.

LIQUID class-attribute instance-attribute

LIQUID = auto()

Device is configured for liquid. Liquid-specific commands (fluid select / list, per-fluid reference density) pass the media gate; gas commands fail pre-I/O.

NONE class-attribute instance-attribute

NONE = 0

No medium resolved. Only valid as an intermediate during identification; a live :class:~alicatlib.devices.models.DeviceInfo always carries at least one of :attr:GAS / :attr:LIQUID.

Factory and lifecycle

alicatlib.devices.factory

Device factory — identification pipeline + open_device context manager.

The factory implements design §5.9's staged identification flow:

  1. (Optional) stream recovery: passively read the transport for ~100 ms; if any bytes arrive, the device was left streaming by a prior process, so issue a stop-stream (@@ {unit_id}) and drain before the real identify begins.
  2. VE: firmware version, the anchor of identification on numeric firmware families. GP firmware does not answer VE (the query times out), so identify_device falls through to a VE-less path that drives ??M* directly.
  3. ??M*: 10-line manufacturing-info table. The primer documents it for numeric-family firmware ≥ 8v28, but that floor was observed to be wrong on 8v17 hardware, so the factory attempts it wherever its reachability gate allows and treats a rejection, timeout, or parse error as "unavailable". Parsed by the protocol layer into :class:ManufacturingInfo; the factory applies a best-guess M<NN> → named-field mapping to synthesise :class:DeviceInfo.
  4. Fallback: when ??M* is unavailable (typically GP or older numeric firmware), the caller must supply model_hint. The factory raises :class:AlicatConfigurationError if identification reaches this branch without a hint.
  5. Capability probing: :func:probe_capabilities probes the device for each :class:Capability flag, failing closed (default absent on timeout / ? / parse error). Outcomes are retained in :attr:DeviceInfo.probe_report for diagnostics; gating uses only the flag set.
  6. ??D*: cached on the session as :attr:Session.data_frame_format.
  7. Model-rule dispatch: :func:device_class_for picks the correct :class:Device subclass via the :data:MODEL_RULES table.

Stream recovery, capability probing, and the M-code → named-field mapping are all marked as best-effort and will be tightened against hardware captures.
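The stream-recovery step can be pictured as "listen briefly, and only speak if something was already talking". The sketch below is an assumption-laden illustration, not the factory's private helper: `recover_from_stream`, the `read` / `write` callables, and the trailing `\r` terminator are hypothetical; only the ~100 ms listen window and the `@@ {unit_id}` stop-stream command come from the description above.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def recover_from_stream(
    read: Callable[[], Awaitable[bytes]],
    write: Callable[[bytes], Awaitable[None]],
    unit_id: str,
    listen_s: float = 0.1,
) -> bool:
    """Return True if a stream was detected and a stop-stream was sent."""
    try:
        first = await asyncio.wait_for(read(), timeout=listen_s)
    except asyncio.TimeoutError:
        return False  # quiet line: nothing to recover from
    if not first:
        return False
    # Bytes arrived unprompted, so a prior process left the device streaming.
    await write(f"@@ {unit_id}\r".encode("ascii"))  # terminator is assumed
    # Drain frames already in flight until the line goes quiet.
    while True:
        try:
            await asyncio.wait_for(read(), timeout=listen_s)
        except asyncio.TimeoutError:
            return True


async def demo(chunks: list[bytes]) -> tuple[bool, list[bytes]]:
    """Drive the sketch against an in-memory queue instead of a serial port."""
    queue: asyncio.Queue[bytes] = asyncio.Queue()
    for chunk in chunks:
        queue.put_nowait(chunk)
    sent: list[bytes] = []

    async def write(data: bytes) -> None:
        sent.append(data)

    detected = await recover_from_stream(queue.get, write, "A", listen_s=0.01)
    return detected, sent
```

On a quiet line the function writes nothing, so a device that was never streaming sees no extra traffic before identification starts.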

Design reference: docs/design.md §5.9, §5.20.

ModelRule dataclass

ModelRule(prefix, kind, media, device_cls_map)

One entry in the model-prefix dispatch table (design §5.9, §5.9a).

The factory walks :data:MODEL_RULES in declared order and returns the first rule whose :attr:prefix matches the identified model. Ordering matters: longer / more-specific prefixes must come before their shorter kin (MCDW- before MCW- before MC-), so the most-specific match wins.

Every currently-supported prefix resolves :attr:kind deterministically — the published Alicat part-number decoders for the M, MC, P, PC, L, LC, K-family (CODA), and B/BC (BASIS) lines all encode meter vs. controller as a distinct part-number field. If a future prefix turns out to be kind-ambiguous at the prefix level, this dataclass will grow a kind_probe field back at that time; omitting it now keeps the public shape minimal.

Attributes:

Name Type Description
prefix str

The model-string prefix this rule claims (e.g. "MC-"). Alicat model strings are always uppercase; matching is case-sensitive.

kind DeviceKind

The :class:DeviceKind for any model matching this prefix.

media Medium

The :class:Medium flag this prefix declares. For prefixes whose medium isn't determinable from the part number (the CODA K-family is the current example — the part-number decoder encodes kind but not medium, and every CODA unit is currently believed to handle both), this is the widest possible default (Medium.GAS | Medium.LIQUID); users whose device is configured for a single medium narrow via :func:open_device's assume_media parameter.

device_cls_map Mapping[DeviceKind, type[Device]]

Per-kind facade class. Lookup is device_cls_map[kind]; when the kind isn't in the map the factory drops to the generic :class:Device (which still honours every session gate, so safe).
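The "first match wins, so order matters" rule and the generic-facade fallback can be sketched with a tiny stand-in table. Everything here is illustrative: the `Rule` dataclass, the class names, the kind strings, and the overlapping prefixes `"MCD"` / `"MC"` are hypothetical and are not entries from MODEL_RULES; `str.startswith` is assumed as the matching rule.

```python
from dataclasses import dataclass


class Device:
    """Stand-in for the generic facade."""


class FlowController(Device):
    """Stand-in for a kind-specific facade."""


class DualValveController(FlowController):
    """Stand-in for a more specific facade."""


@dataclass(frozen=True)
class Rule:
    prefix: str
    kind: str
    device_cls_map: dict[str, type[Device]]


def class_for(model: str, rules: tuple[Rule, ...]) -> type[Device]:
    """Walk the rules in declared order and return the first match's class."""
    for rule in rules:
        if model.startswith(rule.prefix):
            # A kind missing from the map drops to the generic facade.
            return rule.device_cls_map.get(rule.kind, Device)
    return Device  # unknown prefix


SPECIFIC = Rule("MCD", "dual", {"dual": DualValveController})
GENERAL = Rule("MC", "controller", {"controller": FlowController})

WELL_ORDERED = (SPECIFIC, GENERAL)   # longer prefix first
MIS_ORDERED = (GENERAL, SPECIFIC)    # shorter prefix shadows the longer one
```

With `WELL_ORDERED`, a model string starting with `"MCD"` resolves to the more specific class; with `MIS_ORDERED`, the shorter prefix claims it first, which is the failure the ordering rule guards against.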

device_class_for

device_class_for(info)

Return the concrete :class:Device subclass for info.

Routing is prefix-based via :data:MODEL_RULES. Every current rule resolves kind deterministically, so the facade class is rule.device_cls_map[rule.kind].

Unknown prefixes and kinds not in the rule's map fall back to the generic :class:Device — the session's :class:DeviceKind / :class:Medium gates still fire, so the fallback is safe: commands that don't list UNKNOWN in device_kinds simply refuse to dispatch.

Source code in src/alicatlib/devices/factory.py
def device_class_for(info: DeviceInfo) -> type[Device]:
    """Return the concrete :class:`Device` subclass for ``info``.

    Routing is prefix-based via :data:`MODEL_RULES`. Every current rule
    resolves kind deterministically, so the facade class is
    ``rule.device_cls_map[rule.kind]``.

    Unknown prefixes and kinds not in the rule's map fall back to the
    generic :class:`Device` — the session's :class:`DeviceKind` /
    :class:`Medium` gates still fire, so the fallback is safe: commands
    that don't list ``UNKNOWN`` in ``device_kinds`` simply refuse to
    dispatch.
    """
    rule = _rule_for_model(info.model)
    if rule is None:
        return Device
    return rule.device_cls_map.get(rule.kind, Device)

identify_device async

identify_device(client, unit_id='A', *, model_hint=None)

Run VE → (optional) ??M* → classify; return :class:DeviceInfo.

Capabilities are not populated here; call :func:probe_capabilities separately and merge via :func:dataclasses.replace. That split matches design §5.9 and keeps the two concerns testable in isolation.
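The identify-then-merge split relies on `dataclasses.replace`, which returns a new instance and leaves the original untouched. A minimal sketch, assuming an immutable info record; the `Info` class and the model string are hypothetical stand-ins for `DeviceInfo`, while the flag names are taken from this page.

```python
import dataclasses
from enum import Flag, auto


class Capability(Flag):
    NONE = 0
    BAROMETER = auto()
    SECONDARY_PRESSURE = auto()


@dataclasses.dataclass(frozen=True)
class Info:
    """Hypothetical, much-reduced stand-in for DeviceInfo."""
    model: str
    capabilities: Capability = Capability.NONE


# Step 1: identification yields an Info with no capabilities set.
identified = Info(model="EXAMPLE-MODEL")

# Step 2: probing runs separately and yields a flag set.
probed = Capability.BAROMETER

# Step 3: merge by building a new record; `identified` is not mutated.
merged = dataclasses.replace(identified, capabilities=probed)
```

Keeping the two steps separate means a test can exercise identification with a canned firmware reply and exercise probing with a canned `Info`, without either needing the other's fixtures.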

Parameters:

Name Type (Default) Description
client AlicatProtocolClient (required)

A wired :class:AlicatProtocolClient. Identification issues VE and (conditionally) ??M*, both of which flow through the client's single-in-flight lock.

unit_id str (default 'A')

Polling unit id ("A".."Z").

model_hint str | None (default None)

Required when ??M* can't be reached (GP family, V1_V7, or any device that rejects the command). Ignored when ??M* succeeds. Raises :class:AlicatConfigurationError if identification reaches the fallback branch without a hint.

Source code in src/alicatlib/devices/factory.py
async def identify_device(
    client: AlicatProtocolClient,
    unit_id: str = "A",
    *,
    model_hint: str | None = None,
) -> DeviceInfo:
    """Run ``VE`` → (optional) ``??M*`` → classify; return :class:`DeviceInfo`.

    Capabilities are *not* populated here; call :func:`probe_capabilities`
    separately and merge via :func:`dataclasses.replace`. That split
    matches design §5.9 and keeps the two concerns testable in isolation.

    Args:
        client: A wired :class:`AlicatProtocolClient`. Identification
            issues ``VE`` and (conditionally) ``??M*``, both of which
            flow through the client's single-in-flight lock.
        unit_id: Polling unit id (``"A"``..``"Z"``).
        model_hint: Required when ``??M*`` can't be reached (GP family,
            V1_V7, or any device that rejects the command). Ignored when
            ``??M*`` succeeds. Raises :class:`AlicatConfigurationError`
            if identification reaches the fallback branch without a hint.
    """
    # The session's firmware gating references self._info.firmware, so
    # to drive VE we need a throwaway DeviceInfo first. We populate the
    # rest after parsing VE.
    ve_session = _bootstrap_session(client, unit_id)
    try:
        ve_result = await ve_session.execute(VE_QUERY, VeRequest())
    except AlicatTimeoutError:
        # GP firmware does not implement VE (design §16.6.8, confirmed on
        # a GP07R100 capture 2026-04-17). Fall through to a VE-less
        # identification path that drives ``??M*`` directly and
        # synthesises a GP :class:`FirmwareVersion` from the M8
        # ``Software`` field. If ``??M*`` also fails, the caller must
        # supply ``model_hint`` and we build a minimal GP DeviceInfo.
        return await _identify_without_ve(client, unit_id, model_hint=model_hint)
    firmware = ve_result.firmware
    firmware_date = ve_result.firmware_date

    # Try ??M* on any family the reachability gate allows. The device
    # may still reject (older numeric firmware that was never updated, or
    # a custom revision); on rejection / parse error we fall through to
    # the model_hint path. Design §16.6 — the primer's 8v28 floor was
    # observed wrong on real 8v17 hardware.
    if _manufacturing_info_reachable(firmware):
        mfg_session = _bootstrap_session(
            client,
            unit_id,
            firmware_override=firmware,
        )
        mfg_info: ManufacturingInfo | None
        try:
            mfg_info = await mfg_session.execute(
                MANUFACTURING_INFO,
                ManufacturingInfoRequest(),
            )
        except (
            AlicatCommandRejectedError,
            AlicatTimeoutError,
            AlicatParseError,
        ):
            # Device doesn't actually support ??M* despite the family
            # check. Fall through to the model_hint path below.
            mfg_info = None
        if mfg_info is not None:
            named = _extract_named_fields(mfg_info)
            model = named["model"]
            if model is None:
                if model_hint is None:
                    raise AlicatConfigurationError(
                        f"??M* returned no model field (M04) and no model_hint was supplied; "
                        f"raw by_code={dict(mfg_info.by_code)!r}",
                        context=ErrorContext(
                            command_name="??M*",
                            unit_id=unit_id,
                            raw_response=None,
                        ),
                    )
                model = model_hint
            return DeviceInfo(
                unit_id=unit_id,
                manufacturer=named["manufacturer"],
                model=model,
                serial=named["serial"],
                manufactured=named["manufactured"],
                calibrated=named["calibrated"],
                calibrated_by=named["calibrated_by"],
                software=named["software"] or str(firmware),
                firmware=firmware,
                firmware_date=firmware_date,
                kind=_kind_for_model(model),
                media=_media_for_model(model),
                capabilities=Capability.NONE,
            )

    # Fallback: ??M* unsupported / rejected, or family says skip.
    if model_hint is None:
        raise AlicatConfigurationError(
            f"Firmware {firmware} does not support ??M*; supply model_hint=... "
            f"to open_device / identify_device to synthesise DeviceInfo "
            f"(GP family / pre-8v28 devices reach this branch — see design §5.9).",
            context=ErrorContext(
                command_name="identify_device",
                unit_id=unit_id,
                firmware=firmware,
            ),
        )
    return DeviceInfo(
        unit_id=unit_id,
        manufacturer=None,
        model=model_hint,
        serial=None,
        manufactured=None,
        calibrated=None,
        calibrated_by=None,
        software=str(firmware),
        firmware=firmware,
        firmware_date=firmware_date,
        kind=_kind_for_model(model_hint),
        media=_media_for_model(model_hint),
        capabilities=Capability.NONE,
    )

open_device async

open_device(
    port,
    *,
    unit_id="A",
    serial=None,
    timeout=0.5,
    recover_from_stream=True,
    model_hint=None,
    assume_capabilities=Capability.NONE,
    assume_media=None,
)

Open a fully-identified :class:Device for async with use.

The caller's port determines the lifecycle the context manager takes ownership of:

  • str ("/dev/ttyUSB0" etc.) — build a :class:SerialTransport from serial (or defaults), open it, wrap in an :class:AlicatProtocolClient, close both on exit.
  • :class:Transport — wrap in a new :class:AlicatProtocolClient; the transport's open/close is the caller's responsibility (we never close a transport we didn't open).
  • :class:AlicatProtocolClient — use as-is; neither transport nor client is closed on exit. Stream recovery is skipped because the factory doesn't have access to the underlying transport.
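The ownership rule in the list above ("we never close a transport we didn't open") can be sketched with `contextlib.asynccontextmanager`. `FakeTransport` and `managed` are hypothetical names for this illustration; the open-if-closed and close-only-if-owned logic mirrors the source listing for open_device.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeTransport:
    """In-memory stand-in for a serial transport."""

    def __init__(self) -> None:
        self.is_open = False

    async def open(self) -> None:
        self.is_open = True

    async def close(self) -> None:
        self.is_open = False


@asynccontextmanager
async def managed(port):
    # A string means "build it for me", so the manager owns the lifecycle.
    owns_transport = isinstance(port, str)
    transport = FakeTransport() if owns_transport else port
    try:
        if not transport.is_open:
            await transport.open()
        yield transport
    finally:
        # Only close what this manager created.
        if owns_transport and transport.is_open:
            await transport.close()


async def demo() -> tuple[bool, bool]:
    async with managed("/dev/ttyUSB0") as owned:
        pass
    mine = FakeTransport()
    await mine.open()
    async with managed(mine):
        pass
    return owned.is_open, mine.is_open
```

After both blocks exit, the manager-built transport is closed while the caller-supplied one is still open, so the caller can keep using it for other work.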

The assume_capabilities override is union'd onto the probed set per design §5.9 — the factory never subtracts flags, because silently masking hardware the device reports as present is exactly the failure mode capability probing exists to avoid.

The assume_media override replaces the prefix-derived media (design §5.9a). Medium answers "how is this specific unit configured," not "what can the hardware do" — the common correction is to narrow from a permissive prefix default to the single medium the unit was actually ordered locked to. The K-family CODA prefixes default to Medium.GAS | Medium.LIQUID because the part-number decoder encodes kind but not medium; other future order-configurable prefixes can adopt the same pattern. A replace policy also future-proofs the model: any new ambiguous prefix drops into :data:MODEL_RULES with the widest default, and users narrow at open time.

Source code in src/alicatlib/devices/factory.py
@asynccontextmanager
async def open_device(
    port: str | Transport | AlicatProtocolClient,
    *,
    unit_id: str = "A",
    serial: SerialSettings | None = None,
    timeout: float = 0.5,
    recover_from_stream: bool = True,
    model_hint: str | None = None,
    assume_capabilities: Capability = Capability.NONE,
    assume_media: Medium | None = None,
) -> AsyncGenerator[Device]:
    """Open a fully-identified :class:`Device` for ``async with`` use.

    The caller's ``port`` determines the lifecycle the context manager
    takes ownership of:

    - ``str`` (``"/dev/ttyUSB0"`` etc.) — build a
      :class:`SerialTransport` from ``serial`` (or defaults), open it,
      wrap in an :class:`AlicatProtocolClient`, close both on exit.
    - :class:`Transport` — wrap in a new
      :class:`AlicatProtocolClient`; the transport's open/close is the
      caller's responsibility (we never close a transport we didn't
      open).
    - :class:`AlicatProtocolClient` — use as-is; neither transport nor
      client is closed on exit. Stream recovery is skipped because the
      factory doesn't have access to the underlying transport.

    The ``assume_capabilities`` override is union'd onto the probed set
    per design §5.9 — the factory never *subtracts* flags, because
    silently masking hardware the device reports as present is exactly
    the failure mode capability probing exists to avoid.

    The ``assume_media`` override **replaces** the prefix-derived media
    (design §5.9a). Medium answers "how is this specific unit
    configured," not "what can the hardware do" — the common correction
    is to narrow from a permissive prefix default to the single medium
    the unit was actually ordered locked to. The K-family CODA prefixes
    default to ``Medium.GAS | Medium.LIQUID`` because the part-number
    decoder encodes kind but not medium; other future order-configurable
    prefixes can adopt the same pattern. A replace policy also
    future-proofs the model: any new ambiguous prefix drops into
    :data:`MODEL_RULES` with the widest default, and users narrow at
    open time.
    """
    owns_transport = False
    owns_client = False
    transport: Transport | None = None

    if isinstance(port, AlicatProtocolClient):
        client = port
    elif isinstance(port, str):
        settings = serial if serial is not None else SerialSettings(port=port)
        transport = SerialTransport(settings)
        client = AlicatProtocolClient(transport, default_timeout=timeout)
        owns_transport = True
        owns_client = True
    else:
        # Duck-typed Transport (Protocol isn't runtime-checkable).
        transport = port
        client = AlicatProtocolClient(transport, default_timeout=timeout)
        owns_client = True

    try:
        if transport is not None and not transport.is_open:
            await transport.open()
        if recover_from_stream and transport is not None:
            await _recover_from_stream(transport, unit_id)

        info = await identify_device(client, unit_id, model_hint=model_hint)
        probed_caps, probe_report = await probe_capabilities(client, unit_id, info)
        merged_caps = probed_caps | assume_capabilities
        # Medium resolution: prefix-derived by default; ``assume_media``
        # **replaces** (not unions — design §5.9a). Rationale: the common
        # correction is narrowing a permissive prefix default
        # (``Medium.GAS | Medium.LIQUID`` for K-family CODA prefixes and similar
        # whose medium varies by order-time configuration) to the
        # single medium the unit was actually ordered locked to.
        resolved_media = info.media if assume_media is None else assume_media
        info = dataclasses.replace(
            info,
            media=resolved_media,
            capabilities=merged_caps,
            probe_report=probe_report,
        )

        data_frame_format = await _probe_data_frame_format(client, info, unit_id)

        # Per design §10.1: bind per-field engineering units
        # from ``DCU`` where ``??D*`` didn't surface a recognisable label,
        # then populate ``DeviceInfo.full_scale`` from ``FPF`` so
        # setpoint and similar facades can range-check pre-I/O (design
        # §5.20.2). Both probes iterate the data-frame fields, are
        # best-effort per statistic, and never fail the open — a device
        # that rejects one probe just leaves that slot unresolved.
        data_frame_format = await _bind_field_units(
            client,
            info,
            unit_id,
            data_frame_format,
        )
        info = await _probe_full_scales(
            client,
            info,
            unit_id,
            data_frame_format,
        )

        port_label = _resolve_port_label(port, transport)
        session = Session(
            client,
            unit_id=unit_id,
            info=info,
            data_frame_format=data_frame_format,
            port_label=port_label,
        )
        # Pre-cache the loop-control variable for controllers so the
        # first setpoint call can already range-check. Best-effort:
        # firmware without ``LV`` (V1_V7, pre-9v00 V8_V9) leaves the
        # cache ``None`` and setpoint simply skips the range check.
        await _prefetch_loop_control_variable(session, info)

        device_cls = device_class_for(info)
        device = device_cls(session)

        try:
            yield device
        finally:
            await device.close()
    finally:
        del owns_client  # not load-bearing — we never open client separately
        if owns_transport and transport is not None and transport.is_open:
            await transport.close()

probe_capabilities async

probe_capabilities(client, unit_id, info)

Probe each :class:Capability flag on the device.

Real probes implemented so far:

  • :attr:Capability.BAROMETER: FPF 15 on any numeric-family device. "Present" iff the reply has value > 0 and unit_label != "---" (design §16.6.3). The device emits A <zero> 1 --- when the statistic is not supported, which has to be disambiguated from a real reading. Note that a positive BAROMETER probe on a flow controller does NOT imply :attr:Capability.TAREABLE_ABSOLUTE_PRESSURE; the two dissociate in practice (design §16.6.7 / Capability docstring).
  • :attr:Capability.SECONDARY_PRESSURE: identical rule applied to FPF 344 (second absolute pressure). Trying 344 covers the common second-pressure-sensor configuration; future work can extend to 352 / 360 if devices surface those instead.

Stubs still fail-closed:

  • :attr:Capability.TAREABLE_ABSOLUTE_PRESSURE: no safe probe; test-writing PC would re-zero the abs sensor. Users with a pressure meter/controller that supports PC opt in via assume_capabilities=Capability.TAREABLE_ABSOLUTE_PRESSURE.
  • :attr:Capability.MULTI_VALVE / :attr:THIRD_VALVE: VD returns four columns unconditionally across meter and single-valve-controller devices alike (design §16.6.6), so the earlier column-count plan is invalidated. Left absent until a valve-count signal surfaces.
  • :attr:Capability.TOTALIZER, analog I/O flags, display, remote tare, bidirectional: no hardware-validated probe strategy yet.

GP family skips every probe: we have no GP capture and the primer doesn't document FPF there, so assuming absence is the safe default. Callers can still union capabilities in via assume_capabilities=... on :func:open_device.

Fails closed on every flag the probe doesn't positively confirm — design §5.9's "default-absent" policy: a probe that can't answer should never falsely claim the hardware is present.

Source code in src/alicatlib/devices/factory.py
async def probe_capabilities(
    client: AlicatProtocolClient,
    unit_id: str,
    info: DeviceInfo,
) -> tuple[Capability, Mapping[Capability, ProbeOutcome]]:
    """Probe each :class:`Capability` flag on the device.

    Real probes implemented so far:

    - :attr:`Capability.BAROMETER` — ``FPF 15`` on any numeric-family
      device. "Present" iff the reply has ``value > 0`` *and*
      ``unit_label != "---"`` (design §16.6.3). The device emits
      ``A <zero> 1 ---`` when the statistic is not supported, which
      has to be disambiguated from a real reading. Note that a
      positive ``BAROMETER`` probe on a flow controller does NOT
      imply :attr:`Capability.TAREABLE_ABSOLUTE_PRESSURE` — the two
      dissociate in practice (design §16.6.7 / Capability docstring).
    - :attr:`Capability.SECONDARY_PRESSURE` — identical rule applied
      to ``FPF 344`` (second absolute pressure). Trying ``344`` covers
      the common second-pressure-sensor configuration; future work can
      extend to ``352`` / ``360`` if devices surface those instead.

    Stubs still fail-closed:

    - :attr:`Capability.TAREABLE_ABSOLUTE_PRESSURE` — no safe probe;
      test-writing ``PC`` would re-zero the abs sensor. Users with a
      pressure meter/controller that supports ``PC`` opt in via
      ``assume_capabilities=Capability.TAREABLE_ABSOLUTE_PRESSURE``.

    - :attr:`Capability.MULTI_VALVE` / :attr:`THIRD_VALVE` —
      ``VD`` returns four columns unconditionally across meter and
      single-valve-controller devices alike (design §16.6.6), so the
      earlier column-count plan is invalidated. Left absent until a
      valve-count signal surfaces.
    - :attr:`Capability.TOTALIZER`, analog I/O flags, display, remote
      tare, bidirectional — no hardware-validated probe strategy yet.

    GP family skips every probe: we have no GP capture and the primer
    doesn't document ``FPF`` there, so assuming absence is the safe
    default. Callers can still union capabilities in via
    ``assume_capabilities=...`` on :func:`open_device`.

    Fails *closed* on every flag the probe doesn't positively confirm —
    design §5.9's "default-absent" policy: a probe that can't answer
    should never falsely claim the hardware is present.
    """
    report: dict[Capability, ProbeOutcome] = dict.fromkeys(
        _iter_single_capability_flags(),
        "absent",
    )

    if info.firmware.family is FirmwareFamily.GP:
        # No FPF on GP; leave every flag absent.
        _logger.info(
            "probe_capabilities.gp_skip",
            extra={
                "unit_id": unit_id,
                "firmware": str(info.firmware),
                "reason": "gp_family_no_fpf",
                "resolved": "NONE",
            },
        )
        return Capability.NONE, MappingProxyType(report)

    probe_session = _bootstrap_session(client, unit_id, firmware_override=info.firmware)

    baro_flag, baro_outcome = await _probe_fpf_capability(
        probe_session,
        statistic=_STAT_BARO_PRESS,
        present_flag=Capability.BAROMETER,
    )
    report[Capability.BAROMETER] = baro_outcome

    sec_flag, sec_outcome = await _probe_fpf_capability(
        probe_session,
        statistic=_STAT_ABS_PRESS_SECOND,
        present_flag=Capability.SECONDARY_PRESSURE,
    )
    report[Capability.SECONDARY_PRESSURE] = sec_outcome

    resolved = baro_flag | sec_flag
    # Structured INFO per design §5.19 / §15.2: one log event per
    # identification summarising the capability-probe outcome. Users
    # wiring up dashboards / dashboards-by-error want to see this as
    # a single row per device rather than scraping per-probe entries.
    _logger.info(
        "probe_capabilities.result",
        extra={
            "unit_id": unit_id,
            "firmware": str(info.firmware),
            "model": info.model,
            "resolved": resolved.name or str(resolved),
            "present": [c.name for c in _iter_single_capability_flags() if report[c] == "present"],
            "outcomes": {c.name: report[c] for c in _iter_single_capability_flags() if c.name},
        },
    )
    return resolved, MappingProxyType(report)

Data-frame models

alicatlib.devices.data_frame

Data-frame format, parsing, and timing-wrapped result.

The Alicat A\r poll response is core to the polling path, yet its shape is device-dependent — Alicat advertises it via ??D* at session start. This module models that format explicitly (so positional parsing survives conditional *-marked fields), keeps the byte-level parse pure (no clocks), and layers timing provenance on top via :class:DataFrame.

The split between :class:ParsedFrame (pure bytes → typed values) and :class:DataFrame (ParsedFrame + received_at / monotonic_ns) is load-bearing: parser unit tests stay clock-free (no freeze-time mocking), and the :class:~alicatlib.devices.session.Session owns the single place that captures timing. See design doc §5.6.
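The pure-parse / timing-wrap split can be illustrated with two small dataclasses. This is a reduced sketch, not the module's real types: `Parsed`, `Timed`, `parse`, and `read_frame` are hypothetical names, the field name `Abs_Press` is made up, and the whitespace-separated frame layout is an assumption for the example.

```python
import time
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Parsed:
    """Pure result: bytes in, typed values out, no clock involved."""
    unit_id: str
    values: dict[str, float]


@dataclass(frozen=True)
class Timed:
    """Parsed frame plus the timing captured by the caller."""
    parsed: Parsed
    received_at: datetime
    monotonic_ns: int


def parse(raw: bytes, names: list[str]) -> Parsed:
    # Deterministic: the same bytes always give the same result.
    unit_id, *tokens = raw.decode("ascii").split()
    return Parsed(unit_id, {n: float(t) for n, t in zip(names, tokens)})


def read_frame(raw: bytes, names: list[str]) -> Timed:
    # The only place that touches a clock.
    return Timed(
        parsed=parse(raw, names),
        received_at=datetime.now(timezone.utc),
        monotonic_ns=time.monotonic_ns(),
    )
```

Because `parse` never reads a clock, a unit test can compare its output with a literal expected value; only `read_frame` needs tolerance for time.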

DataFrame dataclass

DataFrame(
    unit_id,
    format,
    values,
    values_by_statistic,
    status,
    received_at,
    monotonic_ns,
)

Timing-wrapped :class:ParsedFrame — the public polling result.

Built by :meth:from_parsed. monotonic_ns is for drift analysis and scheduling (never wall-clock); received_at is for data provenance in sinks.

as_dict

as_dict()

Flatten to a JSON/CSV-friendly dict.

Produces {field_name: value, "status": "HLD,OPL", "received_at": iso8601} — status codes collapse into a single comma-joined sorted string (empty when no codes are active) so downstream schema is stable across rows. Callers that need per-code boolean columns should wrap this themselves; the library picks the schema-stable form.

Source code in src/alicatlib/devices/data_frame.py
def as_dict(self) -> dict[str, float | str | None]:
    """Flatten to a JSON/CSV-friendly dict.

    Produces ``{field_name: value, "status": "HLD,OPL", "received_at": iso8601}``
    — status codes collapse into a single comma-joined sorted string
    (empty when no codes are active) so downstream schema is stable
    across rows. Callers that need per-code boolean columns should
    wrap this themselves; the library picks the schema-stable form.
    """
    result: dict[str, float | str | None] = dict(self.values)
    result["status"] = ",".join(sorted(code.value for code in self.status))
    result["received_at"] = self.received_at.isoformat()
    return result

from_parsed classmethod

from_parsed(parsed, *, format, received_at, monotonic_ns)

Wrap a :class:ParsedFrame with timing captured at read time.

Source code in src/alicatlib/devices/data_frame.py
@classmethod
def from_parsed(
    cls,
    parsed: ParsedFrame,
    *,
    format: DataFrameFormat,  # noqa: A002 — "format" is the public kwarg per design §5.5
    received_at: datetime,
    monotonic_ns: int,
) -> DataFrame:
    """Wrap a :class:`ParsedFrame` with timing captured at read time."""
    return cls(
        unit_id=parsed.unit_id,
        format=format,
        values=parsed.values,
        values_by_statistic=parsed.values_by_statistic,
        status=parsed.status,
        received_at=received_at,
        monotonic_ns=monotonic_ns,
    )

get_float

get_float(name)

Return the float value at name, or None if absent or non-numeric.

This is the "forgiving" accessor used when a downstream consumer wants a numeric value and accepts absence. Text-valued fields and the -- sentinel both yield None; exceptions are never raised. Callers that need strict behaviour should index :attr:values directly.

Source code in src/alicatlib/devices/data_frame.py
def get_float(self, name: str) -> float | None:
    """Return the float value at ``name``, or ``None`` if absent or non-numeric.

    This is the "forgiving" accessor used when a downstream consumer
    wants a numeric value and accepts absence. Text-valued fields and
    the ``--`` sentinel both yield ``None``; exceptions are never
    raised. Callers that need strict behaviour should index
    :attr:`values` directly.
    """
    value = self.values.get(name)
    return value if isinstance(value, float) else None

get_statistic

get_statistic(stat)

Return the value keyed by :class:Statistic, or None if absent.

Prefer this over :meth:get_float when the caller has a typed :class:Statistic — it's IDE-completable and robust to wire-name renames across firmware versions.

Source code in src/alicatlib/devices/data_frame.py
def get_statistic(self, stat: Statistic) -> float | str | None:
    """Return the value keyed by :class:`Statistic`, or ``None`` if absent.

    Prefer this over :meth:`get_float` when the caller has a typed
    :class:`Statistic` — it's IDE-completable and robust to wire-name
    renames across firmware versions.
    """
    return self.values_by_statistic.get(stat)

DataFrameField dataclass

DataFrameField(
    name,
    raw_name,
    type_name,
    statistic,
    unit,
    conditional,
    parser,
)

One column in the ??D*-advertised data-frame format.

Attributes:

name (str): Canonical field name, e.g. "Mass_Flow". Used as a key in :attr:ParsedFrame.values and :meth:DataFrame.get_float.

raw_name (str): The exact name as reported by the device, preserved so a fixture diff can surface unexpected firmware-side renames.

type_name (str): Wire type as declared in ??D* (e.g. "decimal", "integer", "text"); retained for diagnostics, not used by the parser (the parser callable is authoritative).

statistic (Statistic | None): Linkage back to :class:Statistic so :attr:ParsedFrame.values_by_statistic can be built. None for fields not modelled in the statistics registry.

unit (Unit | None): Engineering :class:Unit active for this field at the time the format was cached. The data frame itself doesn't carry units; the session probes DCU / FPF at startup to bind these. None when the unit doesn't resolve against the registry.

conditional (bool): True when ??D* reported this field with a leading *. A conditional field appears in the wire frame only when its condition is met; the parser tail-matches conditionals after all required fields have been consumed.

parser (Callable[[str], float | str | None]): Bytes-less (already-decoded string) parser that turns the raw token into the typed value. Supplied by the factory that builds the format, typically parse_float, parse_optional_float, parse_int, or an identity callable for text fields.

DataFrameFormat dataclass

DataFrameFormat(fields, flavor)

Advertised data-frame layout with a pure :meth:parse method.

The format is cached on the :class:~alicatlib.devices.session.Session at startup (via ??D*) and can be refreshed with session.refresh_data_frame_format() in the rare cases where it changes at runtime (e.g. after FDF or DCU). The format is immutable: any change produces a new :class:DataFrameFormat.

names

names()

Canonical field names, in declared order.

Source code in src/alicatlib/devices/data_frame.py
def names(self) -> tuple[str, ...]:
    """Canonical field names, in declared order."""
    return tuple(f.name for f in self.fields)

parse

parse(raw)

Parse a single data-frame line into a :class:ParsedFrame.

Strategy (per design §5.6):

  1. Tokenise on whitespace; first token is the device's unit ID.
  2. Match required (non-conditional) fields left-to-right against the leading tokens — they always appear.
  3. Walk the surplus tokens. Any token matching a :class:~alicatlib.devices.models.StatusCode value collapses into :attr:ParsedFrame.status; remaining tokens are assigned to conditional fields in declared order.
  4. Conditional fields that never receive a token are simply absent from :attr:ParsedFrame.values — they are not None. This matters for downstream sinks: an absent column is distinct from a column whose value is the -- sentinel (which does land as None via :func:parse_optional_float).

Raises:

AlicatParseError: Empty frame, non-ASCII bytes, or not enough tokens to cover the required fields.

Source code in src/alicatlib/devices/data_frame.py
def parse(self, raw: bytes) -> ParsedFrame:
    """Parse a single data-frame line into a :class:`ParsedFrame`.

    Strategy (per design §5.6):

    1. Tokenise on whitespace; first token is the device's unit ID.
    2. Match required (non-conditional) fields left-to-right against
       the leading tokens — they always appear.
    3. Walk the surplus tokens. Any token matching a
       :class:`~alicatlib.devices.models.StatusCode` value collapses
       into :attr:`ParsedFrame.status`; remaining tokens are assigned
       to conditional fields in declared order.
    4. Conditional fields that never receive a token are simply absent
       from :attr:`ParsedFrame.values` — they are not ``None``. This
       matters for downstream sinks: an absent column is distinct from
       a column whose value is the ``--`` sentinel (which *does* land
       as ``None`` via :func:`parse_optional_float`).

    Raises:
        AlicatParseError: Empty frame, non-ASCII bytes, or not enough
            tokens to cover the required fields.
    """
    from alicatlib.protocol.framing import decode_ascii  # noqa: PLC0415, I001 — see top-of-module note

    text = decode_ascii(raw)
    tokens = text.split()
    if not tokens:
        raise AlicatParseError(
            "empty data frame",
            field_name="data_frame",
            expected=">=1 token",
            actual=raw,
            context=ErrorContext(command_name="poll", raw_response=raw),
        )

    required = tuple(f for f in self.fields if not f.conditional)
    conditional = tuple(f for f in self.fields if f.conditional)

    if len(tokens) < len(required):
        raise AlicatParseError(
            f"data frame truncated: expected >= {len(required)} required fields, "
            f"got {len(tokens)} tokens — {text!r}",
            field_name="data_frame",
            expected=len(required),
            actual=len(tokens),
            context=ErrorContext(command_name="poll", raw_response=raw),
        )

    values: dict[str, float | str | None] = {}
    for field_spec, token in zip(required, tokens[: len(required)], strict=True):
        values[field_spec.name] = field_spec.parser(token)

    tail = tokens[len(required) :]
    status: set[StatusCode] = set()
    conditional_tokens: list[str] = []
    for token in tail:
        if token in _STATUS_VALUES:
            status.add(StatusCode(token))
        else:
            conditional_tokens.append(token)

    for field_spec, token in zip(conditional, conditional_tokens, strict=False):
        values[field_spec.name] = field_spec.parser(token)

    values_by_statistic: dict[Statistic, float | str | None] = {
        f.statistic: values[f.name]
        for f in self.fields
        if f.statistic is not None and f.name in values
    }

    return ParsedFrame(
        unit_id=tokens[0],
        values=MappingProxyType(values),
        values_by_statistic=MappingProxyType(values_by_statistic),
        status=frozenset(status),
    )
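
The four-step strategy can be reproduced in a few lines without any library types. The sketch below is a simplified stand-in, not the library's parser: FieldSpec, parse_frame, the field names, and the sample frames are all assumptions, and ValueError replaces AlicatParseError.

```python
from typing import Callable, Dict, FrozenSet, List, NamedTuple, Optional, Tuple, Union

Value = Union[float, str, None]

# The eleven status tokens listed under StatusCode in this reference.
STATUS_VALUES = frozenset(
    {"ADC", "EXH", "HLD", "LCK", "MOV", "OPL", "OVR", "POV", "TMF", "TOV", "VOV"}
)


class FieldSpec(NamedTuple):
    name: str
    conditional: bool
    parser: Callable[[str], Value]


def parse_optional_float(token: str) -> Optional[float]:
    """Treat the "--" sentinel as None; parse anything else as a float."""
    return None if token == "--" else float(token)


def parse_frame(
    raw: bytes, fields: List[FieldSpec]
) -> Tuple[Dict[str, Value], FrozenSet[str]]:
    tokens = raw.decode("ascii").split()
    required = [f for f in fields if not f.conditional]
    conditional = [f for f in fields if f.conditional]
    if not tokens or len(tokens) < len(required):
        raise ValueError(f"frame too short: {raw!r}")

    # Required fields consume the leading tokens, left to right.
    values: Dict[str, Value] = {
        spec.name: spec.parser(token) for spec, token in zip(required, tokens)
    }

    # Surplus tokens are either status codes or conditional-field values.
    status = set()
    leftovers = []
    for token in tokens[len(required):]:
        if token in STATUS_VALUES:
            status.add(token)
        else:
            leftovers.append(token)

    # zip() stops at the shorter input, so unfilled conditionals stay absent.
    for spec, token in zip(conditional, leftovers):
        values[spec.name] = spec.parser(token)
    return values, frozenset(status)


# Hypothetical format: three required fields and one conditional field.
FIELDS = [
    FieldSpec("Unit_ID", False, str),
    FieldSpec("Abs_Press", False, parse_optional_float),
    FieldSpec("Mass_Flow", False, parse_optional_float),
    FieldSpec("Valve_Drive", True, parse_optional_float),
]

values, status = parse_frame(b"A +14.70 -- HLD MOV", FIELDS)
```

In this run Mass_Flow is present with the value None (the sentinel), while Valve_Drive is not a key at all, which is the absent-versus-None distinction from step 4.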

DataFrameFormatFlavor

Bases: Enum

Wire-format generation for the ??D* data-frame advertisement.

Alicat firmware has used at least two distinct ??D* layouts over the years (design §16.6 / §16.6.2 / §16.6.4). The flavor lives on :class:~alicatlib.commands.base.DecodeContext so the dispatching parser knows which line-shape to expect.

Captured-device map (2026-04-17 hardware validation):

  • DEFAULT: canonical Alicat layout. Column header <uid> D00 ID_ NAME... TYPE... WIDTH NOTES.... Field rows carry an explicit stat-code column and conditional fields are marked with a leading *<name>. Devices observed: 6v21, 8v17, 8v30, 10v04, 10v20.
  • LEGACY: older shape from before the dialect transition. Column header <uid> D00 NAME... TYPE... MinVal MaxVal UNITS.... No stat-code column, no * marker, signed / char types instead of s decimal / string, units inline in a single column. Devices observed: 5v12. The transition happened between firmware 5v12 and 6v21, so the legacy shape is not family-correlated (5v12 and 6v21 are both V1_V7-family); it correlates with firmware version, somewhere around the V5→V6 cut-over. The flavor used to be called V1_V7, but that name was misleading, so it was renamed to LEGACY.

SIGNED and VARIABLE_V8 are reserved for future use if a captured device shows a third / fourth distinct dialect; currently unused.

ParsedFrame dataclass

ParsedFrame(unit_id, values, values_by_statistic, status)

Byte-level parse result. Pure function of (raw, format); no timing.

The :class:~alicatlib.devices.session.Session wraps this into a :class:DataFrame via :meth:DataFrame.from_parsed, at which point received_at and monotonic_ns are captured from the terminator-read call site. Keeping the two separate makes parser unit tests clock-free.
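
The split between a clock-free parse step and a timing-aware wrapper can be sketched as follows. The class shapes are simplified stand-ins, read_frame is a hypothetical name for the session's read path, and the choice of UTC for received_at is an assumption.

```python
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Tuple


@dataclass(frozen=True)
class ParsedFrame:
    unit_id: str
    tokens: Tuple[str, ...]


@dataclass(frozen=True)
class TimedFrame:
    parsed: ParsedFrame
    received_at: datetime
    monotonic_ns: int


def parse(raw: bytes) -> ParsedFrame:
    """Pure function of the bytes: no clock access, so tests need no mocking."""
    tokens = raw.decode("ascii").split()
    return ParsedFrame(unit_id=tokens[0], tokens=tuple(tokens[1:]))


def read_frame(raw: bytes) -> TimedFrame:
    """Stand-in for the session read path: timestamps are captured here."""
    received_at = datetime.now(timezone.utc)
    monotonic_ns = time.monotonic_ns()
    return TimedFrame(parse(raw), received_at, monotonic_ns)


frame = read_frame(b"A +14.70 +25.00")
```

Because parse never touches a clock, two calls with the same bytes compare equal, which is what keeps parser unit tests deterministic.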

Typed identity / measurement models

alicatlib.devices.models

Typed device-identity and measurement models.

These are the frozen dataclasses returned by the session layer for identification results (:class:DeviceInfo), capability probe outcomes (:data:ProbeOutcome), individual statistic readings (:class:MeasurementSet), and cached full-scale ranges (:class:FullScaleValue). Together with :mod:alicatlib.devices.data_frame they are the full set of public models referenced by the rest of the package, per design doc §5.5.

Data-frame models (:class:~alicatlib.devices.data_frame.DataFrame, :class:~alicatlib.devices.data_frame.DataFrameFormat, ...) live in :mod:alicatlib.devices.data_frame to keep the wire-parsing machinery separate from the identity models that cache it.

ProbeOutcome

ProbeOutcome = Literal[
    "present",
    "absent",
    "timeout",
    "rejected",
    "parse_error",
]

Per-:class:Capability probe result.

Retained on :attr:DeviceInfo.probe_report for diagnostics and user-facing override guidance. The gating check in the session is binary — the flag is either set in :attr:DeviceInfo.capabilities or not — but the per-flag outcome is useful when a user needs to understand why a capability was marked absent. A timeout looks the same as "hardware missing" at the gating layer, but they imply very different remediations.
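
A small sketch of the relationship between the binary gate and the retained report. Capability names are plain strings here and the report contents are invented for illustration.

```python
from typing import Dict, Literal, Set

ProbeOutcome = Literal["present", "absent", "timeout", "rejected", "parse_error"]

# Hypothetical probe report; keys stand in for Capability members.
probe_report: Dict[str, ProbeOutcome] = {
    "DISPLAY": "present",
    "MULTI_VALVE": "timeout",
    "THIRD_VALVE": "absent",
}

# Gating is binary: only "present" sets the capability.
capabilities: Set[str] = {
    name for name, outcome in probe_report.items() if outcome == "present"
}

# The report still records why each remaining capability was left out.
not_granted = {
    name: outcome for name, outcome in probe_report.items() if outcome != "present"
}
```

Both MULTI_VALVE and THIRD_VALVE are missing from capabilities, but only the report shows that one timed out (worth retrying or overriding) while the other is reported as absent.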

AnalogOutputChannel

Bases: IntEnum

Analog-output channel selector for ASOCV.

Devices ship with a primary analog output (4-20 mA or 0-5 V per part-number suffix) and optionally a secondary. ASOCV 0 targets primary; ASOCV 1 targets secondary.

AnalogOutputSourceSetting dataclass

AnalogOutputSourceSetting(
    unit_id, channel, value, unit_code, unit, unit_label
)

Result of an ASOCV (analog-output-source) query or set.

value is the statistic code the output tracks — or the sentinel 0 (minimum) / 1 (maximum), in which case the device emits a fixed min / max analog level instead of following a measurement. When value is 0 or 1, the primer notes that unit_code=1 and unit_label="---".

AutoTareState dataclass

AutoTareState(unit_id, enabled, delay_s)

Result of an auto-tare (ZCA) query or set.

delay_s is the configured settling delay in seconds; primer constrains this to [0.1, 25.5] and the command encoder validates range pre-I/O (see design §10).
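
Pre-I/O validation of the delay range might look like the sketch below. The function and constant names are hypothetical, and ValueError stands in for whatever error type the encoder actually raises.

```python
AUTO_TARE_DELAY_MIN_S = 0.1
AUTO_TARE_DELAY_MAX_S = 25.5


def validate_auto_tare_delay(delay_s: float) -> float:
    """Reject out-of-range delays before any bytes are written."""
    if not AUTO_TARE_DELAY_MIN_S <= delay_s <= AUTO_TARE_DELAY_MAX_S:
        raise ValueError(
            f"delay_s must be within [{AUTO_TARE_DELAY_MIN_S}, "
            f"{AUTO_TARE_DELAY_MAX_S}] s, got {delay_s!r}"
        )
    return delay_s


accepted = validate_auto_tare_delay(2.0)
```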

AverageTimingSetting dataclass

AverageTimingSetting(unit_id, statistic_code, averaging_ms)

Result of a DCA (flow/pressure average) query or set.

Averaging window in milliseconds for a specific statistic code (primer p. 18 table: 1 = all pressures, 2 = absolute pressure, 4 = volumetric flow, 5 = mass flow, 6 = gauge pressure, 7 = differential pressure, 17 = external volumetric flow, 344/352/360 = secondary-sensor variants). averaging_ms=0 reports every-millisecond readings.

BlinkDisplayState dataclass

BlinkDisplayState(unit_id, flashing)

Result of an FFP (blink display) query or set.

flashing is the echo of the primer's 0/1 binary response — True while the backlight is flashing, False otherwise. Gated at the command layer by :attr:Capability.DISPLAY (probed at :func:open_device).

DeadbandSetting dataclass

DeadbandSetting(
    unit_id, deadband, unit_code, unit, unit_label
)

Result of an LCDB (deadband limit) query or set.

Wire shape: <uid> <deadband> <unit_code> <unit_label>. Controllers apply the deadband around the setpoint in the controlled variable's engineering units — a value of 0.5 with unit_label="PSIA" means "allow ±0.5 PSIA drift before re-correcting." A value of 0 disables the deadband.

DeviceInfo dataclass

DeviceInfo(
    unit_id,
    manufacturer,
    model,
    serial,
    manufactured,
    calibrated,
    calibrated_by,
    software,
    firmware,
    firmware_date,
    kind,
    media,
    capabilities,
    probe_report=_empty_probe_report(),
    full_scale=_empty_full_scale(),
)

Everything known about a device after identification.

Built by :func:alicatlib.devices.factory.identify_device. The probe_report preserves per-capability outcomes even when the capability is absent from :attr:capabilities, so users can tell a "device lacks the hardware" situation from a "probe timed out" situation (see design §5.9 and :data:ProbeOutcome).

For GP-family or pre-8v28 devices the ??M* manufacturing-info table is unavailable; the factory synthesises a :class:DeviceInfo from the VE reply plus a caller-supplied model_hint, in which case the string-shaped fields may all be None except model.

DisplayLockResult dataclass

DisplayLockResult(frame)

Result of an L / U (lock / unlock display) command.

Both commands respond with a data frame — L sets the :attr:StatusCode.LCK bit, U clears it. :attr:locked exposes that for convenience.

locked property

locked

True if the post-op frame reports :attr:StatusCode.LCK.

FullScaleValue dataclass

FullScaleValue(statistic, value, unit, unit_label)

Cached full-scale range for one statistic.

Populated by the session's capability-probe step (FPF queries) and then used by :meth:Device.setpoint and similar facades for pre-I/O range validation (design §5.20.2). unit is None when the device's unit doesn't map to a known :class:Unit — the raw unit_label is always preserved for diagnostics.

statistic is filled by the facade after dispatch — the device's FPF reply doesn't echo the requested statistic (verified against a V10 capture on 2026-04-17), so the decoder leaves it as :attr:Statistic.NONE and the facade calls :func:dataclasses.replace to populate it from the request.
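
The fill-in step relies on dataclasses.replace from the standard library, which copies a frozen instance with selected fields changed. In this sketch the two dataclass and enum definitions are simplified stand-ins, MASS_FLOW is an assumed member name, and the decoded values are invented.

```python
from dataclasses import dataclass, replace
from enum import Enum
from typing import Optional


class Statistic(Enum):
    NONE = "none"
    MASS_FLOW = "mass_flow"  # assumed member name, for illustration only


@dataclass(frozen=True)
class FullScaleValue:
    statistic: Statistic
    value: float
    unit: Optional[str]  # stand-in for the Unit enum
    unit_label: str


# What a decoder could return: the reply does not echo the statistic.
decoded = FullScaleValue(Statistic.NONE, 100.0, None, "SCCM")

# The facade knows which statistic it asked for and fills it in.
filled = replace(decoded, statistic=Statistic.MASS_FLOW)
```

replace returns a new instance, so the decoder's original value is left untouched.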

LoopControlState dataclass

LoopControlState(unit_id, variable, label)

Result of an LV (loop-control variable) query or set.

variable is the typed :class:LoopControlVariable the controller's loop is tracking. label preserves the device's raw descriptor string for diagnostics.

ManufacturingInfo dataclass

ManufacturingInfo(unit_id, by_code)

Parsed ??M* manufacturing-info table.

Minimal, honest surface: the raw per-M-code payload keyed by the M<NN> index. The parser pins only what the wire format guarantees (<unit_id> M<NN> <payload>); the semantic mapping from M-code number to named field (M04 → model, M05 → serial, etc.) is a separate concern handled by the factory, which can adjust per firmware version without rewriting the parser.

Only emitted by :func:alicatlib.protocol.parser.parse_manufacturing_info when the firmware family and version support ??M* (numeric family, ≥ 8v28 per design §5.9). GP and pre-8v28 devices synthesise :class:DeviceInfo directly from the VE reply plus a caller-supplied model_hint.

get

get(code)

Return the payload for M<code>, or None if not reported.

Source code in src/alicatlib/devices/models.py
def get(self, code: int) -> str | None:
    """Return the payload for ``M<code>``, or ``None`` if not reported."""
    return self.by_code.get(code)
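
A rough sketch of how rows of the form <unit_id> M<NN> <payload> could be collected into a by_code mapping. The function name, the regular expression, and the sample payloads are assumptions; the real parser lives in alicatlib.protocol.parser and may differ.

```python
import re
from typing import Dict, Iterable, Optional, Tuple

_LINE = re.compile(r"^(?P<unit_id>\S+)\s+M(?P<code>\d+)\s+(?P<payload>.*)$")


def parse_manufacturing_lines(
    lines: Iterable[str],
) -> Tuple[Optional[str], Dict[int, str]]:
    unit_id = None
    by_code: Dict[int, str] = {}
    for line in lines:
        match = _LINE.match(line.strip())
        if match is None:
            continue  # not a "<unit_id> M<NN> <payload>" row
        unit_id = match["unit_id"]
        by_code[int(match["code"])] = match["payload"].strip()
    return unit_id, by_code


# Invented sample rows; M04 and M05 follow the model / serial mapping above.
unit_id, by_code = parse_manufacturing_lines(
    ["A M04 MC-100SCCM-D", "A M05 123456"]
)
model = by_code.get(4)
unknown = by_code.get(99)
```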

MeasurementSet dataclass

MeasurementSet(unit_id, values, averaging_ms, received_at)

Result of a :class:~alicatlib.commands.polling.RequestData (DV) query.

Unlike a :class:~alicatlib.devices.data_frame.DataFrame, which returns the cached full set of fields, a DV query targets a specific list of statistics (1–13 per call) and reports each with an averaging window. Values that come back as the -- sentinel are None.

PowerUpTareState dataclass

PowerUpTareState(unit_id, enabled)

Result of a ZCP (power-up tare) query or set.

enabled=True means the device performs a 0.25 s tare after sensors stabilise on power-up. On controllers, closed-loop control is delayed and valves stay closed until the tare completes.

RampRateSetting dataclass

RampRateSetting(
    unit_id,
    max_ramp,
    setpoint_unit_code,
    setpoint_unit,
    time_unit,
    rate_unit_label,
)

Result of an SR (max ramp rate) query or set.

Wire shape: <uid> <max_ramp> <setpoint_unit_code> <time_value> <rate_unit_label>. max_ramp == 0.0 means ramping is disabled; the controller jumps to the new setpoint instantly on the next write.

Attributes:

unit_id (str): Echoed unit id.

max_ramp (float): Ramp step size, in the device's current engineering units for the loop-control variable. 0.0 disables ramping.

setpoint_unit_code (int): Raw numeric unit code from primer Appendix B. Preserved for diagnostics; the typed :attr:setpoint_unit is the preferred handle.

setpoint_unit (Unit | None): Resolved :class:Unit member, or None when the wire label doesn't resolve against the registry.

time_unit (TimeUnit): :class:TimeUnit encoding the ramp rate's time base (ms / s / m / h / d).

rate_unit_label (str): Device-reported units-over-time label (e.g. "SCCM/s"). Preserved verbatim.

SetpointState dataclass

SetpointState(
    unit_id,
    current,
    requested,
    unit,
    unit_label,
    frame=None,
)

Result of a :class:~alicatlib.commands.setpoint.Setpoint query or set.

current and requested are reported separately by the device (modern LS reply: <uid> <current> <requested> <unit_code> <unit_label>). They diverge briefly on a set while the controller's loop closes on the new target; they track to the same value in steady state. unit / unit_label come straight from the same reply.

frame is optional: legacy S (set-only, pre-9v00) responds with a post-op data frame rather than the 5-field LS reply, so the facade can attach the parsed frame on the legacy path. On the modern LS path frame is always None.
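
The five-field LS reply can be split as sketched below. SetpointReply, parse_ls_reply, and the settled helper are hypothetical names, and the sample replies (including the unit code) are invented values rather than captured device output.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SetpointReply:
    unit_id: str
    current: float
    requested: float
    unit_code: int
    unit_label: str

    @property
    def settled(self) -> bool:
        """True once the loop has closed on the requested value."""
        return self.current == self.requested


def parse_ls_reply(reply: str) -> SetpointReply:
    # <uid> <current> <requested> <unit_code> <unit_label>
    unit_id, current, requested, unit_code, unit_label = reply.split(maxsplit=4)
    return SetpointReply(
        unit_id, float(current), float(requested), int(unit_code), unit_label
    )


during_ramp = parse_ls_reply("A 48.20 50.00 12 SCCM")
steady = parse_ls_reply("A 50.00 50.00 12 SCCM")
```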

StatusCode

Bases: StrEnum

Device status codes that may appear in the data-frame tail.

The Alicat primer defines these as 3-letter tokens trailing the numeric fields when the condition is active. Multiple codes may be present simultaneously (e.g. MOV + TMF); :class:alicatlib.devices.data_frame.DataFrame carries them as a :class:frozenset so ordering on the wire doesn't matter downstream.
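
A short sketch of why a frozenset suits the status tail. Only three codes are included, and (str, Enum) is used as a stand-in for StrEnum so the example also runs on interpreters older than Python 3.11.

```python
from enum import Enum
from typing import FrozenSet, Iterable


class StatusCode(str, Enum):
    """Subset of the codes; (str, Enum) stands in for StrEnum here."""

    HLD = "HLD"
    MOV = "MOV"
    TMF = "TMF"


_VALUES = frozenset(code.value for code in StatusCode)


def collect_status(tail: Iterable[str]) -> FrozenSet[StatusCode]:
    """Keep only tokens that are known status codes, ignoring wire order."""
    return frozenset(StatusCode(token) for token in tail if token in _VALUES)


a = collect_status(["MOV", "TMF"])
b = collect_status(["TMF", "MOV"])

# A stable text form for sinks: sort the values before joining.
label = ",".join(sorted(code.value for code in a))
```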

ADC class-attribute instance-attribute

ADC = 'ADC'

Internal analog-to-digital communication error.

EXH class-attribute instance-attribute

EXH = 'EXH'

Manual exhaust valve override active.

HLD class-attribute instance-attribute

HLD = 'HLD'

Valve-drive hold enabled.

LCK class-attribute instance-attribute

LCK = 'LCK'

Display front-panel buttons disabled.

MOV class-attribute instance-attribute

MOV = 'MOV'

Mass-flow rate over full-scale.

OPL class-attribute instance-attribute

OPL = 'OPL'

Overpressure limit actively throttling.

OVR class-attribute instance-attribute

OVR = 'OVR'

Totalizer rolled over / frozen at max.

POV class-attribute instance-attribute

POV = 'POV'

Pressure reading over full-scale.

TMF class-attribute instance-attribute

TMF = 'TMF'

Totalizer missed data (typically following MOV or VOV).

TOV class-attribute instance-attribute

TOV = 'TOV'

Temperature reading over full-scale.

VOV class-attribute instance-attribute

VOV = 'VOV'

Volumetric-flow reading over full-scale.

StpNtpMode

Bases: StrEnum

Reference mode for DCFRP / DCFRT.

Standard conditions (STP, "S") underpin standard volumetric flow units (SLPM / SCFM); normal conditions (NTP, "N") underpin normal volumetric flow units (LPM / CFM). The two are separate reference points that the device lets users retune; the enum mirrors the primer's single-letter wire encoding.

StpNtpPressureSetting dataclass

StpNtpPressureSetting(
    unit_id, mode, pressure, unit_code, unit, unit_label
)

Result of a DCFRP (standard / normal pressure reference) query or set.

Default on Alicat devices is 14.696 PSIA. Changing this affects the density calculation for every standard / normal volumetric flow reading the device reports.

StpNtpTemperatureSetting dataclass

StpNtpTemperatureSetting(
    unit_id, mode, temperature, unit_code, unit, unit_label
)

Result of a DCFRT (standard / normal temperature reference) query or set.

Default on Alicat devices is 25 °C. Same density-calculation story as :class:StpNtpPressureSetting.

TareResult dataclass

TareResult(frame)

Result of a tare command (flow / gauge pressure / absolute pressure).

The device responds with a post-tare data frame; that frame is the most useful artifact (it reports the new zero-referenced reading), so the result surface is intentionally minimal: just the frame.

TimeUnit

Bases: IntEnum

Time-unit code used by the SR (max ramp rate) command.

Primer p. 15 encodes the unit-over-time base for ramping as a single integer: 3..7 for ms / s / m / hour / day. The enum mirrors that encoding so callers write TimeUnit.SECOND instead of a magic literal.
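
The 3..7 encoding maps naturally onto an IntEnum. In the sketch below only TimeUnit.SECOND is a name confirmed by this page; the other member names, the SECONDS_PER_UNIT table, and the ramp_per_second helper are assumptions added for illustration.

```python
from enum import IntEnum


class TimeUnit(IntEnum):
    MILLISECOND = 3  # assumed member name
    SECOND = 4
    MINUTE = 5       # assumed member name
    HOUR = 6         # assumed member name
    DAY = 7          # assumed member name


# Hypothetical helper table: length of each time base in seconds.
SECONDS_PER_UNIT = {
    TimeUnit.MILLISECOND: 0.001,
    TimeUnit.SECOND: 1.0,
    TimeUnit.MINUTE: 60.0,
    TimeUnit.HOUR: 3600.0,
    TimeUnit.DAY: 86400.0,
}


def ramp_per_second(max_ramp: float, time_unit: TimeUnit) -> float:
    """Normalise a ramp rate to engineering units per second."""
    return max_ramp / SECONDS_PER_UNIT[time_unit]


wire_code = int(TimeUnit.SECOND)
per_second = ramp_per_second(60.0, TimeUnit.MINUTE)
```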

TotalizerConfig dataclass

TotalizerConfig(
    unit_id,
    totalizer,
    flow_statistic_code,
    mode,
    limit_mode,
    digits,
    decimal_place,
)

Result of a TC (configure totalizer) query or set.

Attributes mirror the primer's wire order (flow_statistic_code mode limit_mode digits decimal_place). Callers should inspect :attr:enabled rather than comparing flow_statistic_code against TOTALIZER_DISABLED_CODE themselves.

enabled property

enabled

True when the totalizer is tracking a flow statistic.

Primer: flow_statistic_code == 1 signals "disabled" — any other code means the totalizer is enabled on that statistic.
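
The enabled property reduces to a single comparison. This sketch keeps only the one field it needs; the value of TOTALIZER_DISABLED_CODE follows the primer note above, and the class is a stand-in rather than the library's full dataclass.

```python
from dataclasses import dataclass

TOTALIZER_DISABLED_CODE = 1


@dataclass(frozen=True)
class TotalizerConfig:
    flow_statistic_code: int

    @property
    def enabled(self) -> bool:
        """True when the totalizer is tracking a flow statistic."""
        return self.flow_statistic_code != TOTALIZER_DISABLED_CODE


disabled = TotalizerConfig(flow_statistic_code=1)
on_mass_flow = TotalizerConfig(flow_statistic_code=5)  # 5 = mass flow
```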

TotalizerId

Bases: IntEnum

Which totalizer to address — primer supports two (1 / 2).

TotalizerLimitMode

Bases: IntEnum

Totalizer overflow behaviour for TC (primer p. 23 table).

-1 is the set-only "keep current" sentinel (same convention as :class:TotalizerMode).

KEEP class-attribute instance-attribute

KEEP = -1

Set-only: leave the current limit mode unchanged.

ROLLOVER class-attribute instance-attribute

ROLLOVER = 1

Reset to zero and keep counting. No TOV status bit.

ROLLOVER_WITH_TOV class-attribute instance-attribute

ROLLOVER_WITH_TOV = 3

Reset to zero and keep counting; set the TOV status bit.

STOP_AT_MAX class-attribute instance-attribute

STOP_AT_MAX = 0

Stop counting at the maximum value. No TOV status bit.

STOP_AT_MAX_WITH_TOV class-attribute instance-attribute

STOP_AT_MAX_WITH_TOV = 2

Stop counting at the maximum value; set the TOV status bit.

TotalizerMode

Bases: IntEnum

Totalizer-accumulation mode for TC (primer p. 23 table).

The -1 KEEP sentinel is a set-time "don't change" marker that the primer allows; it is not a real config state the device ever echoes back.

BIDIRECTIONAL class-attribute instance-attribute

BIDIRECTIONAL = 2

Accumulate positive and subtract negative flow.

KEEP class-attribute instance-attribute

KEEP = -1

Set-only: leave the current mode unchanged.

NEGATIVE_ONLY class-attribute instance-attribute

NEGATIVE_ONLY = 1

Accumulate negative flow only; ignore positive flow.

POSITIVE_ONLY class-attribute instance-attribute

POSITIVE_ONLY = 0

Accumulate positive flow only; ignore negative flow.

RESET_ON_STOP class-attribute instance-attribute

RESET_ON_STOP = 3

Accumulate positive flow, reset to zero when flow stops.

TotalizerResetResult dataclass

TotalizerResetResult(frame)

Wraps the post-op data frame from T <n> or TP <n>.

The frame is the useful artifact (it carries the fresh totalizer reading); the result object exists so a future addition (observed totalizer reading extracted from the frame, timing, …) has a stable home.

TotalizerSaveState dataclass

TotalizerSaveState(unit_id, enabled)

Result of a TCR (save totalizer) query or set.

enabled=True means the device periodically persists totalizer values to EEPROM and restores them at power-on.

UnitSetting dataclass

UnitSetting(unit_id, statistic, unit, label)

Result of an engineering-units (DCU) query or set.

unit is None when the device reports a code that does not map to a known :class:Unit — the raw label is always preserved so diagnostics can see the device's exact string. statistic scopes the setting: DCU applies per-statistic (or per-group when apply_to_group=True is requested at the facade).

UserDataSetting dataclass

UserDataSetting(unit_id, slot, value)

Result of a UD (user data) read or write.

Four slots (0..3) each hold up to 32 ASCII characters. Encoded binary data (hex / base64) goes through the value field unchanged — the library does not interpret user data.
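
A sketch of the slot and length constraints, with binary data carried as base64 text. validate_user_data is a hypothetical helper, and the library's own validation (if any) may be stricter about which ASCII characters are allowed.

```python
import base64


def validate_user_data(slot: int, value: str) -> None:
    """Check the documented limits: slots 0..3, at most 32 ASCII characters."""
    if slot not in range(4):
        raise ValueError(f"slot must be 0..3, got {slot!r}")
    if not value.isascii():
        raise ValueError("value must be ASCII")
    if len(value) > 32:
        raise ValueError(f"value holds at most 32 characters, got {len(value)}")


# Binary payloads go in as text; the caller owns encoding and decoding.
blob = bytes([0, 1, 2, 250])
encoded = base64.b64encode(blob).decode("ascii")
validate_user_data(0, encoded)
decoded = base64.b64decode(encoded)
```

With base64, 32 characters carry at most 24 bytes of payload, so larger blobs need to be split across slots or shortened.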

ValveDriveState dataclass

ValveDriveState(unit_id, valves)

Result of a VD (valve-drive query) command.

valves carries 1–3 drive percentages in primer-declared order: single-valve controllers report one value; dual-valve controllers report (upstream, downstream); tri-valve (exhaust) controllers add a third entry. The wire-side shape is not a reliable signal of device capability — design §9 warns against inferring valve count from the reply. Multi-valve-specific logic should gate on :attr:Capability.MULTI_VALVE / :attr:Capability.THIRD_VALVE, not on len(valves).
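
The gating advice can be sketched as follows. The Flag-based Capability definition is an assumption about how the flags are modelled, and the three-value reply is a hypothetical example of a wire shape that overstates the hardware.

```python
from enum import Flag, auto
from typing import Tuple


class Capability(Flag):
    """Stand-in for the library's capability flags."""

    NONE = 0
    MULTI_VALVE = auto()
    THIRD_VALVE = auto()


def has_third_valve(capabilities: Capability) -> bool:
    """Decide from probed capabilities, never from the reply length."""
    return bool(capabilities & Capability.THIRD_VALVE)


# Hypothetical device: dual-valve capabilities, yet three values on the wire.
capabilities = Capability.MULTI_VALVE
valves: Tuple[float, ...] = (42.0, 0.0, 0.0)

exhaust_present = has_third_valve(capabilities)
```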

ValveHoldResult dataclass

ValveHoldResult(frame)

Result of a valve-hold command (HP / HC / C).

All three commands respond with a post-op data frame; the discriminator is whether :attr:DataFrame.status carries :attr:StatusCode.HLD. :attr:held captures that for convenience — True after HP or HC, False after C.

Per design §9 Tier-2 controller scope.

held property

held

True if the post-op frame reports :attr:StatusCode.HLD.
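
The convenience property is a membership test on the frame's status set. The classes below are simplified stand-ins that use plain strings for status codes.

```python
from dataclasses import dataclass
from typing import FrozenSet


@dataclass(frozen=True)
class Frame:
    status: FrozenSet[str]  # stand-in for frozenset[StatusCode]


@dataclass(frozen=True)
class ValveHoldResult:
    frame: Frame

    @property
    def held(self) -> bool:
        return "HLD" in self.frame.status


after_hold = ValveHoldResult(Frame(frozenset({"HLD"})))
after_cancel = ValveHoldResult(Frame(frozenset()))
```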

ZeroBandSetting dataclass

ZeroBandSetting(unit_id, zero_band)

Result of a DCZ (zero band) query or set.

Zero band is the minimum-reporting threshold expressed as a percentage of full scale: values below it are reported as zero. Primer constrains the range to 0..6.38 (percent); 0 disables the zero band. Device responds with <uid> 0 <zero_band> — the literal 0 is the primer's placeholder for a statistic argument that DCZ does not use.
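
A sketch of decoding the <uid> 0 <zero_band> reply and checking the documented range. Both function names are hypothetical, and the sample reply is an invented value.

```python
from typing import Tuple

ZERO_BAND_MAX_PCT = 6.38


def validate_zero_band(zero_band: float) -> float:
    """Check the primer's 0..6.38 percent-of-full-scale range."""
    if not 0.0 <= zero_band <= ZERO_BAND_MAX_PCT:
        raise ValueError(f"zero_band must be within 0..6.38, got {zero_band!r}")
    return zero_band


def parse_dcz_reply(reply: str) -> Tuple[str, float]:
    unit_id, placeholder, zero_band = reply.split()
    if placeholder != "0":
        # The middle token is a fixed placeholder, not a statistic code.
        raise ValueError(f"unexpected placeholder {placeholder!r}")
    return unit_id, float(zero_band)


unit_id, zero_band = parse_dcz_reply("A 0 0.50")
zero_band_disabled = validate_zero_band(0.0) == 0.0
```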

Discovery

alicatlib.devices.discovery

Device discovery — enumerate serial ports and identify Alicat devices.

Three entry points, each wider than the last:

  • :func:list_serial_ports — thin wrapper over :func:anyserial.list_serial_ports returning device paths.
  • :func:probe — open one port at one baudrate, run the full identification pipeline, return a :class:DiscoveryResult.
  • :func:find_devices — run :func:probe over the cross-product of ports × unit_ids × baudrates, bounded by :class:anyio.CapacityLimiter, returning every result (ok or errored).

Real fleets are mixed — baud rates vary, units aren't always at A, and a GP box sits next to a 10v05 one. :func:find_devices does not raise on individual probe failure; every combination produces a :class:DiscoveryResult and the caller decides what to do with the errors. The library never prints — formatting a human-readable report belongs to example scripts / CLIs, not the core (design §5.12).

Design reference: docs/design.md §5.12.

DiscoveryResult dataclass

DiscoveryResult(port, unit_id, baudrate, info, error)

Outcome of a single :func:probe attempt.

Exactly one of :attr:info / :attr:error is populated: ok results carry a fully-identified :class:DeviceInfo, failed ones carry the typed :class:AlicatError from the identification pipeline. The :attr:ok convenience lets callers filter without checking which of the two is None.

ok property

ok

Whether identification completed successfully.
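
Filtering a sweep by ok might look like this. The dataclass is a stand-in in which a string replaces DeviceInfo and a generic exception replaces AlicatError; the sample results are invented.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DiscoveryResult:
    port: str
    unit_id: str
    baudrate: int
    info: Optional[str]         # stand-in for DeviceInfo
    error: Optional[Exception]  # stand-in for AlicatError

    @property
    def ok(self) -> bool:
        return self.info is not None


results = (
    DiscoveryResult("/dev/ttyUSB0", "A", 19200, "MC-100SCCM-D", None),
    DiscoveryResult("/dev/ttyUSB0", "A", 115200, None, TimeoutError("no reply")),
)

found = [r for r in results if r.ok]
failed = [r for r in results if not r.ok]
```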

find_devices async

find_devices(
    ports=None,
    *,
    unit_ids=("A",),
    baudrates=DEFAULT_DISCOVERY_BAUDRATES,
    timeout=_DEFAULT_PROBE_TIMEOUT_S,
    max_concurrency=_DEFAULT_MAX_CONCURRENCY,
    stop_on_first_hit=False,
)

Probe the cross-product ports × unit_ids × baudrates concurrently.

When ports is None the sweep enumerates every port visible via :func:list_serial_ports — convenient for "what's plugged in?" but note that a large fleet plus multiple baudrates multiplies out quickly (10 ports × 2 baud × 5 unit ids = 100 probes).

Concurrency is bounded two ways:

  • max_concurrency via :class:anyio.CapacityLimiter — at most that many serial handles are ever open simultaneously.
  • A per-port :class:anyio.Lock — combinations targeting the same physical port serialise, because a serial port can only be held by one transport at a time. Without this, a sweep that tries two baud rates on one port would see the second probe fail with PortBusyError (or an unrelated transport error) even when the device is present at the correct baud — the two probes simply raced for the same handle.

Lock order is port-first, limiter-second: a probe waiting on its port lock does not consume a limiter slot, which keeps the overall concurrency ceiling meaningful.

When stop_on_first_hit is True, a successful probe at (port, _, baud) records baud as that port's confirmed rate and any pending same-port probe at a different baud is skipped. Same-baud probes at other unit ids still run (important for RS-485 multi-drop buses where several devices share a port at a single baud). Skipped combinations are simply omitted from the result tuple, so the caller can expect len(result) ≤ len(combinations). Default is False — every combination produces a result, in a stable row-major order (ports × unit_ids × baudrates).

The function never raises — every probe's result lands in the returned tuple, ok or not.
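
The size and ordering of the sweep follow directly from itertools.product, which the implementation uses to build the combinations. The port names, unit ids, and baud rates below are illustrative values only.

```python
from itertools import product

ports = [f"/dev/ttyUSB{i}" for i in range(10)]
unit_ids = ("A", "B", "C", "D", "E")
baudrates = (19200, 115200)

# Row-major order: the last argument (baudrates) varies fastest.
combinations = list(product(ports, unit_ids, baudrates))
total = len(combinations)
first_three = combinations[:3]
```

Here total is 10 × 5 × 2 = 100, and the first entries try both baud rates for unit A on the first port before moving on to unit B.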

Source code in src/alicatlib/devices/discovery.py
async def find_devices(
    ports: Iterable[str] | None = None,
    *,
    unit_ids: Sequence[str] = ("A",),
    baudrates: Sequence[int] = DEFAULT_DISCOVERY_BAUDRATES,
    timeout: float = _DEFAULT_PROBE_TIMEOUT_S,
    max_concurrency: int = _DEFAULT_MAX_CONCURRENCY,
    stop_on_first_hit: bool = False,
) -> tuple[DiscoveryResult, ...]:
    """Probe the cross-product ``ports × unit_ids × baudrates`` concurrently.

    When ``ports`` is ``None`` the sweep enumerates every port visible
    via :func:`list_serial_ports` — convenient for "what's plugged in?"
    but note that a large fleet plus multiple baudrates multiplies out
    quickly (10 ports × 2 baud × 5 unit ids = 100 probes).

    Concurrency is bounded two ways:

    - ``max_concurrency`` via :class:`anyio.CapacityLimiter` — at most
      that many serial handles are ever open simultaneously.
    - A per-port :class:`anyio.Lock` — combinations targeting the same
      physical port serialise, because a serial port can only be held
      by one transport at a time. Without this, a sweep that tries two
      baud rates on one port would see the second probe fail with
      ``PortBusyError`` (or an unrelated transport error) even when the
      device is present at the correct baud — the two probes simply
      raced for the same handle.

    Lock order is port-first, limiter-second: a probe waiting on its
    port lock does not consume a limiter slot, which keeps the overall
    concurrency ceiling meaningful.

    When ``stop_on_first_hit`` is ``True``, a successful probe at
    ``(port, _, baud)`` records ``baud`` as that port's confirmed rate
    and any pending same-port probe at a different baud is skipped.
    Same-baud probes at other unit ids still run (important for RS-485
    multi-drop buses where several devices share a port at a single
    baud). Skipped combinations are simply omitted from the result
    tuple, so the caller can expect ``len(result) ≤ len(combinations)``.
    Default is ``False`` — every combination produces a result, in a
    stable row-major order (``ports`` × ``unit_ids`` × ``baudrates``).

    The function never raises — every probe's result lands in the
    returned tuple, ``ok`` or not.
    """
    if ports is None:
        ports = await list_serial_ports()
    port_list = list(ports)

    combinations = list(product(port_list, unit_ids, baudrates))
    results: list[DiscoveryResult | None] = [None] * len(combinations)
    limiter = anyio.CapacityLimiter(max_concurrency)
    port_locks: dict[str, anyio.Lock] = {port: anyio.Lock() for port in port_list}
    # Per-port confirmed baud — populated on first ok result under
    # ``stop_on_first_hit``. Keyed by port because baud is a bus
    # property, not a per-device one: if one unit id responded at
    # 19200, the bus is at 19200 and other bauds are pointless.
    confirmed_baud: dict[str, int] = {}

    async def _run(index: int, port: str, unit_id: str, baudrate: int) -> None:
        async with port_locks[port]:
            if stop_on_first_hit:
                hit = confirmed_baud.get(port)
                if hit is not None and hit != baudrate:
                    return
            async with limiter:
                result = await probe(
                    port,
                    unit_id=unit_id,
                    baudrate=baudrate,
                    timeout=timeout,
                )
            results[index] = result
            if stop_on_first_hit and result.ok:
                confirmed_baud[port] = baudrate

    async with anyio.create_task_group() as tg:
        for index, (port, unit_id, baudrate) in enumerate(combinations):
            tg.start_soon(_run, index, port, unit_id, baudrate)

    # ``None`` entries are skipped-by-design under ``stop_on_first_hit``;
    # otherwise every slot is populated because the task group only
    # exits after every spawned task returns.
    return tuple(r for r in results if r is not None)
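
The port-first, limiter-second ordering and the ``stop_on_first_hit`` skip can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: it swaps ``anyio.Lock`` / ``anyio.CapacityLimiter`` for ``asyncio.Lock`` / ``asyncio.Semaphore``, and ``fake_probe`` and ``sweep`` are hypothetical names. ``fake_probe`` pretends a device answers only on ``/dev/ttyUSB0`` at 19200 baud.

```python
import asyncio
from itertools import product


async def fake_probe(port: str, unit_id: str, baudrate: int) -> bool:
    """Hypothetical stand-in for ``probe``: True means a device answered."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return port == "/dev/ttyUSB0" and baudrate == 19200


async def sweep(ports, unit_ids, baudrates, *, max_concurrency=2, stop_on_first_hit=False):
    combos = list(product(ports, unit_ids, baudrates))
    results = [None] * len(combos)
    limiter = asyncio.Semaphore(max_concurrency)
    port_locks = {port: asyncio.Lock() for port in ports}
    confirmed_baud = {}

    async def run(index, port, unit_id, baudrate):
        # Port lock first: a task waiting here holds no limiter slot.
        async with port_locks[port]:
            hit = confirmed_baud.get(port)
            if stop_on_first_hit and hit is not None and hit != baudrate:
                return  # bus baud already known; skip this combination
            async with limiter:
                ok = await fake_probe(port, unit_id, baudrate)
            results[index] = (port, unit_id, baudrate, ok)
            if stop_on_first_hit and ok:
                confirmed_baud[port] = baudrate

    await asyncio.gather(*(run(i, *combo) for i, combo in enumerate(combos)))
    # Skipped combinations stay ``None`` and are dropped from the result.
    return tuple(r for r in results if r is not None)


full = asyncio.run(sweep(["/dev/ttyUSB0"], ["A", "B"], [19200, 115200]))
early = asyncio.run(
    sweep(["/dev/ttyUSB0"], ["A", "B"], [19200, 115200], stop_on_first_hit=True)
)
```

In this sketch ``full`` holds all four combinations, while ``early`` keeps only the two 19200-baud probes: once unit ``A`` answers, the 115200-baud probes on the same port are skipped, but the same-baud probe for unit ``B`` still runs.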

list_serial_ports async

list_serial_ports()

Enumerate serial-port device paths visible to the OS.

Thin wrapper over :func:anyserial.list_serial_ports. Returns device-path strings (/dev/ttyUSB0, COM3 …) in whatever order the backend reports.

The native backend does not require the anyserial[discovery-pyserial] extra. On platforms where the native backend misses devices, install that extra and call :func:anyserial.list_serial_ports directly with backend="pyserial".

Source code in src/alicatlib/devices/discovery.py
async def list_serial_ports() -> list[str]:
    """Enumerate serial-port device paths visible to the OS.

    Thin wrapper over :func:`anyserial.list_serial_ports`. Returns
    device-path strings (``/dev/ttyUSB0``, ``COM3`` …) in whatever order
    the backend reports.

    The native backend does not require the ``anyserial[discovery-pyserial]``
    extra. On platforms where the native backend misses devices, install
    that extra and call :func:`anyserial.list_serial_ports` directly with
    ``backend="pyserial"``.
    """
    return [port.device for port in await anyserial.list_serial_ports()]

probe async

probe(
    port,
    *,
    unit_id="A",
    baudrate=19200,
    timeout=_DEFAULT_PROBE_TIMEOUT_S,
)

Probe one port at one baudrate for one unit id.

Never raises — every failure becomes :attr:DiscoveryResult.error so that a bulk :func:find_devices call collects a uniform result set. Opening errors (permission denied, port busy, no such device) are caught here the same as identification errors; the caller sees one shape whether the device is offline, misconfigured, or silent.

Source code in src/alicatlib/devices/discovery.py
async def probe(
    port: str,
    *,
    unit_id: str = "A",
    baudrate: int = 19200,
    timeout: float = _DEFAULT_PROBE_TIMEOUT_S,
) -> DiscoveryResult:
    """Probe one port at one baudrate for one unit id.

    Never raises — every failure becomes :attr:`DiscoveryResult.error`
    so that a bulk :func:`find_devices` call collects a uniform result
    set. Opening errors (permission denied, port busy, no such device)
    are caught here the same as identification errors; the caller sees
    one shape whether the device is offline, misconfigured, or silent.
    """
    settings = SerialSettings(port=port, baudrate=baudrate)
    transport = SerialTransport(settings)
    try:
        await transport.open()
    except AlicatError as err:
        return DiscoveryResult(
            port=port,
            unit_id=unit_id,
            baudrate=baudrate,
            info=None,
            error=err,
        )
    try:
        client = AlicatProtocolClient(
            transport,
            default_timeout=timeout,
            # Multiline (``??M*``) deserves a bit more headroom — the
            # factory-default ratio of 2x matches the protocol client
            # itself.
            multiline_timeout=timeout * 2,
        )
        return await _probe_with_client(
            client,
            port=port,
            unit_id=unit_id,
            baudrate=baudrate,
        )
    finally:
        # Best-effort teardown — a close failure here shouldn't hide
        # the identification result the caller came for.
        with contextlib.suppress(AlicatError):
            await transport.close()
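
Because failures arrive as data rather than exceptions, a caller can branch on ``error`` instead of wrapping each probe in ``try``/``except``. The sketch below is self-contained and uses a simplified stand-in, ``ProbeOutcome``, with the same five fields passed to ``DiscoveryResult`` above. Deriving ``ok`` from ``error is None`` is an assumption, since the real definition of ``ok`` is not shown here; ``summarise`` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ProbeOutcome:
    """Simplified stand-in for ``DiscoveryResult`` (illustrative only)."""

    port: str
    unit_id: str
    baudrate: int
    info: Optional[str]
    error: Optional[Exception]

    @property
    def ok(self) -> bool:
        # Assumption: a probe succeeded exactly when no error was recorded.
        return self.error is None


def summarise(outcomes):
    """Split outcomes into found devices and labelled failure reasons."""
    summary = {"found": [], "failed": []}
    for outcome in outcomes:
        label = f"{outcome.port}@{outcome.baudrate}/{outcome.unit_id}"
        if outcome.ok:
            summary["found"].append(label)
        else:
            summary["failed"].append(f"{label}: {type(outcome.error).__name__}")
    return summary


outcomes = [
    ProbeOutcome("/dev/ttyUSB0", "A", 19200, info="hypothetical-model", error=None),
    ProbeOutcome("/dev/ttyUSB1", "A", 19200, info=None, error=TimeoutError("silent")),
]
summary = summarise(outcomes)
```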

Session

alicatlib.devices.session

Session — the one object that dispatches commands.

A :class:Session owns a validated unit_id, the device's :class:DeviceInfo, and (optionally) its cached :class:DataFrameFormat. It holds no I/O lock of its own — the shared :class:~alicatlib.protocol.client.AlicatProtocolClient serialises traffic at the port level, so every session pointed at the same client naturally serialises on the same lock (correct for multi-unit RS-485 buses per design §5.7).

:meth:Session.execute is the single pre-I/O gating path:

  1. Firmware family membership (cmd.firmware_families).
  2. Firmware min/max within the matching family.
  3. Device kind (cmd.device_kinds).
  4. Medium compatibility (cmd.media vs. info.media).
  5. Required hardware capabilities (cmd.required_capabilities).
  6. Destructive-confirm (cmd.destructive + request.confirm).

All six fail loudly (typed exceptions, ErrorContext populated) and fail before any I/O — the library's "silence is unsafe" stance (design §5.17).
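
The ordered, fail-fast shape of this gating path can be sketched as follows. This is a toy model under stated assumptions: it covers only three of the six checks, and ``GateError``, ``FakeCommand``, ``FakeDevice``, ``gate``, and the ``"fam_a"`` / ``"flow_controller"`` values are hypothetical names invented for illustration, not library API.

```python
from dataclasses import dataclass


class GateError(Exception):
    """Hypothetical stand-in for the library's typed gating exceptions."""


@dataclass(frozen=True)
class FakeCommand:
    name: str
    firmware_families: frozenset = frozenset({"fam_a"})
    device_kinds: frozenset = frozenset({"flow_controller"})
    destructive: bool = False


@dataclass(frozen=True)
class FakeDevice:
    firmware_family: str
    kind: str


def gate(command: FakeCommand, device: FakeDevice, *, confirm: bool = False) -> list:
    """Run the checks in order; the first failure raises before later ones run."""
    checks = [
        ("firmware family", lambda: device.firmware_family in command.firmware_families),
        ("device kind", lambda: device.kind in command.device_kinds),
        ("destructive-confirm", lambda: confirm or not command.destructive),
    ]
    ran = []
    for label, check in checks:
        ran.append(label)
        if not check():
            raise GateError(f"{command.name}: {label} check failed")
    return ran  # every gate passed; only now would I/O begin


reset = FakeCommand("reset", destructive=True)
controller = FakeDevice("fam_a", "flow_controller")
passed = gate(reset, controller, confirm=True)
```

Each check is wrapped in a callable so that a failing gate really does prevent the later ones from being evaluated.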

Lifecycle-changing operations (change_unit_id / change_baud_rate; design §5.7) use bounded cancellation shields to keep the device and the client in sync across the write → verify → reconfigure boundary. An unbounded shield would hang the process if the device wedged; a bounded shield gives up after its timeout instead. For change_baud_rate the session then escalates to :attr:SessionState.BROKEN, from which the caller recovers by re-opening the device.

Design reference: docs/design.md §5.7, §5.10, §5.17, §5.20.

Session

Session(
    client,
    *,
    unit_id,
    info,
    data_frame_format=None,
    port_label=None,
    config=None,
)

Single-device dispatch path.

Constructor validates unit_id eagerly — an invalid id is :class:InvalidUnitIdError at construction, not at first use.

The session does not own the :class:AlicatProtocolClient; the factory does. close() only marks the session closed and performs no I/O; the factory's context-manager unwind is what drops the transport.

Source code in src/alicatlib/devices/session.py
def __init__(
    self,
    client: AlicatProtocolClient,
    *,
    unit_id: str,
    info: DeviceInfo,
    data_frame_format: DataFrameFormat | None = None,
    port_label: str | None = None,
    config: AlicatConfig | None = None,
) -> None:
    self._client = client
    self._unit_id = validate_unit_id(unit_id)
    self._info = info
    self._data_frame_format = data_frame_format
    self._port_label = port_label
    self._closed = False
    cfg = config if config is not None else AlicatConfig()
    self._config = cfg
    self._eeprom_monitor = EepromWearMonitor(
        unit_id=self._unit_id,
        warn_per_minute=cfg.save_rate_warn_per_min,
    )
    # Setpoint-source cache ("S" / "A" / "U"), populated opportunistically
    # by the ``LSS`` command facade. The setpoint facade consults this
    # pre-I/O to short-circuit the "LSS=A silently ignores serial
    # setpoints" failure mode (design §5.20 risk table); when the cache
    # is ``None`` the check is skipped so the facade stays usable before
    # the first LSS round-trip.
    self._setpoint_source: str | None = None
    # Loop-control-variable cache. ``open_device`` pre-populates this
    # for controllers whose firmware supports ``LV`` so
    # :meth:`FlowController.setpoint` can pick the right
    # :class:`FullScaleValue` from :attr:`DeviceInfo.full_scale` for
    # pre-I/O range validation (design §5.20.2 — "setpoint
    # full-scale validation"). Every ``LV`` query / set
    # through the facade refreshes the cache so subsequent setpoint
    # writes validate against the current controlled variable. Stays
    # ``None`` on firmware / kinds that don't support ``LV`` — the
    # range check is skipped rather than failed in that case.
    self._loop_control_variable: LoopControlVariable | None = None
    # Lifecycle state. Transitions to ``BROKEN`` only on an
    # unreconcilable ``change_baud_rate`` failure (design §5.7). A
    # BROKEN session refuses further dispatch, surfacing the
    # situation instead of hiding it behind a silent timeout.
    self._state: SessionState = SessionState.OPERATIONAL

closed property

closed

True once :meth:close has been called.

config property

config

The :class:AlicatConfig this session was constructed with.

Used by facades that need to read runtime knobs (e.g. the EEPROM-wear threshold) without plumbing a separate config through every call site.

data_frame_format property

data_frame_format

Cached :class:DataFrameFormat, or None before it's been probed.

firmware property

firmware

Convenience accessor for :attr:info.firmware.

info property

info

Device identity snapshot, updated by :meth:refresh_firmware and :meth:change_unit_id.

loop_control_variable property

loop_control_variable

Cached loop-control variable, or None if unprobed / unsupported.

:func:~alicatlib.devices.factory.open_device pre-populates this for controllers whose firmware supports LV; the :meth:FlowController.loop_control_variable facade refreshes it on every query / set. Consumed by :meth:FlowController.setpoint to pick the right :class:~alicatlib.devices.models.FullScaleValue from :attr:DeviceInfo.full_scale for pre-I/O range validation.

port_label property

port_label

Human-readable port identifier, surfaced on every :class:ErrorContext.

setpoint_source property

setpoint_source

Cached setpoint source ("S" / "A" / "U"), or None if unprobed.

Populated by :meth:update_setpoint_source after an LSS query or set. The :meth:FlowController.setpoint facade reads this pre-I/O to detect the LSS=A failure mode — a serial setpoint write is silently ignored when the source is analog (design §5.20 risk table), so rather than let the write disappear the facade raises :class:AlicatValidationError.

state property

state

Current lifecycle state.

Transitions to :attr:SessionState.BROKEN only when :meth:change_baud_rate cannot reconcile the transport with the device's new baud. A BROKEN session refuses every subsequent :meth:execute with :class:AlicatConnectionError so callers recognise the situation instead of hitting silent timeouts.

unit_id property

unit_id

The validated single-letter unit id this session targets.

change_baud_rate async

change_baud_rate(new_baud, *, confirm=False)

Change the device's baud rate and retune the transport.

Sends NCB <new_baud> at the current baud, reads the ack (still at the old baud), tells the transport to :meth:~alicatlib.transport.base.Transport.reopen at the new baud, then verifies with a VE round-trip. Every step after the write (ack read, ack validation, reopen, verify) happens inside a bounded :func:anyio.move_on_after(_CHANGE_BAUD_RATE_SHIELD_S, shield=True).

confirm=True is required — a failed baud change splits the adapter from the device until someone reopens the port. new_baud must be in :data:SUPPORTED_BAUDRATES.

On any failure inside the shielded block (or the shield timing out) the session transitions to :attr:SessionState.BROKEN and raises :class:AlicatConnectionError with remediation guidance. Subsequent :meth:execute calls then fail fast instead of hanging.

Source code in src/alicatlib/devices/session.py
async def change_baud_rate(
    self,
    new_baud: int,
    *,
    confirm: bool = False,
) -> None:
    """Change the device's baud rate and retune the transport.

    Sends ``NCB <new_baud>`` at the current baud, reads the ack
    (still at the old baud), tells the transport to
    :meth:`~alicatlib.transport.base.Transport.reopen` at the new
    baud, then verifies with a ``VE`` round-trip. Every step after
    the write (ack read, ack validation, reopen, verify) happens
    inside a bounded
    :func:`anyio.move_on_after(_CHANGE_BAUD_RATE_SHIELD_S, shield=True)`.

    ``confirm=True`` is required — a failed baud change splits
    the adapter from the device until someone reopens the port.
    ``new_baud`` must be in :data:`SUPPORTED_BAUDRATES`.

    On any failure inside the shielded block (or the shield
    timing out) the session transitions to
    :attr:`SessionState.BROKEN` and raises
    :class:`AlicatConnectionError` with remediation guidance.
    Subsequent :meth:`execute` calls then fail fast instead of
    hanging.
    """
    if not confirm:
        raise AlicatValidationError(
            "change_baud_rate is destructive; pass confirm=True to execute",
            context=ErrorContext(
                command_name="change_baud_rate",
                unit_id=self._unit_id,
                extra={"new_baud": new_baud},
            ),
        )
    self._check_state()
    if new_baud not in SUPPORTED_BAUDRATES:
        raise AlicatValidationError(
            f"change_baud_rate: new_baud {new_baud} not one of {sorted(SUPPORTED_BAUDRATES)}",
            context=ErrorContext(
                command_name="change_baud_rate",
                unit_id=self._unit_id,
                extra={"new_baud": new_baud},
            ),
        )

    prefix = self._command_prefix_bytes().decode("ascii")
    ncb_bytes = f"{self._unit_id}{prefix}NCB {new_baud}\r".encode("ascii")

    async with self._client.lock:
        # Write NCB at the current baud before the shield; a pre-
        # write cancellation leaves the device unchanged.
        await self._client.transport.write(
            ncb_bytes,
            timeout=self._config.write_timeout_s,
        )

        with anyio.move_on_after(
            _CHANGE_BAUD_RATE_SHIELD_S,
            shield=True,
        ) as scope:
            try:
                # Device acks at the old baud, then switches.
                ack = await self._client.transport.read_until(
                    self._client.eol,
                    timeout=self._config.default_timeout_s,
                )
                self._client.guard_response(
                    strip_eol(ack, eol=self._client.eol),
                    command=ncb_bytes,
                )
                await self._client.transport.reopen(baudrate=new_baud)
                await self._verify_unit_id_via_ve(self._unit_id)
            except AlicatError:
                # A verifiable failure inside the shield: the device
                # may already be on the new baud while our client is
                # not. Escalate to BROKEN with a clear error.
                self._state = SessionState.BROKEN
                raise AlicatConnectionError(
                    f"change_baud_rate to {new_baud} failed mid-sequence; "
                    "session is BROKEN. Close this session and re-open "
                    f"via open_device(...) at baudrate={new_baud} to "
                    "recover.",
                    context=ErrorContext(
                        command_name="change_baud_rate",
                        unit_id=self._unit_id,
                        port=self._port_label,
                        firmware=self._info.firmware,
                        extra={
                            "new_baud": new_baud,
                            "session_state": self._state.value,
                        },
                    ),
                ) from None

        if scope.cancelled_caught:
            self._state = SessionState.BROKEN
            raise AlicatConnectionError(
                f"change_baud_rate to {new_baud} wedged after "
                f"{_CHANGE_BAUD_RATE_SHIELD_S:.1f}s; session is BROKEN. "
                "Close this session and re-open via open_device(...) at "
                f"baudrate={new_baud} to recover.",
                context=ErrorContext(
                    command_name="change_baud_rate",
                    unit_id=self._unit_id,
                    port=self._port_label,
                    firmware=self._info.firmware,
                    extra={
                        "new_baud": new_baud,
                        "session_state": self._state.value,
                        "shield_timeout_s": _CHANGE_BAUD_RATE_SHIELD_S,
                    },
                ),
            )
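
The latch-then-fail-fast behaviour described above can be sketched with a toy session. This is not the library's code: ``MiniSession``, ``State``, and ``LinkBroken`` are hypothetical stand-ins, and ``asyncio.wait_for`` models only the timeout bound, not anyio's cancellation shielding.

```python
import asyncio
from enum import Enum


class State(Enum):
    OPERATIONAL = "operational"
    BROKEN = "broken"


class LinkBroken(Exception):
    """Hypothetical stand-in for ``AlicatConnectionError``."""


class MiniSession:
    """Toy session: a failed baud change latches BROKEN, then dispatch fails fast."""

    def __init__(self, reopen_ok: bool) -> None:
        self.state = State.OPERATIONAL
        self._reopen_ok = reopen_ok

    async def _reopen(self, new_baud: int) -> None:
        if not self._reopen_ok:
            raise OSError("adapter could not retune")

    async def change_baud_rate(self, new_baud: int, *, confirm: bool = False) -> None:
        if not confirm:
            raise ValueError("destructive; pass confirm=True")
        try:
            # Bounded wait: a wedged step cannot hang the caller forever.
            await asyncio.wait_for(self._reopen(new_baud), timeout=0.5)
        except (OSError, asyncio.TimeoutError):
            self.state = State.BROKEN
            raise LinkBroken(f"re-open at baudrate={new_baud} to recover") from None

    async def execute(self, command: str) -> str:
        if self.state is State.BROKEN:
            raise LinkBroken("session is BROKEN")  # rejected before any I/O
        return f"ack:{command}"


async def demo():
    good = MiniSession(reopen_ok=True)
    await good.change_baud_rate(115200, confirm=True)
    good_reply = await good.execute("VE")

    bad = MiniSession(reopen_ok=False)
    errors = []
    for step in (lambda: bad.change_baud_rate(115200, confirm=True), lambda: bad.execute("VE")):
        try:
            await step()
        except LinkBroken as err:
            errors.append(str(err))
    return good_reply, bad.state, errors


good_reply, bad_state, errors = asyncio.run(demo())
```

The second error in ``errors`` comes from ``execute`` itself: once the state is latched, the toy session refuses dispatch immediately rather than waiting for a timeout.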

change_unit_id async

change_unit_id(new_unit_id, *, confirm=False)

Rename the device this session talks to.

Sends the primer's bus-level rename {old}@ {new}\r (no $$ prefix on GP — this is a wire-level mode switch, not a normal command). The device does not ack on the wire; the session waits :data:_RENAME_GRACE_S and then verifies the rename with a VE at the new unit id.

Argument rules (design §5.7, §5.20 pt 1):

  • confirm=True is required: a rename collision (two devices ending up on the same unit id) silently splits the bus, so the caller must opt in explicitly.
  • new_unit_id must be A..Z (the polling alphabet).
  • new_unit_id must differ from the current :attr:unit_id.

Cancellation semantics: the rename write happens outside the shield (cancellation there leaves the device untouched). The post-write verify runs inside a :func:anyio.move_on_after(timeout, shield=True) of :data:_CHANGE_UNIT_ID_SHIELD_S. If the shield fires the device may or may not have accepted the rename — the session raises :class:AlicatTimeoutError and the cached unit id is not updated, leaving recovery to the caller.

Source code in src/alicatlib/devices/session.py
async def change_unit_id(
    self,
    new_unit_id: str,
    *,
    confirm: bool = False,
) -> None:
    r"""Rename the device this session talks to.

    Sends the primer's bus-level rename ``{old}@ {new}\r`` (no
    ``$$`` prefix on GP — this is a wire-level mode switch, not a
    normal command). The device does *not* ack on the wire; the
    session waits :data:`_RENAME_GRACE_S` and then verifies the
    rename with a ``VE`` at the new unit id.

    Argument rules (design §5.7, §5.20 pt 1):

    - ``confirm=True`` is required: a rename collision (two
      devices ending up on the same unit id) silently splits the
      bus, so the caller must opt in explicitly.
    - ``new_unit_id`` must be ``A``..``Z`` (the polling alphabet).
    - ``new_unit_id`` must differ from the current :attr:`unit_id`.

    Cancellation semantics: the rename write happens outside the
    shield (cancellation there leaves the device untouched). The
    post-write verify runs inside a
    :func:`anyio.move_on_after(timeout, shield=True)` of
    :data:`_CHANGE_UNIT_ID_SHIELD_S`. If the shield fires the
    device may or may not have accepted the rename — the session
    raises :class:`AlicatTimeoutError` and the cached unit id is
    *not* updated, leaving recovery to the caller.
    """
    if not confirm:
        raise AlicatValidationError(
            "change_unit_id is destructive; pass confirm=True to execute",
            context=ErrorContext(
                command_name="change_unit_id",
                unit_id=self._unit_id,
                extra={"new_unit_id": new_unit_id},
            ),
        )
    self._check_state()
    validated = validate_unit_id(new_unit_id)
    if validated == self._unit_id:
        raise AlicatValidationError(
            f"change_unit_id: new_unit_id {validated!r} matches the "
            f"current unit id — pass a different A-Z letter",
            context=ErrorContext(
                command_name="change_unit_id",
                unit_id=self._unit_id,
            ),
        )

    old_unit_id = self._unit_id
    rename_bytes = f"{old_unit_id}@ {validated}\r".encode("ascii")

    async with self._client.lock:
        # The rename write is outside the shield so cancellation
        # pre-write leaves the device unchanged.
        await self._client.transport.write(
            rename_bytes,
            timeout=self._config.write_timeout_s,
        )

        with anyio.move_on_after(
            _CHANGE_UNIT_ID_SHIELD_S,
            shield=True,
        ) as scope:
            # Short grace window — the device accepts the rename
            # silently (no ack on the wire); giving it ~50 ms
            # before polling the new unit id avoids a race where
            # our VE lands mid-commit.
            await anyio.sleep(_RENAME_GRACE_S)
            await self._verify_unit_id_via_ve(validated)

        if scope.cancelled_caught:
            raise AlicatTimeoutError(
                f"change_unit_id verification timed out after "
                f"{_CHANGE_UNIT_ID_SHIELD_S:.1f}s; device state unknown — "
                f"did not update cached unit id from {old_unit_id!r}",
                context=ErrorContext(
                    command_name="change_unit_id",
                    unit_id=old_unit_id,
                    port=self._port_label,
                    firmware=self._info.firmware,
                    extra={
                        "attempted_unit_id": validated,
                        "shield_timeout_s": _CHANGE_UNIT_ID_SHIELD_S,
                    },
                ),
            )

        # Verified — swap caches. ``DeviceInfo.unit_id`` also updates
        # so downstream errors carry the post-rename id.
        self._unit_id = validated
        self._info = dataclasses.replace(self._info, unit_id=validated)
        self._eeprom_monitor.unit_id = validated
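
The argument rules and the ``{old}@ {new}\r`` framing can be illustrated in isolation. ``rename_frame`` below is a hypothetical helper written for this example; it mirrors the rules listed above but raises plain ``ValueError`` rather than the library's typed errors, and it performs no I/O.

```python
def rename_frame(old_unit_id: str, new_unit_id: str) -> bytes:
    """Build the rename bytes after applying the A..Z and must-differ rules."""
    letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    for unit_id in (old_unit_id, new_unit_id):
        # The length check also rejects "" and multi-letter substrings like "AB".
        if len(unit_id) != 1 or unit_id not in letters:
            raise ValueError(f"unit id must be one letter A-Z, got {unit_id!r}")
    if old_unit_id == new_unit_id:
        raise ValueError("new unit id must differ from the current one")
    return f"{old_unit_id}@ {new_unit_id}\r".encode("ascii")


frame = rename_frame("A", "B")
```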

close async

close()

Mark the session closed. No transport ownership → no I/O teardown.

Source code in src/alicatlib/devices/session.py
async def close(self) -> None:
    """Mark the session closed. No transport ownership → no I/O teardown."""
    self._closed = True

execute async

execute(command, request, *, timeout=None)

Dispatch command with pre-I/O gating and error enrichment.

Gating order (cheapest first; every check is pre-I/O): firmware family → firmware min/max → device kind → medium → capability → destructive-confirm. The first failed check raises; no later check runs.

Any :class:AlicatError raised from the encode / I/O / decode path is re-raised with :attr:ErrorContext.command_name / unit_id / port / firmware / elapsed_s populated from this session — the pattern described in design §5.7.

Source code in src/alicatlib/devices/session.py
async def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    request: Req,
    *,
    timeout: float | None = None,
) -> Resp:
    """Dispatch ``command`` with pre-I/O gating and error enrichment.

    Gating order (cheapest first; every check is pre-I/O):
    firmware family → firmware min/max → device kind → medium →
    capability → destructive-confirm. The first failed check raises;
    no later check runs.

    Any :class:`AlicatError` raised from the encode / I/O / decode
    path is re-raised with :attr:`ErrorContext.command_name` /
    ``unit_id`` / ``port`` / ``firmware`` / ``elapsed_s`` populated
    from this session — the pattern described in design §5.7.
    """
    self._check_state()
    self._check_streaming(command)
    self._check_firmware_family(command)
    self._check_firmware_range(command)
    self._check_device_kind(command)
    self._check_media(command)
    self._check_capabilities(command)
    self._check_destructive(command, request)

    # EEPROM-wear guard: WARN when per-device ``save=True`` rate
    # crosses the configured threshold (design §5.20.7). Cheap and
    # pre-I/O — commands without a ``save`` attribute on the request
    # short-circuit inside the monitor.
    self._eeprom_monitor.record(command, request)

    ctx = self._build_decode_context(command)
    wire_bytes = command.encode(ctx, request)

    started = monotonic_ns()
    try:
        return await self._dispatch(command, wire_bytes, ctx, timeout=timeout)
    except AlicatError as err:
        elapsed_s = (monotonic_ns() - started) / 1e9
        raise err.with_context(
            command_name=command.name,
            command_bytes=wire_bytes,
            unit_id=self._unit_id,
            port=self._port_label,
            firmware=self._info.firmware,
            device_kind=self._info.kind,
            device_media=self._info.media,
            command_media=command.media,
            elapsed_s=elapsed_s,
        ) from None
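
The enrich-and-re-raise pattern can be sketched without the library. Everything here is hypothetical: ``ContextError`` stands in for ``AlicatError``, its ``with_context`` is assumed to return a new error carrying merged context, ``flaky_io`` always fails, and the ``b"A\r"`` payload is only a placeholder.

```python
from time import monotonic_ns


class ContextError(Exception):
    """Hypothetical error type that can carry dispatch context."""

    def __init__(self, message: str, **context: object) -> None:
        super().__init__(message)
        self.context = dict(context)

    def with_context(self, **extra: object) -> "ContextError":
        # Return a new error; the original instance is left untouched.
        return ContextError(str(self), **{**self.context, **extra})


def flaky_io(wire_bytes: bytes) -> bytes:
    """Stand-in for the I/O path; always fails so the handler below runs."""
    raise ContextError("no reply before timeout")


def dispatch(name: str, wire_bytes: bytes, *, unit_id: str, port: str) -> bytes:
    started = monotonic_ns()
    try:
        return flaky_io(wire_bytes)
    except ContextError as err:
        elapsed_s = (monotonic_ns() - started) / 1e9
        # ``from None`` hides the bare inner error from the traceback chain.
        raise err.with_context(
            command_name=name,
            command_bytes=wire_bytes,
            unit_id=unit_id,
            port=port,
            elapsed_s=elapsed_s,
        ) from None


try:
    dispatch("poll", b"A\r", unit_id="A", port="/dev/ttyUSB0")
except ContextError as err:
    caught = err
```

A handler further up the stack can then log ``caught.context`` without knowing which command or port was involved.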

invalidate_data_frame_format

invalidate_data_frame_format()

Drop the cached :class:DataFrameFormat without re-probing.

After a command that changes the device's data-frame shape (DCU engineering-units set, FDF field reorder, …) the cached format is stale. Clearing it lets the next :meth:poll lazily re-probe via :meth:refresh_data_frame_format — one round-trip amortised over the next poll rather than immediately. Sync + non-awaitable by design so facade set-paths don't pay a second ??D* at every call.

Source code in src/alicatlib/devices/session.py
def invalidate_data_frame_format(self) -> None:
    """Drop the cached :class:`DataFrameFormat` without re-probing.

    After a command that changes the device's data-frame shape
    (``DCU`` engineering-units set, ``FDF`` field reorder, …) the
    cached format is stale. Clearing it lets the next :meth:`poll`
    lazily re-probe via :meth:`refresh_data_frame_format` — one
    round-trip amortised over the next poll rather than immediately.
    Sync + non-awaitable by design so facade set-paths don't pay a
    second ``??D*`` at every call.
    """
    self._data_frame_format = None
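
The cost model behind this choice can be shown with a toy cache. ``FormatCache`` is a hypothetical, synchronous stand-in for the session's cached-format handling; ``probe_count`` counts what would be ``??D*`` round-trips.

```python
class FormatCache:
    """Toy model of the cached-format lifecycle (names are illustrative)."""

    def __init__(self) -> None:
        self._format = None
        self.probe_count = 0

    def _probe(self) -> str:
        self.probe_count += 1  # stands in for one ``??D*`` round-trip
        return f"format-v{self.probe_count}"

    def invalidate(self) -> None:
        self._format = None  # synchronous: no I/O happens here

    def poll(self) -> str:
        if self._format is None:  # re-probe lazily, only when needed
            self._format = self._probe()
        return self._format


cache = FormatCache()
first = cache.poll()
cache.invalidate()
cache.invalidate()  # two set-paths in a row still cost one probe later
second = cache.poll()
third = cache.poll()
```

Two back-to-back invalidations lead to a single re-probe on the next poll, which is the amortisation the docstring describes.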

poll async

poll()

Convenience poll — execute POLL_DATA and wrap with read-site timing.

This is the one place the session owns timing capture; per design §5.6 the :class:DataFrame is the session's job, not the command's. Callers that want the pure (clock-free) :class:ParsedFrame go through session.execute(POLL_DATA, PollRequest()) instead.

Source code in src/alicatlib/devices/session.py
async def poll(self) -> DataFrame:
    """Convenience poll — execute ``POLL_DATA`` and wrap with read-site timing.

    This is the one place the session owns timing capture; per design
    §5.6 the :class:`DataFrame` is the session's job, not the
    command's. Callers that want the pure (clock-free)
    :class:`ParsedFrame` go through
    ``session.execute(POLL_DATA, PollRequest())`` instead.
    """
    fmt = self._data_frame_format
    if fmt is None:
        fmt = await self.refresh_data_frame_format()
    parsed: ParsedFrame = await self.execute(POLL_DATA, PollRequest())
    # Timing captured as close to the read site as the Session sees.
    # Exact-to-the-byte timing would require plumbing callbacks into
    # the protocol client; deferred until a real need surfaces (design §5.6).
    return DataFrame.from_parsed(
        parsed,
        format=fmt,
        received_at=datetime.now(UTC),
        monotonic_ns=monotonic_ns(),
    )

refresh_capabilities async

refresh_capabilities()

Re-probe the device's capability flags.

Implementation lives in the factory (:mod:alicatlib.devices.factory), which owns the per-capability probe map (FPF/VD/??D*-derived flags). This method is reserved on the :class:Session surface for API stability; calling it now raises :class:NotImplementedError pointing at the right place.

Source code in src/alicatlib/devices/session.py
async def refresh_capabilities(self) -> Capability:
    """Re-probe the device's capability flags.

    Implementation lives in the factory (:mod:`alicatlib.devices.factory`),
    which owns the per-capability probe map
    (``FPF``/``VD``/``??D*``-derived flags). This method is reserved
    on the :class:`Session` surface for API stability; calling it now
    raises :class:`NotImplementedError` pointing at the right place.
    """
    raise NotImplementedError(
        "Session.refresh_capabilities is not yet implemented; "
        "construct the session from open_device(...) for now.",
    )

refresh_data_frame_format async

refresh_data_frame_format()

Re-probe ??D* and update the cached :class:DataFrameFormat.

Source code in src/alicatlib/devices/session.py
async def refresh_data_frame_format(self) -> DataFrameFormat:
    """Re-probe ``??D*`` and update the cached :class:`DataFrameFormat`."""
    fmt = await self.execute(DATA_FRAME_FORMAT_QUERY, DataFrameFormatRequest())
    self._data_frame_format = fmt
    return fmt

refresh_firmware async

refresh_firmware()

Re-probe VE and update the cached :class:FirmwareVersion.

Uses :data:alicatlib.commands.system.VE_QUERY; the session's cached :class:DeviceInfo is swapped for an updated copy via :func:dataclasses.replace (the dataclass itself is frozen, so it cannot be mutated in place).

Source code in src/alicatlib/devices/session.py
async def refresh_firmware(self) -> FirmwareVersion:
    """Re-probe ``VE`` and update the cached :class:`FirmwareVersion`.

    Uses :data:`alicatlib.commands.system.VE_QUERY`; the session's
    cached :class:`DeviceInfo` is swapped for an updated copy via
    :func:`dataclasses.replace` (the dataclass itself is frozen, so it
    cannot be mutated in place).
    """
    result = await self.execute(VE_QUERY, VeRequest())
    self._info = dataclasses.replace(
        self._info,
        firmware=result.firmware,
        firmware_date=result.firmware_date,
    )
    return result.firmware

update_loop_control_variable

update_loop_control_variable(variable)

Record variable as the session's current loop-control variable.

Source code in src/alicatlib/devices/session.py
def update_loop_control_variable(
    self,
    variable: LoopControlVariable,
) -> None:
    """Record ``variable`` as the session's current loop-control variable."""
    self._loop_control_variable = variable

update_setpoint_source

update_setpoint_source(source)

Record source as the session's current setpoint source.

Called by the LSS command facade (:meth:FlowController.setpoint_source) on every query / set so the cache tracks the device's state. Stays a plain setter rather than a @setpoint_source.setter to keep the mutation verb visible at call sites (session.update_setpoint_source("S") reads differently from an assignment).

Source code in src/alicatlib/devices/session.py
def update_setpoint_source(self, source: str) -> None:
    """Record ``source`` as the session's current setpoint source.

    Called by the ``LSS`` command facade (:meth:`FlowController.setpoint_source`)
    on every query / set so the cache tracks the device's state. Stays
    a plain setter rather than a ``@setpoint_source.setter`` to keep
    the mutation verb visible at call sites (``session.update_setpoint_source("S")``
    reads differently from an assignment).
    """
    self._setpoint_source = source

SessionState

Bases: Enum

Lifecycle state of a :class:Session.

OPERATIONAL is the normal state — commands dispatch freely. BROKEN is entered when an atomic lifecycle operation (change_baud_rate) cannot reconcile the transport with the device's new state. A BROKEN session rejects every subsequent :meth:Session.execute with :class:AlicatConnectionError and the caller must construct a fresh session (typically by re-running :func:open_device) to recover.

validate_unit_id

validate_unit_id(unit_id, *, allow_streaming=False)

Return unit_id if valid, otherwise raise :class:InvalidUnitIdError.

A plain polling id ("A".."Z") is always valid. The streaming id "@" is accepted only when allow_streaming is True — callers that are building a normal :class:Session should not pass this flag, because a polling session on @ can never talk to a device.

Source code in src/alicatlib/devices/session.py
def validate_unit_id(unit_id: str, *, allow_streaming: bool = False) -> str:
    """Return ``unit_id`` if valid, otherwise raise :class:`InvalidUnitIdError`.

    A plain polling id (``"A"``..``"Z"``) is always valid. The streaming
    id ``"@"`` is accepted only when ``allow_streaming`` is True — callers
    that are building a normal :class:`Session` should not pass this
    flag, because a polling session on ``@`` can never talk to a device.
    """
    if unit_id in UNIT_ID_POLLING:
        return unit_id
    if allow_streaming and unit_id == UNIT_ID_STREAMING:
        return unit_id
    raise InvalidUnitIdError(
        f"invalid unit id {unit_id!r}: expected one of {sorted(UNIT_ID_POLLING)}"
        + (" or '@'" if allow_streaming else ""),
    )
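
A self-contained sketch of the same check, useful for seeing which ids pass. The constant values are assumptions inferred from the docstring ("A".."Z" for polling, "@" for streaming), and `InvalidUnitIdError` here is a local stand-in for the library's exception.

```python
import string

# Assumed values, inferred from the docstring; the library's real
# constants are not shown on this page.
UNIT_ID_POLLING = frozenset(string.ascii_uppercase)
UNIT_ID_STREAMING = "@"


class InvalidUnitIdError(ValueError):
    """Local stand-in for the library's exception of the same name."""


def validate_unit_id(unit_id: str, *, allow_streaming: bool = False) -> str:
    if unit_id in UNIT_ID_POLLING:
        return unit_id
    if allow_streaming and unit_id == UNIT_ID_STREAMING:
        return unit_id
    raise InvalidUnitIdError(f"invalid unit id {unit_id!r}")


for candidate, streaming in [("A", False), ("@", True), ("@", False), ("a", False)]:
    try:
        result = validate_unit_id(candidate, allow_streaming=streaming)
        print(candidate, streaming, "-> accepted:", result)
    except InvalidUnitIdError as err:
        print(candidate, streaming, "-> rejected:", err)
```

Lowercase letters and multi-character strings fall through both membership checks and are rejected, which matches the single-letter contract stated above.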

Streaming runtime

alicatlib.devices.streaming

Streaming-mode runtime — :class:StreamingSession.

Streaming mode is a port-level state transition, not a request/response command (design §5.8). The device stops responding to prompts, overwrites its unit id with @, and pushes data frames continuously until stopped. This module owns that runtime:

  • Setup — optionally configures NCS (streaming rate), marks the shared :class:~alicatlib.protocol.client.AlicatProtocolClient as streaming, writes the primer's {unit_id}@ @\r start-stream bytes directly under the port lock (bypassing :meth:Session.execute because we own the mode transition, not the command layer).
  • Producer — a background task reads frames from the transport into a bounded :mod:anyio.streams.memory object stream, parsing each line with the session's cached :class:~alicatlib.devices.data_frame.DataFrameFormat. Overflow is controlled by :class:OverflowPolicy (design §5.14 — reused from the sample recorder so the knob is one concept across acquisition surfaces). Parse errors are logged and skipped unless strict=True.
  • Teardown — always writes the primer's @@ {unit_id}\r stop-stream bytes, drains any trailing frames, and clears the streaming latch. __aexit__ does this even when the body raised, so a crashed consumer never leaves the device flooding the bus.
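
The start and stop byte sequences quoted in the bullets above can be sketched as follows. The function names are hypothetical stand-ins (the library's own encoders are not shown here), and ASCII encoding is an assumption.

```python
def sketch_start_stream(unit_id: str) -> bytes:
    # Start-stream form quoted above: "{unit_id}@ @\r".
    return f"{unit_id}@ @\r".encode("ascii")


def sketch_stop_stream(unit_id: str) -> bytes:
    # Stop-stream form quoted above: "@@ {unit_id}\r".
    return f"@@ {unit_id}\r".encode("ascii")


print(sketch_start_stream("A"))
print(sketch_stop_stream("A"))
```

The stop sequence is addressed to `@` because a streaming device no longer answers to its polling id.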

Shape:

async with dev.stream(rate_ms=50) as stream:
    async for frame in stream:
        process(frame)

Design reference: docs/design.md §5.8.

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples. The effective sample rate drops to the consumer's drain rate; samples_late accrues once the consumer catches up and the producer can check its schedule.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the sample that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch, then enqueue. Counted as late.
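
To make the three policies concrete, here is a simplified synchronous sketch over a bounded `collections.deque`. The `offer` helper and its return strings are hypothetical; the real producer is asynchronous and, under `BLOCK`, awaits free space instead of returning.

```python
from collections import deque
from enum import Enum


class OverflowPolicy(Enum):
    BLOCK = "block"
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


def offer(buffer: deque, capacity: int, item: int, policy: OverflowPolicy) -> str:
    """Try to enqueue ``item`` and report what happened (hypothetical helper)."""
    if len(buffer) < capacity:
        buffer.append(item)
        return "enqueued"
    if policy is OverflowPolicy.DROP_NEWEST:
        return "dropped_new"  # the incoming item is discarded
    if policy is OverflowPolicy.DROP_OLDEST:
        buffer.popleft()  # evict the oldest queued item
        buffer.append(item)
        return "evicted_oldest"
    return "must_wait"  # BLOCK: a real producer would await free space here


for policy in OverflowPolicy:
    buf = deque([1, 2])
    outcome = offer(buf, 2, 3, policy)
    print(policy.name, outcome, list(buf))
```

Only `DROP_OLDEST` changes the queued contents when the buffer is full; the other two leave the queue untouched and differ in whether the new item is lost or merely delayed.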

StreamingSession

StreamingSession(
    session,
    *,
    rate_ms=None,
    strict=False,
    overflow=OverflowPolicy.DROP_OLDEST,
    buffer_size=_DEFAULT_BUFFER_SIZE,
)

Async context manager + async iterator for streaming data frames.

Users construct this via :meth:Device.stream, not directly. The public contract is the dunder surface — __aenter__ / __aexit__ for scope and __aiter__ / __anext__ for the data. Once the context exits, the instance is not reusable; the next stream requires a new call to :meth:Device.stream.

Parameters:

Name Type Description Default
session Session

The owning :class:Session. Streaming shares the session's port lock and its cached :class:DataFrameFormat.

required
rate_ms int | None

If not None, configures NCS before entering streaming mode. 0 is the primer's "as-fast-as-possible" setting; distinct from None (which means "leave the device's current rate alone"). Firmware-gated at V10 >= 10v05 by the underlying STREAMING_RATE command, so passing a value on older firmware fails pre-I/O at the session gate.

None
strict bool

If True, :class:AlicatParseError from a malformed frame propagates out of :meth:__anext__ and tears down the stream via the task group. If False (default), the error is logged at WARN and the producer continues.

False
overflow OverflowPolicy

Back-pressure policy when the bounded producer buffer is full. Defaults to :attr:OverflowPolicy.DROP_OLDEST — latest-data-wins is the right default for high-rate telemetry. :attr:OverflowPolicy.BLOCK is valid but risks the OS-level serial buffer dropping bytes if the consumer stays behind for long; DROP_NEWEST keeps the oldest queued frame.

DROP_OLDEST
buffer_size int

Producer/consumer buffer depth.

_DEFAULT_BUFFER_SIZE

Attributes:

Name Type Description
dropped_frames int

Count of frames the producer had to discard because the consumer was behind and overflow is not BLOCK. Available after the CM exits.

Source code in src/alicatlib/devices/streaming.py
def __init__(
    self,
    session: Session,
    *,
    rate_ms: int | None = None,
    strict: bool = False,
    overflow: OverflowPolicy = OverflowPolicy.DROP_OLDEST,
    buffer_size: int = _DEFAULT_BUFFER_SIZE,
) -> None:
    self._session = session
    self._client = session._client  # pyright: ignore[reportPrivateUsage] — co-owned state
    self._rate_ms = rate_ms
    self._strict = strict
    self._overflow = overflow
    self._buffer_size = buffer_size
    self._format: DataFrameFormat | None = session.data_frame_format
    self._send: MemoryObjectSendStream[DataFrame] | None = None
    self._recv: MemoryObjectReceiveStream[DataFrame] | None = None
    self._task_group: TaskGroup | None = None
    self._entered = False
    self._producer_failure: BaseException | None = None
    self.dropped_frames: int = 0

__aenter__ async

__aenter__()

Enter streaming mode.

Sequence:

  1. Lazy-probe ??D* if the session has no cached :class:DataFrameFormat — streaming has to parse every frame, so a missing format is a hard error the moment the producer starts.
  2. Optionally configure NCS rate. Done before flipping the streaming latch so the command still runs as a normal request/response.
  3. Acquire the port lock, verify the client isn't already streaming, flip the latch, write the start-stream bytes, release the lock. Holding the lock across the latch + write is what makes the mode transition atomic w.r.t. other sessions on the same client.
  4. Start the producer task inside a task group. The group lives for the duration of the context and is cancelled by :meth:__aexit__.
Source code in src/alicatlib/devices/streaming.py
async def __aenter__(self) -> Self:
    """Enter streaming mode.

    Sequence:

    1. Lazy-probe ``??D*`` if the session has no cached
       :class:`DataFrameFormat` — streaming has to parse every
       frame, so a missing format is a hard error the moment the
       producer starts.
    2. Optionally configure ``NCS`` rate. Done *before* flipping
       the streaming latch so the command still runs as a normal
       request/response.
    3. Acquire the port lock, verify the client isn't already
       streaming, flip the latch, write the start-stream bytes,
       release the lock. Holding the lock across the latch + write
       is what makes the mode transition atomic w.r.t. other
       sessions on the same client.
    4. Start the producer task inside a task group. The group
       lives for the duration of the context and is cancelled by
       :meth:`__aexit__`.
    """
    if self._entered:
        raise RuntimeError("StreamingSession is not reusable after exit")
    self._entered = True

    # Step 1: cached format or lazy-probe.
    if self._format is None:
        self._format = await self._session.refresh_data_frame_format()

    # Step 2: optional NCS rate config. Skipped for rate_ms=None.
    # ``0`` is a real setting ("as fast as possible") — distinct
    # from None per the StreamingRateRequest contract.
    if self._rate_ms is not None:
        await self._session.execute(
            STREAMING_RATE,
            StreamingRateRequest(rate_ms=self._rate_ms),
        )

    # Step 3: atomic mode transition under the port lock.
    async with self._client.lock:
        if self._client.is_streaming:
            raise AlicatStreamingModeError(
                "client is already streaming — only one streamer per port",
                context=ErrorContext(
                    unit_id=self._session.unit_id,
                    port=self._session.port_label,
                    extra={"streaming": True},
                ),
            )
        self._client._mark_streaming(True)  # pyright: ignore[reportPrivateUsage]
        try:
            await self._client.transport.write(
                encode_start_stream(self._session.unit_id),
                timeout=self._session.config.write_timeout_s,
            )
        except BaseException:
            # Pre-write failure never left the device streaming;
            # clear the latch so the next enter can proceed.
            self._client._mark_streaming(False)  # pyright: ignore[reportPrivateUsage]
            raise

    # Step 4: producer task. The task group is entered here and
    # exited in ``__aexit__`` — the producer's lifetime matches the
    # streaming context exactly.
    self._send, self._recv = anyio.create_memory_object_stream[DataFrame](
        max_buffer_size=self._buffer_size,
    )
    task_group = anyio.create_task_group()
    await task_group.__aenter__()
    task_group.start_soon(self._producer_loop)
    self._task_group = task_group
    return self
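
Step 3 is the subtle part, so here is a minimal sketch of the check, latch, write, and rollback sequence under one lock. It uses `asyncio` instead of AnyIO to stay dependency-free, and `SketchClient` and `enter_streaming` are hypothetical stand-ins rather than the library's classes.

```python
import asyncio


class SketchClient:
    """Hypothetical stand-in for the shared protocol client."""

    def __init__(self, fail_write: bool = False) -> None:
        self.lock = asyncio.Lock()
        self.is_streaming = False
        self.fail_write = fail_write
        self.written = []

    async def write(self, data: bytes) -> None:
        if self.fail_write:
            raise OSError("simulated transport failure")
        self.written.append(data)


async def enter_streaming(client: SketchClient, start_bytes: bytes) -> None:
    # The check, the latch flip, and the write all happen while holding the
    # lock, so no other task can observe a half-finished transition.
    async with client.lock:
        if client.is_streaming:
            raise RuntimeError("client is already streaming")
        client.is_streaming = True
        try:
            await client.write(start_bytes)
        except BaseException:
            client.is_streaming = False  # roll back so a later attempt can proceed
            raise


async def demo() -> tuple:
    ok = SketchClient()
    await enter_streaming(ok, b"A@ @\r")
    broken = SketchClient(fail_write=True)
    try:
        await enter_streaming(broken, b"A@ @\r")
    except OSError:
        pass
    return ok.is_streaming, broken.is_streaming


print(asyncio.run(demo()))
```

In this sketch the failed write leaves the latch cleared, so the client stays usable for request/response traffic and for a later streaming attempt.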

__aexit__ async

__aexit__(exc_type, exc, tb)

Exit streaming mode — always sends stop-stream.

Order is load-bearing:

  1. Cancel the producer task group and close the send side of the buffer so any pending __anext__ receives StopAsyncIteration.
  2. Send stop-stream bytes and drain. If the body raised this still has to happen — the device would otherwise keep pushing frames onto a bus no one is reading.
  3. Clear the streaming latch so other sessions on this client resume dispatching.
Source code in src/alicatlib/devices/streaming.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Exit streaming mode — always sends stop-stream.

    Order is load-bearing:

    1. Cancel the producer task group and close the send side of
       the buffer so any pending ``__anext__`` receives
       ``StopAsyncIteration``.
    2. Send stop-stream bytes and drain. If the body raised this
       still has to happen — the device would otherwise keep
       pushing frames onto a bus no one is reading.
    3. Clear the streaming latch so other sessions on this client
       resume dispatching.
    """
    del exc_type, exc, tb

    # Step 1: cancel producer. The task group's cancel_scope unwinds
    # the read loop; we then exit the task group to join the task.
    # Swallow cancellation artifacts — this is a cleanup path, any
    # failure at this layer is secondary to the stop-stream write.
    tg = self._task_group
    self._task_group = None
    if tg is not None:
        tg.cancel_scope.cancel()
        # AnyIO cancellation of the producer task wraps the Cancelled
        # into an ExceptionGroup that we surface on tg.__aexit__.
        # This is a cleanup path; a genuine failure inside the
        # producer has already been recorded via
        # ``self._producer_failure`` (re-raised by __anext__), so
        # suppressing here only hides the expected cancel artefacts.
        with contextlib.suppress(BaseException):
            await tg.__aexit__(None, None, None)

    # Close both sides of the memory object stream. The send side
    # releases any consumer still blocked in ``__anext__`` with
    # ``EndOfStream``; closing the receive side too is required
    # because anyio emits an ``Unclosed`` warning (which pytest
    # promotes to a failure via ``sys.unraisablehook``) if either
    # end is garbage-collected without ``aclose``. Both are
    # idempotent.
    if self._send is not None:
        await self._send.aclose()
    if self._recv is not None:
        await self._recv.aclose()

    # Step 2: stop-stream + drain, under the lock. This is the
    # symmetric partner to the start-stream write in __aenter__;
    # write happens even if the producer failed or the body raised.
    try:
        async with self._client.lock:
            # The transport may already be torn down. The device is
            # the caller's problem at that point, but the latch
            # still needs clearing below, so transport errors from
            # the stop-stream write are suppressed and teardown continues.
            with contextlib.suppress(AlicatTransportError):
                await self._client.transport.write(
                    encode_stop_stream(self._session.unit_id),
                    timeout=self._session.config.write_timeout_s,
                )
            with contextlib.suppress(AlicatTransportError):
                await self._client.transport.read_available(
                    idle_timeout=_STOP_DRAIN_WINDOW_S,
                )
    finally:
        # Step 3: always clear the latch, even if the transport
        # writes above raised. Leaving it set would permanently
        # brick the client for request/response use.
        self._client._mark_streaming(False)  # pyright: ignore[reportPrivateUsage]

__aiter__

__aiter__()

Return self — :class:StreamingSession is its own iterator.

Source code in src/alicatlib/devices/streaming.py
def __aiter__(self) -> Self:
    """Return self — :class:`StreamingSession` is its own iterator."""
    return self

__anext__ async

__anext__()

Return the next buffered :class:DataFrame.

Raises :class:StopAsyncIteration when the producer has closed the send side (either on context exit, or under strict=True after a parse error tore the task group down). A strict-mode parse error is re-raised here so the caller's async for loop surfaces the real exception, not a silent stop.

Source code in src/alicatlib/devices/streaming.py
async def __anext__(self) -> DataFrame:
    """Return the next buffered :class:`DataFrame`.

    Raises :class:`StopAsyncIteration` when the producer has closed
    the send side (either on context exit, or under
    ``strict=True`` after a parse error tore the task group down).
    A strict-mode parse error is re-raised here so the caller's
    ``async for`` loop surfaces the real exception, not a silent
    stop.
    """
    if self._recv is None:
        raise RuntimeError(
            "StreamingSession.__anext__ called outside its async-with body",
        )
    try:
        return await self._recv.receive()
    except anyio.EndOfStream:
        if self._producer_failure is not None:
            failure = self._producer_failure
            self._producer_failure = None
            raise failure from None
        raise StopAsyncIteration from None
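
The end-of-stream handling can be sketched without AnyIO. The version below is an assumption-laden stand-in: it uses an `asyncio.Queue` with a sentinel in place of the memory object stream and `EndOfStream`, and `SketchStream` is a hypothetical class.

```python
import asyncio
from typing import Optional

_END = object()  # sentinel standing in for anyio's EndOfStream


class SketchStream:
    """Hypothetical async iterator that surfaces a recorded producer failure."""

    def __init__(self) -> None:
        self.queue = asyncio.Queue()
        self.producer_failure: Optional[BaseException] = None

    def __aiter__(self) -> "SketchStream":
        return self

    async def __anext__(self) -> object:
        item = await self.queue.get()
        if item is not _END:
            return item
        if self.producer_failure is not None:
            # Hand the recorded error to the consumer exactly once.
            failure, self.producer_failure = self.producer_failure, None
            raise failure
        raise StopAsyncIteration


async def demo(failure: Optional[BaseException]) -> list:
    stream = SketchStream()
    for item in ("frame-1", "frame-2", _END):
        stream.queue.put_nowait(item)
    stream.producer_failure = failure
    return [frame async for frame in stream]


print(asyncio.run(demo(None)))
```

Calling `demo(ValueError("bad frame"))` instead raises the `ValueError` out of the comprehension after both frames are consumed, which is the behavior the docstring describes for strict mode.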

Base facade

alicatlib.devices.base

Device facade base.

:class:Device is the public, user-facing object returned by :func:alicatlib.devices.factory.open_device. It is a thin veneer over :class:alicatlib.devices.session.Session — every method delegates to the session's :meth:~Session.execute (or :meth:~Session.poll for the timing-wrapped poll), so all pre-I/O gating lives in one place.

:class:DeviceKind lives in its sibling :mod:alicatlib.devices.kind module. That split is what lets :class:Device import command specs (GAS_SELECT, ENGINEERING_UNITS, …) for its method bodies without creating a cycle with :mod:alicatlib.commands, which needs :class:DeviceKind at command-spec definition time. See design §15.1.

Subclasses in :mod:.flow_meter and :mod:.flow_controller add family-specific methods (setpoint, valve drive, exhaust, ...) without changing the dispatch model — they just expose additional commands from the catalog.

Design reference: docs/design.md §5.9.

Device

Device(session)

User-facing façade over a :class:Session.

Constructed by :func:alicatlib.devices.factory.open_device. Users do not instantiate this class directly (the factory picks the correct subclass based on the :class:DeviceInfo.model prefix via the MODEL_RULES dispatch table — see design §5.9).

The device does not own the transport's lifecycle; the context manager returned by open_device does. Entering the device as a context manager returns the device itself, a convenience for nesting; exiting calls close(), which only marks the session as closed.

Source code in src/alicatlib/devices/base.py
def __init__(self, session: Session) -> None:
    self._session = session

info property

info

Identity snapshot from the factory's identification pipeline.

session property

session

Underlying :class:Session.

Exposed for advanced users who need :meth:Session.execute directly or want to inspect the session's gating state.

unit_id property

unit_id

Validated single-letter unit id this device is addressed by.

__aenter__ async

__aenter__()

Support async with device: ... nesting — returns self.

Source code in src/alicatlib/devices/base.py
async def __aenter__(self) -> Device:
    """Support ``async with device: ...`` nesting — returns ``self``."""
    return self

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the device on exit — aligns with the factory context manager.

Source code in src/alicatlib/devices/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the device on exit — aligns with the factory context manager."""
    del exc_type, exc, tb
    await self.close()

analog_output_source async

analog_output_source(
    channel=AnalogOutputChannel.PRIMARY,
    value=None,
    unit_code=None,
)

Query or set the analog-output source (ASOCV, V10 10v05+).

Gated on :attr:Capability.ANALOG_OUTPUT. channel selects primary vs. secondary. value=None queries; value=0 / value=1 set fixed min / max output; value>=2 pins the output to a statistic.

Source code in src/alicatlib/devices/base.py
async def analog_output_source(
    self,
    channel: AnalogOutputChannel = AnalogOutputChannel.PRIMARY,
    value: int | None = None,
    unit_code: int | None = None,
) -> AnalogOutputSourceSetting:
    """Query or set the analog-output source (``ASOCV``, V10 10v05+).

    Gated on :attr:`Capability.ANALOG_OUTPUT`. ``channel`` selects
    primary vs. secondary. ``value=None`` queries; ``value=0`` /
    ``value=1`` set fixed min / max output; ``value>=2`` pins the
    output to a statistic.
    """
    result = await self._session.execute(
        ANALOG_OUTPUT_SOURCE,
        AnalogOutputSourceRequest(
            channel=channel,
            value=value,
            unit_code=unit_code,
        ),
    )
    return replace(result, channel=channel)

average_timing async

average_timing(statistic_code, averaging_ms=None)

Query or set the per-statistic averaging window (DCA, V10 10v05+).

averaging_ms=None issues the query form; a value in 0..9999 sets the window (0 → update every millisecond). statistic_code is the primer's numeric code (see :data:DCA_ALLOWED_STATISTIC_CODES) — arbitrary :class:Statistic codes are rejected pre-I/O because the device only averages pressure / flow primary + secondary readings.

Source code in src/alicatlib/devices/base.py
async def average_timing(
    self,
    statistic_code: int,
    averaging_ms: int | None = None,
) -> AverageTimingSetting:
    """Query or set the per-statistic averaging window (``DCA``, V10 10v05+).

    ``averaging_ms=None`` issues the query form; a value in
    ``0..9999`` sets the window (``0`` → update every millisecond).
    ``statistic_code`` is the primer's numeric code (see
    :data:`DCA_ALLOWED_STATISTIC_CODES`) — arbitrary
    :class:`Statistic` codes are rejected pre-I/O because the
    device only averages pressure / flow primary + secondary
    readings.
    """
    result = await self._session.execute(
        AVERAGE_TIMING,
        AverageTimingRequest(
            statistic_code=statistic_code,
            averaging_ms=averaging_ms,
        ),
    )
    # Hardware-validation finding: real 10v20 firmware drops the echoed
    # statistic code on the DCA reply, so the decoder's shorter-form
    # path leaves statistic_code == 0. Re-populate from the request
    # so callers can trust the returned setting.
    if result.statistic_code == 0 and statistic_code != 0:
        return replace(result, statistic_code=statistic_code)
    return result

blink_display async

blink_display(duration_s=None)

Query or trigger a display blink (FFP, 8v28+).

Gated on :attr:Capability.DISPLAY. None queries the current flash state; a positive value flashes for that many seconds; 0 stops an active flash; -1 flashes indefinitely.

Source code in src/alicatlib/devices/base.py
async def blink_display(
    self,
    duration_s: int | None = None,
) -> BlinkDisplayState:
    """Query or trigger a display blink (``FFP``, 8v28+).

    Gated on :attr:`Capability.DISPLAY`. ``None`` queries the
    current flash state; a positive value flashes for that many
    seconds; ``0`` stops an active flash; ``-1`` flashes
    indefinitely.
    """
    return await self._session.execute(
        BLINK_DISPLAY,
        BlinkDisplayRequest(duration_s=duration_s),
    )

close async

close()

Release the session — idempotent.

The underlying transport is owned by the async context manager returned from :func:open_device; closing the device only marks the session as closed. Users should prefer async with open_device(...) as dev: over calling close() by hand.

Source code in src/alicatlib/devices/base.py
async def close(self) -> None:
    """Release the session — idempotent.

    The underlying transport is owned by the async context manager
    returned from :func:`open_device`; closing the device only marks
    the session as closed. Users should prefer
    ``async with open_device(...) as dev:`` over calling ``close()``
    by hand.
    """
    await self._session.close()

engineering_units async

engineering_units(
    statistic,
    unit=None,
    *,
    apply_to_group=False,
    override_special_rules=False,
)

Query or set the engineering unit for statistic (DCU).

unit=None issues the query form. Passing a :class:Unit, its alias, or an explicit integer wire code sets the unit. apply_to_group=True broadcasts the change to every statistic in the target's group; override_special_rules=True bypasses device-side restrictions on unusual statistic/unit pairings.

A successful SET invalidates the session's cached :class:DataFrameFormat: units affect display in the data frame, so the next :meth:poll re-probes ??D* lazily via :meth:Session.invalidate_data_frame_format. Query form is a no-op for the cache.

Raises:

Type Description
AlicatValidationError

Ambiguous :class:Unit (member that maps to multiple codes across categories) — pass the raw integer code to disambiguate.

Source code in src/alicatlib/devices/base.py
async def engineering_units(
    self,
    statistic: Statistic | str,
    unit: Unit | int | str | None = None,
    *,
    apply_to_group: bool = False,
    override_special_rules: bool = False,
) -> UnitSetting:
    """Query or set the engineering unit for ``statistic`` (``DCU``).

    ``unit=None`` issues the query form. Passing a :class:`Unit`,
    its alias, or an explicit integer wire code sets the unit.
    ``apply_to_group=True`` broadcasts the change to every
    statistic in the target's group; ``override_special_rules=True``
    bypasses device-side restrictions on unusual statistic/unit
    pairings.

    A successful SET invalidates the session's cached
    :class:`DataFrameFormat`: units affect display in the data
    frame, so the next :meth:`poll` re-probes ``??D*`` lazily via
    :meth:`Session.invalidate_data_frame_format`. Query form is a
    no-op for the cache.

    Raises:
        AlicatValidationError: Ambiguous :class:`Unit` (member that
            maps to multiple codes across categories) — pass the
            raw integer code to disambiguate.
    """
    result = await self._session.execute(
        ENGINEERING_UNITS,
        EngineeringUnitsRequest(
            statistic=statistic,
            unit=unit,
            apply_to_group=apply_to_group,
            override_special_rules=override_special_rules,
        ),
    )
    if unit is not None:
        # Only a SET reshapes the data frame; query returns the same
        # wire format the session cached at startup.
        self._session.invalidate_data_frame_format()
    # The DCU reply doesn't echo the requested statistic (verified on
    # V10 hardware 2026-04-17 — see design §16.6); fill it from the
    # request so the caller sees the typed enum, not Statistic.NONE.
    # Statistic is a StrEnum so coerce() accepts either form.
    return replace(result, statistic=statistic_registry.coerce(statistic))
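
The cache behavior described above (a set invalidates, a query does not, and the next poll re-probes lazily) can be sketched as follows. `SketchSession`, its `probe_count` counter, and the string format are hypothetical stand-ins; the real session caches a `DataFrameFormat` and probes with `??D*`.

```python
from typing import Optional


class SketchSession:
    """Hypothetical session that caches a data-frame format."""

    def __init__(self) -> None:
        self._format: Optional[str] = None
        self.probe_count = 0

    def _probe_format(self) -> str:
        self.probe_count += 1  # stands in for the ``??D*`` round trip
        return f"format-v{self.probe_count}"

    def invalidate_data_frame_format(self) -> None:
        self._format = None

    def poll(self) -> str:
        if self._format is None:  # re-probe only when the cache is empty
            self._format = self._probe_format()
        return self._format


def engineering_units(session: SketchSession, unit: Optional[str] = None) -> None:
    if unit is not None:  # only the set form reshapes the data frame
        session.invalidate_data_frame_format()


session = SketchSession()
session.poll()
engineering_units(session)  # query: cache untouched
session.poll()
engineering_units(session, unit="hypothetical-unit")  # set: cache dropped
session.poll()
print(session.probe_count)
```

Invalidation is deferred work: the set itself pays no extra round trip, and the probe cost lands on the first poll that needs the new layout.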

execute async

execute(command, request)

Dispatch a catalog command directly.

Exposed for advanced users who want to reach commands that don't yet have a facade method, or to wrap session.execute with middleware. Same gating, same error context, same result types.

Source code in src/alicatlib/devices/base.py
async def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    request: Req,
) -> Resp:
    """Dispatch a catalog command directly.

    Exposed for advanced users who want to reach commands that don't
    yet have a facade method, or to wrap session.execute with
    middleware. Same gating, same error context, same result types.
    """
    return await self._session.execute(command, request)

full_scale async

full_scale(statistic)

Query the full-scale value for statistic (FPF).

Used by setpoint range validation (design §5.20.2) and as a capability-probe signal (FPF on stat 15 → barometer present). The session's factory-level probe populates :attr:DeviceInfo.full_scale for common statistics at startup; this method exposes the same command for ad-hoc queries.

Source code in src/alicatlib/devices/base.py
async def full_scale(self, statistic: Statistic | str) -> FullScaleValue:
    """Query the full-scale value for ``statistic`` (``FPF``).

    Used by setpoint range validation (design §5.20.2) and as a
    capability-probe signal (``FPF`` on stat 15 → barometer
    present). The session's factory-level probe populates
    :attr:`DeviceInfo.full_scale` for common statistics at startup;
    this method exposes the same command for ad-hoc queries.
    """
    result = await self._session.execute(
        FULL_SCALE_QUERY,
        FullScaleQueryRequest(statistic=statistic),
    )
    # FPF reply doesn't echo the requested statistic — fill it from
    # the request (design §16.6).
    return replace(result, statistic=statistic_registry.coerce(statistic))
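
The "fill it from the request" step relies on `dataclasses.replace`, which copies a frozen dataclass with selected fields changed. The sketch below uses a hypothetical result type and reply format; the field names, the `"none"` placeholder, and the bare numeric reply are all assumptions made for illustration.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SketchFullScale:
    """Hypothetical result type; the field names are assumptions."""

    statistic: str
    value: float


def decode_reply(raw: str) -> SketchFullScale:
    # The reply carries no statistic, so the decoder can only use a placeholder.
    return SketchFullScale(statistic="none", value=float(raw))


def full_scale(statistic: str, raw_reply: str) -> SketchFullScale:
    result = decode_reply(raw_reply)
    # Copy the frozen result, overriding only the field the wire omitted.
    return replace(result, statistic=statistic)


print(full_scale("mass_flow", "100.0"))
```

Because the dataclass is frozen, `replace` returns a new object and leaves the decoder's original result unchanged.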

gas async

gas(gas=None, *, save=None)

Query or set the active gas.

gas=None issues the query form and returns the current selection without changing it. Passing a :class:~alicatlib.registry.Gas (or any registered alias) sets the active gas. save=True persists to EEPROM — beware of the rate-warning guard (design §5.20.7) if you call this in a loop.

Dispatch is firmware-aware:

  • V10 ≥ 10v05 → :data:GAS_SELECT (GS). Supports query, set, and save.
  • All other supported firmware (GP, V1_V7, V8_V9, V10 < 10v05) → :data:GAS_SELECT_LEGACY (G). Set only; no save flag; the device replies with a post-op data frame rather than the modern 4-field form. The facade fabricates a :class:GasState from the request and the frame's echoed unit id; label / long_name are resolved from the gas registry.

The command's device_kinds gate rejects calls on device kinds that don't have a selectable active gas (pressure-only devices) pre-I/O with :class:AlicatUnsupportedCommandError.

Raises:

Type Description
AlicatUnsupportedCommandError

gas is None (query form) on firmware that only supports the legacy set-only path.

AlicatValidationError

save is True on legacy firmware, which has no persist flag.

Source code in src/alicatlib/devices/base.py
async def gas(
    self,
    gas: Gas | str | None = None,
    *,
    save: bool | None = None,
) -> GasState:
    """Query or set the active gas.

    ``gas=None`` issues the query form and returns the current
    selection without changing it. Passing a
    :class:`~alicatlib.registry.Gas` (or any registered alias)
    sets the active gas. ``save=True`` persists to EEPROM — beware
    of the rate-warning guard (design §5.20.7) if you call this in
    a loop.

    Dispatch is firmware-aware:

    - V10 ≥ 10v05 → :data:`GAS_SELECT` (``GS``). Supports query,
      set, and save.
    - All other supported firmware (GP, V1_V7, V8_V9, V10 < 10v05)
      → :data:`GAS_SELECT_LEGACY` (``G``). Set only; no ``save``
      flag; the device replies with a post-op data frame rather
      than the modern 4-field form. The facade fabricates a
      :class:`GasState` from the request and the frame's echoed
      unit id; ``label`` / ``long_name`` are resolved from the
      gas registry.

    The command's ``device_kinds`` gate rejects calls on device
    kinds that don't have a selectable active gas (pressure-only
    devices) pre-I/O with :class:`AlicatUnsupportedCommandError`.

    Raises:
        AlicatUnsupportedCommandError: ``gas is None`` (query form)
            on firmware that only supports the legacy set-only path.
        AlicatValidationError: ``save is True`` on legacy firmware,
            which has no persist flag.
    """
    firmware = self._session.firmware
    if uses_modern_gas_select(firmware):
        return await self._session.execute(
            GAS_SELECT,
            GasSelectRequest(gas=gas, save=save),
        )

    # Legacy path — set only, no ``save`` flag.
    if gas is None:
        raise AlicatUnsupportedCommandError(
            "gas() query form requires firmware supporting GS (V10 ≥ 10v05); "
            f"this device reports {firmware}. Legacy G is set-only.",
            context=ErrorContext(
                command_name="gas_select_legacy",
                unit_id=self._session.unit_id,
                firmware=firmware,
            ),
        )
    if save is True:
        raise AlicatValidationError(
            "save=True requires firmware supporting GS (V10 ≥ 10v05); "
            f"this device reports {firmware}. Legacy G has no persist flag.",
            context=ErrorContext(
                command_name="gas_select_legacy",
                unit_id=self._session.unit_id,
                firmware=firmware,
                extra={"save_requested": True},
            ),
        )

    typed_gas = gas_registry.coerce(gas)
    frame = await self._session.execute(
        GAS_SELECT_LEGACY,
        GasSelectLegacyRequest(gas=typed_gas),
    )
    # The legacy device replies with a data frame, not ``<uid> code
    # short long``. Fabricate a ``GasState`` from the request + the
    # frame's echoed unit id so the facade's return shape matches
    # the modern path.
    return GasState(
        unit_id=frame.unit_id,
        code=typed_gas.code,
        gas=typed_gas,
        label=typed_gas.value,
        long_name=typed_gas.value,
    )
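
The dispatch rules above reduce to a small decision function. This sketch is hypothetical throughout: `SketchFirmware`, the reading of `10v05` as major 10 and minor 5, and the built-in exceptions standing in for `AlicatUnsupportedCommandError` and `AlicatValidationError` are all assumptions.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchFirmware:
    """Hypothetical firmware descriptor, e.g. family "V10", major 10, minor 5."""

    family: str
    major: int
    minor: int


def sketch_uses_modern_gas_select(fw: SketchFirmware) -> bool:
    # Modern GS is documented above as V10 at 10v05 or later.
    return fw.family == "V10" and (fw.major, fw.minor) >= (10, 5)


def pick_gas_command(fw: SketchFirmware, gas: Optional[str], save: Optional[bool]) -> str:
    if sketch_uses_modern_gas_select(fw):
        return "GS"  # supports query, set, and save
    if gas is None:
        raise NotImplementedError("legacy G is set-only; the query form needs GS")
    if save is True:
        raise ValueError("legacy G has no persist flag")
    return "G"


print(pick_gas_command(SketchFirmware("V10", 10, 5), None, None))
print(pick_gas_command(SketchFirmware("V10", 10, 4), "N2", None))
```

Both legacy checks run before any command is sent, so an unsupported call fails without touching the device.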

gas_list async

gas_list()

Enumerate gases available on the device (??G*).

Returns a mapping from Alicat gas code (primer Appendix C) to the raw label the device reports. Codes in the 236..255 range correspond to custom-mixture slots; an empty / absent slot is simply not included in the mapping.

Callers that want typed :class:Gas members should feed each code through :func:alicatlib.registry.gas_registry.by_code; unknown codes are preserved as labels so diagnostics still see the device's exact string.

Source code in src/alicatlib/devices/base.py
async def gas_list(self) -> Mapping[int, str]:
    """Enumerate gases available on the device (``??G*``).

    Returns a mapping from Alicat gas code (primer Appendix C) to
    the raw label the device reports. Codes in the ``236``..``255``
    range correspond to custom-mixture slots; an empty / absent
    slot is simply not included in the mapping.

    Callers that want typed :class:`Gas` members should feed each
    code through :func:`alicatlib.registry.gas_registry.by_code`;
    unknown codes are preserved as labels so diagnostics still see
    the device's exact string.
    """
    return await self._session.execute(GAS_LIST, GasListRequest())
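
A caller might want to separate built-in gases from custom-mixture slots in the returned mapping. The helper below is a hypothetical sketch; the sample mapping is illustrative and is not real device output.

```python
from typing import Dict, Mapping, Tuple

CUSTOM_MIX_CODES = range(236, 256)  # 236..255 inclusive, per the docstring


def split_custom_mixes(gases: Mapping[int, str]) -> Tuple[Dict[int, str], Dict[int, str]]:
    """Return (standard, custom) views of a code-to-label mapping."""
    standard = {code: label for code, label in gases.items() if code not in CUSTOM_MIX_CODES}
    custom = {code: label for code, label in gases.items() if code in CUSTOM_MIX_CODES}
    return standard, custom


# Illustrative sample only; the labels are made up for the sketch.
sample = {0: "Air", 8: "N2", 236: "MyMix"}
standard, custom = split_custom_mixes(sample)
print(standard)
print(custom)
```

Empty slots never appear in the mapping, so the `custom` dictionary lists only mixtures that are actually defined on the device.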

lock_display async

lock_display()

Lock the front-panel display (L); reply is a post-op data frame.

Gated on :attr:Capability.DISPLAY. The result's :attr:DisplayLockResult.locked is True after a successful lock.

Source code in src/alicatlib/devices/base.py
async def lock_display(self) -> DisplayLockResult:
    """Lock the front-panel display (``L``); reply is a post-op data frame.

    Gated on :attr:`Capability.DISPLAY`. The result's
    :attr:`DisplayLockResult.locked` is ``True`` after a successful
    lock.
    """
    return await self._execute_display_lock(LOCK_DISPLAY, LockDisplayRequest())

poll async

poll()

Read one data frame.

Lazy-probes ??D* the first time it's called if the session didn't have a cached :class:DataFrameFormat yet. Returns a :class:DataFrame with read-site received_at and monotonic_ns captured by the session.

Source code in src/alicatlib/devices/base.py
async def poll(self) -> DataFrame:
    """Read one data frame.

    Lazy-probes ``??D*`` the first time it's called if the session
    didn't have a cached :class:`DataFrameFormat` yet. Returns a
    :class:`DataFrame` with read-site ``received_at`` and
    ``monotonic_ns`` captured by the session.
    """
    return await self._session.poll()
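
The lazy probe described above can be pictured with a small stand-in session. This is only a sketch under assumptions: `LazyFormatSession`, its field names, and the placeholder readings are hypothetical and do not reflect the real `Session` internals.

```python
import asyncio


class LazyFormatSession:
    """Hypothetical session that probes its frame format once, on first poll."""

    def __init__(self) -> None:
        self._format: tuple[str, ...] | None = None
        self.probe_count = 0

    async def _probe_format(self) -> tuple[str, ...]:
        # Stands in for the ``??D*`` round trip; field names are made up.
        self.probe_count += 1
        return ("pressure", "temperature", "mass_flow")

    async def poll(self) -> dict[str, float]:
        if self._format is None:  # only true on the first call
            self._format = await self._probe_format()
        fake_reading = (14.7, 25.0, 0.0)  # placeholder values, not device data
        return dict(zip(self._format, fake_reading))


async def main() -> int:
    session = LazyFormatSession()
    await session.poll()
    await session.poll()  # reuses the cached format, no second probe
    return session.probe_count


probe_count = asyncio.run(main())
```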

power_up_tare async

power_up_tare(enable=None)

Query or set the power-up tare (ZCP, V10 10v05+).

None queries; True / False sets. On a controller that enables this, closed-loop control is delayed and valves stay closed until the ~0.25 s tare completes at power-on.

Source code in src/alicatlib/devices/base.py
async def power_up_tare(
    self,
    enable: bool | None = None,
) -> PowerUpTareState:
    """Query or set the power-up tare (``ZCP``, V10 10v05+).

    ``None`` queries; ``True`` / ``False`` sets. On a controller
    that enables this, closed-loop control is delayed and valves
    stay closed until the ~0.25 s tare completes at power-on.
    """
    return await self._session.execute(
        POWER_UP_TARE,
        PowerUpTareRequest(enable=enable),
    )

request async

request(statistics, *, averaging_ms=1)

Request a specific list of statistics with an averaging window.

DV on the wire. Unlike :meth:poll, which returns the device's cached data-frame fields, this targets 1–13 caller-chosen :class:~alicatlib.registry.Statistic members and reports each averaged over averaging_ms milliseconds.

Per-slot -- sentinels (invalid statistic code for this device) map to None in the returned :attr:MeasurementSet.values.

Parameters:

statistics (Sequence[Statistic | str], required): 1–13 :class:Statistic members or alias strings. Preserved order in the returned mapping.

averaging_ms (int, default 1): Rolling averaging window in milliseconds, 1–9999. 0 is rejected pre-I/O (:class:AlicatValidationError) since the device rejects it with a generic ? and the stricter message is more useful.

Returns:

MeasurementSet: :class:MeasurementSet whose values mapping is keyed by the :class:Statistic members the caller asked for. If the caller repeats a statistic, the last occurrence wins in the mapping; the wire still carries every request (the device still averages over all slots).

Source code in src/alicatlib/devices/base.py
async def request(
    self,
    statistics: Sequence[Statistic | str],
    *,
    averaging_ms: int = 1,
) -> MeasurementSet:
    """Request a specific list of statistics with an averaging window.

    ``DV`` on the wire. Unlike :meth:`poll`, which returns the
    device's cached data-frame fields, this targets 1–13 caller-chosen
    :class:`~alicatlib.registry.Statistic` members and reports each
    averaged over ``averaging_ms`` milliseconds.

    Per-slot ``--`` sentinels (invalid statistic code for this
    device) map to ``None`` in the returned
    :attr:`MeasurementSet.values`.

    Args:
        statistics: 1–13 :class:`Statistic` members or alias strings.
            Preserved order in the returned mapping.
        averaging_ms: Rolling averaging window in milliseconds,
            1–9999. ``0`` is rejected pre-I/O
            (:class:`AlicatValidationError`) since the device rejects
            it with a generic ``?`` and the stricter message is more
            useful.

    Returns:
        :class:`MeasurementSet` whose ``values`` mapping is keyed by
        the :class:`Statistic` members the caller asked for. If the
        caller repeats a statistic, the last occurrence wins in the
        mapping; the wire still carries every request (the device
        still averages over all slots).
    """
    typed_stats = tuple(statistic_registry.coerce(s) for s in statistics)
    raw_values: tuple[float | None, ...] = await self._session.execute(
        REQUEST_DATA,
        RequestDataRequest(statistics=typed_stats, averaging_ms=averaging_ms),
    )
    return MeasurementSet(
        unit_id=self._session.unit_id,
        values=dict(zip(typed_stats, raw_values, strict=True)),
        averaging_ms=averaging_ms,
        received_at=datetime.now(UTC),
    )

stp_ntp_pressure async

stp_ntp_pressure(mode, pressure=None, unit_code=None)

Query or set the standard / normal pressure reference (DCFRP).

Mass-flow devices only (V10 10v05+). mode selects STP vs NTP reference. pressure=None issues the query form; unit_code=None or 0 on set leaves the engineering unit unchanged. The device doesn't echo mode so the facade fills it from the request.

Source code in src/alicatlib/devices/base.py
async def stp_ntp_pressure(
    self,
    mode: StpNtpMode,
    pressure: float | None = None,
    unit_code: int | None = None,
) -> StpNtpPressureSetting:
    """Query or set the standard / normal pressure reference (``DCFRP``).

    Mass-flow devices only (V10 10v05+). ``mode`` selects STP vs
    NTP reference. ``pressure=None`` issues the query form;
    ``unit_code=None`` or ``0`` on set leaves the engineering
    unit unchanged. The device doesn't echo ``mode`` so the
    facade fills it from the request.
    """
    result = await self._session.execute(
        STP_NTP_PRESSURE,
        StpNtpPressureRequest(
            mode=mode,
            pressure=pressure,
            unit_code=unit_code,
        ),
    )
    return replace(result, mode=mode)

stp_ntp_temperature async

stp_ntp_temperature(mode, temperature=None, unit_code=None)

Query or set the standard / normal temperature reference (DCFRT).

Mirror of :meth:stp_ntp_pressure for temperature.

Source code in src/alicatlib/devices/base.py
async def stp_ntp_temperature(
    self,
    mode: StpNtpMode,
    temperature: float | None = None,
    unit_code: int | None = None,
) -> StpNtpTemperatureSetting:
    """Query or set the standard / normal temperature reference (``DCFRT``).

    Mirror of :meth:`stp_ntp_pressure` for temperature.
    """
    result = await self._session.execute(
        STP_NTP_TEMPERATURE,
        StpNtpTemperatureRequest(
            mode=mode,
            temperature=temperature,
            unit_code=unit_code,
        ),
    )
    return replace(result, mode=mode)

stream

stream(
    *,
    rate_ms=None,
    strict=False,
    overflow=None,
    buffer_size=256,
)

Open a streaming-mode context for this device.

Returns a :class:StreamingSession, which is both an async context manager and an async iterator:

    async with dev.stream(rate_ms=50) as stream:
        async for frame in stream:
            process(frame)

Streaming is a port-level state transition (design §5.8); while the context is active, every other :meth:execute / :meth:poll / etc. on sessions sharing this client's port fails fast with :class:~alicatlib.errors.AlicatStreamingModeError. One streamer per port.

Parameters:

rate_ms (int | None, default None): If not None, configures NCS (streaming rate) before entering streaming mode. V10 >= 10v05 only; older firmware lacks the rate command and keeps its 50 ms default. None leaves the device's current rate alone; 0 is the device's "as-fast-as-possible" setting.

strict (bool, default False): If True, a malformed frame from the device propagates out of __anext__ and tears down the stream. Default False logs and skips.

overflow (OverflowPolicy | None, default None): Back-pressure policy when the producer's buffer fills. Defaults to :attr:OverflowPolicy.DROP_OLDEST; latest-data-wins is the right default for high-rate telemetry.

buffer_size (int, default 256): Producer/consumer buffer depth. Default 256 frames; at the default 50 ms rate that's ~13 s of backlog.

Source code in src/alicatlib/devices/base.py
def stream(
    self,
    *,
    rate_ms: int | None = None,
    strict: bool = False,
    overflow: OverflowPolicy | None = None,
    buffer_size: int = 256,
) -> StreamingSession:
    """Open a streaming-mode context for this device.

    Returns a :class:`StreamingSession` — an async context manager
    *and* an async iterator::

        async with dev.stream(rate_ms=50) as stream:
            async for frame in stream:
                process(frame)

    Streaming is a port-level state transition (design §5.8); while
    the context is active, every other :meth:`execute` / :meth:`poll`
    / etc. on sessions sharing this client's port fails fast with
    :class:`~alicatlib.errors.AlicatStreamingModeError`. One
    streamer per port.

    Args:
        rate_ms: If not ``None``, configures ``NCS`` (streaming
            rate) before entering streaming mode. V10 >= 10v05
            only; older firmware lacks the rate command and keeps
            its 50 ms default. ``None`` leaves the device's current
            rate alone; ``0`` is the device's "as-fast-as-possible"
            setting.
        strict: If ``True``, a malformed frame from the device
            propagates out of ``__anext__`` and tears down the
            stream. Default ``False`` logs and skips.
        overflow: Back-pressure policy when the producer's buffer
            fills. Defaults to
            :attr:`OverflowPolicy.DROP_OLDEST` — latest-data-wins
            is the right default for high-rate telemetry.
        buffer_size: Producer/consumer buffer depth. Default 256
            frames; at the default 50 ms rate that's ~13 s of
            backlog.
    """
    from alicatlib.devices.streaming import (  # noqa: PLC0415 — lazy to avoid import cycle
        OverflowPolicy as _OverflowPolicy,
    )
    from alicatlib.devices.streaming import (  # noqa: PLC0415
        StreamingSession as _StreamingSession,
    )

    return _StreamingSession(
        self._session,
        rate_ms=rate_ms,
        strict=strict,
        overflow=overflow if overflow is not None else _OverflowPolicy.DROP_OLDEST,
        buffer_size=buffer_size,
    )
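
To make the buffer arithmetic and the drop-oldest policy concrete, here is a small sketch. `DropOldestBuffer` and `backlog_seconds` are hypothetical names for this example; the real `StreamingSession` buffering is not shown in this section and may be implemented differently.

```python
from collections import deque


def backlog_seconds(buffer_size: int, rate_ms: int) -> float:
    """Seconds of data a full buffer holds at a fixed frame interval."""
    return buffer_size * rate_ms / 1000


class DropOldestBuffer:
    """Bounded buffer that discards the oldest item when full."""

    def __init__(self, size: int) -> None:
        self._items: deque[int] = deque(maxlen=size)
        self.dropped = 0

    def put(self, item: int) -> None:
        if len(self._items) == self._items.maxlen:
            self.dropped += 1  # deque(maxlen=...) evicts from the left on append
        self._items.append(item)

    def snapshot(self) -> list[int]:
        return list(self._items)


buf = DropOldestBuffer(3)
for frame_no in range(5):
    buf.put(frame_no)

default_backlog = backlog_seconds(256, 50)  # 256 frames * 50 ms = 12.8 s
```

After five puts into a three-slot buffer, the two oldest frames are gone and the newest three remain, which is the latest-data-wins behaviour the default policy aims for.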

tare_absolute_pressure async

tare_absolute_pressure()

Calibrate absolute pressure against the onboard barometer (PC).

Gated on :attr:Capability.TAREABLE_ABSOLUTE_PRESSURE — NOT on :attr:Capability.BAROMETER. The two dissociate in practice (design §16.6.7): flow controllers expose a firmware-computed barometer reading but lack a tareable process-port abs sensor. Users with a pressure meter/controller that supports PC opt in via assume_capabilities on :func:~alicatlib.devices.factory.open_device; devices without the capability raise :class:AlicatMissingHardwareError pre-I/O. Same INFO-log + data-frame-wrap semantics as :meth:tare_flow.

Source code in src/alicatlib/devices/base.py
async def tare_absolute_pressure(self) -> TareResult:
    """Calibrate absolute pressure against the onboard barometer (``PC``).

    Gated on :attr:`Capability.TAREABLE_ABSOLUTE_PRESSURE` — NOT
    on :attr:`Capability.BAROMETER`. The two dissociate in practice
    (design §16.6.7): flow controllers expose a firmware-computed
    barometer reading but lack a tareable process-port abs sensor.
    Users with a pressure meter/controller that supports ``PC``
    opt in via ``assume_capabilities`` on
    :func:`~alicatlib.devices.factory.open_device`; devices without
    the capability raise :class:`AlicatMissingHardwareError`
    pre-I/O. Same INFO-log + data-frame-wrap semantics as
    :meth:`tare_flow`.
    """
    _logger.info(
        _TARE_ABSOLUTE_PRECONDITION,
        extra={
            "unit_id": self._session.unit_id,
            "command": TARE_ABSOLUTE_PRESSURE.name,
        },
    )
    return await self._execute_tare(
        TARE_ABSOLUTE_PRESSURE,
        TareAbsolutePressureRequest(),
    )

tare_flow async

tare_flow()

Zero the flow reading (T).

Caller's precondition: no gas flowing through the device. The library cannot verify this — an INFO log records the expectation on every call so the precondition is auditable after the fact (design §5.18 pt 6). The device replies with a post-op data frame; the returned :class:TareResult wraps it as a :class:DataFrame with read-site timing.

Source code in src/alicatlib/devices/base.py
async def tare_flow(self) -> TareResult:
    """Zero the flow reading (``T``).

    Caller's precondition: no gas flowing through the device. The
    library cannot verify this — an INFO log records the
    expectation on every call so the precondition is auditable
    after the fact (design §5.18 pt 6). The device replies with a
    post-op data frame; the returned :class:`TareResult` wraps it
    as a :class:`DataFrame` with read-site timing.
    """
    _logger.info(
        _TARE_FLOW_PRECONDITION,
        extra={
            "unit_id": self._session.unit_id,
            "command": TARE_FLOW.name,
        },
    )
    return await self._execute_tare(TARE_FLOW, TareFlowRequest())
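
The auditable-precondition pattern relies on the standard `logging` module's `extra` argument, which attaches the given keys to the log record as attributes. A minimal sketch, with a hypothetical logger name and message text (the real `_TARE_FLOW_PRECONDITION` string is not shown here):

```python
import logging

records: list[logging.LogRecord] = []


class CaptureHandler(logging.Handler):
    """Collect records in memory so they can be inspected afterwards."""

    def emit(self, record: logging.LogRecord) -> None:
        records.append(record)


logger = logging.getLogger("tare_sketch")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(CaptureHandler())


def log_tare_precondition(unit_id: str, command: str) -> None:
    # Message text is made up for this sketch.
    logger.info(
        "tare requested; caller asserts no gas is flowing",
        extra={"unit_id": unit_id, "command": command},
    )


log_tare_precondition("A", "tare_flow")
```

A structured-logging handler can then filter or index on `record.unit_id` and `record.command` when reviewing which tares were issued.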

tare_gauge_pressure async

tare_gauge_pressure()

Zero the gauge-pressure reading (TP).

Caller's precondition: line depressurised to atmosphere. Same INFO-log + data-frame-wrap semantics as :meth:tare_flow.

Source code in src/alicatlib/devices/base.py
async def tare_gauge_pressure(self) -> TareResult:
    """Zero the gauge-pressure reading (``TP``).

    Caller's precondition: line depressurised to atmosphere.
    Same INFO-log + data-frame-wrap semantics as :meth:`tare_flow`.
    """
    _logger.info(
        _TARE_GAUGE_PRECONDITION,
        extra={
            "unit_id": self._session.unit_id,
            "command": TARE_GAUGE_PRESSURE.name,
        },
    )
    return await self._execute_tare(
        TARE_GAUGE_PRESSURE,
        TareGaugePressureRequest(),
    )

totalizer_config async

totalizer_config(
    totalizer=TotalizerId.FIRST,
    *,
    flow_statistic_code=None,
    mode=None,
    limit_mode=None,
    digits=None,
    decimal_place=None,
)

Query or set a totalizer's configuration (TC, V10 10v00+).

flow_statistic_code=None issues the query form. 1 disables the totalizer (other fields stay None). Any other value enables / reconfigures — mode / limit_mode / digits / decimal_place are required together in that case. Use :attr:TotalizerMode.KEEP / :attr:TotalizerLimitMode.KEEP (-1) to preserve the current value of one field while changing others.

Returns:

TotalizerConfig: :class:TotalizerConfig; the facade fills :attr:TotalizerConfig.totalizer from the request since the wire reply does not echo the id.

Source code in src/alicatlib/devices/base.py
async def totalizer_config(
    self,
    totalizer: TotalizerId = TotalizerId.FIRST,
    *,
    flow_statistic_code: int | None = None,
    mode: TotalizerMode | None = None,
    limit_mode: TotalizerLimitMode | None = None,
    digits: int | None = None,
    decimal_place: int | None = None,
) -> TotalizerConfig:
    """Query or set a totalizer's configuration (``TC``, V10 10v00+).

    ``flow_statistic_code=None`` issues the query form. ``1``
    disables the totalizer (other fields stay ``None``). Any
    other value enables / reconfigures — ``mode`` /
    ``limit_mode`` / ``digits`` / ``decimal_place`` are required
    together in that case. Use :attr:`TotalizerMode.KEEP` /
    :attr:`TotalizerLimitMode.KEEP` (``-1``) to preserve the
    current value of one field while changing others.

    Returns:
        :class:`TotalizerConfig` — the facade fills
        :attr:`TotalizerConfig.totalizer` from the request since
        the wire reply does not echo the id.
    """
    result = await self._session.execute(
        TOTALIZER_CONFIG,
        TotalizerConfigRequest(
            totalizer=totalizer,
            flow_statistic_code=flow_statistic_code,
            mode=mode,
            limit_mode=limit_mode,
            digits=digits,
            decimal_place=decimal_place,
        ),
    )
    return replace(result, totalizer=totalizer)

totalizer_reset async

totalizer_reset(
    totalizer=TotalizerId.FIRST, *, confirm=False
)

Reset a totalizer's count (T <n>, 8v00+) — destructive.

Token-collision note: the command spec always emits the numeric totalizer argument on the wire, so it can never accidentally produce the flow-tare form (bare T\r). The destructive-confirm gate on the session requires the caller to pass confirm=True explicitly.

Source code in src/alicatlib/devices/base.py
async def totalizer_reset(
    self,
    totalizer: TotalizerId = TotalizerId.FIRST,
    *,
    confirm: bool = False,
) -> TotalizerResetResult:
    r"""Reset a totalizer's count (``T <n>``, 8v00+) — destructive.

    Token-collision note: the command spec always emits the
    numeric totalizer argument on the wire, so it can never
    accidentally produce the flow-tare form (bare ``T\r``). The
    destructive-confirm gate on the session requires the caller
    to pass ``confirm=True`` explicitly.
    """
    return await self._execute_totalizer_reset(
        TOTALIZER_RESET,
        TotalizerResetRequest(totalizer=totalizer, confirm=confirm),
    )
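
The two safeguards described above, an explicit confirmation and an always-present numeric argument, can be sketched as follows. Everything here is an assumption for illustration: the function name, the `ConfirmationRequired` error, the allowed totalizer ids, and the exact wire layout are not taken from the library.

```python
class ConfirmationRequired(Exception):
    """Hypothetical error for a destructive call made without confirm=True."""


def encode_totalizer_reset(unit_id: str, totalizer: int, *, confirm: bool = False) -> bytes:
    """Build a reset command that always carries the totalizer number."""
    if not confirm:
        raise ConfirmationRequired("totalizer reset is destructive; pass confirm=True")
    if totalizer not in (1, 2):  # assumed id range for this sketch
        raise ValueError(f"unknown totalizer id: {totalizer}")
    # Assumed wire layout: <unit id>T <n>\r. The numeric argument is never
    # omitted, so the bare tare form ``T\r`` cannot be produced here.
    return f"{unit_id}T {totalizer}\r".encode("ascii")


wire = encode_totalizer_reset("A", 1, confirm=True)
```

Because the encoder has no code path that omits the number, a caller cannot turn a reset request into a flow tare by passing an unexpected argument.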

totalizer_reset_peak async

totalizer_reset_peak(
    totalizer=TotalizerId.FIRST, *, confirm=False
)

Reset a totalizer's peak reading (TP <n>, 8v00+) — destructive.

Same token-collision protection as :meth:totalizer_reset — the spec always emits the numeric argument so TP\r (gauge-pressure tare) is unreachable from this path.

Source code in src/alicatlib/devices/base.py
async def totalizer_reset_peak(
    self,
    totalizer: TotalizerId = TotalizerId.FIRST,
    *,
    confirm: bool = False,
) -> TotalizerResetResult:
    r"""Reset a totalizer's peak reading (``TP <n>``, 8v00+) — destructive.

    Same token-collision protection as :meth:`totalizer_reset` —
    the spec always emits the numeric argument so ``TP\r``
    (gauge-pressure tare) is unreachable from this path.
    """
    return await self._execute_totalizer_reset(
        TOTALIZER_RESET_PEAK,
        TotalizerResetPeakRequest(totalizer=totalizer, confirm=confirm),
    )

totalizer_save async

totalizer_save(enable=None, *, save=None)

Query or set persist-totalizer-on-power-cycle (TCR, V10 10v05+).

enable=None queries; True / False sets. save=True persists the TCR config itself to EEPROM and feeds through the session's EEPROM-wear monitor (design §5.20.7).

Source code in src/alicatlib/devices/base.py
async def totalizer_save(
    self,
    enable: bool | None = None,
    *,
    save: bool | None = None,
) -> TotalizerSaveState:
    """Query or set persist-totalizer-on-power-cycle (``TCR``, V10 10v05+).

    ``enable=None`` queries; ``True`` / ``False`` sets. ``save=True``
    persists the ``TCR`` config itself to EEPROM and feeds through
    the session's EEPROM-wear monitor (design §5.20.7).
    """
    return await self._session.execute(
        TOTALIZER_SAVE,
        TotalizerSaveRequest(enable=enable, save=save),
    )

unlock_display async

unlock_display()

Unlock the front-panel display (U); reply is a post-op data frame.

Intentionally not gated on :attr:Capability.DISPLAY: this is the safety escape for a device that got into a locked state. Always callable. Hardware validation (2026-04-17) verified AU works on V1_V7 (7v09), V8_V9, and V10. On a device without a display, the command is a harmless no-op; on a locked device it clears the LCK status bit.

Source code in src/alicatlib/devices/base.py
async def unlock_display(self) -> DisplayLockResult:
    """Unlock the front-panel display (``U``); reply is a post-op data frame.

    Intentionally not gated on :attr:`Capability.DISPLAY`: this is
    the safety escape for a device that got into a locked state.
    Always callable. Hardware validation (2026-04-17) verified ``AU``
    works on V1_V7 (7v09), V8_V9, and V10. On a device without a
    display, the command is a harmless no-op; on a locked device
    it clears the ``LCK`` status bit.
    """
    return await self._execute_display_lock(UNLOCK_DISPLAY, UnlockDisplayRequest())

user_data async

user_data(slot, value=None)

Read or write a user-data slot (UD, 8v24+).

Four slots (0..3), 32 ASCII characters each. value=None reads the slot; a string writes it. Values are validated pre-I/O: ASCII-only, ≤ 32 characters, no \r / \n (those would truncate the wire write).

Source code in src/alicatlib/devices/base.py
async def user_data(
    self,
    slot: int,
    value: str | None = None,
) -> UserDataSetting:
    r"""Read or write a user-data slot (``UD``, 8v24+).

    Four slots (``0..3``), 32 ASCII characters each. ``value=None``
    reads the slot; a string writes it. Values are validated
    pre-I/O: ASCII-only, ≤ 32 characters, no ``\r`` / ``\n``
    (those would truncate the wire write).
    """
    result = await self._session.execute(
        USER_DATA,
        UserDataRequest(slot=slot, value=value),
    )
    # Hardware-validation finding: real 10v20 firmware returns only the
    # unit id when the slot is empty; the decoder's short-form path
    # marks ``slot`` as ``-1`` sentinel. Re-populate from the
    # request so the returned ``UserDataSetting`` always round-trips.
    if result.slot == -1:
        return replace(result, slot=slot)
    return result
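
The pre-I/O checks listed above (ASCII-only, at most 32 characters, no carriage return or line feed) could look roughly like this. `validate_user_data` is a hypothetical helper that raises `ValueError`; the library's own validator and error class are not shown in this section.

```python
MAX_USER_DATA_LEN = 32  # per the description above


def validate_user_data(value: str) -> str:
    """Return ``value`` unchanged if it is safe to write, else raise ValueError."""
    if not value.isascii():
        raise ValueError("user data must be ASCII-only")
    if len(value) > MAX_USER_DATA_LEN:
        raise ValueError(f"user data exceeds {MAX_USER_DATA_LEN} characters")
    if "\r" in value or "\n" in value:
        # A line terminator inside the value would end the wire write early.
        raise ValueError("user data must not contain CR or LF")
    return value


checked = validate_user_data("rig-7 calibration 2024")
```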

zero_band async

zero_band(zero_band=None)

Query or set the zero band (DCZ, V10 10v05+).

Zero band is a percent-of-full-scale threshold: readings below it are reported as zero. zero_band=None issues the query form; a value in 0..6.38 sets it (0 disables).

Source code in src/alicatlib/devices/base.py
async def zero_band(
    self,
    zero_band: float | None = None,
) -> ZeroBandSetting:
    """Query or set the zero band (``DCZ``, V10 10v05+).

    Zero band is a percent-of-full-scale threshold: readings below
    it are reported as zero. ``zero_band=None`` issues the query
    form; a value in ``0..6.38`` sets it (``0`` disables).
    """
    return await self._session.execute(
        ZERO_BAND,
        ZeroBandRequest(zero_band=zero_band),
    )