Skip to content

alicatlib.commands

Declarative command specs plus request / response models. See Commands for the catalog and Design §5.4 for the spec architecture.

alicatlib.commands

Command layer — one declarative :class:Command per Alicat command.

See docs/design.md §5.4.

AnalogOutputSource dataclass

AnalogOutputSource(
    name="analog_output_source",
    token="ASOCV",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.ANALOG_OUTPUT,
    min_firmware=_MIN_FIRMWARE_ASOCV,
    max_firmware=None,
    firmware_families=frozenset({FirmwareFamily.V10}),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[AnalogOutputSourceRequest, AnalogOutputSourceSetting]

ASOCV — analog-output-source query/set (V10 10v05+).

Wire shape (primer p. 22):

  • Query: <uid><prefix>ASOCV <channel>\r
  • Set: <uid><prefix>ASOCV <channel> <value> [<unit_code>]\r

Response: <uid> <value> <unit_code> <unit_label> (4 fields). When value is 0 / 1 (fixed-level sentinels) the primer notes unit_code=1 and unit_label="---".

decode

decode(response, ctx)

Parse 4-field reply into :class:AnalogOutputSourceSetting.

channel is request-echo; the device doesn't re-echo it so the facade fills from the request.

Source code in src/alicatlib/commands/output.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> AnalogOutputSourceSetting:
    """Parse 4-field reply into :class:`AnalogOutputSourceSetting`.

    ``channel`` is request-echo; the device doesn't re-echo it so
    the facade fills from the request.
    """
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=4)
    unit_id, value_s, code_s, label = fields
    value = parse_int(value_s, field=f"{self.name}.value")
    code = parse_int(code_s, field=f"{self.name}.unit_code")
    return AnalogOutputSourceSetting(
        unit_id=unit_id,
        channel=AnalogOutputChannel.PRIMARY,  # facade replaces with request.channel
        value=value,
        unit_code=code,
        unit=_resolve_unit_label(label, code),
        unit_label=label,
    )

encode

encode(ctx, request)

Emit ASOCV query or set bytes.

Source code in src/alicatlib/commands/output.py
def encode(self, ctx: DecodeContext, request: AnalogOutputSourceRequest) -> bytes:
    """Emit ASOCV query or set bytes."""
    if request.value is not None and request.value < 0:
        raise AlicatValidationError(
            f"{self.name}: value must be non-negative (0/1 are min/max "
            f"sentinels; ≥2 are statistic codes); got {request.value}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"value": request.value},
            ),
        )
    prefix = ctx.command_prefix.decode("ascii")
    head = f"{ctx.unit_id}{prefix}{self.token} {int(request.channel)}"
    if request.value is None:
        return (head + "\r").encode("ascii")
    tokens = [head, str(request.value)]
    if request.unit_code is not None:
        tokens.append(str(request.unit_code))
    return (" ".join(tokens) + "\r").encode("ascii")

AnalogOutputSourceRequest dataclass

AnalogOutputSourceRequest(
    channel, value=None, unit_code=None
)

Arguments for :data:ANALOG_OUTPUT_SOURCE.

Attributes:

Name Type Description
channel AnalogOutputChannel

:class:AnalogOutputChannel — primary or secondary analog output. Required for both query and set because the primer embeds the channel in the query wire form (ASOCV primary_or_secondary).

value int | None

Either a :class:Statistic wire code (≥2 per primer's statistic numbering) or the fixed-level sentinels 0 (minimum) / 1 (maximum). None issues the query form.

unit_code int | None

Engineering-unit code the output should use (primer Appendix B). Optional — None on set leaves units alone; ignored on query.

AutoTare dataclass

AutoTare(
    name="auto_tare",
    token="ZCA",
    response_mode=ResponseMode.LINE,
    device_kinds=_CONTROLLER_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_ZCA_ZCP,
    max_firmware=None,
    firmware_families=frozenset({FirmwareFamily.V10}),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[AutoTareRequest, AutoTareState]

ZCA — auto-tare-on-zero-setpoint query/set (controllers, 10v05+).

Wire shape (hardware-corrected; primer p. 18 is incomplete):

  • Query: <uid><prefix>ZCA\r
  • Enable: <uid><prefix>ZCA 1 <delay>\r
  • Disable: <uid><prefix>ZCA 0\r (no delay field — see §15.3)

Primer documents the set form as always carrying both slots, with ZCA <uid> 0 0 disabling auto-tare. Hardware validation (2026-04-17) confirmed on two 10v20 units that ZCA 0 0 rejects with ? — the device does not accept a zero delay in the disable form. The wire-form probe also confirmed that the shortest accepted disable form is ZCA 0 with no delay field (ZCA 0 0.1 / ZCA 0 1 / ZCA 0 1.0 also work, but all land in the same enabled=0 delay=0.0 state per the reply). The encoder emits the bare-0 form for enable=False.

Response: <uid> <enable> <delay> (3 fields) for both enable and disable paths.

decode

decode(response, ctx)

Parse <uid> <enable> <delay> into :class:AutoTareState.

Source code in src/alicatlib/commands/tare.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> AutoTareState:
    """Parse ``<uid> <enable> <delay>`` into :class:`AutoTareState`."""
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=3)
    unit_id, enable_s, delay_s = fields
    return AutoTareState(
        unit_id=unit_id,
        enabled=parse_bool_code(enable_s, field=f"{self.name}.enabled"),
        delay_s=parse_float(delay_s, field=f"{self.name}.delay_s"),
    )

encode

encode(ctx, request)

Emit ZCA query or set bytes.

Disable uses the shortest accepted form (ZCA 0) rather than the primer's ZCA 0 0 — the latter rejects on real 10v20 (see §16.6.10 + class docstring).

Source code in src/alicatlib/commands/tare.py
def encode(self, ctx: DecodeContext, request: AutoTareRequest) -> bytes:
    """Emit ZCA query or set bytes.

    Disable uses the shortest accepted form (``ZCA 0``) rather
    than the primer's ``ZCA 0 0`` — the latter rejects on real
    10v20 (see §16.6.10 + class docstring).
    """
    prefix = ctx.command_prefix.decode("ascii")
    head = f"{ctx.unit_id}{prefix}{self.token}"
    if request.enable is None:
        return f"{head}\r".encode("ascii")
    # Disable form — no delay field on the wire.
    if not request.enable:
        return f"{head} 0\r".encode("ascii")
    # Enable form — delay required.
    delay = request.delay_s
    if delay is None:
        raise AlicatValidationError(
            f"{self.name}: delay_s is required when enabling auto-tare "
            f"(range {ZCA_DELAY_MIN_S}..{ZCA_DELAY_MAX_S} seconds)",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
            ),
        )
    if delay < ZCA_DELAY_MIN_S or delay > ZCA_DELAY_MAX_S:
        raise AlicatValidationError(
            f"{self.name}: delay_s must be in "
            f"[{ZCA_DELAY_MIN_S}, {ZCA_DELAY_MAX_S}] seconds, got {delay}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"delay_s": delay},
            ),
        )
    return f"{head} 1 {delay}\r".encode("ascii")

AutoTareRequest dataclass

AutoTareRequest(enable=None, delay_s=None)

Arguments for :data:AUTO_TARE.

Attributes:

Name Type Description
enable bool | None

True enables auto-tare on zero-setpoint; False disables. None issues the query form.

delay_s float | None

Settling delay in seconds before the device tares after seeing a zero setpoint. Primer constrains this to [0.1, 25.5]. Required when enabling (enable=True); ignored and omitted from the wire when disabling (enable=False). See :class:AutoTare for the wire form the disable path emits.

AverageTiming dataclass

AverageTiming(
    name="average_timing",
    token="DCA",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_V10_05,
    max_firmware=None,
    firmware_families=_V10_ONLY,
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[AverageTimingRequest, AverageTimingSetting]

DCA — per-statistic averaging-window query/set (V10 10v05+).

Wire shape (primer p. 18):

  • Query: <uid><prefix>DCA <stat>\r
  • Set: <uid><prefix>DCA <stat> <averaging_ms>\r

Response: <uid> <stat> <averaging_ms> (3 fields).

decode

decode(response, ctx)

Parse the DCA reply into :class:AverageTimingSetting.

Primer p. 18 documents <uid> <stat> <averaging_ms> (3 fields). Hardware validation (2026-04-17) on 10v20 firmware shows the device omits the <stat> echo and replies with just <uid> <averaging_ms> (2 fields). Accept both shapes; when the 2-field form arrives, statistic_code is 0 and the facade re-populates it from the request.

Source code in src/alicatlib/commands/data_readings.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> AverageTimingSetting:
    """Parse the ``DCA`` reply into :class:`AverageTimingSetting`.

    Primer p. 18 documents ``<uid> <stat> <averaging_ms>`` (3
    fields). Hardware validation (2026-04-17) on 10v20 firmware shows
    the device omits the ``<stat>`` echo and replies with just
    ``<uid> <averaging_ms>`` (2 fields). Accept both shapes; when
    the 2-field form arrives, ``statistic_code`` is ``0`` and the
    facade re-populates it from the request.
    """
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name)
    if len(fields) == _DCA_FIELDS_WITH_STAT:
        unit_id, stat_s, avg_s = fields
        statistic_code = parse_int(stat_s, field=f"{self.name}.statistic_code")
    elif len(fields) == _DCA_FIELDS_WITHOUT_STAT:
        unit_id, avg_s = fields
        statistic_code = 0
    else:
        raise AlicatParseError(
            f"{self.name}: expected 2 or 3 fields, got {len(fields)}{text!r}",
            field_name="average_timing",
            expected="2 or 3 fields",
            actual=len(fields),
            context=ErrorContext(command_name=self.name, raw_response=response),
        )
    return AverageTimingSetting(
        unit_id=unit_id,
        statistic_code=statistic_code,
        averaging_ms=parse_int(avg_s, field=f"{self.name}.averaging_ms"),
    )

encode

encode(ctx, request)

Emit the DCA query or set bytes.

Source code in src/alicatlib/commands/data_readings.py
def encode(self, ctx: DecodeContext, request: AverageTimingRequest) -> bytes:
    """Emit the DCA query or set bytes."""
    if request.statistic_code not in DCA_ALLOWED_STATISTIC_CODES:
        raise AlicatValidationError(
            f"{self.name}: statistic_code {request.statistic_code} not in "
            f"{sorted(DCA_ALLOWED_STATISTIC_CODES)} — DCA only averages "
            "pressure / flow statistics per primer p. 18.",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"statistic_code": request.statistic_code},
            ),
        )
    prefix = ctx.command_prefix.decode("ascii")
    head = f"{ctx.unit_id}{prefix}{self.token} {request.statistic_code}"
    if request.averaging_ms is None:
        return (head + "\r").encode("ascii")
    if request.averaging_ms < 0 or request.averaging_ms > _DCA_MAX_AVERAGE_MS:
        raise AlicatValidationError(
            f"{self.name}: averaging_ms must be in [0, {_DCA_MAX_AVERAGE_MS}], "
            f"got {request.averaging_ms}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"averaging_ms": request.averaging_ms},
            ),
        )
    return f"{head} {request.averaging_ms}\r".encode("ascii")

AverageTimingRequest dataclass

AverageTimingRequest(statistic_code, averaging_ms=None)

Arguments for :data:AVERAGE_TIMING.

Attributes:

Name Type Description
statistic_code int

One of the primer's permitted averaging codes (see :data:DCA_ALLOWED_STATISTIC_CODES). Required for both query and set (DCA is per-statistic).

averaging_ms int | None

Rolling window in ms. None issues the query form. 0..9999 otherwise; 0 means "update every millisecond" (no averaging).

BlinkDisplay dataclass

BlinkDisplay(
    name="blink_display",
    token="FFP",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.DISPLAY,
    min_firmware=_MIN_FIRMWARE_FFP,
    max_firmware=None,
    firmware_families=frozenset(
        {FirmwareFamily.V8_V9, FirmwareFamily.V10}
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[BlinkDisplayRequest, BlinkDisplayState]

FFP — blink display query/set (8v28+, DISPLAY capability).

Wire:

  • Query: <uid><prefix>FFP\r
  • Set: <uid><prefix>FFP <duration>\r

Response: <uid> <0|1> — two fields. 1 = flashing, 0 = idle.

decode

decode(response, ctx)

Parse <uid> <0|1> into :class:BlinkDisplayState.

Source code in src/alicatlib/commands/display.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> BlinkDisplayState:
    """Parse ``<uid> <0|1>`` into :class:`BlinkDisplayState`."""
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=2)
    unit_id, flag_s = fields
    return BlinkDisplayState(
        unit_id=unit_id,
        flashing=parse_bool_code(flag_s, field=f"{self.name}.flashing"),
    )

encode

encode(ctx, request)

Emit FFP query or set bytes.

Source code in src/alicatlib/commands/display.py
def encode(self, ctx: DecodeContext, request: BlinkDisplayRequest) -> bytes:
    """Emit FFP query or set bytes."""
    prefix = ctx.command_prefix.decode("ascii")
    if request.duration_s is None:
        return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")
    if request.duration_s < 0 and request.duration_s != _FFP_FLASH_INDEFINITELY:
        raise AlicatValidationError(
            f"{self.name}: duration_s must be >= 0 or exactly "
            f"{_FFP_FLASH_INDEFINITELY} (flash indefinitely), "
            f"got {request.duration_s}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"duration_s": request.duration_s},
            ),
        )
    return f"{ctx.unit_id}{prefix}{self.token} {request.duration_s}\r".encode("ascii")

