This commit is contained in:
2025-12-16 22:49:50 +06:00
parent 265d611a12
commit f1fbafd84f

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
import datetime as dt
import heapq
import logging
from collections import defaultdict
from dataclasses import dataclass
@@ -13,6 +14,8 @@ from ..models import Endpoint, Device, Channel
from ..writer import Writer
log = logging.getLogger(__name__)
# we don't use vxi11
logging.getLogger("pymeasure.adapters.vxi11").setLevel(logging.ERROR)
KEITHLEY_DRIVER_KEYS = {"keithley2000", "keithley_2000"}
@@ -65,16 +68,11 @@ class PrologixEndpointLoop:
":INIT:CONT OFF;:ABORT",
# DC voltage config
':SENS:FUNC "VOLT:DC"',
":SENS:VOLT:DC:RANG 10", # fixed 10 V range
":SENS:VOLT:DC:RANG 10", # 10V range
":SENS:VOLT:DC:NPLC 10", # slow/low-noise integration
":SENS:VOLT:DC:DIG 7", # max digits
# offset drift?
":SYST:AZER:STAT ON",
# trig
":TRIG:SOUR IMM",
":TRIG:COUN 1",
":SAMP:COUN 1",
":TRIG:DEL 0",
# read only
":FORM:ELEM READ",
# turn off VFD
@@ -186,58 +184,129 @@ class PrologixEndpointLoop:
async def run(self) -> None:
log.info(
f"Starting Prologix loop: {self.endpoint.endpoint_key} ({len(self.bindings)} bindings)"
"Starting Prologix loop: %s (%d bindings)",
self.endpoint.endpoint_key,
len(self.bindings),
)
# group by device
by_device = defaultdict(list)
loop = asyncio.get_running_loop()
def poll_s_for(b: PrologixBinding) -> float:
v = b.channel.poll_interval_s
return float(self.default_poll_s if v is None else v)
# group bindings by device
by_device: defaultdict[int, list[PrologixBinding]] = defaultdict(list)
for b in self.bindings:
by_device[b.device.device_id].append(b)
while True:
start_ts = asyncio.get_running_loop().time()
devices: list[tuple[int, str, dict[str, Any], list[PrologixBinding]]] = []
for _dev_id, dev_bindings in by_device.items():
meta: dict[str, Any] = dev_bindings[0].device.metadata or {}
for _dev_id, dev_bindings in by_device.items():
# validation
meta = dev_bindings[0].device.metadata
gpib_addr = meta.get("gpib_addr")
if gpib_addr is None:
continue
gpib_addr = meta.get("gpib_addr")
if gpib_addr is None:
log.warning(
"Skipping device %s: missing metadata.gpib_addr",
dev_bindings[0].device.device_key,
)
continue
gpib_addr = int(gpib_addr)
driver_key = str(meta.get("driver", "")).strip() or None
try:
gpib_addr_i = int(gpib_addr)
except Exception:
log.warning(
"Skipping device %s: invalid metadata.gpib_addr=%r",
dev_bindings[0].device.device_key,
gpib_addr,
)
continue
if gpib_addr not in self._initialized_addrs:
await asyncio.to_thread(
self._init_device, gpib_addr, driver_key, meta
driver_key: str = str(meta.get("driver", "")).strip()
devices.append((gpib_addr_i, driver_key, meta, dev_bindings))
if not devices:
log.warning("No usable Prologix bindings; sleeping forever")
while True:
await asyncio.sleep(60)
# init all bindings as due "now"
now = loop.time()
heap: list[tuple[float, int, int]] = [] # (due_time, tie_breaker, device_index)
tie = 0
# build a flat list of (device_index, binding) and a heap of next due times
flat: list[tuple[int, PrologixBinding]] = []
for di, (_addr, _drv, _meta, dev_bindings) in enumerate(devices):
for b in dev_bindings:
flat.append((di, b))
heapq.heappush(heap, (now, tie, len(flat) - 1))
tie += 1
async def ensure_initialized(
gpib_addr: int, driver_key: str, meta: dict[str, Any]
) -> None:
if gpib_addr in self._initialized_addrs:
return
await asyncio.to_thread(self._init_device, gpib_addr, driver_key, meta)
try:
while True:
due, _tie, flat_idx = heapq.heappop(heap)
# sleep until this binding is due
now = loop.time()
if due > now:
await asyncio.sleep(due - now)
di, b = flat[flat_idx]
gpib_addr, driver_key, meta, _dev_bindings = devices[di]
try:
await ensure_initialized(gpib_addr, driver_key, meta)
raw = await asyncio.to_thread(
self._exec_sync, gpib_addr, b.query, driver_key
)
val = float(raw)
val = val * b.channel.scale_value + b.channel.offset_value
for b in dev_bindings:
try:
# offload to thread
raw = await asyncio.to_thread(
self._exec_sync, gpib_addr, b.query, driver_key
)
await self.writer.write_metric(
ts=dt.datetime.now(dt.timezone.utc),
device_id=b.device.device_id,
location_id=b.device.location_id,
metric=b.channel.metric,
value=val,
)
except asyncio.CancelledError:
raise
except Exception as e:
log.error(
"Prologix error %s/%s query=%r: %s",
b.device.device_key,
b.channel.metric,
b.query,
e,
)
await self.writer.write_error(
device_id=b.device.device_id,
error=f"prologix: {e}",
)
finally:
# steady cadence scheduling
interval = poll_s_for(b)
next_due = due + interval
now = loop.time()
if next_due <= now:
# fast fw without spamming immediate catch-up polls
# (ceil((now - next_due)/interval) + 1 steps)
steps = int((now - next_due) // interval) + 1
next_due += steps * interval
val = float(raw)
val = val * b.channel.scale_value + b.channel.offset_value
heapq.heappush(heap, (next_due, tie, flat_idx))
tie += 1
await self.writer.write_metric(
ts=dt.datetime.now(dt.timezone.utc),
device_id=b.device.device_id,
location_id=b.device.location_id,
metric=b.channel.metric,
value=val,
)
except Exception as e:
log.error(
f"Prologix error {b.device.device_key}/{b.channel.metric} query={b.query!r}: {e}"
)
await self.writer.write_error(
device_id=b.device.device_id, error=f"prologix: {e}"
)
await asyncio.sleep(0)
elapsed = asyncio.get_running_loop().time() - start_ts
await asyncio.sleep(max(0.5, self.default_poll_s - elapsed))
await asyncio.sleep(0)
except asyncio.CancelledError:
log.info("Prologix loop cancelled: %s", self.endpoint.endpoint_key)
raise