diff --git a/sensgw/protocols/prologix.py b/sensgw/protocols/prologix.py index 4c175c9..0f225ec 100644 --- a/sensgw/protocols/prologix.py +++ b/sensgw/protocols/prologix.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio import datetime as dt +import heapq import logging from collections import defaultdict from dataclasses import dataclass @@ -13,6 +14,8 @@ from ..models import Endpoint, Device, Channel from ..writer import Writer log = logging.getLogger(__name__) +# we don't use vxi11 +logging.getLogger("pymeasure.adapters.vxi11").setLevel(logging.ERROR) KEITHLEY_DRIVER_KEYS = {"keithley2000", "keithley_2000"} @@ -65,16 +68,11 @@ class PrologixEndpointLoop: ":INIT:CONT OFF;:ABORT", # DC voltage config ':SENS:FUNC "VOLT:DC"', - ":SENS:VOLT:DC:RANG 10", # fixed 10 V range + ":SENS:VOLT:DC:RANG 10", # 10V range ":SENS:VOLT:DC:NPLC 10", # slow/low-noise integration ":SENS:VOLT:DC:DIG 7", # max digits # offset drift? ":SYST:AZER:STAT ON", - # trig - ":TRIG:SOUR IMM", - ":TRIG:COUN 1", - ":SAMP:COUN 1", - ":TRIG:DEL 0", # read only ":FORM:ELEM READ", # turn off VFD @@ -186,58 +184,129 @@ class PrologixEndpointLoop: async def run(self) -> None: log.info( - f"Starting Prologix loop: {self.endpoint.endpoint_key} ({len(self.bindings)} bindings)" + "Starting Prologix loop: %s (%d bindings)", + self.endpoint.endpoint_key, + len(self.bindings), ) - # group by device - by_device = defaultdict(list) + loop = asyncio.get_running_loop() + + def poll_s_for(b: PrologixBinding) -> float: + v = b.channel.poll_interval_s + return float(self.default_poll_s if v is None else v) + + # group bindings by device + by_device: defaultdict[int, list[PrologixBinding]] = defaultdict(list) for b in self.bindings: by_device[b.device.device_id].append(b) - while True: - start_ts = asyncio.get_running_loop().time() + devices: list[tuple[int, str, dict[str, Any], list[PrologixBinding]]] = [] + for _dev_id, dev_bindings in by_device.items(): + meta: dict[str, Any] = dev_bindings[0].device.metadata or {} - for _dev_id, dev_bindings in by_device.items(): - # validation - meta = dev_bindings[0].device.metadata - gpib_addr = meta.get("gpib_addr") - if gpib_addr is None: - continue + gpib_addr = meta.get("gpib_addr") + if gpib_addr is None: + log.warning( + "Skipping device %s: missing metadata.gpib_addr", + dev_bindings[0].device.device_key, + ) + continue - gpib_addr = int(gpib_addr) - driver_key = str(meta.get("driver", "")).strip() or None + try: + gpib_addr_i = int(gpib_addr) + except Exception: + log.warning( + "Skipping device %s: invalid metadata.gpib_addr=%r", + dev_bindings[0].device.device_key, + gpib_addr, + ) + continue - if gpib_addr not in self._initialized_addrs: - await asyncio.to_thread( - self._init_device, gpib_addr, driver_key, meta + driver_key: str = str(meta.get("driver", "")).strip() + devices.append((gpib_addr_i, driver_key, meta, dev_bindings)) + + if not devices: + log.warning("No usable Prologix bindings; sleeping forever") + while True: + await asyncio.sleep(60) + + # init all bindings as due "now" + now = loop.time() + heap: list[tuple[float, int, int]] = [] # (due_time, tie_breaker, device_index) + tie = 0 + + # build a flat list of (device_index, binding) and a heap of next due times + flat: list[tuple[int, PrologixBinding]] = [] + for di, (_addr, _drv, _meta, dev_bindings) in enumerate(devices): + for b in dev_bindings: + flat.append((di, b)) + heapq.heappush(heap, (now, tie, len(flat) - 1)) + tie += 1 + + async def ensure_initialized( + gpib_addr: int, driver_key: str, meta: dict[str, Any] + ) -> None: + if gpib_addr in self._initialized_addrs: + return + await asyncio.to_thread(self._init_device, gpib_addr, driver_key, meta) + + try: + while True: + due, _tie, flat_idx = heapq.heappop(heap) + + # sleep until this binding is due + now = loop.time() + if due > now: + await asyncio.sleep(due - now) + + di, b = flat[flat_idx] + gpib_addr, driver_key, meta, _dev_bindings = devices[di] + + try: + await ensure_initialized(gpib_addr, driver_key, meta) + + raw = await asyncio.to_thread( + self._exec_sync, gpib_addr, b.query, driver_key ) + val = float(raw) + val = val * b.channel.scale_value + b.channel.offset_value - for b in dev_bindings: - try: - # offload to thread - raw = await asyncio.to_thread( - self._exec_sync, gpib_addr, b.query, driver_key - ) + await self.writer.write_metric( + ts=dt.datetime.now(dt.timezone.utc), + device_id=b.device.device_id, + location_id=b.device.location_id, + metric=b.channel.metric, + value=val, + ) + except asyncio.CancelledError: + raise + except Exception as e: + log.error( + "Prologix error %s/%s query=%r: %s", + b.device.device_key, + b.channel.metric, + b.query, + e, + ) + await self.writer.write_error( + device_id=b.device.device_id, + error=f"prologix: {e}", + ) + finally: + # steady cadence scheduling + interval = poll_s_for(b) + next_due = due + interval + now = loop.time() + if next_due <= now: + # fast fw without spamming immediate catch-up polls + # (ceil((now - next_due)/interval) + 1 steps) + steps = int((now - next_due) // interval) + 1 + next_due += steps * interval - val = float(raw) - val = val * b.channel.scale_value + b.channel.offset_value + heapq.heappush(heap, (next_due, tie, flat_idx)) + tie += 1 - await self.writer.write_metric( - ts=dt.datetime.now(dt.timezone.utc), - device_id=b.device.device_id, - location_id=b.device.location_id, - metric=b.channel.metric, - value=val, - ) - except Exception as e: - log.error( - f"Prologix error {b.device.device_key}/{b.channel.metric} query={b.query!r}: {e}" - ) - await self.writer.write_error( - device_id=b.device.device_id, error=f"prologix: {e}" - ) - - await asyncio.sleep(0) - - elapsed = asyncio.get_running_loop().time() - start_ts - await asyncio.sleep(max(0.5, self.default_poll_s - elapsed)) + await asyncio.sleep(0) + except asyncio.CancelledError: + log.info("Prologix loop cancelled: %s", self.endpoint.endpoint_key) + raise