A device implementation separates concerns across three layers: the device class handles the PLESTY API contract, the traffic manager handles transport I/O, and the command solver handles protocol formatting.
Layer responsibilities
| Layer | What it does | Example |
|---|---|---|
| Device class | Parameters, lifecycle, validation | PowermeterDevice |
| Traffic manager | Open/close connection, send/receive bytes | VisaTrafficManager, SerialTrafficManager |
| Command solver | Build command strings, decode responses | SCPISolver, KVTextSolver |
VISA + SCPI device (most common)
For instruments using VISA (USB, GPIB, Ethernet) with SCPI command syntax:
from plesty.lib.device.base_device_sync import BaseDeviceSyncModel
from plesty.lib.traffic.visa import VisaTrafficManager
from plesty.lib.solver.scpi import SCPISolver
class PowermeterDevice(BaseDeviceSyncModel):
    """Synchronous device for a power meter reached over VISA with SCPI commands.

    The class only wires the layers together: ``VisaTrafficManager`` performs
    the transport I/O and ``SCPISolver`` builds the command strings.
    """

    def __init__(self, address: str, sensor_type: str = "S155C"):
        """Register the device under its address.

        Args:
            address: Address handed to ``VisaTrafficManager``; also used as
                the device id.
            sensor_type: Sensor model identifier (defaults to ``"S155C"``).
        """
        super().__init__(id=address, param_schema="assets/param_schema.json")
        self._address = address
        # Keep the caller-supplied sensor type instead of discarding it.
        self._sensor_type = sensor_type

    def init(self, main=None) -> None:
        """Create the transport and the command solver (``main`` is unused)."""
        self.tm = VisaTrafficManager(self._address)
        self.solver = SCPISolver()

    def connect(self) -> None:
        """Open the transport."""
        self.tm.open()

    def disconnect(self) -> None:
        """Close the transport."""
        self.tm.close()

    def _write_(self, key: str, value: str | float | int | bool) -> bool:
        """Send the write command for parameter ``key`` with ``value``.

        Returns:
            The truthiness of whatever the transport returns for the command.
        """
        cfg = self.get_config(key)
        cmd = self.solver.get_write_cmd(cfg, value)
        return bool(self.tm.send_command(cmd))

    def _query_(self, key: str) -> str:
        """Send the query command for parameter ``key`` and return the reply."""
        cfg = self.get_config(key)
        cmd = self.solver.get_query_cmd(cfg)
        return self.tm.send_command(cmd)

    def check_errors(self) -> list[str]:
        """Ask the instrument for its error status via ``SYST:ERR?``.

        Returns:
            An empty list when the reply reports no error, otherwise a
            one-element list holding the reply as received.
        """
        response = self.tm.send_command("SYST:ERR?")
        # PM100D format: "+0,No error" when healthy. Compare on a left-stripped
        # copy so leading whitespace in the reply is not mistaken for an error.
        if response.lstrip().startswith("+0"):
            return []
        return [response]

    def check_operatability(self) -> bool:
        """Report whether a transport exists and is open."""
        return self.tm is not None and self.tm.is_open

    def identity(self) -> str:
        """Return the instrument's reply to ``*IDN?``."""
        return self.tm.send_command("*IDN?")
Serial device
For RS-232/USB-serial instruments using a text protocol:
from plesty.lib.traffic.serial import SerialTrafficManager
class MySerialDevice(BaseDeviceSyncModel):
    """Example device for an instrument reached over a serial port."""

    def __init__(self, port: str):
        """Register the device, using the serial port name as its id."""
        super().__init__(id=port, param_schema="assets/param_schema.json")
        self._port = port

    def init(self, main=None) -> None:
        """Build the serial transport and the command solver (``main`` is unused)."""
        self.tm = SerialTrafficManager(
            port=self._port,
            baudrate=9600,
            timeout=5,
            write_termination="\r",
            read_termination="\r\n",
        )
        self.solver = SCPISolver()

    def connect(self) -> None:
        """Open the serial transport with the framing settings given here."""
        self.tm.open(parity="none", stopbits="one", bytesize=8)

    def disconnect(self) -> None:
        """Close the serial transport."""
        self.tm.close()

    def _write_(self, key: str, value: str | float | int | bool) -> bool:
        """Send the write command for ``key``; return the reply's truthiness."""
        param_cfg = self.get_config(key)
        command = self.solver.get_write_cmd(param_cfg, value)
        reply = self.tm.send_command(command)
        return bool(reply)

    def _query_(self, key: str) -> str:
        """Send the query command for ``key`` and return the reply."""
        param_cfg = self.get_config(key)
        command = self.solver.get_query_cmd(param_cfg)
        reply = self.tm.send_command(command)
        return reply

    def check_errors(self) -> list[str]:
        """Return an empty list; this example does not query the instrument."""
        return []

    def check_operatability(self) -> bool:
        """Report whether a transport exists and is open."""
        if self.tm is None:
            return False
        return self.tm.is_open
TCP/IP SCPI device
Note: For instruments that expose their own TCP socket interface (as opposed to the PLESTY TCP service), use TcpIpTrafficManager:
from plesty.lib.traffic.tcp_ip import TcpIpTrafficManager
class TCPSCPIDevice(BaseDeviceSyncModel):
    """Example device that sends SCPI commands through ``TcpIpTrafficManager``."""

    def __init__(self, host: str, port: int = 5025):
        """Register the device under the id ``"<host>:<port>"``."""
        device_id = f"{host}:{port}"
        super().__init__(id=device_id, param_schema="assets/param_schema.json")
        self._host = host
        self._port = port

    def init(self, main=None) -> None:
        """Build the TCP transport and the command solver (``main`` is unused)."""
        self.tm = TcpIpTrafficManager(host=self._host, port=self._port, timeout=5)
        self.solver = SCPISolver()

    def connect(self) -> None:
        """Open the TCP transport."""
        self.tm.open()

    def disconnect(self) -> None:
        """Close the TCP transport."""
        self.tm.close()

    # _write_, _query_, check_errors, check_operatability follow same pattern
Mock transport for testing
For Gate d1 tests, the device must work without real hardware. Pass a mock transport via a constructor argument:
class PowermeterDevice(BaseDeviceSyncModel):
    """Power meter device whose transport can be replaced by a mock."""

    def __init__(self, address: str, _mock_transport=None):
        """Register the device and remember the optional mock transport.

        Args:
            address: Address handed to ``VisaTrafficManager``; also used as
                the device id.
            _mock_transport: Object to use as the transport instead of a real
                ``VisaTrafficManager``; ``None`` selects the real transport.
        """
        super().__init__(id=address, param_schema="assets/param_schema.json")
        self._address = address
        self._mock = _mock_transport

    def init(self, main=None) -> None:
        """Select the transport and create the solver (``main`` is unused)."""
        if self._mock is None:
            # No mock supplied: talk to the real instrument.
            self.tm = VisaTrafficManager(self._address)
        else:
            self.tm = self._mock
        self.solver = SCPISolver()
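Then, in tests, pass a mock that returns plausible strings for any command. The mock only needs the members the device actually uses: open(), close(), send_command(), and is_open.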