diff --git a/schema2.sql b/schema2.sql new file mode 100644 index 0000000..829b5f4 --- /dev/null +++ b/schema2.sql @@ -0,0 +1,163 @@ +begin; + +-- roles +alter schema public owner to sensor_admin; +grant usage on schema public to sensor_writer; + +grant all on all tables in schema public to sensor_admin; +grant all on all sequences in schema public to sensor_admin; + +grant insert, update, select on all tables in schema public to sensor_writer; +grant usage, select on all sequences in schema public to sensor_writer; + +alter default privileges for role sensor_admin in schema public + grant insert, update, select on tables to sensor_writer; + +alter default privileges for role sensor_admin in schema public + grant usage, select on sequences to sensor_writer; + +-- pg_partman +create schema if not exists partman; +create extension if not exists pg_partman schema partman; + +do $$ +begin + if not exists (select 1 from pg_roles where rolname = 'partman_user') then + create role partman_user with login; + end if; +end$$; + +grant all on schema partman to partman_user; +grant all on all tables in schema partman to partman_user; +grant execute on all functions in schema partman to partman_user; +grant execute on all procedures in schema partman to partman_user; + +grant all on schema public to partman_user; +grant all on all tables in schema public to partman_user; +grant temporary on database sensor_db to partman_user; +grant create on database sensor_db to partman_user; + +grant usage, create on schema partman to sensor_admin; +grant execute on all functions in schema partman to sensor_admin; +grant all on all tables in schema partman to sensor_admin; +grant all on all sequences in schema partman to sensor_admin; + +set role sensor_admin; + +-- physical/transport endpoints +create table if not exists endpoints ( + endpoint_id int primary key generated by default as identity, + endpoint_key text not null unique, + + -- 'snmp' | 'mqtt' | 'visa' + protocol text not null, + conn jsonb not null default '{}'::jsonb, + + is_enabled boolean not null default true, + + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create index if not exists idx_endpoints_enabled_protocol +on endpoints (is_enabled, protocol); + +create table if not exists locations ( + location_id int primary key generated by default as identity, + name text not null, + is_active bool not null default true, + constraint locations_name_unique unique (name) +); + +create table if not exists devices ( + device_id int primary key generated by default as identity, + -- "snmp:10.0.0.12:probe1" + -- "mqtt:esp32-1" + -- "visa:prologix:gpib5" + device_key text not null unique, + + endpoint_id int references endpoints(endpoint_id), + location_id int references locations(location_id), + is_enabled boolean not null default true, + + metadata jsonb not null default '{}'::jsonb, + + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create index if not exists idx_devices_endpoint on devices (endpoint_id); +create index if not exists idx_devices_location on devices (location_id); +create index if not exists idx_devices_enabled on devices (is_enabled); + +-- per-device measurement defs +create table if not exists device_channels ( + channel_id int primary key generated by default as identity, + device_id int not null references devices(device_id) on delete cascade, + + -- corresponds to a column name in sensor_data (wide mapping) + metric text not null, + + -- SNMP: {"type":"snmp_oid","oid":".1.3.6...","datatype":"float"} + -- MQTT: {"type":"mqtt_topic","topic":"sensors/x/telemetry","payload":"json","field":"temperature_c"} + -- VISA: {"type":"scpi","query":"MEAS:VOLT:DC?","read_termination":"\n"} + source jsonb not null, + + scale_value double precision not null default 1.0, + offset_value double precision not null default 0.0, + + poll_interval_s int, + is_enabled boolean not null default true, + + created_at timestamptz not null default now(), + + constraint device_channels_unique_metric unique (device_id, metric) +); + +create index if not exists idx_device_channels_device on device_channels (device_id); + +-- device health/status +create table if not exists device_status ( + device_id int primary key references devices(device_id) on delete cascade, + last_seen timestamptz, + last_ok timestamptz, + last_error_at timestamptz, + last_error text, + updated_at timestamptz not null default now() +); + +create table if not exists sensor_data ( + ts timestamptz not null, + device_id int not null references devices(device_id), + location_id int references locations(location_id), + + temp_c real, + humidity_rh real, + pressure_pa real, + light_lux real, + soil_moist real, + co2_ppm real, + + voltage_v real, + current_a real, + resistance_ohm real, + freq_hz real, + power_w real, + + constraint sensor_data_pk primary key (device_id, ts) +) partition by range (ts); + +create index if not exists idx_sensor_data_device_ts on sensor_data (device_id, ts desc); +create index if not exists idx_sensor_data_location_ts on sensor_data (location_id, ts desc); +create index if not exists idx_sensor_data_ts_brin on sensor_data using brin(ts); + +-- partition +select partman.create_parent( + p_parent_table => 'public.sensor_data', + p_control => 'ts', + p_interval => '1 year', + p_premake => 2 +); + +commit; + diff --git a/schema_reader.sql b/schema_reader.sql new file mode 100644 index 0000000..9909e7b --- /dev/null +++ b/schema_reader.sql @@ -0,0 +1,8 @@ +-- run as a privileged role (e.g., sensor_admin) +create role sensor_reader login password 'GLuBeBFPZuxjru2OYiwlI1eT5Fg'; + +grant usage on schema public to sensor_reader; +grant select on all tables in schema public to sensor_reader; + +alter default privileges for role sensor_admin in schema public + grant select on tables to sensor_reader; diff --git a/sensgw/db.py b/sensgw/db.py index 35292d5..b98bc65 100644 --- a/sensgw/db.py +++ b/sensgw/db.py @@ -1,7 +1,6 @@ # sensgw/db.py import asyncpg import json -from typing import Optional async def _init_connection(con: asyncpg.Connection) -> None: @@ -22,7 +21,7 @@ async def _init_connection(con: asyncpg.Connection) -> None: class Database: def __init__(self, dsn: str): self._dsn = dsn - self.pool: Optional[asyncpg.Pool] = None + self.pool: asyncpg.Pool | None = None async def start(self) -> None: self.pool = await asyncpg.create_pool( @@ -36,4 +35,3 @@ class Database: if self.pool: await self.pool.close() self.pool = None - diff --git a/sensgw/main.py b/sensgw/main.py index aa5981d..5f8f9de 100644 --- a/sensgw/main.py +++ b/sensgw/main.py @@ -4,15 +4,19 @@ from __future__ import annotations import asyncio import logging +from sensgw.models import Endpoint + from .config import load_config from .db import Database from .registry import load_registry from .writer import Writer from .protocols.mqtt import MqttCollector, MqttBinding -from .protocols.prologix import PrologixEndpointCollector, PrologixBinding +from .protocols.prologix import ( + PrologixBinding, + PrologixEndpointLoop, +) from .protocols.snmp import SnmpEndpointCollector, SnmpBinding -# from .protocols.visa import VisaCollector, VisaBinding async def _run() -> None: @@ -23,14 +27,13 @@ async def _run() -> None: db = Database(cfg.db_dsn) await db.start() writer = Writer(db) + tasks: list[asyncio.Task[None]] = [] try: reg = await load_registry(db) by_proto = reg.channels_by_protocol() - tasks: list[asyncio.Task[None]] = [] - - # --- MQTT (assumes one broker; if multiple, split by endpoint_id) --- + # MQTT mqtt_bindings: list[MqttBinding] = [] for ep, dev, ch in by_proto.get("mqtt", []): src = ch.source @@ -51,8 +54,8 @@ async def _run() -> None: mqttc = MqttCollector(writer) tasks.append(asyncio.create_task(mqttc.run(mqtt_bindings), name="mqtt")) - # --- SNMP (one task per endpoint) --- - snmp_bindings_by_ep: dict[int, tuple[object, list[SnmpBinding]]] = {} + # SNMP + snmp_bindings_by_ep: dict[int, tuple[Endpoint, list[SnmpBinding]]] = {} for ep, dev, ch in by_proto.get("snmp", []): src = ch.source if src.get("type") != "snmp_oid": @@ -67,7 +70,9 @@ async def _run() -> None: ) snmp_bindings_by_ep.setdefault(ep.endpoint_id, (ep, []))[1].append(b) - snmpc = SnmpEndpointCollector(writer, default_poll_s=cfg.default_poll_interval_s) + snmpc = SnmpEndpointCollector( + writer, default_poll_s=cfg.default_poll_interval_s + ) for _ep_id, (ep, bindings) in snmp_bindings_by_ep.items(): tasks.append( asyncio.create_task( @@ -76,53 +81,51 @@ async def _run() -> None: ) ) - # --- Prologix (one task per channel/binding) --- - prolc = PrologixEndpointCollector(writer, default_poll_s=cfg.default_poll_interval_s) + # Prologix + # Group bindings by Endpoint ID + prologix_groups: dict[int, tuple[Endpoint, list[PrologixBinding]]] = {} + for ep, dev, ch in by_proto.get("prologix", []): src = ch.source if src.get("type") != "scpi": continue + b = PrologixBinding( - endpoint=ep, - device=dev, - channel=ch, - query=str(src["query"]), - datatype=str(src.get("datatype", "float")), - ) - tasks.append( - asyncio.create_task( - prolc.run_binding(b), - name=f"prologix:{dev.device_id}:{ch.metric}", - ) + endpoint=ep, device=dev, channel=ch, query=str(src["query"]) ) - # --- VISA --- - # visac = VisaCollector(writer, default_poll_s=cfg.default_poll_interval_s) - # for ep, dev, ch in by_proto.get("visa", []): - # src = ch.source - # if src.get("type") != "scpi": - # continue - # b = VisaBinding( - # endpoint=ep, - # device=dev, - # channel=ch, - # query=str(src["query"]), - # datatype=str(src.get("datatype", "float")), - # ) - # tasks.append( - # asyncio.create_task( - # visac.run_binding(b), - # name=f"visa:{dev.device_id}:{ch.metric}", - # ) - # ) + if ep.endpoint_id not in prologix_groups: + prologix_groups[ep.endpoint_id] = (ep, []) + prologix_groups[ep.endpoint_id][1].append(b) + + # one Task per Endpoint + for _, (ep, bindings) in prologix_groups.items(): + loop = PrologixEndpointLoop( + writer=writer, + endpoint=ep, + bindings=bindings, + default_poll_s=cfg.default_poll_interval_s, + ) + tasks.append( + asyncio.create_task(loop.run(), name=f"prologix:{ep.endpoint_key}") + ) if not tasks: log.warning("No enabled channels found. Exiting.") return log.info("Running with %d task(s)", len(tasks)) - await asyncio.gather(*tasks) + _ = await asyncio.gather(*tasks) + except asyncio.CancelledError: + # clean exit on SIGINT + pass + finally: + if tasks: + for t in tasks: + _ = t.cancel() + _ = await asyncio.gather(*tasks, return_exceptions=True) + await db.stop() diff --git a/sensgw/metrics.py b/sensgw/metrics.py deleted file mode 100644 index 3ac8a91..0000000 --- a/sensgw/metrics.py +++ /dev/null @@ -1,13 +0,0 @@ -ALLOWED_METRICS = { - "temp_c", - "humidity_rh", - "pressure_pa", - "light_lux", - "soil_moist", - "co2_ppm", - "voltage_v", - "current_a", - "resistance_ohm", - "freq_hz", - "power_w", -} diff --git a/sensgw/models.py b/sensgw/models.py index 2b4721c..be05615 100644 --- a/sensgw/models.py +++ b/sensgw/models.py @@ -1,6 +1,6 @@ # sensgw/models.py from dataclasses import dataclass -from typing import Any, Dict, Optional +from typing import Any @dataclass(frozen=True) @@ -8,7 +8,7 @@ class Endpoint: endpoint_id: int endpoint_key: str protocol: str - conn: Dict[str, Any] + conn: dict[str, Any] is_enabled: bool @@ -16,10 +16,10 @@ class Endpoint: class Device: device_id: int device_key: str - endpoint_id: Optional[int] - location_id: Optional[int] + endpoint_id: int | None + location_id: int | None is_enabled: bool - metadata: Dict[str, Any] + metadata: dict[str, Any] @dataclass(frozen=True) @@ -27,9 +27,8 @@ class Channel: channel_id: int device_id: int metric: str - source: Dict[str, Any] + source: dict[str, Any] scale_value: float offset_value: float - poll_interval_s: Optional[int] + poll_interval_s: int | None is_enabled: bool - diff --git a/sensgw/protocols/mqtt.py b/sensgw/protocols/mqtt.py index bd84e42..51933eb 100644 --- a/sensgw/protocols/mqtt.py +++ b/sensgw/protocols/mqtt.py @@ -86,7 +86,7 @@ class MqttCollector: ) c.on_message = self._on_message - # Optional auth + # auth? if "username" in ep.conn: c.username_pw_set(ep.conn["username"], ep.conn.get("password")) diff --git a/sensgw/protocols/polling.py b/sensgw/protocols/polling.py index c8c940c..811e9aa 100644 --- a/sensgw/protocols/polling.py +++ b/sensgw/protocols/polling.py @@ -1,7 +1,8 @@ # sensgw/protocols/polling.py import asyncio import datetime as dt -from typing import Awaitable, Callable, Optional +from typing import Callable +from collections.abc import Awaitable async def poll_forever( @@ -9,7 +10,7 @@ async def poll_forever( interval_s: int, read_once: Callable[[], Awaitable[None]], jitter_s: float = 0.0, - stop_event: Optional[asyncio.Event] = None, + stop_event: asyncio.Event | None = None, ) -> None: if jitter_s: await asyncio.sleep(jitter_s) diff --git a/sensgw/protocols/prologix.py b/sensgw/protocols/prologix.py index a2de174..7c57ba6 100644 --- a/sensgw/protocols/prologix.py +++ b/sensgw/protocols/prologix.py @@ -3,13 +3,15 @@ from __future__ import annotations import asyncio import datetime as dt -import threading +import logging +from collections import defaultdict from dataclasses import dataclass from typing import Any from ..models import Endpoint, Device, Channel from ..writer import Writer -from .polling import poll_forever + +log = logging.getLogger(__name__) @dataclass(frozen=True) @@ -18,169 +20,153 @@ class PrologixBinding: device: Device channel: Channel query: str - datatype: str # "float" | "int" | ... -def _parse_numeric(datatype: str, raw: str) -> float: - kind = (datatype or "float").strip().lower() - if kind == "int": - return float(int(raw)) - # default: float - return float(raw) - - -def _driver_class(driver_key: str) -> type[Any] | None: - """ - Map driver keys stored in DB to PyMeasure instrument classes. - - devices.metadata.driver examples: - - "keithley2000" - """ +def _driver_class(driver_key: str | None) -> type[Any] | None: key = (driver_key or "").strip().lower() if not key: return None - - if key in {"keithley2000", "keithley_2000", "keithley:2000"}: + if key in {"keithley2000", "keithley_2000"}: from pymeasure.instruments.keithley import Keithley2000 # type: ignore return Keithley2000 - - # Add more mappings here as you add support. return None -class PrologixEndpointClient: +class PrologixEndpointLoop: """ - One shared Prologix adapter per endpoint, protected by a lock because it is stateful - (address switching) and not safe to use concurrently. - - If a device specifies devices.metadata.driver, we create a PyMeasure Instrument on top - of the same adapter and run queries through instrument.ask(). + Manages one physical Prologix controller and sequentially polls its devices. + Merges connection state (Adapter) and logic (Collector) into one class. """ - def __init__(self, endpoint: Endpoint): + def __init__( + self, + writer: Writer, + endpoint: Endpoint, + bindings: list[PrologixBinding], + default_poll_s: int, + ): + self.writer = writer self.endpoint = endpoint - self._lock = threading.Lock() + self.bindings = bindings + self.default_poll_s = default_poll_s + + # conn state self._adapter: Any | None = None self._instruments: dict[tuple[int, str], Any] = {} + self._initialized_addrs: set[int] = set() def _get_adapter(self) -> Any: if self._adapter is None: from pymeasure.adapters import PrologixAdapter # type: ignore - try: - resource = self.endpoint.conn["resource"] - except KeyError as e: - raise RuntimeError( - f"Missing endpoint.conn['resource'] for endpoint_id={self.endpoint.endpoint_id}" - ) from e + res = self.endpoint.conn["resource"] + to_ms = int(self.endpoint.conn.get("gpib_read_timeout_ms", 500)) - read_timeout = int(self.endpoint.conn.get("gpib_read_timeout_ms", 200)) - auto = bool(self.endpoint.conn.get("auto", False)) + # auto=False gives us manual control over addressing + self._adapter = PrologixAdapter(res, gpib_read_timeout=to_ms, auto=False) - self._adapter = PrologixAdapter( - resource, - gpib_read_timeout=read_timeout, - auto=auto, - ) try: self._adapter.flush_read_buffer() except Exception: pass return self._adapter - def _get_instrument(self, *, gpib_addr: int, driver_key: str) -> Any: - """ - Cached per (addr, driver_key). Uses the shared adapter. - """ - key = (gpib_addr, driver_key.strip().lower()) - inst = self._instruments.get(key) - if inst is not None: - return inst + def _get_instrument(self, gpib_addr: int, driver_key: str) -> Any: + cache_key = (gpib_addr, driver_key) + if cache_key in self._instruments: + return self._instruments[cache_key] cls = _driver_class(driver_key) - if cls is None: - raise KeyError(f"Unknown driver '{driver_key}'") + if not cls: + raise ValueError(f"Unknown driver: {driver_key}") ad = self._get_adapter() - # Ensure the adapter is pointed at the correct instrument when the driver is constructed. ad.address = gpib_addr - inst = cls(ad) - self._instruments[key] = inst + + self._instruments[cache_key] = inst return inst - def query(self, *, gpib_addr: int, cmd: str, driver_key: str | None = None) -> str: - """ - Execute a query at a given GPIB address. - If driver_key is provided and known, execute via driver (instrument.ask). - Otherwise, raw adapter write/read. - """ - with self._lock: + def _exec_sync(self, gpib_addr: int, query: str, driver_key: str | None) -> str: + # init hw + if gpib_addr not in self._initialized_addrs: + dk_check = (driver_key or "").lower() + if "keithley" in dk_check: + try: + log.info(f"Initializing Keithley @ {gpib_addr} (DISP:ENAB OFF)") + if driver_key: + inst = self._get_instrument(gpib_addr, driver_key) + inst.write(":DISP:ENAB OFF") + else: + ad = self._get_adapter() + ad.address = gpib_addr + ad.write(":DISP:ENAB OFF") + except Exception as e: + log.warning(f"VFD saver failed for {gpib_addr}: {e}") + self._initialized_addrs.add(gpib_addr) + + # Query + if driver_key: + inst = self._get_instrument(gpib_addr, driver_key) + # re-assert address in case it chnaged + self._get_adapter().address = gpib_addr + return str(inst.ask(query)).strip() + else: ad = self._get_adapter() ad.address = gpib_addr - - if driver_key: - inst = self._get_instrument(gpib_addr=gpib_addr, driver_key=driver_key) - # Keep the endpoint lock held across ask(); it may do multiple I/O ops. - return str(inst.ask(cmd)).strip() - - ad.write(cmd) + ad.write(query) return str(ad.read()).strip() + async def run(self) -> None: + log.info( + f"Starting Prologix loop: {self.endpoint.endpoint_key} ({len(self.bindings)} bindings)" + ) -class PrologixEndpointCollector: - def __init__(self, writer: Writer, default_poll_s: int): - self.writer = writer - self.default_poll_s = default_poll_s - self._clients: dict[int, PrologixEndpointClient] = {} + # group by device + by_device = defaultdict(list) + for b in self.bindings: + by_device[b.device.device_id].append(b) - def _client(self, endpoint: Endpoint) -> PrologixEndpointClient: - client = self._clients.get(endpoint.endpoint_id) - if client is None: - client = PrologixEndpointClient(endpoint) - self._clients[endpoint.endpoint_id] = client - return client + while True: + start_ts = asyncio.get_running_loop().time() - async def run_binding(self, b: PrologixBinding) -> None: - interval_s = int(b.channel.poll_interval_s or self.default_poll_s) - client = self._client(b.endpoint) + for _dev_id, dev_bindings in by_device.items(): + # validation + meta = dev_bindings[0].device.metadata + gpib_addr = meta.get("gpib_addr") + if gpib_addr is None: + continue - gpib_addr = b.device.metadata.get("gpib_addr") - if gpib_addr is None: - raise RuntimeError( - f"Missing device.metadata.gpib_addr for device_id={b.device.device_id}" - ) - gpib_addr = int(gpib_addr) + gpib_addr = int(gpib_addr) + driver_key = str(meta.get("driver", "")).strip() or None - driver_key = b.device.metadata.get("driver") - driver_key = str(driver_key).strip() if driver_key else None + for b in dev_bindings: + try: + # offload to thread + raw = await asyncio.to_thread( + self._exec_sync, gpib_addr, b.query, driver_key + ) - async def read_once() -> None: - ts = dt.datetime.now(dt.timezone.utc) - try: - raw = await asyncio.to_thread( - client.query, - gpib_addr=gpib_addr, - cmd=b.query, - driver_key=driver_key, - ) + val = float(raw) + val = val * b.channel.scale_value + b.channel.offset_value - v = _parse_numeric(b.datatype, raw) - v = v * b.channel.scale_value + b.channel.offset_value + await self.writer.write_metric( + ts=dt.datetime.now(dt.timezone.utc), + device_id=b.device.device_id, + location_id=b.device.location_id, + metric=b.channel.metric, + value=val, + ) + except Exception as e: + log.error( + f"Prologix error {b.device.device_key}/{b.channel.metric}: {e}" + ) + await self.writer.write_error( + device_id=b.device.device_id, error=f"prologix: {e}" + ) - await self.writer.write_metric( - ts=ts, - device_id=b.device.device_id, - location_id=b.device.location_id, - metric=b.channel.metric, - value=v, - ) - except Exception as e: - await self.writer.write_error( - device_id=b.device.device_id, - error=f"prologix: {e}", - ) - - await poll_forever(interval_s=interval_s, read_once=read_once) + await asyncio.sleep(0) + elapsed = asyncio.get_running_loop().time() - start_ts + await asyncio.sleep(max(0.5, self.default_poll_s - elapsed)) diff --git a/sensgw/protocols/snmp.py b/sensgw/protocols/snmp.py index 5ef664d..6f3a718 100644 --- a/sensgw/protocols/snmp.py +++ b/sensgw/protocols/snmp.py @@ -4,6 +4,16 @@ from __future__ import annotations import datetime as dt from dataclasses import dataclass +from pysnmp.hlapi.v3arch.asyncio import ( + SnmpEngine, + CommunityData, + UdpTransportTarget, + ContextData, + ObjectType, + ObjectIdentity, + get_cmd, +) + from ..models import Endpoint, Device, Channel from ..writer import Writer from .polling import poll_forever @@ -15,7 +25,7 @@ class SnmpBinding: device: Device channel: Channel oid: str - datatype: str # "float" | "int" | ... + datatype: str def _parse_numeric(datatype: str, raw: str) -> float: @@ -26,11 +36,6 @@ def _parse_numeric(datatype: str, raw: str) -> float: def _parse_version(conn: dict) -> int: - """ - Return mpModel: - SNMPv1 -> 0 - SNMPv2c -> 1 - """ v = str(conn.get("version", "2c")).lower() if v in {"1", "v1", "snmpv1"}: return 0 @@ -41,6 +46,10 @@ class SnmpEndpointCollector: def __init__(self, writer: Writer, default_poll_s: int): self.writer = writer self.default_poll_s = default_poll_s + self._engine = SnmpEngine() + + async def close(self): + self._engine.close_dispatcher() async def _get_many( self, @@ -52,52 +61,44 @@ class SnmpEndpointCollector: timeout_s: int, oids: list[str], ) -> dict[str, str]: - from pysnmp.hlapi.v3arch.asyncio import ( # type: ignore - SnmpEngine, - CommunityData, - UdpTransportTarget, - ContextData, - ObjectType, - ObjectIdentity, - get_cmd, + snmp_engine = self._engine + + var_binds = [ObjectType(ObjectIdentity(oid)) for oid in oids] + target = await UdpTransportTarget.create( + (host, port), timeout=timeout_s, retries=0 ) - snmp_engine = SnmpEngine() - try: - var_binds = [ObjectType(ObjectIdentity(oid)) for oid in oids] + iterator = get_cmd( + snmp_engine, + CommunityData(community, mpModel=mp_model), + target, + ContextData(), + *var_binds, + ) - # In pysnmp 7.x, target creation is async: - target = await UdpTransportTarget.create((host, port), timeout=timeout_s, retries=0) + # await the response + error_indication, error_status, error_index, out_binds = await iterator - iterator = get_cmd( - snmp_engine, - CommunityData(community, mpModel=mp_model), - target, - ContextData(), - *var_binds, - ) + if error_indication: + raise RuntimeError(str(error_indication)) - error_indication, error_status, error_index, out_binds = await iterator + if error_status: + idx = int(error_index) - 1 + oid_caused = out_binds[idx][0] if 0 <= idx < len(out_binds) else "?" + raise RuntimeError(f"{str(error_status)} at {oid_caused}") - if error_indication: - raise RuntimeError(str(error_indication)) - if error_status: - raise RuntimeError( - f"{error_status.prettyPrint()} at " - f"{out_binds[int(error_index) - 1][0] if error_index else '?'}" - ) + return {str(name): str(val) for name, val in out_binds} - return {str(name): str(val) for name, val in out_binds} - finally: - snmp_engine.close_dispatcher() - - async def run_endpoint(self, endpoint: Endpoint, bindings: list[SnmpBinding]) -> None: + async def run_endpoint( + self, endpoint: Endpoint, bindings: list[SnmpBinding] + ) -> None: host = endpoint.conn["host"] port = int(endpoint.conn.get("port", 161)) community = endpoint.conn.get("community", "public") timeout_s = int(endpoint.conn.get("timeout_s", 2)) mp_model = _parse_version(endpoint.conn) + # if one channel is 1s and another is 60s, we poll ALL at 1s. intervals = [ int(b.channel.poll_interval_s) for b in bindings @@ -105,6 +106,7 @@ class SnmpEndpointCollector: ] interval_s = min(intervals) if intervals else self.default_poll_s + # map clean string OIDs to bindings oid_to_binding: dict[str, SnmpBinding] = {b.oid.strip(): b for b in bindings} oids = list(oid_to_binding.keys()) @@ -124,6 +126,7 @@ class SnmpEndpointCollector: b = oid_to_binding.get(oid_str) if b is None: continue + try: v = _parse_numeric(b.datatype, raw) v = v * b.channel.scale_value + b.channel.offset_value @@ -138,15 +141,14 @@ class SnmpEndpointCollector: except Exception as e: await self.writer.write_error( device_id=b.device.device_id, - error=f"snmp parse/write: {e}", + error=f"snmp parse: {e}", ) except Exception as e: - # Endpoint-level failure: mark all devices as error + # log endpoint-level failure for b in bindings: await self.writer.write_error( device_id=b.device.device_id, - error=f"snmp endpoint: {e}", + error=f"snmp fetch: {e}", ) await poll_forever(interval_s=interval_s, read_once=read_once) - diff --git a/sensgw/protocols/visa.py b/sensgw/protocols/visa.py deleted file mode 100644 index ee2bebb..0000000 --- a/sensgw/protocols/visa.py +++ /dev/null @@ -1,84 +0,0 @@ -# sensgw/protocols/visa.py -from __future__ import annotations - -import asyncio -import datetime as dt -from dataclasses import dataclass - -from ..models import Endpoint, Device, Channel -from ..writer import Writer -from .polling import poll_forever - - -@dataclass(frozen=True) -class VisaBinding: - endpoint: Endpoint - device: Device - channel: Channel - query: str - datatype: str # "float" etc - - -def _visa_query_sync(*, resource: str, conn: dict, device_meta: dict, query: str) -> str: - import pyvisa # type: ignore - - rm = pyvisa.ResourceManager() - inst = rm.open_resource(resource) - - # Optional serial config - if "baud_rate" in conn and hasattr(inst, "baud_rate"): - inst.baud_rate = int(conn["baud_rate"]) - - if "read_termination" in conn: - inst.read_termination = str(conn["read_termination"]) - if "write_termination" in conn: - inst.write_termination = str(conn["write_termination"]) - - # If you're using a Prologix-like controller over serial, you may need to set addr. - # This is device-specific; keeping it optional: - gpib_addr = device_meta.get("gpib_addr") - if gpib_addr is not None: - inst.write(f"++addr {int(gpib_addr)}") - - return str(inst.query(query)).strip() - - -class VisaCollector: - def __init__(self, writer: Writer, default_poll_s: int): - self.writer = writer - self.default_poll_s = default_poll_s - - async def run_binding(self, b: VisaBinding) -> None: - ep = b.endpoint - resource = ep.conn["resource"] - interval_s = int(b.channel.poll_interval_s or self.default_poll_s) - - async def read_once() -> None: - try: - raw = await asyncio.to_thread( - _visa_query_sync, - resource=resource, - conn=ep.conn, - device_meta=b.device.metadata, - query=b.query, - ) - if b.datatype == "float": - value = float(raw) - elif b.datatype == "int": - value = float(int(raw)) - else: - value = float(raw) - - value = value * b.channel.scale_value + b.channel.offset_value - ts = dt.datetime.now(dt.timezone.utc) - await self.writer.write_metric( - ts=ts, - device_id=b.device.device_id, - location_id=b.device.location_id, - metric=b.channel.metric, - value=value, - ) - except Exception as e: - await self.writer.write_error(device_id=b.device.device_id, error=f"visa: {e}") - - await poll_forever(interval_s=interval_s, read_once=read_once) diff --git a/sensgw/registry.py b/sensgw/registry.py index a94ffec..81c9d6a 100644 --- a/sensgw/registry.py +++ b/sensgw/registry.py @@ -1,6 +1,5 @@ # sensgw/registry.py from dataclasses import dataclass -from typing import Dict, List, Tuple from .models import Endpoint, Device, Channel from .db import Database @@ -8,12 +7,12 @@ from .db import Database @dataclass(frozen=True) class Registry: - endpoints: Dict[int, Endpoint] - devices: Dict[int, Device] - channels: List[Channel] + endpoints: dict[int, Endpoint] + devices: dict[int, Device] + channels: list[Channel] - def channels_by_protocol(self) -> Dict[str, List[Tuple[Endpoint, Device, Channel]]]: - out: Dict[str, List[Tuple[Endpoint, Device, Channel]]] = {} + def channels_by_protocol(self) -> dict[str, list[tuple[Endpoint, Device, Channel]]]: + out: dict[str, list[tuple[Endpoint, Device, Channel]]] = {} for ch in self.channels: dev = self.devices.get(ch.device_id) if not dev or not dev.is_enabled or dev.endpoint_id is None: @@ -67,8 +66,12 @@ async def load_registry(db: Database) -> Registry: int(r["device_id"]): Device( device_id=int(r["device_id"]), device_key=str(r["device_key"]), - endpoint_id=(int(r["endpoint_id"]) if r["endpoint_id"] is not None else None), - location_id=(int(r["location_id"]) if r["location_id"] is not None else None), + endpoint_id=( + int(r["endpoint_id"]) if r["endpoint_id"] is not None else None + ), + location_id=( + int(r["location_id"]) if r["location_id"] is not None else None + ), is_enabled=bool(r["is_enabled"]), metadata=(r["metadata"] or {}), ) @@ -83,11 +86,12 @@ async def load_registry(db: Database) -> Registry: source=(r["source"] or {}), scale_value=float(r["scale_value"]), offset_value=float(r["offset_value"]), - poll_interval_s=(int(r["poll_interval_s"]) if r["poll_interval_s"] is not None else None), + poll_interval_s=( + int(r["poll_interval_s"]) if r["poll_interval_s"] is not None else None + ), is_enabled=bool(r["is_enabled"]), ) for r in ch_rows ] return Registry(endpoints=endpoints, devices=devices, channels=channels) - diff --git a/sensgw/writer.py b/sensgw/writer.py index 869e128..880526e 100644 --- a/sensgw/writer.py +++ b/sensgw/writer.py @@ -2,10 +2,22 @@ from __future__ import annotations import datetime as dt -from typing import Optional from .db import Database -from .metrics import ALLOWED_METRICS + +ALLOWED_METRICS = { + "temp_c", + "humidity_rh", + "pressure_pa", + "light_lux", + "soil_moist", + "co2_ppm", + "voltage_v", + "current_a", + "resistance_ohm", + "freq_hz", + "power_w", +} class Writer: @@ -17,7 +29,7 @@ class Writer: *, ts: dt.datetime, device_id: int, - location_id: Optional[int], + location_id: int | None, metric: str, value: float, ) -> None: