This commit is contained in:
2025-12-16 12:50:46 +06:00
parent 8d6a7651dc
commit 833b7961b7
13 changed files with 404 additions and 325 deletions

163
schema2.sql Normal file
View File

@@ -0,0 +1,163 @@
begin;
-- roles
-- NOTE: everything through the final `commit` runs in one transaction; run as
-- a role privileged enough to alter schema ownership and grant on the database.
alter schema public owner to sensor_admin;
grant usage on schema public to sensor_writer;
grant all on all tables in schema public to sensor_admin;
grant all on all sequences in schema public to sensor_admin;
-- sensor_writer is ingest-only: insert/update/select, no delete/truncate/DDL
grant insert, update, select on all tables in schema public to sensor_writer;
grant usage, select on all sequences in schema public to sensor_writer;
-- default privileges cover only FUTURE objects created by sensor_admin;
-- existing objects are handled by the blanket grants above
alter default privileges for role sensor_admin in schema public
grant insert, update, select on tables to sensor_writer;
alter default privileges for role sensor_admin in schema public
grant usage, select on sequences to sensor_writer;
-- pg_partman
create schema if not exists partman;
create extension if not exists pg_partman schema partman;
-- `create role` has no `if not exists`, so guard via pg_roles for idempotency
do $$
begin
if not exists (select 1 from pg_roles where rolname = 'partman_user') then
create role partman_user with login;
end if;
end$$;
grant all on schema partman to partman_user;
grant all on all tables in schema partman to partman_user;
grant execute on all functions in schema partman to partman_user;
grant execute on all procedures in schema partman to partman_user;
-- NOTE(review): `grant all` on schema public is broad for a maintenance role;
-- consider narrowing to what partition create/attach/detach actually needs.
grant all on schema public to partman_user;
grant all on all tables in schema public to partman_user;
-- temporary/create on the database -- presumably needed by partman's
-- maintenance procedures; confirm against the installed pg_partman version
grant temporary on database sensor_db to partman_user;
grant create on database sensor_db to partman_user;
grant usage, create on schema partman to sensor_admin;
grant execute on all functions in schema partman to sensor_admin;
grant all on all tables in schema partman to sensor_admin;
grant all on all sequences in schema partman to sensor_admin;
-- all DDL below runs as sensor_admin so created objects are owned by it
-- (this is what makes the default-privilege entries above apply)
set role sensor_admin;
-- physical/transport endpoints
-- physical/transport endpoints
-- one row per reachable transport (an SNMP agent, an MQTT broker, a VISA/Prologix
-- controller); devices hang off an endpoint via devices.endpoint_id
create table if not exists endpoints (
    endpoint_id int primary key generated by default as identity,
    -- stable unique key used in logs/task names
    endpoint_key text not null unique,
    -- 'snmp' | 'mqtt' | 'visa'
    -- NOTE(review): collector code also dispatches on 'prologix' -- confirm the
    -- accepted set and keep this comment in sync
    protocol text not null,
    -- protocol-specific connection settings (host/port/community/resource/...)
    conn jsonb not null default '{}'::jsonb,
    is_enabled boolean not null default true,
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now()
);
-- collectors scan for enabled endpoints of a given protocol
create index if not exists idx_endpoints_enabled_protocol
on endpoints (is_enabled, protocol);
create table if not exists locations (
    location_id int primary key generated by default as identity,
    name text not null,
    is_active bool not null default true,
    constraint locations_name_unique unique (name)
);
-- a logical sensor/instrument reachable through one endpoint
create table if not exists devices (
    device_id int primary key generated by default as identity,
    -- "snmp:10.0.0.12:probe1"
    -- "mqtt:esp32-1"
    -- "visa:prologix:gpib5"
    device_key text not null unique,
    endpoint_id int references endpoints(endpoint_id),
    location_id int references locations(location_id),
    is_enabled boolean not null default true,
    -- free-form per-device settings, e.g. {"gpib_addr": 5, "driver": "keithley2000"}
    metadata jsonb not null default '{}'::jsonb,
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now()
);
create index if not exists idx_devices_endpoint on devices (endpoint_id);
create index if not exists idx_devices_location on devices (location_id);
create index if not exists idx_devices_enabled on devices (is_enabled);
-- per-device measurement defs
create table if not exists device_channels (
    channel_id int primary key generated by default as identity,
    device_id int not null references devices(device_id) on delete cascade,
    -- corresponds to a column name in sensor_data (wide mapping)
    metric text not null,
    -- SNMP: {"type":"snmp_oid","oid":".1.3.6...","datatype":"float"}
    -- MQTT: {"type":"mqtt_topic","topic":"sensors/x/telemetry","payload":"json","field":"temperature_c"}
    -- VISA: {"type":"scpi","query":"MEAS:VOLT:DC?","read_termination":"\n"}
    source jsonb not null,
    -- linear calibration applied by collectors: stored = raw * scale_value + offset_value
    scale_value double precision not null default 1.0,
    offset_value double precision not null default 0.0,
    -- null -> collector falls back to its configured default interval
    poll_interval_s int,
    is_enabled boolean not null default true,
    created_at timestamptz not null default now(),
    constraint device_channels_unique_metric unique (device_id, metric)
);
create index if not exists idx_device_channels_device on device_channels (device_id);
-- device health/status
-- one row per device: last contact / last success / last failure, maintained by
-- the writer (presumably upserted -- verify against sensgw/writer.py)
create table if not exists device_status (
    device_id int primary key references devices(device_id) on delete cascade,
    last_seen timestamptz,
    last_ok timestamptz,
    last_error_at timestamptz,
    last_error text,
    updated_at timestamptz not null default now()
);
-- wide time-series store: one nullable column per supported metric;
-- device_channels.metric names the column a channel writes into.
-- Range-partitioned on ts; partitions are managed by pg_partman (below).
create table if not exists sensor_data (
    ts timestamptz not null,
    device_id int not null references devices(device_id),
    -- denormalized from devices so location filters stay partition-local
    location_id int references locations(location_id),
    temp_c real,
    humidity_rh real,
    pressure_pa real,
    light_lux real,
    soil_moist real,
    co2_ppm real,
    voltage_v real,
    current_a real,
    resistance_ohm real,
    freq_hz real,
    power_w real,
    -- on a partitioned table the PK must include the partition key (ts)
    constraint sensor_data_pk primary key (device_id, ts)
) partition by range (ts);
-- NOTE: a separate index on (device_id, ts desc) was removed as redundant:
-- the primary key's btree on (device_id, ts) already serves those scans
-- (Postgres btrees are traversed in either direction), so the extra index
-- was pure write/storage overhead on the hottest table.
create index if not exists idx_sensor_data_location_ts on sensor_data (location_id, ts desc);
-- BRIN on ts: tiny index for coarse time-range pruning on append-mostly data
create index if not exists idx_sensor_data_ts_brin on sensor_data using brin(ts);
-- partition
-- register sensor_data with pg_partman: yearly range partitions, 2 premade
select partman.create_parent(
    p_parent_table => 'public.sensor_data',
    p_control => 'ts',
    p_interval => '1 year',
    p_premake => 2
);
commit;

8
schema_reader.sql Normal file
View File

@@ -0,0 +1,8 @@
-- run as a privileged role (e.g., sensor_admin)
-- NOTE(review): this plaintext password is committed to version control --
-- rotate the credential and provision it outside the repo (psql \set from an
-- env var, or a secret store) instead of hard-coding it here.
create role sensor_reader login password 'GLuBeBFPZuxjru2OYiwlI1eT5Fg';
grant usage on schema public to sensor_reader;
grant select on all tables in schema public to sensor_reader;
-- keeps FUTURE tables created by sensor_admin readable; tables created by other
-- roles (e.g., partitions made by partman_user) are not covered -- verify and
-- add a matching default-privileges entry for that role if needed
alter default privileges for role sensor_admin in schema public
grant select on tables to sensor_reader;

View File

@@ -1,7 +1,6 @@
# sensgw/db.py # sensgw/db.py
import asyncpg import asyncpg
import json import json
from typing import Optional
async def _init_connection(con: asyncpg.Connection) -> None: async def _init_connection(con: asyncpg.Connection) -> None:
@@ -22,7 +21,7 @@ async def _init_connection(con: asyncpg.Connection) -> None:
class Database: class Database:
def __init__(self, dsn: str): def __init__(self, dsn: str):
self._dsn = dsn self._dsn = dsn
self.pool: Optional[asyncpg.Pool] = None self.pool: asyncpg.Pool | None = None
async def start(self) -> None: async def start(self) -> None:
self.pool = await asyncpg.create_pool( self.pool = await asyncpg.create_pool(
@@ -36,4 +35,3 @@ class Database:
if self.pool: if self.pool:
await self.pool.close() await self.pool.close()
self.pool = None self.pool = None

View File

@@ -4,15 +4,19 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
from sensgw.models import Endpoint
from .config import load_config from .config import load_config
from .db import Database from .db import Database
from .registry import load_registry from .registry import load_registry
from .writer import Writer from .writer import Writer
from .protocols.mqtt import MqttCollector, MqttBinding from .protocols.mqtt import MqttCollector, MqttBinding
from .protocols.prologix import PrologixEndpointCollector, PrologixBinding from .protocols.prologix import (
PrologixBinding,
PrologixEndpointLoop,
)
from .protocols.snmp import SnmpEndpointCollector, SnmpBinding from .protocols.snmp import SnmpEndpointCollector, SnmpBinding
# from .protocols.visa import VisaCollector, VisaBinding
async def _run() -> None: async def _run() -> None:
@@ -23,14 +27,13 @@ async def _run() -> None:
db = Database(cfg.db_dsn) db = Database(cfg.db_dsn)
await db.start() await db.start()
writer = Writer(db) writer = Writer(db)
tasks: list[asyncio.Task[None]] = []
try: try:
reg = await load_registry(db) reg = await load_registry(db)
by_proto = reg.channels_by_protocol() by_proto = reg.channels_by_protocol()
tasks: list[asyncio.Task[None]] = [] # MQTT
# --- MQTT (assumes one broker; if multiple, split by endpoint_id) ---
mqtt_bindings: list[MqttBinding] = [] mqtt_bindings: list[MqttBinding] = []
for ep, dev, ch in by_proto.get("mqtt", []): for ep, dev, ch in by_proto.get("mqtt", []):
src = ch.source src = ch.source
@@ -51,8 +54,8 @@ async def _run() -> None:
mqttc = MqttCollector(writer) mqttc = MqttCollector(writer)
tasks.append(asyncio.create_task(mqttc.run(mqtt_bindings), name="mqtt")) tasks.append(asyncio.create_task(mqttc.run(mqtt_bindings), name="mqtt"))
# --- SNMP (one task per endpoint) --- # SNMP
snmp_bindings_by_ep: dict[int, tuple[object, list[SnmpBinding]]] = {} snmp_bindings_by_ep: dict[int, tuple[Endpoint, list[SnmpBinding]]] = {}
for ep, dev, ch in by_proto.get("snmp", []): for ep, dev, ch in by_proto.get("snmp", []):
src = ch.source src = ch.source
if src.get("type") != "snmp_oid": if src.get("type") != "snmp_oid":
@@ -67,7 +70,9 @@ async def _run() -> None:
) )
snmp_bindings_by_ep.setdefault(ep.endpoint_id, (ep, []))[1].append(b) snmp_bindings_by_ep.setdefault(ep.endpoint_id, (ep, []))[1].append(b)
snmpc = SnmpEndpointCollector(writer, default_poll_s=cfg.default_poll_interval_s) snmpc = SnmpEndpointCollector(
writer, default_poll_s=cfg.default_poll_interval_s
)
for _ep_id, (ep, bindings) in snmp_bindings_by_ep.items(): for _ep_id, (ep, bindings) in snmp_bindings_by_ep.items():
tasks.append( tasks.append(
asyncio.create_task( asyncio.create_task(
@@ -76,53 +81,51 @@ async def _run() -> None:
) )
) )
# --- Prologix (one task per channel/binding) --- # Prologix
prolc = PrologixEndpointCollector(writer, default_poll_s=cfg.default_poll_interval_s) # Group bindings by Endpoint ID
prologix_groups: dict[int, tuple[Endpoint, list[PrologixBinding]]] = {}
for ep, dev, ch in by_proto.get("prologix", []): for ep, dev, ch in by_proto.get("prologix", []):
src = ch.source src = ch.source
if src.get("type") != "scpi": if src.get("type") != "scpi":
continue continue
b = PrologixBinding( b = PrologixBinding(
endpoint=ep, endpoint=ep, device=dev, channel=ch, query=str(src["query"])
device=dev,
channel=ch,
query=str(src["query"]),
datatype=str(src.get("datatype", "float")),
)
tasks.append(
asyncio.create_task(
prolc.run_binding(b),
name=f"prologix:{dev.device_id}:{ch.metric}",
)
) )
# --- VISA --- if ep.endpoint_id not in prologix_groups:
# visac = VisaCollector(writer, default_poll_s=cfg.default_poll_interval_s) prologix_groups[ep.endpoint_id] = (ep, [])
# for ep, dev, ch in by_proto.get("visa", []): prologix_groups[ep.endpoint_id][1].append(b)
# src = ch.source
# if src.get("type") != "scpi": # one Task per Endpoint
# continue for _, (ep, bindings) in prologix_groups.items():
# b = VisaBinding( loop = PrologixEndpointLoop(
# endpoint=ep, writer=writer,
# device=dev, endpoint=ep,
# channel=ch, bindings=bindings,
# query=str(src["query"]), default_poll_s=cfg.default_poll_interval_s,
# datatype=str(src.get("datatype", "float")), )
# ) tasks.append(
# tasks.append( asyncio.create_task(loop.run(), name=f"prologix:{ep.endpoint_key}")
# asyncio.create_task( )
# visac.run_binding(b),
# name=f"visa:{dev.device_id}:{ch.metric}",
# )
# )
if not tasks: if not tasks:
log.warning("No enabled channels found. Exiting.") log.warning("No enabled channels found. Exiting.")
return return
log.info("Running with %d task(s)", len(tasks)) log.info("Running with %d task(s)", len(tasks))
await asyncio.gather(*tasks) _ = await asyncio.gather(*tasks)
except asyncio.CancelledError:
# clean exit on SIGINT
pass
finally: finally:
if tasks:
for t in tasks:
_ = t.cancel()
_ = await asyncio.gather(*tasks, return_exceptions=True)
await db.stop() await db.stop()

