Files
sensgw/sensgw/protocols/prologix.py
2025-12-16 19:34:53 +06:00

221 lines
7.1 KiB
Python

# sensgw/protocols/prologix.py
from __future__ import annotations
import asyncio
import datetime as dt
import logging
from collections import defaultdict
from dataclasses import dataclass
from typing import Any
import time
from ..models import Endpoint, Device, Channel
from ..writer import Writer
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Driver keys (compared after strip + lower-casing) that select the
# Keithley 2000 driver and its SCPI initialization sequence.
KEITHLEY_DRIVER_KEYS = {"keithley2000", "keithley_2000"}
@dataclass(frozen=True)
class PrologixBinding:
    """Immutable record tying one channel of a device to the query that reads it.

    Instances are frozen: fields cannot be reassigned after construction, and
    two bindings with equal fields compare equal.
    """

    # Endpoint record this binding belongs to.
    endpoint: Endpoint
    # Device record; the poll loop reads its id, key, location and metadata.
    device: Device
    # Channel record; the poll loop reads its metric name, scale and offset.
    channel: Channel
    # Query string sent to the instrument to obtain one reading.
    query: str
def _driver_class(driver_key: str | None) -> type[Any] | None:
    """Resolve a configured driver key to a pymeasure instrument class.

    The key is stripped and lower-cased before lookup. Returns ``None`` when
    the key is missing, blank, or not one of ``KEITHLEY_DRIVER_KEYS``.
    """
    normalized = "" if not driver_key else driver_key.strip().lower()
    # Blank keys short-circuit before the membership test.
    if normalized == "" or normalized not in KEITHLEY_DRIVER_KEYS:
        return None

    # Imported here rather than at module top, so pymeasure is only loaded
    # once a Keithley driver is actually requested.
    from pymeasure.instruments.keithley import Keithley2000  # type: ignore

    return Keithley2000
