Skip to content

Async quickstart

import anyio
from alicatlib import Gas, Unit, open_device

async def main() -> None:
    async with open_device("/dev/ttyUSB0") as dev:
        frame = await dev.poll()
        print(frame.get_float("Mass_Flow"))

        await dev.gas(Gas.N2, save=True)
        await dev.setpoint(50.0, Unit.SCCM)

anyio.run(main)

Multi-device acquisition

from alicatlib import AlicatManager
from alicatlib.streaming import record
from alicatlib.sinks import CsvSink, pipe

async def run() -> None:
    async with AlicatManager() as mgr:
        await mgr.add("fuel", "/dev/ttyUSB0")
        await mgr.add("air",  "/dev/ttyUSB1")
        async with (
            record(mgr, rate_hz=10, duration=60) as stream,
            CsvSink("run.csv") as sink,
        ):
            await pipe(stream, sink)

See Logging and acquisition for the sink surface, row layout, and structured log events, and the Design doc ยง5.14 for scheduling and backpressure details.