View File

@@ -1,13 +0,0 @@
# Whitelist of metric names; each entry matches a measurement column of the
# sensor_data table (wide mapping -- see the schema). Imported by the writer.
ALLOWED_METRICS = {
    "temp_c",
    "humidity_rh",
    "pressure_pa",
    "light_lux",
    "soil_moist",
    "co2_ppm",
    "voltage_v",
    "current_a",
    "resistance_ohm",
    "freq_hz",
    "power_w",
}

View File

@@ -1,6 +1,6 @@
# sensgw/models.py # sensgw/models.py
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Dict, Optional from typing import Any
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -8,7 +8,7 @@ class Endpoint:
endpoint_id: int endpoint_id: int
endpoint_key: str endpoint_key: str
protocol: str protocol: str
conn: Dict[str, Any] conn: dict[str, Any]
is_enabled: bool is_enabled: bool
@@ -16,10 +16,10 @@ class Endpoint:
class Device: class Device:
device_id: int device_id: int
device_key: str device_key: str
endpoint_id: Optional[int] endpoint_id: int | None
location_id: Optional[int] location_id: int | None
is_enabled: bool is_enabled: bool
metadata: Dict[str, Any] metadata: dict[str, Any]
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -27,9 +27,8 @@ class Channel:
channel_id: int channel_id: int
device_id: int device_id: int
metric: str metric: str
source: Dict[str, Any] source: dict[str, Any]
scale_value: float scale_value: float
offset_value: float offset_value: float
poll_interval_s: Optional[int] poll_interval_s: int | None
is_enabled: bool is_enabled: bool