BlinkDisplayRequest dataclass

BlinkDisplayRequest(duration_s=None)

Arguments for :data:BLINK_DISPLAY.

Attributes:

Name Type Description
duration_s int | None

Flash duration in seconds. None issues the query form; 0 stops an active flash; -1 flashes indefinitely. Other negatives are rejected pre-I/O because they are not documented sentinels.

CancelValveHold dataclass

CancelValveHold(
    name="cancel_valve_hold",
    token="C",
    response_mode=ResponseMode.LINE,
    device_kinds=_CONTROLLER_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=_ALL_CONTROLLER_FIRMWARE_FAMILIES,
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[CancelValveHoldRequest, 'ParsedFrame']

C — cancel valve hold, resume closed-loop control.

Wire: <uid><prefix>C\r. Response is a post-op data frame without the :attr:StatusCode.HLD bit. Safe to issue even when no hold is active (the primer documents a successful data-frame reply in that case).

No primer firmware cutoff — the command is documented as universally available across V1_V7, V8_V9, V10. GP behaviour is not documented; the firmware-family gate excludes GP to stay conservative until a capture confirms.

decode

decode(response, ctx)

Parse the post-op data frame.

Source code in src/alicatlib/commands/valve.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-op data frame."""
    return _decode_hold_frame(self.name, response, ctx)

encode

encode(ctx, request)

Emit <uid><prefix>C\r.

Source code in src/alicatlib/commands/valve.py
def encode(self, ctx: DecodeContext, request: CancelValveHoldRequest) -> bytes:
    r"""Emit ``<uid><prefix>C\r``."""
    del request
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")

CancelValveHoldRequest dataclass

CancelValveHoldRequest()

Arguments for :data:CANCEL_VALVE_HOLD (empty — C takes no arguments).

Capability

Bases: Flag

Device-level feature flags, discovered per-device at session startup.

:class:~alicatlib.devices.base.DeviceKind is too coarse to gate many commands (a flow meter may or may not have a barometer; only some devices have a secondary pressure sensor or remote-tare pin). These capabilities are orthogonal to DeviceKind and are declared per command via :attr:Command.required_capabilities.

See design §5.4 for the full rationale.

BAROMETER class-attribute instance-attribute

BAROMETER = auto()

Device reports a barometric pressure reading (FPF 15 returns a plausible value with a real unit label). Probed via FPF 15.

Does NOT imply :attr:TAREABLE_ABSOLUTE_PRESSURE. Hardware validation on 2026-04-17 established that flow-controller devices (MCR, MCP, …) expose a firmware-computed barometer reading used internally for abs/gauge pressure derivation, but do not have a process-port absolute-pressure sensor that the PC command can re-zero. Four devices (8v17 MCR-200, 8v30 MCR-500, 6v21 MCR-775, 7v09 MCP-50) all probed BAROMETER positive via FPF 15 yet rejected or silently ignored PC. See design §16.6.7 for the narrative.

TAREABLE_ABSOLUTE_PRESSURE class-attribute instance-attribute

TAREABLE_ABSOLUTE_PRESSURE = auto()

Device has a process-port absolute-pressure sensor whose zero can be re-referenced against the current barometric reading via PC (tare_absolute_pressure). Gated separately from :attr:BAROMETER because the two properties dissociate in practice (see the BAROMETER docstring).

No safe probe exists — test-writing PC would tare the device. Users opt in via assume_capabilities=Capability.TAREABLE_ABSOLUTE_PRESSURE on :func:~alicatlib.devices.factory.open_device when they know their hardware supports it (typically pressure meters/controllers).

Command dataclass

Command(
    name,
    token,
    response_mode,
    device_kinds,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Declarative spec for an Alicat command.

Subclasses are frozen dataclass instances. The overridden :meth:encode / :meth:decode methods are the command's wire format. Everything else — firmware gating, capability gating, destructive / experimental flags, multiline termination — is metadata that the session reads before dispatching, so commands fail fast with typed errors rather than silently producing a bad wire payload.

Attributes:

Name Type Description
name str

Canonical Python-friendly name (e.g. "gas_select"). Used in error messages and telemetry.

token str

Protocol token (e.g. "GS"). Emitted verbatim — Alicat commands are case-insensitive except FACTORY RESTORE ALL, which must be uppercase; set :attr:case_sensitive on that.

response_mode ResponseMode

How the session should dispatch the I/O.

device_kinds frozenset[DeviceKind]

Which :class:DeviceKind values this command applies to. Empty means "any".

media Medium

Which :class:Medium flag(s) this command applies to. Default (Medium.GAS | Medium.LIQUID) is medium-agnostic and lets the command run on every device. Gas-specific commands (GS, ??G*, gas-mix edits) narrow to :attr:Medium.GAS; liquid-specific commands narrow to :attr:Medium.LIQUID. Gated pre-I/O in :class:~alicatlib.devices.session.Session — a bitwise intersection of command media against the device's configured media determines whether dispatch proceeds (design §5.9a).

required_capabilities Capability

Capability bits the device must have. A command run on a device missing one raises :class:~alicatlib.errors.AlicatMissingHardwareError.

min_firmware / max_firmware

Supported firmware range within a family. Cross-family comparison raises TypeError by design (see :class:alicatlib.firmware.FirmwareVersion).

firmware_families frozenset[FirmwareFamily]

Which families support this command — monotonic gate: declare a family only when every captured device in that family either implements the command or is documented to. Empty means "any". For commands whose availability varies per-device within a family (??G* works on 5v12 + 7v09 but rejects on 6v21; FPF rejects on 5v12 but works on 6v+), use a conservative superset (include families where some devices work) and let the runtime rejection path handle per-device variation. The empirical matrix (tests/fixtures/device_matrix.yaml) and its validator (tests/unit/test_device_matrix.py) enforce this policy end-to-end.

destructive bool

Requires explicit confirm=True at the session layer.

experimental bool

Emits a deprecation-style warning on use.

case_sensitive bool

Suppress any hypothetical upstream lowercase normalisation. Only needed for FACTORY RESTORE ALL.

prefix_less bool

Command opts out of the unit-id prefix (e.g. @@ stop-stream).

expected_lines int | None

Fixed row count for LINES commands (e.g. ??M* is 10 lines).

is_complete Callable[[Sequence[bytes]], bool] | None

Predicate that returns True when a multiline response is complete. Checked before expected_lines, so takes priority (see design §5.2).

A :class:Command must declare at least one of expected_lines, is_complete for every :attr:ResponseMode.LINES command — otherwise reads fall through to the idle-timeout fallback every time, adding roughly 100 ms of latency per invocation. A test pins this invariant.

decode

decode(response, ctx)

Parse the device's (EOL-stripped) response into the typed result.

Source code in src/alicatlib/commands/base.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> Resp:
    """Parse the device's (EOL-stripped) response into the typed result."""
    raise NotImplementedError(f"{self.name}.decode is not implemented")

encode

encode(ctx, request)

Render request as the exact bytes to put on the wire (incl. EOL).

Source code in src/alicatlib/commands/base.py
def encode(self, ctx: DecodeContext, request: Req) -> bytes:
    """Render ``request`` as the exact bytes to put on the wire (incl. EOL)."""
    raise NotImplementedError(f"{self.name}.encode is not implemented")

Commands

Namespace for command spec singletons.

Usage::

from alicatlib.commands import Commands

await session.execute(Commands.GAS_SELECT, GasSelectRequest(gas="N2"))

DataFrameFormatQuery dataclass

DataFrameFormatQuery(
    name="data_frame_format_query",
    token="??D*",
    response_mode=ResponseMode.LINES,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=True,
    expected_lines=64,
    is_complete=None,
)

Bases: Command[DataFrameFormatRequest, 'DataFrameFormat']

??D* — query the device's advertised data-frame layout.

decode

decode(response, ctx)

Parse the advertised data-frame layout into :class:DataFrameFormat.

Source code in src/alicatlib/commands/system.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> DataFrameFormat:
    """Parse the advertised data-frame layout into :class:`DataFrameFormat`."""
    del ctx
    if not isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected multi-line response, got single line",
        )
    return parse_data_frame_table(response)

encode

encode(ctx, request)

Emit <unit_id><prefix>??D*\r.

Source code in src/alicatlib/commands/system.py
def encode(
    self,
    ctx: DecodeContext,
    request: DataFrameFormatRequest,
) -> bytes:
    r"""Emit ``<unit_id><prefix>??D*\r``."""
    del request
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")

DataFrameFormatRequest dataclass

DataFrameFormatRequest()

Arguments for :data:DATA_FRAME_FORMAT_QUERY — no user-provided fields.

DeadbandLimit dataclass

DeadbandLimit(
    name="deadband_limit",
    token="LCDB",
    response_mode=ResponseMode.LINE,
    device_kinds=_CONTROLLER_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_DEADBAND,
    max_firmware=None,
    firmware_families=frozenset({FirmwareFamily.V10}),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[DeadbandLimitRequest, DeadbandSetting]

LCDB — deadband-limit query/set (10v05+).

Wire shape (primer p. 14):

  • Query: <uid><prefix>LCDB\r
  • Set: <uid><prefix>LCDB <save> <deadband_limit>\r

Response: <uid> <current_deadband> <unit_code> <unit_label> (4 fields).

decode

decode(response, ctx)

Parse the 4-field reply into :class:DeadbandSetting.

Source code in src/alicatlib/commands/control.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> DeadbandSetting:
    """Parse the 4-field reply into :class:`DeadbandSetting`."""
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=4)
    unit_id, deadband_s, unit_code_s, unit_label = fields
    deadband = parse_float(deadband_s, field=f"{self.name}.deadband")
    unit_code = parse_int(unit_code_s, field=f"{self.name}.unit_code")
    return DeadbandSetting(
        unit_id=unit_id,
        deadband=deadband,
        unit_code=unit_code,
        unit=_resolve_unit_label(unit_label, unit_code),
        unit_label=unit_label,
    )

encode

encode(ctx, request)

Emit the LCDB query or set bytes.

Source code in src/alicatlib/commands/control.py
def encode(self, ctx: DecodeContext, request: DeadbandLimitRequest) -> bytes:
    """Emit the LCDB query or set bytes."""
    prefix = ctx.command_prefix.decode("ascii")
    if request.deadband is None:
        # Query — ``save`` is set-only semantics, silently ignored
        # here so a caller that reuses a request dataclass with
        # default flags for both query and set doesn't have to
        # reset them.
        return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")
    if request.deadband < 0:
        raise AlicatValidationError(
            f"{self.name}: deadband must be >= 0 (0 disables), got {request.deadband}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"deadband": request.deadband},
            ),
        )
    save_flag = "1" if request.save else "0"
    return (f"{ctx.unit_id}{prefix}{self.token} {save_flag} {request.deadband}\r").encode(
        "ascii"
    )

DeadbandLimitRequest dataclass

DeadbandLimitRequest(deadband=None, save=None)

Arguments for :data:DEADBAND_LIMIT.

Attributes:

Name Type Description
deadband float | None

Acceptable drift around the setpoint in the controlled variable's engineering units. 0.0 disables the deadband; negative values are rejected pre-I/O. None issues the query form.

save bool | None

True persists the new value to EEPROM (subject to the :attr:AlicatConfig.save_rate_warn_per_min rate-warn guard); False / None keeps the change volatile. Primer requires save in the first wire position on set, so the encoder emits 0/1 in that slot; None defaults to 0 (volatile) per primer-safe behaviour.

DecodeContext dataclass

DecodeContext(
    unit_id,
    firmware,
    capabilities=Capability.NONE,
    command_prefix=b"",
    data_frame_format=None,
)

Per-call context threaded through encode / decode.

Built once per command by the session from cached device info, so commands never do I/O to figure out the prefix / firmware themselves.

Attributes:

Name Type Description
unit_id str

Single-letter AZ identifying the device on the bus.

firmware FirmwareVersion

Parsed device firmware; used by commands that alter their wire format across firmware families (e.g. legacy setpoint S vs modern LS).

capabilities Capability

Feature flags discovered at session startup. The session pre-checks Command.required_capabilities against this, so encode-time commands generally don't re-check.

command_prefix bytes

Bytes injected between unit_id and the command token. Empty for numeric-family firmware; b"$$" for GP-family devices (per primer p. 4 and design §5.10).

data_frame_format DataFrameFormat | None

The cached ??D* format. None before the session has probed it; commands that decode a data frame (POLL_DATA, any command returning a post-op state frame) raise :class:AlicatParseError if they're asked to decode with this still None.

EngineeringUnits dataclass

EngineeringUnits(
    name="engineering_units",
    token="DCU",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=MIN_FIRMWARE_DCU,
    max_firmware=None,
    firmware_families=frozenset({FirmwareFamily.V10}),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[EngineeringUnitsRequest, UnitSetting]

DCU — engineering-units query / set.

Wire shape (primer-derived; hardware-correctable):

  • Query: <uid><prefix>DCU <stat>
  • Set: <uid><prefix>DCU <stat> <unit>
  • Group: <uid><prefix>DCU <stat> <unit> 1
  • Special: <uid><prefix>DCU <stat> <unit> 1 1

Response shape: <uid> <stat_code> <unit_code> <unit_label>.

decode

decode(response, ctx)

Parse <uid> <unit_code> <unit_label> into :class:UnitSetting.

Verified against a V10 capture on 2026-04-17 (design §16.6) — the device does not echo the requested statistic in the reply, so statistic is left as :attr:Statistic.NONE and the facade fills it from the request via :func:dataclasses.replace.

Source code in src/alicatlib/commands/units.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> UnitSetting:
    """Parse ``<uid> <unit_code> <unit_label>`` into :class:`UnitSetting`.

    Verified against a V10 capture on 2026-04-17 (design §16.6) — the
    device does *not* echo the requested statistic in the reply, so
    ``statistic`` is left as :attr:`Statistic.NONE` and the facade
    fills it from the request via :func:`dataclasses.replace`.
    """
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=3)
    unit_id, unit_code_s, label = fields
    unit_code = parse_int(unit_code_s, field="unit_code")

    return UnitSetting(
        unit_id=unit_id,
        statistic=Statistic.NONE,  # facade replaces with request.statistic
        unit=_resolve_response_unit(label, unit_code),
        label=label,
    )

encode

encode(ctx, request)

Emit the DCU query / set bytes.

Source code in src/alicatlib/commands/units.py
def encode(
    self,
    ctx: DecodeContext,
    request: EngineeringUnitsRequest,
) -> bytes:
    """Emit the DCU query / set bytes."""
    _, stat_code = _resolve_statistic_code(request.statistic)
    prefix = ctx.command_prefix.decode("ascii")
    head = f"{ctx.unit_id}{prefix}{self.token} {stat_code}"

    if request.unit is None:
        # Query form. ``apply_to_group`` / ``override_special_rules``
        # are set-only semantics; silently ignored here so a caller
        # that constructs the request with default flags for both
        # query and set doesn't have to reset them.
        return (head + "\r").encode("ascii")

    unit_code = _resolve_unit_code(request.unit)
    tokens = [head, str(unit_code)]
    if request.apply_to_group or request.override_special_rules:
        tokens.append("1" if request.apply_to_group else "0")
        if request.override_special_rules:
            tokens.append("1")
    return (" ".join(tokens) + "\r").encode("ascii")

EngineeringUnitsRequest dataclass

EngineeringUnitsRequest(
    statistic,
    unit=None,
    apply_to_group=False,
    override_special_rules=False,
)

Arguments for :data:ENGINEERING_UNITS.

Attributes:

Name Type Description
statistic Statistic | str

Statistic to query or set units on. Accepts the :class:Statistic member or any registered alias / value string.

unit Unit | int | str | None

None issues the query form. An :class:Unit member, numeric code, or registered alias sets the unit.

apply_to_group bool

When setting, broadcast the change to the statistic's group instead of the single statistic.

override_special_rules bool

When setting, override any device-specific restrictions on the statistic / unit pair. Only meaningful on set; ignored in query form.

FullScaleQuery dataclass

FullScaleQuery(
    name="full_scale_query",
    token="FPF",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(
        {
            FirmwareFamily.V1_V7,
            FirmwareFamily.V8_V9,
            FirmwareFamily.V10,
        }
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[FullScaleQueryRequest, FullScaleValue]

FPF — full-scale query for a single statistic.

Wire shape: <uid><prefix>FPF <stat>\r.

Response (primer-derived): <uid> <stat_code> <value> <unit_code> <unit_label>.

decode

decode(response, ctx)

Parse <uid> <value> <unit_code> <unit_label> into :class:FullScaleValue.

Verified against a V10 capture on 2026-04-17 (design §16.6) — the device does not echo the requested statistic in the reply, so statistic is left as :attr:Statistic.NONE and the facade fills it from the request via :func:dataclasses.replace.

Source code in src/alicatlib/commands/units.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> FullScaleValue:
    """Parse ``<uid> <value> <unit_code> <unit_label>`` into :class:`FullScaleValue`.

    Verified against a V10 capture on 2026-04-17 (design §16.6) — the
    device does *not* echo the requested statistic in the reply, so
    ``statistic`` is left as :attr:`Statistic.NONE` and the facade
    fills it from the request via :func:`dataclasses.replace`.
    """
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=4)
    _unit_id, value_s, unit_code_s, label = fields
    value = parse_float(value_s, field="full_scale_value")
    unit_code = parse_int(unit_code_s, field="unit_code")

    return FullScaleValue(
        statistic=Statistic.NONE,  # facade replaces with request.statistic
        value=value,
        unit=_resolve_response_unit(label, unit_code),
        unit_label=label,
    )

encode

encode(ctx, request)

Emit <unit_id><prefix>FPF <stat_code>\r.

Source code in src/alicatlib/commands/units.py
def encode(
    self,
    ctx: DecodeContext,
    request: FullScaleQueryRequest,
) -> bytes:
    r"""Emit ``<unit_id><prefix>FPF <stat_code>\r``."""
    _, stat_code = _resolve_statistic_code(request.statistic)
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token} {stat_code}\r".encode("ascii")

FullScaleQueryRequest dataclass

FullScaleQueryRequest(statistic)

Arguments for :data:FULL_SCALE_QUERY.

Attributes:

Name Type Description
statistic Statistic | str

Statistic whose full-scale value to query.

GasList dataclass

GasList(
    name="gas_list",
    token="??G*",
    response_mode=ResponseMode.LINES,
    device_kinds=frozenset(
        {DeviceKind.FLOW_METER, DeviceKind.FLOW_CONTROLLER}
    ),
    media=Medium.GAS,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=True,
    expected_lines=_GAS_LIST_MAX_LINES,
    is_complete=None,
)

Bases: Command[GasListRequest, dict[int, str]]

??G* — enumerate built-in and mixture gases on the device.

Returns {gas_code: raw_label}. Callers that want typed :class:Gas members should cross-reference the code against :func:alicatlib.registry.gas_registry.by_code; unknown codes (custom mixtures, legacy compat slots) are preserved as raw labels so diagnostics retain them.

decode

decode(response, ctx)

Parse the multi-line gas list into {gas_code: label}.

Source code in src/alicatlib/commands/gas.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> dict[int, str]:
    """Parse the multi-line gas list into ``{gas_code: label}``."""
    del ctx
    if not isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected multi-line response, got single line",
        )
    return parse_gas_list(response)

encode

encode(ctx, request)

Emit <unit_id><prefix>??G*\r.

Source code in src/alicatlib/commands/gas.py
def encode(self, ctx: DecodeContext, request: GasListRequest) -> bytes:
    r"""Emit ``<unit_id><prefix>??G*\r``."""
    del request
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")

GasListRequest dataclass

GasListRequest()

Arguments for :data:GAS_LIST — no user-provided fields.

GasSelect dataclass

GasSelect(
    name="gas_select",
    token="GS",
    response_mode=ResponseMode.LINE,
    device_kinds=frozenset(
        {DeviceKind.FLOW_METER, DeviceKind.FLOW_CONTROLLER}
    ),
    media=Medium.GAS,
    required_capabilities=Capability.NONE,
    min_firmware=MIN_FIRMWARE_GAS_SELECT,
    max_firmware=None,
    firmware_families=frozenset({FirmwareFamily.V10}),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[GasSelectRequest, GasState]

Active-gas command (GS, 10v05+).

Both get (GS) and set (GS <code> [save]) share a single command spec — the request's gas field picks the mode. The facade routes firmwares < 10v05 to :data:GAS_SELECT_LEGACY (see :func:~alicatlib.commands._firmware_cutoffs.uses_modern_gas_select).

decode

decode(response, ctx)

Parse the four-field GS reply into a typed :class:GasState.

Source code in src/alicatlib/commands/gas.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> GasState:
    """Parse the four-field GS reply into a typed :class:`GasState`."""
    del ctx  # kept for signature uniformity across commands
    if isinstance(response, tuple):
        # A LINE command should never receive a multi-line response; the
        # session guarantees this, but guard against a mis-configured spec.
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=4)
    unit_id, code_s, label, long_name = fields
    code = parse_int(code_s, field="code")
    return GasState(
        unit_id=unit_id,
        code=code,
        gas=gas_registry.by_code(code),
        label=label,
        long_name=long_name,
    )

encode

encode(ctx, request)

Emit the wire bytes for a GS query or set command.

Source code in src/alicatlib/commands/gas.py
def encode(self, ctx: DecodeContext, request: GasSelectRequest) -> bytes:
    """Emit the wire bytes for a GS query or set command."""
    prefix = ctx.command_prefix.decode("ascii")
    if request.gas is None:
        # Query form: no gas argument, no save flag.
        return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")
    gas = gas_registry.coerce(request.gas)
    if request.save is None:
        body = f"{ctx.unit_id}{prefix}{self.token} {gas.code}"
    else:
        save_flag = "1" if request.save else "0"
        body = f"{ctx.unit_id}{prefix}{self.token} {gas.code} {save_flag}"
    return body.encode("ascii") + b"\r"

GasSelectLegacy dataclass

GasSelectLegacy(
    name="gas_select_legacy",
    token="G",
    response_mode=ResponseMode.LINE,
    device_kinds=frozenset(
        {DeviceKind.FLOW_METER, DeviceKind.FLOW_CONTROLLER}
    ),
    media=Medium.GAS,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=_MAX_FIRMWARE_GAS_SELECT_LEGACY_V10,
    firmware_families=frozenset(
        {
            FirmwareFamily.GP,
            FirmwareFamily.V1_V7,
            FirmwareFamily.V8_V9,
            FirmwareFamily.V10,
        }
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[GasSelectLegacyRequest, 'ParsedFrame']

Legacy set-gas command (G) for firmware older than V10 ≥ 10v05.

Per design §5.4, the device replies with a post-op data frame rather than the modern 4-field <uid> <code> <short> <long> reply. This command's decoder returns the raw :class:ParsedFrame so the facade can stitch a :class:GasState from (a) the gas code the caller sent — known at the facade but not at the decoder — and (b) the frame's echoed unit id.

Gating: firmware_families lists every family the legacy path applies to; max_firmware set to 10v04 inside :attr:FirmwareFamily.V10 blocks V10 ≥ 10v05 specifically. Because the session's range check is family-scoped (design §5.10), the V10 upper bound does not leak into the gating decisions for GP / V1_V7 / V8_V9.

decode

decode(response, ctx)

Parse the post-set data frame against ctx.data_frame_format.

The session caches the format at startup; callers hitting this decoder before the format has been probed get an :class:AlicatParseError pointing at the missing probe — same failure mode as :data:~alicatlib.commands.polling.POLL_DATA.

Source code in src/alicatlib/commands/gas.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-set data frame against ``ctx.data_frame_format``.

    The session caches the format at startup; callers hitting this
    decoder before the format has been probed get an
    :class:`AlicatParseError` pointing at the missing probe — same
    failure mode as :data:`~alicatlib.commands.polling.POLL_DATA`.
    """
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    if ctx.data_frame_format is None:
        raise AlicatParseError(
            f"{self.name} requires ctx.data_frame_format; session must probe ??D* first",
            field_name="data_frame_format",
            expected="DataFrameFormat",
            actual=None,
            context=ErrorContext(command_name=self.name, raw_response=response),
        )
    return ctx.data_frame_format.parse(response)

encode

encode(ctx, request)

Emit <unit_id><prefix>G <gas_code>\r.

Source code in src/alicatlib/commands/gas.py
def encode(self, ctx: DecodeContext, request: GasSelectLegacyRequest) -> bytes:
    r"""Emit ``<unit_id><prefix>G <gas_code>\r``."""
    gas = gas_registry.coerce(request.gas)
    prefix = ctx.command_prefix.decode("ascii")
    body = f"{ctx.unit_id}{prefix}{self.token} {gas.code}"
    return body.encode("ascii") + b"\r"

GasSelectLegacyRequest dataclass

GasSelectLegacyRequest(gas)

Arguments for :data:GAS_SELECT_LEGACY.

Legacy G is set only — there is no query form and no save flag. The facade (:meth:FlowMeter.gas) raises :class:AlicatUnsupportedCommandError if a caller asks for a query while the device is on firmware that only supports the legacy path; likewise it raises :class:AlicatValidationError if save=True is passed to a legacy dispatch.

Attributes:

Name Type Description
gas Gas | str

Gas to select. Accepts a :class:Gas enum member, its primer short name, long name, or any registered alias. Required.

GasSelectRequest dataclass

GasSelectRequest(gas=None, save=None)

Arguments for :data:GAS_SELECT.

Attributes:

Name Type Description
gas Gas | str | None

Gas to select. Accepts a :class:Gas enum member, its primer short name ("N2"), its long name ("Nitrogen"), or any registered alias. None issues the query form (GS with no argument) which reads back the active gas without changing it.

save bool | None

If True, persist the selection to EEPROM. False keeps it volatile (lost on power cycle). None — the default — omits the flag, which matches the device's own default behavior (volatile). See design §5.20 for the EEPROM-wear guard that fires on hot-looped save=True calls.

GasState dataclass

GasState(unit_id, code, gas, label, long_name)

Active-gas response.

Populated from a four-field reply: <unit_id> <code> <short> <long>.

Attributes:

Name Type Description
unit_id str

Echoed unit id ("A".."Z").

code int

Numeric gas code (primer Appendix C). Redundant with gas.code but preserved because some (pre-10v05) firmwares emit a code the registry hasn't seen — in which case :func:gas_registry.by_code raises and users can still read the raw code off the exception.

gas Gas

The typed :class:Gas enum member.

label str

Short primer label as echoed by the device (usually matches gas.value; device may send a custom label for mixture slots 236–255).

long_name str

Long primer name as echoed by the device.

HoldValves dataclass

HoldValves(
    name="hold_valves",
    token="HP",
    response_mode=ResponseMode.LINE,
    device_kinds=_CONTROLLER_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_HOLD,
    max_firmware=None,
    firmware_families=_ALL_CONTROLLER_FIRMWARE_FAMILIES,
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[HoldValvesRequest, 'ParsedFrame']

HP — hold valve(s) at current position (5v07+).

Wire: <uid><prefix>HP\r. Response is a post-op data frame with :attr:StatusCode.HLD active. Closed-loop control pauses until :data:CANCEL_VALVE_HOLD is sent.

decode

decode(response, ctx)

Parse the post-op data frame.

Source code in src/alicatlib/commands/valve.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-op data frame."""
    return _decode_hold_frame(self.name, response, ctx)

encode

encode(ctx, request)

Emit <uid><prefix>HP\r.

Source code in src/alicatlib/commands/valve.py
def encode(self, ctx: DecodeContext, request: HoldValvesRequest) -> bytes:
    r"""Emit ``<uid><prefix>HP\r``."""
    del request
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")

HoldValvesClosed dataclass

HoldValvesClosed(
    name="hold_valves_closed",
    token="HC",
    response_mode=ResponseMode.LINE,
    device_kinds=_CONTROLLER_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_HOLD,
    max_firmware=None,
    firmware_families=_ALL_CONTROLLER_FIRMWARE_FAMILIES,
    destructive=True,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[HoldValvesClosedRequest, 'ParsedFrame']

HC — hold valves closed (5v07+); destructive.

Wire: <uid><prefix>HC\r. Response is a post-op data frame with :attr:StatusCode.HLD active; flow and closed-loop control both stop. destructive=True forces explicit confirm=True on the request.

decode

decode(response, ctx)

Parse the post-op data frame.

Source code in src/alicatlib/commands/valve.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-op data frame."""
    return _decode_hold_frame(self.name, response, ctx)

encode

encode(ctx, request)

Emit <uid><prefix>HC\r.

Source code in src/alicatlib/commands/valve.py
def encode(self, ctx: DecodeContext, request: HoldValvesClosedRequest) -> bytes:
    r"""Emit ``<uid><prefix>HC\r``."""
    del request
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")

HoldValvesClosedRequest dataclass

HoldValvesClosedRequest(confirm=False)

Arguments for :data:HOLD_VALVES_CLOSED.

Attributes:

Name Type Description
confirm bool

Must be TrueHC forces valves closed and interrupts any in-flight closed-loop control, which on a live process can be surprising. The session's destructive- confirm gate raises :class:AlicatValidationError when this is False (design §5.4 gating step 5).

HoldValvesRequest dataclass

HoldValvesRequest()

Arguments for :data:HOLD_VALVES (empty — HP takes no arguments).

LockDisplay dataclass

LockDisplay(
    name="lock_display",
    token="L",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.DISPLAY,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[LockDisplayRequest, 'ParsedFrame']

L — lock front-panel display (DISPLAY capability).

Wire: <uid><prefix>L\r. Response is a post-op data frame with :attr:StatusCode.LCK active. No primer firmware cutoff — capability gate suffices.

decode

decode(response, ctx)

Parse the post-op data frame — see :func:_decode_display_frame.

Source code in src/alicatlib/commands/display.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-op data frame — see :func:`_decode_display_frame`."""
    return _decode_display_frame(self.name, response, ctx)

encode

encode(ctx, request)

Emit <uid><prefix>L\r.

Source code in src/alicatlib/commands/display.py
def encode(self, ctx: DecodeContext, request: LockDisplayRequest) -> bytes:
    r"""Emit ``<uid><prefix>L\r``."""
    del request
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")

LockDisplayRequest dataclass

LockDisplayRequest()

Arguments for :data:LOCK_DISPLAY (empty — L takes no arguments).

LoopControlVariableCommand dataclass

LoopControlVariableCommand(
    name="loop_control_variable",
    token="LV",
    response_mode=ResponseMode.LINE,
    device_kinds=_CONTROLLER_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_LV,
    max_firmware=None,
    firmware_families=frozenset(
        {FirmwareFamily.V8_V9, FirmwareFamily.V10}
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[LoopControlVariableRequest, LoopControlState]

LV — loop-control variable get/set.

Firmware-gated at 9v00 within :attr:FirmwareFamily.V8_V9 and available on every V10 release. Set form validates the variable against the restricted eight-member subset via :func:coerce_loop_control_variable; an ineligible :class:Statistic raises :class:AlicatValidationError pre-I/O rather than letting the device reject silently.

decode

decode(response, ctx)

Parse <uid> <stat_code> into :class:LoopControlState.

Verified against a V10 capture on 2026-04-17 (design §16.6) — the device replies with just <uid> <stat_code> (2 fields, no human-readable label). The label is derived from the typed variable's :class:Statistic display name.

Source code in src/alicatlib/commands/loop_control.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> LoopControlState:
    """Parse ``<uid> <stat_code>`` into :class:`LoopControlState`.

    Verified against a V10 capture on 2026-04-17 (design §16.6) — the
    device replies with just ``<uid> <stat_code>`` (2 fields, no
    human-readable label). The label is derived from the typed
    variable's :class:`Statistic` display name.
    """
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=2)
    unit_id, stat_code_s = fields
    stat_code = parse_int(stat_code_s, field="statistic_code")
    variable = coerce_loop_control_variable(stat_code)
    # Derive a human-readable label from the corresponding Statistic
    # since the device doesn't echo one. LoopControlVariable is an
    # IntEnum (its `.value` is the wire code, not a name); the
    # statistic registry's display name is the right label source.
    return LoopControlState(
        unit_id=unit_id,
        variable=variable,
        label=variable.name,
    )

encode

encode(ctx, request)

Emit the LV query or set bytes.

Source code in src/alicatlib/commands/loop_control.py
def encode(
    self,
    ctx: DecodeContext,
    request: LoopControlVariableRequest,
) -> bytes:
    """Emit the LV query or set bytes."""
    prefix = ctx.command_prefix.decode("ascii")
    if request.variable is None:
        return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")
    lv = coerce_loop_control_variable(request.variable)
    return f"{ctx.unit_id}{prefix}{self.token} {int(lv)}\r".encode("ascii")

LoopControlVariableRequest dataclass

LoopControlVariableRequest(variable=None)

Arguments for :data:LOOP_CONTROL_VARIABLE.

Attributes:

Name Type Description
variable LoopControlVariable | str | int | None

One of the eight LV-eligible statistics, accepted as :class:LoopControlVariable / :class:Statistic / int code / name string. None issues the query form.

ManufacturingInfoCommand dataclass

ManufacturingInfoCommand(
    name="manufacturing_info",
    token="??M*",
    response_mode=ResponseMode.LINES,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(
        {
            FirmwareFamily.GP,
            FirmwareFamily.V1_V7,
            FirmwareFamily.V8_V9,
            FirmwareFamily.V10,
        }
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=True,
    expected_lines=10,
    is_complete=None,
)

Bases: Command[ManufacturingInfoRequest, ManufacturingInfo]

??M* — 10-line manufacturing-info table (8v28+, numeric families).

The session gates this command on firmware family and version before dispatching, so the encoder never reaches a GP or pre-8v28 device.

decode

decode(response, ctx)

Parse the 10-line ??M* table into :class:ManufacturingInfo.

Source code in src/alicatlib/commands/system.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ManufacturingInfo:
    """Parse the 10-line ``??M*`` table into :class:`ManufacturingInfo`."""
    del ctx
    if not isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected multi-line response, got single line",
        )
    return parse_manufacturing_info(response)

encode

encode(ctx, request)

Emit <unit_id><prefix>??M*\r.

Source code in src/alicatlib/commands/system.py
def encode(
    self,
    ctx: DecodeContext,
    request: ManufacturingInfoRequest,
) -> bytes:
    r"""Emit ``<unit_id><prefix>??M*\r``."""
    del request
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")

ManufacturingInfoRequest dataclass

ManufacturingInfoRequest()

Arguments for :data:MANUFACTURING_INFO — no user-provided fields.

PollData dataclass

PollData(
    name="poll_data",
    token="",
    response_mode=ResponseMode.LINE,
    device_kinds=frozenset(DeviceKind),
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=True,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[PollRequest, 'ParsedFrame']

Poll the device's current data frame (primer A\r).

Encodes as just {unit_id}{prefix}\r — no token. Decodes against the session-cached :class:DataFrameFormat carried on the :class:DecodeContext.

The decode layer returns a :class:ParsedFrame rather than a :class:DataFrame because timing belongs to the I/O layer, not the pure decode step. The session's execute() wraps via :meth:DataFrame.from_parsed before returning from the facade (Device.poll()), so users never see a raw :class:ParsedFrame unless they go through the session.execute(POLL_DATA, ...) escape hatch. See design §5.6.

decode

decode(response, ctx)

Parse the raw data frame against ctx.data_frame_format.

Source code in src/alicatlib/commands/polling.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the raw data frame against ``ctx.data_frame_format``."""
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    if ctx.data_frame_format is None:
        raise AlicatParseError(
            "poll_data requires ctx.data_frame_format; session must probe ??D* first",
            field_name="data_frame_format",
            expected="DataFrameFormat",
            actual=None,
            context=ErrorContext(command_name=self.name, raw_response=response),
        )
    return ctx.data_frame_format.parse(response)

encode

encode(ctx, request)

Emit the device's poll bytes — <unit_id><prefix>\r, no token.

Source code in src/alicatlib/commands/polling.py
def encode(
    self,
    ctx: DecodeContext,
    request: PollRequest,
) -> bytes:
    r"""Emit the device's poll bytes — ``<unit_id><prefix>\r``, no token."""
    del request
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}\r".encode("ascii")

PollRequest dataclass

PollRequest()

Arguments for :data:POLL_DATA — no user-provided fields.

PowerUpTare dataclass

PowerUpTare(
    name="power_up_tare",
    token="ZCP",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_ZCA_ZCP,
    max_firmware=None,
    firmware_families=frozenset({FirmwareFamily.V10}),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[PowerUpTareRequest, PowerUpTareState]

ZCP — power-up tare query/set (all devices, V10 10v05+).

Wire shape (primer p. 19):

  • Query: <uid><prefix>ZCP\r
  • Set: <uid><prefix>ZCP <enable>\r

Response: <uid> <enable> (2 fields).

decode

decode(response, ctx)

Parse <uid> <enable> into :class:PowerUpTareState.

Source code in src/alicatlib/commands/tare.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> PowerUpTareState:
    """Parse ``<uid> <enable>`` into :class:`PowerUpTareState`."""
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=2)
    unit_id, enable_s = fields
    return PowerUpTareState(
        unit_id=unit_id,
        enabled=parse_bool_code(enable_s, field=f"{self.name}.enabled"),
    )

encode

encode(ctx, request)

Emit ZCP query or set bytes.

Source code in src/alicatlib/commands/tare.py
def encode(self, ctx: DecodeContext, request: PowerUpTareRequest) -> bytes:
    """Emit ZCP query or set bytes."""
    prefix = ctx.command_prefix.decode("ascii")
    if request.enable is None:
        return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token} {int(request.enable)}\r".encode("ascii")

PowerUpTareRequest dataclass

PowerUpTareRequest(enable=None)

Arguments for :data:POWER_UP_TARE.

Attributes:

Name Type Description
enable bool | None

True / False sets power-up tare; None issues the query form.

RampRate dataclass

RampRate(
    name="ramp_rate",
    token="SR",
    response_mode=ResponseMode.LINE,
    device_kinds=_CONTROLLER_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_RAMP_RATE,
    max_firmware=None,
    firmware_families=frozenset(
        {
            FirmwareFamily.V1_V7,
            FirmwareFamily.V8_V9,
            FirmwareFamily.V10,
        }
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[RampRateRequest, RampRateSetting]

SR — max ramp rate query/set (7v11+).

Wire shape:

  • Query: <uid><prefix>SR\r
  • Set: <uid><prefix>SR <max_ramp> <time_unit_code>\r

Response (primer p. 15, hardware-correctable): 5 fields — <uid> <max_ramp> <setpoint_unit_code> <time_value> <rate_unit_label> (e.g. A 25.0 12 4 SCCM/s).

decode

decode(response, ctx)

Parse the 5-field reply into :class:RampRateSetting.

Source code in src/alicatlib/commands/control.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> RampRateSetting:
    """Parse the 5-field reply into :class:`RampRateSetting`."""
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=5)
    unit_id, ramp_s, unit_code_s, time_s, rate_label = fields
    max_ramp = parse_float(ramp_s, field=f"{self.name}.max_ramp")
    setpoint_unit_code = parse_int(unit_code_s, field=f"{self.name}.setpoint_unit_code")
    time_code = parse_int(time_s, field=f"{self.name}.time_unit")
    try:
        time_unit = TimeUnit(time_code)
    except ValueError as err:
        raise AlicatValidationError(
            f"{self.name}: device returned time-unit code {time_code!r} "
            f"not in TimeUnit enum {sorted(int(m) for m in TimeUnit)}",
            context=ErrorContext(
                command_name=self.name,
                raw_response=response,
                extra={"time_code": time_code},
            ),
        ) from err

    # The rate label is ``<setpoint_unit>/<time_unit>`` (e.g.
    # ``SCCM/s``). Derive the typed setpoint unit from the
    # setpoint-unit code; the registry is the authoritative source
    # for ``setpoint_unit``, and the rate label is kept as the raw
    # diagnostic string.
    matches = {u for (_cat, c), u in UNIT_BY_CATEGORY_CODE.items() if c == setpoint_unit_code}
    setpoint_unit: Unit | None = next(iter(matches)) if len(matches) == 1 else None

    return RampRateSetting(
        unit_id=unit_id,
        max_ramp=max_ramp,
        setpoint_unit_code=setpoint_unit_code,
        setpoint_unit=setpoint_unit,
        time_unit=time_unit,
        rate_unit_label=rate_label,
    )

encode

encode(ctx, request)

Emit the SR query or set bytes.

Source code in src/alicatlib/commands/control.py
def encode(self, ctx: DecodeContext, request: RampRateRequest) -> bytes:
    """Emit the SR query or set bytes."""
    prefix = ctx.command_prefix.decode("ascii")
    if request.max_ramp is None:
        # Query — ``time_unit`` is silently ignored here (set-only
        # semantics). Primer has no query-form ``time_unit``.
        return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")

    if request.max_ramp < 0:
        raise AlicatValidationError(
            f"{self.name}: max_ramp must be >= 0 (0 disables), got {request.max_ramp}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"max_ramp": request.max_ramp},
            ),
        )
    if request.time_unit is None:
        raise AlicatValidationError(
            f"{self.name}: time_unit is required when setting max_ramp "
            "(including when disabling ramping via max_ramp=0); pass a "
            "TimeUnit member.",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"max_ramp": request.max_ramp},
            ),
        )
    return (
        f"{ctx.unit_id}{prefix}{self.token} {request.max_ramp} {int(request.time_unit)}\r"
    ).encode("ascii")

RampRateRequest dataclass

RampRateRequest(max_ramp=None, time_unit=None)

Arguments for :data:RAMP_RATE.

Attributes:

Name Type Description
max_ramp float | None

Ramp step size in the current engineering units. 0.0 disables ramping; negative values are rejected pre-I/O. None issues the query form.

time_unit TimeUnit | None

Time base (:class:TimeUnit) — required when max_ramp is set; silently ignored on query. Primer p. 15 says the time-unit parameter is required even when disabling ramping, so the encoder enforces presence whenever max_ramp is not None.

RequestData dataclass

RequestData(
    name="request_data",
    token="DV",
    response_mode=ResponseMode.LINE,
    device_kinds=frozenset(DeviceKind),
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(
        {
            FirmwareFamily.V1_V7,
            FirmwareFamily.V8_V9,
            FirmwareFamily.V10,
        }
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[RequestDataRequest, tuple[float | None, ...]]

DV — request a caller-chosen subset of statistics.

Decoder returns a positional tuple of parsed values aligned with :attr:RequestDataRequest.statistics. The -- wire sentinel (an invalid statistic code passed through per-slot) maps to None.

The wire reply has no unit-id prefix — unique in the catalog and load-bearing for the decoder. The facade :meth:alicatlib.devices.base.Device.request wraps the tuple into a :class:~alicatlib.devices.models.MeasurementSet with the correct unit_id / averaging_ms / received_at mapping (same pure-parse → timing-wrap split as :data:POLL_DATA).

decode

decode(response, ctx)

Parse <val1> [val2...] into a positional tuple.

-- slots resolve to None; every other token parses as a float. Unit-id prefix absence is the load-bearing wire property — if a future firmware adds one, this decoder needs an update (pin via a fresh fixture).

Source code in src/alicatlib/commands/polling.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> tuple[float | None, ...]:
    """Parse ``<val1> [val2...]`` into a positional tuple.

    ``--`` slots resolve to ``None``; every other token parses as a
    float. Unit-id prefix absence is the load-bearing wire property
    — if a future firmware adds one, this decoder needs an update
    (pin via a fresh fixture).
    """
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name)
    if not fields:
        raise AlicatParseError(
            f"{self.name}: empty reply",
            field_name="values",
            expected="at least one value",
            actual=text,
            context=ErrorContext(command_name=self.name, raw_response=response),
        )
    return tuple(
        None if _is_absent(value) else parse_optional_float(value, field=f"value[{i}]")
        for i, value in enumerate(fields)
    )

encode

encode(ctx, request)

Emit <unit_id><prefix>DV <time_ms> <stat1> [stat2...]\r.

Validates pre-I/O per design §5.20 item 4: averaging in 1..9999 ms, 1..13 statistics.

Source code in src/alicatlib/commands/polling.py
def encode(
    self,
    ctx: DecodeContext,
    request: RequestDataRequest,
) -> bytes:
    r"""Emit ``<unit_id><prefix>DV <time_ms> <stat1> [stat2...]\r``.

    Validates pre-I/O per design §5.20 item 4: averaging in
    ``1..9999`` ms, 1..13 statistics.
    """
    if not (_MIN_AVERAGING_MS <= request.averaging_ms <= _MAX_AVERAGING_MS):
        raise AlicatValidationError(
            f"{self.name}: averaging_ms must be in "
            f"[{_MIN_AVERAGING_MS}, {_MAX_AVERAGING_MS}], "
            f"got {request.averaging_ms}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"averaging_ms": request.averaging_ms},
            ),
        )
    count = len(request.statistics)
    if not (_MIN_DV_STATISTICS <= count <= _MAX_DV_STATISTICS):
        raise AlicatValidationError(
            f"{self.name}: statistics count must be in "
            f"[{_MIN_DV_STATISTICS}, {_MAX_DV_STATISTICS}], got {count}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"statistics_count": count},
            ),
        )
    codes = [_resolve_statistic_code(s)[1] for s in request.statistics]
    prefix = ctx.command_prefix.decode("ascii")
    stats_part = " ".join(str(c) for c in codes)
    return f"{ctx.unit_id}{prefix}{self.token} {request.averaging_ms} {stats_part}\r".encode(
        "ascii",
    )