class PrologixEndpointLoop:
    """
    Manages one physical Prologix controller and sequentially polls its devices.

    Merges connection state (Adapter) and logic (Collector) into one class.

    The pymeasure adapter and instrument objects are created lazily on first
    use and cached on the instance. Blocking adapter/instrument calls are run
    through ``asyncio.to_thread`` from :meth:`run`.

    Failures that concern a single device (malformed ``gpib_addr`` metadata,
    an exception during initialization, a failed query) are logged and
    reported through ``writer.write_error``; they do not terminate the loop.
    """

    # Default SCPI setup sent to a Keithley 2000 before its first poll.
    _KEITHLEY2000_DEFAULT_INIT_CMDS: tuple[str, ...] = (
        "*CLS",
        ":INIT:CONT OFF;:ABORT",
        # DC voltage config
        ':SENS:FUNC "VOLT:DC"',
        ":SENS:VOLT:DC:RANG 10",  # fixed 10 V range
        ":SENS:VOLT:DC:NPLC 10",  # slow/low-noise integration
        ":SENS:VOLT:DC:DIG 7",  # max digits
        # autozero on (offset drift?)
        ":SYST:AZER:STAT ON",
        # trigger setup
        ":TRIG:SOUR IMM",
        ":TRIG:COUN 1",
        ":SAMP:COUN 1",
        ":TRIG:DEL 0",
        # return the reading only
        ":FORM:ELEM READ",
        # turn off VFD
        ":DISP:ENAB OFF",
    )

    def __init__(
        self,
        writer: Writer,
        endpoint: Endpoint,
        bindings: list[PrologixBinding],
        default_poll_s: int,
    ):
        """Store collaborators and start with no open connection.

        Args:
            writer: Sink providing async ``write_metric`` / ``write_error``.
            endpoint: Endpoint record; ``conn`` and ``endpoint_key`` are read.
            bindings: Channel bindings polled by this loop.
            default_poll_s: Target duration of one full poll cycle, in seconds.
        """
        self.writer = writer
        self.endpoint = endpoint
        self.bindings = bindings
        self.default_poll_s = default_poll_s
        # conn state
        self._adapter: Any | None = None
        # Instrument objects keyed by (GPIB address, driver key as configured).
        self._instruments: dict[tuple[int, str], Any] = {}
        # Addresses for which _init_device has already completed.
        self._initialized_addrs: set[int] = set()

    @staticmethod
    def _parse_gpib_addr(meta: dict[str, Any]) -> int | None:
        """Return ``meta["gpib_addr"]`` as an int, or ``None`` if absent or malformed."""
        raw = meta.get("gpib_addr")
        if raw is None:
            return None
        try:
            return int(raw)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _parse_driver_key(meta: dict[str, Any]) -> str | None:
        """Return the stripped ``meta["driver"]`` value, or ``None`` if unset or blank.

        An explicit ``None`` is treated as "no driver" instead of being
        stringified to ``"None"``.
        """
        raw = meta.get("driver")
        if raw is None:
            return None
        return str(raw).strip() or None

    def _keithley2000_init_cmds(self, dev_meta: dict[str, Any]) -> list[str]:
        """Build the SCPI init sequence for a Keithley 2000.

        Returns a new list holding the default commands, followed by
        ``dev_meta["init_cmds"]`` when that value is a list made up solely of
        strings. Any other shape of ``init_cmds`` is ignored.
        """
        cmds = list(self._KEITHLEY2000_DEFAULT_INIT_CMDS)
        extra = dev_meta.get("init_cmds")
        if isinstance(extra, list) and all(isinstance(x, str) for x in extra):
            cmds.extend(extra)
        return cmds

    def _init_device(
        self, gpib_addr: int, driver_key: str | None, meta: dict[str, Any]
    ) -> None:
        """Send the one-time init sequence to the device at ``gpib_addr``.

        Blocking; intended to be called from a worker thread. Devices whose
        driver is not a Keithley 2000 key are marked initialized without any
        I/O. Exceptions from the adapter or instrument propagate, and in that
        case the address is NOT marked initialized.
        """
        key = (driver_key or "").strip().lower()
        # Only init if the configured driver is explicitly Keithley2000
        if key not in KEITHLEY_DRIVER_KEYS:
            self._initialized_addrs.add(gpib_addr)
            return

        cmds = self._keithley2000_init_cmds(meta)
        log.info("Initializing Keithley @ %s with %s SCPI cmds", gpib_addr, len(cmds))
        # driver_key is not None here: a blank key never matches the set above.
        inst = self._get_instrument(gpib_addr, driver_key)
        self._get_adapter().address = gpib_addr
        for cmd in cmds:
            inst.write(cmd)
            time.sleep(0.02)
            # Read error after each command; stop early if something is wrong
            err = str(inst.ask(":SYST:ERR?")).strip()
            if not err.startswith("0,") and "No error" not in err:
                log.error("Keithley init failed at %s: %s", cmd, err)
                break
        # NOTE(review): the address is marked initialized even when a command
        # reported a SCPI error above, so the sequence is not re-sent on the
        # next cycle. Confirm this is intended.
        self._initialized_addrs.add(gpib_addr)

    def _get_adapter(self) -> Any:
        """Return the shared PrologixAdapter, creating it on first call.

        Reads ``endpoint.conn["resource"]`` (required) and
        ``endpoint.conn["gpib_read_timeout_ms"]`` (default 500).
        """
        if self._adapter is None:
            from pymeasure.adapters import PrologixAdapter  # type: ignore

            res = self.endpoint.conn["resource"]
            to_ms = int(self.endpoint.conn.get("gpib_read_timeout_ms", 500))
            # auto=False gives us manual control over addressing
            self._adapter = PrologixAdapter(res, gpib_read_timeout=to_ms, auto=False)
            try:
                self._adapter.flush_read_buffer()
            except Exception:
                # Best-effort flush: a failure here must not block the connection.
                log.debug("Prologix flush_read_buffer failed", exc_info=True)
        return self._adapter

    def _get_instrument(self, gpib_addr: int, driver_key: str) -> Any:
        """Return the cached instrument for ``(gpib_addr, driver_key)``, creating it if needed.

        Raises:
            ValueError: if ``driver_key`` does not resolve to a driver class.
        """
        cache_key = (gpib_addr, driver_key)
        cached = self._instruments.get(cache_key)
        if cached is not None:
            return cached

        cls = _driver_class(driver_key)
        if not cls:
            raise ValueError(f"Unknown driver: {driver_key}")
        ad = self._get_adapter()
        ad.address = gpib_addr
        inst = cls(ad)
        self._instruments[cache_key] = inst
        return inst

    def _exec_sync(self, gpib_addr: int, query: str, driver_key: str | None) -> str:
        """Send ``query`` to the device at ``gpib_addr`` and return the stripped reply.

        Blocking. With a driver key the query goes through the instrument
        object; without one it is written to and read from the adapter directly.
        """
        if driver_key:
            inst = self._get_instrument(gpib_addr, driver_key)
            # re-assert address in case it changed
            self._get_adapter().address = gpib_addr
            return str(inst.ask(query)).strip()

        ad = self._get_adapter()
        ad.address = gpib_addr
        ad.write(query)
        return str(ad.read()).strip()

    async def _poll_binding(
        self, b: PrologixBinding, gpib_addr: int, driver_key: str | None
    ) -> None:
        """Read one binding, apply scale/offset, and write the metric or the error."""
        try:
            # offload to thread
            raw = await asyncio.to_thread(
                self._exec_sync, gpib_addr, b.query, driver_key
            )
            val = float(raw)
            val = val * b.channel.scale_value + b.channel.offset_value
            await self.writer.write_metric(
                ts=dt.datetime.now(dt.timezone.utc),
                device_id=b.device.device_id,
                location_id=b.device.location_id,
                metric=b.channel.metric,
                value=val,
            )
        except Exception as e:
            log.error(
                "Prologix error %s/%s: %s", b.device.device_key, b.channel.metric, e
            )
            await self.writer.write_error(
                device_id=b.device.device_id, error=f"prologix: {e}"
            )

    async def _poll_device(self, dev_bindings: list[PrologixBinding]) -> None:
        """Initialize (if needed) and poll every binding of one device, once."""
        device = dev_bindings[0].device
        meta = device.metadata

        gpib_addr = self._parse_gpib_addr(meta)
        if gpib_addr is None:
            # A missing address means "skip silently"; a malformed one is reported.
            if meta.get("gpib_addr") is not None:
                msg = f"invalid gpib_addr {meta.get('gpib_addr')!r}"
                log.error("Prologix error %s: %s", device.device_key, msg)
                await self.writer.write_error(
                    device_id=device.device_id, error=f"prologix: {msg}"
                )
            return

        driver_key = self._parse_driver_key(meta)

        if gpib_addr not in self._initialized_addrs:
            try:
                await asyncio.to_thread(
                    self._init_device, gpib_addr, driver_key, meta
                )
            except Exception as e:
                # The address stays uninitialized, so init is retried next cycle.
                log.error("Prologix init error %s: %s", device.device_key, e)
                await self.writer.write_error(
                    device_id=device.device_id, error=f"prologix init: {e}"
                )
                return

        for b in dev_bindings:
            await self._poll_binding(b, gpib_addr, driver_key)

    async def run(self) -> None:
        """Poll all bindings forever, one device at a time.

        Bindings are grouped by ``device.device_id`` once at start-up. After
        each full pass the loop sleeps for ``default_poll_s`` minus the time
        the pass took, but never less than 0.5 s.
        """
        log.info(
            "Starting Prologix loop: %s (%s bindings)",
            self.endpoint.endpoint_key,
            len(self.bindings),
        )
        # group by device
        by_device: defaultdict[Any, list[PrologixBinding]] = defaultdict(list)
        for b in self.bindings:
            by_device[b.device.device_id].append(b)

        loop = asyncio.get_running_loop()
        while True:
            start_ts = loop.time()
            for dev_bindings in by_device.values():
                await self._poll_device(dev_bindings)
                # Yield to the event loop between devices.
                await asyncio.sleep(0)
            elapsed = loop.time() - start_ts
            await asyncio.sleep(max(0.5, self.default_poll_s - elapsed))