upd
This commit is contained in:
@@ -7,6 +7,7 @@ import logging
|
|||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
import time
|
||||||
|
|
||||||
from ..models import Endpoint, Device, Channel
|
from ..models import Endpoint, Device, Channel
|
||||||
from ..writer import Writer
|
from ..writer import Writer
|
||||||
@@ -56,6 +57,55 @@ class PrologixEndpointLoop:
|
|||||||
self._instruments: dict[tuple[int, str], Any] = {}
|
self._instruments: dict[tuple[int, str], Any] = {}
|
||||||
self._initialized_addrs: set[int] = set()
|
self._initialized_addrs: set[int] = set()
|
||||||
|
|
||||||
|
def _keithley2000_init_cmds(self, dev_meta: dict[str, Any]) -> list[str]:
|
||||||
|
default_cmds = [
|
||||||
|
# turn off VFD
|
||||||
|
":DISP:ENAB OFF",
|
||||||
|
# DC voltage config
|
||||||
|
':SENS:FUNC "VOLT:DC"',
|
||||||
|
":SENS:VOLT:DC:RANG 10", # fixed 10 V range
|
||||||
|
":SENS:VOLT:DC:NPLC 10", # slow/low-noise integration
|
||||||
|
":SENS:VOLT:DC:DIG 7", # max digits
|
||||||
|
# offset drift?
|
||||||
|
":SYST:AZER ON",
|
||||||
|
":TRIG:SOUR IMM",
|
||||||
|
":TRIG:COUN 1",
|
||||||
|
":SAMP:COUN 1",
|
||||||
|
":INIT:CONT OFF",
|
||||||
|
]
|
||||||
|
|
||||||
|
extra = dev_meta.get("init_cmds")
|
||||||
|
if isinstance(extra, list) and all(isinstance(x, str) for x in extra):
|
||||||
|
return default_cmds + extra
|
||||||
|
return default_cmds
|
||||||
|
|
||||||
|
def _init_device(
|
||||||
|
self, gpib_addr: int, driver_key: str | None, meta: dict[str, Any]
|
||||||
|
) -> None:
|
||||||
|
dk = (driver_key or "").lower()
|
||||||
|
if "keithley" not in dk:
|
||||||
|
self._initialized_addrs.add(gpib_addr)
|
||||||
|
return
|
||||||
|
|
||||||
|
cmds = self._keithley2000_init_cmds(meta)
|
||||||
|
log.info(f"Initializing Keithley @ {gpib_addr} with {len(cmds)} SCPI cmds")
|
||||||
|
|
||||||
|
if driver_key:
|
||||||
|
inst = self._get_instrument(gpib_addr, driver_key)
|
||||||
|
# make sure prologix is pointed at the right addr
|
||||||
|
self._get_adapter().address = gpib_addr
|
||||||
|
for cmd in cmds:
|
||||||
|
inst.write(cmd)
|
||||||
|
time.sleep(0.02)
|
||||||
|
else:
|
||||||
|
ad = self._get_adapter()
|
||||||
|
ad.address = gpib_addr
|
||||||
|
for cmd in cmds:
|
||||||
|
ad.write(cmd)
|
||||||
|
time.sleep(0.02)
|
||||||
|
|
||||||
|
self._initialized_addrs.add(gpib_addr)
|
||||||
|
|
||||||
def _get_adapter(self) -> Any:
|
def _get_adapter(self) -> Any:
|
||||||
if self._adapter is None:
|
if self._adapter is None:
|
||||||
from pymeasure.adapters import PrologixAdapter # type: ignore
|
from pymeasure.adapters import PrologixAdapter # type: ignore
|
||||||
@@ -89,23 +139,6 @@ class PrologixEndpointLoop:
|
|||||||
return inst
|
return inst
|
||||||
|
|
||||||
def _exec_sync(self, gpib_addr: int, query: str, driver_key: str | None) -> str:
|
def _exec_sync(self, gpib_addr: int, query: str, driver_key: str | None) -> str:
|
||||||
# init hw
|
|
||||||
if gpib_addr not in self._initialized_addrs:
|
|
||||||
dk_check = (driver_key or "").lower()
|
|
||||||
if "keithley" in dk_check:
|
|
||||||
try:
|
|
||||||
log.info(f"Initializing Keithley @ {gpib_addr} (DISP:ENAB OFF)")
|
|
||||||
if driver_key:
|
|
||||||
inst = self._get_instrument(gpib_addr, driver_key)
|
|
||||||
inst.write(":DISP:ENAB OFF")
|
|
||||||
else:
|
|
||||||
ad = self._get_adapter()
|
|
||||||
ad.address = gpib_addr
|
|
||||||
ad.write(":DISP:ENAB OFF")
|
|
||||||
except Exception as e:
|
|
||||||
log.warning(f"VFD saver failed for {gpib_addr}: {e}")
|
|
||||||
self._initialized_addrs.add(gpib_addr)
|
|
||||||
|
|
||||||
# Query
|
# Query
|
||||||
if driver_key:
|
if driver_key:
|
||||||
inst = self._get_instrument(gpib_addr, driver_key)
|
inst = self._get_instrument(gpib_addr, driver_key)
|
||||||
@@ -141,6 +174,11 @@ class PrologixEndpointLoop:
|
|||||||
gpib_addr = int(gpib_addr)
|
gpib_addr = int(gpib_addr)
|
||||||
driver_key = str(meta.get("driver", "")).strip() or None
|
driver_key = str(meta.get("driver", "")).strip() or None
|
||||||
|
|
||||||
|
if gpib_addr not in self._initialized_addrs:
|
||||||
|
await asyncio.to_thread(
|
||||||
|
self._init_device, gpib_addr, driver_key, meta
|
||||||
|
)
|
||||||
|
|
||||||
for b in dev_bindings:
|
for b in dev_bindings:
|
||||||
try:
|
try:
|
||||||
# offload to thread
|
# offload to thread
|
||||||
|
|||||||
Reference in New Issue
Block a user