RequestDataRequest dataclass

RequestDataRequest(statistics, averaging_ms=1)

Arguments for :data:REQUEST_DATA.

Attributes:

Name Type Description
statistics Sequence[Statistic | str]

1–13 :class:Statistic members (or alias strings / canonical values). Order is preserved on the wire and in the returned values tuple.

averaging_ms int

Rolling averaging window in milliseconds (primer: 1–9999). The device rejects 0 on the wire, so the encoder rejects it pre-I/O with :class:AlicatValidationError.

ResponseMode

Bases: Enum

What the transport should do for a command after writing.

The :class:~alicatlib.devices.session.Session uses this to pick write_only / query_line / query_lines without per-command branching in the session code.

LINE class-attribute instance-attribute

LINE = 'line'

Single-line response terminated by CR. The common case.

LINES class-attribute instance-attribute

LINES = 'lines'

Multiline table response. See :attr:Command.expected_lines / :attr:Command.is_complete for the termination contract.

NONE class-attribute instance-attribute

NONE = 'none'

Write-only; no read. Example: @@ stop-stream.

STREAM class-attribute instance-attribute

STREAM = 'stream'

Enters streaming mode. Not a normal request/response command.

Setpoint dataclass

Setpoint(
    name="setpoint",
    token="LS",
    response_mode=ResponseMode.LINE,
    device_kinds=_CONTROLLER_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=MIN_FIRMWARE_SETPOINT_LS,
    max_firmware=None,
    firmware_families=frozenset(
        {FirmwareFamily.V8_V9, FirmwareFamily.V10}
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[SetpointRequest, SetpointState]

LS — modern setpoint get/set (V10 + V8_V9 ≥ 9v00).

Wire shape:

  • Query: <uid><prefix>LS\r
  • Set: <uid><prefix>LS <value>\r

Response is a post-op data frame containing the updated Setpoint field; the decoder returns the :class:ParsedFrame and the facade converts to :class:SetpointState with facade-level timing (same pattern as tare / legacy-gas).

Gated to :attr:DeviceKind.FLOW_CONTROLLER and :attr:DeviceKind.PRESSURE_CONTROLLER — a plain meter has no setpoint. Firmware gating handles V8_V9 < 9v00 → redirect to :data:SETPOINT_LEGACY at the facade layer.

decode

decode(response, ctx)

Parse the modern LS 5-field reply into :class:SetpointState.

Source code in src/alicatlib/commands/setpoint.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> SetpointState:
    """Parse the modern ``LS`` 5-field reply into :class:`SetpointState`."""
    return _decode_setpoint_reply(self.name, response, ctx)

encode

encode(ctx, request)

Emit the LS query or set bytes.

Source code in src/alicatlib/commands/setpoint.py
def encode(self, ctx: DecodeContext, request: SetpointRequest) -> bytes:
    r"""Emit the LS query or set bytes."""
    prefix = ctx.command_prefix.decode("ascii")
    if request.value is None:
        return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")
    value_s = _format_setpoint_value(request.value)
    return f"{ctx.unit_id}{prefix}{self.token} {value_s}\r".encode("ascii")

SetpointLegacy dataclass

SetpointLegacy(
    name="setpoint_legacy",
    token="S",
    response_mode=ResponseMode.LINE,
    device_kinds=_CONTROLLER_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=_MAX_FIRMWARE_SETPOINT_LEGACY_V8V9,
    firmware_families=frozenset(
        {FirmwareFamily.V1_V7, FirmwareFamily.V8_V9}
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[SetpointLegacyRequest, 'ParsedFrame']

S — legacy setpoint set for pre-9v00 firmware.

Applies to :attr:FirmwareFamily.V1_V7 (all) and :attr:FirmwareFamily.V8_V9 < 9v00. Session's family-scoped max_firmware gate blocks V8_V9 ≥ 9v00 (redirect to :data:SETPOINT); V1_V7 has no upper bound.

Response: post-op data frame, same shape as :data:SETPOINT.

decode

decode(response, ctx)

Parse the post-op data frame.

Source code in src/alicatlib/commands/setpoint.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-op data frame."""
    return _decode_setpoint_frame(self.name, response, ctx)

encode

encode(ctx, request)

Emit <unit_id><prefix>S <value>\r.

Source code in src/alicatlib/commands/setpoint.py
def encode(self, ctx: DecodeContext, request: SetpointLegacyRequest) -> bytes:
    r"""Emit ``<unit_id><prefix>S <value>\r``."""
    prefix = ctx.command_prefix.decode("ascii")
    value_s = _format_setpoint_value(request.value)
    return f"{ctx.unit_id}{prefix}{self.token} {value_s}\r".encode("ascii")

SetpointLegacyRequest dataclass

SetpointLegacyRequest(value)

Arguments for :data:SETPOINT_LEGACY.

Legacy S is set-only — there is no query form on firmware that predates LS. The facade rejects value is None pre-I/O with :class:AlicatUnsupportedCommandError and routes query intents to the modern :data:SETPOINT if the firmware supports it.

SetpointRequest dataclass

SetpointRequest(value=None)

Arguments for :data:SETPOINT.

Attributes:

Name Type Description
value float | None

Setpoint target in the device's current engineering units. None issues the query form (LS alone).

SetpointSource dataclass

SetpointSource(
    name="setpoint_source",
    token="LSS",
    response_mode=ResponseMode.LINE,
    device_kinds=_CONTROLLER_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=MIN_FIRMWARE_LSS,
    max_firmware=None,
    firmware_families=frozenset({FirmwareFamily.V10}),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[SetpointSourceRequest, SetpointSourceResult]

LSS — setpoint-source get/set.

Wire shape:

  • Query: <uid><prefix>LSS\r
  • Set: <uid><prefix>LSS <mode>[ <save>]\r — mode ∈ {S, A, U}.

Response: <uid> <mode> (2 fields). The facade caches the decoded mode on :attr:Session.setpoint_source so :meth:FlowController.setpoint can detect the LSS=A silently ignores serial setpoint failure mode.

decode

decode(response, ctx)

Parse <uid> <mode> into :class:SetpointSourceResult.

Source code in src/alicatlib/commands/setpoint.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> SetpointSourceResult:
    """Parse ``<uid> <mode>`` into :class:`SetpointSourceResult`."""
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=2)
    return SetpointSourceResult(unit_id=fields[0], mode=fields[1])

encode

encode(ctx, request)

Emit the LSS query or set bytes.

Source code in src/alicatlib/commands/setpoint.py
def encode(
    self,
    ctx: DecodeContext,
    request: SetpointSourceRequest,
) -> bytes:
    """Emit the LSS query or set bytes."""
    prefix = ctx.command_prefix.decode("ascii")
    if request.mode is None:
        # Query form — ``save`` is set-only semantics, silently
        # ignored in query shape.
        return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")
    mode = request.mode.strip().upper()
    if mode not in SETPOINT_SOURCE_MODES:
        raise AlicatValidationError(
            f"LSS mode {request.mode!r} not one of {sorted(SETPOINT_SOURCE_MODES)}",
            context=ErrorContext(
                command_name=self.name,
                extra={"mode": request.mode},
            ),
        )
    body = f"{ctx.unit_id}{prefix}{self.token} {mode}"
    if request.save is not None:
        body += f" {'1' if request.save else '0'}"
    return (body + "\r").encode("ascii")

SetpointSourceRequest dataclass

SetpointSourceRequest(mode=None, save=None)

Arguments for :data:SETPOINT_SOURCE.

Attributes:

Name Type Description
mode str | None

"S" (serial), "A" (analog), or "U" (user-knob). None issues the query form.

save bool | None

True persists to EEPROM (design §5.20.7 wear-rate guard applies); None / False keeps the change volatile.

SetpointSourceResult dataclass

SetpointSourceResult(unit_id, mode)

Typed response for :data:SETPOINT_SOURCE.

Decodes the primer's <uid> <mode> two-field reply. Keeping the decoded mode as a str (rather than a dedicated enum) lets the facade treat unknown modes as best-effort diagnostics without fighting an enum-coerce failure — the mode is re-validated against :data:SETPOINT_SOURCE_MODES on the facade set path.

StpNtpPressure dataclass

StpNtpPressure(
    name="stp_ntp_pressure",
    token="DCFRP",
    response_mode=ResponseMode.LINE,
    device_kinds=_FLOW_DEVICE_KINDS,
    media=Medium.GAS,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_V10_05,
    max_firmware=None,
    firmware_families=_V10_ONLY,
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[StpNtpPressureRequest, StpNtpPressureSetting]

DCFRP — STP/NTP pressure reference query/set (V10 10v05+, mass-flow).

Wire shape (primer p. 18):

  • Query: <uid><prefix>DCFRP <S|N>\r
  • Set: <uid><prefix>DCFRP <S|N> <unit_code> <pressure>\r

Response: <uid> <pressure> <unit_code> <unit_label> (4 fields).

decode

decode(response, ctx)

Parse the 4-field reply into :class:StpNtpPressureSetting.

mode is carried on the request-echo convention — the device doesn't re-echo S / N in the reply, so the facade fills the returned dataclass's mode via :func:dataclasses.replace (same pattern as DCU / FPF).

Source code in src/alicatlib/commands/data_readings.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> StpNtpPressureSetting:
    """Parse the 4-field reply into :class:`StpNtpPressureSetting`.

    ``mode`` is carried on the request-echo convention — the device
    doesn't re-echo ``S`` / ``N`` in the reply, so the facade
    fills the returned dataclass's ``mode`` via
    :func:`dataclasses.replace` (same pattern as DCU / FPF).
    """
    del ctx
    unit_id, value, code, label = _stp_ntp_decode(self.name, response)
    return StpNtpPressureSetting(
        unit_id=unit_id,
        mode=StpNtpMode.STP,  # facade replaces with request.mode
        pressure=value,
        unit_code=code,
        unit=_resolve_unit_label(label, code),
        unit_label=label,
    )

encode

encode(ctx, request)

Emit DCFRP query or set bytes.

Source code in src/alicatlib/commands/data_readings.py
def encode(self, ctx: DecodeContext, request: StpNtpPressureRequest) -> bytes:
    """Emit DCFRP query or set bytes."""
    return _stp_ntp_encode(self.token, ctx, request.mode, request.pressure, request.unit_code)

StpNtpPressureRequest dataclass

StpNtpPressureRequest(mode, pressure=None, unit_code=None)

Arguments for :data:STP_NTP_PRESSURE.

Attributes:

Name Type Description
mode StpNtpMode

:class:StpNtpMode — which reference (standard or normal) to query or set.

pressure float | None

Reference pressure. None issues the query form.

unit_code int | None

Engineering-unit code for the pressure value. None or 0 means "keep current units" on set; the query form ignores this.

StpNtpTemperature dataclass

StpNtpTemperature(
    name="stp_ntp_temperature",
    token="DCFRT",
    response_mode=ResponseMode.LINE,
    device_kinds=_FLOW_DEVICE_KINDS,
    media=Medium.GAS,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_V10_05,
    max_firmware=None,
    firmware_families=_V10_ONLY,
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[StpNtpTemperatureRequest, StpNtpTemperatureSetting]

DCFRT — STP/NTP temperature reference query/set (V10 10v05+, mass-flow).

Wire + response shape: see :class:StpNtpPressure; substitute DCFRT for DCFRP and temperature for pressure.

decode

decode(response, ctx)

Parse the 4-field reply. Facade replaces mode from the request.

Source code in src/alicatlib/commands/data_readings.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> StpNtpTemperatureSetting:
    """Parse the 4-field reply. Facade replaces ``mode`` from the request."""
    del ctx
    unit_id, value, code, label = _stp_ntp_decode(self.name, response)
    return StpNtpTemperatureSetting(
        unit_id=unit_id,
        mode=StpNtpMode.STP,  # facade replaces with request.mode
        temperature=value,
        unit_code=code,
        unit=_resolve_unit_label(label, code),
        unit_label=label,
    )

encode

encode(ctx, request)

Emit DCFRT query or set bytes.

Source code in src/alicatlib/commands/data_readings.py
def encode(self, ctx: DecodeContext, request: StpNtpTemperatureRequest) -> bytes:
    """Emit DCFRT query or set bytes."""
    return _stp_ntp_encode(
        self.token, ctx, request.mode, request.temperature, request.unit_code
    )

StpNtpTemperatureRequest dataclass

StpNtpTemperatureRequest(
    mode, temperature=None, unit_code=None
)

Arguments for :data:STP_NTP_TEMPERATURE.

Same shape as :class:StpNtpPressureRequest but for temperature.

StreamingRate dataclass

StreamingRate(
    name="streaming_rate",
    token="NCS",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=MIN_FIRMWARE_NCS,
    max_firmware=None,
    firmware_families=frozenset({FirmwareFamily.V10}),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[StreamingRateRequest, StreamingRateResult]

NCS — query or set the streaming interval.

Wire shape (primer p. 22):

  • Query: <uid><prefix>NCS\r
  • Set: <uid><prefix>NCS <interval_ms>\r

Response (primer-derived, hardware-correctable): <uid> <interval_ms>. The primer says the device "confirms the rate"; a real capture may refine the reply shape, which is a one-line regex change to the decoder per design §15.3.

decode

decode(response, ctx)

Parse <uid> <interval_ms> into :class:StreamingRateResult.

Two-field reply per primer p. 22 — the device echoes its unit id and the effective interval. Hardware-correctable per design §15.3; any observed extra field surfaces as a parse error pointing at the raw bytes.

Source code in src/alicatlib/commands/streaming.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> StreamingRateResult:
    """Parse ``<uid> <interval_ms>`` into :class:`StreamingRateResult`.

    Two-field reply per primer p. 22 — the device echoes its unit id
    and the effective interval. Hardware-correctable per design §15.3;
    any observed extra field surfaces as a parse error pointing at
    the raw bytes.
    """
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=2)
    unit_id, rate_s = fields
    return StreamingRateResult(
        unit_id=unit_id,
        rate_ms=parse_int(rate_s, field="rate_ms"),
    )

encode

encode(ctx, request)

Emit the NCS query or set bytes.

rate_ms must be a non-negative int. 0 is a valid device setting (as-fast-as-possible); None distinguishes the query form.

Source code in src/alicatlib/commands/streaming.py
def encode(
    self,
    ctx: DecodeContext,
    request: StreamingRateRequest,
) -> bytes:
    """Emit the NCS query or set bytes.

    ``rate_ms`` must be a non-negative ``int``. ``0`` is a valid
    device setting (as-fast-as-possible); ``None`` distinguishes
    the query form.
    """
    prefix = ctx.command_prefix.decode("ascii")
    if request.rate_ms is None:
        return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")
    # Runtime guards for callers bypassing the type system.
    # ``bool`` is an ``int`` subclass, so the type-annotated
    # ``int | None`` admits ``True`` / ``False`` — but those would
    # silently encode as ``1`` / ``0`` ms, an accidental rate the
    # caller almost certainly didn't mean. Reject explicitly; the
    # four sentinels (``None``, ``0``, ``False``, ``""``) must stay
    # distinct per the encoder rule in design §5.4.
    # ``float`` and other numeric types are rejected to keep the
    # wire format integral — ``50.0`` would format as ``"50.0"``
    # and the device would reject it.
    rate_ms = request.rate_ms
    if isinstance(rate_ms, bool) or type(rate_ms) is not int:
        raise AlicatValidationError(
            f"{self.name}: rate_ms must be int, got {type(rate_ms).__name__}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"rate_ms": rate_ms},
            ),
        )
    if rate_ms < 0:
        raise AlicatValidationError(
            f"{self.name}: rate_ms must be >= 0, got {rate_ms}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"rate_ms": rate_ms},
            ),
        )
    return f"{ctx.unit_id}{prefix}{self.token} {rate_ms}\r".encode("ascii")

StreamingRateRequest dataclass

StreamingRateRequest(rate_ms=None)

Arguments for :data:STREAMING_RATE.

Attributes:

Name Type Description
rate_ms int | None

Streaming interval in milliseconds. None issues the query form. A non-negative integer sets the interval — 0 is the primer's "send as fast as the wire allows" setting and is distinct from None.

StreamingRateResult dataclass

StreamingRateResult(unit_id, rate_ms)

Reply payload for :data:STREAMING_RATE.

Attributes:

Name Type Description
unit_id str

Echoed unit id from the device.

rate_ms int

Current streaming interval, in milliseconds.

TareAbsolutePressure dataclass

TareAbsolutePressure(
    name="tare_absolute_pressure",
    token="PC",
    response_mode=ResponseMode.LINE,
    device_kinds=_PRESSURE_AWARE_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.TAREABLE_ABSOLUTE_PRESSURE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(
        {
            FirmwareFamily.V1_V7,
            FirmwareFamily.V8_V9,
            FirmwareFamily.V10,
        }
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[TareAbsolutePressureRequest, 'ParsedFrame']

PC — calibrate absolute pressure against the onboard barometer.

Gated on :attr:Capability.TAREABLE_ABSOLUTE_PRESSURE — NOT on :attr:Capability.BAROMETER. Hardware validation on 2026-04-17 established that flow-controller devices report a firmware-computed barometer reading (so BAROMETER probes positive) but do not have a process-port absolute-pressure sensor and therefore reject or silently ignore PC. Four devices confirmed the pattern (8v17 MCR-200, 8v30 MCR-500, 6v21 MCR-775, 7v09 MCP-50); see design §16.6.7 for the narrative and Capability.BAROMETER / :attr:Capability.TAREABLE_ABSOLUTE_PRESSURE for the semantic split.

No safe probe for TAREABLE_ABSOLUTE_PRESSURE exists (probing would tare the device). Users with a pressure meter/controller that supports PC opt in via assume_capabilities on :func:~alicatlib.devices.factory.open_device. Devices without the capability raise :class:AlicatMissingHardwareError pre-I/O.

Precondition: the gauge pressure should be at atmosphere — the device uses its barometer reading as the reference.

decode

decode(response, ctx)

Parse the post-tare data frame.

Source code in src/alicatlib/commands/tare.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-tare data frame."""
    return _decode_tare_frame(self.name, response, ctx)

encode

encode(ctx, request)

Emit <unit_id><prefix>PC\r.

Source code in src/alicatlib/commands/tare.py
def encode(
    self,
    ctx: DecodeContext,
    request: TareAbsolutePressureRequest,
) -> bytes:
    r"""Emit ``<unit_id><prefix>PC\r``."""
    del request
    return _encode_bare_tare(self.token, ctx)

TareAbsolutePressureRequest dataclass

TareAbsolutePressureRequest()

Arguments for :data:TARE_ABSOLUTE_PRESSURE — no user-provided fields.

TareFlow dataclass

TareFlow(
    name="tare_flow",
    token="T",
    response_mode=ResponseMode.LINE,
    device_kinds=_FLOW_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(
        {
            FirmwareFamily.V1_V7,
            FirmwareFamily.V8_V9,
            FirmwareFamily.V10,
        }
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[TareFlowRequest, 'ParsedFrame']

T — tare the flow reading.

Precondition (caller's responsibility): no gas is flowing through the device. The library cannot verify this — the facade emits an INFO log noting the precondition on every call.

decode

decode(response, ctx)

Parse the post-tare data frame.

Source code in src/alicatlib/commands/tare.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-tare data frame."""
    return _decode_tare_frame(self.name, response, ctx)

encode

encode(ctx, request)

Emit <unit_id><prefix>T\r.

Source code in src/alicatlib/commands/tare.py
def encode(self, ctx: DecodeContext, request: TareFlowRequest) -> bytes:
    r"""Emit ``<unit_id><prefix>T\r``."""
    del request
    return _encode_bare_tare(self.token, ctx)

TareFlowRequest dataclass

TareFlowRequest()

Arguments for :data:TARE_FLOW — no user-provided fields.

TareGaugePressure dataclass

TareGaugePressure(
    name="tare_gauge_pressure",
    token="TP",
    response_mode=ResponseMode.LINE,
    device_kinds=_PRESSURE_AWARE_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(
        {
            FirmwareFamily.V1_V7,
            FirmwareFamily.V8_V9,
            FirmwareFamily.V10,
        }
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[TareGaugePressureRequest, 'ParsedFrame']

TP — tare the gauge-pressure reading.

Precondition (caller's responsibility): line depressurised to atmosphere. Applies to both flow devices (which carry a pressure transducer for compensation) and pressure devices.

decode

decode(response, ctx)

Parse the post-tare data frame.

Source code in src/alicatlib/commands/tare.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-tare data frame."""
    return _decode_tare_frame(self.name, response, ctx)

encode

encode(ctx, request)

Emit <unit_id><prefix>TP\r.

Source code in src/alicatlib/commands/tare.py
def encode(self, ctx: DecodeContext, request: TareGaugePressureRequest) -> bytes:
    r"""Emit ``<unit_id><prefix>TP\r``."""
    del request
    return _encode_bare_tare(self.token, ctx)

TareGaugePressureRequest dataclass

TareGaugePressureRequest()

Arguments for :data:TARE_GAUGE_PRESSURE — no user-provided fields.

TotalizerConfigCommand dataclass

TotalizerConfigCommand(
    name="totalizer_config",
    token="TC",
    response_mode=ResponseMode.LINE,
    device_kinds=_FLOW_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_TC,
    max_firmware=None,
    firmware_families=_V10_ONLY,
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[TotalizerConfigRequest, TotalizerConfig]

TC — totalizer configuration query/set (V10 10v00+, flow devices).

Wire shape:

  • Query: <uid><prefix>TC <totalizer>\r
  • Disable: <uid><prefix>TC <totalizer> 1\r
  • Set: <uid><prefix>TC <totalizer> <flow_stat> <mode> <limit_mode> <digits> <decimal>\r

Response: <uid> <flow_stat> <mode> <limit_mode> <digits> <decimal> (6 fields — primer's explicit field list). The totalizer id is the caller's responsibility to track; the facade re-populates :attr:TotalizerConfig.totalizer from the request.

decode

decode(response, ctx)

Parse the TC reply into :class:TotalizerConfig.

Primer-derived shape: <uid> <stat> <mode> <limit_mode> <digits> <decimal> — 6 fields; totalizer id not echoed. Hardware validation (2026-04-17) on 10v20 firmware shows the totalizer id IS echoed as the second token, producing 7 fields (<uid> <totalizer_id> <stat> <mode> <limit_mode> <digits> <decimal>). Accept both shapes; drop the echoed id when present. The facade re-populates :attr:totalizer from the request either way.

Source code in src/alicatlib/commands/totalizer.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> TotalizerConfig:
    """Parse the ``TC`` reply into :class:`TotalizerConfig`.

    Primer-derived shape: ``<uid> <stat> <mode> <limit_mode>
    <digits> <decimal>`` — 6 fields; totalizer id not echoed.
    Hardware validation (2026-04-17) on 10v20 firmware shows the
    totalizer id IS echoed as the second token, producing 7 fields
    (``<uid> <totalizer_id> <stat> <mode> <limit_mode> <digits>
    <decimal>``). Accept both shapes; drop the echoed id when
    present. The facade re-populates :attr:`totalizer` from the
    request either way.
    """
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name)
    if len(fields) == _TC_FIELD_COUNT + 1:
        # Real 10v20: skip the totalizer_id echo at index 1.
        unit_id = fields[0]
        stat_s, mode_s, limit_s, digits_s, decimal_s = fields[2:]
    elif len(fields) == _TC_FIELD_COUNT:
        unit_id, stat_s, mode_s, limit_s, digits_s, decimal_s = fields
    else:
        raise AlicatParseError(
            f"{self.name}: expected {_TC_FIELD_COUNT} or "
            f"{_TC_FIELD_COUNT + 1} fields, got {len(fields)}{text!r}",
            field_name="totalizer_config",
            expected=f"{_TC_FIELD_COUNT} or {_TC_FIELD_COUNT + 1}",
            actual=len(fields),
            context=ErrorContext(command_name=self.name, raw_response=response),
        )
    return TotalizerConfig(
        unit_id=unit_id,
        totalizer=TotalizerId.FIRST,  # facade replaces from request
        flow_statistic_code=parse_int(stat_s, field=f"{self.name}.flow_statistic_code"),
        mode=TotalizerMode(parse_int(mode_s, field=f"{self.name}.mode")),
        limit_mode=TotalizerLimitMode(
            parse_int(limit_s, field=f"{self.name}.limit_mode"),
        ),
        digits=parse_int(digits_s, field=f"{self.name}.digits"),
        decimal_place=parse_int(decimal_s, field=f"{self.name}.decimal_place"),
    )

encode

encode(ctx, request)

Emit the TC query or set bytes.

Source code in src/alicatlib/commands/totalizer.py
def encode(self, ctx: DecodeContext, request: TotalizerConfigRequest) -> bytes:
    """Emit the TC query or set bytes."""
    prefix = ctx.command_prefix.decode("ascii")
    head = f"{ctx.unit_id}{prefix}{self.token} {int(request.totalizer)}"
    if request.flow_statistic_code is None:
        return (head + "\r").encode("ascii")
    # Disable form per primer: ``TC <tot> 1`` with no further args.
    if request.flow_statistic_code == 1:
        return f"{head} 1\r".encode("ascii")
    # Full set — require all five config fields. Partial sets use
    # the primer's ``-1`` "keep current" sentinel per field, which
    # is encoded the same way as any other integer here.
    if (
        request.mode is None
        or request.limit_mode is None
        or request.digits is None
        or request.decimal_place is None
    ):
        raise AlicatValidationError(
            f"{self.name}: enabling / reconfiguring requires mode, limit_mode, "
            f"digits, and decimal_place (use TotalizerMode.KEEP / "
            f"TotalizerLimitMode.KEEP to preserve the current value of a "
            f"specific field); to disable instead, pass flow_statistic_code=1 "
            f"with the rest left ``None``.",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={
                    "flow_statistic_code": request.flow_statistic_code,
                    "mode": request.mode,
                    "limit_mode": request.limit_mode,
                    "digits": request.digits,
                    "decimal_place": request.decimal_place,
                },
            ),
        )
    if not (TOTALIZER_TC_DIGITS_MIN <= request.digits <= TOTALIZER_TC_DIGITS_MAX):
        raise AlicatValidationError(
            f"{self.name}: digits must be in "
            f"[{TOTALIZER_TC_DIGITS_MIN}, {TOTALIZER_TC_DIGITS_MAX}], "
            f"got {request.digits}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"digits": request.digits},
            ),
        )
    if not (TOTALIZER_TC_DECIMAL_MIN <= request.decimal_place <= TOTALIZER_TC_DECIMAL_MAX):
        raise AlicatValidationError(
            f"{self.name}: decimal_place must be in "
            f"[{TOTALIZER_TC_DECIMAL_MIN}, {TOTALIZER_TC_DECIMAL_MAX}], "
            f"got {request.decimal_place}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"decimal_place": request.decimal_place},
            ),
        )
    return (
        f"{head} {request.flow_statistic_code} {int(request.mode)} "
        f"{int(request.limit_mode)} {request.digits} {request.decimal_place}\r"
    ).encode("ascii")

TotalizerConfigRequest dataclass

TotalizerConfigRequest(
    totalizer,
    flow_statistic_code=None,
    mode=None,
    limit_mode=None,
    digits=None,
    decimal_place=None,
)

Arguments for :data:TOTALIZER_CONFIG.

Attributes:

Name Type Description
totalizer TotalizerId

Which totalizer (1 or 2) to query or configure. Required for both query and set — TC is per-totalizer on the wire.

flow_statistic_code int | None

None issues the query form. 1 disables the totalizer (subsequent fields are omitted). -1 keeps the current statistic. Any other positive integer selects the flow statistic to accumulate (primer Appendix A).

mode / limit_mode / digits / decimal_place

Required whenever flow_statistic_code is set to a value other than 1 (the disable sentinel). Ranges: digits ∈ [7, 10] (default 7), decimal_place ∈ [0, 9]. Use :attr:TotalizerMode.KEEP / :attr:TotalizerLimitMode.KEEP to retain the current setting for just one field.

TotalizerReset dataclass

TotalizerReset(
    name="totalizer_reset",
    token="T",
    response_mode=ResponseMode.LINE,
    device_kinds=_FLOW_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_RESET,
    max_firmware=None,
    firmware_families=_V8_V9_V10,
    destructive=True,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[TotalizerResetRequest, 'ParsedFrame']

T <n> — reset totalizer count (8v00+, flow devices). Destructive.

Wire: <uid><prefix>T <totalizer>\ralways with the numeric argument. Primer's bare <uid>T\r is the flow-tare command (see :data:TARE_FLOW); the two share a token and collide at the wire level if the numeric arg is omitted.

Response: post-op data frame with the totalizer reset to zero.

decode

decode(response, ctx)

Parse the post-op data frame.

Source code in src/alicatlib/commands/totalizer.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-op data frame."""
    return _decode_reset_frame(self.name, response, ctx)

encode

encode(ctx, request)

Emit <uid><prefix>T <totalizer>\r — always with the numeric arg.

Source code in src/alicatlib/commands/totalizer.py
def encode(self, ctx: DecodeContext, request: TotalizerResetRequest) -> bytes:
    r"""Emit ``<uid><prefix>T <totalizer>\r`` — always with the numeric arg."""
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token} {int(request.totalizer)}\r".encode("ascii")

TotalizerResetPeak dataclass

TotalizerResetPeak(
    name="totalizer_reset_peak",
    token="TP",
    response_mode=ResponseMode.LINE,
    device_kinds=_FLOW_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_RESET,
    max_firmware=None,
    firmware_families=_V8_V9_V10,
    destructive=True,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[TotalizerResetPeakRequest, 'ParsedFrame']

TP <n> — reset totalizer peak reading (8v00+). Destructive.

Wire: <uid><prefix>TP <totalizer>\r. Always emits the numeric argument so the spec can never accidentally produce the :data:TARE_GAUGE_PRESSURE wire form (bare <uid>TP\r).

decode

decode(response, ctx)

Parse the post-op data frame.

Source code in src/alicatlib/commands/totalizer.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-op data frame."""
    return _decode_reset_frame(self.name, response, ctx)

encode

encode(ctx, request)

Emit <uid><prefix>TP <totalizer>\r — always with the numeric arg.

Source code in src/alicatlib/commands/totalizer.py
def encode(self, ctx: DecodeContext, request: TotalizerResetPeakRequest) -> bytes:
    r"""Emit ``<uid><prefix>TP <totalizer>\r`` — always with the numeric arg."""
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token} {int(request.totalizer)}\r".encode("ascii")

TotalizerResetPeakRequest dataclass

TotalizerResetPeakRequest(
    totalizer=TotalizerId.FIRST, confirm=False
)

Arguments for :data:TOTALIZER_RESET_PEAK.

Same shape as :class:TotalizerResetRequest; destructive.

TotalizerResetRequest dataclass

TotalizerResetRequest(
    totalizer=TotalizerId.FIRST, confirm=False
)

Arguments for :data:TOTALIZER_RESET.

Attributes:

Name Type Description
totalizer TotalizerId

Which totalizer to reset. Defaults to :attr:TotalizerId.FIRST — primer says a bare T <n> command without a number defaults to totalizer 1, but the encoder always emits an explicit number so the wire shape can never collide with TARE_FLOW (bare T).

confirm bool

Required True — the session's destructive-confirm gate raises :class:AlicatValidationError when False.

TotalizerSave dataclass

TotalizerSave(
    name="totalizer_save",
    token="TCR",
    response_mode=ResponseMode.LINE,
    device_kinds=_FLOW_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_TCR,
    max_firmware=None,
    firmware_families=_V10_ONLY,
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[TotalizerSaveRequest, TotalizerSaveState]

TCR — save-totalizer query/set (V10 10v05+, flow devices).

Wire shape:

  • Query: <uid><prefix>TCR\r
  • Set: <uid><prefix>TCR <enable>\r

Response: <uid> <enable> (2 fields).

decode

decode(response, ctx)

Parse <uid> <enable> into :class:TotalizerSaveState.

Source code in src/alicatlib/commands/totalizer.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> TotalizerSaveState:
    """Parse ``<uid> <enable>`` into :class:`TotalizerSaveState`."""
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=2)
    unit_id, enable_s = fields
    return TotalizerSaveState(
        unit_id=unit_id,
        enabled=parse_bool_code(enable_s, field=f"{self.name}.enabled"),
    )

encode

encode(ctx, request)

Emit TCR query or set bytes.

Source code in src/alicatlib/commands/totalizer.py
def encode(self, ctx: DecodeContext, request: TotalizerSaveRequest) -> bytes:
    """Emit TCR query or set bytes."""
    prefix = ctx.command_prefix.decode("ascii")
    if request.enable is None:
        return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token} {int(request.enable)}\r".encode("ascii")

TotalizerSaveRequest dataclass

TotalizerSaveRequest(enable=None, save=None)

Arguments for :data:TOTALIZER_SAVE.

Attributes:

Name Type Description
enable bool | None

True / False toggles whether the device persists totalizer values across power cycles. None issues the query form.

save bool | None

True persists to EEPROM. The underlying TCR command writes a config flag; we surface the standard save attribute so the session's EEPROM-wear monitor (design §5.20.7) can track its rate alongside other EEPROM-backed sets.

UnlockDisplay dataclass

UnlockDisplay(
    name="unlock_display",
    token="U",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[UnlockDisplayRequest, 'ParsedFrame']

U — unlock front-panel display. Safety escape hatch.

Wire: <uid><prefix>U\r. Response is a post-op data frame without the :attr:StatusCode.LCK bit.

Intentionally NOT gated on :attr:Capability.DISPLAY (unlike :data:LOCK_DISPLAY): the point of this command is to recover a device that got into a locked state. Hardware validation (2026-04-17) found that on V1_V7 firmware, any command starting with AL<X> (including ALS / ALSS / ALV that the library itself firmware-gates pre-I/O) is parsed by the device as "lock display with argument X" and sets the LCK status bit. The library's firmware gates protect against this under normal use, but third-party code or direct catalog-command execution can still trip it — dev.unlock_display() must always be callable as the escape. AU is confirmed safe on V1_V7 (7v09) / V8_V9 / V10; on a device without a physical display it's a harmless no-op (the primer makes no firmware-cutoff claim).

decode

decode(response, ctx)

Parse the post-op data frame — see :func:_decode_display_frame.

Source code in src/alicatlib/commands/display.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ParsedFrame:
    """Parse the post-op data frame — see :func:`_decode_display_frame`."""
    return _decode_display_frame(self.name, response, ctx)

encode

encode(ctx, request)

Emit <uid><prefix>U\r.

Source code in src/alicatlib/commands/display.py
def encode(self, ctx: DecodeContext, request: UnlockDisplayRequest) -> bytes:
    r"""Emit ``<uid><prefix>U\r``."""
    del request
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")

UnlockDisplayRequest dataclass

UnlockDisplayRequest()

Arguments for :data:UNLOCK_DISPLAY (empty).

UserData dataclass

UserData(
    name="user_data",
    token="UD",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_UD,
    max_firmware=None,
    firmware_families=frozenset(
        {FirmwareFamily.V8_V9, FirmwareFamily.V10}
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[UserDataRequest, UserDataSetting]

UD — read or write one of the four 32-char user-data slots.

Wire shape:

  • Read: <uid><prefix>UD <slot>\r
  • Write: <uid><prefix>UD <slot> <value>\r

Response: <uid> <slot> <value> where value may contain spaces. The decoder joins tokens after the slot into a single string, preserving whatever the device echoed verbatim.

decode

decode(response, ctx)

Parse <uid> <slot> <value...> into :class:UserDataSetting.

value may contain spaces; the decoder re-joins everything after the slot field with single spaces so round-trip writes of multi-word strings behave predictably.

Source code in src/alicatlib/commands/user_data.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> UserDataSetting:
    """Parse ``<uid> <slot> <value...>`` into :class:`UserDataSetting`.

    ``value`` may contain spaces; the decoder re-joins everything
    after the slot field with single spaces so round-trip writes
    of multi-word strings behave predictably.
    """
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name)
    # Minimum: uid + slot (2 tokens) — some firmware returns exactly
    # that when the slot has never been written. A 3+ field reply
    # adds the value, which may contain spaces (re-joined below).
    if len(fields) < _UD_MIN_FIELDS:
        raise AlicatParseError(
            f"{self.name}: expected >={_UD_MIN_FIELDS} fields "
            f"(uid [+ slot + value]), got {len(fields)}{text!r}",
            field_name="user_data",
            expected=f">= {_UD_MIN_FIELDS} fields",
            actual=len(fields),
            context=ErrorContext(command_name=self.name, raw_response=response),
        )
    unit_id = fields[0]
    # Empty-slot reply: only the unit id comes back. Leave slot as a
    # placeholder; the facade re-populates it from the request.
    if len(fields) == 1:
        return UserDataSetting(unit_id=unit_id, slot=-1, value="")
    slot = parse_int(fields[1], field=f"{self.name}.slot")
    value = " ".join(fields[2:]) if len(fields) > _UD_VALUE_START_INDEX else ""
    return UserDataSetting(unit_id=unit_id, slot=slot, value=value)

encode

encode(ctx, request)

Emit UD read or write bytes.

Source code in src/alicatlib/commands/user_data.py
def encode(self, ctx: DecodeContext, request: UserDataRequest) -> bytes:
    """Emit UD read or write bytes."""
    if not (0 <= request.slot <= UD_MAX_SLOT):
        raise AlicatValidationError(
            f"{self.name}: slot must be in [0, {UD_MAX_SLOT}], got {request.slot}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"slot": request.slot},
            ),
        )
    prefix = ctx.command_prefix.decode("ascii")
    head = f"{ctx.unit_id}{prefix}{self.token} {request.slot}"
    if request.value is None:
        return (head + "\r").encode("ascii")
    value = request.value
    if len(value) > UD_MAX_VALUE_LEN:
        raise AlicatValidationError(
            f"{self.name}: value must be <= {UD_MAX_VALUE_LEN} chars, got {len(value)}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"value_len": len(value)},
            ),
        )
    if "\r" in value or "\n" in value:
        raise AlicatValidationError(
            f"{self.name}: value must not contain \\r or \\n "
            "(wire-level terminators would truncate the write)",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"value_len": len(value)},
            ),
        )
    try:
        value.encode("ascii")
    except UnicodeEncodeError as err:
        raise AlicatValidationError(
            f"{self.name}: value must be pure ASCII; got non-ASCII at position {err.start}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"value": value},
            ),
        ) from err
    return f"{head} {value}\r".encode("ascii")

UserDataRequest dataclass

UserDataRequest(slot, value=None)

Arguments for :data:USER_DATA.

Attributes:

Name Type Description
slot int

Which 32-char slot to read / write — 0..3 inclusive.

value str | None

None issues the read form; a string writes the new value. Validated pre-I/O: must be pure ASCII and ≤ 32 characters, must not contain \r (the wire terminator) because that would truncate the write.

ValveDrive dataclass

ValveDrive(
    name="valve_drive",
    token="VD",
    response_mode=ResponseMode.LINE,
    device_kinds=_CONTROLLER_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_VD,
    max_firmware=None,
    firmware_families=frozenset(
        {FirmwareFamily.V8_V9, FirmwareFamily.V10}
    ),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[ValveDriveRequest, ValveDriveState]

VD — query valve drive state (8v18+).

Wire: <uid><prefix>VD\r. Response is 2–4 whitespace-separated tokens: <uid> <pct1> [<pct2>] [<pct3>]. The decoder returns a :class:ValveDriveState whose valves tuple carries all reported percentages.

Column count reflects the physical valve count of the controller, but users should gate by :attr:Capability.MULTI_VALVE / :attr:THIRD_VALVE rather than infer from the reply (design §9): the capability flags are probed once at open_device and survive firmware quirks that VD's column count does not.

decode

decode(response, ctx)

Parse <uid> <pct1> [<pct2>] [<pct3>] into :class:ValveDriveState.

Source code in src/alicatlib/commands/valve.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ValveDriveState:
    """Parse ``<uid> <pct1> [<pct2>] [<pct3>]`` into :class:`ValveDriveState`."""
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name)
    if len(fields) < _VD_MIN_FIELDS or len(fields) > _VD_MAX_FIELDS:
        raise AlicatParseError(
            f"{self.name}: expected {_VD_MIN_FIELDS}..{_VD_MAX_FIELDS} fields "
            f"(uid + 1..4 percentages), got {len(fields)}{text!r}",
            field_name="valve_drive",
            expected=f"{_VD_MIN_FIELDS}..{_VD_MAX_FIELDS} fields",
            actual=len(fields),
            context=ErrorContext(command_name=self.name, raw_response=response),
        )
    unit_id = fields[0]
    valves = tuple(
        parse_float(f, field=f"{self.name}.valve[{i}]") for i, f in enumerate(fields[1:])
    )
    return ValveDriveState(unit_id=unit_id, valves=valves)

encode

encode(ctx, request)

Emit <uid><prefix>VD\r.

Source code in src/alicatlib/commands/valve.py
def encode(self, ctx: DecodeContext, request: ValveDriveRequest) -> bytes:
    r"""Emit ``<uid><prefix>VD\r``."""
    del request
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")

ValveDriveRequest dataclass

ValveDriveRequest()

Arguments for :data:VALVE_DRIVE (empty — VD takes no arguments).

VeCommand dataclass

VeCommand(
    name="ve_query",
    token="VE",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=None,
    max_firmware=None,
    firmware_families=frozenset(),
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[VeRequest, VeResult]

Firmware-version query. Works on every family; anchor of identification.

decode

decode(response, ctx)

Parse the firmware version (and optional date) out of a VE reply.

Source code in src/alicatlib/commands/system.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> VeResult:
    """Parse the firmware version (and optional date) out of a ``VE`` reply."""
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    # Unit ID is the first whitespace-delimited token; parse_ve_response
    # scans the whole line for firmware + optional date.
    first_token = response.split(None, 1)[0].decode("ascii", errors="replace")
    firmware, firmware_date = parse_ve_response(response)
    return VeResult(
        unit_id=first_token,
        firmware=firmware,
        firmware_date=firmware_date,
    )

encode

encode(ctx, request)

Emit <unit_id><prefix>VE\r.

Source code in src/alicatlib/commands/system.py
def encode(self, ctx: DecodeContext, request: VeRequest) -> bytes:
    r"""Emit ``<unit_id><prefix>VE\r``."""
    del request
    prefix = ctx.command_prefix.decode("ascii")
    return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")

VeRequest dataclass

VeRequest()

Arguments for :data:VE_QUERY — no user-provided fields.

VeResult dataclass

VeResult(unit_id, firmware, firmware_date)

Typed response for :data:VE_QUERY.

:func:alicatlib.protocol.parser.parse_ve_response is intentionally tolerant (the VE response shape varies across firmware families), so we surface both the parsed :class:FirmwareVersion and the optional firmware date to let callers use whichever they need without having to re-parse the raw bytes.

ZeroBand dataclass

ZeroBand(
    name="zero_band",
    token="DCZ",
    response_mode=ResponseMode.LINE,
    device_kinds=_ALL_DEVICE_KINDS,
    media=_DEFAULT_COMMAND_MEDIA,
    required_capabilities=Capability.NONE,
    min_firmware=_MIN_FIRMWARE_V10_05,
    max_firmware=None,
    firmware_families=_V10_ONLY,
    destructive=False,
    experimental=False,
    case_sensitive=False,
    prefix_less=False,
    expected_lines=None,
    is_complete=None,
)

Bases: Command[ZeroBandRequest, ZeroBandSetting]

DCZ — zero-band query/set (V10 10v05+).

Wire shape (primer p. 14):

  • Query: <uid><prefix>DCZ\r
  • Set: <uid><prefix>DCZ 0 <zero_band>\r (the literal 0 is the primer's placeholder for a statistic slot that DCZ does not actually use).

Response: <uid> 0 <zero_band> (3 fields).

decode

decode(response, ctx)

Parse <uid> 0 <zero_band> into :class:ZeroBandSetting.

Source code in src/alicatlib/commands/data_readings.py
def decode(
    self,
    response: bytes | tuple[bytes, ...],
    ctx: DecodeContext,
) -> ZeroBandSetting:
    """Parse ``<uid> 0 <zero_band>`` into :class:`ZeroBandSetting`."""
    del ctx
    if isinstance(response, tuple):
        raise TypeError(
            f"{self.name}.decode expected single-line response, got {len(response)} lines",
        )
    text = response.decode("ascii")
    fields = parse_fields(text, command=self.name, expected_count=3)
    unit_id, _stat_slot, zero_band_s = fields
    return ZeroBandSetting(
        unit_id=unit_id,
        zero_band=parse_float(zero_band_s, field=f"{self.name}.zero_band"),
    )

encode

encode(ctx, request)

Emit DCZ query or set bytes.

Source code in src/alicatlib/commands/data_readings.py
def encode(self, ctx: DecodeContext, request: ZeroBandRequest) -> bytes:
    """Emit DCZ query or set bytes."""
    prefix = ctx.command_prefix.decode("ascii")
    if request.zero_band is None:
        return f"{ctx.unit_id}{prefix}{self.token}\r".encode("ascii")
    if request.zero_band < 0 or request.zero_band > DCZ_MAX_ZERO_BAND:
        raise AlicatValidationError(
            f"{self.name}: zero_band must be in [0, {DCZ_MAX_ZERO_BAND}]%, "
            f"got {request.zero_band}",
            context=ErrorContext(
                command_name=self.name,
                unit_id=ctx.unit_id,
                extra={"zero_band": request.zero_band},
            ),
        )
    return f"{ctx.unit_id}{prefix}{self.token} 0 {request.zero_band}\r".encode("ascii")

ZeroBandRequest dataclass

ZeroBandRequest(zero_band=None)

Arguments for :data:ZERO_BAND.

Attributes:

Name Type Description
zero_band float | None

Zero-band threshold as a percent of full scale. 0.0 disables the zero band; valid range is 0.0..6.38. None issues the query form.