View File

@@ -86,7 +86,7 @@ class MqttCollector:
) )
c.on_message = self._on_message c.on_message = self._on_message
# Optional auth # auth?
if "username" in ep.conn: if "username" in ep.conn:
c.username_pw_set(ep.conn["username"], ep.conn.get("password")) c.username_pw_set(ep.conn["username"], ep.conn.get("password"))

View File

@@ -1,7 +1,8 @@
# sensgw/protocols/polling.py # sensgw/protocols/polling.py
import asyncio import asyncio
import datetime as dt import datetime as dt
from typing import Awaitable, Callable, Optional from typing import Callable
from collections.abc import Awaitable
async def poll_forever( async def poll_forever(
@@ -9,7 +10,7 @@ async def poll_forever(
interval_s: int, interval_s: int,
read_once: Callable[[], Awaitable[None]], read_once: Callable[[], Awaitable[None]],
jitter_s: float = 0.0, jitter_s: float = 0.0,
stop_event: Optional[asyncio.Event] = None, stop_event: asyncio.Event | None = None,
) -> None: ) -> None:
if jitter_s: if jitter_s:
await asyncio.sleep(jitter_s) await asyncio.sleep(jitter_s)

View File

@@ -3,13 +3,15 @@ from __future__ import annotations
import asyncio import asyncio
import datetime as dt import datetime as dt
import threading import logging
from collections import defaultdict
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Any
from ..models import Endpoint, Device, Channel from ..models import Endpoint, Device, Channel
from ..writer import Writer from ..writer import Writer
from .polling import poll_forever
log = logging.getLogger(__name__)
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -18,169 +20,153 @@ class PrologixBinding:
device: Device device: Device
channel: Channel channel: Channel
query: str query: str
datatype: str # "float" | "int" | ...
def _parse_numeric(datatype: str, raw: str) -> float: def _driver_class(driver_key: str | None) -> type[Any] | None:
kind = (datatype or "float").strip().lower()
if kind == "int":
return float(int(raw))
# default: float
return float(raw)
def _driver_class(driver_key: str) -> type[Any] | None:
"""
Map driver keys stored in DB to PyMeasure instrument classes.
devices.metadata.driver examples:
- "keithley2000"
"""
key = (driver_key or "").strip().lower() key = (driver_key or "").strip().lower()
if not key: if not key:
return None return None
if key in {"keithley2000", "keithley_2000"}:
if key in {"keithley2000", "keithley_2000", "keithley:2000"}:
from pymeasure.instruments.keithley import Keithley2000 # type: ignore from pymeasure.instruments.keithley import Keithley2000 # type: ignore
return Keithley2000 return Keithley2000
# Add more mappings here as you add support.
return None return None
class PrologixEndpointClient: class PrologixEndpointLoop:
""" """
One shared Prologix adapter per endpoint, protected by a lock because it is stateful Manages one physical Prologix controller and sequentially polls its devices.
(address switching) and not safe to use concurrently. Merges connection state (Adapter) and logic (Collector) into one class.
If a device specifies devices.metadata.driver, we create a PyMeasure Instrument on top
of the same adapter and run queries through instrument.ask().
""" """
def __init__(self, endpoint: Endpoint): def __init__(
self,
writer: Writer,
endpoint: Endpoint,
bindings: list[PrologixBinding],
default_poll_s: int,
):
self.writer = writer
self.endpoint = endpoint self.endpoint = endpoint
self._lock = threading.Lock() self.bindings = bindings
self.default_poll_s = default_poll_s
# conn state
self._adapter: Any | None = None self._adapter: Any | None = None
self._instruments: dict[tuple[int, str], Any] = {} self._instruments: dict[tuple[int, str], Any] = {}
self._initialized_addrs: set[int] = set()
def _get_adapter(self) -> Any: def _get_adapter(self) -> Any:
if self._adapter is None: if self._adapter is None:
from pymeasure.adapters import PrologixAdapter # type: ignore from pymeasure.adapters import PrologixAdapter # type: ignore
try: res = self.endpoint.conn["resource"]
resource = self.endpoint.conn["resource"] to_ms = int(self.endpoint.conn.get("gpib_read_timeout_ms", 500))
except KeyError as e:
raise RuntimeError(
f"Missing endpoint.conn['resource'] for endpoint_id={self.endpoint.endpoint_id}"
) from e
read_timeout = int(self.endpoint.conn.get("gpib_read_timeout_ms", 200)) # auto=False gives us manual control over addressing
auto = bool(self.endpoint.conn.get("auto", False)) self._adapter = PrologixAdapter(res, gpib_read_timeout=to_ms, auto=False)
self._adapter = PrologixAdapter(
resource,
gpib_read_timeout=read_timeout,
auto=auto,
)
try: try:
self._adapter.flush_read_buffer() self._adapter.flush_read_buffer()
except Exception: except Exception:
pass pass
return self._adapter return self._adapter
def _get_instrument(self, *, gpib_addr: int, driver_key: str) -> Any: def _get_instrument(self, gpib_addr: int, driver_key: str) -> Any:
""" cache_key = (gpib_addr, driver_key)
Cached per (addr, driver_key). Uses the shared adapter. if cache_key in self._instruments:
""" return self._instruments[cache_key]
key = (gpib_addr, driver_key.strip().lower())
inst = self._instruments.get(key)
if inst is not None:
return inst
cls = _driver_class(driver_key) cls = _driver_class(driver_key)
if cls is None: if not cls:
raise KeyError(f"Unknown driver '{driver_key}'") raise ValueError(f"Unknown driver: {driver_key}")
ad = self._get_adapter() ad = self._get_adapter()
# Ensure the adapter is pointed at the correct instrument when the driver is constructed.
ad.address = gpib_addr ad.address = gpib_addr
inst = cls(ad) inst = cls(ad)
self._instruments[key] = inst
self._instruments[cache_key] = inst
return inst return inst
def query(self, *, gpib_addr: int, cmd: str, driver_key: str | None = None) -> str: def _exec_sync(self, gpib_addr: int, query: str, driver_key: str | None) -> str:
""" # init hw
Execute a query at a given GPIB address. if gpib_addr not in self._initialized_addrs:
If driver_key is provided and known, execute via driver (instrument.ask). dk_check = (driver_key or "").lower()
Otherwise, raw adapter write/read. if "keithley" in dk_check:
""" try:
with self._lock: log.info(f"Initializing Keithley @ {gpib_addr} (DISP:ENAB OFF)")
if driver_key:
inst = self._get_instrument(gpib_addr, driver_key)
inst.write(":DISP:ENAB OFF")
else:
ad = self._get_adapter() ad = self._get_adapter()
ad.address = gpib_addr ad.address = gpib_addr
ad.write(":DISP:ENAB OFF")
except Exception as e:
log.warning(f"VFD saver failed for {gpib_addr}: {e}")
self._initialized_addrs.add(gpib_addr)
# Query
if driver_key: if driver_key:
inst = self._get_instrument(gpib_addr=gpib_addr, driver_key=driver_key) inst = self._get_instrument(gpib_addr, driver_key)
# Keep the endpoint lock held across ask(); it may do multiple I/O ops. # re-assert address in case it changed
return str(inst.ask(cmd)).strip() self._get_adapter().address = gpib_addr
return str(inst.ask(query)).strip()
ad.write(cmd) else:
ad = self._get_adapter()
ad.address = gpib_addr
ad.write(query)
return str(ad.read()).strip() return str(ad.read()).strip()
async def run(self) -> None:
log.info(
f"Starting Prologix loop: {self.endpoint.endpoint_key} ({len(self.bindings)} bindings)"
)
class PrologixEndpointCollector: # group by device
def __init__(self, writer: Writer, default_poll_s: int): by_device = defaultdict(list)
self.writer = writer for b in self.bindings:
self.default_poll_s = default_poll_s by_device[b.device.device_id].append(b)
self._clients: dict[int, PrologixEndpointClient] = {}
def _client(self, endpoint: Endpoint) -> PrologixEndpointClient: while True:
client = self._clients.get(endpoint.endpoint_id) start_ts = asyncio.get_running_loop().time()
if client is None:
client = PrologixEndpointClient(endpoint)
self._clients[endpoint.endpoint_id] = client
return client
async def run_binding(self, b: PrologixBinding) -> None: for _dev_id, dev_bindings in by_device.items():
interval_s = int(b.channel.poll_interval_s or self.default_poll_s) # validation
client = self._client(b.endpoint) meta = dev_bindings[0].device.metadata
gpib_addr = meta.get("gpib_addr")
gpib_addr = b.device.metadata.get("gpib_addr")
if gpib_addr is None: if gpib_addr is None:
raise RuntimeError( continue
f"Missing device.metadata.gpib_addr for device_id={b.device.device_id}"
)
gpib_addr = int(gpib_addr) gpib_addr = int(gpib_addr)
driver_key = str(meta.get("driver", "")).strip() or None
driver_key = b.device.metadata.get("driver") for b in dev_bindings:
driver_key = str(driver_key).strip() if driver_key else None
async def read_once() -> None:
ts = dt.datetime.now(dt.timezone.utc)
try: try:
# offload to thread
raw = await asyncio.to_thread( raw = await asyncio.to_thread(
client.query, self._exec_sync, gpib_addr, b.query, driver_key
gpib_addr=gpib_addr,
cmd=b.query,
driver_key=driver_key,
) )
v = _parse_numeric(b.datatype, raw) val = float(raw)
v = v * b.channel.scale_value + b.channel.offset_value val = val * b.channel.scale_value + b.channel.offset_value
await self.writer.write_metric( await self.writer.write_metric(
ts=ts, ts=dt.datetime.now(dt.timezone.utc),
device_id=b.device.device_id, device_id=b.device.device_id,
location_id=b.device.location_id, location_id=b.device.location_id,
metric=b.channel.metric, metric=b.channel.metric,
value=v, value=val,
) )
except Exception as e: except Exception as e:
log.error(
f"Prologix error {b.device.device_key}/{b.channel.metric}: {e}"
)
await self.writer.write_error( await self.writer.write_error(
device_id=b.device.device_id, device_id=b.device.device_id, error=f"prologix: {e}"
error=f"prologix: {e}",
) )
await poll_forever(interval_s=interval_s, read_once=read_once) await asyncio.sleep(0)
elapsed = asyncio.get_running_loop().time() - start_ts
await asyncio.sleep(max(0.5, self.default_poll_s - elapsed))

View File

@@ -4,6 +4,16 @@ from __future__ import annotations
import datetime as dt import datetime as dt
from dataclasses import dataclass from dataclasses import dataclass
from pysnmp.hlapi.v3arch.asyncio import (
SnmpEngine,
CommunityData,
UdpTransportTarget,
ContextData,
ObjectType,
ObjectIdentity,
get_cmd,
)
from ..models import Endpoint, Device, Channel from ..models import Endpoint, Device, Channel
from ..writer import Writer from ..writer import Writer
from .polling import poll_forever from .polling import poll_forever
@@ -15,7 +25,7 @@ class SnmpBinding:
device: Device device: Device
channel: Channel channel: Channel
oid: str oid: str
datatype: str # "float" | "int" | ... datatype: str
def _parse_numeric(datatype: str, raw: str) -> float: def _parse_numeric(datatype: str, raw: str) -> float:
@@ -26,11 +36,6 @@ def _parse_numeric(datatype: str, raw: str) -> float:
def _parse_version(conn: dict) -> int: def _parse_version(conn: dict) -> int:
"""
Return mpModel:
SNMPv1 -> 0
SNMPv2c -> 1
"""
v = str(conn.get("version", "2c")).lower() v = str(conn.get("version", "2c")).lower()
if v in {"1", "v1", "snmpv1"}: if v in {"1", "v1", "snmpv1"}:
return 0 return 0
@@ -41,6 +46,10 @@ class SnmpEndpointCollector:
def __init__(self, writer: Writer, default_poll_s: int): def __init__(self, writer: Writer, default_poll_s: int):
self.writer = writer self.writer = writer
self.default_poll_s = default_poll_s self.default_poll_s = default_poll_s
self._engine = SnmpEngine()
async def close(self):
self._engine.close_dispatcher()
async def _get_many( async def _get_many(
self, self,
@@ -52,22 +61,12 @@ class SnmpEndpointCollector:
timeout_s: int, timeout_s: int,
oids: list[str], oids: list[str],
) -> dict[str, str]: ) -> dict[str, str]:
from pysnmp.hlapi.v3arch.asyncio import ( # type: ignore snmp_engine = self._engine
SnmpEngine,
CommunityData,
UdpTransportTarget,
ContextData,
ObjectType,
ObjectIdentity,
get_cmd,
)
snmp_engine = SnmpEngine()
try:
var_binds = [ObjectType(ObjectIdentity(oid)) for oid in oids] var_binds = [ObjectType(ObjectIdentity(oid)) for oid in oids]
target = await UdpTransportTarget.create(
# In pysnmp 7.x, target creation is async: (host, port), timeout=timeout_s, retries=0
target = await UdpTransportTarget.create((host, port), timeout=timeout_s, retries=0) )
iterator = get_cmd( iterator = get_cmd(
snmp_engine, snmp_engine,
@@ -77,27 +76,29 @@ class SnmpEndpointCollector:
*var_binds, *var_binds,
) )
# await the response
error_indication, error_status, error_index, out_binds = await iterator error_indication, error_status, error_index, out_binds = await iterator
if error_indication: if error_indication:
raise RuntimeError(str(error_indication)) raise RuntimeError(str(error_indication))
if error_status: if error_status:
raise RuntimeError( idx = int(error_index) - 1
f"{error_status.prettyPrint()} at " oid_caused = out_binds[idx][0] if 0 <= idx < len(out_binds) else "?"
f"{out_binds[int(error_index) - 1][0] if error_index else '?'}" raise RuntimeError(f"{str(error_status)} at {oid_caused}")
)
return {str(name): str(val) for name, val in out_binds} return {str(name): str(val) for name, val in out_binds}
finally:
snmp_engine.close_dispatcher()
async def run_endpoint(self, endpoint: Endpoint, bindings: list[SnmpBinding]) -> None: async def run_endpoint(
self, endpoint: Endpoint, bindings: list[SnmpBinding]
) -> None:
host = endpoint.conn["host"] host = endpoint.conn["host"]
port = int(endpoint.conn.get("port", 161)) port = int(endpoint.conn.get("port", 161))
community = endpoint.conn.get("community", "public") community = endpoint.conn.get("community", "public")
timeout_s = int(endpoint.conn.get("timeout_s", 2)) timeout_s = int(endpoint.conn.get("timeout_s", 2))
mp_model = _parse_version(endpoint.conn) mp_model = _parse_version(endpoint.conn)
# if one channel is 1s and another is 60s, we poll ALL at 1s.
intervals = [ intervals = [
int(b.channel.poll_interval_s) int(b.channel.poll_interval_s)
for b in bindings for b in bindings
@@ -105,6 +106,7 @@ class SnmpEndpointCollector:
] ]
interval_s = min(intervals) if intervals else self.default_poll_s interval_s = min(intervals) if intervals else self.default_poll_s
# map clean string OIDs to bindings
oid_to_binding: dict[str, SnmpBinding] = {b.oid.strip(): b for b in bindings} oid_to_binding: dict[str, SnmpBinding] = {b.oid.strip(): b for b in bindings}
oids = list(oid_to_binding.keys()) oids = list(oid_to_binding.keys())
@@ -124,6 +126,7 @@ class SnmpEndpointCollector:
b = oid_to_binding.get(oid_str) b = oid_to_binding.get(oid_str)
if b is None: if b is None:
continue continue
try: try:
v = _parse_numeric(b.datatype, raw) v = _parse_numeric(b.datatype, raw)
v = v * b.channel.scale_value + b.channel.offset_value v = v * b.channel.scale_value + b.channel.offset_value
@@ -138,15 +141,14 @@ class SnmpEndpointCollector:
except Exception as e: except Exception as e:
await self.writer.write_error( await self.writer.write_error(
device_id=b.device.device_id, device_id=b.device.device_id,
error=f"snmp parse/write: {e}", error=f"snmp parse: {e}",
) )
except Exception as e: except Exception as e:
# Endpoint-level failure: mark all devices as error # log endpoint-level failure
for b in bindings: for b in bindings:
await self.writer.write_error( await self.writer.write_error(
device_id=b.device.device_id, device_id=b.device.device_id,
error=f"snmp endpoint: {e}", error=f"snmp fetch: {e}",
) )
await poll_forever(interval_s=interval_s, read_once=read_once) await poll_forever(interval_s=interval_s, read_once=read_once)

View File

@@ -1,84 +0,0 @@
# sensgw/protocols/visa.py
from __future__ import annotations
import asyncio
import datetime as dt
from dataclasses import dataclass
from ..models import Endpoint, Device, Channel
from ..writer import Writer
from .polling import poll_forever
@dataclass(frozen=True)
class VisaBinding:
    """Immutable link between one endpoint/device/channel and its SCPI query."""
    endpoint: Endpoint
    device: Device
    channel: Channel
    # SCPI query string, e.g. "MEAS:VOLT:DC?"
    query: str
    # response parse mode: "float" | "int" (anything else falls back to float)
    datatype: str
def _visa_query_sync(*, resource: str, conn: dict, device_meta: dict, query: str) -> str:
    """Open a VISA resource, apply optional settings, run one query, and clean up.

    Blocking; intended to be called via ``asyncio.to_thread``.

    :param resource: VISA resource string (``endpoint.conn["resource"]``)
    :param conn: endpoint connection settings; optional keys: ``baud_rate``,
        ``read_termination``, ``write_termination``
    :param device_meta: device metadata; optional ``gpib_addr`` triggers a
        Prologix-style ``++addr`` address switch before the query
    :param query: SCPI query string sent to the instrument
    :return: the instrument's response with surrounding whitespace stripped
    """
    import pyvisa  # type: ignore

    rm = pyvisa.ResourceManager()
    inst = rm.open_resource(resource)
    # Fix: the original leaked the session -- neither the resource nor the
    # ResourceManager was ever closed; ensure both are released per call.
    try:
        # Optional serial config
        if "baud_rate" in conn and hasattr(inst, "baud_rate"):
            inst.baud_rate = int(conn["baud_rate"])
        if "read_termination" in conn:
            inst.read_termination = str(conn["read_termination"])
        if "write_termination" in conn:
            inst.write_termination = str(conn["write_termination"])
        # If you're using a Prologix-like controller over serial, you may need
        # to set addr. This is device-specific; keeping it optional:
        gpib_addr = device_meta.get("gpib_addr")
        if gpib_addr is not None:
            inst.write(f"++addr {int(gpib_addr)}")
        return str(inst.query(query)).strip()
    finally:
        # best-effort cleanup: never mask the primary exception
        try:
            inst.close()
        except Exception:
            pass
        try:
            rm.close()
        except Exception:
            pass
class VisaCollector:
    """Polls SCPI instruments over VISA and forwards readings to the Writer."""

    def __init__(self, writer: Writer, default_poll_s: int):
        self.writer = writer
        # fallback poll interval when a channel has no poll_interval_s
        self.default_poll_s = default_poll_s

    async def run_binding(self, b: VisaBinding) -> None:
        """Poll one channel binding forever, writing one metric per good read.

        Instrument/parse failures are recorded via ``writer.write_error`` and do
        not stop the loop; only cancellation ends it (via poll_forever).
        """
        ep = b.endpoint
        resource = ep.conn["resource"]
        interval_s = int(b.channel.poll_interval_s or self.default_poll_s)

        async def read_once() -> None:
            try:
                # blocking VISA I/O is offloaded to a worker thread
                raw = await asyncio.to_thread(
                    _visa_query_sync,
                    resource=resource,
                    conn=ep.conn,
                    device_meta=b.device.metadata,
                    query=b.query,
                )
                if b.datatype == "float":
                    value = float(raw)
                elif b.datatype == "int":
                    # still stored as float; int("5.0") raises and is reported
                    # through write_error below
                    value = float(int(raw))
                else:
                    # unknown datatype: best-effort float parse
                    value = float(raw)
                # linear calibration from the channel definition
                value = value * b.channel.scale_value + b.channel.offset_value
                ts = dt.datetime.now(dt.timezone.utc)
                await self.writer.write_metric(
                    ts=ts,
                    device_id=b.device.device_id,
                    location_id=b.device.location_id,
                    metric=b.channel.metric,
                    value=value,
                )
            except Exception as e:
                await self.writer.write_error(device_id=b.device.device_id, error=f"visa: {e}")

        await poll_forever(interval_s=interval_s, read_once=read_once)

View File

@@ -1,6 +1,5 @@
# sensgw/registry.py # sensgw/registry.py
from dataclasses import dataclass from dataclasses import dataclass
from typing import Dict, List, Tuple
from .models import Endpoint, Device, Channel from .models import Endpoint, Device, Channel
from .db import Database from .db import Database
@@ -8,12 +7,12 @@ from .db import Database
@dataclass(frozen=True) @dataclass(frozen=True)
class Registry: class Registry:
endpoints: Dict[int, Endpoint] endpoints: dict[int, Endpoint]
devices: Dict[int, Device] devices: dict[int, Device]
channels: List[Channel] channels: list[Channel]
def channels_by_protocol(self) -> Dict[str, List[Tuple[Endpoint, Device, Channel]]]: def channels_by_protocol(self) -> dict[str, list[tuple[Endpoint, Device, Channel]]]:
out: Dict[str, List[Tuple[Endpoint, Device, Channel]]] = {} out: dict[str, list[tuple[Endpoint, Device, Channel]]] = {}
for ch in self.channels: for ch in self.channels:
dev = self.devices.get(ch.device_id) dev = self.devices.get(ch.device_id)
if not dev or not dev.is_enabled or dev.endpoint_id is None: if not dev or not dev.is_enabled or dev.endpoint_id is None:
@@ -67,8 +66,12 @@ async def load_registry(db: Database) -> Registry:
int(r["device_id"]): Device( int(r["device_id"]): Device(
device_id=int(r["device_id"]), device_id=int(r["device_id"]),
device_key=str(r["device_key"]), device_key=str(r["device_key"]),
endpoint_id=(int(r["endpoint_id"]) if r["endpoint_id"] is not None else None), endpoint_id=(
location_id=(int(r["location_id"]) if r["location_id"] is not None else None), int(r["endpoint_id"]) if r["endpoint_id"] is not None else None
),
location_id=(
int(r["location_id"]) if r["location_id"] is not None else None
),
is_enabled=bool(r["is_enabled"]), is_enabled=bool(r["is_enabled"]),
metadata=(r["metadata"] or {}), metadata=(r["metadata"] or {}),
) )
@@ -83,11 +86,12 @@ async def load_registry(db: Database) -> Registry:
source=(r["source"] or {}), source=(r["source"] or {}),
scale_value=float(r["scale_value"]), scale_value=float(r["scale_value"]),
offset_value=float(r["offset_value"]), offset_value=float(r["offset_value"]),
poll_interval_s=(int(r["poll_interval_s"]) if r["poll_interval_s"] is not None else None), poll_interval_s=(
int(r["poll_interval_s"]) if r["poll_interval_s"] is not None else None
),
is_enabled=bool(r["is_enabled"]), is_enabled=bool(r["is_enabled"]),
) )
for r in ch_rows for r in ch_rows
] ]
return Registry(endpoints=endpoints, devices=devices, channels=channels) return Registry(endpoints=endpoints, devices=devices, channels=channels)

View File

@@ -2,10 +2,22 @@
from __future__ import annotations from __future__ import annotations
import datetime as dt import datetime as dt
from typing import Optional
from .db import Database from .db import Database
from .metrics import ALLOWED_METRICS
ALLOWED_METRICS = {
"temp_c",
"humidity_rh",
"pressure_pa",
"light_lux",
"soil_moist",
"co2_ppm",
"voltage_v",
"current_a",
"resistance_ohm",
"freq_hz",
"power_w",
}
class Writer: class Writer:
@@ -17,7 +29,7 @@ class Writer:
*, *,
ts: dt.datetime, ts: dt.datetime,
device_id: int, device_id: int,
location_id: Optional[int], location_id: int | None,
metric: str, metric: str,
value: float, value: float,
) -> None: ) -> None: