Plesty Documentation

Implementing the Hardware Interface

Wire up a traffic manager and command solver to implement the six required BaseDeviceSyncModel methods.

The device class separates concerns across three layers: the device class handles the PLESTY API contract, the traffic manager handles transport I/O, and the command solver handles protocol formatting.

Layer responsibilities

Layer What it does Example
Device class Parameters, lifecycle, validation PowermeterDevice
Traffic manager Open/close connection, send/receive bytes VisaTrafficManager, SerialTrafficManager
Command solver Build command strings, decode responses SCPISolver, KVTextSolver

VISA + SCPI device (most common)

For instruments using VISA (USB, GPIB, Ethernet) with SCPI command syntax:

from plesty.lib.device.base_device_sync import BaseDeviceSyncModel
from plesty.lib.traffic.visa import VisaTrafficManager
from plesty.lib.solver.scpi import SCPISolver


class PowermeterDevice(BaseDeviceSyncModel):
    def __init__(self, address: str, sensor_type: str = "S155C"):
        super().__init__(id=address, param_schema="assets/param_schema.json")
        self._address = address

    def init(self, main=None) -> None:
        self.tm = VisaTrafficManager(self._address)
        self.solver = SCPISolver()

    def connect(self) -> None:
        self.tm.open()

    def disconnect(self) -> None:
        self.tm.close()

    def _write_(self, key: str, value: str | float | int | bool) -> bool:
        cfg = self.get_config(key)
        cmd = self.solver.get_write_cmd(cfg, value)
        return bool(self.tm.send_command(cmd))

    def _query_(self, key: str) -> str:
        cfg = self.get_config(key)
        cmd = self.solver.get_query_cmd(cfg)
        return self.tm.send_command(cmd)

    def check_errors(self) -> list[str]:
        response = self.tm.send_command("SYST:ERR?")
        # PM100D format: "+0,No error" when healthy
        if response.startswith("+0"):
            return []
        return [response]

    def check_operatability(self) -> bool:
        return self.tm is not None and self.tm.is_open

    def identity(self) -> str:
        return self.tm.send_command("*IDN?")

Serial device

For RS-232/USB-serial instruments using a text protocol:

from plesty.lib.traffic.serial import SerialTrafficManager


class MySerialDevice(BaseDeviceSyncModel):
    def __init__(self, port: str):
        super().__init__(id=port, param_schema="assets/param_schema.json")
        self._port = port

    def init(self, main=None) -> None:
        self.tm = SerialTrafficManager(
            port=self._port,
            baudrate=9600,
            timeout=5,
            write_termination="\r",
            read_termination="\r\n",
        )
        self.solver = SCPISolver()

    def connect(self) -> None:
        self.tm.open(parity="none", stopbits="one", bytesize=8)

    def disconnect(self) -> None:
        self.tm.close()

    def _write_(self, key: str, value: str | float | int | bool) -> bool:
        cfg = self.get_config(key)
        cmd = self.solver.get_write_cmd(cfg, value)
        return bool(self.tm.send_command(cmd))

    def _query_(self, key: str) -> str:
        cfg = self.get_config(key)
        cmd = self.solver.get_query_cmd(cfg)
        return self.tm.send_command(cmd)

    def check_errors(self) -> list[str]:
        return []

    def check_operatability(self) -> bool:
        return self.tm is not None and self.tm.is_open

TCP/IP SCPI device

Note: For instruments with their own TCP socket interface (not PLESTY TCP service), use TcpIpTrafficManager:

from plesty.lib.traffic.tcp_ip import TcpIpTrafficManager


class TCPSCPIDevice(BaseDeviceSyncModel):
    def __init__(self, host: str, port: int = 5025):
        super().__init__(id=f"{host}:{port}", param_schema="assets/param_schema.json")
        self._host = host
        self._port = port

    def init(self, main=None) -> None:
        self.tm = TcpIpTrafficManager(host=self._host, port=self._port, timeout=5)
        self.solver = SCPISolver()

    def connect(self) -> None:
        self.tm.open()

    def disconnect(self) -> None:
        self.tm.close()

    # _write_, _query_, check_errors, check_operatability follow same pattern

Mock transport for testing

For Gate d1 tests, the device must work without real hardware. Pass a mock transport via a constructor argument:

class PowermeterDevice(BaseDeviceSyncModel):
    def __init__(self, address: str, _mock_transport=None):
        super().__init__(id=address, param_schema="assets/param_schema.json")
        self._address = address
        self._mock = _mock_transport

    def init(self, main=None) -> None:
        if self._mock is not None:
            self.tm = self._mock
        else:
            self.tm = VisaTrafficManager(self._address)
        self.solver = SCPISolver()

Then in tests pass a mock that returns plausible strings for any command.